Message Store
The Enterprise Integration Patterns (EIP) book identifies several patterns that have the ability to buffer messages.
For example, an aggregator buffers messages until they can be released, and a QueueChannel buffers messages until consumers explicitly receive those messages from that channel.
Because of the failures that can occur at any point within your message flow, EIP components that buffer messages also introduce a point where messages could be lost.
To mitigate the risk of losing messages, EIP defines the message store pattern, which lets EIP components store messages, typically in some type of persistent store (such as an RDBMS).
Spring Integration provides support for the message store pattern by:
-
Defining an org.springframework.integration.store.MessageStore strategy interface
-
Providing several implementations of this interface
-
Exposing a message-store attribute on all components that have the capability to buffer messages so that you can inject any instance that implements the MessageStore interface.
Details on how to configure a specific message store implementation and how to inject a MessageStore implementation into a specific buffering component are described throughout the manual (see the specific component, such as QueueChannel, Aggregator, Delayer, and others).
The following pair of examples shows how to add a reference to a message store for a QueueChannel and for an aggregator:
<int:channel id="myQueueChannel">
<int:queue message-store="refToMessageStore"/>
</int:channel>
<int:aggregator … message-store="refToMessageStore"/>
By default, messages are stored in-memory by using o.s.i.store.SimpleMessageStore, an implementation of MessageStore.
That might be fine for development or simple low-volume environments where the potential loss of non-persistent messages is not a concern.
However, the typical production application needs a more robust option, not only to mitigate the risk of message loss but also to avoid potential out-of-memory errors.
Therefore, we also provide MessageStore implementations for a variety of data-stores.
The following is a complete list of supported implementations:
-
JDBC Message Store: Uses an RDBMS to store messages
-
Redis Message Store: Uses a Redis key/value datastore to store messages
-
MongoDB Message Store: Uses a MongoDB document store to store messages
-
Gemfire Message Store: Uses a Gemfire distributed cache to store messages
However, be aware of some limitations while using persistent implementations of the MessageStore.
The Message data (payload and headers) is serialized and deserialized by using different serialization strategies, depending on the implementation of the MessageStore.
Pay special attention to the headers that represent certain types of data.
For example, if one of the headers contains an instance of some Spring bean, upon deserialization, you may end up with a different instance of that bean, which directly affects some of the implicit headers created by the framework (such as REPLY_CHANNEL or ERROR_CHANNEL).
Beginning with Spring Integration version 3.0, you can resolve this issue with a header enricher configured to replace these headers with a name after registering the channel with the HeaderChannelRegistry.
Also, consider what happens when you configure a message-flow as follows: gateway → queue-channel (backed by a persistent Message Store) → service-activator.
That gateway creates a temporary reply channel, which is lost by the time the service-activator's poller reads from the queue.
Again, you can use the header enricher to replace the headers with a String representation.
For more information, see Header Enricher.
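The idea of replacing a channel header with a name can be sketched in plain Java. The following example is not Spring Integration code: TemporaryChannel, REGISTRY, and register are hypothetical stand-ins for a temporary reply channel and a name registry, and JDK serialization stands in for whatever strategy a persistent store uses. It only shows that a String name survives a serialization round trip and can be resolved back to the original channel instance.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class ReplyChannelNameSketch {

    /** Hypothetical stand-in for a temporary reply channel; deliberately not Serializable. */
    static final class TemporaryChannel { }

    /** Hypothetical registry that maps generated names to live channel instances. */
    private static final Map<String, TemporaryChannel> REGISTRY = new HashMap<>();

    /** Registers the channel under a generated name and returns that name. */
    static String register(TemporaryChannel channel) {
        String name = UUID.randomUUID().toString();
        REGISTRY.put(name, channel);
        return name;
    }

    /** Serializes and deserializes the headers, roughly as a persistent store might. */
    @SuppressWarnings("unchecked")
    static HashMap<String, Object> roundTrip(HashMap<String, Object> headers) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(headers);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (HashMap<String, Object>) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Stores the channel name in the header and resolves it again after the round trip. */
    public static boolean resolvesSameChannelAfterRoundTrip() {
        TemporaryChannel replyChannel = new TemporaryChannel();
        HashMap<String, Object> headers = new HashMap<>();
        // The header carries the name, not the channel instance (illustrative header key).
        headers.put("replyChannel", register(replyChannel));
        HashMap<String, Object> restored = roundTrip(headers);
        return REGISTRY.get((String) restored.get("replyChannel")) == replyChannel;
    }

    public static void main(String[] args) {
        System.out.println(resolvesSameChannelAfterRoundTrip());
    }
}
```

Putting the TemporaryChannel instance itself into the header map would make writeObject fail in this sketch, because the class is not Serializable.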
Spring Integration 4.0 introduced two new interfaces:
-
ChannelMessageStore: To implement operations specific for QueueChannel instances
-
PriorityCapableChannelMessageStore: To mark MessageStore implementations to be used for PriorityChannel instances and to provide priority order for persisted messages.
The real behavior depends on the implementation.
The framework provides implementations that can be used as a persistent MessageStore for QueueChannel and PriorityChannel.
Caution about SimpleMessageStore
Starting with version 4.1, the SimpleMessageStore no longer copies the message group when getMessageGroup() is called.
Users accessing the group store outside of components such as aggregators now get a direct reference to the group being used by the aggregator instead of a copy.
Manipulation of the group outside of the aggregator may cause unpredictable results.
For this reason, you should either not perform such manipulation or set the copyOnGet property to true.
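The aliasing hazard described above can be illustrated with plain Java collections. The following is a minimal sketch, not the framework's implementation: SketchGroupStore and its copyOnGet flag are hypothetical and only mimic the difference between handing out the live group and handing out a defensive copy.

```java
import java.util.ArrayList;
import java.util.List;

public class GroupAliasingSketch {

    /** Hypothetical in-memory store that holds a single group of payloads. */
    static final class SketchGroupStore {
        private final List<String> group = new ArrayList<>();
        private final boolean copyOnGet;

        SketchGroupStore(boolean copyOnGet) {
            this.copyOnGet = copyOnGet;
        }

        void add(String payload) {
            group.add(payload);
        }

        /** Returns the live list, or a defensive copy when copyOnGet is true. */
        List<String> getGroup() {
            return copyOnGet ? new ArrayList<>(group) : group;
        }

        int size() {
            return group.size();
        }
    }

    /** Adds two payloads, clears whatever getGroup() returns, and reports what the store still holds. */
    public static int sizeAfterExternalClear(boolean copyOnGet) {
        SketchGroupStore store = new SketchGroupStore(copyOnGet);
        store.add("a");
        store.add("b");
        store.getGroup().clear(); // manipulation from outside the owning component
        return store.size();
    }

    public static void main(String[] args) {
        System.out.println("direct reference: " + sizeAfterExternalClear(false)); // 0
        System.out.println("copy: " + sizeAfterExternalClear(true));              // 2
    }
}
```

With the direct reference, the external clear() empties the group that the owning component is still working on. With the copy, the stored group is untouched, at the cost of copying on every read.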
Using MessageGroupFactory
Starting with version 4.3, some MessageGroupStore implementations can be injected with a custom MessageGroupFactory strategy to create and customize the MessageGroup instances used by the MessageGroupStore.
This defaults to a SimpleMessageGroupFactory, which produces SimpleMessageGroup instances based on the GroupType.HASH_SET (LinkedHashSet) internal collection.
Other possible options are SYNCHRONISED_SET and BLOCKING_QUEUE, where the last one can be used to reinstate the previous SimpleMessageGroup behavior.
The PERSISTENT option is also available.
See the next section for more information.
Starting with version 5.0.1, the LIST option is also available for when the order and uniqueness of messages in the group do not matter.
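How the backing collection shapes a group can be seen with plain JDK collections. The following sketch does not use SimpleMessageGroupFactory. It only contrasts a LinkedHashSet (named above as the HASH_SET collection) with an ArrayList, which is assumed here as a stand-in for the LIST option. String payloads stand in for messages.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;

public class GroupCollectionSketch {

    /** Adds the same payloads, including one duplicate, to the given collection. */
    public static Collection<String> fill(Collection<String> group) {
        for (String payload : List.of("m1", "m2", "m1", "m3")) {
            group.add(payload);
        }
        return group;
    }

    public static void main(String[] args) {
        // Insertion order is kept and the duplicate "m1" is dropped.
        System.out.println(fill(new LinkedHashSet<>())); // [m1, m2, m3]
        // Every add is kept, duplicates included, with no uniqueness check.
        System.out.println(fill(new ArrayList<>()));     // [m1, m2, m1, m3]
    }
}
```

A set pays for a uniqueness check on every add, which a list skips. That trade-off is the reason a list-like option can be attractive when uniqueness is not needed.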
Persistent MessageGroupStore and Lazy-load
Starting with version 4.3, all persistent MessageGroupStore instances retrieve MessageGroup instances and their messages from the store in a lazy-load manner.
In most cases, this is useful for the correlation MessageHandler instances (see Aggregator and Resequencer), where loading the entire MessageGroup from the store on each correlation operation would add overhead.
You can use the AbstractMessageGroupStore.setLazyLoadMessageGroups(false) option to switch off the lazy-load behavior from the configuration.
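The benefit of lazy loading for a size-based release check can be sketched in plain Java. The example below is an illustration under stated assumptions, not the framework's implementation: LazyGroup is hypothetical, its size is assumed to come from a cheap count rather than from loading messages, and a Supplier stands in for the expensive fetch from the store.

```java
import java.util.List;
import java.util.function.Supplier;

public class LazyGroupSketch {

    /** Hypothetical lazy group: size() is cheap, messages are fetched on first access only. */
    static final class LazyGroup {
        private final int storedCount;
        private final Supplier<List<String>> loader;
        private List<String> messages; // stays null until getMessages() is first called
        private int loads;

        LazyGroup(int storedCount, Supplier<List<String>> loader) {
            this.storedCount = storedCount;
            this.loader = loader;
        }

        int size() {
            return storedCount;
        }

        List<String> getMessages() {
            if (messages == null) {
                messages = loader.get();
                loads++;
            }
            return messages;
        }

        int loads() {
            return loads;
        }
    }

    /** Runs a size-based release check and returns how many full loads it triggered. */
    public static int loadsForSizeCheck(int releaseThreshold) {
        LazyGroup group = new LazyGroup(3, () -> List.of("m1", "m2", "m3"));
        if (group.size() >= releaseThreshold) {
            group.getMessages(); // the messages are needed only when the group is released
        }
        return group.loads();
    }

    public static void main(String[] args) {
        System.out.println(loadsForSizeCheck(1000)); // 0: threshold not reached, nothing loaded
        System.out.println(loadsForSizeCheck(3));    // 1: group released, messages loaded once
    }
}
```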
Our performance tests for lazy-load on the MongoDB MessageStore (see MongoDB Message Store) and <aggregator> (see Aggregator) use a custom release-strategy similar to the following:
<int:aggregator input-channel="inputChannel"
output-channel="outputChannel"
message-store="mongoStore"
release-strategy-expression="size() == 1000"/>
It produces results similar to the following for 1000 simple messages:
...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms % Task name
-----------------------------------------
02652 007% Lazy-Load
36266 093% Eager
...
Starting with version 5.5, all the persistent MessageGroupStore implementations also provide a streamMessagesForGroup(Object groupId) contract based on the target database streaming API.
This improves resource utilization when groups in the store are very large.
Internally, the framework uses this API in, for example, the Delayer when it reschedules persisted messages on startup.
A returned Stream<Message<?>> must be closed at the end of processing, for example, by using try-with-resources.
Whenever a PersistentMessageGroup is used, its streamMessages() delegates to MessageGroupStore.streamMessagesForGroup().
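The closing requirement can be sketched with a plain java.util.stream.Stream. The following example does not call a real MessageGroupStore: openGroupStream is a hypothetical stand-in, and its onClose hook plays the role of releasing the underlying database resource.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

public class StreamCloseSketch {

    /** Hypothetical stand-in for a store-backed stream; sets the flag when the stream is closed. */
    static Stream<String> openGroupStream(AtomicBoolean released) {
        return Stream.of("m1", "m2", "m3").onClose(() -> released.set(true));
    }

    /** Consumes the stream inside try-with-resources and reports whether it was closed. */
    public static boolean consumeAndReportClosed() {
        AtomicBoolean released = new AtomicBoolean(false);
        try (Stream<String> messages = openGroupStream(released)) {
            long count = messages.count();
            System.out.println("processed " + count + " messages");
        }
        // close() has run at this point, so the onClose hook has fired.
        return released.get();
    }

    public static void main(String[] args) {
        System.out.println("released: " + consumeAndReportClosed());
    }
}
```

A terminal operation such as count() does not close a stream by itself, which is why the try-with-resources block (or an explicit close() call) is needed.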