21. MongoDb Support

As of version 2.1 Spring Integration introduces support for MongoDB: a "high-performance, open source, document-oriented database". This support comes in the form of a MongoDB-based MessageStore.

21.1 Introduction

To download, install, and run MongoDB please refer to the MongoDB documentation.

21.2 Connecting to MongoDb

To begin interacting with MongoDB you first need to connect to it. Spring Integration builds on the support provided by another Spring project, Spring Data MongoDB, which provides a factory class called MongoDbFactory that simplifies integration with the MongoDB Client API.

MongoDbFactory

To connect to MongoDB you can use an implementation of the MongoDbFactory interface:

public interface MongoDbFactory {

	/**
	 * Creates a default {@link DB} instance.
	 * 
	 * @return the DB instance
	 * @throws DataAccessException
	 */
	DB getDb() throws DataAccessException;
	
	/**
	 * Creates a {@link DB} instance to access the database with the given name.
	 * 
	 * @param dbName must not be {@literal null} or empty.
	 *
	 * @return the DB instance
	 * @throws DataAccessException
	 */
	DB getDb(String dbName) throws DataAccessException;
}

The example below shows SimpleMongoDbFactory, the out-of-the-box implementation:

In Java:

MongoDbFactory mongoDbFactory = new SimpleMongoDbFactory(new Mongo(), "test");

Or in Spring's XML configuration:

<bean id="mongoDbFactory" class="org.springframework.data.mongodb.core.SimpleMongoDbFactory">
    <constructor-arg>
        <bean class="com.mongodb.Mongo"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>

As you can see SimpleMongoDbFactory takes two arguments: 1) a Mongo instance and 2) a String specifying the name of the database. If you need to configure properties such as host, port, etc, you can pass those using one of the constructors provided by the underlying Mongo class. For more information on how to configure MongoDB, please refer to the Spring-Data-Document reference.

21.3 MongoDB Message Store

As described in EIP, a Message Store allows you to persist Messages. This can be very useful when dealing with components that have a capability to buffer messages (QueueChannel, Aggregator, Resequencer, etc.) if reliability is a concern. In Spring Integration, the MessageStore strategy also provides the foundation for the ClaimCheck pattern, which is described in EIP as well.

Spring Integration's MongoDB module provides the MongoDbMessageStore which is an implementation of both the MessageStore strategy (mainly used by the QueueChannel and ClaimCheck patterns) and the MessageGroupStore strategy (mainly used by the Aggregator and Resequencer patterns).

<bean id="mongoDbMessageStore" class="org.springframework.integration.mongodb.store.MongoDbMessageStore">
    <constructor-arg ref="mongoDbFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="mongoDbMessageStore"/>
<int:channel>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
         message-store="mongoDbMessageStore"/>

Above is a sample MongoDbMessageStore configuration that shows its usage by a QueueChannel and an Aggregator. As you can see it is a simple bean configuration, and it expects a MongoDbFactory as a constructor argument.

21.4 MongoDB Inbound Channel Adapter

The MongoDb Inbound Channel Adapter is a polling consumer that reads data from MongoDb and sends it as a Message payload.

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
       channel="replyChannel"
       query="{'name' : 'Bob'}"
       entity-class="java.lang.Object"
       auto-startup="false">
		<int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>

As you can see from the configuration above, you configure a MongoDb Inbound Channel Adapter using the inbound-channel-adapter element, providing values for various attributes such as:

  • query or query-expression - a JSON query (see MongoDb Querying)

  • entity-class - the type of the payload object; if not supplied, a com.mongodb.DBObject will be returned.

  • collection-name or collection-name-expression - Identifies the name of the MongoDb collection to use.

  • mongodb-factory - reference to an instance of org.springframework.data.mongodb.MongoDbFactory

  • mongo-template - reference to an instance of org.springframework.data.mongodb.core.MongoTemplate

and other attributes that are common across all other inbound adapters (e.g., 'channel').

[Note]Note
You cannot set both mongo-template and mongodb-factory.

