MongoDb Support
Version 2.1 introduced support for MongoDB: a “high-performance, open source, document-oriented database”.
You need to include this dependency in your project:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mongodb</artifactId>
<version>5.5.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-mongodb:5.5.5"
To download, install, and run MongoDB, see the MongoDB documentation.
Connecting to MongoDb
Blocking or Reactive?
Beginning with version 5.3, Spring Integration provides support for reactive MongoDB drivers to enable non-blocking I/O when accessing MongoDB. To enable reactive support, add the MongoDB reactive streams driver to your dependencies:
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-reactivestreams</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-reactivestreams"
For the regular synchronous client, add its driver to your dependencies:
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-sync"
Both drivers are optional dependencies of the framework, so you can choose the one that suits your application.
To begin interacting with MongoDB, you first need to connect to it.
Spring Integration builds on the support provided by another Spring project, Spring Data MongoDB.
It provides factory interfaces called MongoDatabaseFactory and ReactiveMongoDatabaseFactory, which simplify integration with the MongoDB Client API.
Spring Data provides the blocking MongoDB driver by default, but you can opt in to reactive usage by including the above dependency.
Using MongoDatabaseFactory
To connect to MongoDB you can use an implementation of the MongoDatabaseFactory
interface.
The following example shows how to use SimpleMongoClientDatabaseFactory
:
MongoDatabaseFactory mongoDbFactory =
new SimpleMongoClientDatabaseFactory(com.mongodb.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoClientDatabaseFactory">
<constructor-arg>
<bean class="com.mongodb.client.MongoClients" factory-method="create"/>
</constructor-arg>
<constructor-arg value="test"/>
</bean>
SimpleMongoClientDatabaseFactory takes two arguments: a MongoClient instance and a String that specifies the name of the database.
If you need to configure properties such as host, port, and others, you can pass them to one of the create(...) factory methods provided by the underlying MongoClients class.
For more information on how to configure MongoDB, see the Spring-Data-MongoDB reference.
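As a minimal sketch of passing a host and port, the following configuration class hands a connection string to MongoClients.create(String). The class name MongoConnectionConfig, the localhost:27017 address, and the "test" database name are assumptions made for illustration only.

```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory;

@Configuration
public class MongoConnectionConfig {

    @Bean
    public MongoDatabaseFactory mongoDbFactory() {
        // Assumed host and port; replace with the connection string for your deployment.
        MongoClient client = MongoClients.create("mongodb://localhost:27017");
        // The second argument is the database name, as in the preceding example.
        return new SimpleMongoClientDatabaseFactory(client, "test");
    }
}
```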
Using ReactiveMongoDatabaseFactory
To connect to MongoDB with the reactive driver, you can use an implementation of the ReactiveMongoDatabaseFactory
interface.
The following example shows how to use SimpleReactiveMongoDatabaseFactory
:
ReactiveMongoDatabaseFactory mongoDbFactory =
new SimpleReactiveMongoDatabaseFactory(com.mongodb.reactivestreams.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleReactiveMongoDatabaseFactory">
<constructor-arg>
<bean class="com.mongodb.reactivestreams.client.MongoClients" factory-method="create"/>
</constructor-arg>
<constructor-arg value="test"/>
</bean>
MongoDB Message Store
As described in the Enterprise Integration Patterns (EIP) book, a Message Store lets you persist messages.
Doing so can be useful when dealing with components that have the ability to buffer messages (QueueChannel, aggregator, resequencer, and others) if reliability is a concern.
In Spring Integration, the MessageStore
strategy also provides the foundation for the claim check pattern, which is described in EIP as well.
Spring Integration’s MongoDB module provides the MongoDbMessageStore
, which is an implementation of both the MessageStore
strategy (mainly used by the claim check pattern) and the MessageGroupStore
strategy (mainly used by the aggregator and resequencer patterns).
The following example configures a MongoDbMessageStore and uses it with a QueueChannel and an aggregator:
<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
<constructor-arg ref="mongoDbFactory"/>
</bean>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="mongoDbMessageStore"/>
</int:channel>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="mongoDbMessageStore"/>
The preceding example is a simple bean configuration, and it expects a MongoDatabaseFactory as a constructor argument.
The MongoDbMessageStore
expands the Message
as a Mongo document with all nested properties by using the Spring Data Mongo mapping mechanism.
It is useful when you need access to the payload or headers of stored messages, for example, for auditing or analytics.
The MongoDbMessageStore uses a custom MappingMongoConverter implementation to store Message instances as MongoDB documents, and there are some limitations for the properties (payload and header values) of the Message.
Starting with version 5.1.6, the MongoDbMessageStore
can be configured with custom converters which are propagated into an internal MappingMongoConverter
implementation.
See MongoDbMessageStore.setCustomConverters(Object… customConverters)
JavaDocs for more information.
Spring Integration 3.0 introduced the ConfigurableMongoDbMessageStore
.
It implements both the MessageStore
and MessageGroupStore
interfaces.
This class can receive, as a constructor argument, a MongoTemplate
, with which you can, for example, configure a custom WriteConcern
.
Another constructor requires a MappingMongoConverter and a MongoDatabaseFactory, which lets you provide some custom conversions for Message instances and their properties.
Note that, by default, the ConfigurableMongoDbMessageStore
uses standard Java serialization to write and read Message
instances to and from MongoDB (see MongoDbMessageBytesConverter
) and relies on default values for other properties from MongoTemplate
.
It builds a MongoTemplate from the provided MongoDatabaseFactory and MappingMongoConverter.
The default name for the collection stored by the ConfigurableMongoDbMessageStore
is configurableStoreMessages
.
We recommend using this implementation to create robust and flexible solutions when messages contain complex data types.
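The MongoTemplate constructor variant mentioned above could be used roughly as follows to apply a custom WriteConcern. This is a sketch, not a recommended setup: the class name MessageStoreConfig and the choice of WriteConcern.MAJORITY are assumptions, and you should verify that the converter of the template you supply can handle your Message payloads.

```java
import com.mongodb.WriteConcern;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.integration.mongodb.store.ConfigurableMongoDbMessageStore;

@Configuration
public class MessageStoreConfig {

    @Bean
    public ConfigurableMongoDbMessageStore mongoDbMessageStore(MongoDatabaseFactory mongoDbFactory) {
        MongoTemplate template = new MongoTemplate(mongoDbFactory);
        // Assumed choice: wait for acknowledgement from a majority of replica set members.
        template.setWriteConcern(WriteConcern.MAJORITY);
        // Uses the default collection name (configurableStoreMessages).
        return new ConfigurableMongoDbMessageStore(template);
    }
}
```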
MongoDB Channel Message Store
Version 4.0 introduced the new MongoDbChannelMessageStore
.
It is an optimized MessageGroupStore
for use in QueueChannel
instances.
With priorityEnabled = true
, you can use it in <int:priority-queue>
instances to achieve priority-order polling for persisted messages.
The priority MongoDB document field is populated from the IntegrationMessageHeaderAccessor.PRIORITY
(priority
) message header.
In addition, all MongoDB MessageStore
instances now have a sequence
field for MessageGroup
documents.
The sequence
value is the result of an $inc
operation for a simple sequence
document from the same collection, which is created on demand.
The sequence
field is used in poll
operations to provide first-in-first-out (FIFO) message order (within priority, if configured) when messages are stored within the same millisecond.
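To make the role of the sequence field concrete, the following plain-Java model sorts stored messages the way the paragraph above describes: by priority first (when enabled), then by timestamp, then by sequence. It is an illustrative model only, not the store's implementation. The StoredMessage class, its field names, and the assumption that higher priority values are polled first are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class PollOrderSketch {

    /** Hypothetical stand-in for a stored message document. */
    static final class StoredMessage {
        final String id;
        final int priority;
        final long createdTime;
        final long sequence;

        StoredMessage(String id, int priority, long createdTime, long sequence) {
            this.id = id;
            this.priority = priority;
            this.createdTime = createdTime;
            this.sequence = sequence;
        }
    }

    /** Returns the message ids in the order in which this model would poll them. */
    public static List<String> pollOrder(List<StoredMessage> messages, boolean priorityEnabled) {
        // FIFO: oldest timestamp first; the sequence breaks ties within one millisecond.
        Comparator<StoredMessage> fifo = Comparator
                .comparingLong((StoredMessage m) -> m.createdTime)
                .thenComparingLong(m -> m.sequence);
        // Assumption: higher priority values come first, FIFO within the same priority.
        Comparator<StoredMessage> order = priorityEnabled
                ? Comparator.comparingInt((StoredMessage m) -> m.priority).reversed().thenComparing(fifo)
                : fifo;
        List<StoredMessage> sorted = new ArrayList<>(messages);
        sorted.sort(order);
        List<String> ids = new ArrayList<>();
        for (StoredMessage m : sorted) {
            ids.add(m.id);
        }
        return ids;
    }

    /** Four messages stored within the same millisecond. */
    public static List<StoredMessage> sample() {
        return Arrays.asList(
                new StoredMessage("a", 1, 1000L, 1L),
                new StoredMessage("b", 5, 1000L, 2L),
                new StoredMessage("c", 5, 1000L, 3L),
                new StoredMessage("d", 1, 1000L, 4L));
    }

    public static void main(String[] args) {
        System.out.println(pollOrder(sample(), true));   // prints [b, c, a, d]
        System.out.println(pollOrder(sample(), false));  // prints [a, b, c, d]
    }
}
```

Because all four sample messages share one timestamp, only the sequence value keeps them in arrival order.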
We do not recommend using the same MongoDbChannelMessageStore bean for priority and non-priority, because the priorityEnabled option applies to the entire store.
However, the same collection can be used for both MongoDbChannelMessageStore types, because message polling from the store is sorted and uses indexes.
To configure that scenario, you can extend one message store bean from the other, as the following example shows:
<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
<constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>
<int:channel id="queueChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>
MongoDB Metadata Store
Spring Integration 4.2 introduced a new MongoDB-based MetadataStore
(see Metadata Store) implementation.
You can use the MongoDbMetadataStore
to maintain metadata state across application restarts.
You can use this new MetadataStore implementation with adapters such as the feed inbound channel adapter.
To instruct these adapters to use the new MongoDbMetadataStore
, declare a Spring bean with a bean name of metadataStore
.
The feed inbound channel adapter automatically picks up and uses the declared MongoDbMetadataStore.
The following example shows how to declare a bean with a name of metadataStore
:
@Bean
public MetadataStore metadataStore(MongoDatabaseFactory factory) {
return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}
The MongoDbMetadataStore
also implements ConcurrentMetadataStore
, letting it be reliably shared across multiple application instances, where only one instance is allowed to store or modify a key’s value.
All these operations are atomic, thanks to MongoDB guarantees.
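The following sketch shows how application code might rely on those atomic operations through the ConcurrentMetadataStore contract. The helper class MetadataClaimSketch and its method names are hypothetical. Only putIfAbsent and replace come from the interface.

```java
import org.springframework.integration.metadata.ConcurrentMetadataStore;

public class MetadataClaimSketch {

    /**
     * Tries to record that owner handles key.
     * putIfAbsent returns null when no value existed, so null means this caller won.
     */
    public static boolean claim(ConcurrentMetadataStore store, String key, String owner) {
        return store.putIfAbsent(key, owner) == null;
    }

    /** Updates the value only if it still equals the value this caller last saw. */
    public static boolean advance(ConcurrentMetadataStore store, String key, String seen, String next) {
        return store.replace(key, seen, next);
    }
}
```

When several application instances call claim with the same key, at most one of them receives true.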
MongoDB Inbound Channel Adapter
The MongoDB inbound channel adapter is a polling consumer that reads data from MongoDB and sends it as a Message
payload.
The following example shows how to configure a MongoDB inbound channel adapter:
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
channel="replyChannel"
query="{'name' : 'Bob'}"
entity-class="java.lang.Object"
auto-startup="false">
<int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>
As the preceding configuration shows, you configure a MongoDB inbound channel adapter by using the inbound-channel-adapter element and providing values for various attributes, such as:
- query: A JSON query (see MongoDB Querying).
- query-expression: A SpEL expression that is evaluated to a JSON query string (as the query attribute above) or to an instance of o.s.data.mongodb.core.query.Query. Mutually exclusive with the query attribute.
- entity-class: The type of the payload object. If not supplied, a com.mongodb.DBObject is returned.
- collection-name or collection-name-expression: Identifies the name of the MongoDB collection to use.
- mongodb-factory: Reference to an instance of o.s.data.mongodb.MongoDatabaseFactory.
- mongo-template: Reference to an instance of o.s.data.mongodb.core.MongoTemplate.
- Other attributes that are common across all other inbound adapters (such as 'channel').
You cannot set both mongo-template and mongodb-factory.
The preceding example is relatively simple and static, since it has a literal value for the query
and uses the default name for a collection
.
Sometimes, you may need to change those values at runtime, based on some condition.
To do so, use their -expression
equivalents (query-expression
and collection-name-expression
), where the provided expression can be any valid SpEL expression.
Also, you may wish to do some post-processing of the data that was read from MongoDB after it has been successfully processed. For example, you may want to move or remove a document after it has been processed. You can do so by using the transaction synchronization feature added in Spring Integration 2.2, as the following example shows:
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
channel="replyChannel"
query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
entity-class="java.lang.Object"
auto-startup="false">
<int:poller fixed-rate="200" max-messages-per-poll="1">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-mongodb:inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit
expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
channel="someChannel"/>
</int:transaction-synchronization-factory>
<bean id="documentCleaner" class="thing1.thing2.DocumentCleaner"/>
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
The following example shows the DocumentCleaner
referenced in the preceding example:
public class DocumentCleaner {
public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
if (target instanceof List<?>){
List<?> documents = (List<?>) target;
for (Object document : documents) {
mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
}
}
}
}
You can declare your poller to be transactional by using the transactional
element.
This element can reference a real transaction manager (for example, if some other part of your flow invokes JDBC).
If you do not have a “real” transaction, you can use an instance of o.s.i.transaction.PseudoTransactionManager
, which is an implementation of Spring’s PlatformTransactionManager
and enables the use of the transaction synchronization features of the Mongo adapter when there is no actual transaction.
Doing so does not make MongoDB itself transactional. It only lets you synchronize actions to be taken before or after success (commit) or after failure (rollback).
Once your poller is transactional, you can set an instance of the o.s.i.transaction.TransactionSynchronizationFactory
on the transactional
element.
A TransactionSynchronizationFactory
creates an instance of the TransactionSynchronization
.
For your convenience, we have exposed a default SpEL-based TransactionSynchronizationFactory
that lets you configure SpEL expressions, with their execution being coordinated (synchronized) with a transaction.
Expressions for before-commit, after-commit, and after-rollback events are supported, together with a channel for each event where the evaluation result (if any) is sent.
For each child element, you can specify expression
and channel
attributes.
If only the channel
attribute is present, the received message is sent there as part of the particular synchronization scenario.
If only the expression
attribute is present and the result of an expression is a non-null value, a message with the result as the payload is generated and sent to a default channel (NullChannel
) and appears in the logs (on the DEBUG
level).
If you want the evaluation result to go to a specific channel, add a channel
attribute.
If the result of an expression is null or void, no message is generated.
For more information about transaction synchronization, see Transaction Synchronization.
Starting with version 5.5, the MongoDbMessageSource
can be configured with an updateExpression
, which must evaluate to a String
with the MongoDb update
syntax or to an org.springframework.data.mongodb.core.query.Update
instance.
It can be used as an alternative to the post-processing procedure described above. It modifies the entities that were fetched from the collection, so that they are not pulled from the collection again on the next polling cycle (assuming the update changes some value used in the query).
It is still recommended to use transactions to achieve execution isolation and data consistency, when several instances of the MongoDbMessageSource
for the same collection are used in the cluster.
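As a sketch of the update approach, the following configuration declares a MongoDbMessageSource whose update flips a flag that the query filters on. The processed field, the myCollection name, the replyChannel name, and the one-second poller are hypothetical, and the example assumes that Spring Integration annotation support is enabled in the application context.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.mongodb.inbound.MongoDbMessageSource;

@Configuration
public class UpdatingSourceConfig {

    @Bean
    @InboundChannelAdapter(channel = "replyChannel", poller = @Poller(fixedDelay = "1000"))
    public MessageSource<Object> mongoSource(MongoDatabaseFactory mongoDbFactory) {
        // Hypothetical 'processed' flag: select only documents that were not handled yet.
        MongoDbMessageSource source =
                new MongoDbMessageSource(mongoDbFactory, new LiteralExpression("{'processed' : false}"));
        // Mark the fetched documents so that the query above no longer matches them.
        source.setUpdateExpression(new LiteralExpression("{ $set : { 'processed' : true } }"));
        // Assumed collection name.
        source.setCollectionNameExpression(new LiteralExpression("myCollection"));
        return source;
    }
}
```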
MongoDB Change Stream Inbound Channel Adapter
Starting with version 5.3, the spring-integration-mongodb
module introduces the MongoDbChangeStreamMessageProducer
- a reactive MessageProducerSupport
implementation for the Spring Data ReactiveMongoOperations.changeStream(String, ChangeStreamOptions, Class)
API.
This component produces a Flux
of messages with a body
of ChangeStreamEvent
as the payload by default and some change stream related headers (see MongoHeaders
).
It is recommended that this MongoDbChangeStreamMessageProducer
is combined with a FluxMessageChannel
as the outputChannel
for on-demand subscription and event consumption downstream.
The Java DSL configuration for this channel adapter may look like this:
@Bean
IntegrationFlow changeStreamFlow(ReactiveMongoOperations mongoTemplate) {
return IntegrationFlows.from(
MongoDb.changeStreamInboundChannelAdapter(mongoTemplate)
.domainType(Person.class)
.collection("person")
.extractBody(false))
.channel(MessageChannels.flux())
.get();
}
When the MongoDbChangeStreamMessageProducer
is stopped, or the subscription is cancelled downstream, or the MongoDb change stream produces an OperationType.INVALIDATE
, the Publisher
is completed.
The channel adapter can be started again, in which case a new Publisher of source data is created and automatically subscribed to in MessageProducerSupport.subscribeToPublisher(Publisher<? extends Message<?>>).
This channel adapter can be reconfigured for new options between starts, if there is a requirement to consume change stream events from other places.
See more information about change stream support in Spring Data MongoDb documentation.
MongoDB Outbound Channel Adapter
The MongoDB outbound channel adapter lets you write the message payload to a MongoDB document store, as the following example shows:
<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
collection-name="myCollection"
mongo-converter="mongoConverter"
mongodb-factory="mongoDbFactory" />
As the preceding configuration shows, you can configure a MongoDB outbound channel adapter by using the outbound-channel-adapter element, providing values for various attributes, such as:
- collection-name or collection-name-expression: Identifies the name of the MongoDB collection to use.
- mongo-converter: Reference to an instance of o.s.data.mongodb.core.convert.MongoConverter that assists with converting a raw Java object to a JSON document representation.
- mongodb-factory: Reference to an instance of o.s.data.mongodb.MongoDatabaseFactory.
- mongo-template: Reference to an instance of o.s.data.mongodb.core.MongoTemplate. NOTE: You cannot have both mongo-template and mongodb-factory set.
- Other attributes that are common across all other outbound adapters (such as 'channel').
The preceding example is relatively simple and static, since it has a literal value for the collection-name
.
Sometimes, you may need to change this value at runtime, based on some condition.
To do that, use collection-name-expression
, where the provided expression is any valid SpEL expression.
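With Java configuration, the same idea could look like the following sketch, which resolves the collection name for each message from a header. The storeChannel channel name and the collection header name are hypothetical.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.mongodb.outbound.MongoDbStoringMessageHandler;
import org.springframework.messaging.MessageHandler;

@Configuration
public class DynamicCollectionConfig {

    @Bean
    @ServiceActivator(inputChannel = "storeChannel") // hypothetical channel name
    public MessageHandler mongoStoringHandler(MongoDatabaseFactory mongoDbFactory) {
        MongoDbStoringMessageHandler handler = new MongoDbStoringMessageHandler(mongoDbFactory);
        // Resolve the target collection per message from a hypothetical 'collection' header.
        handler.setCollectionNameExpression(
                new SpelExpressionParser().parseExpression("headers['collection']"));
        return handler;
    }
}
```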
MongoDB Outbound Gateway
Version 5.0 introduced the MongoDB outbound gateway. It lets you query a database by sending a message to its request channel. The gateway then sends the response to the reply channel. You can use the message payload and headers to specify the query and the collection name, as the following example shows:
@SpringBootApplication
public class MongoDbJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MongoDbJavaApplication.class)
.web(WebApplicationType.NONE)
.run(args);
}
@Autowired
private MongoDatabaseFactory mongoDbFactory;
@Autowired
private MongoConverter mongoConverter;
@Bean
public IntegrationFlow gatewaySingleQueryFlow() {
return f -> f
.handle(queryOutboundGateway())
.channel(c -> c.queue("retrieveResults"));
}
private MongoDbOutboundGatewaySpec queryOutboundGateway() {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.query("{name : 'Bob'}")
.collectionNameFunction(m -> (String) m.getHeaders().get("collection"))
.expectSingleResult(true)
.entityClass(Person.class);
}
}
class MongoDbKotlinApplication {
fun main(args: Array<String>) = runApplication<MongoDbKotlinApplication>(*args)
@Autowired
lateinit var mongoDbFactory: MongoDatabaseFactory;
@Autowired
lateinit var mongoConverter: MongoConverter;
@Bean
fun gatewaySingleQueryFlow() =
integrationFlow {
handle(queryOutboundGateway())
channel { queue("retrieveResults") }
}
private fun queryOutboundGateway(): MongoDbOutboundGatewaySpec {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.query("{name : 'Bob'}")
.collectionNameFunction<Any> { m -> m.headers["collection"] as String }
.expectSingleResult(true)
.entityClass(Person::class.java)
}
}
@SpringBootApplication
public class MongoDbJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MongoDbJavaApplication.class)
.web(WebApplicationType.NONE)
.run(args);
}
@Autowired
private MongoDatabaseFactory mongoDbFactory;
@Bean
@ServiceActivator(inputChannel = "requestChannel")
public MessageHandler mongoDbOutboundGateway() {
MongoDbOutboundGateway gateway = new MongoDbOutboundGateway(this.mongoDbFactory);
gateway.setCollectionNameExpressionString("'myCollection'");
gateway.setQueryExpressionString("'{''name'' : ''Bob''}'");
gateway.setExpectSingleResult(true);
gateway.setEntityClass(Person.class);
gateway.setOutputChannelName("replyChannel");
return gateway;
}
@Bean
@ServiceActivator(inputChannel = "replyChannel")
public MessageHandler handler() {
return message -> System.out.println(message.getPayload());
}
}
<int-mongodb:outbound-gateway id="gatewayQuery"
mongodb-factory="mongoDbFactory"
mongo-converter="mongoConverter"
query="{firstName: 'Bob'}"
collection-name="myCollection"
request-channel="in"
reply-channel="out"
entity-class="org.springframework.integration.mongodb.test.entity$Person"/>
You can use the following attributes with a MongoDB outbound gateway:
- collection-name or collection-name-expression: Identifies the name of the MongoDB collection to use.
- mongo-converter: Reference to an instance of o.s.data.mongodb.core.convert.MongoConverter that assists with converting a raw Java object to a JSON document representation.
- mongodb-factory: Reference to an instance of o.s.data.mongodb.MongoDatabaseFactory.
- mongo-template: Reference to an instance of o.s.data.mongodb.core.MongoTemplate. NOTE: You cannot set both mongo-template and mongodb-factory.
- entity-class: The fully qualified name of the entity class to be passed to the find(..) and findOne(..) methods in MongoTemplate. If this attribute is not provided, the default value is org.bson.Document.
- query or query-expression: Specifies the MongoDB query. See the MongoDB documentation for more query samples.
- collection-callback: Reference to an instance of org.springframework.data.mongodb.core.CollectionCallback. Preferably an instance of o.s.i.mongodb.outbound.MessageCollectionCallback (since 5.0.11), which provides the request message context. See its Javadocs for more information. NOTE: You cannot have both collection-callback and any of the query attributes.
As an alternative to the query and query-expression properties, you can specify other database operations by using the collectionCallback property as a reference to a MessageCollectionCallback functional interface implementation.
The following example specifies a count operation:
private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.collectionCallback((collection, requestMessage) -> collection.countDocuments())
.collectionName("myCollection");
}
MongoDB Reactive Channel Adapters
Starting with version 5.3, the ReactiveMongoDbStoringMessageHandler
and ReactiveMongoDbMessageSource
implementations are provided.
They are based on the ReactiveMongoOperations from Spring Data and require the org.mongodb:mongodb-driver-reactivestreams dependency.
The ReactiveMongoDbStoringMessageHandler
is an implementation of the ReactiveMessageHandler
which is supported natively in the framework when reactive streams composition is involved in the integration flow definition.
See more information in the ReactiveMessageHandler.
From a configuration perspective, there is no difference from many other standard channel adapters. For example, with the Java DSL, such a channel adapter could be used as follows:
@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
return f -> f
.channel(MessageChannels.flux())
.handle(MongoDb.reactiveOutboundChannelAdapter(mongoDbFactory));
}
In this sample, we connect to MongoDB through the provided ReactiveMongoDatabaseFactory and store the data from the request message into the default collection, which is named data.
The real operation is performed on demand from the reactive stream composition in the internally created ReactiveStreamsConsumer.
The ReactiveMongoDbMessageSource is an AbstractMessageSource implementation based on the provided ReactiveMongoDatabaseFactory or ReactiveMongoOperations and a MongoDB query (or expression). It calls the find() or findOne() operation, according to the expectSingleResult option, with an expected entityClass type to convert the query result.
The query execution and result evaluation are performed on demand, when the Publisher (Flux or Mono, according to the expectSingleResult option) in the payload of the produced message is subscribed to.
The framework can subscribe to such a payload automatically (essentially flatMap) when a splitter and a FluxMessageChannel are used downstream.
Otherwise, it is the target application's responsibility to subscribe to the polled publishers in downstream endpoints.
With Java DSL such a channel adapter could be configured like:
@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
return IntegrationFlows
.from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory, "{'name' : 'Name'}")
.entityClass(Person.class),
c -> c.poller(Pollers.fixedDelay(1000)))
.split()
.channel(c -> c.flux("output"))
.get();
}
Starting with version 5.5, the ReactiveMongoDbMessageSource
can be configured with an updateExpression
.
It has the same functionality as the blocking MongoDbMessageSource
.
See MongoDB Inbound Channel Adapter and AbstractMongoDbMessageSourceSpec
JavaDocs for more information.