org.springframework.integration.aggregator
Class AbstractMessageBarrierHandler

java.lang.Object
  extended by org.springframework.integration.handler.AbstractMessageHandler
      extended by org.springframework.integration.aggregator.AbstractMessageBarrierHandler
All Implemented Interfaces:
org.springframework.beans.factory.InitializingBean, MessageHandler, TaskSchedulerAware
Direct Known Subclasses:
AbstractMessageAggregator, Resequencer

public abstract class AbstractMessageBarrierHandler
extends AbstractMessageHandler
implements TaskSchedulerAware, org.springframework.beans.factory.InitializingBean

Base class for MessageBarrier-based Message Handlers. A MessageHandler implementation that waits for a group of Messages to arrive and processes them together. Uses a MessageBarrier to store messages and to decide how the messages should be released.

Each Message that is received by this handler will be associated with a group based upon the 'correlationId' property of its header. If no such property is available, a MessageHandlingException will be thrown.

The 'timeout' value determines how long to wait for the complete group after the arrival of the first Message of the group. The default value is 1 minute. If the timeout elapses prior to completion, then Messages with that timed-out 'correlationId' will be sent to the 'discardChannel' if provided unless 'sendPartialResultsOnTimeout' is set to true in which case the incomplete group will be sent to the output channel.

Author:
Mark Fisher, Marius Bogoevici

Field Summary
protected  java.util.concurrent.ConcurrentMap<java.lang.Object,MessageBarrier> barriers
           
static long DEFAULT_REAPER_INTERVAL
           
static long DEFAULT_SEND_TIMEOUT
           
static long DEFAULT_TIMEOUT
           
static int DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY
           
protected  org.apache.commons.logging.Log logger
           
protected  java.util.concurrent.BlockingQueue<java.lang.Object> trackedCorrelationIds
           
 
Constructor Summary
AbstractMessageBarrierHandler()
           
 
Method Summary
 void afterPropertiesSet()
           
protected abstract  MessageBarrier createMessageBarrier()
          Factory method for creating a suitable MessageBarrier implementation.
protected  void handleMessageInternal(Message<?> message)
           
protected abstract  boolean isBarrierRemovable(java.lang.Object correlationId, java.util.List<Message<?>> releasedMessages)
          Implements the logic for deciding whether, based on what the MessageBarrier has released so far, work for the correlationId can be considered complete and the barrier can be released.
 boolean isRunning()
           
protected abstract  Message<?>[] processReleasedMessages(java.lang.Object correlationId, java.util.List<Message<?>> messages)
          Implements the logic for transforming the released Messages.
protected  MessageChannel resolveReplyChannelFromMessage(Message<?> message)
           
 void setDiscardChannel(MessageChannel discardChannel)
          Specify a channel for sending Messages that arrive after their aggregation group has either completed or timed-out.
 void setOutputChannel(MessageChannel outputChannel)
           
 void setReaperInterval(long reaperInterval)
          Set the interval in milliseconds for the reaper thread.
 void setSendPartialResultOnTimeout(boolean sendPartialResultOnTimeout)
          Specify whether to aggregate and send the resulting Message when the timeout elapses prior to the CompletionStrategy returning true.
 void setSendTimeout(long sendTimeout)
           
 void setTaskScheduler(TaskScheduler taskScheduler)
           
 void setTimeout(long timeout)
          Maximum time to wait (in milliseconds) for the completion strategy to become true.
 void setTrackedCorrelationIdCapacity(int trackedCorrelationIdCapacity)
          Set the number of completed correlationIds to track.
 void start()
           
 void stop()
           
 
Methods inherited from class org.springframework.integration.handler.AbstractMessageHandler
handleMessage
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

DEFAULT_SEND_TIMEOUT

public static final long DEFAULT_SEND_TIMEOUT
See Also:
Constant Field Values

DEFAULT_TIMEOUT

public static final long DEFAULT_TIMEOUT
See Also:
Constant Field Values

DEFAULT_REAPER_INTERVAL

public static final long DEFAULT_REAPER_INTERVAL
See Also:
Constant Field Values

DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY

public static final int DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY
See Also:
Constant Field Values

logger

protected final org.apache.commons.logging.Log logger

barriers

protected final java.util.concurrent.ConcurrentMap<java.lang.Object,MessageBarrier> barriers

trackedCorrelationIds

protected volatile java.util.concurrent.BlockingQueue<java.lang.Object> trackedCorrelationIds
Constructor Detail

AbstractMessageBarrierHandler

public AbstractMessageBarrierHandler()
Method Detail

setOutputChannel

public void setOutputChannel(MessageChannel outputChannel)

setDiscardChannel

public void setDiscardChannel(MessageChannel discardChannel)
Specify a channel for sending Messages that arrive after their aggregation group has either completed or timed-out.


setSendPartialResultOnTimeout

public void setSendPartialResultOnTimeout(boolean sendPartialResultOnTimeout)
Specify whether to aggregate and send the resulting Message when the timeout elapses prior to the CompletionStrategy returning true.


setReaperInterval

public void setReaperInterval(long reaperInterval)
Set the interval in milliseconds for the reaper thread. Default is 1000.


setTrackedCorrelationIdCapacity

public void setTrackedCorrelationIdCapacity(int trackedCorrelationIdCapacity)
Set the number of completed correlationIds to track. Default is 1000.


setTimeout

public void setTimeout(long timeout)
Maximum time to wait (in milliseconds) for the completion strategy to become true. The default is 60000 (1 minute).


setSendTimeout

public void setSendTimeout(long sendTimeout)

setTaskScheduler

public void setTaskScheduler(TaskScheduler taskScheduler)
Specified by:
setTaskScheduler in interface TaskSchedulerAware

afterPropertiesSet

public void afterPropertiesSet()
Specified by:
afterPropertiesSet in interface org.springframework.beans.factory.InitializingBean

isRunning

public boolean isRunning()

start

public void start()

stop

public void stop()

handleMessageInternal

protected final void handleMessageInternal(Message<?> message)
Specified by:
handleMessageInternal in class AbstractMessageHandler

resolveReplyChannelFromMessage

protected MessageChannel resolveReplyChannelFromMessage(Message<?> message)

createMessageBarrier

protected abstract MessageBarrier createMessageBarrier()
Factory method for creating a suitable MessageBarrier implementation.


isBarrierRemovable

protected abstract boolean isBarrierRemovable(java.lang.Object correlationId,
                                              java.util.List<Message<?>> releasedMessages)
Implements the logic for deciding whether, based on what the MessageBarrier has released so far, work for the correlationId can be considered complete and the barrier can be released.


processReleasedMessages

protected abstract Message<?>[] processReleasedMessages(java.lang.Object correlationId,
                                                        java.util.List<Message<?>> messages)
Implements the logic for transforming the released Messages.