MongoDb Support

Version 2.1 introduced support for MongoDB: a “high-performance, open source, document-oriented database”.

You need to include this dependency in your project:

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mongodb</artifactId>
    <version>5.5.2-SNAPSHOT</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-mongodb:5.5.2-SNAPSHOT"

To download, install, and run MongoDB, see the MongoDB documentation.

Connecting to MongoDb

Blocking or Reactive?

Beginning with version 5.3, Spring Integration provides support for reactive MongoDB drivers to enable non-blocking I/O when accessing MongoDB. To enable reactive support, add the MongoDB reactive streams driver to your dependencies:

Maven
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-reactivestreams</artifactId>
</dependency>
Gradle
compile "org.mongodb:mongodb-driver-reactivestreams"

For the regular synchronous client, add its driver to your dependencies:

Maven
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-sync</artifactId>
</dependency>
Gradle
compile "org.mongodb:mongodb-driver-sync"

Both drivers are optional dependencies of the framework, so you can choose the one that fits your application.

To begin interacting with MongoDB, you first need to connect to it. Spring Integration builds on the support provided by another Spring project, Spring Data MongoDB. It provides factory classes called MongoDatabaseFactory and ReactiveMongoDatabaseFactory, which simplify integration with the MongoDB Client API.

Spring Data provides the blocking MongoDB driver by default, but you can opt in to reactive usage by including the reactive streams driver dependency shown earlier.

Using MongoDatabaseFactory

To connect to MongoDB you can use an implementation of the MongoDatabaseFactory interface.

The following example shows how to use SimpleMongoClientDatabaseFactory:

Java
MongoDatabaseFactory mongoDbFactory =
        new SimpleMongoClientDatabaseFactory(com.mongodb.client.MongoClients.create(), "test");
XML
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoClientDatabaseFactory">
    <constructor-arg>
        <bean class="com.mongodb.client.MongoClients" factory-method="create"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>

SimpleMongoClientDatabaseFactory takes two arguments: a MongoClient instance and a String that specifies the name of the database. If you need to configure properties such as host, port, and others, you can pass them by using one of the create(..) factory methods provided by the underlying MongoClients class. For more information on how to configure MongoDB, see the Spring Data MongoDB reference.

Using ReactiveMongoDatabaseFactory

To connect to MongoDB with the reactive driver, you can use an implementation of the ReactiveMongoDatabaseFactory interface.

The following example shows how to use SimpleReactiveMongoDatabaseFactory:

Java
ReactiveMongoDatabaseFactory mongoDbFactory =
        new SimpleReactiveMongoDatabaseFactory(com.mongodb.reactivestreams.client.MongoClients.create(), "test");
XML
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleReactiveMongoDatabaseFactory">
    <constructor-arg>
        <bean class="com.mongodb.reactivestreams.client.MongoClients" factory-method="create"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>

MongoDB Message Store

As described in the Enterprise Integration Patterns (EIP) book, a Message Store lets you persist messages. Doing so can be useful when dealing with components that have the ability to buffer messages (QueueChannel, aggregator, resequencer, and others) if reliability is a concern. In Spring Integration, the MessageStore strategy also provides the foundation for the claim check pattern, which is described in EIP as well.

Spring Integration’s MongoDB module provides the MongoDbMessageStore, which is an implementation of both the MessageStore strategy (mainly used by the claim check pattern) and the MessageGroupStore strategy (mainly used by the aggregator and resequencer patterns).

The following example configures a MongoDbMessageStore to use a QueueChannel and an aggregator:

<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
    <constructor-arg ref="mongoDbFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="mongoDbMessageStore"/>
</int:channel>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
         message-store="mongoDbMessageStore"/>

The preceding example is a simple bean configuration, and it expects a MongoDbFactory as a constructor argument.

The MongoDbMessageStore expands the Message as a Mongo document with all nested properties by using the Spring Data Mongo mapping mechanism. It is useful when you need access to the payload or headers of stored messages, for example, for auditing or analytics.

The MongoDbMessageStore uses a custom MappingMongoConverter implementation to store Message instances as MongoDB documents, and there are some limitations for the properties (payload and header values) of the Message.

Starting with version 5.1.6, the MongoDbMessageStore can be configured with custom converters which are propagated into an internal MappingMongoConverter implementation. See MongoDbMessageStore.setCustomConverters(Object…​ customConverters) JavaDocs for more information.

Spring Integration 3.0 introduced the ConfigurableMongoDbMessageStore. It implements both the MessageStore and MessageGroupStore interfaces. This class can receive, as a constructor argument, a MongoTemplate, with which you can, for example, configure a custom WriteConcern. Another constructor requires a MappingMongoConverter and a MongoDbFactory, which lets you provide some custom conversions for Message instances and their properties. Note that, by default, the ConfigurableMongoDbMessageStore uses standard Java serialization to write and read Message instances to and from MongoDB (see MongoDbMessageBytesConverter) and relies on default values for other properties from MongoTemplate. It builds a MongoTemplate from the provided MongoDbFactory and MappingMongoConverter. The default name for the collection stored by the ConfigurableMongoDbMessageStore is configurableStoreMessages. We recommend using this implementation to create robust and flexible solutions when messages contain complex data types.
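Because the default conversion relies on standard Java serialization, it can help to see what that implies for the objects you store. The following plain-Java sketch is not the MongoDbMessageBytesConverter itself; the class and method names (SerializationRoundTripSketch, toBytes, fromBytes, canStore) are hypothetical and only illustrate that a value survives the byte-array round trip when it is Serializable and is rejected otherwise:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; it only models a Java-serialization round trip.
final class SerializationRoundTripSketch {

    // Writes the value with standard Java serialization.
    static byte[] toBytes(Object value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        catch (IOException e) {
            // Includes NotSerializableException for non-Serializable values.
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Reads the value back from the serialized bytes.
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns false when the value cannot be serialized.
    static boolean canStore(Object value) {
        try {
            toBytes(value);
            return true;
        }
        catch (UncheckedIOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> payload = new ArrayList<>(List.of("a", "b"));
        System.out.println(payload.equals(fromBytes(toBytes(payload)))); // true
        System.out.println(canStore(new Object())); // false
    }
}
```

If your payloads are not Serializable, the constructor that accepts a MappingMongoConverter is the place to supply custom conversions instead.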

MongoDB Channel Message Store

Version 4.0 introduced the new MongoDbChannelMessageStore. It is an optimized MessageGroupStore for use in QueueChannel instances. With priorityEnabled = true, you can use it in <int:priority-queue> instances to achieve priority-order polling for persisted messages. The priority MongoDB document field is populated from the IntegrationMessageHeaderAccessor.PRIORITY (priority) message header.

In addition, all MongoDB MessageStore instances now have a sequence field for MessageGroup documents. The sequence value is the result of an $inc operation for a simple sequence document from the same collection, which is created on demand. The sequence field is used in poll operations to provide first-in-first-out (FIFO) message order (within priority, if configured) when messages are stored within the same millisecond.
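The role of the sequence field can be modeled in plain Java. The following sketch is not the store's implementation: StoredMessage and PollOrderSketch are hypothetical names, an AtomicLong stands in for the $inc-maintained sequence document, and the sort order (higher priority first, then creation time, then sequence) is an assumption made for illustration:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical in-memory model; no MongoDB access takes place.
final class PollOrderSketch {

    static final class StoredMessage {
        final String payload;
        final int priority;
        final long createdMillis;
        final long sequence;

        StoredMessage(String payload, int priority, long createdMillis, long sequence) {
            this.payload = payload;
            this.priority = priority;
            this.createdMillis = createdMillis;
            this.sequence = sequence;
        }
    }

    // Stands in for the sequence document that is incremented on each store.
    private static final AtomicLong SEQUENCE = new AtomicLong();

    static StoredMessage store(String payload, int priority, long createdMillis) {
        return new StoredMessage(payload, priority, createdMillis, SEQUENCE.incrementAndGet());
    }

    // Assumed poll order: priority descending, then timestamp, then sequence.
    static final Comparator<StoredMessage> POLL_ORDER =
            Comparator.comparingInt((StoredMessage m) -> m.priority).reversed()
                    .thenComparingLong(m -> m.createdMillis)
                    .thenComparingLong(m -> m.sequence);

    // Returns the payloads in the order a consumer would poll them.
    static List<String> pollAll(List<StoredMessage> stored) {
        List<StoredMessage> sorted = new ArrayList<>(stored);
        sorted.sort(POLL_ORDER);
        List<String> payloads = new ArrayList<>();
        for (StoredMessage message : sorted) {
            payloads.add(message.payload);
        }
        return payloads;
    }

    public static void main(String[] args) {
        long sameMillisecond = 1_000L; // every message shares one timestamp
        List<StoredMessage> stored = List.of(
                store("low-1", 0, sameMillisecond),
                store("high-1", 5, sameMillisecond),
                store("low-2", 0, sameMillisecond),
                store("high-2", 5, sameMillisecond));
        System.out.println(pollAll(stored)); // [high-1, high-2, low-1, low-2]
    }
}
```

Without the sequence, the two messages of each priority would be indistinguishable to the sort, because their timestamps are equal.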

We do not recommend using the same MongoDbChannelMessageStore bean for priority and non-priority, because the priorityEnabled option applies to the entire store. However, the same collection can be used for both MongoDbChannelMessageStore types, because message polling from the store is sorted and uses indexes. To configure that scenario, you can extend one message store bean from the other, as the following example shows:

<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
    <constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

MongoDB Metadata Store

Spring Integration 4.2 introduced a new MongoDB-based MetadataStore (see Metadata Store) implementation. You can use the MongoDbMetadataStore to maintain metadata state across application restarts. You can use this new MetadataStore implementation with adapters such as the feed inbound channel adapter.

To instruct these adapters to use the new MongoDbMetadataStore, declare a Spring bean with a bean name of metadataStore. The feed inbound channel adapter automatically picks up and uses the declared MongoDbMetadataStore. The following example shows how to declare a bean with a name of metadataStore:

@Bean
public MetadataStore metadataStore(MongoDbFactory factory) {
    return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}

The MongoDbMetadataStore also implements ConcurrentMetadataStore, letting it be reliably shared across multiple application instances, where only one instance is allowed to store or modify a key’s value. All these operations are atomic, thanks to MongoDB guarantees.
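The sharing semantics can be illustrated without MongoDB. In the following sketch, a ConcurrentHashMap stands in for the shared collection, and MetadataClaimSketch, tryClaim, and update are hypothetical names that model the putIfAbsent and replace operations of ConcurrentMetadataStore. It is a model of the contract, not the store's implementation:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory stand-in for a metadata store shared by several instances.
final class MetadataClaimSketch {

    private final ConcurrentMap<String, String> shared = new ConcurrentHashMap<>();

    // Returns true only for the first caller that stores a value for the key.
    boolean tryClaim(String key, String owner) {
        return this.shared.putIfAbsent(key, owner) == null;
    }

    // Replaces the value only if it still equals the expected one.
    boolean update(String key, String expected, String newValue) {
        return this.shared.replace(key, expected, newValue);
    }

    String get(String key) {
        return this.shared.get(key);
    }

    public static void main(String[] args) {
        MetadataClaimSketch store = new MetadataClaimSketch();
        System.out.println(store.tryClaim("file-a.txt", "instance-1")); // true
        System.out.println(store.tryClaim("file-a.txt", "instance-2")); // false
        System.out.println(store.update("file-a.txt", "instance-2", "done")); // false
        System.out.println(store.update("file-a.txt", "instance-1", "done")); // true
    }
}
```

The second instance loses both the claim and the conditional update, which is the behavior that lets several application instances coordinate through one store.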

MongoDB Inbound Channel Adapter

The MongoDB inbound channel adapter is a polling consumer that reads data from MongoDB and sends it as a Message payload. The following example shows how to configure a MongoDB inbound channel adapter:

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
       channel="replyChannel"
       query="{'name' : 'Bob'}"
       entity-class="java.lang.Object"
       auto-startup="false">
       <int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>

As the preceding configuration shows, you configure a MongoDb inbound channel adapter by using the inbound-channel-adapter element and providing values for various attributes, such as:

  • query: A JSON query (see MongoDB Querying)

  • query-expression: A SpEL expression that is evaluated to a JSON query string (as the query attribute above) or to an instance of o.s.data.mongodb.core.query.Query. Mutually exclusive with the query attribute.

  • entity-class: The type of the payload object. If not supplied, a com.mongodb.DBObject is returned.

  • collection-name or collection-name-expression: Identifies the name of the MongoDB collection to use.

  • mongodb-factory: Reference to an instance of o.s.data.mongodb.MongoDbFactory

  • mongo-template: Reference to an instance of o.s.data.mongodb.core.MongoTemplate

  • Other attributes that are common across all other inbound adapters (such as 'channel').

You cannot set both mongo-template and mongodb-factory.

The preceding example is relatively simple and static, since it has a literal value for the query and uses the default name for a collection. Sometimes, you may need to change those values at runtime, based on some condition. To do so, use their -expression equivalents (query-expression and collection-name-expression), where the provided expression can be any valid SpEL expression.

Also, you may wish to do some post-processing of the successfully processed data that was read from MongoDB. For example, you may want to move or remove a document after it has been processed. You can do so by using the transaction synchronization feature that Spring Integration 2.2 added, as the following example shows:

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
    channel="replyChannel"
    query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
    entity-class="java.lang.Object"
    auto-startup="false">
        <int:poller fixed-rate="200" max-messages-per-poll="1">
            <int:transactional synchronization-factory="syncFactory"/>
        </int:poller>
</int-mongodb:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit
        expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
        channel="someChannel"/>
</int:transaction-synchronization-factory>

<bean id="documentCleaner" class="thing1.thing2.DocumentCleaner"/>

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>

The following example shows the DocumentCleaner referenced in the preceding example:

public class DocumentCleaner {
    public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
        if (target instanceof List<?>){
            List<?> documents = (List<?>) target;
            for (Object document : documents) {
                mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
            }
        }
    }
}

You can declare your poller to be transactional by using the transactional element. This element can reference a real transaction manager (for example, if some other part of your flow invokes JDBC). If you do not have a “real” transaction, you can use an instance of o.s.i.transaction.PseudoTransactionManager, which is an implementation of Spring’s PlatformTransactionManager and enables the use of the transaction synchronization features of the Mongo adapter when there is no actual transaction.

Doing so does not make MongoDB itself transactional. It lets the synchronization of actions be taken before or after success (commit) or after failure (rollback).

Once your poller is transactional, you can set an instance of the o.s.i.transaction.TransactionSynchronizationFactory on the transactional element. A TransactionSynchronizationFactory creates an instance of the TransactionSynchronization. For your convenience, we have exposed a default SpEL-based TransactionSynchronizationFactory that lets you configure SpEL expressions, with their execution being coordinated (synchronized) with a transaction. Expressions for before-commit, after-commit, and after-rollback events are supported, together with a channel for each event where the evaluation result (if any) is sent. For each child element, you can specify expression and channel attributes. If only the channel attribute is present, the received message is sent there as part of the particular synchronization scenario. If only the expression attribute is present and the result of an expression is a non-null value, a message with the result as the payload is generated and sent to a default channel (NullChannel) and appears in the logs (on the DEBUG level). If you want the evaluation result to go to a specific channel, add a channel attribute. If the result of an expression is null or void, no message is generated.
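The expression and channel rules above can be summarized as a small decision function. The following plain-Java sketch only models those rules; SyncRoutingSketch and route are hypothetical names, the "nullChannel" string is a label chosen for this sketch, and returning nothing when neither attribute is set is an assumption:

```java
import java.util.Optional;

// Hypothetical model of where a synchronization result ends up.
final class SyncRoutingSketch {

    // Label used in this sketch for the default target (NullChannel).
    static final String NULL_CHANNEL = "nullChannel";

    // Returns "<channel> <- <payload>", or empty when no message is sent.
    static Optional<String> route(boolean hasExpression, Object expressionResult,
            String channel, Object receivedPayload) {

        if (!hasExpression) {
            if (channel == null) {
                return Optional.empty(); // neither attribute set: assumption of this sketch
            }
            // Only a channel: the received message itself is forwarded.
            return Optional.of(channel + " <- " + receivedPayload);
        }
        if (expressionResult == null) {
            return Optional.empty(); // null or void result: no message is generated
        }
        // Expression result goes to the given channel, or to the default one.
        String target = (channel != null) ? channel : NULL_CHANNEL;
        return Optional.of(target + " <- " + expressionResult);
    }

    public static void main(String[] args) {
        System.out.println(route(false, null, "someChannel", "doc"));     // Optional[someChannel <- doc]
        System.out.println(route(true, "removed", null, "doc"));          // Optional[nullChannel <- removed]
        System.out.println(route(true, "removed", "someChannel", "doc")); // Optional[someChannel <- removed]
        System.out.println(route(true, null, "someChannel", "doc"));      // Optional.empty
    }
}
```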

For more information about transaction synchronization, see Transaction Synchronization.

Starting with version 5.5, the MongoDbMessageSource can be configured with an updateExpression, which must evaluate to a String with the MongoDb update syntax or to an org.springframework.data.mongodb.core.query.Update instance. It can be used as an alternative to the post-processing procedure described above. It modifies the entities that were fetched from the collection, so they are not pulled from the collection again on the next polling cycle (assuming the update changes some value used in the query). It is still recommended to use transactions to achieve execution isolation and data consistency when several instances of the MongoDbMessageSource for the same collection are used in the cluster.
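The effect of an update applied to fetched entities can be modeled in memory. In the following sketch, a list of maps stands in for the collection, the poll selects documents whose processed field is false, and the update sets that field to true. The class name UpdateOnPollSketch and the processed field are hypothetical and chosen only for illustration:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model; no MongoDB access takes place.
final class UpdateOnPollSketch {

    private final List<Map<String, Object>> collection = new ArrayList<>();

    void insert(String name) {
        Map<String, Object> document = new HashMap<>();
        document.put("name", name);
        document.put("processed", false);
        this.collection.add(document);
    }

    // Models a query for unprocessed documents followed by an update
    // that flips the queried field on every fetched document.
    List<String> poll() {
        List<String> fetched = new ArrayList<>();
        for (Map<String, Object> document : this.collection) {
            if (Boolean.FALSE.equals(document.get("processed"))) {
                fetched.add((String) document.get("name"));
                document.put("processed", true);
            }
        }
        return fetched;
    }

    public static void main(String[] args) {
        UpdateOnPollSketch source = new UpdateOnPollSketch();
        source.insert("Bob");
        source.insert("Alice");
        System.out.println(source.poll()); // [Bob, Alice]
        System.out.println(source.poll()); // []
    }
}
```

The second poll returns nothing because the update changed the value that the query filters on, which is the condition stated above.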

MongoDB Change Stream Inbound Channel Adapter

Starting with version 5.3, the spring-integration-mongodb module introduces the MongoDbChangeStreamMessageProducer - a reactive MessageProducerSupport implementation for the Spring Data ReactiveMongoOperations.changeStream(String, ChangeStreamOptions, Class) API. This component produces a Flux of messages with a body of ChangeStreamEvent as the payload by default and some change stream related headers (see MongoHeaders). It is recommended that this MongoDbChangeStreamMessageProducer is combined with a FluxMessageChannel as the outputChannel for on-demand subscription and event consumption downstream.

The Java DSL configuration for this channel adapter may look like this:

@Bean
IntegrationFlow changeStreamFlow(ReactiveMongoOperations mongoTemplate) {
    return IntegrationFlows.from(
            MongoDb.changeStreamInboundChannelAdapter(mongoTemplate)
                    .domainType(Person.class)
                    .collection("person")
                    .extractBody(false))
            .channel(MessageChannels.flux())
            .get();
}

When the MongoDbChangeStreamMessageProducer is stopped, or the subscription is cancelled downstream, or the MongoDb change stream produces an OperationType.INVALIDATE, the Publisher is completed. The channel adapter can be started again and a new Publisher of source data is created and it is automatically subscribed in the MessageProducerSupport.subscribeToPublisher(Publisher<? extends Message<?>>). This channel adapter can be reconfigured for new options between starts, if there is a requirement to consume change stream events from other places.

See more information about change stream support in Spring Data MongoDb documentation.

MongoDB Outbound Channel Adapter

The MongoDB outbound channel adapter lets you write the message payload to a MongoDB document store, as the following example shows:

<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
    collection-name="myCollection"
    mongo-converter="mongoConverter"
    mongodb-factory="mongoDbFactory" />

As the preceding configuration shows, you can configure a MongoDB outbound channel adapter by using the outbound-channel-adapter element, providing values for various attributes, such as:

  • collection-name or collection-name-expression: Identifies the name of the MongoDb collection to use.

  • mongo-converter: Reference to an instance of o.s.data.mongodb.core.convert.MongoConverter that assists with converting a raw Java object to a JSON document representation.

  • mongodb-factory: Reference to an instance of o.s.data.mongodb.MongoDbFactory.

  • mongo-template: Reference to an instance of o.s.data.mongodb.core.MongoTemplate. NOTE: You cannot set both mongo-template and mongodb-factory.

  • Other attributes that are common across all other outbound adapters (such as 'channel').

The preceding example is relatively simple and static, since it has a literal value for the collection-name. Sometimes, you may need to change this value at runtime, based on some condition. To do that, use collection-name-expression, where the provided expression is any valid SpEL expression.

MongoDB Outbound Gateway

Version 5.0 introduced the MongoDB outbound gateway. It lets you query a database by sending a message to its request channel. The gateway then sends the response to the reply channel. You can use the message payload and headers to specify the query and the collection name, as the following example shows:

Java DSL
@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    @Autowired
    private MongoDbFactory mongoDbFactory;

    @Autowired
    private MongoConverter mongoConverter;


    @Bean
    public IntegrationFlow gatewaySingleQueryFlow() {
        return f -> f
                .handle(queryOutboundGateway())
                .channel(c -> c.queue("retrieveResults"));
    }

    private MongoDbOutboundGatewaySpec queryOutboundGateway() {
        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
                .query("{name : 'Bob'}")
                .collectionNameFunction(m -> (String) m.getHeaders().get("collection"))
                .expectSingleResult(true)
                .entityClass(Person.class);
    }

}
Kotlin DSL
class MongoDbKotlinApplication {

    fun main(args: Array<String>) = runApplication<MongoDbKotlinApplication>(*args)

    @Autowired
    lateinit var mongoDbFactory: MongoDatabaseFactory;

    @Autowired
    lateinit var mongoConverter: MongoConverter;

    @Bean
    fun gatewaySingleQueryFlow() =
    integrationFlow {
        handle(queryOutboundGateway())
        channel { queue("retrieveResults") }
    }

    private fun queryOutboundGateway(): MongoDbOutboundGatewaySpec {
        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
            .query("{name : 'Bob'}")
            .collectionNameFunction<Any> { m -> m.headers["collection"] as String }
            .expectSingleResult(true)
            .entityClass(Person::class.java)
    }

}
Java
@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    @Autowired
    private MongoDbFactory mongoDbFactory;

    @Bean
    @ServiceActivator(inputChannel = "requestChannel")
    public MessageHandler mongoDbOutboundGateway() {
        MongoDbOutboundGateway gateway = new MongoDbOutboundGateway(this.mongoDbFactory);
        gateway.setCollectionNameExpressionString("'myCollection'");
        gateway.setQueryExpressionString("'{''name'' : ''Bob''}'");
        gateway.setExpectSingleResult(true);
        gateway.setEntityClass(Person.class);
        gateway.setOutputChannelName("replyChannel");
        return gateway;
    }

    @Bean
    @ServiceActivator(inputChannel = "replyChannel")
    public MessageHandler handler() {
        return message -> System.out.println(message.getPayload());
    }
}
XML
<int-mongodb:outbound-gateway id="gatewayQuery"
    mongodb-factory="mongoDbFactory"
    mongo-converter="mongoConverter"
    query="{firstName: 'Bob'}"
    collection-name="myCollection"
    request-channel="in"
    reply-channel="out"
    entity-class="org.springframework.integration.mongodb.test.entity$Person"/>

You can use the following attributes with a MongoDB outbound gateway:

  • collection-name or collection-name-expression: Identifies the name of the MongoDB collection to use.

  • mongo-converter: Reference to an instance of o.s.data.mongodb.core.convert.MongoConverter that assists with converting a raw Java object to a JSON document representation.

  • mongodb-factory: Reference to an instance of o.s.data.mongodb.MongoDbFactory.

  • mongo-template: Reference to an instance of o.s.data.mongodb.core.MongoTemplate. NOTE: You cannot set both mongo-template and mongodb-factory.

  • entity-class: The fully qualified name of the entity class to be passed to the find(..) and findOne(..) methods in MongoTemplate. If this attribute is not provided, the default value is org.bson.Document.

  • query or query-expression: Specifies the MongoDB query. See the MongoDB documentation for more query samples.

  • collection-callback: Reference to an instance of org.springframework.data.mongodb.core.CollectionCallback. Preferably, use an instance of o.s.i.mongodb.outbound.MessageCollectionCallback (available since 5.0.11), which also receives the request message context. See its Javadocs for more information. NOTE: You cannot set both collection-callback and any of the query attributes.

As an alternative to the query and query-expression properties, you can specify other database operations by using the collectionCallback property as a reference to the MessageCollectionCallback functional interface implementation. The following example specifies a count operation:

private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
    return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
            .collectionCallback((collection, requestMessage) -> collection.countDocuments())
            .collectionName("myCollection");
}

MongoDB Reactive Channel Adapters

Starting with version 5.3, the ReactiveMongoDbStoringMessageHandler and ReactiveMongoDbMessageSource implementations are provided. They are based on the ReactiveMongoOperations from Spring Data and require the org.mongodb:mongodb-driver-reactivestreams dependency.

The ReactiveMongoDbStoringMessageHandler is an implementation of the ReactiveMessageHandler which is supported natively in the framework when reactive streams composition is involved in the integration flow definition. See more information in the ReactiveMessageHandler.

From a configuration perspective, it does not differ from many other standard channel adapters. For example, with the Java DSL, such a channel adapter could be used as follows:

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
    return f -> f
            .channel(MessageChannels.flux())
            .handle(MongoDb.reactiveOutboundChannelAdapter(mongoDbFactory));
}

In this sample, we connect to MongoDB through the provided ReactiveMongoDatabaseFactory and store the data from the request message in the default collection, which is named data. The actual operation is performed on demand from the reactive stream composition in the internally created ReactiveStreamsConsumer.

The ReactiveMongoDbMessageSource is an AbstractMessageSource implementation based on the provided ReactiveMongoDatabaseFactory or ReactiveMongoOperations and a MongoDB query (or expression). Depending on the expectSingleResult option, it calls the find() or findOne() operation, and it uses the expected entityClass type to convert the query result. Query execution and result evaluation are performed on demand, when the Publisher (a Flux or Mono, according to the expectSingleResult option) in the payload of the produced message is subscribed to. The framework can subscribe to such a payload automatically (essentially a flatMap) when a splitter and a FluxMessageChannel are used downstream. Otherwise, it is the target application's responsibility to subscribe to the polled publishers in downstream endpoints.

With Java DSL such a channel adapter could be configured like:

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
    return IntegrationFlows
            .from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory, "{'name' : 'Name'}")
                            .entityClass(Person.class),
                    c -> c.poller(Pollers.fixedDelay(1000)))
            .split()
            .channel(c -> c.flux("output"))
            .get();
}

Starting with version 5.5, the ReactiveMongoDbMessageSource can be configured with an updateExpression. It has the same functionality as the blocking MongoDbMessageSource. See MongoDB Inbound Channel Adapter and AbstractMongoDbMessageSourceSpec JavaDocs for more information.