Basically a mirror-image of the Splitter, the Aggregator is a type of Message Handler that receives multiple Messages and combines them into a single Message. In fact, Aggregators are often downstream consumers in a pipeline that includes a Splitter.
Technically, the Aggregator is more complex than a Splitter, because it is stateful: it must hold the Messages to be aggregated and decide when the complete group of Messages is available. To do this, it requires a MessageStore.
The Aggregator combines a group of related messages, by correlating and storing them, until the group is deemed complete. At that point, the Aggregator will create a single message by processing the whole group, and will send that aggregated message as output.
A main aspect of implementing an Aggregator is providing the logic to be executed when the aggregation (the creation of a single message out of many) takes place. The other two aspects are correlation and release.
In Spring Integration, the grouping of the messages for aggregation (correlation) is done by default based on their CORRELATION_ID message header (i.e. the messages with the same CORRELATION_ID will be grouped together). However, this can be customized, and the users can opt for other ways of specifying how the messages should be grouped together, by using a CorrelationStrategy (see below).
To determine whether or not a group of messages may be processed, a ReleaseStrategy is consulted. The default release strategy for the aggregator releases a group once all messages from the sequence have arrived, but this can be entirely customized.
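The interplay of correlation, storage, release and aggregation described above can be sketched in plain Java. This is only an illustration under stated assumptions: the class AggregationFlowSketch and its handle method are hypothetical names, the three functional arguments merely stand in for a CorrelationStrategy, a ReleaseStrategy and the aggregation logic, and a HashMap stands in for the MessageStore. It is not how the framework itself is implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for illustration; not the Spring Integration implementation.
final class AggregationFlowSketch<T, R> {

    // Plays the role of the MessageStore: payloads held per correlation key.
    private final Map<Object, List<T>> groups = new HashMap<>();
    private final Function<T, Object> correlation;   // role of CorrelationStrategy
    private final Predicate<List<T>> release;        // role of ReleaseStrategy
    private final Function<List<T>, R> aggregation;  // the aggregation logic

    AggregationFlowSketch(Function<T, Object> correlation,
                          Predicate<List<T>> release,
                          Function<List<T>, R> aggregation) {
        this.correlation = correlation;
        this.release = release;
        this.aggregation = aggregation;
    }

    /** Stores the payload; returns the aggregate only if its group is released. */
    Optional<R> handle(T payload) {
        Object key = correlation.apply(payload);
        List<T> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(payload);
        if (!release.test(group)) {
            return Optional.empty();      // group is still incomplete, keep holding it
        }
        groups.remove(key);               // released groups leave the store
        return Optional.of(aggregation.apply(group));
    }
}
```

For instance, constructing the sketch with `s -> s.substring(0, 1)` as the correlation function, `g -> g.size() == 2` as the release check and `g -> String.join("+", g)` as the aggregation would hold "a1" and "b1" separately and emit "a1+a2" when "a2" arrives.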
The Aggregation API consists of a number of classes:
The interface MessageGroupProcessor and related base class AbstractAggregatingMessageGroupProcessor and its subclass MethodInvokingAggregatingMessageGroupProcessor
The ReleaseStrategy interface and its default implementation SequenceSizeReleaseStrategy
The CorrelationStrategy interface and its default implementation HeaderAttributeCorrelationStrategy
The CorrelatingMessageHandler is a MessageHandler implementation, encapsulating the common functionalities of an Aggregator (and other correlating use cases), which are:
correlating messages into a group to be aggregated
maintaining those messages in a MessageStore until the group may be released
deciding when the group may in fact be released
processing the released group into a single aggregated message
recognizing and responding to an expired group
The responsibility of deciding how the messages should be grouped together is delegated to a CorrelationStrategy instance. The responsibility of deciding whether the message group can be released is delegated to a ReleaseStrategy instance.
Here is a brief highlight of the base AbstractAggregatingMessageGroupProcessor (the responsibility of implementing the aggregatePayloads method is left to the developer):
public abstract class AbstractAggregatingMessageGroupProcessor implements MessageGroupProcessor {

    protected Map<String, Object> aggregateHeaders(MessageGroup group) {
        ....
    }

    protected abstract Object aggregatePayloads(MessageGroup group);

}
The CorrelationStrategy is owned by the CorrelatingMessageHandler and it has a default value based on the correlation ID message header:
private volatile CorrelationStrategy correlationStrategy = new HeaderAttributeCorrelationStrategy(MessageHeaders.CORRELATION_ID);
When appropriate, the simplest option is the DefaultAggregatingMessageGroupProcessor. It creates a single Message whose payload is a List of the payloads received for a given group. It uses the default CorrelationStrategy and ReleaseStrategy as shown above. This works well for simple Scatter Gather implementations with either a Splitter, Publish Subscribe Channel, or Recipient List Router upstream.
Note | |
---|---|
When using a Publish Subscribe Channel or Recipient List Router in this type of scenario, be sure to enable the apply-sequence flag. Doing so adds the necessary headers (correlation id, sequence number and sequence size). That behavior is enabled by default for Splitters in Spring Integration, but it is not enabled for the Publish Subscribe Channel or Recipient List Router because those components may be used in a variety of contexts where those headers are not necessary. |
When implementing a specific aggregator object for an application, a developer can extend AbstractAggregatingMessageGroupProcessor and implement the aggregatePayloads method. However, there are better suited (that is, less coupled to the API) solutions for implementing the aggregation logic, which can be configured easily either through XML or through annotations.
In general, any ordinary Java class (i.e. POJO) can implement the aggregation algorithm. For doing so, it must provide a method that accepts as an argument a single java.util.List (parametrized lists are supported as well). This method will be invoked for aggregating messages, as follows:
if the argument is a parametrized java.util.List, and the parameter type is assignable to Message, then the whole list of messages accumulated for aggregation will be sent to the aggregator
if the argument is a non-parametrized java.util.List or the parameter type is not assignable to Message, then the method will receive the payloads of the accumulated messages
if the return type is not assignable to Message, then it will be treated as the payload for a Message that will be created automatically by the framework.
Note | |
---|---|
In the interest of code simplicity, and promoting best practices such as low coupling, testability, etc., the preferred way of implementing the aggregation logic is through a POJO, and using the XML or annotation support for setting it up in the application. |
The ReleaseStrategy interface is defined as follows:
public interface ReleaseStrategy {

    boolean canRelease(MessageGroup messages);

}
In general, any ordinary Java class (i.e. POJO) can implement the completion decision mechanism. For doing so, it must provide a method that accepts as an argument a single java.util.List (parametrized lists are supported as well), and returns a boolean value. This method will be invoked after the arrival of a new message, to decide whether the group is complete or not, as follows:
if the argument is a parametrized java.util.List, and the parameter type is assignable to Message, then the whole list of messages accumulated in the group will be sent to the method
if the argument is a non-parametrized java.util.List or the parameter type is not assignable to Message, then the method will receive the payloads of the accumulated messages
the method must return true if the message group is ready for aggregation, and false otherwise.
When the group is released for aggregation, all its unmarked messages are processed and then marked so they will not be processed again. If the group is also complete (i.e. if all messages from a sequence have arrived or if there is no sequence defined) then the group is removed from the message store. Partial sequences can be released, in which case the next time the ReleaseStrategy is called it will be presented with a group containing marked messages (already processed) and unmarked messages (a potential new partial sequence).
Spring Integration provides an out-of-the-box implementation for ReleaseStrategy, the SequenceSizeReleaseStrategy. This implementation uses the SEQUENCE_NUMBER and SEQUENCE_SIZE of the arriving messages for deciding when a message group is complete and ready to be aggregated. As shown above, it is also the default strategy.
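The idea behind a sequence-size based release decision can be sketched as follows. This is a simplified illustration, not the library's code: SequencedPart is a hypothetical stand-in for a message carrying sequence headers, and the sketch assumes that every part of a group announces the same sequence size.

```java
import java.util.List;

// Hypothetical stand-in for a message that carries sequence headers.
final class SequencedPart {
    final String payload;
    final int sequenceNumber; // position within the sequence, starting at 1
    final int sequenceSize;   // total number of parts in the sequence

    SequencedPart(String payload, int sequenceNumber, int sequenceSize) {
        this.payload = payload;
        this.sequenceNumber = sequenceNumber;
        this.sequenceSize = sequenceSize;
    }
}

final class SequenceSizeSketch {

    /** True once the group holds as many parts as the sequence size announces. */
    static boolean canRelease(List<SequencedPart> group) {
        if (group.isEmpty()) {
            return false;
        }
        // Assumption: all parts of one group carry the same sequence size.
        int expected = group.get(0).sequenceSize;
        return expected > 0 && group.size() >= expected;
    }
}
```

With this check, a group holding parts 1 and 3 of a three-part sequence is held back, and it becomes releasable as soon as part 2 arrives, regardless of arrival order.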
The CorrelationStrategy interface is defined as follows:
public interface CorrelationStrategy {

    Object getCorrelationKey(Message<?> message);

}
The method shall return an Object which represents the correlation key used for grouping messages together. The key must satisfy the criteria used for a key in a Map with respect to the implementation of equals() and hashCode().
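As an illustration of the key requirement, a composite correlation key might look like the sketch below. The class CustomerOrderKey and its fields are hypothetical; the point is only that two keys built from the same values compare equal and hash identically, so that messages carrying them end up in the same group.

```java
import java.util.Objects;

// Hypothetical composite correlation key; names are for illustration only.
final class CustomerOrderKey {
    private final String customerId;
    private final long orderId;

    CustomerOrderKey(String customerId, long orderId) {
        this.customerId = customerId;
        this.orderId = orderId;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof CustomerOrderKey)) {
            return false;
        }
        CustomerOrderKey that = (CustomerOrderKey) other;
        return orderId == that.orderId && Objects.equals(customerId, that.customerId);
    }

    @Override
    public int hashCode() {
        // Must agree with equals: equal keys always produce the same hash.
        return Objects.hash(customerId, orderId);
    }
}
```

A key class that relied on the default identity-based equals() and hashCode() would place every message in its own group, because no two key instances would ever match.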
In general, any ordinary Java class (i.e. POJO) can implement the correlation decision mechanism, and the rules for mapping a message to a method's argument (or arguments) are the same as for a ServiceActivator (including support for @Header annotations). The method must return a value, and the value must not be null.
Spring Integration provides an out-of-the-box implementation for CorrelationStrategy, the HeaderAttributeCorrelationStrategy. This implementation returns the value of one of the message headers (whose name is specified by a constructor argument) as the correlation key. By default, the correlation strategy is a HeaderAttributeCorrelationStrategy returning the value of the CORRELATION_ID header attribute.
Spring Integration supports the configuration of an aggregator via XML through the <aggregator/> element. Below you can see an example of an aggregator with all optional parameters defined.
<channel id="inputChannel"/>
<aggregator id="completelyDefinedAggregator"
input-channel="inputChannel"
output-channel="outputChannel"
discard-channel="discardChannel"
ref="aggregatorBean"
method="add"
release-strategy="releaseStrategyBean"
release-strategy-method="canRelease"
correlation-strategy="correlationStrategyBean"
correlation-strategy-method="groupNumbersByLastDigit"
message-store="messageStore"
send-partial-result-on-expiry="true"
send-timeout="86420000" />
<channel id="outputChannel"/>
<bean id="aggregatorBean" class="sample.PojoAggregator"/>
<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>
<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
Attribute | Description |
---|---|
id | The id of the aggregator. Optional. |
input-channel | The input channel of the aggregator. Required. |
output-channel | The channel where the aggregator will send the aggregation results. Optional (because incoming messages can specify a reply channel themselves). |
discard-channel | The channel where the aggregator will send the messages that timed out (if |
ref | A reference to a bean defined in the application context. The bean must implement the aggregation logic as described above. Required. |
method | A method defined on the bean referenced by |
release-strategy | A reference to a bean that implements the decision algorithm as to whether a given message group is complete. The bean can be an implementation of the ReleaseStrategy interface or a POJO. In the latter case the release-strategy-method attribute must be defined as well. Optional (by default, the aggregator will use sequence size). |
release-strategy-method | A method defined on the bean referenced by |
correlation-strategy | A reference to a bean that implements the correlation strategy. The bean can be an implementation of the CorrelationStrategy interface or a POJO. In the latter case the correlation-strategy-method attribute must be defined as well. Optional (by default, the aggregator will use the correlation id header attribute). |
correlation-strategy-method | A method defined on the bean referenced by |
message-store | A reference to a |
send-partial-result-on-expiry | Whether upon the expiration of the message group, the aggregator will try to aggregate the messages that have already arrived. Optional (false by default). |
send-timeout | The timeout for sending the aggregated messages to the output or reply channel. Optional. |
Using a "ref" attribute is generally recommended if a custom aggregator handler implementation can be reused in other <aggregator> definitions. However, if a custom aggregator handler implementation should be scoped to a concrete definition of the <aggregator>, you can use an inner bean definition (starting with version 1.0.3) for custom aggregator handlers within the <aggregator> element:
<aggregator input-channel="input" method="sum" output-channel="output">
    <beans:bean class="org.foo.ExampleAggregator"/>
</aggregator>
Note | |
---|---|
Using both a "ref" attribute and an inner bean definition in the
same |
An example implementation of the aggregator bean looks as follows:
public class PojoAggregator {

    public Long add(List<Long> results) {
        long total = 0L;
        for (long partialResult : results) {
            total += partialResult;
        }
        return total;
    }

}
An implementation of the release strategy bean for the example above may be as follows:
public class PojoReleaseStrategy {
    ...
    public boolean canRelease(List<Long> numbers) {
        // accumulate in a long so that large payloads do not overflow an int
        long sum = 0;
        for (long number : numbers) {
            sum += number;
        }
        return sum >= maxValue;
    }
}
Note | |
---|---|
Wherever it makes sense, the release strategy method and the aggregator method can be combined in a single bean. |
An implementation of the correlation strategy bean for the example above may be as follows:
public class PojoCorrelationStrategy {
    ...
    public Long groupNumbersByLastDigit(Long number) {
        return number % 10;
    }
}
In this example, the aggregator groups numbers by some criterion (here, the remainder after dividing by 10) and holds each group until the sum of the numbers carried as payloads reaches or exceeds a certain value.
Note | |
---|---|
Wherever it makes sense, the release strategy method, correlation strategy method and the aggregator method can be combined in a single bean (all of them or any two). |
Aggregator (and some other patterns in Spring Integration) is a stateful pattern that requires decisions to be made based on a group of messages that have arrived over a period of time, all with the same correlation key. The design of the interfaces in the stateful patterns (e.g. ReleaseStrategy) is driven by the principle that the components (framework and user) should be able to remain stateless. All state is carried by the MessageGroup and its management is delegated to the MessageGroupStore.
The MessageGroupStore accumulates state information in MessageGroups, potentially forever. So to prevent stale state from hanging around, and for volatile stores to provide a hook for cleaning up when the application shuts down, the MessageGroupStore allows the user to register callbacks to apply to MessageGroups when they expire. The interface is very straightforward:
public interface MessageGroupCallback {

    void execute(MessageGroupStore messageGroupStore, MessageGroup group);

}
The callback has access directly to the store and the message group so it can manage the persistent state (e.g. by removing the group from the store entirely).
The MessageGroupStore maintains a list of these callbacks, which it applies, when asked, to all messages whose timestamp is earlier than a time supplied as a parameter:
public interface MessageGroupStore {

    void registerMessageGroupExpiryCallback(MessageGroupCallback callback);

    int expireMessageGroups(long timeout);

}
The expireMessageGroups method can be called with a timeout value: any message older than the current time minus this value will be expired, and have the callbacks applied. Thus it is the user of the store that defines what is meant by message group "expiry".
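The expiry contract described above can be sketched with a small in-memory store. Everything here is a hypothetical illustration: the class ExpiringGroupStoreSketch, its nested Group type and its method names are not part of the framework, a BiConsumer stands in for MessageGroupCallback, and the current time is passed in explicitly (rather than read from the clock) so that the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical in-memory store; illustrates the expiry contract only.
final class ExpiringGroupStoreSketch {

    static final class Group {
        final Object key;
        final long createdAt;                 // timestamp of the group's first message
        final List<Object> payloads = new ArrayList<>();

        Group(Object key, long createdAt) {
            this.key = key;
            this.createdAt = createdAt;
        }
    }

    private final Map<Object, Group> groups = new LinkedHashMap<>();
    private final List<BiConsumer<ExpiringGroupStoreSketch, Group>> callbacks = new ArrayList<>();

    void add(Object key, Object payload, long now) {
        groups.computeIfAbsent(key, k -> new Group(k, now)).payloads.add(payload);
    }

    void remove(Object key) {
        groups.remove(key);
    }

    int size() {
        return groups.size();
    }

    void registerExpiryCallback(BiConsumer<ExpiringGroupStoreSketch, Group> callback) {
        callbacks.add(callback);
    }

    /** Applies all callbacks to groups older than (now - timeout); returns how many expired. */
    int expireGroups(long timeout, long now) {
        long threshold = now - timeout;
        int expired = 0;
        // Iterate over a copy, since a callback may remove the group from the store.
        for (Group group : new ArrayList<>(groups.values())) {
            if (group.createdAt < threshold) {
                expired++;
                for (BiConsumer<ExpiringGroupStoreSketch, Group> callback : callbacks) {
                    callback.accept(this, group);
                }
            }
        }
        return expired;
    }
}
```

Note that the store itself does not decide what happens to an expired group: a callback such as `(store, group) -> store.remove(group.key)` is what actually discards it, which mirrors the remark that the callback manages the persistent state.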
As a convenience for users, Spring Integration provides a wrapper for the message expiry in the form of a MessageGroupStoreReaper:
<bean id="reaper" class="org...MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="messageStore"/>
    <property name="timeout" value="10"/>
</bean>

<task:scheduled-tasks scheduler="scheduler">
    <task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>
The reaper is a Runnable, and all that is happening in the sample above is that the message group store's expire method is being called once every 10 seconds. In addition to the reaper, the expiry callbacks are invoked when the application shuts down via a lifecycle callback in the CorrelatingMessageHandler.
The CorrelatingMessageHandler registers its own expiry callback, and this is the link with the boolean flag send-partial-result-on-expiry in the XML configuration of the aggregator. If the flag is set to true, then when the expiry callback is invoked, any unmarked messages in groups that are not yet released can be sent on to the downstream channel.
An aggregator configured using annotations can look like this.
public class Waiter {
    ...

    @Aggregator
    public Delivery aggregatingMethod(List<OrderItem> items) {
        ...
    }

    @ReleaseStrategy
    public boolean releaseChecker(List<Message<?>> messages) {
        ...
    }

    @CorrelationStrategy
    public String correlateBy(OrderItem item) {
        ...
    }
}
All of the configuration options provided by the XML element are also available for the @Aggregator annotation.
The aggregator can be either referenced explicitly from XML or, if the @MessageEndpoint annotation is declared on the class, detected automatically through classpath scanning.