Class AbstractPollingEndpoint
java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.endpoint.AbstractEndpoint
org.springframework.integration.endpoint.AbstractPollingEndpoint
- All Implemented Interfaces:
Aware
,BeanClassLoaderAware
,BeanFactoryAware
,BeanNameAware
,DisposableBean
,InitializingBean
,ApplicationContextAware
,Lifecycle
,Phased
,SmartLifecycle
,ExpressionCapable
,NamedComponent
,ManageableLifecycle
,ManageableSmartLifecycle
- Direct Known Subclasses:
PollingConsumer
,SourcePollingChannelAdapter
public abstract class AbstractPollingEndpoint extends AbstractEndpoint implements BeanClassLoaderAware
An
AbstractEndpoint
extension for Polling Consumer pattern basics.
The standard polling logic is based on a periodic task scheduling according the provided
Trigger
.
When this endpoint is treated as isReactive()
, a polling logic is turned into a
Flux.generate(java.util.function.Consumer)
and Mono.delay(Duration)
combination based on the
SimpleTriggerContext
state.- Author:
- Mark Fisher, Oleg Zhurakousky, Gary Russell, Artem Bilan, Andreas Baer
-
Field Summary
Fields Modifier and Type Field Description static long
DEFAULT_POLLING_PERIOD
A default polling period forPeriodicTrigger
.Fields inherited from class org.springframework.integration.endpoint.AbstractEndpoint
lifecycleCondition, lifecycleLock
Fields inherited from class org.springframework.integration.context.IntegrationObjectSupport
EXPRESSION_PARSER, logger
-
Constructor Summary
Constructors Constructor Description AbstractPollingEndpoint()
-
Method Summary
Modifier and Type Method Description protected void
applyReceiveOnlyAdviceChain(Collection<Advice> chain)
Add the advice chain to the component that responds toreceiveMessage()
calls.protected void
doStart()
Subclasses must implement this method with the start behavior.protected void
doStop()
Subclasses must implement this method with the stop behavior.protected ClassLoader
getBeanClassLoader()
MessageChannel
getDefaultErrorChannel()
Return the default error channel if the error handler is explicitly provided and it is aMessagePublishingErrorHandler
.long
getMaxMessagesPerPoll()
protected reactor.core.publisher.Flux<Message<?>>
getPollingFlux()
protected Object
getReceiveMessageSource()
protected String
getResourceKey()
Return the key under which the resource will be made available as an attribute on theIntegrationResourceHolder
.protected Object
getResourceToBind()
Return a resource (MessageSource etc) to bind when using transaction synchronization.protected Executor
getTaskExecutor()
protected abstract void
handleMessage(Message<?> message)
Handle a message.protected boolean
isReactive()
protected boolean
isReceiveOnlyAdvice(Advice advice)
Return true if this advice should be applied only to thereceiveMessage()
operation rather than the whole poll.protected boolean
isSyncExecutor()
protected void
onInit()
Subclasses may implement this for initialization logic.protected abstract Message<?>
receiveMessage()
Obtain the next message (if one is available).void
setAdviceChain(List<Advice> adviceChain)
void
setBeanClassLoader(ClassLoader classLoader)
void
setErrorHandler(ErrorHandler errorHandler)
void
setMaxMessagesPerPoll(long maxMessagesPerPoll)
Configure a cap for messages to poll from the source per scheduling cycle.protected void
setReceiveMessageSource(Object source)
void
setTaskExecutor(Executor taskExecutor)
void
setTransactionSynchronizationFactory(TransactionSynchronizationFactory transactionSynchronizationFactory)
void
setTrigger(Trigger trigger)
Methods inherited from class org.springframework.integration.endpoint.AbstractEndpoint
destroy, doStop, getPhase, getRole, isActive, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stop
Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getComponentType, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
-
Field Details
-
DEFAULT_POLLING_PERIOD
public static final long DEFAULT_POLLING_PERIODA default polling period forPeriodicTrigger
.- See Also:
- Constant Field Values
-
-
Constructor Details
-
AbstractPollingEndpoint
public AbstractPollingEndpoint()
-
-
Method Details
-
setTaskExecutor
-
getTaskExecutor
-
isSyncExecutor
protected boolean isSyncExecutor() -
setTrigger
-
setAdviceChain
-
setMaxMessagesPerPoll
Configure a cap for messages to poll from the source per scheduling cycle. A negative number means retrieve unlimited messages until theMessageSource
returnsnull
. Zero means do not poll for any records - it can be considered as pausing if 'maxMessagesPerPoll' is later changed to a non-zero value. The polling cycle may exit earlier if the source returns null for the current receive call.- Parameters:
maxMessagesPerPoll
- the number of message to poll per schedule.
-
getMaxMessagesPerPoll
public long getMaxMessagesPerPoll() -
setErrorHandler
-
setBeanClassLoader
- Specified by:
setBeanClassLoader
in interfaceBeanClassLoaderAware
-
setTransactionSynchronizationFactory
public void setTransactionSynchronizationFactory(TransactionSynchronizationFactory transactionSynchronizationFactory) -
getDefaultErrorChannel
Return the default error channel if the error handler is explicitly provided and it is aMessagePublishingErrorHandler
.- Returns:
- the channel or null.
- Since:
- 4.3
-
getBeanClassLoader
-
isReceiveOnlyAdvice
Return true if this advice should be applied only to thereceiveMessage()
operation rather than the whole poll.- Parameters:
advice
- The advice.- Returns:
- true to only advise the receive operation.
-
applyReceiveOnlyAdviceChain
Add the advice chain to the component that responds toreceiveMessage()
calls.- Parameters:
chain
- the advice chainCollection
.
-
isReactive
protected boolean isReactive() -
getPollingFlux
-
getReceiveMessageSource
-
setReceiveMessageSource
-
onInit
protected void onInit()Description copied from class:IntegrationObjectSupport
Subclasses may implement this for initialization logic.- Overrides:
onInit
in classAbstractEndpoint
-
doStart
protected void doStart()Description copied from class:AbstractEndpoint
Subclasses must implement this method with the start behavior. This method will be invoked while holding theAbstractEndpoint.lifecycleLock
.- Specified by:
doStart
in classAbstractEndpoint
-
doStop
protected void doStop()Description copied from class:AbstractEndpoint
Subclasses must implement this method with the stop behavior. This method will be invoked while holding theAbstractEndpoint.lifecycleLock
.- Specified by:
doStop
in classAbstractEndpoint
-
receiveMessage
Obtain the next message (if one is available). MAY return null if no message is immediately available.- Returns:
- The message or null.
-
handleMessage
Handle a message.- Parameters:
message
- The message.
-
getResourceToBind
Return a resource (MessageSource etc) to bind when using transaction synchronization.- Returns:
- The resource, or null if transaction synchronization is not required.
-
getResourceKey
Return the key under which the resource will be made available as an attribute on theIntegrationResourceHolder
. The defaultExpressionEvaluatingTransactionSynchronizationProcessor
makes this attribute available as a variable in SpEL expressions.- Returns:
- The key, or null (default) if the resource shouldn't be made available as a attribute.
-