11. Aggregator

11.1 Introduction

The Aggregator is essentially the mirror image of the Splitter: it is a type of Message Handler that receives multiple Messages and combines them into a single Message. In fact, an Aggregator is often a downstream consumer in a pipeline that includes a Splitter.

Technically, the Aggregator is more complex than a Splitter, because it must maintain state (the Messages to be aggregated), decide when the complete group of Messages is available, and time out if necessary. Furthermore, when a timeout occurs, the Aggregator needs to know whether to send the partial results or to discard them by sending them to a separate channel.

11.2 Functionality

The Aggregator combines a group of related messages, by correlating and storing them, until the group is deemed complete. At that point, the Aggregator will create a single message by processing the whole group, and will send that aggregated message as output.

Because messages might arrive late (or some messages in the group might never arrive), the Aggregator can specify a timeout, counted from the moment the first message in the group arrives. It can also specify whether, when the timeout expires, the group should be discarded or the Aggregator should attempt to create a single message out of what has arrived so far. An important aspect of implementing an Aggregator is providing the logic that is executed when the aggregation (the creation of a single message out of many) takes place.
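The timeout behavior described above can be sketched in plain Java. This is a minimal illustration and not the framework's implementation: the class TimeoutSketch, its fields, and the reap method are hypothetical names, and time is passed in explicitly so the logic is easy to follow.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: every name in this class is hypothetical.
class TimeoutSketch {
    private final long timeoutMillis;
    private final boolean sendPartialResultOnTimeout;
    private final Map<String, List<String>> groups = new LinkedHashMap<String, List<String>>();
    private final Map<String, Long> firstArrival = new LinkedHashMap<String, Long>();

    final List<List<String>> output = new ArrayList<List<String>>(); // partial results
    final List<String> discarded = new ArrayList<String>();          // discarded payloads

    TimeoutSketch(long timeoutMillis, boolean sendPartialResultOnTimeout) {
        this.timeoutMillis = timeoutMillis;
        this.sendPartialResultOnTimeout = sendPartialResultOnTimeout;
    }

    // Stores a payload; the timeout clock starts when the first message of a group arrives.
    void store(String correlationKey, String payload, long nowMillis) {
        if (!groups.containsKey(correlationKey)) {
            groups.put(correlationKey, new ArrayList<String>());
            firstArrival.put(correlationKey, nowMillis);
        }
        groups.get(correlationKey).add(payload);
    }

    // One pass over the stored groups, releasing or discarding those that have timed out.
    void reap(long nowMillis) {
        Iterator<Map.Entry<String, Long>> it = firstArrival.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() < timeoutMillis) {
                continue; // this group still has time left
            }
            List<String> group = groups.remove(entry.getKey());
            it.remove();
            if (sendPartialResultOnTimeout) {
                output.add(group);        // aggregate what has arrived so far
            } else {
                discarded.addAll(group);  // give up on the whole group
            }
        }
    }
}
```

With a timeout of 100 and two payloads stored at times 0 and 50, a reap at time 99 leaves the group untouched, while a reap at time 100 moves it to output or discarded, depending on the flag.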

In Spring Integration, messages are grouped for aggregation by default based on their CORRELATION_ID message header (i.e. messages with the same CORRELATION_ID are grouped together). However, this can be customized: users can specify other ways of grouping messages by using a CorrelationStrategy (see below).

An important concern with respect to the timeout is what happens if late messages arrive after the aggregation has taken place. In this case, a configuration option (the tracked-correlation-id-capacity attribute described in the XML configuration section) allows the user to decide whether they should be discarded or not.
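One way to picture the handling of late arrivals is a bounded memory of correlation keys whose groups have already been processed. The sketch below is an assumption-laden illustration, not the framework's code: the class ProcessedIdTrackerSketch is hypothetical, and evicting the oldest entry first is an assumed policy.

```java
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;

// Illustrative only: a hypothetical bounded tracker of processed correlation keys.
class ProcessedIdTrackerSketch {
    private final int capacity;
    private final Set<Object> processed = new LinkedHashSet<Object>();

    ProcessedIdTrackerSketch(int capacity) {
        this.capacity = capacity;
    }

    // Call once a group has been aggregated or discarded.
    void markProcessed(Object correlationKey) {
        if (capacity <= 0) {
            return; // tracking disabled: late messages will start a new group
        }
        processed.add(correlationKey);
        if (processed.size() > capacity) {
            // Assumed policy: forget the oldest remembered key first.
            Iterator<Object> oldest = processed.iterator();
            oldest.next();
            oldest.remove();
        }
    }

    // True if the message belongs to a group that was already processed.
    boolean isLate(Object correlationKey) {
        return processed.contains(correlationKey);
    }
}
```

A capacity of 0 disables the memory entirely, so no message is ever treated as late. A small positive capacity protects only against stragglers from recently processed groups.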

11.3 Programming model

The Aggregation API consists of a number of classes:

  • The base class AbstractMessageAggregator and its subclass MethodInvokingMessageAggregator

  • The CompletionStrategy interface and its default implementation SequenceSizeCompletionStrategy

  • The CorrelationStrategy interface and its default implementation HeaderAttributeCorrelationStrategy

11.3.1 AbstractMessageAggregator

The AbstractMessageAggregator is a MessageHandler implementation, encapsulating the common functionalities of an Aggregator, which are:

  • correlating messages into a group to be aggregated

  • maintaining those messages until the group is complete

  • deciding when the group is in fact complete

  • processing the completed group into a single aggregated message

  • recognizing and responding to a timed-out completion attempt

The responsibility of deciding how the messages should be grouped together is delegated to a CorrelationStrategy instance. The responsibility of deciding whether the message group is complete is delegated to a CompletionStrategy instance.

Here is a brief highlight of the base AbstractMessageAggregator (the responsibility of implementing the aggregateMessages method is left to the developer):

public abstract class AbstractMessageAggregator 
              extends AbstractMessageBarrierHandler {

  private volatile CompletionStrategy completionStrategy
                            = new SequenceSizeCompletionStrategy();
  ....

  protected abstract Message<?> aggregateMessages(List<Message<?>> messages);

}

It also inherits the following default CorrelationStrategy:

private volatile CorrelationStrategy correlationStrategy =
          new HeaderAttributeCorrelationStrategy(MessageHeaders.CORRELATION_ID);

When appropriate, the simplest option is the DefaultMessageAggregator. It creates a single Message whose payload is a List of the payloads received for a given group. It uses the default CorrelationStrategy and CompletionStrategy as shown above. This works well for simple Scatter Gather implementations with either a Splitter, Publish Subscribe Channel, or Recipient List Router upstream.

Note

When using a Publish Subscribe Channel or Recipient List Router in this type of scenario, be sure to enable the flag to apply sequence. That will add the necessary headers (correlation id, sequence number and sequence size). That behavior is enabled by default for Splitters in Spring Integration, but it is not enabled for the Publish Subscribe Channel or Recipient List Router because those components may be used in a variety of contexts where those headers are not necessary.

When implementing a specific aggregator object for an application, a developer can extend AbstractMessageAggregator and implement the aggregateMessages method. However, there are better-suited (that is, less coupled to the API) solutions for implementing the aggregation logic, which can be configured easily either through XML or through annotations.

In general, any ordinary Java class (i.e. POJO) can implement the aggregation algorithm. To do so, it must provide a method that accepts a single java.util.List as its argument (parameterized lists are supported as well). This method will be invoked for aggregating messages, as follows:

  • if the argument is a parameterized java.util.List, and the parameter type is assignable to Message, then the whole list of messages accumulated for aggregation will be sent to the aggregator

  • if the argument is a non-parameterized java.util.List or the parameter type is not assignable to Message, then the method will receive the payloads of the accumulated messages

  • if the return type is not assignable to Message, then it will be treated as the payload for a Message that will be created automatically by the framework.
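As a minimal sketch of the second and third rules above, the following POJO declares a List<String> argument, so it would receive payloads rather than messages, and it returns a plain String that would become the payload of the resulting message. The class and method names are hypothetical, and no framework types are referenced.

```java
import java.util.List;

// Hypothetical POJO aggregator: works on payloads only.
class CsvLineAggregator {

    // The element type is String (not a Message type), so payloads are passed in.
    // The String return value is treated as the payload of the aggregated message.
    public String join(List<String> fields) {
        StringBuilder line = new StringBuilder();
        for (int i = 0; i < fields.size(); i++) {
            if (i > 0) {
                line.append(',');
            }
            line.append(fields.get(i));
        }
        return line.toString();
    }
}
```

Because the class has no dependency on the messaging API, it can be unit tested by calling join directly with a list of strings.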

Note

In the interest of code simplicity, and promoting best practices such as low coupling, testability, etc., the preferred way of implementing the aggregation logic is through a POJO, and using the XML or annotation support for setting it up in the application.

11.3.2 CompletionStrategy

The CompletionStrategy interface is defined as follows:

public interface CompletionStrategy {

  boolean isComplete(List<Message<?>> messages);

}

In general, any ordinary Java class (i.e. POJO) can implement the completion decision mechanism. To do so, it must provide a method that accepts a single java.util.List as its argument (parameterized lists are supported as well) and returns a boolean value. This method will be invoked after the arrival of a new message, to decide whether the group is complete or not, as follows:

  • if the argument is a parameterized java.util.List, and the parameter type is assignable to Message, then the whole list of messages accumulated in the group will be sent to the method

  • if the argument is a non-parameterized java.util.List or the parameter type is not assignable to Message, then the method will receive the payloads of the accumulated messages

  • the method must return true if the message group is complete and ready for aggregation, and false otherwise.
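The rules above can be illustrated with a small payload-based completion check. In this sketch the group is considered complete as soon as a terminating payload has arrived. The class name, the method name, and the "END" marker are all assumptions made for the example.

```java
import java.util.List;

// Hypothetical POJO completion strategy: works on payloads only.
class EndMarkerCompletion {

    // Returns true once the (assumed) terminating payload "END" is present in the group.
    public boolean isDone(List<String> payloads) {
        return payloads.contains("END");
    }
}
```

Checking for the marker anywhere in the list, rather than only in the last position, keeps the decision independent of arrival order.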

Spring Integration provides an out-of-the-box implementation of CompletionStrategy, the SequenceSizeCompletionStrategy. This implementation uses the SEQUENCE_NUMBER and SEQUENCE_SIZE of the arriving messages to decide when a message group is complete and ready to be aggregated. As shown above, it is also the default strategy.

11.3.3 CorrelationStrategy

The CorrelationStrategy interface is defined as follows:

public interface CorrelationStrategy {

  Object getCorrelationKey(Message<?> message);

}

The method shall return an Object which represents the correlation key used for grouping messages together. The key must satisfy the criteria used for a key in a Map with respect to the implementation of equals() and hashCode().
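To make the equals() and hashCode() requirement concrete, here is a sketch of a composite correlation key. The class CustomerRegionKey and its two fields are hypothetical. The point is only that two keys built from the same values compare equal and hash alike, so they select the same group.

```java
// Hypothetical composite correlation key; the class and field names are illustrative.
final class CustomerRegionKey {
    private final String customerId;
    private final String region;

    CustomerRegionKey(String customerId, String region) {
        if (customerId == null || region == null) {
            throw new IllegalArgumentException("key parts must not be null");
        }
        this.customerId = customerId;
        this.region = region;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof CustomerRegionKey)) {
            return false;
        }
        CustomerRegionKey that = (CustomerRegionKey) other;
        return customerId.equals(that.customerId) && region.equals(that.region);
    }

    @Override
    public int hashCode() {
        // Combines both fields so that equal keys always produce equal hash codes.
        return 31 * customerId.hashCode() + region.hashCode();
    }
}
```

A key class that inherits the identity-based equals() from Object would place every message in its own group, because no two key instances would ever match.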

In general, any ordinary Java class (i.e. POJO) can implement the correlation decision mechanism, and the rules for mapping a message to a method's argument (or arguments) are the same as for a ServiceActivator (including support for @Header annotations). The method must return a value, and the value must not be null.

Spring Integration provides an out-of-the-box implementation of CorrelationStrategy, the HeaderAttributeCorrelationStrategy. This implementation returns the value of one of the message headers (whose name is specified by a constructor argument) as the correlation key. By default, the correlation strategy is a HeaderAttributeCorrelationStrategy returning the value of the CORRELATION_ID header attribute.

11.4 Configuring an Aggregator with XML

Spring Integration supports the configuration of an aggregator via XML through the <aggregator/> element. Below you can see an example of an aggregator with all optional parameters defined.

<channel id="inputChannel"/>

<aggregator id="completelyDefinedAggregator" 1
    input-channel="inputChannel" 2
    output-channel="outputChannel"  3
    discard-channel="discardChannel"  4
    ref="aggregatorBean" 5
    method="add" 6
    completion-strategy="completionStrategyBean"  7
    completion-strategy-method="checkCompleteness" 8
    correlation-strategy="correlationStrategyBean" 9
    correlation-strategy-method="groupNumbersByLastDigit" 10
    timeout="42" 11
    send-partial-result-on-timeout="true" 12
    reaper-interval="135" 13
    tracked-correlation-id-capacity="99" 14
    send-timeout="86420000" 15 /> 

<channel id="outputChannel"/>

<bean id="aggregatorBean" class="sample.PojoAggregator"/>

<bean id="completionStrategyBean" class="sample.PojoCompletionStrategy"/>

<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>

1

The id of the aggregator is optional.

2

The input channel of the aggregator. Required.

3

The channel where the aggregator will send the aggregation results. Optional (because incoming messages can specify a reply channel themselves).

4

The channel where the aggregator will send the messages that timed out (if send-partial-result-on-timeout is false). Optional.

5

A reference to a bean defined in the application context. The bean must implement the aggregation logic as described above. Required.

6

A method defined on the bean referenced by ref, that implements the message aggregation algorithm. Optional, with restrictions (see above).

7

A reference to a bean that implements the decision algorithm as to whether a given message group is complete. The bean can be an implementation of the CompletionStrategy interface or a POJO. In the latter case the completion-strategy-method attribute must be defined as well. Optional (by default, the aggregator will use sequence size).

8

A method defined on the bean referenced by completion-strategy, that implements the completion decision algorithm. Optional, with restrictions (requires completion-strategy to be present).

9

A reference to a bean that implements the correlation strategy. The bean can be an implementation of the CorrelationStrategy interface or a POJO. In the latter case the correlation-strategy-method attribute must be defined as well. Optional (by default, the aggregator will use the correlation id header attribute).

10

A method defined on the bean referenced by correlation-strategy, that implements the correlation key algorithm. Optional, with restrictions (requires correlation-strategy to be present).

11

The timeout (in milliseconds) for aggregating messages (counted from the arrival of the first message). Optional.

12

Whether, upon expiration of the timeout, the aggregator should try to aggregate the messages that have already arrived. Optional (false by default).

13

The interval (in milliseconds) at which a reaper task is executed, checking if there are any timed out groups. Optional.

14

The capacity of the correlation id tracker, which remembers the correlation ids that have already been processed and so prevents the formation of new groups for messages that arrive after their group has been processed (aggregated or discarded). Set this value to 0 if you do not want the messages to be discarded in such a scenario. Optional.

15

The timeout for sending the aggregated messages to the output or reply channel. Optional.

Using a "ref" attribute is generally recommended if a custom aggregator handler implementation can be reused in other <aggregator> definitions. However, if a custom aggregator handler implementation should be scoped to a single <aggregator> definition, you can use an inner bean definition (starting with version 1.0.3) within the <aggregator> element:

<aggregator input-channel="input" method="sum" output-channel="output">
    <beans:bean class="org.foo.ExampleAggregator"/>
</aggregator>

Note

Using both a "ref" attribute and an inner bean definition in the same <aggregator> configuration is not allowed, as it creates an ambiguous condition. In such cases, an Exception will be thrown.

An example implementation of the aggregator bean looks as follows:

public class PojoAggregator {

  public Long add(List<Long> results) {
    long total = 0L;
    for (long partialResult: results) {
      total += partialResult;
    }
    return total;
  }

}

An implementation of the completion strategy bean for the example above may be as follows:

public class PojoCompletionStrategy {
...
  public boolean checkCompleteness(List<Long> numbers) {
    long sum = 0;
    for (long number: numbers) {
      sum += number;
    }
    return sum >= maxValue;
  }
}

Note

Wherever it makes sense, the completion strategy method and the aggregator method can be combined in a single bean.

An implementation of the correlation strategy bean for the example above may be as follows:

public class PojoCorrelationStrategy {
...
  public Long groupNumbersByLastDigit(Long number) {
    return number % 10;
  }
}

In this example, the aggregator groups numbers by some criterion (in our case, the remainder after dividing by 10) and holds each group until the sum of the numbers that make up its payloads reaches or exceeds a certain value.
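The interplay of the three beans can be traced with a plain-Java simulation. This is only a sketch: the driver loop in run is a hypothetical stand-in for the framework, the three static methods mirror the POJO methods shown above, and the threshold of 60 is an assumed value, since the example does not state one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative trace of the example; the driver loop is not framework code.
class LastDigitTrace {
    static final long MAX_VALUE = 60; // assumed threshold, not given in the example

    // Mirrors groupNumbersByLastDigit.
    static Long correlate(Long number) {
        return number % 10;
    }

    // Mirrors add.
    static Long add(List<Long> numbers) {
        long total = 0L;
        for (long number : numbers) {
            total += number;
        }
        return total;
    }

    // Mirrors checkCompleteness.
    static boolean complete(List<Long> numbers) {
        return add(numbers) >= MAX_VALUE;
    }

    // Feeds the numbers in order and returns the totals of the groups that were released.
    static List<Long> run(List<Long> input) {
        Map<Long, List<Long>> groups = new HashMap<Long, List<Long>>();
        List<Long> released = new ArrayList<Long>();
        for (Long number : input) {
            Long key = correlate(number);
            List<Long> group = groups.get(key);
            if (group == null) {
                group = new ArrayList<Long>();
                groups.put(key, group);
            }
            group.add(number);
            if (complete(group)) {
                released.add(add(group));
                groups.remove(key);
            }
        }
        return released;
    }
}
```

For the input 11, 42, 21, 5, 31, 22, the numbers ending in 1 accumulate to 63 when 31 arrives, and the numbers ending in 2 accumulate to 64 when 22 arrives, so run returns the totals 63 and 64. The number 5 stays in an incomplete group.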

Note

Wherever it makes sense, the completion strategy method, correlation strategy method and the aggregator method can be combined in a single bean (all of them or any two).

11.5 Configuring an Aggregator with Annotations

An aggregator configured using annotations can look like this:

public class Waiter {
  ... 

  @Aggregator  // 1
  public Delivery aggregatingMethod(List<OrderItem> items) {
    ...
  }

  @CompletionStrategy  // 2
  public boolean completionChecker(List<Message<?>> messages) {
    ...
  }

  @CorrelationStrategy  // 3
  public String correlateBy(OrderItem item) {
    ...
  }

}

1

An annotation indicating that this method shall be used as an aggregator. Must be specified if this class will be used as an aggregator.

2

An annotation indicating that this method shall be used as the completion strategy of an aggregator. If not present on any method, the aggregator will use the SequenceSizeCompletionStrategy.

3

An annotation indicating that this method shall be used as the correlation strategy of an aggregator. If no correlation strategy is indicated, the aggregator will use the HeaderAttributeCorrelationStrategy based on CORRELATION_ID.

All of the configuration options provided by the XML element are also available for the @Aggregator annotation.

The aggregator can either be referenced explicitly from XML or, if the @MessageEndpoint annotation is present on the class, be detected automatically through classpath scanning.