|
|||||||||
PREV CLASS NEXT CLASS | FRAMES NO FRAMES | ||||||||
SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD |
java.lang.Objectorg.springframework.integration.router.AggregatingMessageHandler
public class AggregatingMessageHandler
A MessageHandler
implementation that waits for a complete
group of Messages
to arrive and then delegates to an
Aggregator
to combine them into a single Message
.
Each Message
that is received by this handler will be associated with
a group based upon the 'correlationId
' property of its
header. If no such property is available, a MessageHandlingException
will be thrown.
The default strategy for determining whether a group is complete is based on
the 'sequenceSize
' property of the header. Alternatively, a
custom implementation of the CompletionStrategy
may be provided.
The 'timeout
' value determines how long to wait for the
complete group after the arrival of the first Message
of the group.
The default value is 1 minute. If the timeout elapses prior to completion,
then Messages with that timed-out 'correlationId' will be sent to the
'discardChannel' if provided.
Nested Class Summary | |
---|---|
private class |
AggregatingMessageHandler.ReaperTask
|
Field Summary | |
---|---|
private Aggregator |
aggregator
|
private java.util.concurrent.ConcurrentMap<java.lang.Object,AggregationBarrier> |
barriers
|
private CompletionStrategy |
completionStrategy
|
static long |
DEFAULT_REAPER_INTERVAL
|
static long |
DEFAULT_SEND_TIMEOUT
|
static long |
DEFAULT_TIMEOUT
|
static int |
DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY
|
private MessageChannel |
defaultReplyChannel
|
private MessageChannel |
discardChannel
|
private java.util.concurrent.ScheduledExecutorService |
executor
|
private boolean |
initialized
|
private org.apache.commons.logging.Log |
logger
|
private long |
reaperInterval
|
private boolean |
sendPartialResultOnTimeout
|
private long |
sendTimeout
|
private long |
timeout
|
private int |
trackedCorrelationIdCapacity
|
private java.util.concurrent.BlockingQueue<java.lang.Object> |
trackedCorrelationIds
|
Constructor Summary | |
---|---|
AggregatingMessageHandler(Aggregator aggregator)
|
|
AggregatingMessageHandler(Aggregator aggregator,
java.util.concurrent.ScheduledExecutorService executor)
Create a handler that delegates to the provided aggregator to combine a group of messages into a single message. |
Method Summary | |
---|---|
void |
afterPropertiesSet()
Initialize this handler. |
private void |
aggregationCompleted(java.lang.Object correlationId,
java.util.List<Message<?>> messages)
|
Message<?> |
handle(Message<?> message)
|
private void |
removeBarrier(java.lang.Object correlationId)
|
private MessageChannel |
resolveReplyChannelFromMessage(Message<?> message)
|
private void |
sendToDiscardChannelIfAvailable(Message<?> message)
|
void |
setCompletionStrategy(CompletionStrategy completionStrategy)
Strategy to determine whether the group of messages is complete. |
void |
setDefaultReplyChannel(MessageChannel defaultReplyChannel)
Set the default channel for sending aggregated Messages. |
void |
setDiscardChannel(MessageChannel discardChannel)
Specify a channel for sending Messages that arrive after their aggregation group has either completed or timed-out. |
void |
setReaperInterval(long reaperInterval)
Set the interval in milliseconds for the reaper thread. |
void |
setSendPartialResultOnTimeout(boolean sendPartialResultOnTimeout)
Specify whether to aggregate and send the resulting Message when the timeout elapses prior to the CompletionStrategy. |
void |
setSendTimeout(long sendTimeout)
Set the timeout for sending aggregation results and discarded Messages. |
void |
setTimeout(long timeout)
Maximum time to wait (in milliseconds) for the completion strategy to become true. |
void |
setTrackedCorrelationIdCapacity(int trackedCorrelationIdCapacity)
Set the number of completed correlationIds to track. |
Methods inherited from class java.lang.Object |
---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
Field Detail |
---|
public static final long DEFAULT_SEND_TIMEOUT
public static final long DEFAULT_TIMEOUT
public static final long DEFAULT_REAPER_INTERVAL
public static final int DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY
private final org.apache.commons.logging.Log logger
private final Aggregator aggregator
private volatile MessageChannel defaultReplyChannel
private volatile MessageChannel discardChannel
private volatile long sendTimeout
private volatile CompletionStrategy completionStrategy
private final java.util.concurrent.ConcurrentMap<java.lang.Object,AggregationBarrier> barriers
private volatile long timeout
private volatile boolean sendPartialResultOnTimeout
private volatile long reaperInterval
private volatile int trackedCorrelationIdCapacity
private volatile java.util.concurrent.BlockingQueue<java.lang.Object> trackedCorrelationIds
private final java.util.concurrent.ScheduledExecutorService executor
private volatile boolean initialized
Constructor Detail |
---|
public AggregatingMessageHandler(Aggregator aggregator, java.util.concurrent.ScheduledExecutorService executor)
null
, a new
single-threaded executor will be created.
public AggregatingMessageHandler(Aggregator aggregator)
Method Detail |
---|
public void setDefaultReplyChannel(MessageChannel defaultReplyChannel)
public void setDiscardChannel(MessageChannel discardChannel)
public void setSendTimeout(long sendTimeout)
public void setSendPartialResultOnTimeout(boolean sendPartialResultOnTimeout)
public void setReaperInterval(long reaperInterval)
public void setTrackedCorrelationIdCapacity(int trackedCorrelationIdCapacity)
public void afterPropertiesSet()
afterPropertiesSet
in interface org.springframework.beans.factory.InitializingBean
public void setCompletionStrategy(CompletionStrategy completionStrategy)
public void setTimeout(long timeout)
public Message<?> handle(Message<?> message)
handle
in interface MessageHandler
private void sendToDiscardChannelIfAvailable(Message<?> message)
private void aggregationCompleted(java.lang.Object correlationId, java.util.List<Message<?>> messages)
private void removeBarrier(java.lang.Object correlationId)
private MessageChannel resolveReplyChannelFromMessage(Message<?> message)
|
|||||||||
PREV CLASS NEXT CLASS | FRAMES NO FRAMES | ||||||||
SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD |