23. MongoDb Support

As of version 2.1 Spring Integration introduces support for MongoDB: a "high-performance, open source, document-oriented database". This support comes in the form of a MongoDB-based MessageStore.

23.1 Introduction

To download, install, and run MongoDB please refer to the MongoDB documentation.

23.2 Connecting to MongoDb

To begin interacting with MongoDB you first need to connect to it. Spring Integration builds on the support provided by another Spring project, Spring Data MongoDB, which provides a factory class called MongoDbFactory that simplifies integration with the MongoDB Client API.

MongoDbFactory

To connect to MongoDB you can use an implementation of the MongoDbFactory interface:

public interface MongoDbFactory {

    /**
     * Creates a default {@link DB} instance.
     *
     * @return the DB instance
     * @throws DataAccessException
     */
    DB getDb() throws DataAccessException;

    /**
     * Creates a {@link DB} instance to access the database with the given name.
     *
     * @param dbName must not be {@literal null} or empty.
     *
     * @return the DB instance
     * @throws DataAccessException
     */
    DB getDb(String dbName) throws DataAccessException;
}

The example below shows SimpleMongoDbFactory, the out-of-the-box implementation:

In Java:

MongoDbFactory mongoDbFactory = new SimpleMongoDbFactory(new Mongo(), "test");

Or in Spring’s XML configuration:

<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoDbFactory">
    <constructor-arg>
        <bean class="com.mongodb.Mongo"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>

As you can see, SimpleMongoDbFactory takes two arguments: a Mongo instance and a String specifying the name of the database. If you need to configure properties such as host and port, you can pass them using one of the constructors provided by the underlying Mongo class. For more information on how to configure MongoDB, please refer to the Spring Data MongoDB reference.

23.3 MongoDB Message Store

As described in EIP, a Message Store allows you to persist Messages. This can be very useful when dealing with components that have a capability to buffer messages (QueueChannel, Aggregator, Resequencer, etc.) if reliability is a concern. In Spring Integration, the MessageStore strategy also provides the foundation for the ClaimCheck pattern, which is described in EIP as well.
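To make the ClaimCheck idea concrete, here is a minimal plain-Java sketch. It is not Spring Integration code: ClaimCheckSketch, checkIn, and checkOut are hypothetical names, and an in-memory map stands in for the persistent store. It only illustrates the exchange of a payload for a small ticket and back again.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for a message store; not a Spring Integration class.
class ClaimCheckSketch {

    private final Map<UUID, Object> stored = new ConcurrentHashMap<>();

    // Check-in: keep the payload in the store and hand back a small claim ticket.
    UUID checkIn(Object payload) {
        UUID ticket = UUID.randomUUID();
        stored.put(ticket, payload);
        return ticket;
    }

    // Check-out: exchange the ticket for the payload; returns null for an unknown ticket.
    // In this sketch the entry is removed on check-out.
    Object checkOut(UUID ticket) {
        return stored.remove(ticket);
    }

    public static void main(String[] args) {
        ClaimCheckSketch store = new ClaimCheckSketch();
        UUID ticket = store.checkIn("large order document");
        // Only the ticket needs to travel through the flow until the payload is required again.
        System.out.println(store.checkOut(ticket));
    }
}
```

A persistent store plays the role of the map here, so the ticket remains valid even if the application restarts between check-in and check-out.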

Spring Integration’s MongoDB module provides the MongoDbMessageStore which is an implementation of both the MessageStore strategy (mainly used by the ClaimCheck pattern) and the MessageGroupStore strategy (mainly used by the Aggregator and Resequencer patterns).

<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
    <constructor-arg ref="mongoDbFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="mongoDbMessageStore"/>
</int:channel>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
         message-store="mongoDbMessageStore"/>

Above is a sample MongoDbMessageStore configuration that shows its usage by a QueueChannel and an Aggregator. As you can see it is a simple bean configuration, and it expects a MongoDbFactory as a constructor argument.

The MongoDbMessageStore expands the Message as a Mongo document with all nested properties using the Spring Data Mongo Mapping mechanism. This is useful when you need access to the payload or headers of stored messages, for example for auditing or analytics.

[Important]Important

The MongoDbMessageStore uses a custom MappingMongoConverter implementation to store Messages as MongoDB documents, and there are some limitations for the properties (payload and header values) of the Message. For example, you cannot configure custom converters for complex domain payloads or header values, nor can you provide a custom MongoTemplate (or MappingMongoConverter). To provide these capabilities, an alternative MongoDB MessageStore implementation has been introduced; see the next paragraph.

Spring Integration 3.0 introduced the ConfigurableMongoDbMessageStore, which implements both MessageStore and MessageGroupStore. This class can receive a MongoTemplate as a constructor argument, which lets you configure a custom WriteConcern, for example. Another constructor requires a MappingMongoConverter and a MongoDbFactory, which allows you to provide custom conversions for Messages and their properties. Note that, by default, the ConfigurableMongoDbMessageStore uses standard Java serialization to write Messages to and read them from MongoDB (see MongoDbMessageBytesConverter) and relies on default values for other properties from the MongoTemplate, which is built from the provided MongoDbFactory and MappingMongoConverter. The default name of the collection used by the ConfigurableMongoDbMessageStore is configurableStoreMessages. This implementation is recommended for robust and flexible solutions when messages contain complex data types.

23.3.1 MongoDB Channel Message Store

Starting with version 4.0, the new MongoDbChannelMessageStore has been introduced; it is an optimized MessageGroupStore for use in QueueChannels. With priorityEnabled = true, it can be used in <int:priority-queue> elements to achieve priority-order polling for persisted messages. The priority MongoDB document field is populated from the IntegrationMessageHeaderAccessor.PRIORITY (priority) message header.

In addition, all MongoDB MessageStores now have a sequence field for MessageGroup documents. The sequence value is the result of an $inc operation for a simple sequence document from the same collection, which is created on demand. The sequence field is used in poll operations to provide first-in-first-out (FIFO) message order (within priority if configured) when messages are stored within the same millisecond.
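The role of the sequence field as a tie-breaker can be illustrated with a small plain-Java model. This is a sketch under stated assumptions, not the store's implementation: PollOrderSketch and its field names are hypothetical, an AtomicLong stands in for the $inc on the sequence document, and the poll order (highest priority, then oldest timestamp, then lowest sequence) is an assumed reading of the description above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical in-memory model; field names are illustrative, not the store's schema.
class PollOrderSketch {

    static final class StoredMessage {
        final String payload;
        final int priority;
        final long createdMillis;
        final long sequence;

        StoredMessage(String payload, int priority, long createdMillis, long sequence) {
            this.payload = payload;
            this.priority = priority;
            this.createdMillis = createdMillis;
            this.sequence = sequence;
        }
    }

    // Stands in for the $inc operation on the sequence document.
    private final AtomicLong sequence = new AtomicLong();
    private final List<StoredMessage> messages = new ArrayList<>();

    void add(String payload, int priority, long createdMillis) {
        messages.add(new StoredMessage(payload, priority, createdMillis, sequence.incrementAndGet()));
    }

    // Assumed order: highest priority first, then oldest timestamp, then lowest sequence.
    static int compare(StoredMessage a, StoredMessage b) {
        int byPriority = Integer.compare(b.priority, a.priority);
        if (byPriority != 0) {
            return byPriority;
        }
        int byTime = Long.compare(a.createdMillis, b.createdMillis);
        if (byTime != 0) {
            return byTime;
        }
        return Long.compare(a.sequence, b.sequence);
    }

    // Removes and returns the payload that sorts first, or null when the store is empty.
    String poll() {
        if (messages.isEmpty()) {
            return null;
        }
        StoredMessage next = messages.get(0);
        for (StoredMessage candidate : messages) {
            if (compare(candidate, next) < 0) {
                next = candidate;
            }
        }
        messages.remove(next);
        return next.payload;
    }
}
```

Two messages with the same priority and the same millisecond timestamp are still polled in insertion order, because the earlier one received the smaller sequence value.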

[Note]Note

It is not recommended to use the same MongoDbChannelMessageStore bean for priority and non-priority, because the priorityEnabled option applies to the entire store. However, the same collection can be used for both MongoDbChannelMessageStore types, because message polling from the store is sorted and uses indexes. To configure that scenario, simply extend one message store bean from the other:

<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
    <constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

23.3.2 MongoDB Metadata Store

As of Spring Integration 4.2, a new MongoDB-based MetadataStore (Section 10.5, “Metadata Store”) implementation is available. The MongoDbMetadataStore can be used to maintain metadata state across application restarts. This new MetadataStore implementation can be used with adapters such as the Twitter Inbound Channel Adapter and the Feed Inbound Channel Adapter.

In order to instruct these adapters to use the new MongoDbMetadataStore, simply declare a Spring bean using the bean name metadataStore. The Twitter Inbound Channel Adapter and the Feed Inbound Channel Adapter will both automatically pick up and use the declared MongoDbMetadataStore:

@Bean
public MetadataStore metadataStore(MongoDbFactory factory) {
    return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}

The MongoDbMetadataStore also implements ConcurrentMetadataStore, allowing it to be reliably shared across multiple application instances where only one instance will be allowed to store or modify a key’s value. All these operations are atomic via MongoDB guarantees.
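The "only one instance wins" behavior can be sketched in plain Java. The class below is a hypothetical in-memory stand-in, not the MongoDB-backed store: its putIfAbsent and replace methods mirror the ConcurrentMetadataStore contract, while a ConcurrentHashMap takes the place of the atomic MongoDB operations.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory stand-in; a ConcurrentHashMap replaces MongoDB's atomic updates.
class MetadataStoreSketch {

    private final ConcurrentMap<String, String> data = new ConcurrentHashMap<>();

    // Returns null if this call stored the value, otherwise the value that was already there.
    String putIfAbsent(String key, String value) {
        return data.putIfAbsent(key, value);
    }

    // Succeeds only if the current value still equals oldValue.
    boolean replace(String key, String oldValue, String newValue) {
        return data.replace(key, oldValue, newValue);
    }

    String get(String key) {
        return data.get(key);
    }

    public static void main(String[] args) {
        MetadataStoreSketch store = new MetadataStoreSketch();
        // Two application instances race to claim the same key; only the first one wins.
        System.out.println(store.putIfAbsent("feed.lastEntry", "instance-A"));
        System.out.println(store.putIfAbsent("feed.lastEntry", "instance-B"));
    }
}
```

An instance that receives a non-null result from putIfAbsent knows that another instance already owns the key and can skip the work.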

23.4 MongoDB Inbound Channel Adapter

The MongoDb Inbound Channel Adapter is a polling consumer that reads data from MongoDb and sends it as a Message payload.

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
    channel="replyChannel"
    query="{'name' : 'Bob'}"
    entity-class="java.lang.Object"
    auto-startup="false">
        <int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>

As you can see from the configuration above, you configure a MongoDb Inbound Channel Adapter using the inbound-channel-adapter element, providing values for various attributes such as:

  • query - a JSON query (see MongoDb Querying)
  • query-expression - A SpEL expression that is evaluated to a JSON query String (as the query attribute above), or to an instance of o.s.data.mongodb.core.query.Query. Mutually exclusive with query attribute.
  • entity-class - the type of the payload object; if not supplied, a com.mongodb.DBObject will be returned.
  • collection-name or collection-name-expression - Identifies the name of the MongoDb collection to use.
  • mongodb-factory - reference to an instance of o.s.data.mongodb.MongoDbFactory
  • mongo-template - reference to an instance of o.s.data.mongodb.core.MongoTemplate

and other attributes that are common across all other inbound adapters (e.g., channel).

[Note]Note

You cannot set both mongo-template and mongodb-factory.

The example above is relatively simple and static since it has a literal value for the query and uses the default name for a collection. Sometimes you may need to change those values at runtime, based on some condition. To do that, simply use their -expression equivalents (query-expression and collection-name-expression) where the provided expression can be any valid SpEL expression.

Also, you may wish to do some post-processing on the data that was successfully read from MongoDb. For example, you may want to move or remove a document after it has been processed. You can do this using the Transaction Synchronization feature that was added in Spring Integration 2.2.

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
    channel="replyChannel"
    query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
    entity-class="java.lang.Object"
    auto-startup="false">
        <int:poller fixed-rate="200" max-messages-per-poll="1">
            <int:transactional synchronization-factory="syncFactory"/>
        </int:poller>
</int-mongodb:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit
        expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
        channel="someChannel"/>
</int:transaction-synchronization-factory>

<bean id="documentCleaner" class="foo.bar.DocumentCleaner"/>

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>

public class DocumentCleaner {
    public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
        if (target instanceof List<?>){
            List<?> documents = (List<?>) target;
            for (Object document : documents) {
                mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
            }
        }
    }
}

As you can see from the above, all you need to do is declare your poller to be transactional with a transactional element. This element can reference a real transaction manager (for example, if some other part of your flow invokes JDBC). If you don’t have a real transaction, you can use an org.springframework.integration.transaction.PseudoTransactionManager, which is an implementation of Spring’s PlatformTransactionManager and enables the use of the transaction synchronization features of the mongo adapter when there is no actual transaction.

[Important]Important

This does NOT make MongoDB itself transactional; it simply allows the synchronization of actions to be taken before or after success (commit) or after failure (rollback).

Once your poller is transactional, all you need to do is set an instance of the o.s.i.transaction.TransactionSynchronizationFactory on the transactional element. The TransactionSynchronizationFactory will create an instance of the TransactionSynchronization. For your convenience, we’ve exposed a default SpEL-based TransactionSynchronizationFactory which allows you to configure SpEL expressions, with their execution being coordinated (synchronized) with a transaction. Expressions for before-commit, after-commit, and after-rollback are supported, together with a channel for each where the evaluation result (if any) will be sent. For each sub-element you can specify expression and/or channel attributes. If only the channel attribute is present, the received Message will be sent there as part of the particular synchronization scenario. If only the expression attribute is present and the result of the expression is a non-null value, a Message with the result as the payload will be generated and sent to a default channel (NullChannel) and will appear in the logs (DEBUG). If you want the evaluation result to go to a specific channel, add a channel attribute. If the result of an expression is null or void, no Message will be generated.
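The expression and channel rules above amount to a small decision table, which the following plain-Java sketch models. It is illustrative only and does not use Spring Integration classes: SyncDispatchSketch, Dispatch, and resolve are hypothetical names, the "nullChannel" string merely labels the default NullChannel destination, and the behavior when neither attribute is present is an assumption, since the text does not cover that case.

```java
import java.util.Optional;

// Hypothetical model of where a synchronization sub-element sends its result.
class SyncDispatchSketch {

    // Label used here for the default NullChannel destination.
    static final String NULL_CHANNEL = "nullChannel";

    static final class Dispatch {
        final Object payload;
        final String channel;

        Dispatch(Object payload, String channel) {
            this.payload = payload;
            this.channel = channel;
        }
    }

    static Optional<Dispatch> resolve(boolean hasExpression, Object expressionResult,
            String channel, Object receivedPayload) {
        if (!hasExpression) {
            // Only a channel: the received message itself is forwarded.
            // Assumption: with neither attribute present, nothing is sent.
            return channel == null
                    ? Optional.empty()
                    : Optional.of(new Dispatch(receivedPayload, channel));
        }
        if (expressionResult == null) {
            // A null (or void) result produces no message at all.
            return Optional.empty();
        }
        // A non-null result goes to the configured channel, or to the default one.
        return Optional.of(new Dispatch(expressionResult, channel != null ? channel : NULL_CHANNEL));
    }

    public static void main(String[] args) {
        Optional<Dispatch> dispatch = resolve(true, "removed 3 documents", null, "original payload");
        dispatch.ifPresent(d -> System.out.println(d.payload + " -> " + d.channel));
    }
}
```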

For more information about transaction synchronization, see Section C.3, “Transaction Synchronization”.

23.5 MongoDB Outbound Channel Adapter

The MongoDb Outbound Channel Adapter allows you to write the Message payload to a MongoDb document store.

<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
    collection-name="myCollection"
    mongo-converter="mongoConverter"
    mongodb-factory="mongoDbFactory" />

As you can see from the configuration above, you configure a MongoDb Outbound Channel Adapter using the outbound-channel-adapter element, providing values for various attributes such as:

  • collection-name or collection-name-expression - Identifies the name of the MongoDb collection to use.
  • mongo-converter - reference to an instance of o.s.data.mongodb.core.convert.MongoConverter to assist with converting a raw Java object to a JSON document representation
  • mongodb-factory - reference to an instance of o.s.data.mongodb.MongoDbFactory
  • mongo-template - reference to an instance of o.s.data.mongodb.core.MongoTemplate (NOTE: you cannot have both mongo-template and mongodb-factory set)

and other attributes that are common across all other outbound adapters (e.g., channel).

The example above is relatively simple and static since it has a literal value for the collection-name. Sometimes you may need to change this value at runtime based on some condition. To do that, simply use collection-name-expression where the provided expression can be any valid SpEL expression.

23.6 MongoDB Outbound Gateway

Starting with version 5.0, the MongoDb Outbound Gateway is provided and it allows you to query a database by sending a Message to its request channel. The gateway will then send the response to the reply channel. The Message payload and headers can be used to specify the query, as well as collection name.

<int-mongodb:outbound-gateway id="gatewayQuery"
    mongodb-factory="mongoDbFactory"
    mongo-converter="mongoConverter"
    query="{firstName: 'Bob'}"
    collection-name="foo"
    request-channel="in"
    reply-channel="out"
    entity-class="org.springframework.integration.mongodb.test.entity$Person"/>

The following attributes are available on the outbound-gateway element:

  • collection-name or collection-name-expression - identifies the name of the MongoDb collection to use.
  • mongo-converter - reference to an instance of o.s.data.mongodb.core.convert.MongoConverter to assist with converting a raw Java object to a JSON document representation.
  • mongodb-factory - reference to an instance of o.s.data.mongodb.MongoDbFactory.
  • mongo-template - reference to an instance of o.s.data.mongodb.core.MongoTemplate (NOTE: you cannot have both mongo-template and mongodb-factory set).
  • entity-class - the fully qualified name of the entity class to be passed to the find(..) or findOne(..) method in MongoTemplate. If this attribute is not provided, the default value is org.bson.Document.
  • query or query-expression - specifies the MongoDb query. Please refer to the MongoDB documentation for more query samples.
  • collection-callback - reference to an instance of org.springframework.data.mongodb.core.CollectionCallback (NOTE: you cannot have both collection-callback and any of the query attributes).

23.6.1 Configuring with Java Configuration

The following Spring Boot application provides an example of configuring the outbound gateway using Java configuration:

@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private MongoDbFactory mongoDbFactory;

    @Bean
    @ServiceActivator(inputChannel = "requestChannel")
    public MessageHandler mongoDbOutboundGateway() {
        MongoDbOutboundGateway gateway = new MongoDbOutboundGateway(this.mongoDbFactory);
        gateway.setCollectionNameExpressionString("'foo'");
        gateway.setQueryExpressionString("'{''name'' : ''Bob''}'");
        gateway.setExpectSingleResult(true);
        gateway.setEntityClass(Person.class);
        gateway.setOutputChannelName("replyChannel");
        return gateway;
    }

    @Bean
    @ServiceActivator(inputChannel = "replyChannel")
    public MessageHandler handler() {
        return message -> System.out.println(message.getPayload());
    }
}

23.6.2 Configuring with the Java DSL

The following Spring Boot application provides an example of configuring the Outbound Gateway using the Java DSL:

@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private MongoDbFactory mongoDbFactory;

    @Autowired
    private MongoConverter mongoConverter;


    @Bean
    public IntegrationFlow gatewaySingleQueryFlow() {
        return f -> f
                .handle(queryOutboundGateway())
                .channel(c -> c.queue("retrieveResults"));
    }

    private MongoDbOutboundGatewaySpec queryOutboundGateway() {
        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
                .query("{name : 'Bob'}")
                .collectionNameFunction(m -> m.getHeaders().get("collection"))
                .expectSingleResult(true)
                .entityClass(Person.class);
    }

}

As an alternative to the query and query-expression properties, you can specify other database operations through the collectionCallback property. The following example specifies a count operation:

private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
    return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
            .collectionCallback(MongoCollection::count)
            .collectionName("foo");
}