25. MongoDb Support

Version 2.1 introduced support for MongoDB: a "high-performance, open source, document-oriented database".

You need to include this dependency into your project:

Maven. 

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mongodb</artifactId>
    <version>5.1.3.BUILD-SNAPSHOT</version>
</dependency>

Gradle. 

compile "org.springframework.integration:spring-integration-mongodb:5.1.3.BUILD-SNAPSHOT"

To download, install, and run MongoDB, see the MongoDB documentation.

25.1 Connecting to MongoDb

To begin interacting with MongoDB, you first need to connect to it. Spring Integration builds on the support provided by another Spring project, Spring Data MongoDB. It provides a factory class called MongoDbFactory, which simplifies integration with the MongoDB Client API.

25.1.1 Using MongoDbFactory

To connect to MongoDB you can use an implementation of the MongoDbFactory interface. The following listing shows the MongoDbFactory interface:

public interface MongoDbFactory {

    /**
     * Creates a default {@link DB} instance.
     *
     * @return the DB instance
     * @throws DataAccessException
     */
    DB getDb() throws DataAccessException;

    /**
     * Creates a {@link DB} instance to access the database with the given name.
     *
     * @param dbName must not be {@literal null} or empty.
     *
     * @return the DB instance
     * @throws DataAccessException
     */
    DB getDb(String dbName) throws DataAccessException;
}

The following example shows how to use SimpleMongoDbFactory, the out-of-the-box implementation, in Java:

MongoDbFactory mongoDbFactory = new SimpleMongoDbFactory(new Mongo(), "test");

The following example shows how to use SimpleMongoDbFactory in XML configuration:

<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoDbFactory">
    <constructor-arg>
        <bean class="com.mongodb.Mongo"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>

SimpleMongoDbFactory takes two arguments: a Mongo instance and a String that specifies the name of the database. If you need to configure properties such as host, port, and others, you can pass those by using one of the constructors provided by the underlying Mongo class. For more information on how to configure MongoDB, see the Spring-Data-MongoDB reference.

25.2 MongoDB Message Store

As described in the Enterprise Integration Patterns (EIP) book, a Message Store lets you persist messages. Doing so can be useful when dealing with components that have the ability to buffer messages (QueueChannel, aggregator, resequencer, and others.) if reliability is a concern. In Spring Integration, the MessageStore strategy also provides the foundation for the claim check pattern, which is described in EIP as well.

Spring Integration’s MongoDB module provides the MongoDbMessageStore, which is an implementation of both the MessageStore strategy (mainly used by the claim check pattern) and the MessageGroupStore strategy (mainly used by the aggregator and resequencer patterns).

The following example configures a MongoDbMessageStore to use a QueueChannel and an aggregator:

<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
    <constructor-arg ref="mongoDbFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="mongoDbMessageStore"/>
<int:channel>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
         message-store="mongoDbMessageStore"/>

The preceding example is a simple bean configuration, and it expects a MongoDbFactory as a constructor argument.

The MongoDbMessageStore expands the Message as a Mongo document with all nested properties by using the Spring Data Mongo mapping mechanism. It is useful when you need to have access to the payload or headers for auditing or analytics — for example, against stored messages.

[Important]Important

The MongoDbMessageStore uses a custom MappingMongoConverter implementation to store Message instances as MongoDB documents, and there are some limitations for the properties (payload and header values) of the Message. For example, there is no ability to configure custom converters for complex domain payload instances or header values. There is also no way to provide a custom MongoTemplate (or MappingMongoConverter). To achieve these capabilities, an alternative MongoDB MessageStore implementation has been introduced (we describe it next).

Spring Integration 3.0 introduced the ConfigurableMongoDbMessageStore. It implements both the MessageStore and MessageGroupStore interfaces. This class can receive, as a constructor argument, a MongoTemplate, with which you can, for example, configure a custom WriteConcern. Another constructor requires a MappingMongoConverter and a MongoDbFactory, which lets you provide some custom conversions for Message instances and their properties. Note that, by default, the ConfigurableMongoDbMessageStore uses standard Java serialization to write and read Message instances to and from MongoDB (see MongoDbMessageBytesConverter) and relies on default values for other properties from MongoTemplate. It builds a MongoTemplate from the provided MongoDbFactory and MappingMongoConverter. The default name for the collection stored by the ConfigurableMongoDbMessageStore is configurableStoreMessages. We recommend using this implementation to create robust and flexible solutions when messages contain complex data types.

25.2.1 MongoDB Channel Message Store

Version 4.0 introduced the new MongoDbChannelMessageStore. It is an optimized MessageGroupStore for use in QueueChannel instances. With priorityEnabled = true, you can use it in <int:priority-queue> instances to achieve priority-order polling for persisted messages. The priority MongoDB document field is populated from the IntegrationMessageHeaderAccessor.PRIORITY (priority) message header.

In addition, all MongoDB MessageStore instances now have a sequence field for MessageGroup documents. The sequence value is the result of an $inc operation for a simple sequence document from the same collection, which is created on demand. The sequence field is used in poll operations to provide first-in-first-out (FIFO) message order (within priority, if configured) when messages are stored within the same millisecond.

