Poller

This section describes how polling works in Spring Integration.

Polling Consumer

When Message Endpoints (Channel Adapters) are connected to channels and instantiated, they produce one of the following instances:

The actual implementation depends on the type of channel to which these endpoints connect. A channel adapter connected to a channel that implements the org.springframework.messaging.SubscribableChannel interface produces an instance of EventDrivenConsumer. On the other hand, a channel adapter connected to a channel that implements the org.springframework.messaging.PollableChannel interface (such as a QueueChannel) produces an instance of PollingConsumer.

Polling consumers let Spring Integration components actively poll for Messages rather than process messages in an event-driven manner.

They represent a critical cross-cutting concern in many messaging scenarios. In Spring Integration, polling consumers are based on the pattern with the same name, which is described in the book Enterprise Integration Patterns, by Gregor Hohpe and Bobby Woolf. You can find a description of the pattern on the book’s website.

Pollable Message Source

Spring Integration offers a second variation of the polling consumer pattern. When inbound channel adapters are used, these adapters are often wrapped by a SourcePollingChannelAdapter. For example, when retrieving messages from a remote FTP Server location, the adapter described in FTP Inbound Channel Adapter is configured with a poller to periodically retrieve messages. So, when components are configured with pollers, the resulting instances are of one of the following types:

This means that pollers are used in both inbound and outbound messaging scenarios. Here are some use cases in which pollers are used:

  • Polling certain external systems, such as FTP Servers, Databases, and Web Services

  • Polling internal (pollable) message channels

  • Polling internal services (such as repeatedly executing methods on a Java class)

AOP advice classes can be applied to pollers, in an advice-chain, such as a transaction advice to start a transaction. Starting with version 4.1, a PollSkipAdvice is provided. Pollers use triggers to determine the time of the next poll. The PollSkipAdvice can be used to suppress (skip) a poll, perhaps because there is some downstream condition that would prevent the message being processed. To use this advice, you have to provide it with an implementation of a PollSkipStrategy. Starting with version 4.2.5, a SimplePollSkipStrategy is provided. To use it, you can add an instance as a bean to the application context, inject it into a PollSkipAdvice, and add that to the poller’s advice chain. To skip polling, call skipPolls(). To resume polling, call reset(). Version 4.2 added more flexibility in this area. See Conditional Pollers for Message Sources.

This chapter is meant to only give a high-level overview of polling consumers and how they fit into the concept of message channels (see Message Channels) and channel adapters (see Channel Adapter). For more information regarding messaging endpoints in general and polling consumers in particular, see Message Endpoints.

Deferred Acknowledgment Pollable Message Source

Starting with version 5.0.1, certain modules provide MessageSource implementations that support deferring acknowledgment until the downstream flow completes (or hands off the message to another thread). This is currently limited to the AmqpMessageSource and the KafkaMessageSource provided by the spring-integration-kafka extension project.

With these message sources, the IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK header (see MessageHeaderAccessor API) is added to the message. When used with pollable message sources, the value of the header is an instance of AcknowledgmentCallback, as the following example shows:

@FunctionalInterface
public interface AcknowledgmentCallback {

    void acknowledge(Status status);

    boolean isAcknowledged();

    void noAutoAck();

    default boolean isAutoAck();

    enum Status {

        /**
         * Mark the message as accepted.
         */
        ACCEPT,

        /**
         * Mark the message as rejected.
         */
        REJECT,

        /**
         * Reject the message and requeue so that it will be redelivered.
         */
        REQUEUE

    }

}

Not all message sources (for example, Kafka) support the REJECT status. It is treated the same as ACCEPT.

Applications can acknowledge a message at any time, as the following example shows:

Message<?> received = source.receive();

...

StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
        .acknowledge(Status.ACCEPT);

If the MessageSource is wired into a SourcePollingChannelAdapter, when the poller thread returns to the adapter after the downstream flow completes, the adapter checks whether the acknowledgment has already been acknowledged and, if not, sets its status to ACCEPT it (or REJECT if the flow throws an exception). The status values are defined in the AcknowledgmentCallback.Status enumeration.

Spring Integration provides MessageSourcePollingTemplate to perform ad-hoc polling of a MessageSource. This, too, takes care of setting ACCEPT or REJECT on the AcknowledgmentCallback when the MessageHandler callback returns (or throws an exception). The following example shows how to poll with the MessageSourcePollingTemplate:

MessageSourcePollingTemplate template =
    new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
    ...
});

