Message Store
The Enterprise Integration Patterns (EIP) book identifies several patterns that have the ability to buffer messages.
For example, an aggregator buffers messages until they can be released, and a QueueChannel
buffers messages until consumers explicitly receive those messages from that channel.
Because of the failures that can occur at any point within your message flow, EIP components that buffer messages also introduce a point where messages could be lost.
To mitigate the risk of losing messages, EIP defines the message store pattern, which lets EIP components store messages, typically in some type of persistent store (such as an RDBMS).
Spring Integration provides support for the message store pattern by:
-
Defining an
org.springframework.integration.store.MessageStore
strategy interface -
Providing several implementations of this interface
-
Exposing a
message-store
attribute on all components that have the capability to buffer messages so that you can inject any instance that implements theMessageStore
interface.
Details on how to configure a specific message store implementation and how to inject a MessageStore
implementation into a specific buffering component are described throughout the manual (see the specific component, such as QueueChannel, Aggregator, Delayer, and others).
The following pair of examples show how to add a reference to a message store for a QueueChannel
and for an aggregator:
<int:channel id="myQueueChannel">
<int:queue message-store="refToMessageStore"/>
<int:channel>
<int:aggregator message-store="refToMessageStore"/>
By default, messages are stored in-memory by using o.s.i.store.SimpleMessageStore
, an implementation of MessageStore
.
That might be fine for development or simple low-volume environments where the potential loss of non-persistent messages is not a concern.
However, the typical production application needs a more robust option, not only to mitigate the risk of message loss but also to avoid potential out-of-memory errors.
Therefore, we also provide MessageStore
implementations for a variety of data-stores.
The following is a complete list of supported implementations:
-
Hazelcast Message Store: Uses a Hazelcast distributed cache to store messages
-
JDBC Message Store: Uses an RDBMS to store messages
-
Redis Message Store: Uses a Redis key/value datastore to store messages
-
MongoDB Message Store: Uses a MongoDB document store to store messages
However, be aware of some limitations while using persistent implementations of the The Message data (payload and headers) is serialized and deserialized by using different serialization strategies, depending on the implementation of the Pay special attention to the headers that represent certain types of data.
For example, if one of the headers contains an instance of some Spring bean, upon deserialization, you may end up with a different instance of that bean, which directly affects some of the implicit headers created by the framework (such as Beginning with Spring Integration version 3.0, you can resolve this issue with a header enricher configured to replace these headers with a name after registering the channel with the Also, consider what happens when you configure a message-flow as follows: gateway → queue-channel (backed by a persistent Message Store) → service-activator.
That gateway creates a temporary reply channel, which is lost by the time the service-activator’s poller reads from the queue.
Again, you can use the header enricher to replace the headers with a For more information, see Header Enricher. |
Spring Integration 4.0 introduced two new interfaces:
-
ChannelMessageStore
: To implement operations specific forQueueChannel
instances -
PriorityCapableChannelMessageStore
: To markMessageStore
implementations to be used forPriorityChannel
instances and to provide priority order for persisted messages.
The real behavior depends on the implementation.
The framework provides the following implementations, which can be used as a persistent MessageStore
for QueueChannel
and PriorityChannel
:
Caution about
SimpleMessageStore Starting with version 4.1, the Users accessing the group store outside of components such as aggregators now get a direct reference to the group being used by the aggregator instead of a copy. Manipulation of the group outside the aggregator may cause unpredictable results. For this reason, you should either not perform such manipulation or set the |
Using MessageGroupFactory
Starting with version 4.3, some MessageGroupStore
implementations can be injected with a custom MessageGroupFactory
strategy to create and customize the MessageGroup
instances used by the MessageGroupStore
.
This defaults to a SimpleMessageGroupFactory
, which produces SimpleMessageGroup
instances based on the GroupType.HASH_SET
(LinkedHashSet
) internal collection.
Other possible options are SYNCHRONISED_SET
and BLOCKING_QUEUE
, where the last one can be used to reinstate the previous SimpleMessageGroup
behavior.
Also, the PERSISTENT
option is available.
See the next section for more information.
Starting with version 5.0.1, the LIST
option is also available for when the order and uniqueness of messages in the group does not matter.
Persistent MessageGroupStore
and Lazy-load
Starting with version 4.3, all persistent MessageGroupStore
instances retrieve MessageGroup
instances and their messages
from the store in the lazy-load manner.
In most cases, it is useful for the correlation MessageHandler
instances (see Aggregator and Resequencer), when it would add overhead to load entire the MessageGroup
from the store on each correlation operation.
You can use the AbstractMessageGroupStore.setLazyLoadMessageGroups(false)
option to switch off the lazy-load behavior from the configuration.
Our performance tests for lazy-load on MongoDB MessageStore
(MongoDB Message Store) and <aggregator>
(Aggregator) use a custom release-strategy
similar to the following:
<int:aggregator input-channel="inputChannel"
output-channel="outputChannel"
message-store="mongoStore"
release-strategy-expression="size() == 1000"/>
It produces results similar to the following for 1000 simple messages:
...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms % Task name
-----------------------------------------
02652 007% Lazy-Load
36266 093% Eager
...
However, starting with version 5.5, all the persistent MessageGroupStore
implementations provide a streamMessagesForGroup(Object groupId)
contract based on the target database streaming API.
This improves resources utilization when groups are very big in the store.
Internally in the framework this new API is used in the Delayer (for example) when it reschedules persisted messages on startup.
A returned Stream<Message<?>>
must be closed in the end of processing, e.g. via auto-close by the try-with-resources
.
Whenever a PersistentMessageGroup
is used, its streamMessages()
delegates to the MessageGroupStore.streamMessagesForGroup()
.
Message Group Condition
Starting with version 5.5, the MessageGroup
abstraction provides a condition
string option.
The value of this option can be anything that could be parsed later on for any reason to make a decision for the group.
For example a ReleaseStrategy
from a correlation message handler may consult this property from the group instead of iterating all the messages in the group.
The MessageGroupStore
exposes a setGroupCondition(Object groupId, String condition)
API.
For this purpose a setGroupConditionSupplier(BiFunction<Message<?>, String, String>)
option has been added to the AbstractCorrelatingMessageHandler
.
This function is evaluated against each message after it has been added to the group as well as the existing condition of the group.
The implementation may decide to return a new value, the existing value, or reset the target condition to null
.
The value for a condition
can be a JSON, SpEL expression, number or anything what can be serialized as a string and parsed afterward.
For example, the FileMarkerReleaseStrategy
from the File Aggregator component, populates a condition into a group from the FileHeaders.LINE_COUNT
header of the FileSplitter.FileMarker.Mark.END
message and consults with it from its canRelease()
comparing a group size with the value in this condition.
This way it doesn’t iterate all the messages in group to find a FileSplitter.FileMarker.Mark.END
message with the FileHeaders.LINE_COUNT
header.
It also allows the end marker to arrive at the aggregator before all the other records; for example when processing a file in a multi-threaded environment.
In addition, for configuration convenience, a GroupConditionProvider
contract has been introduced.
The AbstractCorrelatingMessageHandler
checks if the provided ReleaseStrategy
implements this interface and extracts a conditionSupplier
for group condition evaluation logic.