[Note]Note

We do not recommend using the same MongoDbChannelMessageStore bean for priority and non-priority, because the priorityEnabled option applies to the entire store. However, the same collection can be used for both MongoDbChannelMessageStore types, because message polling from the store is sorted and uses indexes. To configure that scenario, you can extend one message store bean from the other, as the following example shows:

<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
    <constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="store"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

25.2.2 MongoDB Metadata Store

Spring Integration 4.2 introduced a new MongoDB-based MetadataStore (see Section 12.5, “Metadata Store”) implementation. You can use the MongoDbMetadataStore to maintain metadata state across application restarts. You can use this new MetadataStore implementation with adapters such as:

To instruct these adapters to use the new MongoDbMetadataStore, declare a Spring bean with a bean name of metadataStore. The feed inbound channel adapter automatically picks up and use the declared MongoDbMetadataStore. The following example shows how to declare a bean with a name of metadataStore:

@Bean
public MetadataStore metadataStore(MongoDbFactory factory) {
    return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}

The MongoDbMetadataStore also implements ConcurrentMetadataStore, letting it be reliably shared across multiple application instances, where only one instance is allowed to store or modify a key’s value. All these operations are atomic, thanks to MongoDB guarantees.

25.3 MongoDB Inbound Channel Adapter

The MongoDB inbound channel adapter is a polling consumer that reads data from MongoDB and sends it as a Message payload. The following example shows how to configure a MongoDB inbound channel adapter:

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
       channel="replyChannel"
       query="{'name' : 'Bob'}"
       entity-class="java.lang.Object"
       auto-startup="false">
		<int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>

As the preceding configuration shows, you configure a MongoDb inbound channel adapter by using the inbound-channel-adapter element and providing values for various attributes, such as:

  • query: A JSON query (see MongoDB Querying)
  • query-expression: A SpEL expression that is evaluated to a JSON query string (as the query attribute above) or to an instance of o.s.data.mongodb.core.query.Query. Mutually exclusive with the query attribute.
  • entity-class: The type of the payload object. If not supplied, a com.mongodb.DBObject is returned.
  • collection-name or collection-name-expression: Identifies the name of the MongoDB collection to use.
  • mongodb-factory: Reference to an instance of o.s.data.mongodb.MongoDbFactory
  • mongo-template: Reference to an instance of o.s.data.mongodb.core.MongoTemplate
  • Other attributes that are common across all other inbound adapters (such as channel).
[Note]Note

You cannot set both mongo-template and mongodb-factory.

The preceding example is relatively simple and static, since it has a literal value for the query and uses the default name for a collection. Sometimes, you may need to change those values at runtime, based on some condition. To do so, use their -expression equivalents (query-expression and collection-name-expression), where the provided expression can be any valid SpEL expression.

Also, you may wish to do some post-processing to the successfully processed data that was read from the MongoDB. For example; you may want to move or remove a document after it has been processed. You can do so by using that transaction synchronization feature Spring Integration 2.2 added, as the following example shows:

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
    channel="replyChannel"
    query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
    entity-class="java.lang.Object"
    auto-startup="false">
        <int:poller fixed-rate="200" max-messages-per-poll="1">
            <int:transactional synchronization-factory="syncFactory"/>
        </int:poller>
</int-mongodb:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit
        expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
        channe="someChannel"/>
</int:transaction-synchronization-factory>

<bean id="documentCleaner" class="thing1.thing2.DocumentCleaner"/>

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>

The following example shows the DocumentCleaner referenced in the preceding example:

public class DocumentCleaner {
    public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
        if (target instanceof List<?>){
            List<?> documents = (List<?>) target;
            for (Object document : documents) {
                mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
            }
        }
    }
}

You can declare your poller to be transactional by using the transactional element. This element can reference a real transaction manager (for example, if some other part of your flow invokes JDBC). If you do not have a "real" transaction, you can use an instance of o.s.i.transaction.PseudoTransactionManager, which is an implementation of Spring’s PlatformTransactionManager and enables the use of the transaction synchronization features of the Mongo adapter when there is no actual transaction.

[Important]Important

Doing so does not make MongoDB itself transactional. It lets the synchronization of actions be taken before or after success (commit) or after failure (rollback).

Once your poller is transactional, you can set an instance of the o.s.i.transaction.TransactionSynchronizationFactory on the transactional element. TransactionSynchronizationFactory creates an instance of the TransactionSynchronization. For your convenience, we have exposed a default SpEL-based TransactionSynchronizationFactory that lets you configure SpEL expressions, with their execution being coordinated (synchronized) with a transaction. Expressions for before-commit, after-commit, and after-rollback events are supported, together with a channel for each event where the evaluation result (if any) is sent. For each child element, you can specify expression and channel attributes. If only the channel attribute is present, the received message is sent there as part of the particular synchronization scenario. If only the expression attribute is present and the result of an expression is a non-null value, a message with the result as the payload is generated and sent to a default channel (NullChannel) and appears in the logs (on the DEBUG level). If you want the evaluation result to go to a specific channel, add a channel attribute. If the result of an expression is null or void, no message is generated.

