Resequencer

The resequencer is related to the aggregator but serves a different purpose. While the aggregator combines messages, the resequencer passes messages through without changing them.

Functionality

The resequencer works in a similar way to the aggregator, in the sense that it uses the CORRELATION_ID to store messages in groups. The difference is that the Resequencer does not process the messages in any way. Instead, it releases them in the order of their SEQUENCE_NUMBER header values.

With respect to that, you can opt to release all messages at once (after the whole sequence, according to the SEQUENCE_SIZE, and other possibilities) or as soon as a valid sequence is available. (We cover what we mean by "a valid sequence" later in this chapter.)

The resequencer is intended to resequence relatively short sequences of messages with small gaps. If you have a large number of disjoint sequences with many gaps, you may experience performance issues.

Configuring a Resequencer

See Aggregators and Resequencers for configuring a resequencer in Java DSL.

Configuring a resequencer requires only including the appropriate element in XML.

The following example shows a resequencer configuration:

<int:channel id="inputChannel"/>

<int:channel id="outputChannel"/>

<int:resequencer id="completelyDefinedResequencer"  (1)
  input-channel="inputChannel"  (2)
  output-channel="outputChannel"  (3)
  discard-channel="discardChannel"  (4)
  release-partial-sequences="true"  (5)
  message-store="messageStore"  (6)
  send-partial-result-on-expiry="true"  (7)
  send-timeout="86420000"  (8)
  correlation-strategy="correlationStrategyBean"  (9)
  correlation-strategy-method="correlate"  (10)
  correlation-strategy-expression="headers['something']"  (11)
  release-strategy="releaseStrategyBean"  (12)
  release-strategy-method="release"  (13)
  release-strategy-expression="size() == 10"  (14)
  empty-group-min-timeout="60000"  (15)

  lock-registry="lockRegistry"  (16)

  group-timeout="60000"  (17)
  group-timeout-expression="size() ge 2 ? 100 : -1"  (18)
  scheduler="taskScheduler" />  (19)
  expire-group-upon-timeout="false" />  (20)
1 The id of the resequencer is optional.
2 The input channel of the resequencer. Required.
3 The channel to which the resequencer sends the reordered messages. Optional.
4 The channel to which the resequencer sends the messages that timed out (if send-partial-result-on-timeout is set to false). Optional.
5 Whether to send out ordered sequences as soon as they are available or only after the whole message group arrives. Optional. (The default is false.)
6 A reference to a MessageGroupStore that can be used to store groups of messages under their correlation key until they are complete. Optional. (The default is a volatile in-memory store.)
7 Whether, upon the expiration of the group, the ordered group should be sent out (even if some messages are missing). Optional. (The default is false.) See Managing State in an Aggregator: MessageGroupStore.
8 The timeout interval to wait when sending a reply Message to the output-channel or discard-channel. It is applied only if the output channel has some 'sending' limitations, such as a QueueChannel with a fixed 'capacity'. In this case, a MessageDeliveryException is thrown. The send-timeout is ignored for AbstractSubscribableChannel implementations. For group-timeout(-expression), the MessageDeliveryException from the scheduled expired task leads this task to be rescheduled. Optional.
9 A reference to a bean that implements the message correlation (grouping) algorithm. The bean can be an implementation of the CorrelationStrategy interface or a POJO. In the latter case, the correlation-strategy-method attribute must also be defined. Optional. (By default, the aggregator uses the IntegrationMessageHeaderAccessor.CORRELATION_ID header.)
10 A method that is defined on the bean referenced by correlation-strategy and that implements the correlation decision algorithm. Optional, with restrictions (requires correlation-strategy to be present).
11 A SpEL expression representing the correlation strategy. Example: "headers['something']". Only one of correlation-strategy or correlation-strategy-expression is allowed.
12 A reference to a bean that implements the release strategy. The bean can be an implementation of the ReleaseStrategy interface or a POJO. In the latter case, the release-strategy-method attribute must also be defined. Optional (by default, the aggregator will use the IntegrationMessageHeaderAccessor.SEQUENCE_SIZE header attribute).
13 A method that is defined on the bean referenced by release-strategy and that implements the completion decision algorithm. Optional, with restrictions (requires release-strategy to be present).
14 A SpEL expression representing the release strategy. The root object for the expression is a MessageGroup. Example: "size() == 5". Only one of release-strategy or release-strategy-expression is allowed.
15 Only applies if a MessageGroupStoreReaper is configured for the <resequencer> MessageStore. By default, when a MessageGroupStoreReaper is configured to expire partial groups, empty groups are also removed. Empty groups exist after a group is released normally. This is to enable the detection and discarding of late-arriving messages. If you wish to expire empty groups on a longer schedule than expiring partial groups, set this property. Empty groups are then not removed from the MessageStore until they have not been modified for at least this number of milliseconds. Note that the actual time to expire an empty group is also affected by the reaper’s timeout property, and it could be as much as this value plus the timeout.
16 See Configuring an Aggregator with XML.
17 See Configuring an Aggregator with XML.
18 See Configuring an Aggregator with XML.
19 See Configuring an Aggregator with XML.
20 By default, when a group is completed due to a timeout (or by a MessageGroupStoreReaper), the empty group’s metadata is retained. Late arriving messages are immediately discarded. Set this to true to remove the group completely. Then, late arriving messages start a new group and are not be discarded until the group again times out. The new group is never released normally because of the “hole” in the sequence range that caused the timeout. Empty groups can be expired (completely removed) later by using a MessageGroupStoreReaper together with the empty-group-min-timeout attribute. Starting with version 5.0, empty groups are also scheduled for removal after the empty-group-min-timeout elapses. The default is 'false'.

Also see Aggregator Expiring Groups for more information.

Since there is no custom behavior to be implemented in Java classes for resequencers, there is no annotation support for it.