The Enterprise Integration Patterns (EIP) book identifies several patterns that have the ability to buffer messages.
For example, an aggregator buffers messages until they can be released, and a
QueueChannel buffers messages until consumers explicitly receive those messages from that channel.
Because of the failures that can occur at any point within your message flow, EIP components that buffer messages also introduce a point where messages could be lost.
To mitigate the risk of losing messages, EIP defines the message store pattern, which lets EIP components store messages, typically in some type of persistent store (such as an RDBMS).
Spring Integration provides support for the message store pattern by:
Defining an org.springframework.integration.store.MessageStore strategy interface
Providing several implementations of this interface
Exposing a message-store attribute on all components that have the capability to buffer messages so that you can inject any instance that implements the MessageStore interface
Details on how to configure a specific message store implementation and how to inject a
MessageStore implementation into a specific buffering component are described throughout the manual (see the specific component, such as QueueChannel, Aggregator, Delayer, and others).
The following pair of examples show how to add a reference to a message store for a
QueueChannel and for an aggregator:
<int:channel id="myQueueChannel">
    <int:queue message-store="refToMessageStore"/>
</int:channel>
<int:aggregator … message-store="refToMessageStore"/>
By default, messages are stored in-memory by using
o.s.i.store.SimpleMessageStore, an implementation of MessageStore.
That might be fine for development or simple low-volume environments where the potential loss of non-persistent messages is not a concern.
However, the typical production application needs a more robust option, not only to mitigate the risk of message loss but also to avoid potential out-of-memory errors.
Therefore, we also provide
MessageStore implementations for a variety of data-stores.
The following is a complete list of supported implementations:
Hazelcast Message Store: Uses a Hazelcast distributed cache to store messages
JDBC Message Store: Uses an RDBMS to store messages
Redis Message Store: Uses a Redis key/value datastore to store messages
MongoDB Message Store: Uses a MongoDB document store to store messages
However, be aware of some limitations while using persistent implementations of the MessageStore.
The Message data (payload and headers) is serialized and deserialized by using different serialization strategies, depending on the implementation of the MessageStore.
Pay special attention to the headers that represent certain types of data.
For example, if one of the headers contains an instance of some Spring bean, upon deserialization, you may end up with a different instance of that bean, which directly affects some of the implicit headers created by the framework (such as REPLY_CHANNEL or ERROR_CHANNEL).
Beginning with Spring Integration version 3.0, you can resolve this issue with a header enricher configured to replace these headers with a name after registering the channel with the HeaderChannelRegistry.
Also, consider what happens when you configure a message-flow as follows: gateway → queue-channel (backed by a persistent Message Store) → service-activator.
That gateway creates a temporary reply channel, which is lost by the time the service-activator’s poller reads from the queue.
Again, you can use the header enricher to replace the headers with a String representation.
For more information, see Header Enricher.
Spring Integration 4.0 introduced two new interfaces:
ChannelMessageStore: To implement operations specific for QueueChannel instances.
PriorityCapableChannelMessageStore: To mark MessageStore implementations to be used for PriorityChannel instances and to provide priority order for persisted messages.
The real behavior depends on the implementation.
The framework provides implementations backed by Redis, MongoDB, and JDBC, which can be used as a persistent MessageStore for QueueChannel and PriorityChannel.
Starting with version 4.1, the SimpleMessageStore no longer copies the message group when calling getMessageGroup().
Users accessing the group store outside of components such as aggregators now get a direct reference to the group being used by the aggregator instead of a copy. Manipulation of the group outside the aggregator may cause unpredictable results.
For this reason, you should either not perform such manipulation or set the copyOnGet property to true.
Starting with version 4.3, some MessageGroupStore implementations can be injected with a custom MessageGroupFactory strategy to create and customize the MessageGroup instances used by the MessageGroupStore.
This defaults to a SimpleMessageGroupFactory, which produces SimpleMessageGroup instances based on the GroupType.HASH_SET (LinkedHashSet) internal collection.
Other possible options are SYNCHRONISED_SET and BLOCKING_QUEUE, where the last one can be used to reinstate the previous SimpleMessageGroup behavior.
Also, the PERSISTENT option is available.
See the next section for more information.
Starting with version 5.0.1, the LIST option is also available for when the order and uniqueness of messages in the group do not matter.
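The practical difference between a set-backed and a list-backed group can be seen with the JDK collections alone. The following is a minimal plain-Java sketch that uses no Spring Integration types: the class name GroupCollectionSketch, the fill helper, and the string "messages" are hypothetical stand-ins, and the use of an ArrayList for a list-backed group is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical illustration only; not part of Spring Integration.
class GroupCollectionSketch {

    // Adds every element of 'incoming' to 'target' and returns the resulting size.
    static int fill(Collection<String> target, List<String> incoming) {
        target.addAll(incoming);
        return target.size();
    }

    public static void main(String[] args) {
        List<String> incoming = List.of("a", "b", "a"); // "a" is delivered twice

        // Set-backed group: the duplicate is dropped and insertion order is kept.
        System.out.println(fill(new LinkedHashSet<>(), incoming)); // prints 2

        // List-backed group (assumed ArrayList): every delivery is kept.
        System.out.println(fill(new ArrayList<>(), incoming)); // prints 3
    }
}
```

A list-backed collection skips the uniqueness check on every add, which is why it only fits flows where duplicates in the group are acceptable.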
MessageGroupStore and Lazy-load
Starting with version 4.3, all persistent MessageGroupStore instances retrieve MessageGroup instances and their messages from the store in a lazy-load manner.
In most cases, this is useful for the correlation MessageHandler instances (see Aggregator and Resequencer), where loading the entire MessageGroup from the store on each correlation operation would add overhead.
You can use the
AbstractMessageGroupStore.setLazyLoadMessageGroups(false) option to switch off the lazy-load behavior from the configuration.
Our performance tests for lazy-load on MongoDB
MessageStore (MongoDB Message Store) and
<aggregator> (Aggregator) use a custom
release-strategy similar to the following:
<int:aggregator input-channel="inputChannel"
                output-channel="outputChannel"
                message-store="mongoStore"
                release-strategy-expression="size() == 1000"/>
It produces results similar to the following for 1000 simple messages:
...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms     %     Task name
-----------------------------------------
02652  007%  Lazy-Load
36266  093%  Eager
...
However, starting with version 5.5, all the persistent
MessageGroupStore implementations provide a
streamMessagesForGroup(Object groupId) contract based on the target database streaming API.
This improves resource utilization when groups in the store are very large.
Internally, the framework uses this API in the Delayer (for example) when it reschedules persisted messages on startup.
A returned Stream<Message<?>> must be closed at the end of processing, for example through auto-close in a try-with-resources block.
Whenever a PersistentMessageGroup is used, its streamMessages() delegates to the MessageGroupStore.streamMessagesForGroup().
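The closing requirement can be sketched with the JDK Stream API alone. In the following plain-Java sketch, the class StreamCloseSketch and its methods are hypothetical, strings stand in for messages, and the onClose hook stands in for whatever database cursor or connection a real store would release. It is not the framework's implementation.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

// Hypothetical illustration only; not part of Spring Integration.
class StreamCloseSketch {

    // Stand-in for a store-backed stream: the onClose hook represents
    // releasing a database cursor or connection.
    static Stream<String> streamMessages(List<String> stored, AtomicBoolean released) {
        return stored.stream().onClose(() -> released.set(true));
    }

    // Counts the messages; try-with-resources closes the stream afterwards,
    // which runs the onClose hook even if processing throws.
    static long countAndClose(List<String> stored, AtomicBoolean released) {
        try (Stream<String> messages = streamMessages(stored, released)) {
            return messages.count();
        }
    }

    public static void main(String[] args) {
        AtomicBoolean released = new AtomicBoolean(false);
        long count = countAndClose(List.of("m1", "m2", "m3"), released);
        // prints "3 messages, resource released: true"
        System.out.println(count + " messages, resource released: " + released.get());
    }
}
```

A terminal operation such as count() does not close a stream by itself, so without the try-with-resources block the hook in this sketch would never run.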
Message Group Condition
Starting with version 5.5, the
MessageGroup abstraction provides a
condition string option.
The value of this option can be anything that could be parsed later on for any reason to make a decision for the group.
For example, a ReleaseStrategy from a correlation message handler may consult this property from the group instead of iterating all the messages in the group.
The MessageGroupStore exposes a setGroupCondition(Object groupId, String condition) API.
For this purpose, a setGroupConditionSupplier(BiFunction<Message<?>, String, String>) option has been added to the AbstractCorrelatingMessageHandler.
This function is evaluated against each message after it has been added to the group as well as the existing condition of the group.
The implementation may decide to return a new value, the existing value, or reset the target condition to null.
The value for a condition can be JSON, a SpEL expression, a number, or anything that can be serialized as a string and parsed afterwards.
For example, the FileMarkerReleaseStrategy from the File Aggregator component populates a condition into a group from the FileHeaders.LINE_COUNT header of the FileSplitter.FileMarker.Mark.END message and consults it from its canRelease(), comparing the group size with the value in this condition.
This way, it does not iterate all the messages in the group to find a FileSplitter.FileMarker.Mark.END message with the FileHeaders.LINE_COUNT header.
It also allows the end marker to arrive at the aggregator before all the other records; for example when processing a file in a multi-threaded environment.
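The idea behind a condition-based release check can be sketched in plain Java. The following is not the FileMarkerReleaseStrategy source: the class GroupConditionSketch, the header names "marker" and "lineCount", and the use of a header map in place of Message<?> are all hypothetical. The sketch also assumes that the group size counts only record lines.

```java
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical illustration only; not part of Spring Integration.
class GroupConditionSketch {

    // Hypothetical header names, used here instead of the real FileHeaders constants.
    static final String MARKER = "marker";
    static final String LINE_COUNT = "lineCount";

    // Condition supplier: receives the headers of the message just added and the
    // group's existing condition, and returns the condition to store for the group.
    static final BiFunction<Map<String, Object>, String, String> CONDITION_SUPPLIER =
            (headers, existing) -> "END".equals(headers.get(MARKER))
                    ? String.valueOf(headers.get(LINE_COUNT)) // remember the expected size
                    : existing;                               // keep the current condition

    // Release check: compares the group size with the stored condition
    // instead of scanning the group for the END marker.
    static boolean canRelease(int groupSize, String condition) {
        return condition != null && groupSize == Integer.parseInt(condition);
    }

    public static void main(String[] args) {
        String condition = null;
        int size = 0;

        // The END marker arrives first, as can happen with multi-threaded processing.
        condition = CONDITION_SUPPLIER.apply(Map.of(MARKER, "END", LINE_COUNT, 2), condition);
        System.out.println(canRelease(size, condition)); // prints false

        // Two record lines follow; each one leaves the condition unchanged.
        for (int i = 0; i < 2; i++) {
            size++;
            condition = CONDITION_SUPPLIER.apply(Map.of(), condition);
        }
        System.out.println(canRelease(size, condition)); // prints true
    }
}
```

Because the expected count is kept in the condition, the check costs the same regardless of group size and works in whatever order the marker and the records arrive.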
In addition, for configuration convenience, a
GroupConditionProvider contract has been introduced.
The AbstractCorrelatingMessageHandler checks whether the provided ReleaseStrategy implements this interface and, if so, extracts a conditionSupplier for the group condition evaluation logic.