Poller
This section describes how polling works in Spring Integration.
Polling Consumer
When Message Endpoints (Channel Adapters) are connected to channels and instantiated, they produce an instance of either PollingConsumer or EventDrivenConsumer.
The actual implementation depends on the type of channel to which these endpoints connect.
A channel adapter connected to a channel that implements the org.springframework.messaging.SubscribableChannel interface produces an instance of EventDrivenConsumer.
On the other hand, a channel adapter connected to a channel that implements the org.springframework.messaging.PollableChannel interface (such as a QueueChannel) produces an instance of PollingConsumer.
Polling consumers let Spring Integration components actively poll for Messages rather than process messages in an event-driven manner.
They represent a critical cross-cutting concern in many messaging scenarios. In Spring Integration, polling consumers are based on the pattern with the same name, which is described in the book Enterprise Integration Patterns, by Gregor Hohpe and Bobby Woolf. You can find a description of the pattern on the book’s website.
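The difference between the two consumer types can be illustrated with a small, framework-free sketch. The classes below are hypothetical stand-ins written for this illustration (they are not the Spring Integration types, and they carry plain String payloads instead of messages). The subscribable variant invokes its handler as part of send(), while the polling variant handles nothing until poll() is called.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical stand-in for a subscribable channel: send() invokes subscribers directly. */
class SketchSubscribableChannel {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> handler) {
        subscribers.add(handler);
    }

    void send(String payload) {
        // Event-driven delivery: the handler runs as part of the send call.
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(payload);
        }
    }
}

/** Hypothetical stand-in for a pollable channel: send() only buffers the payload. */
class SketchPollableChannel {
    private final Queue<String> buffer = new ArrayDeque<>();

    void send(String payload) {
        buffer.add(payload);
    }

    /** Returns the oldest buffered payload, or null when the buffer is empty. */
    String receive() {
        return buffer.poll();
    }
}

/** Hypothetical polling consumer: nothing is handled until poll() is called. */
class SketchPollingConsumer {
    private final SketchPollableChannel channel;
    private final Consumer<String> handler;

    SketchPollingConsumer(SketchPollableChannel channel, Consumer<String> handler) {
        this.channel = channel;
        this.handler = handler;
    }

    /** Pulls at most one payload; returns true if one was handled. */
    boolean poll() {
        String payload = channel.receive();
        if (payload == null) {
            return false;
        }
        handler.accept(payload);
        return true;
    }
}
```

In this model, something has to call poll() repeatedly. In the framework, that role belongs to the poller and its trigger.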
Pollable Message Source
Spring Integration offers a second variation of the polling consumer pattern.
When inbound channel adapters are used, these adapters are often wrapped by a SourcePollingChannelAdapter.
For example, when retrieving messages from a remote FTP Server location, the adapter described in FTP Inbound Channel Adapter is configured with a poller to periodically retrieve messages.
So, when components are configured with pollers, the resulting instances are of type PollingConsumer or SourcePollingChannelAdapter.
This means that pollers are used in both inbound and outbound messaging scenarios. Here are some use cases in which pollers are used:
- Polling certain external systems, such as FTP Servers, Databases, and Web Services
- Polling internal (pollable) message channels
- Polling internal services (such as repeatedly executing methods on a Java class)
AOP advice classes can be applied to pollers, in an advice-chain, such as a transaction advice to start a transaction.
Starting with version 4.1, a PollSkipAdvice is provided.
Pollers use triggers to determine the time of the next poll.
The PollSkipAdvice can be used to suppress (skip) a poll, perhaps because there is some downstream condition that would prevent the message being processed.
To use this advice, you have to provide it with an implementation of a PollSkipStrategy.
Starting with version 4.2.5, a SimplePollSkipStrategy is provided.
To use it, you can add an instance as a bean to the application context, inject it into a PollSkipAdvice, and add that to the poller’s advice chain.
To skip polling, call skipPolls().
To resume polling, call reset().
Version 4.2 added more flexibility in this area.
See Conditional Pollers for Message Sources.
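The skip mechanism described above amounts to a switch that is consulted before every poll. The following is a minimal sketch of that idea in plain Java. SketchPollSkipStrategy and SketchSkippablePoller are hypothetical names used only for this illustration, and the skipPoll() check is an assumption about how such a strategy would be queried. Only skipPolls() and reset() are taken from the text.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Hypothetical model of a skip strategy with the skipPolls()/reset() switch. */
class SketchPollSkipStrategy {
    private final AtomicBoolean skip = new AtomicBoolean(false);

    /** Consulted before every poll (assumed name for this sketch). */
    boolean skipPoll() {
        return skip.get();
    }

    /** Suppress polls until reset() is called. */
    void skipPolls() {
        skip.set(true);
    }

    /** Resume polling. */
    void reset() {
        skip.set(false);
    }
}

/** Hypothetical poller that wraps each poll with the skip check, as an around advice would. */
class SketchSkippablePoller {
    private final SketchPollSkipStrategy strategy;
    private final Supplier<String> source;

    SketchSkippablePoller(SketchPollSkipStrategy strategy, Supplier<String> source) {
        this.strategy = strategy;
        this.source = source;
    }

    /** Returns the polled value, or null when the poll was skipped. */
    String pollOnce() {
        if (strategy.skipPoll()) {
            return null; // the source is not touched at all
        }
        return source.get();
    }
}
```

Because the flag is held in an AtomicBoolean, another component (for example, one that monitors a downstream resource) can flip it from a different thread than the one that polls.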
This chapter is meant to only give a high-level overview of polling consumers and how they fit into the concept of message channels (see Message Channels) and channel adapters (see Channel Adapter). For more information regarding messaging endpoints in general and polling consumers in particular, see Message Endpoints.
Deferred Acknowledgment Pollable Message Source
Starting with version 5.0.1, certain modules provide MessageSource implementations that support deferring acknowledgment until the downstream flow completes (or hands off the message to another thread).
This is currently limited to the AmqpMessageSource and the KafkaMessageSource provided by the spring-integration-kafka extension project.
With these message sources, the IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK header (see MessageHeaderAccessor API) is added to the message.
When used with pollable message sources, the value of the header is an instance of AcknowledgmentCallback, as the following example shows:
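@FunctionalInterface
public interface AcknowledgmentCallback {

    void acknowledge(Status status);

    // The default bodies below are shown so that the listing compiles; implementations override them.
    default boolean isAcknowledged() {
        return false;
    }

    default void noAutoAck() {
        throw new UnsupportedOperationException("auto-ack cannot be disabled");
    }

    default boolean isAutoAck() {
        return true;
    }

    enum Status {
        /**
         * Mark the message as accepted.
         */
        ACCEPT,
        /**
         * Mark the message as rejected.
         */
        REJECT,
        /**
         * Reject the message and requeue so that it will be redelivered.
         */
        REQUEUE
    }
}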
Not all message sources (for example, Kafka) support the REJECT status.
Sources that do not support it treat it the same as ACCEPT.
Applications can acknowledge a message at any time, as the following example shows:
Message<?> received = source.receive();
...
StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
        .acknowledge(Status.ACCEPT);
If the MessageSource is wired into a SourcePollingChannelAdapter, when the poller thread returns to the adapter after the downstream flow completes, the adapter checks whether the message has already been acknowledged and, if not, sets its status to ACCEPT (or REJECT if the flow throws an exception).
The status values are defined in the AcknowledgmentCallback.Status enumeration.
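The check described above can be modeled in a few lines of plain Java. The sketch below is only an illustration of the decision logic under simplifying assumptions: RecordingAckCallback and AutoAckSketch are hypothetical names, the payload is a String, and the downstream flow is represented by a Consumer. It is not the adapter's actual code.

```java
import java.util.function.Consumer;

/** Hypothetical callback that records the status it was acknowledged with. */
class RecordingAckCallback {
    enum Status { ACCEPT, REJECT, REQUEUE }

    private Status status; // null until the first acknowledgment

    void acknowledge(Status newStatus) {
        this.status = newStatus;
    }

    boolean isAcknowledged() {
        return status != null;
    }

    Status status() {
        return status;
    }
}

/** Hypothetical model of the check performed when the poller thread returns. */
class AutoAckSketch {

    /** Runs the flow, then acknowledges on its behalf only if the flow did not do so itself. */
    static void deliver(String payload, RecordingAckCallback ack, Consumer<String> flow) {
        try {
            flow.accept(payload);
        } catch (RuntimeException e) {
            if (!ack.isAcknowledged()) {
                ack.acknowledge(RecordingAckCallback.Status.REJECT); // flow failed
            }
            throw e;
        }
        if (!ack.isAcknowledged()) {
            ack.acknowledge(RecordingAckCallback.Status.ACCEPT); // flow completed normally
        }
    }
}
```

A flow that acknowledges explicitly (for example, with REQUEUE) keeps the status it chose, because the automatic step only runs when nothing has been acknowledged yet.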
Spring Integration provides MessageSourcePollingTemplate to perform ad-hoc polling of a MessageSource.
This, too, takes care of setting ACCEPT or REJECT on the AcknowledgmentCallback when the MessageHandler callback returns (or throws an exception).
The following example shows how to poll with the MessageSourcePollingTemplate:
MessageSourcePollingTemplate template =
        new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
    ...
});
In both cases (SourcePollingChannelAdapter and MessageSourcePollingTemplate), you can disable auto ack/nack by calling noAutoAck() on the callback.
You might do this if you hand off the message to another thread and wish to acknowledge later.
Not all implementations support this (for example, Apache Kafka does not, because the offset commit has to be performed on the same thread).
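The handoff case can be sketched as follows. This is a simplified, framework-free model, not the framework's implementation: DeferredAckCallback and HandoffSketch are hypothetical names, statuses are plain strings, and autoAckOnReturn() stands in for whatever the framework does when the polling thread returns. The point it illustrates is that noAutoAck() is called on the polling thread before it returns, so that the worker thread is the one that acknowledges.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical thread-safe callback with an auto-ack switch. */
class DeferredAckCallback {
    private final AtomicReference<String> status = new AtomicReference<>();
    private volatile boolean autoAck = true;

    void acknowledge(String newStatus) { status.compareAndSet(null, newStatus); }
    boolean isAcknowledged() { return status.get() != null; }
    void noAutoAck() { autoAck = false; }
    boolean isAutoAck() { return autoAck; }
    String status() { return status.get(); }
}

class HandoffSketch {

    /** Models the automatic step taken when the polling thread returns. */
    static void autoAckOnReturn(DeferredAckCallback ack) {
        if (ack.isAutoAck() && !ack.isAcknowledged()) {
            ack.acknowledge("ACCEPT");
        }
    }

    /** Hands the work to another thread, which acknowledges once the work is done. */
    static Future<?> handOff(DeferredAckCallback ack, ExecutorService worker, Runnable work) {
        ack.noAutoAck(); // called on the polling thread, before it returns
        return worker.submit(() -> {
            work.run();
            ack.acknowledge("ACCEPT");
        });
    }

    /** Returns { status when the polling thread returned, status after the worker finished }. */
    static String[] demo() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        DeferredAckCallback ack = new DeferredAckCallback();
        try {
            Future<?> done = handOff(ack, worker, () -> awaitQuietly(release));
            autoAckOnReturn(ack);            // no-op, because auto-ack was disabled
            String onReturn = ack.status();  // still unacknowledged at this point
            release.countDown();             // let the worker finish
            done.get();
            return new String[] { onReturn, ack.status() };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The latch exists only to make the demonstration deterministic: it keeps the worker from finishing before the polling thread's automatic step has been exercised.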
Conditional Pollers for Message Sources
This section covers how to use conditional pollers.
Background
Advice objects, in an advice-chain on a poller, advise the whole polling task (both message retrieval and processing).
These “around advice” methods do not have access to any context for the poll, only the poll itself.
This is fine for requirements such as making a task transactional or skipping a poll due to some external condition, as discussed earlier.
What if we wish to take some action depending on the result of the receive part of the poll or if we want to adjust the poller depending on conditions? For those instances, Spring Integration offers “Smart” Polling.
“Smart” Polling
Version 4.2 introduced the AbstractMessageSourceAdvice.
Any Advice objects in the advice-chain that subclass this class are applied only to the receive operation.
Such classes implement the following methods:
- boolean beforeReceive(MessageSource<?> source): This method is called before the MessageSource.receive() method. It lets you examine and reconfigure the source. Returning false cancels this poll (similar to the PollSkipAdvice mentioned earlier).
- Message<?> afterReceive(Message<?> result, MessageSource<?> source): This method is called after the receive() method. Again, you can reconfigure the source or take any action (perhaps depending on the result, which can be null if there was no message created by the source). You can even return a different message.
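The contract of the two callbacks can be sketched without the framework. In the following illustration, SketchReceiveAdvice, AdvisedReceiveSketch, and EmptyPollCountingAdvice are hypothetical names, the source is modeled as a Supplier of String, and the wiring is an assumption made for the example. It shows only the order of calls: beforeReceive() may cancel the poll, and afterReceive() sees the result (possibly null) and decides what is returned.

```java
import java.util.function.Supplier;

/** Hypothetical receive-only advice, mirroring the two callbacks described above. */
interface SketchReceiveAdvice {

    /** Return false to cancel this poll. */
    boolean beforeReceive(Supplier<String> source);

    /** May return the result unchanged, a different value, or null. */
    String afterReceive(String result, Supplier<String> source);
}

class AdvisedReceiveSketch {

    /** Applies the advice around a single receive; returns null when cancelled or empty. */
    static String receive(Supplier<String> source, SketchReceiveAdvice advice) {
        if (!advice.beforeReceive(source)) {
            return null; // poll cancelled before the source is touched
        }
        String result = source.get();
        return advice.afterReceive(result, source);
    }
}

/** Example advice: counts consecutive polls that produced no message. */
class EmptyPollCountingAdvice implements SketchReceiveAdvice {
    private int emptyPolls;

    @Override
    public boolean beforeReceive(Supplier<String> source) {
        return true; // never cancels
    }

    @Override
    public String afterReceive(String result, Supplier<String> source) {
        emptyPolls = (result == null) ? emptyPolls + 1 : 0;
        return result;
    }

    int emptyPolls() {
        return emptyPolls;
    }
}
```

An advice like the counting one could use its state to reconfigure the source after a number of empty polls. That reconfiguration step is left out here because it depends on the concrete source.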
Thread safety
If an advice mutates the |
Advice Chain Ordering
You should understand how the advice chain is processed during initialization.
SimpleActiveIdleMessageSourceAdvice
This advice is a simple implementation of AbstractMessageSourceAdvice.
When used in conjunction with a DynamicPeriodicTrigger, it adjusts the polling frequency, depending on whether the previous poll resulted in a message.
The poller must also have a reference to the same DynamicPeriodicTrigger.
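The reason the advice and the poller must share one trigger instance becomes clearer in a small model. The sketch below uses hypothetical classes (SketchDynamicTrigger and SketchActiveIdleAdvice) and assumed period values. It is not the framework's implementation, only an illustration of switching a shared, mutable period between an "active" and an "idle" value after each receive.

```java
import java.time.Duration;

/** Hypothetical trigger whose period can be changed between polls. */
class SketchDynamicTrigger {
    private volatile Duration period;

    SketchDynamicTrigger(Duration initialPeriod) {
        this.period = initialPeriod;
    }

    Duration getPeriod() {
        return period;
    }

    void setPeriod(Duration newPeriod) {
        this.period = newPeriod;
    }
}

/** Hypothetical advice: selects the active or idle period from the last receive result. */
class SketchActiveIdleAdvice {
    private final SketchDynamicTrigger trigger;
    private final Duration activePeriod;
    private final Duration idlePeriod;

    SketchActiveIdleAdvice(SketchDynamicTrigger trigger, Duration activePeriod, Duration idlePeriod) {
        this.trigger = trigger;
        this.activePeriod = activePeriod;
        this.idlePeriod = idlePeriod;
    }

    /** Called after each receive; a null result means the poll produced no message. */
    String afterReceive(String result) {
        trigger.setPeriod(result != null ? activePeriod : idlePeriod);
        return result;
    }
}
```

If the poller scheduled itself from a different trigger object than the one the advice mutates, the period changes would never affect the schedule.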
Important: Async Handoff
SimpleActiveIdleMessageSourceAdvice modifies the trigger based on the receive() result.
This works only if the advice is called on the poller thread.
It does not work if the poller has a task-executor.
To use this advice where you wish to use async operations after the result of a poll, do the async handoff later, perhaps by using an ExecutorChannel.
CompoundTriggerAdvice
This advice allows the selection of one of two triggers based on whether a poll returns a message or not.
Consider a poller that uses a CronTrigger.
CronTrigger instances are immutable, so they cannot be altered once constructed.
Consider a use case where we want to use a cron expression to trigger a poll once each hour but, if no message is received, poll once per minute and, when a message is retrieved, revert to using the cron expression.
The advice (and poller) use a CompoundTrigger for this purpose.
The trigger’s primary trigger can be a CronTrigger.
When the advice detects that no message is received, it adds the secondary trigger to the CompoundTrigger.
When the CompoundTrigger instance’s nextExecutionTime method is invoked, it delegates to the secondary trigger, if present.
Otherwise, it delegates to the primary trigger.
The poller must also have a reference to the same CompoundTrigger.
The following example shows the configuration for the hourly cron expression with a fallback to every minute:
<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
    <bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
    <int:poller trigger="compoundTrigger">
        <int:advice-chain>
            <bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
                <constructor-arg ref="compoundTrigger"/>
                <constructor-arg ref="secondary"/>
            </bean>
        </int:advice-chain>
    </int:poller>
</int:inbound-channel-adapter>

<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
    <constructor-arg ref="primary" />
</bean>

<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
    <constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>

<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
    <constructor-arg value="60000" />
</bean>
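The delegation between the primary and secondary triggers can be modeled in plain Java. The following sketch is an illustration under stated assumptions rather than the framework's code: all Sketch-prefixed types are hypothetical, the hourly cron expression is approximated by "top of the next hour", and setOverride() is the name chosen in this model for installing or clearing the secondary trigger.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

/** Hypothetical trigger abstraction: maps the last execution time to the next one. */
interface SketchTrigger {
    Instant nextExecution(Instant lastExecution);
}

/** Delegates to the override (secondary) trigger when present, otherwise to the primary. */
class SketchCompoundTrigger implements SketchTrigger {
    private final SketchTrigger primary;
    private volatile SketchTrigger override; // null means "use the primary"

    SketchCompoundTrigger(SketchTrigger primary) {
        this.primary = primary;
    }

    void setOverride(SketchTrigger override) {
        this.override = override;
    }

    @Override
    public Instant nextExecution(Instant lastExecution) {
        SketchTrigger current = this.override; // read the volatile field once
        return (current != null ? current : primary).nextExecution(lastExecution);
    }
}

/** Installs the secondary trigger after an empty poll and removes it after a message. */
class SketchCompoundTriggerAdvice {
    private final SketchCompoundTrigger compound;
    private final SketchTrigger secondary;

    SketchCompoundTriggerAdvice(SketchCompoundTrigger compound, SketchTrigger secondary) {
        this.compound = compound;
        this.secondary = secondary;
    }

    String afterReceive(String result) {
        compound.setOverride(result == null ? secondary : null);
        return result;
    }
}

/** Two simple triggers standing in for the hourly cron and the one-minute fallback. */
class SketchTriggers {
    static SketchTrigger topOfNextHour() {
        return last -> last.truncatedTo(ChronoUnit.HOURS).plus(1, ChronoUnit.HOURS);
    }

    static SketchTrigger everyMinute() {
        return last -> last.plus(1, ChronoUnit.MINUTES);
    }
}
```

Because the primary trigger is never modified, this approach works even when the primary is immutable, which is the situation the text describes for CronTrigger.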
Important: Async Handoff
CompoundTriggerAdvice modifies the trigger based on the receive() result.
This works only if the advice is called on the poller thread.
It does not work if the poller has a task-executor.
To use this advice where you wish to use async operations after the result of a poll, do the async handoff later, perhaps by using an ExecutorChannel.