Class AbstractPollingEndpoint

All Implemented Interfaces:
Aware, BeanClassLoaderAware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, ApplicationContextAware, Lifecycle, Phased, SmartLifecycle, ComponentSourceAware, ExpressionCapable, NamedComponent, ManageableLifecycle, ManageableSmartLifecycle
Direct Known Subclasses:
PollingConsumer, SourcePollingChannelAdapter

public abstract class AbstractPollingEndpoint extends AbstractEndpoint implements BeanClassLoaderAware
An AbstractEndpoint extension for Polling Consumer pattern basics. The standard polling logic is based on a periodic task scheduling according the provided Trigger. When this endpoint is treated as isReactive(), a polling logic is turned into a Flux.generate(java.util.function.Consumer) and Mono.delay(Duration) combination based on the SimpleTriggerContext state.
Author:
Mark Fisher, Oleg Zhurakousky, Gary Russell, Artem Bilan, Andreas Baer, Christian Tzolov, Ngoc Nhan
  • Field Details

  • Constructor Details

    • AbstractPollingEndpoint

      public AbstractPollingEndpoint()
  • Method Details

    • setTaskExecutor

      public void setTaskExecutor(Executor taskExecutor)
    • getTaskExecutor

      protected Executor getTaskExecutor()
    • isSyncExecutor

      protected boolean isSyncExecutor()
    • setTrigger

      public void setTrigger(Trigger trigger)
    • setAdviceChain

      public void setAdviceChain(List<Advice> adviceChain)
    • setMaxMessagesPerPoll

      @ManagedAttribute public void setMaxMessagesPerPoll(long maxMessagesPerPoll)
      Configure a cap for messages to poll from the source per scheduling cycle. A negative number means retrieve unlimited messages until the MessageSource returns null. Zero means do not poll for any records - it can be considered as pausing if 'maxMessagesPerPoll' is later changed to a non-zero value. The polling cycle may exit earlier if the source returns null for the current receive call.
      Parameters:
      maxMessagesPerPoll - the number of message to poll per schedule.
    • getMaxMessagesPerPoll

      public long getMaxMessagesPerPoll()
    • setErrorHandler

      public void setErrorHandler(ErrorHandler errorHandler)
    • setBeanClassLoader

      public void setBeanClassLoader(ClassLoader classLoader)
      Specified by:
      setBeanClassLoader in interface BeanClassLoaderAware
    • setTransactionSynchronizationFactory

      public void setTransactionSynchronizationFactory(TransactionSynchronizationFactory transactionSynchronizationFactory)
    • getDefaultErrorChannel

      public MessageChannel getDefaultErrorChannel()
      Return the default error channel if the error handler is explicitly provided and it is a MessagePublishingErrorHandler.
      Returns:
      the channel or null.
      Since:
      4.3
    • getBeanClassLoader

      protected ClassLoader getBeanClassLoader()
    • isReceiveOnlyAdvice

      protected boolean isReceiveOnlyAdvice(Advice advice)
      Return true if this advice should be applied only to the receiveMessage() operation rather than the whole poll.
      Parameters:
      advice - The advice.
      Returns:
      true to only advise the receive operation.
    • applyReceiveOnlyAdviceChain

      protected void applyReceiveOnlyAdviceChain(Collection<Advice> chain)
      Add the advice chain to the component that responds to receiveMessage() calls.
      Parameters:
      chain - the advice chain Collection.
    • isReactive

      protected boolean isReactive()
    • getPollingFlux

      protected reactor.core.publisher.Flux<Message<?>> getPollingFlux()
    • getReceiveMessageSource

      protected Object getReceiveMessageSource()
    • setReceiveMessageSource

      protected void setReceiveMessageSource(Object source)
    • onInit

      protected void onInit()
      Description copied from class: IntegrationObjectSupport
      Subclasses may implement this for initialization logic.
      Overrides:
      onInit in class AbstractEndpoint
    • doStart

      protected void doStart()
      Description copied from class: AbstractEndpoint
      Subclasses must implement this method with the start behavior. This method will be invoked while holding the AbstractEndpoint.lifecycleLock.
      Specified by:
      doStart in class AbstractEndpoint
    • messageReceived

      protected void messageReceived(@Nullable IntegrationResourceHolder holder, Message<?> message)
    • donePollingTask

      protected void donePollingTask(@Nullable Exception pollingTaskError)
    • doStop

      protected void doStop()
      Description copied from class: AbstractEndpoint
      Subclasses must implement this method with the stop behavior. This method will be invoked while holding the AbstractEndpoint.lifecycleLock.
      Specified by:
      doStop in class AbstractEndpoint
    • receiveMessage

      protected abstract Message<?> receiveMessage()
      Obtain the next message (if one is available). MAY return null if no message is immediately available.
      Returns:
      The message or null.
    • handleMessage

      protected abstract void handleMessage(Message<?> message)
      Handle a message.
      Parameters:
      message - The message.
    • getResourceToBind

      protected Object getResourceToBind()
      Return a resource (MessageSource etc.) to bind when using transaction synchronization.
      Returns:
      The resource, or null if transaction synchronization is not required.
    • getResourceKey

      protected String getResourceKey()
      Return the key under which the resource will be made available as an attribute on the IntegrationResourceHolder. The default ExpressionEvaluatingTransactionSynchronizationProcessor makes this attribute available as a variable in SpEL expressions.
      Returns:
      The key, or null (default) if the resource shouldn't be made available as a attribute.