Aggregators and Resequencers
An Aggregator is conceptually the opposite of a Splitter.
It aggregates a sequence of individual messages into a single message and is necessarily more complex.
By default, an aggregator returns a message that contains a collection of payloads from incoming messages.
The same rules apply to the Resequencer.
The following example shows the canonical splitter-aggregator pattern:
@Bean
public IntegrationFlow splitAggregateFlow() {
    return IntegrationFlow.from("splitAggregateInput")
            .split()
            .channel(MessageChannels.executor(this.taskExecutor()))
            .resequence()
            .aggregate()
            .get();
}
The split() method splits the list into individual messages and sends them to the ExecutorChannel.
The resequence() method reorders messages by sequence details found in the message headers.
The aggregate() method collects those messages.
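The three steps can be pictured with a small plain-Java sketch. It does not use Spring Integration classes and is not how the framework implements these endpoints; the `Msg` type, the `SplitAggregateSketch` class, and the reversed list standing in for out-of-order arrival after concurrent processing are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Conceptual sketch only: plain Java, not Spring Integration classes.
class SplitAggregateSketch {

    // Hypothetical stand-in for a message that carries a sequence-number header.
    static final class Msg {
        final String payload;
        final int sequenceNumber;

        Msg(String payload, int sequenceNumber) {
            this.payload = payload;
            this.sequenceNumber = sequenceNumber;
        }
    }

    // split(): one message per list element, stamped with its 1-based position.
    static List<Msg> split(List<String> payloads) {
        List<Msg> out = new ArrayList<>();
        for (int i = 0; i < payloads.size(); i++) {
            out.add(new Msg(payloads.get(i), i + 1));
        }
        return out;
    }

    // resequence(): restore the original order using the sequence number.
    static List<Msg> resequence(List<Msg> messages) {
        List<Msg> sorted = new ArrayList<>(messages);
        sorted.sort(Comparator.comparingInt(m -> m.sequenceNumber));
        return sorted;
    }

    // aggregate(): collect the payloads into a single list.
    static List<String> aggregate(List<Msg> messages) {
        List<String> payloads = new ArrayList<>();
        for (Msg m : messages) {
            payloads.add(m.payload);
        }
        return payloads;
    }

    static List<String> run(List<String> input) {
        List<Msg> parts = split(input);
        Collections.reverse(parts); // simulate out-of-order arrival
        return aggregate(resequence(parts));
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "c"))); // prints [a, b, c]
    }
}
```

Because every split message carries its position, the original order can be recovered no matter how the executor interleaves the work in between.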
You can change the default behavior of aggregate() by specifying a release strategy and correlation strategy, among other things. Consider the following example:
.aggregate(a ->
a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
.releaseStrategy(g -> g.size() > 10)
.messageStore(messageStore()))
The preceding example correlates messages by their myCorrelationKey header and releases a group once it contains more than ten messages (g.size() > 10), that is, when the eleventh message arrives.
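To make the interplay of the two strategies concrete, here is a minimal plain-Java sketch of correlation and release. It is an illustration under stated assumptions, not Spring Integration's aggregator: the `CorrelatingAggregatorSketch` class, its `accept` method, and the in-memory `HashMap` standing in for a message store are hypothetical. The release check mirrors the `g.size() > 10` lambda, so a group is released when its eleventh message arrives.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Conceptual sketch only: plain Java, not Spring Integration's aggregator.
class CorrelatingAggregatorSketch {

    // Hypothetical in-memory stand-in for a message store: one group per correlation key.
    private final Map<String, List<String>> groups = new HashMap<>();

    // Adds a payload to the group for its correlation key. Returns the whole group
    // (and forgets it) once it holds more than ten payloads; otherwise returns empty.
    Optional<List<String>> accept(String correlationKey, String payload) {
        List<String> group = groups.computeIfAbsent(correlationKey, k -> new ArrayList<>());
        group.add(payload);
        if (group.size() > 10) { // same condition as g.size() > 10
            groups.remove(correlationKey);
            return Optional.of(group);
        }
        return Optional.empty();
    }
}
```

Messages with different keys accumulate independently, so a burst for one key never triggers the release of another key's group.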
Similar lambda configurations are provided for the resequence() EIP method.