For more information about transaction synchronization, see Section C.3, “Transaction Synchronization”.

25.4 MongoDB Outbound Channel Adapter

The MongoDB outbound channel adapter lets you write the message payload to a MongoDB document store, as the following example shows:

<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
	collection-name="myCollection"
	mongo-converter="mongoConverter"
	mongodb-factory="mongoDbFactory" />

As the preceding configuration shows, you can configure a MongoDB outbound channel adapter by using the outbound-channel-adapter element, providing values for various attributes, such as:

  • collection-name or collection-name-expression: Identifies the name of the MongoDb collection to use.
  • mongo-converter: Reference to an instance of o.s.data.mongodb.core.convert.MongoConverter that assists with converting a raw Java object to a JSON document representation.
  • mongodb-factory: Reference to an instance of o.s.data.mongodb.MongoDbFactory.
  • mongo-template: Reference to an instance of o.s.data.mongodb.core.MongoTemplate. NOTE: you can not have both mongo-template and mongodb-factory set.
  • Other attributes that are common across all other inbound adapters (such as channel).

The preceding example is relatively simple and static, since it has a literal value for the collection-name. Sometimes, you may need to change this value at runtime, based on some condition. To do that, use collection-name-expression, where the provided expression is any valid SpEL expression.

25.5 MongoDB Outbound Gateway

Version 5.0 introduced the MongoDB outbound gateway. It allows you query a database by sending a message to its request channel. The gateway then send the response to the reply channel. You can use the message payload and headers to specify the query and the collection name, as the following example shows:

<int-mongodb:outbound-gateway id="gatewayQuery"
    mongodb-factory="mongoDbFactory"
    mongo-converter="mongoConverter"
    query="{firstName: 'Bob'}"
    collection-name="myCollection"
    request-channel="in"
    reply-channel="out"
    entity-class="org.springframework.integration.mongodb.test.entity$Person"/>

You can use the following attributes with a MongoDB outbound Gateway:

  • collection-name or collection-name-expression: Identifies the name of the MongoDB collection to use.
  • mongo-converter: Reference to an instance of o.s.data.mongodb.core.convert.MongoConverter that assists with converting a raw Java object to a JSON document representation.
  • mongodb-factory: Reference to an instance of o.s.data.mongodb.MongoDbFactory.
  • mongo-template: Reference to an instance of o.s.data.mongodb.core.MongoTemplate. NOTE: you can not set both mongo-template and mongodb-factory.
  • entity-class: The fully qualified name of the entity class to be passed to the find(..) and findOne(..) methods in MongoTemplate. If this attribute is not provided, the default value is org.bson.Document.
  • query or query-expression: Specifies the MongoDB query. See the MongoDB documentation for more query samples.
  • collection-callback: Reference to an instance of org.springframework.data.mongodb.core.CollectionCallback. Preferable an instance of o.s.i.mongodb.outbound.MessageCollectionCallback since 5.0.11 with the request message context. See its Javadocs for more information. NOTE: You can not have both collection-callback and any of the query attributes.

25.5.1 Configuring with Java Configuration

The following Spring Boot application shows an example of how to configure the outbound gateway with Java configuration:

@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private MongoDbFactory mongoDbFactory;

    @Bean
    @ServiceActivator(inputChannel = "requestChannel")
    public MessageHandler mongoDbOutboundGateway() {
        MongoDbOutboundGateway gateway = new MongoDbOutboundGateway(this.mongoDbFactory);
        gateway.setCollectionNameExpressionString("'myCollection'");
        gateway.setQueryExpressionString("'{''name'' : ''Bob''}'");
        gateway.setExpectSingleResult(true);
        gateway.setEntityClass(Person.class);
        gateway.setOutputChannelName("replyChannel");
        return gateway;
    }

    @Bean
    @ServiceActivator(inputChannel = "replyChannel")
    public MessageHandler handler() {
        return message -> System.out.println(message.getPayload());
    }
}

25.5.2 Configuring with the Java DSL

The following Spring Boot application show an example of how to configure the outbound gateway with the Java DSL:

@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private MongoDbFactory;

    @Autowired
    private MongoConverter;


    @Bean
    public IntegrationFlow gatewaySingleQueryFlow() {
        return f -> f
                .handle(queryOutboundGateway())
                .channel(c -> c.queue("retrieveResults"));
    }

    private MongoDbOutboundGatewaySpec queryOutboundGateway() {
        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
                .query("{name : 'Bob'}")
                .collectionNameFunction(m -> m.getHeaders().get("collection"))
                .expectSingleResult(true)
                .entityClass(Person.class);
    }

}

As an alternate to the query and query-expression properties, you can specify other database operations by using the collectionCallback property as a reference to the MessageCollectionCallback functional interface implementation. The following example specifies a count operation:

private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
    return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
            .collectionCallback((collection, requestMessage) -> collection.count())
            .collectionName("myCollection");
    }