As of version 2.1 Spring Integration introduces support for MongoDB: a "high-performance, open source, document-oriented database". This support comes in the form of a MongoDB-based MessageStore.
To download, install, and run MongoDB please refer to the MongoDB documentation.
To begin interacting with MongoDB you first need to connect to it. Spring Integration builds on the support provided by another
Spring project, Spring Data MongoDB, which provides a factory
class called MongoDbFactory that simplifies integration with the MongoDB Client API.
MongoDbFactory
To connect to MongoDB you can use an implementation of the MongoDbFactory interface:
public interface MongoDbFactory { /** * Creates a default {@link DB} instance. * * @return the DB instance * @throws DataAccessException */ DB getDb() throws DataAccessException; /** * Creates a {@link DB} instance to access the database with the given name. * * @param dbName must not be {@literal null} or empty. * * @return the DB instance * @throws DataAccessException */ DB getDb(String dbName) throws DataAccessException; }
The example below shows SimpleMongoDbFactory, the out-of-the-box implementation:
In Java:
MongoDbFactory mongoDbFactory = new SimpleMongoDbFactory(new Mongo(), "test");
Or in Spring's XML configuration:
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoDbFactory"> <constructor-arg> <bean class="com.mongodb.Mongo"/> </constructor-arg> <constructor-arg value="test"/> </bean>
As you can see SimpleMongoDbFactory takes two arguments: 1) a Mongo instance and
2) a String specifying the name of the database. If you need to configure properties such as host, port, etc,
you can pass those using one of the constructors provided by the underlying Mongo class.
For more information on how to configure MongoDB, please refer to the
Spring-Data-Document reference.
As described in EIP, a Message Store allows you to persist Messages. This can be very useful when dealing with components that have a capability to buffer messages (QueueChannel, Aggregator, Resequencer, etc.) if reliability is a concern. In Spring Integration, the MessageStore strategy also provides the foundation for the ClaimCheck pattern, which is described in EIP as well.
Spring Integration's MongoDB module provides the MongoDbMessageStore which is an implementation of both
the MessageStore strategy (mainly used by the ClaimCheckpattern)
and the MessageGroupStore strategy (mainly used by the Aggregator and
Resequencer patterns).
<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore"> <constructor-arg ref="mongoDbFactory"/> </bean> <int:channel id="somePersistentQueueChannel"> <int:queue message-store="mongoDbMessageStore"/> <int:channel> <int:aggregator input-channel="inputChannel" output-channel="outputChannel" message-store="mongoDbMessageStore"/>
Above is a sample MongoDbMessageStore configuration that shows its usage by a QueueChannel
and an Aggregator. As you can see it is a simple bean configuration, and it expects a
MongoDbFactory as a constructor argument.
The MongoDbMessageStore expands the Message as a Mongo document
with all nested properties using the Spring Data Mongo Mapping mechanism. It is useful when you need to have access to
the payload or headers for auditing or analytics, for example, against stored messages.
![]() | Important |
|---|---|
|
The |
Spring Integration 3.0 introduced the ConfigurableMongoDbMessageStore -
MessageStore and MessageGroupStore implementation.
This class can receive, as a constructor argument, a MongoTemplate, with which you can
configure with a custom WriteConcern, for example. Another constructor requires a
MappingMongoConverter, and a MongoDbFactory,
which allows you to provide some custom conversions for Messages and their properties.
Note, by default, the ConfigurableMongoDbMessageStore uses standard Java serialization
to write/read Messages to/from MongoDB and relies on default values for other
properties from MongoTemplate, which is built from the provided
MongoDbFactory and MappingMongoConverter.
The default name for the collection stored by the ConfigurableMongoDbMessageStore is
configurableStoreMessages. It is recommended to use this implementation for robust and flexible solutions
when messages contain complex data types.
Starting with version 4.0, the new MongoDbChannelMessageStore
has been introduced; it is an optimized MessageGroupStore for use
in QueueChannels.
With priorityEnabled = true, it can be used in
<int:priority-queue>s to achieve priority order polling for
persisted messages. The priority MonogDB document field is populated from the
IntegrationMessageHeaderAccessor.PRIORITY (priority)
message header.
In addition, all MongoDB MessageStores now have a sequence
field for MessageGroup
documents. The sequence value is the result of an $inc operation for a simple
sequence
document from the same collection, which is created on demand. The sequence field is used in
poll operations to provide first-in-first-out (FIFO) message order
(within priority if configured) when messages are stored within the same millisecond.
![]() | Note |
|---|---|
|
It is not recommended to use the same |
<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore"> <constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/> </bean> <int:channel id="queueChannel"> <int:queue message-store="store"/> </int:channel> <bean id="priorityStore" parent="channelStore"> <property name="priorityEnabled" value="true"/> </bean> <int:channel id="priorityChannel"> <int:priority-queue message-store="priorityStore"/> </int:channel>
The MongoDb Inbound Channel Adapter is a polling consumer that reads data from MongoDb and sends it as a Message payload.
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter" channel="replyChannel" query="{'name' : 'Bob'}" entity-class="java.lang.Object" auto-startup="false"> <int:poller fixed-rate="100"/> </int-mongodb:inbound-channel-adapter>
As you can see from the configuration above, you configure a MongoDb Inbound Channel Adapter using
the inbound-channel-adapter element, providing values for various attributes such as:
query or query-expression - a
JSON query (see MongoDb Querying)
entity-class - the type of the payload object; if not supplied, a
com.mongodb.DBObject will be returned.
collection-name or collection-name-expression -
Identifies the name of the MongoDb collection to use.
mongodb-factory -
reference to an instance of o.s.data.mongodb.MongoDbFactory
mongo-template -
reference to an instance of o.s.data.mongodb.core.MongoTemplate
and other attributes that are common across all other inbound adapters (e.g., 'channel').
![]() | Note |
|---|---|
You cannot set both mongo-template and mongodb-factory.
|
The example above is relatively simple and static since it has a literal value for the query and uses
the default name for a collection. Sometimes you may need to change those values at runtime, based on some condition.
To do that, simply use their -expression equivalents (query-expression and
collection-name-expression) where the provided expression can be any valid SpEL expression.
Also, you may wish to do some post-processing to the successfully processed data that was read from the MongoDb. For example; you may want to move or remove a document after its been processed. You can do this using Transaction Synchronization feature that was added with Spring Integration 2.2.
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter" channel="replyChannel" query="{'name' : 'Bob'}" entity-class="java.lang.Object" auto-startup="false"> <int:poller fixed-rate="200" max-messages-per-poll="1"> <int:transactional synchronization-factory="syncFactory"/> </int:poller> </int-mongodb:inbound-channel-adapter> <int:transaction-synchronization-factory id="syncFactory"> <int:after-commit expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)" channe="someChannel"/> </int:transaction-synchronization-factory> <bean id="documentCleaner" class="foo.bar.DocumentCleaner"/> <bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
public class DocumentCleaner { public void remove(MongoOperations mongoOperations, Object target, String collectionName) { if (target instanceof List<?>){ List<?> documents = (List<?>) target; for (Object document : documents) { mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName); } } } }
As you can see from the above, all you need to do is declare your poller to be transactional with a transactional element.
This element can reference a real transaction manager (for example if some other part of your flow invokes JDBC).
If you don't have a 'real' transaction, you can use a
org.springframework.integration.transaction.PseudoTransactionManager which is an implementation
of Spring's PlatformTransactionManager and enables the use of the transaction synchronization
features of the mongo adapter when there is no actual transaction.
![]() | Important |
|---|---|
| This does NOT make MongoDB itself transactional, it simply allows the synchronization of actions to be taken before/after success (commit) or after failure (rollback). |
Once your poller is transactional all you need to do is set an instance of the
org.springframework.integration.transaction.TransactionSynchronizationFactory on the transactional element.
TransactionSynchronizationFactory will create an instance of the TransactioinSynchronization.
For your convenience, we've exposed a default SpEL-based TransactionSynchronizationFactory which allows
you to configure SpEL expressions, with their execution being coordinated (synchronized) with a transaction.
Expressions for before-commit, after-commit, and after-rollback are supported, together with a channel for each where the
evaluation result (if any) will be sent. For each sub-element you can specify expression and/or channel
attributes. If only the channel attribute is present the received Message will be sent there as part of the particular
synchronization scenario. If only the expression attribute is present and the result of an expression is a non-Null
value, a Message with the result as the payload will be generated and sent to a default channel (NullChannel) and will appear in the
logs (DEBUG). If you want the evaluation result to go to a specific channel add a channel attribute. If the result of an
expression is null or void, no Message will be generated.
For more information about transaction synchronization, see Section C.3, “Transaction Synchronization”.
The MongoDb Outbound Channel Adapter allows you to write the Message payload to a MongoDb document store
<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression" collection-name="myCollection" mongo-converter="mongoConverter" mongodb-factory="mongoDbFactory" />
As you can see from the configuration above, you configure a MongoDb Outbound Channel Adapter using
the outbound-channel-adapter element, providing values for various attributes such as:
collection-name or collection-name-expression -
Identifies the name of the MongoDb collection to use.
mongo-converter -
reference to an instance of o.s.data.mongodb.core.convert.MongoConverter to assist with
converting a raw java object to a JSON document representation
mongodb-factory -
reference to an instance of o.s.data.mongodb.MongoDbFactory
mongo-template -
reference to an instance of o.s.data.mongodb.core.MongoTemplate
(NOTE: you can not have both mongo-template and mongodb-factory set)
and other attributes that are common across all other inbound adapters (e.g., 'channel').
The example above is relatively simple and static since it has a literal value for the collection-name.
Sometimes you may need to change this value at runtime based on some condition.
To do that, simply use collection-name-expression
where the provided expression can be any valid SpEL expression.