Basically a mirror-image of the Splitter, the Aggregator is a type of Message Handler that receives multiple Messages and combines them into a single Message. In fact, Aggregators are often downstream consumers in a pipeline that includes a Splitter.
Technically, the Aggregator is more complex than a Splitter, because it must maintain state (the Messages to be aggregated) and decide when the complete group of Messages is available.
The Aggregator combines a group of related messages, by correlating and storing them, until the group is deemed complete. At that point, the Aggregator will create a single message by processing the whole group, and will send that aggregated message as output.
An important aspect of implementing an Aggregator is providing the logic that has to be executed when the aggregation (creation of a single message out of many) takes place.
In Spring Integration, the grouping of the messages for aggregation is done by default based on their CORRELATION_ID message header (i.e. the messages with the same CORRELATION_ID will be grouped together). However, this can be customized, and the users can opt for other ways of specifying how the messages should be grouped together, by using a CorrelationStrategy (see below).
Another important concern is what happens if late messages arrive after the aggregation has taken place. In this case, the user needs to be able to decide whether they should be discarded or not.
The Aggregation API consists of a number of classes:
The interface MessageGroupProcessor and related base class AbstractAggregatingMessageGroupProcessor and its subclass MethodInvokingAggregatingMessageGroupProcessor
The ReleaseStrategy interface and its default implementation SequenceSizeReleaseStrategy
The CorrelationStrategy interface and its default implementation HeaderAttributeCorrelationStrategy
The CorrelatingMessageHandler is a MessageHandler implementation, encapsulating the common functionalities of an Aggregator (and other correlating use cases), which are:
correlating messages into a group to be aggregated
maintaining those messages until the group is complete
deciding when the group is in fact complete
processing the completed group into a single aggregated message
recognizing and responding to an expired group
The responsibility of deciding how the messages should be grouped together is delegated to a CorrelationStrategy instance. The responsibility of deciding whether the message group can be released is delegated to a ReleaseStrategy instance.
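The division of responsibilities described above can be sketched in plain Java. This is a conceptual model only, not the CorrelatingMessageHandler source: the class name MiniAggregator, the use of java.util.function types in place of the strategy interfaces, and the in-memory Map standing in for a message store are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

// Conceptual model only: NOT Spring Integration's CorrelatingMessageHandler.
// M is the message type, R is the type of the aggregated result.
class MiniAggregator<M, R> {

    private final Function<M, Object> correlationStrategy; // message -> group key
    private final Predicate<List<M>> releaseStrategy;      // is the group complete?
    private final Function<List<M>, R> groupProcessor;     // group -> single result

    // Hypothetical stand-in for a message store: groups held in memory by key.
    private final Map<Object, List<M>> store = new HashMap<>();

    MiniAggregator(Function<M, Object> correlationStrategy,
                   Predicate<List<M>> releaseStrategy,
                   Function<List<M>, R> groupProcessor) {
        this.correlationStrategy = correlationStrategy;
        this.releaseStrategy = releaseStrategy;
        this.groupProcessor = groupProcessor;
    }

    // Stores the message in its group; returns the aggregated result once the
    // release strategy accepts the group, and an empty Optional until then.
    Optional<R> handle(M message) {
        Object key = correlationStrategy.apply(message);
        List<M> group = store.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(message);
        if (!releaseStrategy.test(group)) {
            return Optional.empty();
        }
        store.remove(key); // the group is complete, so stop tracking it
        return Optional.of(groupProcessor.apply(group));
    }
}
```

The sketch leaves out expiry, discarding of late messages and thread safety, all of which a real handler has to deal with.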
Here is a brief highlight of the base AbstractAggregatingMessageGroupProcessor (the responsibility of implementing the aggregatePayloads method is left to the developer):
    public abstract class AbstractAggregatingMessageGroupProcessor implements MessageGroupProcessor {

        protected Map<String, Object> aggregateHeaders(MessageGroup group) {
            ....
        }

        protected abstract Object aggregatePayloads(MessageGroup group);

    }
The CorrelationStrategy is owned by the CorrelatingMessageHandler and it has a default value based on the correlation ID message header:
    private volatile CorrelationStrategy correlationStrategy =
            new HeaderAttributeCorrelationStrategy(MessageHeaders.CORRELATION_ID);
When appropriate, the simplest option is the DefaultAggregatingMessageGroupProcessor. It creates a single Message whose payload is a List of the payloads received for a given group. It uses the default CorrelationStrategy and ReleaseStrategy as shown above. This works well for simple Scatter Gather implementations with either a Splitter, Publish Subscribe Channel, or Recipient List Router upstream.
Note: When using a Publish Subscribe Channel or Recipient List Router in this type of scenario, be sure to enable the flag to apply sequence. That will add the necessary headers (correlation id, sequence number and sequence size). That behavior is enabled by default for Splitters in Spring Integration, but it is not enabled for the Publish Subscribe Channel or Recipient List Router because those components may be used in a variety of contexts where those headers are not necessary.
When implementing a specific aggregator object for an application, a developer can extend AbstractAggregatingMessageGroupProcessor and implement the aggregatePayloads method. However, there are better-suited solutions (that is, less coupled to the API) for implementing the aggregation logic, which can be configured easily through either XML or annotations.
In general, any ordinary Java class (i.e. POJO) can implement the aggregation algorithm. To do so, it must provide a method that accepts a single java.util.List as its argument (parametrized lists are supported as well). This method will be invoked for aggregating messages, as follows:
if the argument is a parametrized java.util.List, and the parameter type is assignable to Message, then the whole list of messages accumulated for aggregation will be sent to the aggregator
if the argument is a non-parametrized java.util.List or the parameter type is not assignable to Message, then the method will receive the payloads of the accumulated messages
if the return type is not assignable to Message, then it will be treated as the payload for a Message that will be created automatically by the framework.
Note: In the interest of code simplicity, and promoting best practices such as low coupling, testability, etc., the preferred way of implementing the aggregation logic is through a POJO, and using the XML or annotation support for setting it up in the application.
The ReleaseStrategy interface is defined as follows:
    public interface ReleaseStrategy {

        boolean canRelease(MessageGroup messages);

    }
In general, any ordinary Java class (i.e. POJO) can implement the completion decision mechanism. To do so, it must provide a method that accepts a single java.util.List as its argument (parametrized lists are supported as well), and returns a boolean value. This method will be invoked after the arrival of a new message, to decide whether the group is complete or not, as follows:
if the argument is a parametrized java.util.List, and the parameter type is assignable to Message, then the whole list of messages accumulated in the group will be sent to the method
if the argument is a non-parametrized java.util.List or the parameter type is not assignable to Message, then the method will receive the payloads of the accumulated messages
the method must return true if the message group is ready for aggregation, and false otherwise.
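As an illustration of the rules above, here is a minimal sketch of a POJO release decision that works on payloads. The class name BatchSizeReleaseStrategy and the batchSize parameter are hypothetical; the only assumption about the framework is the behavior just described, namely that a List whose element type is not a Message receives the payloads of the accumulated messages.

```java
import java.util.List;

// Hypothetical POJO: class name, method name and batchSize are illustrative.
class BatchSizeReleaseStrategy {

    private final int batchSize;

    BatchSizeReleaseStrategy(int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        this.batchSize = batchSize;
    }

    // The element type is String rather than Message, so this method would be
    // given the payloads accumulated so far. It returns true once enough
    // payloads have arrived for the group to be aggregated.
    public boolean canRelease(List<String> payloads) {
        return payloads.size() >= batchSize;
    }
}
```

Because the class has no dependency on the framework API, the decision logic can be unit tested with plain lists.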
When the group is released for aggregation, all its unmarked messages are processed and then marked so they will not be processed again. If the group is also complete (i.e. if all messages from a sequence have arrived or if there is no sequence defined) then the group is removed from the message store. Partial sequences can be released, in which case the next time the ReleaseStrategy is called it will be presented with a group containing marked messages (already processed) and unmarked messages (a potential new partial sequence).
Spring Integration provides an out-of-the-box implementation for ReleaseStrategy, the SequenceSizeReleaseStrategy. This implementation uses the SEQUENCE_NUMBER and SEQUENCE_SIZE of the arriving messages for deciding when a message group is complete and ready to be aggregated. As shown above, it is also the default strategy.
The CorrelationStrategy interface is defined as follows:
    public interface CorrelationStrategy {

        Object getCorrelationKey(Message<?> message);

    }
The method shall return an Object which represents the correlation key used for grouping messages together. The key must satisfy the criteria used for a key in a Map with respect to the implementation of equals() and hashCode().
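When the correlation key is a custom type rather than a String or a number, the equals() and hashCode() requirement has to be met explicitly. The following sketch shows one way to write such a key; the class CustomerRegionKey and its two fields are hypothetical and are not part of Spring Integration.

```java
import java.util.Objects;

// Hypothetical composite correlation key: two messages belong to the same
// group when both the customer id and the region match.
final class CustomerRegionKey {

    private final String customerId;
    private final String region;

    CustomerRegionKey(String customerId, String region) {
        this.customerId = Objects.requireNonNull(customerId, "customerId");
        this.region = Objects.requireNonNull(region, "region");
    }

    // Value-based equality, so that two separately created keys with the same
    // field values identify the same group.
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof CustomerRegionKey)) {
            return false;
        }
        CustomerRegionKey that = (CustomerRegionKey) other;
        return customerId.equals(that.customerId) && region.equals(that.region);
    }

    // Must be consistent with equals(): equal keys produce equal hash codes.
    @Override
    public int hashCode() {
        return Objects.hash(customerId, region);
    }
}
```

A key class that relies on the identity-based equals() and hashCode() inherited from Object would give every message its own key, so no two messages would ever be grouped together.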
In general, any ordinary Java class (i.e. POJO) can implement the correlation decision mechanism, and the rules for mapping a message to a method's argument (or arguments) are the same as for a ServiceActivator (including support for @Header annotations). The method must return a value, and the value must not be null.
Spring Integration provides an out-of-the-box implementation for CorrelationStrategy, the HeaderAttributeCorrelationStrategy. This implementation returns the value of one of the message headers (whose name is specified by a constructor argument) as the correlation key. By default, the correlation strategy is a HeaderAttributeCorrelationStrategy returning the value of the CORRELATION_ID header attribute.
Spring Integration supports the configuration of an aggregator via XML through the <aggregator/> element. Below you can see an example of an aggregator with all optional parameters defined.
    <channel id="inputChannel"/>

    <aggregator id="completelyDefinedAggregator"
        input-channel="inputChannel"
        output-channel="outputChannel"
        discard-channel="discardChannel"
        ref="aggregatorBean"
        method="add"
        release-strategy="releaseStrategyBean"
        release-strategy-method="canRelease"
        correlation-strategy="correlationStrategyBean"
        correlation-strategy-method="groupNumbersByLastDigit"
        message-store="messageStore"
        send-partial-result-on-expiry="true"
        send-timeout="86420000"/>

    <channel id="outputChannel"/>

    <bean id="aggregatorBean" class="sample.PojoAggregator"/>

    <bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>

    <bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
id: The id of the aggregator is optional.
input-channel: The input channel of the aggregator. Required.
output-channel: The channel where the aggregator will send the aggregation results. Optional (because incoming messages can specify a reply channel themselves).
discard-channel: The channel where the aggregator will send the messages that timed out (if
ref: A reference to a bean defined in the application context. The bean must implement the aggregation logic as described above. Required.
method: A method defined on the bean referenced by
release-strategy: A reference to a bean that implements the decision algorithm as to whether a given message group is complete. The bean can be an implementation of the ReleaseStrategy interface or a POJO. In the latter case the release-strategy-method attribute must be defined as well. Optional (by default, the aggregator will use sequence size).
release-strategy-method: A method defined on the bean referenced by
correlation-strategy: A reference to a bean that implements the correlation strategy. The bean can be an implementation of the CorrelationStrategy interface or a POJO. In the latter case the correlation-strategy-method attribute must be defined as well. Optional (by default, the aggregator will use the correlation id header attribute).
correlation-strategy-method: A method defined on the bean referenced by
message-store: A reference to a
send-partial-result-on-expiry: Whether upon the expiration of the message group, the aggregator will try to aggregate the messages that have already arrived. Optional (false by default).
send-timeout: The timeout for sending the aggregated messages to the output or reply channel. Optional.
Using a "ref" attribute is generally recommended if a custom aggregator handler implementation can be reused in other <aggregator> definitions. However, if a custom aggregator handler implementation should be scoped to a concrete definition of the <aggregator>, you can use an inner bean definition (starting with version 1.0.3) for custom aggregator handlers within the <aggregator> element:
    <aggregator input-channel="input" method="sum" output-channel="output">
        <beans:bean class="org.foo.ExampleAggregator"/>
    </aggregator>
Note: Using both a "ref" attribute and an inner bean definition in the same
An example implementation of the aggregator bean looks as follows:
    import java.util.List;

    public class PojoAggregator {

        // Sums the payloads of all messages in the released group.
        public Long add(List<Long> results) {
            long total = 0L;
            for (long partialResult : results) {
                total += partialResult;
            }
            return total;
        }
    }
An implementation of the release strategy bean for the example above may be as follows:
    public class PojoReleaseStrategy {
        ...
        // Releases the group once the sum of its payloads reaches maxValue.
        public boolean canRelease(List<Long> numbers) {
            long sum = 0; // long rather than int, so the payloads are not narrowed
            for (long number : numbers) {
                sum += number;
            }
            return sum >= maxValue;
        }
    }
Note: Wherever it makes sense, the release strategy method and the aggregator method can be combined in a single bean.
An implementation of the correlation strategy bean for the example above may be as follows:
    public class PojoCorrelationStrategy {
        ...
        public Long groupNumbersByLastDigit(Long number) {
            return number % 10;
        }
    }
For example, this aggregator would group numbers by some criterion (in our case the remainder after dividing by 10) and will hold the group until the sum of the numbers in the payloads reaches or exceeds a certain value.
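To make that behavior concrete, the sketch below runs a few numbers through the three example POJOs by hand. It is a worked illustration under stated assumptions, not how the framework invokes them: the AggregationWalkthrough class and its run method are hypothetical, a plain Map stands in for the message store, and the constructor that sets maxValue is assumed because the original example elides how that field is initialized.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PojoAggregator {
    public Long add(List<Long> results) {
        long total = 0L;
        for (long partialResult : results) {
            total += partialResult;
        }
        return total;
    }
}

class PojoReleaseStrategy {
    private final long maxValue; // assumption: set through the constructor

    PojoReleaseStrategy(long maxValue) {
        this.maxValue = maxValue;
    }

    public boolean canRelease(List<Long> numbers) {
        long sum = 0;
        for (long number : numbers) {
            sum += number;
        }
        return sum >= maxValue;
    }
}

class PojoCorrelationStrategy {
    public Long groupNumbersByLastDigit(Long number) {
        return number % 10;
    }
}

// Hypothetical driver that plays the role of the aggregator endpoint.
class AggregationWalkthrough {

    // Returns the aggregated totals in the order their groups were released.
    static List<Long> run(List<Long> numbers, long maxValue) {
        PojoCorrelationStrategy correlation = new PojoCorrelationStrategy();
        PojoReleaseStrategy release = new PojoReleaseStrategy(maxValue);
        PojoAggregator aggregator = new PojoAggregator();

        Map<Long, List<Long>> groups = new HashMap<>(); // stand-in message store
        List<Long> released = new ArrayList<>();

        for (Long number : numbers) {
            Long key = correlation.groupNumbersByLastDigit(number);
            List<Long> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
            group.add(number);
            if (release.canRelease(group)) {
                released.add(aggregator.add(group));
                groups.remove(key);
            }
        }
        return released;
    }
}
```

With the input 11, 5, 21, 31, 15 and a maxValue of 50, the numbers ending in 1 form one group whose sum reaches 63 when 31 arrives, so that group is released with a total of 63. The numbers ending in 5 only add up to 20 and stay in the store.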
Note: Wherever it makes sense, the release strategy method, correlation strategy method and the aggregator method can be combined in a single bean (all of them or any two).
An aggregator configured using annotations can look like this:
    public class Waiter {
        ...

        @Aggregator
        public Delivery aggregatingMethod(List<OrderItem> items) {
            ...
        }

        @ReleaseStrategy
        public boolean releaseChecker(List<Message<?>> messages) {
            ...
        }

        @CorrelationStrategy
        public String correlateBy(OrderItem item) {
            ...
        }
    }
All of the configuration options provided by the XML element are also available for the @Aggregator annotation.
The aggregator can be either referenced explicitly from XML or, if the class is annotated with @MessageEndpoint, detected automatically through classpath scanning.