Type Parameters:
S - the target CorrelationHandlerSpec implementation type.
H - the AbstractCorrelatingMessageHandler implementation type.

public abstract class CorrelationHandlerSpec<S extends CorrelationHandlerSpec<S,H>,H extends AbstractCorrelatingMessageHandler> extends ConsumerEndpointSpec<S,H>
Fields inherited from superclasses and interfaces:
adviceChain
componentsToRegister, endpointFactoryBean, handler
PARSER, target
logger
DEFAULT_PHASE
OBJECT_TYPE_ATTRIBUTE
Modifier | Constructor and Description |
---|---|
protected | CorrelationHandlerSpec(H messageHandler) |
Modifier and Type | Method and Description |
---|---|
S | correlationExpression(String correlationExpression) Configure the handler with an ExpressionEvaluatingCorrelationStrategy for the given expression. |
S | correlationStrategy(CorrelationStrategy correlationStrategy) |
S | correlationStrategy(Object target, String methodName) Configure the handler with a MethodInvokingCorrelationStrategy for the target object and method name. |
S | discardChannel(MessageChannel discardChannel) |
S | discardChannel(String discardChannelName) |
S | expireGroupsUponTimeout(boolean expireGroupsUponTimeout) Expire (completely remove) a group if it is completed due to timeout. |
S | forceReleaseAdvice(Advice... advice) Configure a list of Advice objects to be applied to the forceComplete() operation. |
S | groupTimeout(java.util.function.Function<MessageGroup,Long> groupTimeoutFunction) Configure the handler with a function that will be invoked to resolve the group timeout, based on the message group. |
S | groupTimeout(long groupTimeout) Configure the handler with a group timeout expression that evaluates to this constant value. |
S | groupTimeoutExpression(String groupTimeoutExpression) |
S | lockRegistry(LockRegistry lockRegistry) Used to obtain a Lock based on the groupId for concurrent operations on the MessageGroup. |
S | messageStore(MessageGroupStore messageStore) |
S | minimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups) |
S | popSequence(boolean popSequence) Whether to perform MessageBuilder.popSequenceDetails() on the output message. |
S | processor(Object target) Configure the handler with a MethodInvokingCorrelationStrategy and a MethodInvokingReleaseStrategy using the target object, which should have methods annotated appropriately for each function. |
S | releaseExpression(String releaseExpression) Configure the handler with an ExpressionEvaluatingReleaseStrategy for the given expression. |
S | releaseStrategy(Object target, String methodName) Configure the handler with a MethodInvokingReleaseStrategy for the target object and method name. |
S | releaseStrategy(ReleaseStrategy releaseStrategy) |
S | sendPartialResultOnExpiry(boolean sendPartialResultOnExpiry) |
S | setExpireDuration(java.time.Duration expireDuration) Configure a Duration for how often to run a scheduled purge task. |
S | setExpireTimeout(long expireTimeout) Configure a timeout for old groups in the store to purge. |
S | taskScheduler(TaskScheduler taskScheduler) Configure a TaskScheduler for scheduling tasks, for example in the Polling Consumer. |
Methods inherited from superclasses and interfaces:
advice, async, autoStartup, customizeMonoReply, doGet, handleMessageAdvice, notPropagatedHeaders, order, phase, poller, requiresReply, role, sendTimeout, transactional, transactional, transactional, transactional, transactional
assertHandler, getComponentsToRegister, id, obtainInputChannelFromFlow, obtainInputChannelFromFlow, poller, poller
_this, createInstance, destroyInstance, get, getId, getObjectType, getPhase, isAutoStartup, isRunning, start, stop, stop
afterPropertiesSet, destroy, getBeanFactory, getBeanTypeConverter, getEarlySingletonInterfaces, getObject, isSingleton, setBeanClassLoader, setBeanFactory, setSingleton
protected CorrelationHandlerSpec(H messageHandler)
public S messageStore(MessageGroupStore messageStore)
Parameters:
messageStore - the message group store.
See Also:
AbstractCorrelatingMessageHandler.setMessageStore(MessageGroupStore)

public S sendPartialResultOnExpiry(boolean sendPartialResultOnExpiry)
Parameters:
sendPartialResultOnExpiry - the sendPartialResultOnExpiry.
See Also:
AbstractCorrelatingMessageHandler.setSendPartialResultOnExpiry(boolean)

public S minimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups)
Parameters:
minimumTimeoutForEmptyGroups - the minimumTimeoutForEmptyGroups.
See Also:
AbstractCorrelatingMessageHandler.setMinimumTimeoutForEmptyGroups(long)

public S groupTimeout(long groupTimeout)
Configure the handler with a group timeout expression that evaluates to this constant value.
Parameters:
groupTimeout - the group timeout in milliseconds.
See Also:
AbstractCorrelatingMessageHandler.setGroupTimeoutExpression(org.springframework.expression.Expression), ValueExpression

public S groupTimeoutExpression(String groupTimeoutExpression)
Parameters:
groupTimeoutExpression - the group timeout expression string.
See Also:
AbstractCorrelatingMessageHandler.setGroupTimeoutExpression(org.springframework.expression.Expression)

public S groupTimeout(java.util.function.Function<MessageGroup,Long> groupTimeoutFunction)
Configure the handler with a function that will be invoked to resolve the group timeout, based on the message group. For example: .groupTimeout(g -> g.size() * 2000L).
Parameters:
groupTimeoutFunction - a function invoked to resolve the group timeout in milliseconds.
See Also:
AbstractCorrelatingMessageHandler.setGroupTimeoutExpression(org.springframework.expression.Expression)
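
The lambda above scales the timeout with the number of messages already in the group. The following is a minimal, JDK-only sketch of that idea. It is not the Spring Integration API: a plain Collection stands in for MessageGroup, and the names GroupTimeoutSketch, TIMEOUT_FUNCTION and timeoutMillis are hypothetical.

```java
import java.util.Collection;
import java.util.function.Function;

final class GroupTimeoutSketch {

    // Assumption: a plain Collection plays the role of MessageGroup.
    // Two seconds per message currently in the group, mirroring
    // g -> g.size() * 2000L from the method description.
    static final Function<Collection<?>, Long> TIMEOUT_FUNCTION =
            g -> g.size() * 2000L;

    // Resolves the timeout in milliseconds for the given stand-in group.
    static long timeoutMillis(Collection<?> group) {
        return TIMEOUT_FUNCTION.apply(group);
    }
}
```

With this function, a group holding three messages resolves to a timeout of 6000 milliseconds, and the value grows as more messages arrive.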

public S taskScheduler(TaskScheduler taskScheduler)
Description copied from class: ConsumerEndpointSpec
Configure a TaskScheduler for scheduling tasks, for example in the Polling Consumer. By default the global ThreadPoolTaskScheduler bean is used. This configuration is useful, for example, when particular threads must be dedicated to the polling task.
Overrides:
taskScheduler in class ConsumerEndpointSpec<S extends CorrelationHandlerSpec<S,H>,H extends AbstractCorrelatingMessageHandler>
Parameters:
taskScheduler - the task scheduler.
See Also:
IntegrationObjectSupport.setTaskScheduler(TaskScheduler)

public S discardChannel(MessageChannel discardChannel)
Parameters:
discardChannel - the discard channel.
See Also:
AbstractCorrelatingMessageHandler.setDiscardChannel(MessageChannel)

public S discardChannel(String discardChannelName)
Parameters:
discardChannelName - the discard channel.
See Also:
AbstractCorrelatingMessageHandler.setDiscardChannelName(String)

public S processor(Object target)
Configure the handler with a MethodInvokingCorrelationStrategy and a MethodInvokingReleaseStrategy using the target object, which should have methods annotated appropriately for each function.
Parameters:
target - the target object.
See Also:
AbstractCorrelatingMessageHandler.setCorrelationStrategy(CorrelationStrategy), AbstractCorrelatingMessageHandler.setReleaseStrategy(ReleaseStrategy)

public S correlationExpression(String correlationExpression)
Configure the handler with an ExpressionEvaluatingCorrelationStrategy for the given expression.
Parameters:
correlationExpression - the correlation expression.
See Also:
AbstractCorrelatingMessageHandler.setCorrelationStrategy(CorrelationStrategy)

public S correlationStrategy(Object target, String methodName)
Configure the handler with a MethodInvokingCorrelationStrategy for the target object and method name.
Parameters:
target - the target object.
methodName - the method name.
See Also:
AbstractCorrelatingMessageHandler.setCorrelationStrategy(CorrelationStrategy)

public S correlationStrategy(CorrelationStrategy correlationStrategy)
Parameters:
correlationStrategy - the correlation strategy.
See Also:
AbstractCorrelatingMessageHandler.setCorrelationStrategy(CorrelationStrategy)
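
A correlation strategy maps each incoming message to a key, and messages that yield the same key are collected into the same group. The JDK-only sketch below illustrates that grouping step under stated assumptions: it does not use Spring Integration types, the Item class stands in for a message, and CorrelationSketch and correlate are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class CorrelationSketch {

    // Hypothetical stand-in for a message: an order id plus a payload.
    static final class Item {
        final String orderId;
        final String payload;

        Item(String orderId, String payload) {
            this.orderId = orderId;
            this.payload = payload;
        }
    }

    // Groups messages by the key the strategy returns, keeping arrival order
    // both across groups and within each group.
    static <T> Map<Object, List<T>> correlate(List<T> messages,
                                               Function<T, Object> strategy) {
        Map<Object, List<T>> groups = new LinkedHashMap<>();
        for (T message : messages) {
            Object key = strategy.apply(message);
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(message);
        }
        return groups;
    }
}
```

Calling correlate(items, i -> i.orderId) plays the role of a strategy keyed on the order id; an expression-based or method-invoking strategy would compute the key differently but feed the same grouping.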

public S releaseExpression(String releaseExpression)
Configure the handler with an ExpressionEvaluatingReleaseStrategy for the given expression.
Parameters:
releaseExpression - the release expression.
See Also:
AbstractCorrelatingMessageHandler.setReleaseStrategy(ReleaseStrategy)

public S releaseStrategy(Object target, String methodName)
Configure the handler with a MethodInvokingReleaseStrategy for the target object and method name.
Parameters:
target - the target object.
methodName - the method name.
See Also:
AbstractCorrelatingMessageHandler.setReleaseStrategy(ReleaseStrategy)

public S releaseStrategy(ReleaseStrategy releaseStrategy)
Parameters:
releaseStrategy - the release strategy.
See Also:
AbstractCorrelatingMessageHandler.setReleaseStrategy(ReleaseStrategy)
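
A release strategy is, in essence, a predicate over the current group that decides whether the group is complete. The sketch below shows a size-based rule in plain Java. It is an illustration only: a List stands in for MessageGroup, and ReleaseSketch and sizeReached are hypothetical names, not part of the library.

```java
import java.util.List;
import java.util.function.Predicate;

final class ReleaseSketch {

    // Assumption: a List plays the role of MessageGroup.
    // The returned predicate reports true once the group holds at least
    // `expected` messages, which is when the group would be released.
    static Predicate<List<?>> sizeReached(int expected) {
        return group -> group.size() >= expected;
    }
}
```

The predicate would be evaluated each time a message joins a group; until it returns true, the group keeps accumulating messages.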

public S expireGroupsUponTimeout(boolean expireGroupsUponTimeout)
Expire (completely remove) a group if it is completed due to timeout. Defaults to true for aggregator and false for resequencer.
Parameters:
expireGroupsUponTimeout - the expireGroupsUponTimeout to set.
See Also:
AbstractCorrelatingMessageHandler.setExpireGroupsUponTimeout(boolean)

public S forceReleaseAdvice(Advice... advice)
Configure a list of Advice objects to be applied to the forceComplete() operation.
Parameters:
advice - the advice chain.

public S lockRegistry(LockRegistry lockRegistry)
Used to obtain a Lock based on the groupId for concurrent operations on the MessageGroup. By default, an internal DefaultLockRegistry is used. Use of a distributed LockRegistry, such as the RedisLockRegistry, ensures only one instance of the aggregator will operate on a group concurrently.
Parameters:
lockRegistry - the LockRegistry to use.

public S popSequence(boolean popSequence)
Whether to perform MessageBuilder.popSequenceDetails() on the output message.
Parameters:
popSequence - the boolean flag to use.
See Also:
AbstractCorrelatingMessageHandler.setPopSequence(boolean)

public S setExpireTimeout(long expireTimeout)
Configure a timeout for old groups in the store to purge.
Parameters:
expireTimeout - the timeout in milliseconds to use.
See Also:
AbstractCorrelatingMessageHandler.setExpireTimeout(long)

public S setExpireDuration(java.time.Duration expireDuration)
Configure a Duration for how often to run a scheduled purge task.
Parameters:
expireDuration - the duration for scheduled purge task.
See Also:
AbstractCorrelatingMessageHandler.setExpireDuration(Duration)
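
Taken together, the two settings above describe a purge: a task runs at the configured interval and removes groups older than the configured timeout. The sketch below models only the purge step in plain Java. How the library measures a group's age is not stated here, so the per-group timestamp bookkeeping is an assumption, and ExpirySketch, register, purge and contains are hypothetical names.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ExpirySketch {

    // Assumption: groupId -> timestamp (epoch milliseconds) used to measure age.
    private final Map<String, Long> timestamps = new ConcurrentHashMap<>();

    // Records a group together with the time it was stored.
    void register(String groupId, long nowMillis) {
        timestamps.put(groupId, nowMillis);
    }

    // Removes every group older than timeoutMillis and returns the count removed.
    // A scheduler would call this once per configured purge interval.
    int purge(long nowMillis, long timeoutMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = timestamps.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue() > timeoutMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    boolean contains(String groupId) {
        return timestamps.containsKey(groupId);
    }
}
```

In this model the timeout plays the role of expireTimeout and the interval between purge calls plays the role of expireDuration.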