public class QueueChannel extends AbstractPollableChannel implements QueueChannelOperations, QueueChannelManagement
Each Message is placed in a BlockingQueue whose capacity may be specified upon construction. The capacity must be a positive integer value. For a zero-capacity version based upon a SynchronousQueue, consider the RendezvousChannel.
AbstractMessageChannel.ChannelInterceptorList
IntegrationManagement.ManagementOverrides
Modifier and Type | Field and Description |
---|---|
protected Semaphore | queueSemaphore |
interceptors, meters
EXPRESSION_PARSER, logger
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
INDEFINITE_TIMEOUT
Constructor and Description |
---|
QueueChannel() Create a channel with "unbounded" queue capacity. |
QueueChannel(int capacity) Create a channel with the specified queue capacity. |
QueueChannel(Queue<Message<?>> queue) Create a channel with the specified queue. |
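The capacity rules behind these constructors can be illustrated without Spring Integration on the classpath. The sketch below is a stand-in, not the library's code: it assumes a bounded channel behaves like a `LinkedBlockingQueue` of fixed capacity and a zero-capacity channel behaves like a `SynchronousQueue`. The class name `CapacitySketch`, the helper `trySend`, and the `String` payloads (in place of `Message<?>`) are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class CapacitySketch {

    // Offer a payload, waiting up to timeoutMillis for free space.
    // Returns false if the queue is still full when the timeout expires.
    public static boolean trySend(BlockingQueue<String> queue, String payload, long timeoutMillis) {
        try {
            return queue.offer(payload, timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        }
    }

    public static void main(String[] args) {
        // Bounded: the third send fails because only two slots exist.
        BlockingQueue<String> bounded = new LinkedBlockingQueue<>(2);
        System.out.println(trySend(bounded, "a", 10)); // true
        System.out.println(trySend(bounded, "b", 10)); // true
        System.out.println(trySend(bounded, "c", 10)); // false

        // "Unbounded": the default capacity is Integer.MAX_VALUE.
        BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
        System.out.println(unbounded.remainingCapacity() == Integer.MAX_VALUE); // true

        // Zero capacity: with no waiting receiver, the handoff cannot happen.
        BlockingQueue<String> rendezvous = new SynchronousQueue<>();
        System.out.println(trySend(rendezvous, "a", 10)); // false
    }
}
```

A bounded queue makes a slow consumer visible as a failed or delayed send, whereas an unbounded queue keeps accepting messages until memory runs out.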
Modifier and Type | Method and Description |
---|---|
List<Message<?>> | clear() Remove all Messages from this channel. |
protected Message<?> | doReceive(long timeout) Subclasses must implement this method. |
protected boolean | doSend(Message<?> message, long timeout) Subclasses must implement this method. |
int | getQueueSize() Obtain the current number of queued Messages in this channel. |
int | getRemainingCapacity() Obtain the remaining capacity of this channel. |
List<Message<?>> | purge(MessageSelector selector) Remove any Messages that are not accepted by the provided selector. |
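The difference between `clear()` and `purge(MessageSelector)` is easy to misread: `purge` removes the messages the selector rejects and keeps the ones it accepts. The following sketch models that contract with plain `java.util` types. It is an illustration under assumptions, not the library's implementation: `PurgeSketch` is a hypothetical class, a `Predicate` stands in for `MessageSelector`, and `String` stands in for `Message<?>`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;

public class PurgeSketch {

    // Remove every element the selector does NOT accept and return the removed ones.
    public static <T> List<T> purge(Queue<T> queue, Predicate<? super T> selector) {
        List<T> removed = new ArrayList<>();
        Iterator<T> it = queue.iterator();
        while (it.hasNext()) {
            T item = it.next();
            if (!selector.test(item)) {
                it.remove();
                removed.add(item);
            }
        }
        return removed;
    }

    // Remove all elements and return them in queue order.
    public static <T> List<T> clear(BlockingQueue<T> queue) {
        List<T> removed = new ArrayList<>();
        queue.drainTo(removed);
        return removed;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue =
                new LinkedBlockingQueue<>(Arrays.asList("keep-1", "drop-1", "keep-2"));

        // The selector accepts "keep-*", so only "drop-1" is purged.
        System.out.println(purge(queue, s -> s.startsWith("keep"))); // [drop-1]
        System.out.println(queue.size());                            // 2

        // clear() empties whatever is left.
        System.out.println(clear(queue));   // [keep-1, keep-2]
        System.out.println(queue.isEmpty()); // true
    }
}
```

Both operations return the removed elements, which lets a caller log or re-route them instead of silently dropping them.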
addInterceptor, addInterceptor, getIntegrationPatternType, getReceiveCount, getReceiveCountLong, getReceiveErrorCount, getReceiveErrorCountLong, hasExecutorInterceptors, receive, receive, removeInterceptor, removeInterceptor, setInterceptors
configureMetrics, destroy, getComponentType, getErrorRate, getFullChannelName, getIChannelInterceptorList, getInterceptors, getMaxSendDuration, getMeanErrorRate, getMeanErrorRatio, getMeanSendDuration, getMeanSendRate, getMetrics, getMetricsCaptor, getMinSendDuration, getOverrides, getSendCount, getSendCountLong, getSendDuration, getSendErrorCount, getSendErrorCountLong, getSendRate, getStandardDeviationSendDuration, getTimeSinceLastSend, isCountsEnabled, isLoggingEnabled, isStatsEnabled, onInit, registerMetricsCaptor, reset, send, send, setCountsEnabled, setDatatypes, setLoggingEnabled, setMessageConverter, setShouldTrack, setStatsEnabled
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
getReceiveCount, getReceiveCountLong, getReceiveErrorCount, getReceiveErrorCountLong, receiveCount, receiveErrorCount
isStatsEnabled, setStatsEnabled
destroy, getOverrides, isCountsEnabled, isLoggingEnabled, registerMetricsCaptor, reset, setCountsEnabled, setLoggingEnabled
send, send
getInterceptors
getBeanName, getComponentName
sendCount, sendErrorCount
protected final Semaphore queueSemaphore
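public QueueChannel(Queue<Message<?>> queue)
Create a channel with the specified queue.
queue - The queue.
public QueueChannel(int capacity)
Create a channel with the specified queue capacity.
capacity - The capacity.
public QueueChannel()
Create a channel with "unbounded" queue capacity. The capacity is Integer.MAX_VALUE. Note that a bounded queue is recommended, since an unbounded queue may lead to OutOfMemoryErrors.
protected boolean doSend(Message<?> message, long timeout)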
Description copied from class: AbstractMessageChannel
Subclasses must implement this method.
Specified by: doSend in class AbstractMessageChannel
message - The message.
timeout - The timeout.
@Nullable protected Message<?> doReceive(long timeout)
Description copied from class: AbstractPollableChannel
Subclasses must implement this method.
Specified by: doReceive in class AbstractPollableChannel
timeout - The timeout.
public List<Message<?>> clear()
Description copied from interface: QueueChannelOperations
Remove all Messages from this channel.
Specified by: clear in interface QueueChannelOperations
public List<Message<?>> purge(@Nullable MessageSelector selector)
Description copied from interface: QueueChannelOperations
Remove any Messages that are not accepted by the provided selector.
Specified by: purge in interface QueueChannelOperations
selector - The message selector.
public int getQueueSize()
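Description copied from interface: QueueChannelOperations
Obtain the current number of queued Messages in this channel.
Specified by: getQueueSize in interface QueueChannelOperations
Specified by: getQueueSize in interface QueueChannelManagement
Returns: the current number of queued Messages in this channel.
public int getRemainingCapacity()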
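Description copied from interface: QueueChannelOperations
Obtain the remaining capacity of this channel.
Specified by: getRemainingCapacity in interface QueueChannelOperations
Specified by: getRemainingCapacity in interface QueueChannelManagement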
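How queue size and remaining capacity relate can be shown with a small stand-alone sketch. It assumes that size and remaining capacity are read straight from the backing queue, and that a queue without a capacity notion (any `Queue` that is not a `BlockingQueue`) reports `Integer.MAX_VALUE` as remaining capacity. That fallback, the class name `SizeSketch`, and both helper methods are assumptions made for illustration, not confirmed library behavior.

```java
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SizeSketch {

    // Current number of queued elements.
    public static int queueSize(Queue<?> queue) {
        return queue.size();
    }

    // Free slots left; assumed to be Integer.MAX_VALUE when the queue has no capacity bound.
    public static int remainingCapacity(Queue<?> queue) {
        if (queue instanceof BlockingQueue) {
            return ((BlockingQueue<?>) queue).remainingCapacity();
        }
        return Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        // For a bounded queue, size plus remaining capacity equals the capacity.
        Queue<String> bounded = new LinkedBlockingQueue<>(3);
        bounded.offer("a");
        System.out.println(queueSize(bounded));         // 1
        System.out.println(remainingCapacity(bounded)); // 2

        // A non-blocking queue carries no bound in this sketch.
        Queue<String> plain = new ConcurrentLinkedQueue<>();
        plain.offer("a");
        System.out.println(remainingCapacity(plain) == Integer.MAX_VALUE); // true
    }
}
```

Both values are snapshots: with concurrent senders and receivers they may be stale by the time the caller acts on them, so they suit monitoring better than flow-control decisions.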