org.springframework.integration.aggregator
Class AbstractMessageBarrierHandler

java.lang.Object
  extended by org.springframework.integration.aggregator.AbstractMessageBarrierHandler
All Implemented Interfaces:
org.springframework.beans.factory.InitializingBean, MessageHandler
Direct Known Subclasses:
AggregatingMessageHandler, ResequencingMessageHandler

public abstract class AbstractMessageBarrierHandler
extends java.lang.Object
implements MessageHandler, org.springframework.beans.factory.InitializingBean

Base class for MessageBarrier-based MessageHandlers. This MessageHandler implementation waits for a group of Messages to arrive and processes them together. It uses a MessageBarrier to store the Messages and to decide how they should be released.

Each Message that is received by this handler will be associated with a group based upon the 'correlationId' property of its header. If no such property is available, a MessageHandlingException will be thrown.

The 'timeout' value determines how long to wait for a group to complete after its first Message arrives. The default value is 60000 milliseconds (1 minute). If the timeout elapses before the group completes, the Messages with that 'correlationId' will be sent to the 'discardChannel', if one is provided.
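
The grouping and timeout behavior described above can be illustrated with a minimal, self-contained sketch. It is not the Spring Integration implementation: the class CorrelationGroupSketch, its methods, the use of plain String payloads, and the explicit nowMillis parameter are all assumptions made for illustration, and IllegalArgumentException stands in for MessageHandlingException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for the handler's grouping logic; not the Spring Integration class. */
class CorrelationGroupSketch {

    /** One group: the time its first payload arrived plus the payloads collected so far. */
    static final class Group {
        final long firstArrivalMillis;
        final List<String> payloads = new ArrayList<>();

        Group(long firstArrivalMillis) {
            this.firstArrivalMillis = firstArrivalMillis;
        }
    }

    private final ConcurrentMap<Object, Group> groups = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    CorrelationGroupSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Adds a payload to the group for its correlation id; a missing id is rejected. */
    void add(Object correlationId, String payload, long nowMillis) {
        if (correlationId == null) {
            // Assumption: stands in for the MessageHandlingException named in the text.
            throw new IllegalArgumentException("message has no correlationId");
        }
        Group group = groups.computeIfAbsent(correlationId, id -> new Group(nowMillis));
        synchronized (group) {
            group.payloads.add(payload);
        }
    }

    /** Removes every group whose timeout has elapsed and returns its payloads for discarding. */
    List<String> expire(long nowMillis) {
        List<String> discarded = new ArrayList<>();
        for (Map.Entry<Object, Group> entry : groups.entrySet()) {
            Group group = entry.getValue();
            boolean timedOut = nowMillis - group.firstArrivalMillis >= timeoutMillis;
            if (timedOut && groups.remove(entry.getKey(), group)) {
                synchronized (group) {
                    discarded.addAll(group.payloads);
                }
            }
        }
        return discarded;
    }

    int openGroupCount() {
        return groups.size();
    }
}
```

Passing the current time as an argument keeps the sketch deterministic; a real handler would read the clock itself.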

Author:
Mark Fisher, Marius Bogoevici

Nested Class Summary
private  class AbstractMessageBarrierHandler.ReaperTask
           
 
Field Summary
protected  java.util.concurrent.ConcurrentMap<java.lang.Object,MessageBarrier> barriers
           
static long DEFAULT_REAPER_INTERVAL
           
static long DEFAULT_SEND_TIMEOUT
           
static long DEFAULT_TIMEOUT
           
static int DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY
           
private  MessageChannel discardChannel
           
protected  java.util.concurrent.ScheduledExecutorService executor
           
private  boolean initialized
           
protected  org.apache.commons.logging.Log logger
           
protected  MessageChannel outputChannel
           
private  long reaperInterval
           
private  boolean sendPartialResultOnTimeout
           
protected  long sendTimeout
           
private  long timeout
           
private  int trackedCorrelationIdCapacity
           
protected  java.util.concurrent.BlockingQueue<java.lang.Object> trackedCorrelationIds
           
 
Constructor Summary
AbstractMessageBarrierHandler(java.util.concurrent.ScheduledExecutorService executor)
           
 
Method Summary
 void afterPropertiesSet()
          Initialize this handler.
private  void afterRelease(java.lang.Object correlationId, java.util.List<Message<?>> releasedMessages)
           
protected abstract  MessageBarrier createMessageBarrier()
          Factory method for creating a suitable MessageBarrier implementation.
 Message<?> handle(Message<?> message)
           
protected abstract  boolean isBarrierRemovable(java.lang.Object correlationId, java.util.List<Message<?>> releasedMessages)
          Implements the logic for deciding whether, based on what the MessageBarrier has released so far, work for the correlationId can be considered done and the barrier can be removed.
protected abstract  Message<?>[] processReleasedMessages(java.lang.Object correlationId, java.util.List<Message<?>> messages)
          Implements the logic for transforming the released Messages.
private  void removeBarrier(java.lang.Object correlationId)
           
protected  MessageTarget resolveReplyTargetFromMessage(Message<?> message)
           
private  void sendToDiscardChannelIfAvailable(Message<?> message)
           
 void setDiscardChannel(MessageChannel discardChannel)
          Specify a channel for sending Messages that arrive after their aggregation group has either completed or timed out.
 void setOutputChannel(MessageChannel outputChannel)
          Set the output channel for sending aggregated Messages.
 void setReaperInterval(long reaperInterval)
          Set the interval in milliseconds for the reaper thread.
 void setSendPartialResultOnTimeout(boolean sendPartialResultOnTimeout)
          Specify whether to aggregate and send the resulting Message when the timeout elapses before the CompletionStrategy becomes true.
 void setSendTimeout(long sendTimeout)
          Set the timeout for sending aggregation results and discarded Messages.
 void setTimeout(long timeout)
          Maximum time to wait (in milliseconds) for the completion strategy to become true.
 void setTrackedCorrelationIdCapacity(int trackedCorrelationIdCapacity)
          Set the number of completed correlationIds to track.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

DEFAULT_SEND_TIMEOUT

public static final long DEFAULT_SEND_TIMEOUT
See Also:
Constant Field Values

DEFAULT_TIMEOUT

public static final long DEFAULT_TIMEOUT
See Also:
Constant Field Values

DEFAULT_REAPER_INTERVAL

public static final long DEFAULT_REAPER_INTERVAL
See Also:
Constant Field Values

DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY

public static final int DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY
See Also:
Constant Field Values

logger

protected final org.apache.commons.logging.Log logger

outputChannel

protected volatile MessageChannel outputChannel

discardChannel

private volatile MessageChannel discardChannel

sendTimeout

protected volatile long sendTimeout

barriers

protected final java.util.concurrent.ConcurrentMap<java.lang.Object,MessageBarrier> barriers

timeout

private volatile long timeout

sendPartialResultOnTimeout

private volatile boolean sendPartialResultOnTimeout

reaperInterval

private volatile long reaperInterval

trackedCorrelationIdCapacity

private volatile int trackedCorrelationIdCapacity

trackedCorrelationIds

protected volatile java.util.concurrent.BlockingQueue<java.lang.Object> trackedCorrelationIds

executor

protected final java.util.concurrent.ScheduledExecutorService executor

initialized

private volatile boolean initialized

Constructor Detail

AbstractMessageBarrierHandler

public AbstractMessageBarrierHandler(java.util.concurrent.ScheduledExecutorService executor)

Method Detail

setOutputChannel

public void setOutputChannel(MessageChannel outputChannel)
Set the output channel for sending aggregated Messages. Note that precedence will be given to the 'returnAddress' of the aggregated message itself, then to the 'returnAddress' of the original message.
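
The precedence described above can be sketched as a simple fallback chain. This is an illustration only: the class ReplyTargetPrecedenceSketch and its resolve method are hypothetical, and reply targets are represented as plain String names rather than channel objects.

```java
/** Hypothetical sketch of the documented precedence; names and types are illustrative. */
final class ReplyTargetPrecedenceSketch {

    private ReplyTargetPrecedenceSketch() {
    }

    /** Returns the first non-null candidate in the documented order of precedence. */
    static String resolve(String aggregatedReturnAddress,
                          String originalReturnAddress,
                          String outputChannel) {
        if (aggregatedReturnAddress != null) {
            return aggregatedReturnAddress; // 1. 'returnAddress' of the aggregated message
        }
        if (originalReturnAddress != null) {
            return originalReturnAddress;   // 2. 'returnAddress' of the original message
        }
        return outputChannel;               // 3. the configured output channel (may be null)
    }
}
```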


setDiscardChannel

public void setDiscardChannel(MessageChannel discardChannel)
Specify a channel for sending Messages that arrive after their aggregation group has either completed or timed out.


setSendTimeout

public void setSendTimeout(long sendTimeout)
Set the timeout for sending aggregation results and discarded Messages.


setSendPartialResultOnTimeout

public void setSendPartialResultOnTimeout(boolean sendPartialResultOnTimeout)
Specify whether to aggregate and send the resulting Message when the timeout elapses before the CompletionStrategy becomes true.
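
As a rough illustration of what this flag controls, the sketch below models the two outcomes for a group that times out. It is an assumption-laden stand-in, not the handler's code: the class PartialResultOnTimeoutSketch is hypothetical, the two lists stand in for the output and discard channels, and joining payloads with a comma stands in for aggregation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the timeout policy; not the Spring Integration implementation. */
final class PartialResultOnTimeoutSketch {

    final List<String> output = new ArrayList<>();    // stands in for the output channel
    final List<String> discarded = new ArrayList<>(); // stands in for the discard channel
    private final boolean sendPartialResultOnTimeout;

    PartialResultOnTimeoutSketch(boolean sendPartialResultOnTimeout) {
        this.sendPartialResultOnTimeout = sendPartialResultOnTimeout;
    }

    /** Called when a group's timeout elapses before the group has completed. */
    void onTimeout(List<String> pendingPayloads) {
        if (pendingPayloads.isEmpty()) {
            return; // nothing arrived, so there is nothing to send or discard
        }
        if (sendPartialResultOnTimeout) {
            // Aggregate whatever has arrived so far and send it as a single result.
            output.add(String.join(",", pendingPayloads));
        } else {
            // Otherwise the pending payloads go to the discard channel.
            discarded.addAll(pendingPayloads);
        }
    }
}
```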


setReaperInterval

public void setReaperInterval(long reaperInterval)
Set the interval in milliseconds for the reaper thread. Default is 1000.
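
To show how a periodic reaper can be driven by a ScheduledExecutorService at a fixed interval, here is a minimal sketch. The class ReaperScheduleSketch and its awaitRuns method are hypothetical, and the choice of scheduleWithFixedDelay is an assumption; this page does not state which scheduling method the handler uses.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of interval-based scheduling; not the handler's ReaperTask. */
final class ReaperScheduleSketch {

    private ReaperScheduleSketch() {
    }

    /**
     * Schedules a periodic task and reports whether it ran the expected
     * number of times within five seconds.
     */
    static boolean awaitRuns(long intervalMillis, int expectedRuns) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch runs = new CountDownLatch(expectedRuns);
        try {
            // A real reaper would scan for timed-out groups here; this one only counts runs.
            executor.scheduleWithFixedDelay(
                    runs::countDown, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
            return runs.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

A shorter interval detects timed-out groups sooner at the cost of more frequent scans.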


setTrackedCorrelationIdCapacity

public void setTrackedCorrelationIdCapacity(int trackedCorrelationIdCapacity)
Set the number of completed correlationIds to track. Default is 1000.
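
One way to track a bounded number of completed correlationIds is a fixed-capacity queue that drops its oldest entry when full. The sketch below assumes that eviction policy; the class TrackedCorrelationIdsSketch and its methods are hypothetical and not taken from the handler's source.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch of bounded tracking with oldest-first eviction (an assumption). */
final class TrackedCorrelationIdsSketch {

    private final BlockingQueue<Object> tracked;

    TrackedCorrelationIdsSketch(int capacity) {
        this.tracked = new ArrayBlockingQueue<>(capacity);
    }

    /** Records a completed correlation id, evicting the oldest one if the queue is full. */
    synchronized void markCompleted(Object correlationId) {
        if (!tracked.offer(correlationId)) {
            tracked.poll();               // drop the oldest tracked id
            tracked.offer(correlationId); // now there is room for the new one
        }
    }

    /** Reports whether the id is still tracked, so a late Message can be recognized. */
    synchronized boolean isCompleted(Object correlationId) {
        return tracked.contains(correlationId);
    }
}
```

With a capacity of 2, marking "a", "b", and then "c" leaves only "b" and "c" tracked, so a late Message for "a" would no longer be recognized as belonging to a completed group.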


afterPropertiesSet

public void afterPropertiesSet()
Initialize this handler.

Specified by:
afterPropertiesSet in interface org.springframework.beans.factory.InitializingBean

setTimeout

public void setTimeout(long timeout)
Maximum time to wait (in milliseconds) for the completion strategy to become true. The default is 60000 (1 minute).


handle

public Message<?> handle(Message<?> message)
Specified by:
handle in interface MessageHandler

afterRelease

private void afterRelease(java.lang.Object correlationId,
                          java.util.List<Message<?>> releasedMessages)

sendToDiscardChannelIfAvailable

private void sendToDiscardChannelIfAvailable(Message<?> message)

resolveReplyTargetFromMessage

protected MessageTarget resolveReplyTargetFromMessage(Message<?> message)

removeBarrier

private void removeBarrier(java.lang.Object correlationId)

createMessageBarrier

protected abstract MessageBarrier createMessageBarrier()
Factory method for creating a suitable MessageBarrier implementation.


isBarrierRemovable

protected abstract boolean isBarrierRemovable(java.lang.Object correlationId,
                                              java.util.List<Message<?>> releasedMessages)
Implements the logic for deciding whether, based on what the MessageBarrier has released so far, work for the correlationId can be considered done and the barrier can be removed.


processReleasedMessages

protected abstract Message<?>[] processReleasedMessages(java.lang.Object correlationId,
                                                        java.util.List<Message<?>> messages)
Implements the logic for transforming the released Messages.
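
The division of labor between the abstract hooks can be pictured with a miniature template-method sketch. Everything in it is hypothetical: BarrierHooksSketch is not the Spring Integration base class, payloads are plain Strings instead of Messages, and the order in which the two hooks are called is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical miniature of the template-method contract; not the Spring Integration API. */
abstract class BarrierHooksSketch {

    private final List<String> outputs = new ArrayList<>();
    private boolean barrierRemoved;

    /** Transforms the released payloads into zero or more outputs. */
    protected abstract List<String> processReleased(Object correlationId, List<String> released);

    /** Decides whether the group is finished and its barrier can be dropped. */
    protected abstract boolean isRemovable(Object correlationId, List<String> released);

    /** Called whenever payloads are released for a group (call order is an assumption). */
    final void onRelease(Object correlationId, List<String> released) {
        outputs.addAll(processReleased(correlationId, released));
        if (isRemovable(correlationId, released)) {
            barrierRemoved = true;
        }
    }

    List<String> outputs() {
        return outputs;
    }

    boolean barrierRemoved() {
        return barrierRemoved;
    }
}

/** Example subclass: joins each released batch into one output and finishes after one release. */
final class JoiningHooksSketch extends BarrierHooksSketch {

    @Override
    protected List<String> processReleased(Object correlationId, List<String> released) {
        List<String> result = new ArrayList<>();
        result.add(String.join("+", released));
        return result;
    }

    @Override
    protected boolean isRemovable(Object correlationId, List<String> released) {
        return !released.isEmpty();
    }
}
```

A subclass that releases a group in several batches would instead return true from the removal check only after the final batch.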