In both cases (SourcePollingChannelAdapter and MessageSourcePollingTemplate), you can disable auto ack/nack by calling noAutoAck() on the callback. You might do this if you hand off the message to another thread and wish to acknowledge later. Not all implementations support this (for example, Apache Kafka does not, because the offset commit has to be performed on the same thread).

Conditional Pollers for Message Sources

This section covers how to use conditional pollers.

Background

Advice objects, in an advice-chain on a poller, advise the whole polling task (both message retrieval and processing). These “around advice” methods do not have access to any context for the poll — only the poll itself. This is fine for requirements such as making a task transactional or skipping a poll due to some external condition, as discussed earlier. What if we wish to take some action depending on the result of the receive part of the poll or if we want to adjust the poller depending on conditions? For those instances, Spring Integration offers “Smart” Polling.

“Smart” Polling

Version 4.2 introduced the AbstractMessageSourceAdvice. Any Advice objects in the advice-chain that subclass this class are applied only to the receive operation. Such classes implement the following methods:

  • beforeReceive(MessageSource<?> source) This method is called before the MessageSource.receive() method. It lets you examine and reconfigure the source. Returning false cancels this poll (similar to the PollSkipAdvice mentioned earlier).

  • Message<?> afterReceive(Message<?> result, MessageSource<?> source) This method is called after the receive() method. Again, you can reconfigure the source or take any action (perhaps depending on the result, which can be null if there was no message created by the source). You can even return a different message

Thread safety

If an advice mutates the MessageSource, you should not configure the poller with a TaskExecutor. If an advice mutates the source, such mutations are not thread safe and could cause unexpected results, especially with high frequency pollers. If you need to process poll results concurrently, consider using a downstream ExecutorChannel instead of adding an executor to the poller.

Advice Chain Ordering

You should understand how the advice chain is processed during initialization. Advice objects that do not extend AbstractMessageSourceAdvice are applied to the whole poll process and are all invoked first, in order, before any AbstractMessageSourceAdvice. Then AbstractMessageSourceAdvice objects are invoked in order around the MessageSource receive() method. If you have, for example, Advice objects a, b, c, d, where b and d are AbstractMessageSourceAdvice, the objects are applied in the following order: a, c, b, d. Also, if a MessageSource is already a Proxy, the AbstractMessageSourceAdvice is invoked after any existing Advice objects. If you wish to change the order, you must wire up the proxy yourself.

SimpleActiveIdleMessageSourceAdvice

This advice is a simple implementation of AbstractMessageSourceAdvice. When used in conjunction with a DynamicPeriodicTrigger, it adjusts the polling frequency, depending on whether or not the previous poll resulted in a message or not. The poller must also have a reference to the same DynamicPeriodicTrigger.

Important: Async Handoff
SimpleActiveIdleMessageSourceAdvice modifies the trigger based on the receive() result. This works only if the advice is called on the poller thread. It does not work if the poller has a task-executor. To use this advice where you wish to use async operations after the result of a poll, do the async handoff later, perhaps by using an ExecutorChannel.
CompoundTriggerAdvice

This advice allows the selection of one of two triggers based on whether a poll returns a message or not. Consider a poller that uses a CronTrigger. CronTrigger instances are immutable, so they cannot be altered once constructed. Consider a use case where we want to use a cron expression to trigger a poll once each hour but, if no message is received, poll once per minute and, when a message is retrieved, revert to using the cron expression.

The advice (and poller) use a CompoundTrigger for this purpose. The trigger’s primary trigger can be a CronTrigger. When the advice detects that no message is received, it adds the secondary trigger to the CompoundTrigger. When the CompoundTrigger instance’s nextExecutionTime method is invoked, it delegates to the secondary trigger, if present. Otherwise, it delegates to the primary trigger.

The poller must also have a reference to the same CompoundTrigger.

The following example shows the configuration for the hourly cron expression with a fallback to every minute:

<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
    <bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
    <int:poller trigger="compoundTrigger">
        <int:advice-chain>
            <bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
                <constructor-arg ref="compoundTrigger"/>
                <constructor-arg ref="secondary"/>
            </bean>
        </int:advice-chain>
    </int:poller>
</int:inbound-channel-adapter>

<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
    <constructor-arg ref="primary" />
</bean>

<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
    <constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>

<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
    <constructor-arg value="60000" />
</bean>
Important: Async Handoff
CompoundTriggerAdvice modifies the trigger based on the receive() result. This works only if the advice is called on the poller thread. It does not work if the poller has a task-executor. To use this advice where you wish to use async operations after the result of a poll, do the async handoff later, perhaps by using an ExecutorChannel.