org.springframework.integration.router
Class AggregatingMessageHandler

java.lang.Object
  extended by org.springframework.integration.router.AggregatingMessageHandler
All Implemented Interfaces:
org.springframework.beans.factory.InitializingBean, MessageHandler

public class AggregatingMessageHandler
extends java.lang.Object
implements MessageHandler, org.springframework.beans.factory.InitializingBean

A MessageHandler implementation that waits for a complete group of Messages to arrive and then delegates to an Aggregator to combine them into a single Message.

Each Message received by this handler is associated with a group based on the 'correlationId' property of its header. If that property is not available, a MessageHandlingException is thrown.

The default strategy for determining whether a group is complete is based on the 'sequenceSize' property of the header. Alternatively, a custom implementation of the CompletionStrategy may be provided.
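The grouping and default completion rule described above can be sketched without the framework. This is a minimal, single-threaded illustration, not the handler's implementation: SketchMessage and SequenceSizeGrouper are hypothetical names, and IllegalArgumentException stands in for the MessageHandlingException the real handler is documented to throw.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a message; not the framework's Message type. */
final class SketchMessage {
    final Object correlationId;
    final int sequenceSize;
    final String payload;

    SketchMessage(Object correlationId, int sequenceSize, String payload) {
        this.correlationId = correlationId;
        this.sequenceSize = sequenceSize;
        this.payload = payload;
    }
}

/** Collects messages per correlationId and releases a group once it reaches sequenceSize. */
final class SequenceSizeGrouper {
    private final Map<Object, List<SketchMessage>> groups = new HashMap<>();

    /** Returns the completed group, or null while the group is still incomplete. */
    List<SketchMessage> add(SketchMessage message) {
        if (message.correlationId == null) {
            // Assumption: stands in for the documented MessageHandlingException.
            throw new IllegalArgumentException("message has no correlationId");
        }
        List<SketchMessage> group =
                groups.computeIfAbsent(message.correlationId, id -> new ArrayList<>());
        group.add(message);
        if (group.size() >= message.sequenceSize) {
            groups.remove(message.correlationId);
            return group;
        }
        return null;
    }

    public static void main(String[] args) {
        SequenceSizeGrouper grouper = new SequenceSizeGrouper();
        grouper.add(new SketchMessage("order-1", 2, "first"));
        List<SketchMessage> done = grouper.add(new SketchMessage("order-1", 2, "second"));
        System.out.println("group complete: " + (done != null));
    }
}
```

A custom CompletionStrategy would replace the size comparison with its own test over the collected messages.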

The 'timeout' value determines how long to wait for the complete group after the arrival of the first Message of the group. The default value is 1 minute. If the timeout elapses before the group is complete, Messages with that timed-out 'correlationId' are sent to the 'discardChannel', if one is provided.
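The timeout rule above, measured from the first Message of each group, might be modeled as follows. This is a sketch under assumptions: GroupTimeoutTracker is a hypothetical class, the clock is passed in explicitly so the behavior is deterministic, and the caller decides what to do with the expired ids (for example, route their Messages to a discard channel).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Tracks when each group's first message arrived and expires groups after a timeout. */
final class GroupTimeoutTracker {
    private final long timeoutMillis;
    // correlationId -> arrival time (ms) of the first message in that group
    private final Map<Object, Long> firstArrival = new LinkedHashMap<>();

    GroupTimeoutTracker(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Records an arrival; only the first message of a group starts the clock. */
    void onArrival(Object correlationId, long nowMillis) {
        firstArrival.putIfAbsent(correlationId, nowMillis);
    }

    /** Removes and returns the correlationIds whose timeout has elapsed. */
    List<Object> reap(long nowMillis) {
        List<Object> expired = new ArrayList<>();
        Iterator<Map.Entry<Object, Long>> it = firstArrival.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Object, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= timeoutMillis) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        GroupTimeoutTracker tracker = new GroupTimeoutTracker(60_000L); // 1 minute, as documented
        tracker.onArrival("order-1", 0L);
        System.out.println("expired at 60s: " + tracker.reap(60_000L));
    }
}
```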

Author:
Mark Fisher, Marius Bogoevici

Nested Class Summary
private  class AggregatingMessageHandler.ReaperTask
           
 
Field Summary
private  Aggregator aggregator
           
private  java.util.concurrent.ConcurrentMap<java.lang.Object,AggregationBarrier> barriers
           
private  CompletionStrategy completionStrategy
           
static long DEFAULT_REAPER_INTERVAL
           
static long DEFAULT_SEND_TIMEOUT
           
static long DEFAULT_TIMEOUT
           
static int DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY
           
private  MessageChannel defaultReplyChannel
           
private  MessageChannel discardChannel
           
private  java.util.concurrent.ScheduledExecutorService executor
           
private  boolean initialized
           
private  org.apache.commons.logging.Log logger
           
private  long reaperInterval
           
private  boolean sendPartialResultOnTimeout
           
private  long sendTimeout
           
private  long timeout
           
private  int trackedCorrelationIdCapacity
           
private  java.util.concurrent.BlockingQueue<java.lang.Object> trackedCorrelationIds
           
 
Constructor Summary
AggregatingMessageHandler(Aggregator aggregator)
           
AggregatingMessageHandler(Aggregator aggregator, java.util.concurrent.ScheduledExecutorService executor)
          Create a handler that delegates to the provided aggregator to combine a group of messages into a single message.
 
Method Summary
 void afterPropertiesSet()
          Initialize this handler.
private  void aggregationCompleted(java.lang.Object correlationId, java.util.List<Message<?>> messages)
           
 Message<?> handle(Message<?> message)
           
private  void removeBarrier(java.lang.Object correlationId)
           
private  MessageChannel resolveReplyChannelFromMessage(Message<?> message)
           
private  void sendToDiscardChannelIfAvailable(Message<?> message)
           
 void setCompletionStrategy(CompletionStrategy completionStrategy)
          Strategy to determine whether the group of messages is complete.
 void setDefaultReplyChannel(MessageChannel defaultReplyChannel)
          Set the default channel for sending aggregated Messages.
 void setDiscardChannel(MessageChannel discardChannel)
          Specify a channel for sending Messages that arrive after their aggregation group has either completed or timed out.
 void setReaperInterval(long reaperInterval)
          Set the interval in milliseconds for the reaper thread.
 void setSendPartialResultOnTimeout(boolean sendPartialResultOnTimeout)
          Specify whether to aggregate and send the resulting Message when the timeout elapses before the CompletionStrategy reports that the group is complete.
 void setSendTimeout(long sendTimeout)
          Set the timeout for sending aggregation results and discarded Messages.
 void setTimeout(long timeout)
          Maximum time to wait (in milliseconds) for the completion strategy to be satisfied.
 void setTrackedCorrelationIdCapacity(int trackedCorrelationIdCapacity)
          Set the number of completed correlationIds to track.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

DEFAULT_SEND_TIMEOUT

public static final long DEFAULT_SEND_TIMEOUT
See Also:
Constant Field Values

DEFAULT_TIMEOUT

public static final long DEFAULT_TIMEOUT
See Also:
Constant Field Values

DEFAULT_REAPER_INTERVAL

public static final long DEFAULT_REAPER_INTERVAL
See Also:
Constant Field Values

DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY

public static final int DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY
See Also:
Constant Field Values

logger

private final org.apache.commons.logging.Log logger

aggregator

private final Aggregator aggregator

defaultReplyChannel

private volatile MessageChannel defaultReplyChannel

discardChannel

private volatile MessageChannel discardChannel

sendTimeout

private volatile long sendTimeout

completionStrategy

private volatile CompletionStrategy completionStrategy

barriers

private final java.util.concurrent.ConcurrentMap<java.lang.Object,AggregationBarrier> barriers

timeout

private volatile long timeout

sendPartialResultOnTimeout

private volatile boolean sendPartialResultOnTimeout

reaperInterval

private volatile long reaperInterval

trackedCorrelationIdCapacity

private volatile int trackedCorrelationIdCapacity

trackedCorrelationIds

private volatile java.util.concurrent.BlockingQueue<java.lang.Object> trackedCorrelationIds

executor

private final java.util.concurrent.ScheduledExecutorService executor

initialized

private volatile boolean initialized

Constructor Detail

AggregatingMessageHandler

public AggregatingMessageHandler(Aggregator aggregator,
                                 java.util.concurrent.ScheduledExecutorService executor)
Create a handler that delegates to the provided aggregator to combine a group of messages into a single message. The executor will be used for scheduling a background maintenance thread. If null, a new single-threaded executor will be created.
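The null-executor fallback described above follows a common pattern, sketched here outside the framework. MaintenanceScheduler is a hypothetical class, and tracking ownership so that only a self-created executor is shut down is a design choice of this sketch, not something the documentation states about the handler.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

/** Uses the supplied executor, or creates a single-threaded one when given null. */
final class MaintenanceScheduler {
    private final ScheduledExecutorService executor;
    private final boolean ownsExecutor;

    MaintenanceScheduler(ScheduledExecutorService executor) {
        this.ownsExecutor = (executor == null);
        this.executor = ownsExecutor
                ? Executors.newSingleThreadScheduledExecutor()
                : executor;
    }

    ScheduledExecutorService executor() {
        return executor;
    }

    boolean ownsExecutor() {
        return ownsExecutor;
    }

    /** Shuts down the executor only if this object created it (assumed policy). */
    void close() {
        if (ownsExecutor) {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        MaintenanceScheduler scheduler = new MaintenanceScheduler(null);
        System.out.println("created its own executor: " + scheduler.ownsExecutor());
        scheduler.close();
    }
}
```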


AggregatingMessageHandler

public AggregatingMessageHandler(Aggregator aggregator)

Method Detail

setDefaultReplyChannel

public void setDefaultReplyChannel(MessageChannel defaultReplyChannel)
Set the default channel for sending aggregated Messages. Note that precedence will be given to the 'returnAddress' of the aggregated message itself, then to the 'returnAddress' of the original message.
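The precedence described above amounts to taking the first available candidate in a fixed order. The following sketch illustrates that order only; ReplyChannelResolver and its generic parameter are hypothetical, and plain strings stand in for channels in the usage example.

```java
/** Picks a reply target using the documented order of precedence. */
final class ReplyChannelResolver {
    /**
     * Returns the aggregated message's returnAddress if present, otherwise the
     * original message's returnAddress, otherwise the default reply channel.
     */
    static <C> C resolve(C aggregatedReturnAddress, C originalReturnAddress, C defaultReplyChannel) {
        if (aggregatedReturnAddress != null) {
            return aggregatedReturnAddress;
        }
        if (originalReturnAddress != null) {
            return originalReturnAddress;
        }
        return defaultReplyChannel; // may itself be null if no default was set
    }

    public static void main(String[] args) {
        // Strings stand in for channels purely for illustration.
        System.out.println(resolve(null, "originalReply", "defaultReply"));
    }
}
```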


setDiscardChannel

public void setDiscardChannel(MessageChannel discardChannel)
Specify a channel for sending Messages that arrive after their aggregation group has either completed or timed out.


setSendTimeout

public void setSendTimeout(long sendTimeout)
Set the timeout for sending aggregation results and discarded Messages.


setSendPartialResultOnTimeout

public void setSendPartialResultOnTimeout(boolean sendPartialResultOnTimeout)
Specify whether to aggregate and send the resulting Message when the timeout elapses before the CompletionStrategy reports that the group is complete.


setReaperInterval

public void setReaperInterval(long reaperInterval)
Set the interval in milliseconds for the reaper thread. Default is 1000.
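A periodic reaper of this kind is typically driven by a ScheduledExecutorService. The sketch below shows one way to do that; ReaperScheduleSketch is a hypothetical class, and the use of scheduleWithFixedDelay (rather than a fixed rate) is an assumption, since the documentation does not say how the handler schedules its reaper.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Runs a stand-in reaper task at a fixed delay on a single background thread. */
final class ReaperScheduleSketch {
    /**
     * Counts the latch down once per reaper run and returns true if it reaches
     * zero within maxWaitMillis. The executor is always shut down afterwards.
     */
    static boolean runUntil(long intervalMillis, CountDownLatch latch, long maxWaitMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            // Assumption: fixed delay between the end of one run and the start of the next.
            executor.scheduleWithFixedDelay(
                    latch::countDown, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
            return latch.await(maxWaitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean ranThreeTimes = runUntil(10L, new CountDownLatch(3), 5_000L);
        System.out.println("reaper ran three times: " + ranThreeTimes);
    }
}
```

A shorter interval detects timed-out groups sooner at the cost of more frequent background work.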


setTrackedCorrelationIdCapacity

public void setTrackedCorrelationIdCapacity(int trackedCorrelationIdCapacity)
Set the number of completed correlationIds to track. Default is 1000.
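Tracking a bounded number of completed correlationIds lets late arrivals be recognized without unbounded memory growth. The sketch below is one possible shape for that bookkeeping: CompletedIdTracker is a hypothetical class, and evicting the oldest id when the queue is full is an assumed policy that the documentation does not confirm.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Remembers up to a fixed number of completed correlationIds. */
final class CompletedIdTracker {
    private final BlockingQueue<Object> tracked;

    /** The capacity must be at least 1 (ArrayBlockingQueue rejects smaller values). */
    CompletedIdTracker(int capacity) {
        this.tracked = new ArrayBlockingQueue<>(capacity);
    }

    /** Remembers a completed id, evicting the oldest one when the queue is full. */
    synchronized void markCompleted(Object correlationId) {
        while (!tracked.offer(correlationId)) {
            tracked.poll(); // assumed eviction policy: drop the oldest tracked id
        }
    }

    /** True if this id belongs to a group that is still remembered as completed. */
    synchronized boolean isLateArrival(Object correlationId) {
        return tracked.contains(correlationId);
    }

    public static void main(String[] args) {
        CompletedIdTracker tracker = new CompletedIdTracker(1000); // documented default capacity
        tracker.markCompleted("order-1");
        System.out.println("late arrival: " + tracker.isLateArrival("order-1"));
    }
}
```

Once an id has been evicted, a late Message carrying it can no longer be told apart from the first Message of a new group, which is the trade-off behind choosing the capacity.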


afterPropertiesSet

public void afterPropertiesSet()
Initialize this handler.

Specified by:
afterPropertiesSet in interface org.springframework.beans.factory.InitializingBean

setCompletionStrategy

public void setCompletionStrategy(CompletionStrategy completionStrategy)
Strategy to determine whether the group of messages is complete.


setTimeout

public void setTimeout(long timeout)
Maximum time to wait (in milliseconds) for the completion strategy to be satisfied. The default is 60000 (1 minute).


handle

public Message<?> handle(Message<?> message)
Specified by:
handle in interface MessageHandler

sendToDiscardChannelIfAvailable

private void sendToDiscardChannelIfAvailable(Message<?> message)

aggregationCompleted

private void aggregationCompleted(java.lang.Object correlationId,
                                  java.util.List<Message<?>> messages)

removeBarrier

private void removeBarrier(java.lang.Object correlationId)

resolveReplyChannelFromMessage

private MessageChannel resolveReplyChannelFromMessage(Message<?> message)