The Enterprise Integration Patterns (EIP) book identifies several patterns that have the ability to buffer messages.
For example, an aggregator buffers messages until they can be released, and a
QueueChannel buffers messages until consumers explicitly receive those messages from that channel.
Because of the failures that can occur at any point within your message flow, EIP components that buffer messages also introduce a point where messages could be lost.
To mitigate the risk of losing messages, EIP defines the message store pattern, which lets EIP components store messages, typically in some type of persistent store (such as an RDBMS).
Spring Integration provides support for the message store pattern by:
Defining an o.s.i.store.MessageStore strategy interface
Providing several implementations of this interface
Exposing a message-store attribute on all components that have the capability to buffer messages so that you can inject any instance that implements the MessageStore interface
Details on how to configure a specific message store implementation and how to inject a
MessageStore implementation into a specific buffering component are described throughout the manual (see the specific component, such as QueueChannel, Aggregator, Delayer, and others).
The following pair of examples show how to add a reference to a message store for a
QueueChannel and for an aggregator:
<int:channel id="myQueueChannel">
    <int:queue message-store="refToMessageStore"/>
</int:channel>
<int:aggregator … message-store="refToMessageStore"/>
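The refToMessageStore reference in these examples has to resolve to a bean that implements MessageStore. As a minimal sketch, assuming the Spring Integration JDBC module is on the classpath, its int-jdbc namespace is declared, and a dataSource bean (a hypothetical name) is defined elsewhere, such a bean could be declared as follows:

```xml
<!-- Assumption: 'dataSource' is a javax.sql.DataSource bean defined elsewhere,
     and the tables used by the store already exist in that database. -->
<int-jdbc:message-store id="refToMessageStore" data-source="dataSource"/>
```

Any other MessageStore implementation can be substituted, as long as the bean ID matches the value of the message-store attribute.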
By default, messages are stored in-memory by using
o.s.i.store.SimpleMessageStore, an implementation of MessageStore.
That might be fine for development or simple low-volume environments where the potential loss of non-persistent messages is not a concern.
However, the typical production application needs a more robust option, not only to mitigate the risk of message loss but also to avoid potential out-of-memory errors.
Therefore, we also provide
MessageStore implementations for a variety of data stores.
The following is a complete list of supported implementations:
However, be aware of some limitations while using persistent implementations of the MessageStore.
The Message data (payload and headers) is serialized and deserialized by using different serialization strategies, depending on the implementation of the MessageStore.
Pay special attention to the headers that represent certain types of data.
For example, if one of the headers contains an instance of some Spring bean, upon deserialization, you may end up with a different instance of that bean, which directly affects some of the implicit headers created by the framework (such as REPLY_CHANNEL or ERROR_CHANNEL).
Beginning with Spring Integration version 3.0, you can resolve this issue with a header enricher configured to replace these headers with a name after registering the channel with the HeaderChannelRegistry.
Also, consider what happens when you configure a message-flow as follows: gateway → queue-channel (backed by a persistent Message Store) → service-activator.
That gateway creates a temporary reply channel, which is lost by the time the service-activator’s poller reads from the queue.
Again, you can use the header enricher to replace the headers with a channel name.
For more information, see Header Enricher.
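The gateway → queue-channel → service-activator flow described above could be wired roughly as in the following sketch. The channel and bean names and the example.OrderGateway interface are hypothetical. The header-channels-to-string element asks the header enricher to register the reply and error channels and to replace those headers with the registered names before the message reaches the persistent queue:

```xml
<!-- Assumption: example.OrderGateway is a hypothetical gateway interface -->
<int:gateway id="orderGateway"
             service-interface="example.OrderGateway"
             default-request-channel="toEnricher"/>

<!-- Replaces channel-valued reply/error headers with registered channel names -->
<int:header-enricher input-channel="toEnricher" output-channel="myQueueChannel">
    <int:header-channels-to-string/>
</int:header-enricher>

<int:channel id="myQueueChannel">
    <int:queue message-store="refToMessageStore"/>
</int:channel>

<!-- Assumption: 'orderService' is a hypothetical bean with a 'handle' method -->
<int:service-activator input-channel="myQueueChannel" ref="orderService" method="handle">
    <int:poller fixed-delay="1000"/>
</int:service-activator>
```

Because the temporary reply channel and its registration live in memory, this approach helps only while the same application instance is still running when the message is read back from the store.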
Spring Integration 4.0 introduced two new interfaces:
ChannelMessageStore: To implement operations specific for QueueChannel instances.
PriorityCapableChannelMessageStore: To mark MessageStore implementations to be used for PriorityChannel instances and to provide priority order for persisted messages.
The real behavior depends on the implementation.
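As a sketch of how a priority-capable store might be attached to a channel, assuming a bean named priorityStore (a hypothetical name) that implements PriorityCapableChannelMessageStore is defined elsewhere:

```xml
<int:channel id="priorityChannel">
    <!-- Assumption: 'priorityStore' implements PriorityCapableChannelMessageStore -->
    <int:priority-queue message-store="priorityStore"/>
</int:channel>
```

The order in which persisted messages are then delivered is determined by the store implementation, not by the channel itself.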
The framework provides the following implementations, which can be used as a persistent MessageStore for QueueChannel and PriorityChannel:
Starting with version 4.1, the SimpleMessageStore no longer copies the message group when calling getMessageGroup().
Users accessing the group store outside of components such as aggregators now get a direct reference to the group being used by the aggregator instead of a copy. Manipulation of the group outside of the aggregator may cause unpredictable results.
For this reason, you should either not perform such manipulation or set the copyOnGet property to true.
Starting with version 4.3, some MessageGroupStore implementations can be injected with a custom MessageGroupFactory strategy to create and customize the MessageGroup instances used by the MessageGroupStore.
This defaults to a SimpleMessageGroupFactory, which produces SimpleMessageGroup instances based on the GroupType.HASH_SET (LinkedHashSet) internal collection.
Other possible options are SYNCHRONISED_SET and BLOCKING_QUEUE, where the last one can be used to reinstate the previous SimpleMessageGroup behavior.
Also, the PERSISTENT option is available.
See the next section for more information.
Starting with version 5.0.1, the
LIST option is also available for when the order and uniqueness of messages in the group do not matter.
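A minimal sketch of injecting a non-default group factory, assuming an in-memory SimpleMessageStore and a hypothetical bean ID of groupStore. The constructor argument selects the internal collection type, here BLOCKING_QUEUE:

```xml
<bean id="groupStore" class="org.springframework.integration.store.SimpleMessageStore">
    <property name="messageGroupFactory">
        <bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
            <!-- Converted by Spring to the SimpleMessageGroupFactory.GroupType enum -->
            <constructor-arg value="BLOCKING_QUEUE"/>
        </bean>
    </property>
</bean>
```

Whether a given store honors the injected factory depends on the store implementation, so check the specific store before relying on it.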
Starting with version 4.3, all persistent MessageGroupStore instances retrieve MessageGroup instances and their messages from the store in a lazy-load manner.
In most cases, this is useful for the correlation MessageHandler instances (see Aggregator and Resequencer), where loading the entire MessageGroup from the store on each correlation operation would add overhead.
You can use the
AbstractMessageGroupStore.setLazyLoadMessageGroups(false) option to switch off the lazy-load behavior from the configuration.
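As a sketch, the setter can be applied through a bean property on any store that extends AbstractMessageGroupStore. The example below assumes a MongoDB-backed store and a connection factory bean named mongoDbFactory (a hypothetical name) defined elsewhere:

```xml
<bean id="mongoStore" class="org.springframework.integration.mongodb.store.MongoDbMessageStore">
    <!-- Assumption: 'mongoDbFactory' is the MongoDB connection factory bean -->
    <constructor-arg ref="mongoDbFactory"/>
    <!-- Load each MessageGroup eagerly instead of lazily -->
    <property name="lazyLoadMessageGroups" value="false"/>
</bean>
```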
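The following aggregator configuration, backed by a message store bean named mongoStore, releases a group once it contains 1000 messages:
<int:aggregator input-channel="inputChannel"
                output-channel="outputChannel"
                message-store="mongoStore"
                release-strategy-expression="size() == 1000"/>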
When lazy and eager loading are compared with this configuration, it produces results similar to the following for 1000 simple messages:
...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms     %     Task name
-----------------------------------------
02652  007%  Lazy-Load
36266  093%  Eager
...