11.4 Configuring an Aggregator with XML

Spring Integration supports the configuration of an aggregator via XML through the <aggregator/> element. A fully defined example of such an element is shown below, followed by a description of each attribute:

<channel id="inputChannel"/>

<aggregator id="completelyDefinedAggregator" 1
    input-channel="inputChannel" 2
    output-channel="outputChannel"  3
    discard-channel="discardChannel"  4
    ref="aggregatorBean" 5
    method="add" 6
    completion-strategy="completionStrategyBean"  7
    completion-strategy-method="checkCompleteness" 8
    timeout="42" 9
    send-partial-result-on-timeout="true" 10
    reaper-interval="135" 11
    tracked-correlation-id-capacity="99" 12
    send-timeout="86420000" 13 /> 

<channel id="outputChannel"/>

<bean id="aggregatorBean" class="sample.PojoAggregator"/>

<bean id="completionStrategyBean" class="sample.PojoCompletionStrategy"/>

1

The id of the aggregator is optional.

2

The input channel of the aggregator. Required.

3

The channel where the aggregator will send the aggregation results. Optional (because incoming messages can specify a reply channel themselves).

4

The channel where the aggregator will send the messages that timed out (if send-partial-result-on-timeout is false). Optional.

5

A reference to a bean defined in the application context. The bean must implement the aggregation logic as described above. Required.

6

A method defined on the bean referenced by ref that implements the message aggregation algorithm. Optional, with restrictions (see above).

7

A reference to a bean that implements the decision algorithm as to whether a given message group is complete. The bean can be an implementation of the CompletionStrategy interface or a POJO. In the latter case the completion-strategy-method attribute must be defined as well. Optional (by default, the aggregator .

8

A method defined on the bean referenced by completion-strategy that implements the completion decision algorithm. Optional, with restrictions (requires completion-strategy to be present).

9

The timeout for aggregating messages (counted from the arrival of the first message). Optional.

10

Whether the aggregator should try to aggregate the messages that have already arrived when the timeout expires. Optional (false by default).

11

The interval (in milliseconds) at which a reaper task is executed, checking if there are any timed out groups. Optional.

12

The capacity of the correlation id tracker. The tracker remembers the correlation ids that have already been processed, which prevents new groups from being formed for messages that arrive after their group has been processed (aggregated or discarded). Optional.

13

The timeout for sending out messages. Optional.
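The idea behind a bounded correlation id tracker (callout 12) can be illustrated with a small standalone sketch. This is not the framework's implementation: the class name BoundedCorrelationIdTracker, its methods, and the oldest-first eviction policy are assumptions made purely for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only: remembers at most `capacity` processed
// correlation ids and forgets the oldest one when the limit is exceeded.
// The eviction policy is an assumption, not taken from the framework.
class BoundedCorrelationIdTracker {

  private final Map<Object, Boolean> processed;

  public BoundedCorrelationIdTracker(final int capacity) {
    // An insertion-ordered LinkedHashMap drops its eldest entry
    // whenever removeEldestEntry returns true after a put.
    this.processed = new LinkedHashMap<Object, Boolean>() {
      @Override
      protected boolean removeEldestEntry(Map.Entry<Object, Boolean> eldest) {
        return size() > capacity;
      }
    };
  }

  // Records that the group with this correlation id was aggregated or discarded.
  public void markProcessed(Object correlationId) {
    processed.put(correlationId, Boolean.TRUE);
  }

  // A late message whose id is still tracked should not start a new group.
  public boolean wasProcessed(Object correlationId) {
    return processed.containsKey(correlationId);
  }
}

class BoundedCorrelationIdTrackerDemo {
  public static void main(String[] args) {
    BoundedCorrelationIdTracker tracker = new BoundedCorrelationIdTracker(2);
    tracker.markProcessed("a");
    tracker.markProcessed("b");
    tracker.markProcessed("c"); // exceeds the capacity, so "a" is forgotten
    System.out.println(tracker.wasProcessed("a")); // prints false
    System.out.println(tracker.wasProcessed("c")); // prints true
  }
}
```

Under these assumptions, a small capacity saves memory but lets very late messages for a forgotten id form a new group, which is the trade-off the attribute controls.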

An implementation of the aggregator bean might look as follows:

import java.util.List;

public class PojoAggregator {

  // Sums the partial results of a completed message group.
  public Long add(List<Long> results) {
    long total = 0L;
    for (long partialResult : results) {
      total += partialResult;
    }
    return total;
  }

}
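Because the aggregator is a plain Java object, its method can be exercised directly, without a running application context. The following sketch repeats the class so that it compiles on its own; the PojoAggregatorDemo class and the sample values are hypothetical and only illustrate the call.

```java
import java.util.Arrays;
import java.util.List;

// Same logic as the PojoAggregator shown in this section,
// repeated here so the sketch is self-contained.
class PojoAggregator {

  public Long add(List<Long> results) {
    long total = 0L;
    for (long partialResult : results) {
      total += partialResult;
    }
    return total;
  }
}

// Hypothetical driver: calls the aggregation method the same way
// it would be called with the payloads of a completed group.
class PojoAggregatorDemo {
  public static void main(String[] args) {
    PojoAggregator aggregator = new PojoAggregator();
    Long total = aggregator.add(Arrays.asList(1L, 2L, 3L));
    System.out.println(total); // prints 6
  }
}
```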

An implementation of the completion strategy bean for the example above may be as follows:

public class PojoCompletionStrategy {
...
  public boolean checkCompleteness(List<Long> numbers) {
    long sum = 0L;
    for (long number: numbers) {
      sum += number;
    }
    return sum >= maxValue;
  }
}

Wherever it makes sense, the completion strategy method and the aggregator method can be combined in a single bean.
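A minimal sketch of such a combined bean is shown below. The class name PojoAggregatorAndCompletionStrategy, the constructor, and the maxValue threshold are assumptions introduced for illustration; with a bean like this, both the ref and the completion-strategy attributes would presumably point to the same bean id.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical combined bean: the class name, constructor and maxValue
// threshold are illustrative assumptions, not part of the samples above.
class PojoAggregatorAndCompletionStrategy {

  private final long maxValue;

  public PojoAggregatorAndCompletionStrategy(long maxValue) {
    this.maxValue = maxValue;
  }

  // Aggregation method, as would be named by method="add".
  public Long add(List<Long> results) {
    long total = 0L;
    for (long partialResult : results) {
      total += partialResult;
    }
    return total;
  }

  // Completion check, as would be named by
  // completion-strategy-method="checkCompleteness".
  // The group is complete once the running sum reaches the threshold.
  public boolean checkCompleteness(List<Long> numbers) {
    return add(numbers) >= maxValue;
  }
}

class CombinedBeanDemo {
  public static void main(String[] args) {
    PojoAggregatorAndCompletionStrategy bean =
        new PojoAggregatorAndCompletionStrategy(10L);
    System.out.println(bean.checkCompleteness(Arrays.asList(3L, 4L)));     // prints false
    System.out.println(bean.checkCompleteness(Arrays.asList(3L, 4L, 5L))); // prints true
    System.out.println(bean.add(Arrays.asList(3L, 4L, 5L)));               // prints 12
  }
}
```

Reusing add inside checkCompleteness keeps the summing logic in one place, which is the main benefit of combining the two roles in this example.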