As of version 2.1 Spring Integration introduces support for MongoDB: a "high-performance, open source, document-oriented database". This support comes in the form of a MongoDB-based MessageStore.
To download, install, and run MongoDB please refer to the MongoDB documentation.
To begin interacting with MongoDB you first need to connect to it.
Spring Integration builds on the support provided by another Spring project, Spring Data MongoDB, which provides a factory class called MongoDbFactory
that simplifies integration with the MongoDB Client API.
MongoDbFactory
To connect to MongoDB you can use an implementation of the MongoDbFactory
interface:
public interface MongoDbFactory { /** * Creates a default {@link DB} instance. * * @return the DB instance * @throws DataAccessException */ DB getDb() throws DataAccessException; /** * Creates a {@link DB} instance to access the database with the given name. * * @param dbName must not be {@literal null} or empty. * * @return the DB instance * @throws DataAccessException */ DB getDb(String dbName) throws DataAccessException; }
The example below shows SimpleMongoDbFactory
, the out-of-the-box implementation:
In Java:
MongoDbFactory mongoDbFactory = new SimpleMongoDbFactory(new Mongo(), "test");
Or in Spring’s XML configuration:
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoDbFactory"> <constructor-arg> <bean class="com.mongodb.Mongo"/> </constructor-arg> <constructor-arg value="test"/> </bean>
As you can see SimpleMongoDbFactory
takes two arguments: 1) a Mongo
instance and 2) a String specifying the name of the database.
If you need to configure properties such as host
, port
, etc, you can pass those using one of the constructors provided by the underlying Mongo
class.
For more information on how to configure MongoDB, please refer to the Spring-Data-MongoDB reference.
As described in EIP, a Message Store allows you to persist Messages.
This can be very useful when dealing with components that have a capability to buffer messages (QueueChannel, Aggregator, Resequencer, etc.) if reliability is a concern.
In Spring Integration, the MessageStore
strategy also provides the foundation for the ClaimCheck pattern, which is described in EIP as well.
Spring Integration’s MongoDB module provides the MongoDbMessageStore
which is an implementation of both the MessageStore
strategy (mainly used by the ClaimCheck pattern) and the MessageGroupStore
strategy (mainly used by the Aggregator and Resequencer patterns).
<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore"> <constructor-arg ref="mongoDbFactory"/> </bean> <int:channel id="somePersistentQueueChannel"> <int:queue message-store="mongoDbMessageStore"/> <int:channel> <int:aggregator input-channel="inputChannel" output-channel="outputChannel" message-store="mongoDbMessageStore"/>
Above is a sample MongoDbMessageStore
configuration that shows its usage by a QueueChannel and an Aggregator.
As you can see it is a simple bean configuration, and it expects a MongoDbFactory
as a constructor argument.
The MongoDbMessageStore
expands the Message
as a Mongo document with all nested properties using the Spring Data Mongo Mapping mechanism.
It is useful when you need to have access to the payload
or headers
for auditing or analytics, for example, against stored messages.
Important | |
---|---|
The |
Spring Integration 3.0 introduced the ConfigurableMongoDbMessageStore
- MessageStore
and MessageGroupStore
implementation.
This class can receive, as a constructor argument, a MongoTemplate
, with which you can configure with a custom WriteConcern
, for example.
Another constructor requires a MappingMongoConverter
, and a MongoDbFactory
, which allows you to provide some custom conversions for Message
s and their properties.
Note, by default, the ConfigurableMongoDbMessageStore
uses standard Java serialization to write/read Message
s to/from MongoDB (see MongoDbMessageBytesConverter
) and relies on default values for other properties from MongoTemplate
, which is built from the provided MongoDbFactory
and MappingMongoConverter
.
The default name for the collection stored by the ConfigurableMongoDbMessageStore
is configurableStoreMessages
.
It is recommended to use this implementation for robust and flexible solutions when messages contain complex data types.
Starting with version 4.0, the new MongoDbChannelMessageStore
has been introduced; it is an optimized MessageGroupStore
for use in QueueChannel
s.
With priorityEnabled = true
, it can be used in <int:priority-queue>
s to achieve priority order polling for persisted messages.
The priority MongoDB document field is populated from the IntegrationMessageHeaderAccessor.PRIORITY
(priority
) message header.
In addition, all MongoDB MessageStore
s now have a sequence
field for MessageGroup documents.
The sequence
value is the result of an $inc
operation for a simple sequence
document from the same collection, which is created on demand.
The sequence
field is used in poll
operations to provide first-in-first-out (FIFO) message order (within priority if configured) when messages are stored within the same millisecond.
Note | |
---|---|
It is not recommended to use the same |
<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore"> <constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/> </bean> <int:channel id="queueChannel"> <int:queue message-store="store"/> </int:channel> <bean id="priorityStore" parent="channelStore"> <property name="priorityEnabled" value="true"/> </bean> <int:channel id="priorityChannel"> <int:priority-queue message-store="priorityStore"/> </int:channel>
As of Spring Integration 4.2, a new MongoDB-based MetadataStore
(Section 10.5, “Metadata Store”) implementation is available.
The MongoDbMetadataStore
can be used to maintain metadata state across application restarts.
This new MetadataStore
implementation can be used with adapters such as:
In order to instruct these adapters to use the new MongoDbMetadataStore
, simply declare a Spring bean using the
bean name metadataStore. The Twitter Inbound Channel Adapter and the Feed Inbound Channel Adapter will both
automatically pick up and use the declared MongoDbMetadataStore
:
@Bean public MetadataStore metadataStore(MongoDbFactory factory) { return new MongoDbMetadataStore(factory, "integrationMetadataStore"); }
The MongoDbMetadataStore
also implements ConcurrentMetadataStore
, allowing it to be reliably shared across multiple
application instances where only one instance will be allowed to store or modify a key’s value.
All these operations are atomic via MongoDB guarantees.
The MongoDb Inbound Channel Adapter is a polling consumer that reads data from MongoDb and sends it as a Message payload.
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter" channel="replyChannel" query="{'name' : 'Bob'}" entity-class="java.lang.Object" auto-startup="false"> <int:poller fixed-rate="100"/> </int-mongodb:inbound-channel-adapter>
As you can see from the configuration above, you configure a MongoDb Inbound Channel Adapter using the inbound-channel-adapter
element, providing values for various attributes such as:
query
- a JSON query (see MongoDb Querying)
query-expression
- A SpEL expression that is evaluated to a JSON query String (as the query
attribute above), or to an instance of o.s.data.mongodb.core.query.Query
. Mutually exclusive with query
attribute.
entity-class
- the type of the payload object; if not supplied, a com.mongodb.DBObject
will be returned.
collection-name
or collection-name-expression
- Identifies the name of the MongoDb collection to use.
mongodb-factory
- reference to an instance of o.s.data.mongodb.MongoDbFactory
mongo-template
- reference to an instance of o.s.data.mongodb.core.MongoTemplate
and other attributes that are common across all other inbound adapters (e.g., 'channel').
Note | |
---|---|
You cannot set both |
The example above is relatively simple and static since it has a literal value for the query
and uses the default name for a collection
.
Sometimes you may need to change those values at runtime, based on some condition.
To do that, simply use their -expression
equivalents (query-expression
and collection-name-expression
) where the provided expression can be any valid SpEL expression.
Also, you may wish to do some post-processing to the successfully processed data that was read from the MongoDb. For example; you may want to move or remove a document after its been processed. You can do this using Transaction Synchronization feature that was added with Spring Integration 2.2.
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter" channel="replyChannel" query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)" entity-class="java.lang.Object" auto-startup="false"> <int:poller fixed-rate="200" max-messages-per-poll="1"> <int:transactional synchronization-factory="syncFactory"/> </int:poller> </int-mongodb:inbound-channel-adapter> <int:transaction-synchronization-factory id="syncFactory"> <int:after-commit expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)" channe="someChannel"/> </int:transaction-synchronization-factory> <bean id="documentCleaner" class="foo.bar.DocumentCleaner"/> <bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
public class DocumentCleaner { public void remove(MongoOperations mongoOperations, Object target, String collectionName) { if (target instanceof List<?>){ List<?> documents = (List<?>) target; for (Object document : documents) { mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName); } } } }
As you can see from the above, all you need to do is declare your poller to be transactional with a transactional
element.
This element can reference a real transaction manager (for example if some other part of your flow invokes JDBC).
If you don’t have a real transaction, you can use a org.springframework.integration.transaction.PseudoTransactionManager
which is an implementation of Spring’s PlatformTransactionManager
and enables the use of the transaction synchronization features of the mongo adapter when there is no actual transaction.
Important | |
---|---|
This does NOT make MongoDB itself transactional, it simply allows the synchronization of actions to be taken before/after success (commit) or after failure (rollback). |
Once your poller is transactional all you need to do is set an instance of the o.s.i.transaction.TransactionSynchronizationFactory
on the transactional
element.
TransactionSynchronizationFactory
will create an instance of the TransactionSynchronization
.
For your convenience, we’ve exposed a default SpEL-based TransactionSynchronizationFactory
which allows you to configure SpEL expressions, with their execution being coordinated (synchronized) with a transaction.
Expressions for before-commit, after-commit, and after-rollback are supported, together with a channel for each where the evaluation result (if any) will be sent.
For each sub-element you can specify expression
and/or channel
attributes.
If only the channel
attribute is present the received Message will be sent there as part of the particular synchronization scenario.
If only the expression
attribute is present and the result of an expression is a non-Null value, a Message with the result as the payload will be generated and sent to a default channel (NullChannel) and will appear in the logs (DEBUG).
If you want the evaluation result to go to a specific channel add a channel
attribute.
If the result of an expression is null or void, no Message will be generated.
For more information about transaction synchronization, see Section C.3, “Transaction Synchronization”.
The MongoDb Outbound Channel Adapter allows you to write the Message payload to a MongoDb document store
<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression" collection-name="myCollection" mongo-converter="mongoConverter" mongodb-factory="mongoDbFactory" />
As you can see from the configuration above, you configure a MongoDb Outbound Channel Adapter using the outbound-channel-adapter
element, providing values for various attributes such as:
collection-name
or collection-name-expression
- Identifies the name of the MongoDb collection to use.
mongo-converter
- reference to an instance of o.s.data.mongodb.core.convert.MongoConverter
to assist with converting a raw java object to a JSON document representation
mongodb-factory
- reference to an instance of o.s.data.mongodb.MongoDbFactory
mongo-template
- reference to an instance of o.s.data.mongodb.core.MongoTemplate
(NOTE: you can not have both mongo-template and mongodb-factory set)
and other attributes that are common across all other inbound adapters (e.g., channel).
The example above is relatively simple and static since it has a literal value for the collection-name
.
Sometimes you may need to change this value at runtime based on some condition.
To do that, simply use collection-name-expression
where the provided expression can be any valid SpEL expression.
Starting with version 5.0, the MongoDb Outbound Gateway is provided and it allows you to query a database by sending a Message to its request channel. The gateway will then send the response to the reply channel. The Message payload and headers can be used to specify the query, as well as collection name.
<int-mongodb:outbound-gateway id="gatewayQuery" mongodb-factory="mongoDbFactory" mongo-converter="mongoConverter" query="{firstName: 'Bob'}" collection-name="foo" request-channel="in" reply-channel="out" entity-class="org.springframework.integration.mongodb.test.entity$Person"/>
collection-name
or collection-name-expression
- identifies the name of the MongoDb collection to use;
mongo-converter
- reference to an instance of o.s.data.mongodb.core.convert.MongoConverter
to assist with converting a raw java object to a JSON document representation
mongodb-factory
- reference to an instance of o.s.data.mongodb.MongoDbFactory
mongo-template
- reference to an instance of o.s.data.mongodb.core.MongoTemplate
(NOTE: you can not have both mongo-template and mongodb-factory set)
entity-class
- the fully qualified name of the entity class to be passed to find(..)
or findOne(..)
method in MongoTemplate.
If this attribute is not provided the default value is org.bson.Document
;
query
or query-expression
- specifies the MongoDb query.
Please refer to MongoDB documentation for more query samples.
collection-callback
- reference to an instance of org.springframework.data.mongodb.core.CollectionCallback
(NOTE: you can not have both collection-callback and any of the query attributes).
The following Spring Boot application provides an example of configuring the outbound gateway using Java configuration:
@SpringBootApplication public class MongoDbJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(MongoDbJavaApplication.class) .web(false) .run(args); } @Autowired private MongoDbFactory mongoDbFactory; @Bean @ServiceActivator(inputChannel = "requestChannel") public MessageHandler mongoDbOutboundGateway() { MongoDbOutboundGateway gateway = new MongoDbOutboundGateway(this.mongoDbFactory); gateway.setCollectionNameExpressionString("'foo'"); gateway.setQueryExpressionString("'{''name'' : ''Bob''}'"); gateway.setExpectSingleResult(true); gateway.setEntityClass(Person.class); gateway.setOutputChannelName("replyChannel"); return gateway; } @Bean @ServiceActivator(inputChannel = "replyChannel") public MessageHandler handler() { return message -> System.out.println(message.getPayload()); } }
The following Spring Boot application provides an example of configuring the Outbound Gateway using the Java DSL:
@SpringBootApplication public class MongoDbJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(MongoDbJavaApplication.class) .web(false) .run(args); } @Autowired private MongoDbFactory; @Autowired private MongoConverter; @Bean public IntegrationFlow gatewaySingleQueryFlow() { return f -> f .handle(queryOutboundGateway()) .channel(c -> c.queue("retrieveResults")); } private MongoDbOutboundGatewaySpec queryOutboundGateway() { return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter) .query("{name : 'Bob'}") .collectionNameFunction(m -> m.getHeaders().get("collection")) .expectSingleResult(true) .entityClass(Person.class); } }
Alternatively to the query
and query-expression
properties, you can specify other database operations through
the collectionCallback
property.
The following example specifies a count operation:
private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() { return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter) .collectionCallback(MongoCollection::count) .collectionName("foo"); }