The example above is relatively simple and static since it has a literal value for the query and uses the default name for a collection. Sometimes you may need to change those values at runtime, based on some condition. To do that, simply use their -expression equivalents (query-expression and collection-name-expression) where the provided expression can be any valid SpEL expression.

Also, you may wish to do some post-processing to the successfully processed data that was read from the MongoDb. For example; you may want to move or remove a document after its been processed. You can do this using Transaction Synchronization feature that was added with Spring Integration 2.2.

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
	   channel="replyChannel"
	   query="{'name' : 'Bob'}"
	   entity-class="java.lang.Object"
	   auto-startup="false">
				<int:poller fixed-rate="200" max-messages-per-poll="1">
				    <int:transactional synchronization-factory="syncFactory"/>
				</int:poller>
</int-mongodb:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
	<int:after-commit expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)" channe="someChannel"/>
</int:transaction-synchronization-factory>
	
<bean id="documentCleaner" class="foo.bar.DocumentCleaner"/>

<bean id="transactionManager" class="org.springframework.integration.transaction.PseudoTransactionManager"/>
public class DocumentCleaner {
		public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
			if (target instanceof List<?>){
				List<?> documents = (List<?>) target;
				for (Object document : documents) {
					mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
				}
			}
		}
}

As you can see from the above, all you need to do is declare your poller to be transactional with a transactional element. This element can reference a real transaction manager (for example if some other part of your flow invokes JDBC). If you don't have a 'real' transaction, you can use a org.springframework.integration.transaction.PseudoTransactionManager which is an implementation of Spring's PlatformTransactionManager and enables the use of the transaction synchronization features of the mongo adapter when there is no actual transaction.

[Important]Important
This does NOT make MongoDB itself transactional, it simply allows the synchronization of actions to be taken before/after success (commit) or after failure (rollback).

Once your poller is transactional all you need to do is set an instance of the org.springframework.integration.transaction.TransactionSynchronizationFactory on the transactional element. TransactionSynchronizationFactory will create an instance of the TransactioinSynchronization. For your convenience, we've exposed a default SpEL-based TransactionSynchronizationFactory which allows you to configure SpEL expressions, with their execution being coordinated (synchronized) with a transaction. Expressions for before-commit, after-commit, and after-rollback are supported, together with a channel for each where the evaluation result (if any) will be sent. For each sub-element you can specify expression and/or channel attributes. If only the channel attribute is present the received Message will be sent there as part of the particular synchronization scenario. If only the expression attribute is present and the result of an expression is a non-Null value, a Message with the result as the payload will be generated and sent to a default channel (NullChannel) and will appear in the logs (DEBUG). If you want the evaluation result to go to a specific channel add a channel attribute. If the result of an expression is null or void, no Message will be generated.

For more information about transaction synchronization, see Section B.3, “Transaction Synchronization”.

21.5 MongoDB Outbound Channel Adapter

The MongoDb Outbound Channel Adapter allows you to write the Message payload to a MongoDb document store

<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
	collection-name="myCollection"
	mongo-converter="mongoConverter"
	mongodb-factory="mongoDbFactory" />

As you can see from the configuration above, you configure a MongoDb Outbound Channel Adapter using the outbound-channel-adapter element, providing values for various attributes such as:

  • collection-name or collection-name-expression - Identifies the name of the MongoDb collection to use.

  • mongo-converter - reference to an instance of org.springframework.data.mongodb.core.convert.MongoConverter to assist with converting a raw java object to a JSON document representation

  • mongodb-factory - reference to an instance of org.springframework.data.mongodb.MongoDbFactory

  • mongo-template - reference to an instance of org.springframework.data.mongodb.core.MongoTemplate (NOTE: you can not have both mongo-template and mongodb-factory set)

and other attributes that are common across all other inbound adapters (e.g., 'channel').

The example above is relatively simple and static since it has a literal value for the collection-name. Sometimes you may need to change this value at runtime based on some condition. To do that, simply use collection-name-expression where the provided expression can be any valid SpEL expression.