org.springframework.integration.aggregator
Class AbstractMessageBarrierHandler<T extends java.util.Collection<? extends Message<?>>>

java.lang.Object
  extended by org.springframework.integration.handler.AbstractMessageHandler
      extended by org.springframework.integration.aggregator.AbstractMessageBarrierHandler<T>
All Implemented Interfaces:
org.springframework.beans.factory.BeanFactoryAware, org.springframework.beans.factory.InitializingBean, org.springframework.core.Ordered, MessageHandler
Direct Known Subclasses:
AbstractMessageAggregator, Resequencer

public abstract class AbstractMessageBarrierHandler<T extends java.util.Collection<? extends Message<?>>>
extends AbstractMessageHandler
implements org.springframework.beans.factory.BeanFactoryAware, org.springframework.beans.factory.InitializingBean

Base class for MessageBarrier-based Message Handlers. A MessageHandler implementation that waits for a group of Messages to arrive and processes them together. Uses a MessageBarrier to store messages and to decide how the messages should be released.

Each Message that is received by this handler will be associated with a group based upon the 'correlationId' property of its header. If no such property is available, a MessageHandlingException will be thrown.
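
The grouping rule can be illustrated with a minimal, self-contained sketch. It does not use the Spring Integration types: `SketchMessage` and `CorrelationGrouper` are hypothetical names, and `IllegalStateException` stands in for MessageHandlingException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a Message: a payload plus a correlation id header.
final class SketchMessage {
    final Object correlationId;
    final String payload;

    SketchMessage(Object correlationId, String payload) {
        this.correlationId = correlationId;
        this.payload = payload;
    }
}

// Hypothetical sketch: one group of messages per correlation id.
final class CorrelationGrouper {
    private final Map<Object, List<SketchMessage>> groups = new ConcurrentHashMap<>();

    // Adds the message to its group, creating the group on first arrival,
    // and returns the current size of that group.
    int handle(SketchMessage message) {
        if (message.correlationId == null) {
            // Assumption: the real handler throws a MessageHandlingException here.
            throw new IllegalStateException("message has no correlationId");
        }
        List<SketchMessage> group =
                groups.computeIfAbsent(message.correlationId, id -> new ArrayList<>());
        synchronized (group) {
            group.add(message);
            return group.size();
        }
    }

    int groupCount() {
        return groups.size();
    }
}
```

Two messages sharing a correlation id end up in the same group, while a message with a different id opens a new group.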

The 'timeout' value determines how long to wait for a group to complete after its first Message arrives. The default value is 60000 milliseconds (1 minute). If the timeout elapses before the group completes, the Messages with the timed-out 'correlationId' are sent to the 'discardChannel', if one is provided. If 'sendPartialResultOnTimeout' is set to true, the incomplete group is sent to the output channel instead.
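
The timeout rule described above can be written down as a small decision function. This is a sketch under stated assumptions, not the handler's actual code: `TimeoutRule`, `Outcome`, and `onReap` are hypothetical names, the exact boundary comparison is a guess, and dropping the Messages when no discard channel is configured is inferred from "if provided".

```java
// Hypothetical model of the timeout rule; none of these names exist in Spring Integration.
final class TimeoutRule {
    enum Outcome { KEEP_WAITING, SEND_PARTIAL_TO_OUTPUT, SEND_TO_DISCARD, DROP }

    // Documented default: 1 minute.
    static final long DEFAULT_TIMEOUT_MILLIS = 60_000L;

    // Decides what happens to an incomplete group when the reaper inspects it.
    static Outcome onReap(long firstArrivalMillis, long nowMillis, long timeoutMillis,
                          boolean sendPartialResultOnTimeout, boolean hasDiscardChannel) {
        // Assumption: the group times out once the full timeout has elapsed.
        if (nowMillis - firstArrivalMillis < timeoutMillis) {
            return Outcome.KEEP_WAITING;
        }
        if (sendPartialResultOnTimeout) {
            return Outcome.SEND_PARTIAL_TO_OUTPUT;
        }
        // Assumption: without a discard channel the timed-out Messages are dropped.
        return hasDiscardChannel ? Outcome.SEND_TO_DISCARD : Outcome.DROP;
    }
}
```

The elapsed time is measured from the arrival of the first Message of the group, so later arrivals do not extend the deadline.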

Subclasses must choose the Collection type to use. That type determines the semantics of adding a Message to the MessageBarrier.
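
To see why the Collection type matters, the following sketch adds the same three items to a List and to a sorted Set. All names are hypothetical (`CollectionSemanticsDemo`, `Numbered`, `BY_SEQUENCE`), and the pairing of a list with aggregation and a sorted set with resequencing is only a plausible illustration, not a statement about the actual subclasses.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

final class CollectionSemanticsDemo {
    // Hypothetical message carrying a sequence number.
    static final class Numbered {
        final int sequenceNumber;
        final String payload;

        Numbered(int sequenceNumber, String payload) {
            this.sequenceNumber = sequenceNumber;
            this.payload = payload;
        }
    }

    static final Comparator<Numbered> BY_SEQUENCE =
            Comparator.comparingInt(m -> m.sequenceNumber);

    // Adds three messages, two of which share sequence number 3.
    private static void fill(Collection<Numbered> target) {
        target.add(new Numbered(3, "c"));
        target.add(new Numbered(1, "a"));
        target.add(new Numbered(3, "c-duplicate"));
    }

    private static List<Integer> sequenceNumbers(Collection<Numbered> messages) {
        List<Integer> result = new ArrayList<>();
        for (Numbered m : messages) {
            result.add(m.sequenceNumber);
        }
        return result;
    }

    // A List keeps arrival order and duplicates: [3, 1, 3]
    static List<Integer> listOrder() {
        List<Numbered> list = new ArrayList<>();
        fill(list);
        return sequenceNumbers(list);
    }

    // A sorted Set orders by sequence number and rejects the duplicate: [1, 3]
    static List<Integer> sortedSetOrder() {
        TreeSet<Numbered> set = new TreeSet<>(BY_SEQUENCE);
        fill(set);
        return sequenceNumbers(set);
    }
}
```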

Note: this class is not part of the Spring Integration API. It is an internal class used to implement components that need to hold a list of messages until they are ready to be released or processed (e.g. Resequencer or Aggregator). As such, it is subject to change in future versions.

Author:
Mark Fisher, Marius Bogoevici

Field Summary
protected  java.util.concurrent.ConcurrentMap<java.lang.Object,MessageBarrier<T>> barriers
           
static long DEFAULT_REAPER_INTERVAL
           
static long DEFAULT_SEND_TIMEOUT
           
static long DEFAULT_TIMEOUT
           
static int DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY
           
protected  org.apache.commons.logging.Log logger
           
protected  java.util.concurrent.BlockingQueue<java.lang.Object> trackedCorrelationIds
           
 
Fields inherited from interface org.springframework.core.Ordered
HIGHEST_PRECEDENCE, LOWEST_PRECEDENCE
 
Constructor Summary
AbstractMessageBarrierHandler()
           
 
Method Summary
 void afterPropertiesSet()
           
protected  boolean canAddMessage(Message<?> message, MessageBarrier<T> barrier)
          Verifies that a message can be added to the barrier.
protected abstract  MessageBarrier<T> createMessageBarrier(java.lang.Object correlationKey)
          Factory method for creating a MessageBarrier implementation.
protected  void discardBarrier(MessageBarrier<T> barrier)
          A method for discarding the content of the message barrier.
protected  void handleMessageInternal(Message<?> message)
           
 boolean isRunning()
           
protected abstract  void processBarrier(MessageBarrier<T> barrier)
          A method for processing the information in the message barrier after a message has been added or on pruning.
protected  void removeBarrier(java.lang.Object correlationId)
           
protected  MessageChannel resolveReplyChannelFromMessage(Message<?> message)
           
protected  void sendReplies(java.util.Collection<Message<?>> messages, MessageChannel defaultReplyChannel)
           
protected  void sendReply(Message<?> message, MessageChannel defaultReplyChannel)
           
 void setAutoStartup(boolean autoStartup)
           
 void setBeanFactory(org.springframework.beans.factory.BeanFactory beanFactory)
           
 void setCorrelationStrategy(CorrelationStrategy correlationStrategy)
           
 void setDiscardChannel(MessageChannel discardChannel)
          Specify a channel for sending Messages that arrive after their aggregation group has either completed or timed out.
 void setOutputChannel(MessageChannel outputChannel)
           
 void setReaperInterval(long reaperInterval)
          Set the interval in milliseconds for the reaper thread.
 void setSendPartialResultOnTimeout(boolean sendPartialResultOnTimeout)
          Specify whether to aggregate and send the resulting Message when the timeout elapses prior to the CompletionStrategy returning true.
 void setSendTimeout(long sendTimeout)
           
 void setTaskScheduler(TaskScheduler taskScheduler)
           
 void setTimeout(long timeout)
          Maximum time to wait (in milliseconds) for the completion strategy to become true.
 void setTrackedCorrelationIdCapacity(int trackedCorrelationIdCapacity)
          Set the number of completed correlationIds to track.
 void start()
           
 void stop()
           
 
Methods inherited from class org.springframework.integration.handler.AbstractMessageHandler
getOrder, handleMessage, setOrder
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

DEFAULT_SEND_TIMEOUT

public static final long DEFAULT_SEND_TIMEOUT
See Also:
Constant Field Values

DEFAULT_TIMEOUT

public static final long DEFAULT_TIMEOUT
See Also:
Constant Field Values

DEFAULT_REAPER_INTERVAL

public static final long DEFAULT_REAPER_INTERVAL
See Also:
Constant Field Values

DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY

public static final int DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY
See Also:
Constant Field Values

logger

protected final org.apache.commons.logging.Log logger

barriers

protected final java.util.concurrent.ConcurrentMap<java.lang.Object,MessageBarrier<T extends java.util.Collection<? extends Message<?>>>> barriers

trackedCorrelationIds

protected volatile java.util.concurrent.BlockingQueue<java.lang.Object> trackedCorrelationIds
Constructor Detail

AbstractMessageBarrierHandler

public AbstractMessageBarrierHandler()
Method Detail

setOutputChannel

public void setOutputChannel(MessageChannel outputChannel)

setDiscardChannel

public void setDiscardChannel(MessageChannel discardChannel)
Specify a channel for sending Messages that arrive after their aggregation group has either completed or timed out.


setSendPartialResultOnTimeout

public void setSendPartialResultOnTimeout(boolean sendPartialResultOnTimeout)
Specify whether to aggregate and send the resulting Message when the timeout elapses prior to the CompletionStrategy returning true.


setReaperInterval

public void setReaperInterval(long reaperInterval)
Set the interval in milliseconds for the reaper thread. Default is 1000.


setTrackedCorrelationIdCapacity

public void setTrackedCorrelationIdCapacity(int trackedCorrelationIdCapacity)
Set the number of completed correlationIds to track. Default is 1000.
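
One way to track a bounded number of completed correlationIds is a fixed-capacity queue that evicts its oldest entry when full. The sketch below is an illustration only: `CompletedIdTracker` and its methods are hypothetical, and the oldest-first eviction policy is an assumption, since the documentation states only that the number of tracked ids is limited.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of bounded tracking of completed correlation ids.
final class CompletedIdTracker {
    private final BlockingQueue<Object> trackedCorrelationIds;

    CompletedIdTracker(int capacity) {
        this.trackedCorrelationIds = new ArrayBlockingQueue<>(capacity);
    }

    // Records a completed id. Assumption: when the queue is full,
    // the oldest tracked id is evicted to make room.
    synchronized void markCompleted(Object correlationId) {
        if (!trackedCorrelationIds.offer(correlationId)) {
            trackedCorrelationIds.poll();
            trackedCorrelationIds.offer(correlationId);
        }
    }

    // True if the id is still remembered as completed. A Message arriving
    // for such an id is a late arrival and a candidate for the discard channel.
    synchronized boolean isCompleted(Object correlationId) {
        return trackedCorrelationIds.contains(correlationId);
    }
}
```

With a small capacity, ids that completed long ago are forgotten, so a very late Message for such an id would be treated as the start of a new group rather than as a late arrival.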


setTimeout

public void setTimeout(long timeout)
Maximum time to wait (in milliseconds) for the completion strategy to become true. The default is 60000 (1 minute).


setSendTimeout

public void setSendTimeout(long sendTimeout)

setTaskScheduler

public void setTaskScheduler(TaskScheduler taskScheduler)

setAutoStartup

public void setAutoStartup(boolean autoStartup)

setBeanFactory

public void setBeanFactory(org.springframework.beans.factory.BeanFactory beanFactory)
Specified by:
setBeanFactory in interface org.springframework.beans.factory.BeanFactoryAware

setCorrelationStrategy

public void setCorrelationStrategy(CorrelationStrategy correlationStrategy)

afterPropertiesSet

public final void afterPropertiesSet()
Specified by:
afterPropertiesSet in interface org.springframework.beans.factory.InitializingBean

isRunning

public boolean isRunning()

start

public void start()

stop

public void stop()

handleMessageInternal

protected final void handleMessageInternal(Message<?> message)
Specified by:
handleMessageInternal in class AbstractMessageHandler

sendReplies

protected final void sendReplies(java.util.Collection<Message<?>> messages,
                                 MessageChannel defaultReplyChannel)

sendReply

protected final void sendReply(Message<?> message,
                               MessageChannel defaultReplyChannel)

resolveReplyChannelFromMessage

protected final MessageChannel resolveReplyChannelFromMessage(Message<?> message)

removeBarrier

protected final void removeBarrier(java.lang.Object correlationId)

canAddMessage

protected boolean canAddMessage(Message<?> message,
                                MessageBarrier<T> barrier)
Verifies that a message can be added to the barrier. To be overridden by subclasses, which may add their own verifications. Subclasses overriding this method must call the method from the superclass.
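
The override contract (add checks, but always call the superclass) can be sketched with simplified types. `BaseBarrierCheck` and `NoDuplicatesBarrierCheck` are hypothetical, the method signature is reduced to plain Java types, and the base check shown here (rejecting additions to a completed group) is an assumption made for illustration.

```java
import java.util.List;

// Hypothetical base class standing in for the handler's canAddMessage contract.
class BaseBarrierCheck {
    // Assumed base verification: reject messages for a group already marked complete.
    protected boolean canAddMessage(String payload, List<String> group, boolean groupComplete) {
        return !groupComplete;
    }
}

// Hypothetical subclass that adds its own verification on top of the base one.
class NoDuplicatesBarrierCheck extends BaseBarrierCheck {
    @Override
    protected boolean canAddMessage(String payload, List<String> group, boolean groupComplete) {
        // Delegate first so the superclass verification is preserved.
        if (!super.canAddMessage(payload, group, groupComplete)) {
            return false;
        }
        // Additional check: refuse a payload that is already in the group.
        return !group.contains(payload);
    }
}
```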


createMessageBarrier

protected abstract MessageBarrier<T> createMessageBarrier(java.lang.Object correlationKey)
Factory method for creating a MessageBarrier implementation.


processBarrier

protected abstract void processBarrier(MessageBarrier<T> barrier)
A method for processing the information in the message barrier after a message has been added or on pruning. The decision as to whether the messages from the MessageBarrier can be released normally belongs here, although calling code may forcibly set the MessageBarrier's 'complete' flag to true before invoking the method.

Parameters:
barrier - the MessageBarrier to be processed
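
A simplified sketch of this contract follows. The types are hypothetical stand-ins (`SketchBarrier`, `SketchProcessor`), and the size-based completion rule is an assumption chosen only to keep the example short. It shows the two release paths: the method decides that the group is complete, or the caller has already forced the 'complete' flag.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified barrier: a list of payloads plus a 'complete' flag.
final class SketchBarrier {
    final List<String> messages = new ArrayList<>();
    private boolean complete;

    boolean isComplete() {
        return complete;
    }

    void setComplete() {
        complete = true;
    }
}

// Hypothetical processor that releases a group once it is complete.
final class SketchProcessor {
    private final int expectedSize;
    final List<List<String>> released = new ArrayList<>();

    SketchProcessor(int expectedSize) {
        this.expectedSize = expectedSize;
    }

    void processBarrier(SketchBarrier barrier) {
        // Normal path: the release decision is made here.
        if (!barrier.isComplete() && barrier.messages.size() >= expectedSize) {
            barrier.setComplete();
        }
        // Forced path: the caller may have set the flag before invoking this method.
        if (barrier.isComplete()) {
            released.add(new ArrayList<>(barrier.messages));
            barrier.messages.clear();
        }
    }
}
```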

discardBarrier

protected void discardBarrier(MessageBarrier<T> barrier)
A method for discarding the content of the message barrier. Can be overridden by subclasses.

Parameters:
barrier - the MessageBarrier whose content is to be discarded