Version 2.1 introduced support for MongoDB: a "high-performance, open source, document-oriented database".
This support comes in the form of a MongoDB-based MessageStore.
To download, install, and run MongoDB, see the MongoDB documentation.
To begin interacting with MongoDB, you first need to connect to it.
Spring Integration builds on the support provided by another Spring project, Spring Data MongoDB.
It provides a factory class called MongoDbFactory, which simplifies integration with the MongoDB Client API.
To connect to MongoDB you can use an implementation of the MongoDbFactory interface.
The following listing shows the MongoDbFactory interface:
public interface MongoDbFactory {

    /**
     * Creates a default {@link DB} instance.
     *
     * @return the DB instance
     * @throws DataAccessException
     */
    DB getDb() throws DataAccessException;

    /**
     * Creates a {@link DB} instance to access the database with the given name.
     *
     * @param dbName must not be {@literal null} or empty.
     *
     * @return the DB instance
     * @throws DataAccessException
     */
    DB getDb(String dbName) throws DataAccessException;

}
The following example shows how to use SimpleMongoDbFactory, the out-of-the-box implementation, in Java:
MongoDbFactory mongoDbFactory = new SimpleMongoDbFactory(new Mongo(), "test");
The following example shows how to use SimpleMongoDbFactory in XML configuration:
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoDbFactory">
    <constructor-arg>
        <bean class="com.mongodb.Mongo"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>
SimpleMongoDbFactory takes two arguments: a Mongo instance and a String that specifies the name of the database.
If you need to configure properties such as host, port, and others, you can pass those by using one of the constructors provided by the underlying Mongo class.
For more information on how to configure MongoDB, see the Spring-Data-MongoDB reference.
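As a minimal sketch of passing a host and port through the Mongo class, the following Java configuration may help. It assumes the legacy driver's Mongo(String host, int port) constructor is available in the driver version you use. The class name MongoConnectionConfig and the values "localhost" and 27017 are placeholders chosen for illustration, not values taken from this guide.

```java
import com.mongodb.Mongo;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.MongoDbFactory;
import org.springframework.data.mongodb.core.SimpleMongoDbFactory;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
public class MongoConnectionConfig {

    @Bean
    public MongoDbFactory mongoDbFactory() throws Exception {
        // Assumption: host and port are placeholders for your own server address.
        Mongo mongo = new Mongo("localhost", 27017);
        // Same two-argument constructor as in the examples above: Mongo instance plus database name.
        return new SimpleMongoDbFactory(mongo, "test");
    }

}
```

The resulting bean can be referenced wherever a mongoDbFactory is expected, in the same way as the XML definition shown earlier.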
As described in the Enterprise Integration Patterns (EIP) book, a Message Store lets you persist messages.
Doing so can be useful when dealing with components that have the ability to buffer messages (QueueChannel, aggregator, resequencer, and others) if reliability is a concern.
In Spring Integration, the MessageStore strategy also provides the foundation for the claim check pattern, which is described in EIP as well.
Spring Integration’s MongoDB module provides the MongoDbMessageStore, which is an implementation of both the MessageStore strategy (mainly used by the claim check pattern) and the MessageGroupStore strategy (mainly used by the aggregator and resequencer patterns).
The following example configures a QueueChannel and an aggregator to use a MongoDbMessageStore:
<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
    <constructor-arg ref="mongoDbFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="mongoDbMessageStore"/>
</int:channel>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
    message-store="mongoDbMessageStore"/>
The preceding example is a simple bean configuration, and it expects a MongoDbFactory as a constructor argument.
The MongoDbMessageStore expands the Message as a Mongo document with all nested properties by using the Spring Data Mongo mapping mechanism.
It is useful when you need access to the payload or headers of stored messages (for example, for auditing or analytics).
Important | |
---|---|
The |
Spring Integration 3.0 introduced the ConfigurableMongoDbMessageStore. It implements both the MessageStore and MessageGroupStore interfaces.
This class can receive, as a constructor argument, a MongoTemplate, with which you can, for example, configure a custom WriteConcern.
Another constructor requires a MappingMongoConverter and a MongoDbFactory, which lets you provide some custom conversions for Message instances and their properties.
Note that, by default, the ConfigurableMongoDbMessageStore uses standard Java serialization to write and read Message instances to and from MongoDB (see MongoDbMessageBytesConverter) and relies on default values for other properties from MongoTemplate.
It builds a MongoTemplate from the provided MongoDbFactory and MappingMongoConverter.
The default name for the collection stored by the ConfigurableMongoDbMessageStore is configurableStoreMessages.
We recommend using this implementation to create robust and flexible solutions when messages contain complex data types.
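The MongoTemplate-based constructor described above might be used as in the following sketch. It assumes a MongoDbFactory bean is already defined. The class name MessageStoreConfig and the choice of WriteConcern.MAJORITY are illustrative assumptions. The sketch does not show any converter setup that a user-supplied template may need in order to read and write Message instances.

```java
import com.mongodb.WriteConcern;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.MongoDbFactory;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.integration.mongodb.store.ConfigurableMongoDbMessageStore;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
public class MessageStoreConfig {

    @Bean
    public ConfigurableMongoDbMessageStore messageStore(MongoDbFactory mongoDbFactory) {
        MongoTemplate template = new MongoTemplate(mongoDbFactory);
        // Assumption: MAJORITY is just one possible WriteConcern; pick the one your deployment needs.
        template.setWriteConcern(WriteConcern.MAJORITY);
        // Converter customization for Message instances on this template is not shown here.
        return new ConfigurableMongoDbMessageStore(template);
    }

}
```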
Version 4.0 introduced the new MongoDbChannelMessageStore.
It is an optimized MessageGroupStore for use in QueueChannel instances.
With priorityEnabled = true, you can use it in <int:priority-queue> instances to achieve priority-order polling for persisted messages.
The priority MongoDB document field is populated from the IntegrationMessageHeaderAccessor.PRIORITY (priority) message header.
In addition, all MongoDB MessageStore instances now have a sequence field for MessageGroup documents.
The sequence value is the result of an $inc operation for a simple sequence document from the same collection, which is created on demand.
The sequence field is used in poll operations to provide first-in-first-out (FIFO) message order (within priority, if configured) when messages are stored within the same millisecond.
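The ordering described above can be illustrated with a small in-memory model. This is not the store's implementation. The Doc class, its field names, and the comparator are hypothetical stand-ins that only mirror the stated rule: higher priority first, then storage time, then the sequence value as a tie-breaker for documents stored in the same millisecond.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class PollOrderSketch {

    /** Hypothetical stand-in for a stored message document; not a Spring Integration type. */
    static final class Doc {
        final String id;
        final int priority;       // higher value is polled first
        final long createdMillis; // storage timestamp in milliseconds
        final long sequence;      // value taken from an incrementing counter

        Doc(String id, int priority, long createdMillis, long sequence) {
            this.id = id;
            this.priority = priority;
            this.createdMillis = createdMillis;
            this.sequence = sequence;
        }
    }

    /** Returns the document ids in the order successive polls would be expected to return them. */
    static List<String> pollOrder(List<Doc> docs) {
        List<Doc> sorted = new ArrayList<>(docs);
        sorted.sort(Comparator
                .comparingInt((Doc d) -> d.priority).reversed() // priority, descending
                .thenComparingLong(d -> d.createdMillis)        // then oldest first
                .thenComparingLong(d -> d.sequence));           // then lowest sequence first
        List<String> ids = new ArrayList<>();
        for (Doc d : sorted) {
            ids.add(d.id);
        }
        return ids;
    }

    public static void main(String[] args) {
        // All three documents share the same millisecond; only priority and sequence differ.
        List<Doc> docs = Arrays.asList(
                new Doc("a", 0, 1000L, 2L),
                new Doc("b", 0, 1000L, 1L),
                new Doc("c", 5, 1000L, 3L));
        System.out.println(pollOrder(docs)); // prints [c, b, a]
    }

}
```

Without the sequence value, "a" and "b" would be indistinguishable by timestamp alone, so their relative order would not be guaranteed.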
Note | |
---|---|
We do not recommend using the same |
<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
    <constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>
Spring Integration 4.2 introduced a new MongoDB-based MetadataStore (see Section 12.5, “Metadata Store”) implementation.
You can use the MongoDbMetadataStore to maintain metadata state across application restarts.
You can use this new MetadataStore implementation with adapters such as:
To instruct these adapters to use the new MongoDbMetadataStore, declare a Spring bean with a bean name of metadataStore.
The feed inbound channel adapter automatically picks up and uses the declared MongoDbMetadataStore.
The following example shows how to declare a bean with a name of metadataStore:
@Bean
public MetadataStore metadataStore(MongoDbFactory factory) {
    return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}
The MongoDbMetadataStore also implements ConcurrentMetadataStore, letting it be reliably shared across multiple application instances, where only one instance is allowed to store or modify a key’s value.
All these operations are atomic, thanks to MongoDB guarantees.
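To illustrate how several instances can compete for a key, the following sketch uses a hypothetical AtomicStore interface whose two methods have the same shape as the putIfAbsent and replace methods of ConcurrentMetadataStore. The in-memory implementation exists only to make the sketch runnable without a database. With a real MongoDbMetadataStore, the atomicity would come from MongoDB rather than from a ConcurrentHashMap.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class MetadataClaimSketch {

    /** Hypothetical stand-in mirroring the method shapes of ConcurrentMetadataStore. */
    interface AtomicStore {
        String putIfAbsent(String key, String value);
        boolean replace(String key, String oldValue, String newValue);
    }

    /** In-memory implementation, used only so that the sketch runs on its own. */
    static final class InMemoryStore implements AtomicStore {
        private final ConcurrentMap<String, String> map = new ConcurrentHashMap<>();

        @Override
        public String putIfAbsent(String key, String value) {
            return map.putIfAbsent(key, value);
        }

        @Override
        public boolean replace(String key, String oldValue, String newValue) {
            return map.replace(key, oldValue, newValue);
        }
    }

    /** Returns true only for the caller whose value was stored first for the key. */
    static boolean claim(AtomicStore store, String key, String owner) {
        // putIfAbsent returns null when no value existed, which means this caller won.
        return store.putIfAbsent(key, owner) == null;
    }

    public static void main(String[] args) {
        AtomicStore store = new InMemoryStore();
        System.out.println(claim(store, "feed-entry-42", "instance-A"));          // prints true
        System.out.println(claim(store, "feed-entry-42", "instance-B"));          // prints false
        System.out.println(store.replace("feed-entry-42", "instance-A", "done")); // prints true
    }

}
```

The key "feed-entry-42" and the owner names are placeholders. The point of the pattern is that the second claim fails without overwriting the first owner's value.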
The MongoDB inbound channel adapter is a polling consumer that reads data from MongoDB and sends it as a Message payload.
The following example shows how to configure a MongoDB inbound channel adapter:
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
        channel="replyChannel"
        query="{'name' : 'Bob'}"
        entity-class="java.lang.Object"
        auto-startup="false">
    <int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>
As the preceding configuration shows, you configure a MongoDB inbound channel adapter by using the inbound-channel-adapter element and providing values for various attributes, such as:
query: A JSON query (see MongoDB Querying).
query-expression: A SpEL expression that is evaluated to a JSON query string (as the query attribute above) or to an instance of o.s.data.mongodb.core.query.Query. Mutually exclusive with the query attribute.
entity-class: The type of the payload object. If not supplied, a com.mongodb.DBObject is returned.
collection-name or collection-name-expression: Identifies the name of the MongoDB collection to use.
mongodb-factory: Reference to an instance of o.s.data.mongodb.MongoDbFactory.
mongo-template: Reference to an instance of o.s.data.mongodb.core.MongoTemplate.
Note | |
---|---|
You cannot set both |
The preceding example is relatively simple and static, since it has a literal value for the query and uses the default name for a collection.
Sometimes, you may need to change those values at runtime, based on some condition.
To do so, use their -expression equivalents (query-expression and collection-name-expression), where the provided expression can be any valid SpEL expression.
Also, you may wish to do some post-processing of the successfully processed data that was read from MongoDB. For example, you may want to move or remove a document after it has been processed. You can do so by using the transaction synchronization feature that Spring Integration 2.2 added, as the following example shows:
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
        channel="replyChannel"
        query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
        entity-class="java.lang.Object"
        auto-startup="false">
    <int:poller fixed-rate="200" max-messages-per-poll="1">
        <int:transactional synchronization-factory="syncFactory"/>
    </int:poller>
</int-mongodb:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit
        expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
        channel="someChannel"/>
</int:transaction-synchronization-factory>

<bean id="documentCleaner" class="thing1.thing2.DocumentCleaner"/>

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
The following example shows the DocumentCleaner referenced in the preceding example:
public class DocumentCleaner {

    public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
        if (target instanceof List<?>) {
            List<?> documents = (List<?>) target;
            for (Object document : documents) {
                mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
            }
        }
    }

}
You can declare your poller to be transactional by using the transactional element.
This element can reference a real transaction manager (for example, if some other part of your flow invokes JDBC).
If you do not have a "real" transaction, you can use an instance of o.s.i.transaction.PseudoTransactionManager, which is an implementation of Spring’s PlatformTransactionManager and enables the use of the transaction synchronization features of the Mongo adapter when there is no actual transaction.
IMPORTANT: Doing so does not make MongoDB itself transactional. It lets you synchronize actions to be taken before or after success (commit) or after failure (rollback).
Once your poller is transactional, you can set an instance of the o.s.i.transaction.TransactionSynchronizationFactory on the transactional element.
TransactionSynchronizationFactory creates an instance of the TransactionSynchronization.
For your convenience, we have exposed a default SpEL-based TransactionSynchronizationFactory that lets you configure SpEL expressions, with their execution being coordinated (synchronized) with a transaction.
Expressions for before-commit, after-commit, and after-rollback events are supported, together with a channel for each event where the evaluation result (if any) is sent.
For each child element, you can specify expression and channel attributes.
If only the channel attribute is present, the received message is sent there as part of the particular synchronization scenario.
If only the expression attribute is present and the result of an expression is a non-null value, a message with the result as the payload is generated and sent to a default channel (NullChannel) and appears in the logs (on the DEBUG level).
If you want the evaluation result to go to a specific channel, add a channel attribute.
If the result of an expression is null or void, no message is generated.
For more information about transaction synchronization, see Section C.3, “Transaction Synchronization”.
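The rules for combining the expression and channel attributes can be summarized as a small decision function. The following sketch is a plain Java model of those rules and does not use any Spring Integration classes. The names SyncDispatchSketch, Dispatch, and resolve are hypothetical, and the behavior when neither attribute is set is an assumption, since that case is not described here.

```java
import java.util.Optional;

class SyncDispatchSketch {

    /** Hypothetical result type: which channel receives which payload. */
    static final class Dispatch {
        final String channel;
        final Object payload;

        Dispatch(String channel, Object payload) {
            this.channel = channel;
            this.payload = payload;
        }
    }

    /** Name used in this model for the default channel (NullChannel). */
    static final String DEFAULT_CHANNEL = "nullChannel";

    /**
     * @param received         payload of the message that was received by the adapter
     * @param hasExpression    whether an expression attribute is configured
     * @param expressionResult result of evaluating the expression (ignored without an expression)
     * @param channel          configured channel name, or null if the attribute is absent
     */
    static Optional<Dispatch> resolve(Object received, boolean hasExpression,
            Object expressionResult, String channel) {
        if (!hasExpression) {
            // Only a channel: the received message itself is sent there.
            // Assumption: with neither attribute set, nothing is sent.
            return channel == null
                    ? Optional.<Dispatch>empty()
                    : Optional.of(new Dispatch(channel, received));
        }
        if (expressionResult == null) {
            // A null (or void) result generates no message.
            return Optional.empty();
        }
        // A non-null result goes to the configured channel, or to the default channel otherwise.
        return Optional.of(new Dispatch(channel != null ? channel : DEFAULT_CHANNEL, expressionResult));
    }

    public static void main(String[] args) {
        System.out.println(resolve("msg", true, "result", null).get().channel);          // prints nullChannel
        System.out.println(resolve("msg", true, "result", "someChannel").get().channel); // prints someChannel
        System.out.println(resolve("msg", true, null, "someChannel").isPresent());       // prints false
    }

}
```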
The MongoDB outbound channel adapter lets you write the message payload to a MongoDB document store, as the following example shows:
<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
    collection-name="myCollection"
    mongo-converter="mongoConverter"
    mongodb-factory="mongoDbFactory" />
As the preceding configuration shows, you can configure a MongoDB outbound channel adapter by using the outbound-channel-adapter element, providing values for various attributes, such as:
collection-name or collection-name-expression: Identifies the name of the MongoDB collection to use.
mongo-converter: Reference to an instance of o.s.data.mongodb.core.convert.MongoConverter that assists with converting a raw Java object to a JSON document representation.
mongodb-factory: Reference to an instance of o.s.data.mongodb.MongoDbFactory.
mongo-template: Reference to an instance of o.s.data.mongodb.core.MongoTemplate.
NOTE: You cannot set both mongo-template and mongodb-factory.
The preceding example is relatively simple and static, since it has a literal value for the collection-name.
Sometimes, you may need to change this value at runtime, based on some condition.
To do that, use collection-name-expression, where the provided expression is any valid SpEL expression.
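As a hedged sketch of a runtime collection name in Java configuration, the following example sets a SpEL expression on a MongoDbStoringMessageHandler. The class name MongoOutboundConfig, the channel name storeChannel, and the collection header are assumptions made for illustration. The expression falls back to myCollection when the header is absent.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.MongoDbFactory;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.mongodb.outbound.MongoDbStoringMessageHandler;
import org.springframework.messaging.MessageHandler;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
public class MongoOutboundConfig {

    @Bean
    @ServiceActivator(inputChannel = "storeChannel") // assumed channel name
    public MessageHandler mongoStoringHandler(MongoDbFactory mongoDbFactory) {
        MongoDbStoringMessageHandler handler = new MongoDbStoringMessageHandler(mongoDbFactory);
        // The expression is evaluated against each request message.
        // Assumption: senders may set a "collection" header; otherwise "myCollection" is used.
        handler.setCollectionNameExpression(
                new SpelExpressionParser().parseExpression("headers['collection'] ?: 'myCollection'"));
        return handler;
    }

}
```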
Version 5.0 introduced the MongoDB outbound gateway. It lets you query a database by sending a message to its request channel. The gateway then sends the response to the reply channel. You can use the message payload and headers to specify the query and the collection name, as the following example shows:
<int-mongodb:outbound-gateway id="gatewayQuery"
    mongodb-factory="mongoDbFactory"
    mongo-converter="mongoConverter"
    query="{firstName: 'Bob'}"
    collection-name="myCollection"
    request-channel="in"
    reply-channel="out"
    entity-class="org.springframework.integration.mongodb.test.entity$Person"/>
You can use the following attributes with a MongoDB outbound gateway:
collection-name or collection-name-expression: Identifies the name of the MongoDB collection to use.
mongo-converter: Reference to an instance of o.s.data.mongodb.core.convert.MongoConverter that assists with converting a raw Java object to a JSON document representation.
mongodb-factory: Reference to an instance of o.s.data.mongodb.MongoDbFactory.
mongo-template: Reference to an instance of o.s.data.mongodb.core.MongoTemplate. NOTE: You cannot set both mongo-template and mongodb-factory.
entity-class: The fully qualified name of the entity class to be passed to the find(..) and findOne(..) methods in MongoTemplate. If this attribute is not provided, the default value is org.bson.Document.
query or query-expression: Specifies the MongoDB query. See the MongoDB documentation for more query samples.
collection-callback: Reference to an instance of org.springframework.data.mongodb.core.CollectionCallback. NOTE: You cannot have both collection-callback and any of the query attributes.
The following Spring Boot application shows an example of how to configure the outbound gateway with Java configuration:
@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private MongoDbFactory mongoDbFactory;

    @Bean
    @ServiceActivator(inputChannel = "requestChannel")
    public MessageHandler mongoDbOutboundGateway() {
        MongoDbOutboundGateway gateway = new MongoDbOutboundGateway(this.mongoDbFactory);
        gateway.setCollectionNameExpressionString("'myCollection'");
        gateway.setQueryExpressionString("'{''name'' : ''Bob''}'");
        gateway.setExpectSingleResult(true);
        gateway.setEntityClass(Person.class);
        gateway.setOutputChannelName("replyChannel");
        return gateway;
    }

    @Bean
    @ServiceActivator(inputChannel = "replyChannel")
    public MessageHandler handler() {
        return message -> System.out.println(message.getPayload());
    }

}
The following Spring Boot application shows an example of how to configure the outbound gateway with the Java DSL:
@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private MongoDbFactory mongoDbFactory;

    @Autowired
    private MongoConverter mongoConverter;

    @Bean
    public IntegrationFlow gatewaySingleQueryFlow() {
        return f -> f
                .handle(queryOutboundGateway())
                .channel(c -> c.queue("retrieveResults"));
    }

    private MongoDbOutboundGatewaySpec queryOutboundGateway() {
        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
                .query("{name : 'Bob'}")
                .collectionNameFunction(m -> m.getHeaders().get("collection"))
                .expectSingleResult(true)
                .entityClass(Person.class);
    }

}
As an alternative to the query and query-expression properties, you can specify other database operations by using the collectionCallback property.
The following example specifies a count operation:
private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
    return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
            .collectionCallback(MongoCollection::count)
            .collectionName("myCollection");
}