11.4 Configuring an Aggregator with XML

Spring Integration supports configuring an aggregator in XML through the <aggregator/> element. The following example shows an aggregator with all optional attributes defined.

<channel id="inputChannel"/>

<aggregator id="completelyDefinedAggregator" 1
    input-channel="inputChannel" 2
    output-channel="outputChannel"  3
    discard-channel="discardChannel"  4
    ref="aggregatorBean" 5
    method="add" 6
    completion-strategy="completionStrategyBean"  7
    completion-strategy-method="checkCompleteness" 8
    correlation-strategy="correlationStrategyBean" 9
    correlation-strategy-method="correlationStrategyMethod" 10
    timeout="42" 11
    send-partial-result-on-timeout="true" 12
    reaper-interval="135" 13
    tracked-correlation-id-capacity="99" 14
    send-timeout="86420000" 15 /> 

<channel id="outputChannel"/>

<bean id="aggregatorBean" class="sample.PojoAggregator"/>

<bean id="completionStrategyBean" class="sample.PojoCompletionStrategy"/>

1

The id of the aggregator is optional.

2

The input channel of the aggregator. Required.

3

The channel where the aggregator will send the aggregation results. Optional (because incoming messages can specify a reply channel themselves).

4

The channel to which the aggregator sends messages that timed out (if send-partial-result-on-timeout is false). Optional.

5

A reference to a bean defined in the application context. The bean must implement the aggregation logic as described above. Required.

6

A method defined on the bean referenced by ref, that implements the message aggregation algorithm. Optional, with restrictions (see above).

7

A reference to a bean that decides whether a given message group is complete. The bean can be an implementation of the CompletionStrategy interface or a POJO. In the latter case, the completion-strategy-method attribute must be defined as well. Optional (by default, the aggregator uses the sequence size).

8

A method defined on the bean referenced by completion-strategy, that implements the completion decision algorithm. Optional, with restrictions (requires completion-strategy to be present).

9

A reference to a bean that implements the correlation strategy. The bean can be an implementation of the CorrelationStrategy interface or a POJO. In the latter case, the correlation-strategy-method attribute must be defined as well. Optional (by default, the aggregator uses the correlation id header attribute).

10

A method defined on the bean referenced by correlation-strategy, that implements the correlation decision algorithm. Optional, with restrictions (requires correlation-strategy to be present).

11

The timeout for aggregating messages (counted from the arrival of the first message). Optional.

12

Whether, when the timeout expires, the aggregator should try to aggregate the messages that have already arrived. Optional (false by default).

13

The interval (in milliseconds) at which a reaper task is executed, checking if there are any timed out groups. Optional.

14

The capacity of the correlation id tracker. The tracker remembers the correlation ids that have already been processed, preventing the formation of new groups for messages that arrive after their group has already been processed (aggregated or discarded). Optional.

15

The timeout for sending out messages. Optional.
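To make the role of tracked-correlation-id-capacity more concrete, the following is a minimal sketch of a capacity-bounded tracker of processed correlation ids. It is an illustration only and not the framework's implementation: the class name CorrelationIdTrackerSketch, its methods, and the choice to evict the oldest id once the capacity is exceeded are all assumptions made for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration; not a Spring Integration class.
class CorrelationIdTrackerSketch {

  private final Map<Object, Boolean> processedIds;

  CorrelationIdTrackerSketch(final int capacity) {
    // Insertion-ordered map that drops its eldest entry once the capacity is exceeded
    // (assumed eviction policy for this sketch).
    this.processedIds = new LinkedHashMap<Object, Boolean>() {
      @Override
      protected boolean removeEldestEntry(Map.Entry<Object, Boolean> eldest) {
        return size() > capacity;
      }
    };
  }

  // Records that the group with this correlation id was aggregated or discarded.
  void markProcessed(Object correlationId) {
    processedIds.put(correlationId, Boolean.TRUE);
  }

  // A late message whose id is still tracked should not start a new group.
  boolean isAlreadyProcessed(Object correlationId) {
    return processedIds.containsKey(correlationId);
  }
}
```

Under these assumptions, a larger capacity lets late-arriving messages be recognized for longer, at the cost of remembering more ids.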

An implementation of the aggregator bean, for example, looks as follows:

import java.util.List;

public class PojoAggregator {

  // Sums the partial results of a completed group into a single value.
  public Long add(List<Long> results) {
    long total = 0L;
    for (long partialResult : results) {
      total += partialResult;
    }
    return total;
  }

}

An implementation of the completion strategy bean for the example above may be as follows:

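public class PojoCompletionStrategy {
...
  // The group is complete once the sum of its numbers reaches maxValue.
  public boolean checkCompleteness(List<Long> numbers) {
    long sum = 0L; // long rather than int, so that summing Long values cannot silently truncate
    for (long number : numbers) {
      sum += number;
    }
    return sum >= maxValue;
  }
}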

Wherever it makes sense, the completion strategy method and the aggregator method can be combined in a single bean.

An implementation of the correlation strategy bean for the example above may be as follows (the correlation-strategy-method attribute would then have to name the method shown here):

public class PojoCorrelationStrategy {
...
  public Long groupsNumbersByLastDigit(Long number) {
    return number % 10;
  }
}

For example, this aggregator groups numbers by some criterion (in our case, the remainder after division by 10) and holds each group until the sum of the numbers carried in the payloads reaches or exceeds a certain value.
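The interplay of the three strategies described above can be sketched in plain Java, without the framework. This is a simplified simulation, not how Spring Integration implements aggregation: the class AggregationSketch, its accept method, and the maxValue constructor argument are hypothetical names introduced for illustration, and timeouts, channels, and message headers are left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the aggregator; the three POJO methods are reproduced inline.
class AggregationSketch {

  private final long maxValue; // assumed completion threshold
  private final Map<Long, List<Long>> groups = new LinkedHashMap<Long, List<Long>>();

  AggregationSketch(long maxValue) {
    this.maxValue = maxValue;
  }

  // Correlation strategy: numbers with the same last digit share a group.
  static Long groupsNumbersByLastDigit(Long number) {
    return number % 10;
  }

  // Completion strategy: a group is complete once its sum reaches maxValue.
  boolean checkCompleteness(List<Long> numbers) {
    long sum = 0L;
    for (long number : numbers) {
      sum += number;
    }
    return sum >= maxValue;
  }

  // Aggregation: the result of a complete group is the sum of its numbers.
  static Long add(List<Long> results) {
    long total = 0L;
    for (long partialResult : results) {
      total += partialResult;
    }
    return total;
  }

  // Returns the aggregated result when the group completes, or null while it is held.
  Long accept(Long number) {
    Long key = groupsNumbersByLastDigit(number);
    List<Long> group = groups.get(key);
    if (group == null) {
      group = new ArrayList<Long>();
      groups.put(key, group);
    }
    group.add(number);
    if (!checkCompleteness(group)) {
      return null;
    }
    groups.remove(key);
    return add(group);
  }
}
```

With a threshold of 30, for instance, the numbers 11 and 21 land in the same group (last digit 1); the first is held, and the arrival of the second completes the group and releases their sum.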

Wherever it makes sense, the completion strategy method, correlation strategy method and the aggregator method can be combined in a single bean (all of them or any two).
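As a sketch of the combined form, the three methods from the examples above could live in one POJO such as the following. The class name NumberGroupingBean and the maxValue constructor argument are hypothetical; the original examples do not show how maxValue is supplied.

```java
import java.util.List;

// Hypothetical bean combining aggregation, completion, and correlation logic.
class NumberGroupingBean {

  private final long maxValue; // assumed threshold; its source is not shown in the examples above

  NumberGroupingBean(long maxValue) {
    this.maxValue = maxValue;
  }

  // Aggregator method: sums the numbers of a completed group.
  public Long add(List<Long> results) {
    long total = 0L;
    for (long partialResult : results) {
      total += partialResult;
    }
    return total;
  }

  // Completion strategy method: complete once the sum reaches maxValue.
  public boolean checkCompleteness(List<Long> numbers) {
    return add(numbers) >= maxValue;
  }

  // Correlation strategy method: group by the last digit.
  public Long groupsNumbersByLastDigit(Long number) {
    return number % 10;
  }
}
```

In the XML configuration, the ref, completion-strategy, and correlation-strategy attributes would then all point at the same bean id, with method, completion-strategy-method, and correlation-strategy-method naming the respective methods.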