|
|||||||||
PREV CLASS NEXT CLASS | FRAMES NO FRAMES | ||||||||
SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD |
java.lang.Object
  org.springframework.integration.aggregator.AbstractMessageBarrierHandler
public abstract class AbstractMessageBarrierHandler
Base class for MessageBarrier-based MessageHandlers.
A MessageHandler
implementation that waits for a group of
Messages
to arrive and processes them together.
Uses a MessageBarrier
to store messages and to decide how
the messages should be released.
Each Message
that is received by this handler will be associated with
a group based upon the 'correlationId' property of its
header. If no such property is available, a MessageHandlingException
will be thrown.
The 'timeout' value determines how long to wait for the
complete group after the arrival of the first Message
of the group.
The default value is 1 minute. If the timeout elapses prior to completion,
then Messages with that timed-out 'correlationId' will be sent to the
'discardChannel' if provided.
Nested Class Summary | |
---|---|
private class |
AbstractMessageBarrierHandler.ReaperTask
|
Field Summary | |
---|---|
protected java.util.concurrent.ConcurrentMap<java.lang.Object,MessageBarrier> |
barriers
|
static long |
DEFAULT_REAPER_INTERVAL
|
static long |
DEFAULT_SEND_TIMEOUT
|
static long |
DEFAULT_TIMEOUT
|
static int |
DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY
|
private MessageChannel |
discardChannel
|
protected java.util.concurrent.ScheduledExecutorService |
executor
|
private boolean |
initialized
|
protected org.apache.commons.logging.Log |
logger
|
protected MessageChannel |
outputChannel
|
private long |
reaperInterval
|
private boolean |
sendPartialResultOnTimeout
|
protected long |
sendTimeout
|
private long |
timeout
|
private int |
trackedCorrelationIdCapacity
|
protected java.util.concurrent.BlockingQueue<java.lang.Object> |
trackedCorrelationIds
|
Constructor Summary | |
---|---|
AbstractMessageBarrierHandler(java.util.concurrent.ScheduledExecutorService executor)
|
Method Summary | |
---|---|
void |
afterPropertiesSet()
Initialize this handler. |
private void |
afterRelease(java.lang.Object correlationId,
java.util.List<Message<?>> releasedMessages)
|
protected abstract MessageBarrier |
createMessageBarrier()
Factory method for creating a suitable MessageBarrier implementation. |
Message<?> |
handle(Message<?> message)
|
protected abstract boolean |
isBarrierRemovable(java.lang.Object correlationId,
java.util.List<Message<?>> releasedMessages)
Implements the logic for deciding whether, based on what the MessageBarrier has released so far, work for the correlationId can be considered done and the barrier can be removed. |
protected abstract Message<?>[] |
processReleasedMessages(java.lang.Object correlationId,
java.util.List<Message<?>> messages)
Implements the logic for transforming the released Messages. |
private void |
removeBarrier(java.lang.Object correlationId)
|
protected MessageTarget |
resolveReplyTargetFromMessage(Message<?> message)
|
private void |
sendToDiscardChannelIfAvailable(Message<?> message)
|
void |
setDiscardChannel(MessageChannel discardChannel)
Specify a channel for sending Messages that arrive after their aggregation group has either completed or timed out. |
void |
setOutputChannel(MessageChannel outputChannel)
Set the output channel for sending aggregated Messages. |
void |
setReaperInterval(long reaperInterval)
Set the interval in milliseconds for the reaper thread. |
void |
setSendPartialResultOnTimeout(boolean sendPartialResultOnTimeout)
Specify whether to aggregate and send the resulting Message when the timeout elapses before the CompletionStrategy becomes true. |
void |
setSendTimeout(long sendTimeout)
Set the timeout for sending aggregation results and discarded Messages. |
void |
setTimeout(long timeout)
Maximum time to wait (in milliseconds) for the completion strategy to become true. |
void |
setTrackedCorrelationIdCapacity(int trackedCorrelationIdCapacity)
Set the number of completed correlationIds to track. |
Methods inherited from class java.lang.Object |
---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
Field Detail |
---|
public static final long DEFAULT_SEND_TIMEOUT
public static final long DEFAULT_TIMEOUT
public static final long DEFAULT_REAPER_INTERVAL
public static final int DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY
protected final org.apache.commons.logging.Log logger
protected volatile MessageChannel outputChannel
private volatile MessageChannel discardChannel
protected volatile long sendTimeout
protected final java.util.concurrent.ConcurrentMap<java.lang.Object,MessageBarrier> barriers
private volatile long timeout
private volatile boolean sendPartialResultOnTimeout
private volatile long reaperInterval
private volatile int trackedCorrelationIdCapacity
protected volatile java.util.concurrent.BlockingQueue<java.lang.Object> trackedCorrelationIds
protected final java.util.concurrent.ScheduledExecutorService executor
private volatile boolean initialized
Constructor Detail |
---|
public AbstractMessageBarrierHandler(java.util.concurrent.ScheduledExecutorService executor)
Method Detail |
---|
public void setOutputChannel(MessageChannel outputChannel)
public void setDiscardChannel(MessageChannel discardChannel)
public void setSendTimeout(long sendTimeout)
public void setSendPartialResultOnTimeout(boolean sendPartialResultOnTimeout)
public void setReaperInterval(long reaperInterval)
public void setTrackedCorrelationIdCapacity(int trackedCorrelationIdCapacity)
public void afterPropertiesSet()
afterPropertiesSet
in interface org.springframework.beans.factory.InitializingBean
public void setTimeout(long timeout)
public Message<?> handle(Message<?> message)
handle
in interface MessageHandler
private void afterRelease(java.lang.Object correlationId, java.util.List<Message<?>> releasedMessages)
private void sendToDiscardChannelIfAvailable(Message<?> message)
protected MessageTarget resolveReplyTargetFromMessage(Message<?> message)
private void removeBarrier(java.lang.Object correlationId)
protected abstract MessageBarrier createMessageBarrier()
protected abstract boolean isBarrierRemovable(java.lang.Object correlationId, java.util.List<Message<?>> releasedMessages)
protected abstract Message<?>[] processReleasedMessages(java.lang.Object correlationId, java.util.List<Message<?>> messages)
|
|||||||||
PREV CLASS NEXT CLASS | FRAMES NO FRAMES | ||||||||
SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD |