Message Store

The Enterprise Integration Patterns (EIP) book identifies several patterns that have the ability to buffer messages. For example, an aggregator buffers messages until they can be released, and a QueueChannel buffers messages until consumers explicitly receive those messages from that channel. Because of the failures that can occur at any point within your message flow, EIP components that buffer messages also introduce a point where messages could be lost.

To mitigate the risk of losing messages, EIP defines the message store pattern, which lets EIP components store messages, typically in some type of persistent store (such as an RDBMS).

Spring Integration provides support for the message store pattern by:

  • Defining an org.springframework.integration.store.MessageStore strategy interface

  • Providing several implementations of this interface

  • Exposing a message-store attribute on all components that have the capability to buffer messages so that you can inject any instance that implements the MessageStore interface.

Details on how to configure a specific message store implementation and how to inject a MessageStore implementation into a specific buffering component are described throughout the manual (see the specific component, such as QueueChannel, Aggregator, Delayer, and others). The following pair of examples shows how to add a reference to a message store for a QueueChannel and for an aggregator:

Example 1. QueueChannel
<int:channel id="myQueueChannel">
    <int:queue message-store="refToMessageStore"/>
</int:channel>

Example 2. Aggregator
<int:aggregator … message-store="refToMessageStore"/>
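
The refToMessageStore reference used in both examples must resolve to a bean that implements MessageStore. As a minimal sketch, assuming a JDBC-backed store, that the int-jdbc namespace is declared on the enclosing <beans> element, and that a DataSource bean named dataSource (a hypothetical name) exists elsewhere in the context, the bean could be declared as follows:

```xml
<!-- Sketch only: "dataSource" is a hypothetical bean name for a javax.sql.DataSource
     defined elsewhere. The int-jdbc prefix is assumed to be bound to the
     Spring Integration JDBC namespace on the enclosing <beans> element. -->
<int-jdbc:message-store id="refToMessageStore" data-source="dataSource"/>
```

Any other MessageStore implementation can be substituted, as long as its bean id matches the value of the message-store attribute.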

By default, messages are stored in-memory by using SimpleMessageStore, an in-memory implementation of MessageStore. That might be fine for development or simple low-volume environments where the potential loss of non-persistent messages is not a concern. However, the typical production application needs a more robust option, not only to mitigate the risk of message loss but also to avoid potential out-of-memory errors. Therefore, we also provide MessageStore implementations for a variety of data stores. Each supported implementation is described in the section of this manual for its data store.

However, be aware of some limitations while using persistent implementations of the MessageStore.

The Message data (payload and headers) is serialized and deserialized by using different serialization strategies, depending on the implementation of the MessageStore. For example, when using JdbcMessageStore, only Serializable data is persisted by default. In this case, non-Serializable headers are removed before serialization occurs. Also, be aware of the protocol-specific headers that are injected by transport adapters (such as FTP, HTTP, JMS, and others). For example, <http:inbound-channel-adapter/> maps HTTP headers into message headers, and one of them is an ArrayList of non-serializable org.springframework.http.MediaType instances. However, you can inject your own implementation of the Serializer and Deserializer strategy interfaces into some MessageStore implementations (such as JdbcMessageStore) to change the behavior of serialization and deserialization.
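
As a sketch of the last point, a JdbcMessageStore bean can be given custom serialization strategies through its serializer and deserializer properties. The two com.example classes below are hypothetical placeholders for your own implementations of org.springframework.core.serializer.Serializer and Deserializer. The dataSource bean name and the JdbcMessageStore package (which is the location used in version 5.x and may differ in older releases) are assumptions as well:

```xml
<!-- Hypothetical custom strategies; replace with your own implementations. -->
<bean id="customSerializer" class="com.example.CustomMessageSerializer"/>
<bean id="customDeserializer" class="com.example.CustomMessageDeserializer"/>

<!-- The package name is assumed from version 5.x; "dataSource" is defined elsewhere. -->
<bean id="refToMessageStore"
      class="org.springframework.integration.jdbc.store.JdbcMessageStore">
    <constructor-arg ref="dataSource"/>
    <property name="serializer" ref="customSerializer"/>
    <property name="deserializer" ref="customDeserializer"/>
</bean>
```

The serializer and the deserializer must agree on the format, because messages written by one are read back by the other.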

Pay special attention to the headers that represent certain types of data. For example, if one of the headers contains an instance of some Spring bean, upon deserialization, you may end up with a different instance of that bean, which directly affects some of the implicit headers created by the framework (such as REPLY_CHANNEL or ERROR_CHANNEL). Currently, they are not serializable, but, even if they were, the deserialized channel would not represent the expected instance.

Beginning with Spring Integration version 3.0, you can resolve this issue with a header enricher configured to replace these headers with a name after registering the channel with the HeaderChannelRegistry.

Also, consider what happens when you configure a message-flow as follows: gateway → queue-channel (backed by a persistent Message Store) → service-activator. That gateway creates a temporary reply channel, which is lost by the time the service-activator’s poller reads from the queue. Again, you can use the header enricher to replace the headers with a String representation.
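
The flow described above can be sketched as follows. The header-channels-to-string sub-element asks the header enricher to replace the reply and error channel headers with string names before the message reaches the persistent queue. The gateway interface, the channel names, the myService bean, and the poller interval are hypothetical values chosen only for illustration:

```xml
<!-- com.example.MyGateway and "myService" are hypothetical. -->
<int:gateway id="myGateway"
             service-interface="com.example.MyGateway"
             default-request-channel="toEnricher"/>

<!-- Replace the channel headers with string names before the message is stored. -->
<int:header-enricher input-channel="toEnricher" output-channel="myQueueChannel">
    <int:header-channels-to-string/>
</int:header-enricher>

<int:channel id="myQueueChannel">
    <int:queue message-store="refToMessageStore"/>
</int:channel>

<int:service-activator input-channel="myQueueChannel" ref="myService" method="handle">
    <int:poller fixed-delay="1000"/>
</int:service-activator>
```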

For more information, see Header Enricher.

Spring Integration 4.0 introduced two new interfaces:

  • ChannelMessageStore: To implement operations specific for QueueChannel instances

  • PriorityCapableChannelMessageStore: To mark MessageStore implementations to be used for PriorityChannel instances and to provide priority order for persisted messages.
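
As a brief sketch of how such a store is referenced, a priority channel can point at a store through the message-store attribute of its priority-queue sub-element. The bean name priorityStore is hypothetical and is assumed to refer to a store that implements PriorityCapableChannelMessageStore:

```xml
<!-- "priorityStore" is a hypothetical bean name; the referenced store is assumed
     to implement PriorityCapableChannelMessageStore. -->
<int:channel id="myPriorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>
```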

The real behavior depends on the implementation. The framework provides implementations of these interfaces that can be used as a persistent MessageStore for QueueChannel and PriorityChannel. See the section of this manual for the data store you use.

Caution about SimpleMessageStore

Starting with version 4.1, the SimpleMessageStore no longer copies the message group when calling getMessageGroup(). For large message groups, this was a significant performance problem. Version 4.0.1 introduced a boolean copyOnGet property that lets you control this behavior. When used internally by the aggregator, this property was set to false to improve performance. It is now false by default.

Users accessing the group store outside of components such as aggregators now get a direct reference to the group being used by the aggregator instead of a copy. Manipulation of the group outside of the aggregator may cause unpredictable results.

For this reason, you should either not perform such manipulation or set the copyOnGet property to true.
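
If you do need to inspect groups from outside the aggregator, a minimal sketch of restoring the copying behavior looks like the following. The bean id is a hypothetical name:

```xml
<!-- Hands out a copy of the group on getMessageGroup() instead of the live instance. -->
<bean id="simpleStore" class="org.springframework.integration.store.SimpleMessageStore">
    <property name="copyOnGet" value="true"/>
</bean>
```

The trade-off is the copying cost on every getMessageGroup() call, which is the performance problem that led to the change of default.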

Using MessageGroupFactory

Starting with version 4.3, some MessageGroupStore implementations can be injected with a custom MessageGroupFactory strategy to create and customize the MessageGroup instances used by the MessageGroupStore. This defaults to a SimpleMessageGroupFactory, which produces SimpleMessageGroup instances based on the GroupType.HASH_SET (LinkedHashSet) internal collection. Other possible options are SYNCHRONISED_SET and BLOCKING_QUEUE, where the last one can be used to reinstate the previous SimpleMessageGroup behavior. A PERSISTENT option is also available; see the next section for more information. Starting with version 5.0.1, the LIST option is also available for cases where the order and uniqueness of messages in the group do not matter.
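
As an illustration, the following sketch injects a SimpleMessageGroupFactory configured with the BLOCKING_QUEUE group type into a SimpleMessageStore. It assumes that the factory accepts the group type as a constructor argument and that the store exposes a messageGroupFactory property. The bean id is a hypothetical name:

```xml
<bean id="simpleStore" class="org.springframework.integration.store.SimpleMessageStore">
    <property name="messageGroupFactory">
        <bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
            <!-- Spring converts this string to the matching GroupType enum constant. -->
            <constructor-arg value="BLOCKING_QUEUE"/>
        </bean>
    </property>
</bean>
```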

Persistent MessageGroupStore and Lazy-load

Starting with version 4.3, all persistent MessageGroupStore instances retrieve MessageGroup instances and their messages from the store in a lazy-load manner. In most cases, this is useful for the correlation MessageHandler instances (see Aggregator and Resequencer), where loading the entire MessageGroup from the store on each correlation operation would add overhead.

You can use the AbstractMessageGroupStore.setLazyLoadMessageGroups(false) option to switch off the lazy-load behavior from the configuration.
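
In XML configuration, that setter corresponds to a lazyLoadMessageGroups bean property. The following is a sketch only, assuming a JdbcMessageStore (with the package name used in version 5.x) and a DataSource bean named dataSource, which is a hypothetical name:

```xml
<!-- Sketch: switches a persistent store back to eager loading of message groups. -->
<bean id="refToMessageStore"
      class="org.springframework.integration.jdbc.store.JdbcMessageStore">
    <constructor-arg ref="dataSource"/>
    <property name="lazyLoadMessageGroups" value="false"/>
</bean>
```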

Our performance tests for lazy-load on MongoDB MessageStore (MongoDB Message Store) and <aggregator> (Aggregator) use a custom release-strategy similar to the following:

<int:aggregator input-channel="inputChannel"
                release-strategy-expression="size() == 1000"/>

The tests produce results similar to the following for 1000 simple messages:

StopWatch 'Lazy-Load Performance': running time (millis) = 38918
ms     %     Task name
02652  007%  Lazy-Load
36266  093%  Eager

However, starting with version 5.5, all the persistent MessageGroupStore implementations provide a streamMessagesForGroup(Object groupId) contract based on the target database streaming API. This improves resource utilization when groups in the store are very large. Internally, the framework uses this API in the Delayer (for example) when it reschedules persisted messages on startup. The returned Stream<Message<?>> must be closed at the end of processing, for example, automatically through a try-with-resources statement. Whenever a PersistentMessageGroup is used, its streamMessages() delegates to MessageGroupStore.streamMessagesForGroup().

Message Group Condition

Starting with version 5.5, the MessageGroup abstraction provides a condition string option. The value of this option can be anything that can be parsed later to make a decision about the group. For example, a ReleaseStrategy from a correlation message handler may consult this property from the group instead of iterating over all the messages in the group. The MessageGroupStore exposes a setGroupCondition(Object groupId, String condition) API. For this purpose, a setGroupConditionSupplier(BiFunction<Message<?>, String, String>) option has been added to the AbstractCorrelatingMessageHandler. This function is evaluated against each message after it has been added to the group, together with the existing condition of the group. The implementation may decide to return a new value, return the existing value, or reset the target condition to null.

The value for a condition can be JSON, a SpEL expression, a number, or anything else that can be serialized as a string and parsed afterwards. For example, the FileMarkerReleaseStrategy from the File Aggregator component populates a condition into a group from the FileHeaders.LINE_COUNT header of the FileSplitter.FileMarker.Mark.END message and consults it from its canRelease() by comparing the group size with the value in this condition. This way, it does not iterate over all the messages in the group to find a FileSplitter.FileMarker.Mark.END message with the FileHeaders.LINE_COUNT header. It also allows the end marker to arrive at the aggregator before all the other records, for example, when processing a file in a multi-threaded environment.

In addition, for configuration convenience, a GroupConditionProvider contract has been introduced. The AbstractCorrelatingMessageHandler checks if the provided ReleaseStrategy implements this interface and extracts a conditionSupplier for group condition evaluation logic.