As of version 2.1 Spring Integration introduces support for MongoDB: a "high-performance, open source, document-oriented database". This support comes in the form of a MongoDB-based MessageStore.
To download, install, and run MongoDB please refer to the MongoDB documentation.
To begin interacting with MongoDB you first need to connect to it. Spring Integration builds on the support provided by another Spring project, Spring Data MongoDB, which provides a factory class called MongoDbFactory that simplifies integration with the MongoDB Client API.
MongoDbFactory
To connect to MongoDB you can use an implementation of the MongoDbFactory
interface:
public interface MongoDbFactory {

    /**
     * Creates a default {@link DB} instance.
     *
     * @return the DB instance
     * @throws DataAccessException
     */
    DB getDb() throws DataAccessException;

    /**
     * Creates a {@link DB} instance to access the database with the given name.
     *
     * @param dbName must not be {@literal null} or empty.
     * @return the DB instance
     * @throws DataAccessException
     */
    DB getDb(String dbName) throws DataAccessException;
}
The example below shows SimpleMongoDbFactory, the out-of-the-box implementation:
In Java:
MongoDbFactory mongoDbFactory = new SimpleMongoDbFactory(new Mongo(), "test");
Or in Spring's XML configuration:
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoDbFactory">
    <constructor-arg>
        <bean class="com.mongodb.Mongo"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>
As you can see, SimpleMongoDbFactory takes two arguments: 1) a Mongo instance and 2) a String specifying the name of the database. If you need to configure properties such as host, port, etc., you can pass those using one of the constructors provided by the underlying Mongo class.
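As a minimal sketch of passing a host and port through the Mongo constructor, the factory bean could be declared as follows. The host and port values are placeholders, and the explicit type attributes are an assumption made here so that Spring selects the Mongo(String host, int port) constructor rather than another two-argument one.

```xml
<!-- Sketch only: host and port values are placeholders. -->
<bean id="mongoDbFactory" class="org.springframework.data.mongodb.core.SimpleMongoDbFactory">
    <constructor-arg>
        <bean class="com.mongodb.Mongo">
            <!-- Intended to match the Mongo(String host, int port) constructor. -->
            <constructor-arg type="java.lang.String" value="localhost"/>
            <constructor-arg type="int" value="27017"/>
        </bean>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>
```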
For more information on how to configure MongoDB, please refer to the
Spring-Data-Document reference.
As described in EIP, a Message Store allows you to persist Messages. This can be very useful when dealing with components that have a capability to buffer messages (QueueChannel, Aggregator, Resequencer, etc.) if reliability is a concern. In Spring Integration, the MessageStore strategy also provides the foundation for the ClaimCheck pattern, which is described in EIP as well.
Spring Integration's MongoDB module provides the MongoDbMessageStore, which is an implementation of both the MessageStore strategy (mainly used by the ClaimCheck pattern) and the MessageGroupStore strategy (mainly used by the Aggregator and Resequencer patterns).
<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
    <constructor-arg ref="mongoDbFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="mongoDbMessageStore"/>
</int:channel>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
    message-store="mongoDbMessageStore"/>
Above is a sample MongoDbMessageStore
configuration that shows its usage by a QueueChannel
and an Aggregator. As you can see it is a simple bean configuration, and it expects a
MongoDbFactory
as a constructor argument.
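The same store bean can also back the ClaimCheck pattern mentioned earlier. The following is a sketch under the assumption that the mongoDbMessageStore bean from the sample is available; the channel names are hypothetical.

```xml
<!-- Sketch only: the channel names below are hypothetical. -->
<!-- Check in: stores the Message in MongoDB and passes on a claim check. -->
<int:claim-check-in input-channel="checkInChannel"
    message-store="mongoDbMessageStore"
    output-channel="lightweightChannel"/>

<!-- Check out: uses the claim check to retrieve the original Message from the store. -->
<int:claim-check-out input-channel="lightweightChannel"
    message-store="mongoDbMessageStore"
    output-channel="checkOutChannel"/>
```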
The MongoDbMessageStore
expands the Message
as a Mongo document
with all nested properties using the Spring Data Mongo Mapping mechanism. It is useful when you need to have access to
the payload
or headers
for auditing or analytics, for example, against stored messages.
Important | |
---|---|
The |
Spring Integration 3.0 introduced the ConfigurableMongoDbMessageStore, which implements both the MessageStore and MessageGroupStore strategies.
This class can receive, as a constructor argument, a MongoTemplate, which you can configure with a custom WriteConcern, for example. Another constructor requires a MappingMongoConverter and a MongoDbFactory, which allows you to provide some custom conversions for Messages and their properties.
Note, by default, the ConfigurableMongoDbMessageStore
uses standard Java serialization
to write/read Message
s to/from MongoDB and relies on default values for other
properties from MongoTemplate
, which is built from the provided
MongoDbFactory
and MappingMongoConverter
.
The default name for the collection stored by the ConfigurableMongoDbMessageStore
is
configurableStoreMessages
. It is recommended to use this implementation for robust and flexible solutions
when messages contain complex data types.
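To illustrate the constructor that accepts a MongoTemplate, here is a hedged sketch of a store whose template carries a custom WriteConcern. The bean ids are hypothetical, the util namespace is assumed to be declared, and WriteConcern.ACKNOWLEDGED is only an example constant whose availability depends on the MongoDB driver version in use.

```xml
<!-- Sketch only: bean ids are hypothetical; the util namespace must be declared. -->
<bean id="mongoTemplate" class="org.springframework.data.mongodb.core.MongoTemplate">
    <constructor-arg ref="mongoDbFactory"/>
    <!-- Custom WriteConcern; ACKNOWLEDGED is used here only as an example. -->
    <property name="writeConcern">
        <util:constant static-field="com.mongodb.WriteConcern.ACKNOWLEDGED"/>
    </property>
</bean>

<bean id="configurableStore"
      class="org.springframework.integration.mongodb.store.ConfigurableMongoDbMessageStore">
    <constructor-arg ref="mongoTemplate"/>
</bean>
```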
Starting with version 4.0, the new MongoDbChannelMessageStore
has been introduced; it is an optimized MessageGroupStore
for use
in QueueChannel
s.
With priorityEnabled = true, it can be used in <int:priority-queue>s to achieve priority order polling for persisted messages. The priority MongoDB document field is populated from the IntegrationMessageHeaderAccessor.PRIORITY (priority) message header.
In addition, all MongoDB MessageStore
s now have a sequence
field for MessageGroup
documents. The sequence
value is the result of an $inc
operation for a simple
sequence
document from the same collection, which is created on demand. The sequence
field is used in
poll
operations to provide first-in-first-out (FIFO) message order
(within priority if configured) when messages are stored within the same millisecond.
Note | |
---|---|
It is not recommended to use the same |
<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
    <constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>
The MongoDb Inbound Channel Adapter is a polling consumer that reads data from MongoDb and sends it as a Message payload.
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
    channel="replyChannel"
    query="{'name' : 'Bob'}"
    entity-class="java.lang.Object"
    auto-startup="false">
    <int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>
As you can see from the configuration above, you configure a MongoDb Inbound Channel Adapter using
the inbound-channel-adapter
element, providing values for various attributes such as:
query
or query-expression
- a
JSON query (see MongoDb Querying)
entity-class
- the type of the payload object; if not supplied, a
com.mongodb.DBObject
will be returned.
collection-name
or collection-name-expression
-
Identifies the name of the MongoDb collection to use.
mongodb-factory
-
reference to an instance of o.s.data.mongodb.MongoDbFactory
mongo-template
-
reference to an instance of o.s.data.mongodb.core.MongoTemplate
and other attributes that are common across all other inbound adapters (e.g., 'channel').
Note | |
---|---|
You cannot set both mongo-template and mongodb-factory .
|
The example above is relatively simple and static since it has a literal value for the query
and uses
the default name for a collection
. Sometimes you may need to change those values at runtime, based on some condition.
To do that, simply use their -expression
equivalents (query-expression
and
collection-name-expression
) where the provided expression can be any valid SpEL expression.
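As a sketch of the -expression attributes described above, the adapter below resolves both the query and the collection name at runtime. The queryProvider bean and its two methods are hypothetical (assumed to return a JSON query String and a collection name), and the example assumes bean references can be resolved in the adapter's SpEL evaluation context.

```xml
<!-- Sketch only: "queryProvider" is a hypothetical bean whose methods return
     a JSON query String and a collection name. -->
<int-mongodb:inbound-channel-adapter id="dynamicMongoInboundAdapter"
    channel="replyChannel"
    query-expression="@queryProvider.currentQuery()"
    collection-name-expression="@queryProvider.currentCollection()"
    mongodb-factory="mongoDbFactory">
    <int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>
```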
Also, you may wish to do some post-processing of the data that was successfully read from MongoDb. For example, you may want to move or remove a document after it has been processed. You can do this using the Transaction Synchronization feature that was added in Spring Integration 2.2.
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
    channel="replyChannel"
    query="{'name' : 'Bob'}"
    entity-class="java.lang.Object"
    auto-startup="false">
    <int:poller fixed-rate="200" max-messages-per-poll="1">
        <int:transactional synchronization-factory="syncFactory"/>
    </int:poller>
</int-mongodb:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit
        expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
        channel="someChannel"/>
</int:transaction-synchronization-factory>

<bean id="documentCleaner" class="foo.bar.DocumentCleaner"/>

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
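import java.util.List;

import com.mongodb.util.JSON;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.query.BasicQuery;

public class DocumentCleaner {

    // Removes each document in the polled payload from the given collection.
    public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
        if (target instanceof List<?>) {
            List<?> documents = (List<?>) target;
            for (Object document : documents) {
                mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
            }
        }
    }
}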
As you can see from the above, all you need to do is declare your poller to be transactional with a transactional
element.
This element can reference a real transaction manager (for example if some other part of your flow invokes JDBC).
If you don't have a 'real' transaction, you can use a
org.springframework.integration.transaction.PseudoTransactionManager
which is an implementation
of Spring's PlatformTransactionManager
and enables the use of the transaction synchronization
features of the mongo adapter when there is no actual transaction.
Important | |
---|---|
This does NOT make MongoDB itself transactional; it simply allows the synchronization of actions to be taken before/after success (commit) or after failure (rollback). |
Once your poller is transactional all you need to do is set an instance of the
org.springframework.integration.transaction.TransactionSynchronizationFactory
on the transactional
element.
TransactionSynchronizationFactory will create an instance of the TransactionSynchronization.
For your convenience, we've exposed a default SpEL-based TransactionSynchronizationFactory
which allows
you to configure SpEL expressions, with their execution being coordinated (synchronized) with a transaction.
Expressions for before-commit, after-commit, and after-rollback are supported, together with a channel for each where the
evaluation result (if any) will be sent. For each sub-element you can specify expression
and/or channel
attributes. If only the channel
attribute is present the received Message will be sent there as part of the particular
synchronization scenario. If only the expression
attribute is present and the result of an expression is a non-Null
value, a Message with the result as the payload will be generated and sent to a default channel (NullChannel) and will appear in the
logs (DEBUG). If you want the evaluation result to go to a specific channel add a channel
attribute. If the result of an
expression is null or void, no Message will be generated.
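The combinations described above can be sketched as a single factory definition. The channel names are hypothetical; the after-commit expression reuses the documentCleaner bean from the earlier sample.

```xml
<!-- Sketch only: the channel names below are hypothetical. -->
<int:transaction-synchronization-factory id="syncFactory">
    <!-- Channel only: the received Message itself is sent to the channel. -->
    <int:before-commit channel="beforeCommitChannel"/>

    <!-- Expression only: a non-null result would go to the default NullChannel;
         this expression calls a void method, so no Message is generated. -->
    <int:after-commit
        expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"/>

    <!-- Expression and channel: the evaluation result is sent to the named channel. -->
    <int:after-rollback expression="payload" channel="failedDocumentsChannel"/>
</int:transaction-synchronization-factory>
```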
For more information about transaction synchronization, see Section C.3, “Transaction Synchronization”.
The MongoDb Outbound Channel Adapter allows you to write the Message payload to a MongoDb document store.
<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
    collection-name="myCollection"
    mongo-converter="mongoConverter"
    mongodb-factory="mongoDbFactory"/>
As you can see from the configuration above, you configure a MongoDb Outbound Channel Adapter using
the outbound-channel-adapter
element, providing values for various attributes such as:
collection-name
or collection-name-expression
-
Identifies the name of the MongoDb collection to use.
mongo-converter
-
reference to an instance of o.s.data.mongodb.core.convert.MongoConverter
to assist with
converting a raw java object to a JSON document representation
mongodb-factory
-
reference to an instance of o.s.data.mongodb.MongoDbFactory
mongo-template
-
reference to an instance of o.s.data.mongodb.core.MongoTemplate
(NOTE: you cannot have both mongo-template and mongodb-factory set)
and other attributes that are common across all other outbound adapters (e.g., 'channel').
The example above is relatively simple and static since it has a literal value for the collection-name
.
Sometimes you may need to change this value at runtime based on some condition.
To do that, simply use collection-name-expression
where the provided expression can be any valid SpEL expression.
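For instance, the collection could be chosen per message from a header. This is a sketch that assumes the expression is evaluated against the outgoing Message; the adapter id, the channel name, and the targetCollection header are hypothetical.

```xml
<!-- Sketch only: the adapter id, channel, and "targetCollection" header are hypothetical. -->
<int-mongodb:outbound-channel-adapter id="dynamicCollectionAdapter"
    channel="toMongoChannel"
    collection-name-expression="headers.targetCollection"
    mongodb-factory="mongoDbFactory"/>
```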