Basically a mirror-image of the Splitter, the Aggregator is a type of Message Handler that receives multiple Messages and combines them into a single Message. In fact, Aggregators are often downstream consumers in a pipeline that includes a Splitter.
Technically, the Aggregator is more complex than a Splitter, because it is required to maintain state (the Messages to-be-aggregated), to decide when the complete group of Messages is available, and to timeout if necessary. Furthermore, in case of a timeout, the Aggregator needs to know whether to send the partial results or to discard them to a separate channel.
The Aggregator combines a group of related messages, by correlating and storing them, until the group is deemed complete. At that point, the Aggregator will create a single message by processing the whole group, and will send that aggregated message as output.
As messages might arrive with a certain delay (or certain messages from the group might not arrive at all), the Aggregator can specify a timeout (counted from the moment when the first message in the group has arrived), and whether, in the case of a timeout, the group should be discarded, or the Aggregator should merely attempt to create a single message out of what has arrived so far. An important aspect of implementing an Aggregator is providing the logic that has to be executed when the aggregation (creation of a single message out of many) takes place.
In Spring Integration, the grouping of the messages for aggregation is done by default based on their CORRELATION_ID message header (i.e. the messages with the same CORRELATION_ID will be grouped together). However, this can be customized, and the users can opt for other ways of specifying how the messages should be grouped together, by using a CorrelationStrategy (see below).
An important concern with respect to the timeout is, what happens if late messages arrive after the aggregation has taken place? In this case, a configuration option allows the user to decide whether they should be discarded or not.
The Aggregation API consists of a number of classes:
The base class AbstractMessageAggregator
and its
subclass MethodInvokingMessageAggregator
The CompletionStrategy
interface and its default
implementation SequenceSizeCompletionStrategy
The CorrelationStrategy
interface and its default
implementation HeaderAttributeCorrelationStrategy
The AbstractMessageAggregator
is a
MessageHandler
implementation, encapsulating the common
functionalities of an Aggregator, which are:
correlating messages into a group to be aggregated
maintaining those messages until the group is complete
deciding when the group is in fact complete
processing the completed group into a single aggregated message
recognizing and responding to a timed-out completion attempt
The responsibility of deciding how the messages should be grouped together
is delegated to a CorrelationStrategy
instance. The responsibility
of deciding whether the message group is complete is delegated to a
CompletionStrategy
instance.
Here is a brief highlight of the base
AbstractMessageAggregator
(the responsibility of
implementing the aggregateMessages method is left to the
developer):
public abstract class AbstractMessageAggregator extends AbstractMessageBarrierHandler { private volatile CompletionStrategy completionStrategy = new SequenceSizeCompletionStrategy(); .... protected abstract Message<?> aggregateMessages(List<Message<?>> messages); }It also inherits the following default CorrelationStrategy:
private volatile CorrelationStrategy correlationStrategy = new HeaderAttributeCorrelationStrategy(MessageHeaders.CORRELATION_ID);
When appropriate, the simplest option is the DefaultMessageAggregator
.
It creates a single Message whose payload is a List of the payloads received
for a given group. It uses the default CorrelationStrategy
and
CompletionStrategy
as shown above. This works well for simple
Scatter Gather implementations with either a Splitter, Publish Subscribe Channel,
or Recipient List Router upstream.
Note | |
---|---|
When using a Publish Subscribe Channel or Recipient List Router in this type of scenario, be sure to enable the flag to apply sequence. That will add the necessary headers (correlation id, sequence number and sequence size). That behavior is enabled by default for Splitters in Spring Integration, but it is not enabled for the Publish Subscribe Channel or Recipient List Router because those components may be used in a variety of contexts where those headers are not necessary. |
When implementing a specific aggregator object for an application,
a developer can extend AbstractMessageAggregator
and
implement the aggregateMessages
method. However, there are
better suited (which reads, less coupled to the API) solutions for
implementing the aggregation logic, which can be configured easily
either through XML or through annotations.
In general, any ordinary Java class (i.e. POJO) can implement the aggregation algorithm. For doing so, it must provide a method that accepts as an argument a single java.util.List (parametrized lists are supported as well). This method will be invoked for aggregating messages, as follows:
if the argument is a parametrized java.util.List, and the parameter type is assignable to Message, then the whole list of messages accumulated for aggregation will be sent to the aggregator
if the argument is a non-parametrized java.util.List or the parameter type is not assignable to Message, then the method will receive the payloads of the accumulated messages
if the return type is not assignable to Message, then it will be treated as the payload for a Message that will be created automatically by the framework.
Note | |
---|---|
In the interest of code simplicity, and promoting best practices such as low coupling, testability, etc., the preferred way of implementing the aggregation logic is through a POJO, and using the XML or annotation support for setting it up in the application. |
The CompletionStrategy
interface is defined as
follows:
public interface CompletionStrategy { boolean isComplete(List<Message<?>> messages); }
In general, any ordinary Java class (i.e. POJO) can implement the completion decision mechanism. For doing so, it must provide a method that accepts as an argument a single java.util.List (parametrized lists are supported as well), and returns a boolean value. This method will be invoked after the arrival of a new message, to decide whether the group is complete or not, as follows:
if the argument is a parametrized java.util.List, and the parameter type is assignable to Message, then the whole list of messages accumulated in the group will be sent to the method
if the argument is a non-parametrized java.util.List or the parameter type is not assignable to Message, then the method will receive the payloads of the accumulated messages
the method must return true if the message group is complete and ready for aggregation, and false otherwise.
Spring Integration provides an out-of-the box implementation for
CompletionStrategy
, the
SequenceSizeCompletionStrategy
. This implementation uses
the SEQUENCE_NUMBER and SEQUENCE_SIZE of the arriving messages for
deciding when a message group is complete and ready to be
aggregated. As shown above, it is also the default strategy.
The CorrelationStrategy
interface is defined as
follows:
public interface CorrelationStrategy { Object getCorrelationKey(Message<?> message); }
The method shall return an Object which represents the correlation key used for grouping messages together. The key must satisfy the criteria used for a key in a Map with respect to the implementation of equals() and hashCode().
In general, any ordinary Java class (i.e. POJO) can implement the
correlation decision mechanism, and the rules for mapping a message to
a method's argument (or arguments) are the same as for a
ServiceActivator
(including support for @Header
annotations). The method must return a value, and the value must not be
null
.
Spring Integration provides an out-of-the box implementation for
CorrelationStrategy
, the
HeaderAttributeCorrelationStrategy
. This implementation
returns the value of one of the message headers (whose name is specified
by a constructor argument) as the correlation key. By default, the
correlation strategy is a HeaderAttributeCorrelationStrategy returning
the value of the CORRELATION_ID header attribute.
Spring Integration supports the configuration of an aggregator via XML through the <aggregator/> element. Below you can see an example of an aggregator with all optional parameters defined.
<channel id="inputChannel"/> <aggregator id="completelyDefinedAggregator" input-channel="inputChannel" output-channel="outputChannel" discard-channel="discardChannel" ref="aggregatorBean" method="add" completion-strategy="completionStrategyBean" completion-strategy-method="checkCompleteness" correlation-strategy="correlationStrategyBean" correlation-strategy-method="groupNumbersByLastDigit" timeout="42" send-partial-result-on-timeout="true" reaper-interval="135" tracked-correlation-id-capacity="99" send-timeout="86420000" /> <channel id="outputChannel"/> <bean id="aggregatorBean" class="sample.PojoAggregator"/> <bean id="completionStrategyBean" class="sample.PojoCompletionStrategy"/> <bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
The id of the aggregator is optional. | |
The input channel of the aggregator. Required. | |
The channel where the aggregator will send the aggregation results. Optional (because incoming messages can specify a reply channel themselves). | |
The channel where the aggregator will send the messages that
timed out (if | |
A reference to a bean defined in the application context. The bean must implement the aggregation logic as described above. Required. | |
A method defined on the bean referenced by | |
A reference to a bean that implements the decision algorithm as to whether a given message group is complete. The bean can be an implementation of the CompletionStrategy interface or a POJO. In the latter case the completion-strategy-method attribute must be defined as well. Optional (by default, the aggregator will use sequence size) . | |
A method defined on the bean referenced by
| |
A reference to a bean that implements the correlation strategy. The bean can be an implementation of the CorrelationStrategy interface or a POJO. In the latter case the correlation-strategy-method attribute must be defined as well. Optional (by default, the aggregator will use the correlation id header attribute) . | |
A method defined on the bean referenced by
| |
The timeout (in milliseconds) for aggregating messages (counted from the arrival of the first message). Optional. | |
Whether upon the expiration of the timeout, the aggregator shall try to aggregate the messages that have already arrived. Optional (false by default). | |
The interval (in milliseconds) at which a reaper task is executed, checking if there are any timed out groups. Optional. | |
The capacity of the correlation id tracker. Remembers the already processed correlation ids, preventing the formation of new groups for messages that arrive after their group has been already processed (aggregated or discarded). Set this value to 0 if you do not want the messages to be discarded in such a scenario. Optional. | |
The timeout for sending the aggregated messages to the output or reply channel. Optional. |
Using a "ref" attribute is generally recommended if a custom aggregator handler
implementation can be reused in other <aggregator>
definitions.
However if a custom aggregator handler implementation should be scoped to a concrete
definition of the <aggregator>
, you can use an inner bean definition
(starting with version 1.0.3) for custom aggregator handlers within the
<aggregator>
element:
<aggregator input-channel="input" method="sum" output-channel="output"> <beans:bean class="org.foo.ExampleAggregator"/> </aggregator>
Note | |
---|---|
Using both a "ref" attribute and an inner bean definition in the same
|
An example implementation of the aggregator bean looks as follows:
public class PojoAggregator { public Long add(List<Long> results) { long total = 0l; for (long partialResult: results) { total += partialResult; } return total; } }
An implementation of the completion strategy bean for the example above may be as follows:
public class PojoCompletionStrategy { ... public boolean checkCompleteness(List<Long> numbers) { int sum = 0; for (long number: numbers) { sum += number; } return sum >= maxValue; } }
Note | |
---|---|
Wherever it makes sense, the completion strategy method and the aggregator method can be combined in a single bean. |
An implementation of the correlation strategy bean for the example above may be as follows:
public class PojoCorrelationStrategy { ... public Long groupNumbersByLastDigit(Long number) { return number % 10; } }
For example, this aggregator would group numbers by some criterion (in our case the remainder after dividing by 10) and will hold the group until the sum of the numbers which represents the payload exceeds a certain value.
Note | |
---|---|
Wherever it makes sense, the completion strategy method, correlation strategy method and the aggregator method can be combined in a single bean (all of them or any two). |
An aggregator configured using annotations can look like this.
public class Waiter { ... @Aggregator public Delivery aggregatingMethod(List<OrderItem> items) { ... } @CompletionStrategy public boolean completionChecker(List<Message<?>> messages) { ... } @CorrelationStrategy public String correlateBy(OrderItem item) { ... } }
All of the configuration options provided by the xml element are also available for the @Aggregator annotation.
The aggregator can be either referenced explicitly from XML or, if the @MessageEndpoint is defined on the class, detected automatically through classpath scanning.