Message Endpoints
The first part of this chapter covers some background theory and reveals quite a bit about the underlying API that drives Spring Integration’s various messaging components. This information can be helpful if you want to really understand what goes on behind the scenes. However, if you want to get up and running with the simplified namespace-based configuration of the various elements, feel free to skip ahead to Endpoint Namespace Support for now.
As mentioned in the overview, message endpoints are responsible for connecting the various messaging components to channels. Over the next several chapters, we cover a number of different components that consume messages. Some of these are also capable of sending reply messages. Sending messages is quite straightforward. As shown earlier in Message Channels, you can send a message to a message channel. However, receiving is a bit more complicated. The main reason is that there are two types of consumers: polling consumers and event-driven consumers.
Of the two, event-driven consumers are much simpler.
Without any need to manage and schedule a separate poller thread, they are essentially listeners with a callback method.
When connecting to one of Spring Integration’s subscribable message channels, this simple option works great.
However, when connecting to a buffering, pollable message channel, some component has to schedule and manage the polling threads.
Spring Integration provides two different endpoint implementations to accommodate these two types of consumers.
Therefore, the consumers themselves need only implement the callback interface.
When polling is required, the endpoint acts as a container for the consumer instance.
The benefit is similar to that of using a container for hosting message-driven beans, but, since these consumers are Spring-managed objects running within an ApplicationContext
, it more closely resembles Spring’s own MessageListener
containers.
Message Handler
Spring Integration’s MessageHandler
interface is implemented by many of the components within the framework.
In other words, this is not part of the public API, and you would not typically implement MessageHandler
directly.
Nevertheless, it is used by a message consumer for actually handling the consumed messages, so being aware of this strategy interface does help in terms of understanding the overall role of a consumer.
The interface is defined as follows:
public interface MessageHandler {
void handleMessage(Message<?> message);
}
Despite its simplicity, this interface provides the foundation for most of the components (routers, transformers, splitters, aggregators, service activators, and others) covered in the following chapters. Those components each perform very different functionality with the messages they handle, but the requirements for actually receiving a message are the same, and the choice between polling and event-driven behavior is also the same. Spring Integration provides two endpoint implementations that host these callback-based handlers and let them be connected to message channels.
Event-driven Consumer
Because it is the simpler of the two, we cover the event-driven consumer endpoint first.
You may recall that the SubscribableChannel
interface provides a subscribe()
method and that the method accepts a MessageHandler
parameter (as shown in SubscribableChannel
).
The following listing shows the definition of the subscribe
method:
subscribableChannel.subscribe(messageHandler);
Since a handler that is subscribed to a channel does not have to actively poll that channel, this is an event-driven consumer, and the implementation provided by Spring Integration accepts a SubscribableChannel
and a MessageHandler
, as the following example shows:
SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);
EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);
Polling Consumer
Spring Integration also provides a PollingConsumer
, and it can be instantiated in the same way except that the channel must implement PollableChannel
, as the following example shows:
PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);
PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
For more information regarding polling consumers, see Poller and Channel Adapter. |
There are many other configuration options for the polling consumer. For example, the trigger is a required property. The following example shows how to set the trigger:
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setTrigger(new PeriodicTrigger(30, TimeUnit.SECONDS));
The PeriodicTrigger
is typically defined with a simple interval (in milliseconds) but also supports an initialDelay
property and a boolean fixedRate
property (the default is false
— that is, no fixed delay).
The following example sets both properties:
PeriodicTrigger trigger = new PeriodicTrigger(1000);
trigger.setInitialDelay(5000);
trigger.setFixedRate(true);
The result of the three settings in the preceding example is a trigger that waits five seconds and then triggers every second.
The CronTrigger
requires a valid cron expression.
See the Javadoc for details.
The following example sets a new CronTrigger
:
CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");
The result of the trigger defined in the previous example is a trigger that triggers every ten seconds, Monday through Friday.
In addition to the trigger, you can specify two other polling-related configuration properties: maxMessagesPerPoll
and receiveTimeout
.
The following example shows how to set these two properties:
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);
The maxMessagesPerPoll
property specifies the maximum number of messages to receive within a given poll operation.
This means that the poller continues calling receive()
without waiting, until either null
is returned or the maximum value is reached.
For example, if a poller has a ten-second interval trigger and a maxMessagesPerPoll
setting of 25
, and it is polling a channel that has 100 messages in its queue, all 100 messages can be retrieved within 40 seconds.
It grabs 25, waits ten seconds, grabs the next 25, and so on.
If maxMessagesPerPoll
is configured with a negative value, then MessageSource.receive()
is called within a single polling cycle until it returns null
.
Starting with version 5.5, a 0
value has a special meaning - skip the MessageSource.receive()
call altogether, which may be considered as pausing for this polling endpoint until the maxMessagesPerPoll
is changed to a n non-zero value at a later time, e.g. via a Control Bus.
The receiveTimeout
property specifies the amount of time the poller should wait if no messages are available when it invokes the receive operation.
For example, consider two options that seem similar on the surface but are actually quite different: The first has an interval trigger of 5 seconds and a receive timeout of 50 milliseconds, while the second has an interval trigger of 50 milliseconds and a receive timeout of 5 seconds.
The first one may receive a message up to 4950 milliseconds later than it arrived on the channel (if that message arrived immediately after one of its poll calls returned).
On the other hand, the second configuration never misses a message by more than 50 milliseconds.
The difference is that the second option requires a thread to wait.
However, as a result, it can respond much more quickly to arriving messages.
This technique, known as “long polling”, can be used to emulate event-driven behavior on a polled source.
A polling consumer can also delegate to a Spring TaskExecutor
, as the following example shows:
PollingConsumer consumer = new PollingConsumer(channel, handler);
TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);
Furthermore, a PollingConsumer
has a property called adviceChain
.
This property lets you to specify a List
of AOP advices for handling additional cross-cutting concerns including transactions.
These advices are applied around the doPoll()
method.
For more in-depth information, see the sections on AOP advice chains and transaction support under Endpoint Namespace Support.
The earlier examples show dependency lookups.
However, keep in mind that these consumers are most often configured as Spring bean definitions.
In fact, Spring Integration also provides a FactoryBean
called ConsumerEndpointFactoryBean
that creates the appropriate consumer type based on the type of channel.
Also, Spring Integration has full XML namespace support to even further hide those details.
The namespace-based configuration is in this guide featured as each component type is introduced.
Many of the MessageHandler implementations can generate reply messages.
As mentioned earlier, sending messages is trivial when compared to receiving messages.
Nevertheless, when and how many reply messages are sent depends on the handler type.
For example, an aggregator waits for a number of messages to arrive and is often configured as a downstream consumer for a splitter, which can generate multiple replies for each message it handles.
When using the namespace configuration, you do not strictly need to know all of the details.
However, it still might be worth knowing that several of these components share a common base class, the AbstractReplyProducingMessageHandler , and that it provides a setOutputChannel(..) method.
|
Endpoint Namespace Support
Throughout this reference manual, you can find specific configuration examples for endpoint elements, such as router, transformer, service-activator, and so on.
Most of these support an input-channel
attribute and many support an output-channel
attribute.
After being parsed, these endpoint elements produce an instance of either the PollingConsumer
or the EventDrivenConsumer
, depending on the type of the input-channel
that is referenced: PollableChannel
or SubscribableChannel
, respectively.
When the channel is pollable, the polling behavior is based on the endpoint element’s poller
sub-element and its attributes.
The following listing lists all of the available configuration options for a poller
:
<int:poller cron="" (1)
default="false" (2)
error-channel="" (3)
fixed-delay="" (4)
fixed-rate="" (5)
id="" (6)
max-messages-per-poll="" (7)
receive-timeout="" (8)
ref="" (9)
task-executor="" (10)
time-unit="MILLISECONDS" (11)
trigger=""> (12)
<int:advice-chain /> (13)
<int:transactional /> (14)
</int:poller>
1 | Provides the ability to configure pollers by using Cron expressions.
The underlying implementation uses an org.springframework.scheduling.support.CronTrigger .
If this attribute is set, none of the following attributes must be specified: fixed-delay , trigger , fixed-rate , and ref . |
2 | By setting this attribute to true , you can define exactly one global default poller.
An exception is raised if more than one default poller is defined in the application context.
Any endpoints connected to a PollableChannel (PollingConsumer ) or any SourcePollingChannelAdapter that does not have an explicitly configured poller then uses the global default poller.
It defaults to false .
Optional. |
3 | Identifies the channel to which error messages are sent if a failure occurs in this poller’s invocation.
To completely suppress exceptions, you can provide a reference to the nullChannel .
Optional. |
4 | The fixed delay trigger uses a PeriodicTrigger under the covers.
If you do not use the time-unit attribute, the specified value is represented in milliseconds.
If this attribute is set, none of the following attributes must be specified: fixed-rate , trigger , cron , and ref . |
5 | The fixed rate trigger uses a PeriodicTrigger under the covers.
If you do not use the time-unit attribute, the specified value is represented in milliseconds.
If this attribute is set, none of the following attributes must be specified: fixed-delay , trigger , cron , and ref . |
6 | The ID referring to the poller’s underlying bean-definition, which is of type org.springframework.integration.scheduling.PollerMetadata .
The id attribute is required for a top-level poller element, unless it is the default poller (default="true" ). |
7 | See Configuring An Inbound Channel Adapter for more information.
If not specified, the default value depends on the context.
If you use a PollingConsumer , this attribute defaults to -1 .
However, if you use a SourcePollingChannelAdapter , the max-messages-per-poll attribute defaults to 1 .
Optional. |
8 | Value is set on the underlying class PollerMetadata .
If not specified, it defaults to 1000 (milliseconds).
Optional. |
9 | Bean reference to another top-level poller.
The ref attribute must not be present on the top-level poller element.
However, if this attribute is set, none of the following attributes must be specified: fixed-rate , trigger , cron , and fixed-delay . |
10 | Provides the ability to reference a custom task executor. See TaskExecutor Support for further information. Optional. |
11 | This attribute specifies the java.util.concurrent.TimeUnit enum value on the underlying org.springframework.scheduling.support.PeriodicTrigger .
Therefore, this attribute can be used only in combination with the fixed-delay or fixed-rate attributes.
If combined with either cron or a trigger reference attribute, it causes a failure.
The minimal supported granularity for a PeriodicTrigger is milliseconds.
Therefore, the only available options are milliseconds and seconds.
If this value is not provided, any fixed-delay or fixed-rate value is interpreted as milliseconds.
Basically, this enum provides a convenience for seconds-based interval trigger values.
For hourly, daily, and monthly settings, we recommend using a cron trigger instead. |
12 | Reference to any Spring-configured bean that implements the org.springframework.scheduling.Trigger interface.
However, if this attribute is set, none of the following attributes must be specified: fixed-delay , fixed-rate , cron , and ref .
Optional. |
13 | Allows specifying extra AOP advices to handle additional cross-cutting concerns. See Transaction Support for further information. Optional. |
14 | Pollers can be made transactional. See AOP Advice chains for further information. Optional. |
Examples
A simple interval-based poller with a 1-second interval can be configured as follows:
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller fixed-rate="1000"/>
</int:transformer>
As an alternative to using the fixed-rate
attribute, you can also use the fixed-delay
attribute.
For a poller based on a Cron expression, use the cron
attribute instead, as the following example shows:
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>
If the input channel is a PollableChannel
, the poller configuration is required.
Specifically, as mentioned earlier, the trigger
is a required property of the PollingConsumer
class.
Therefore, if you omit the poller
sub-element for a polling consumer endpoint’s configuration, an exception may be thrown.
The exception may also be thrown if you attempt to configure a poller on the element that is connected to a non-pollable channel.
It is also possible to create top-level pollers, in which case only a ref
attribute is required, as the following example shows:
<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller ref="weekdayPoller"/>
</int:transformer>
The ref attribute is allowed only on the inner poller definitions.
Defining this attribute on a top-level poller results in a configuration exception being thrown during initialization of the application context.
|
Global Default Poller
To simplify the configuration even further, you can define a global default poller.
A single top-level poller component in XML DSL may have the default
attribute set to true
.
For Java configuration a PollerMetadata
bean with the PollerMetadata.DEFAULT_POLLER
name must be declared in this case.
In that case, any endpoint with a PollableChannel
for its input channel, that is defined within the same ApplicationContext
, and has no explicitly configured poller
uses that default.
The following example shows such a poller and a transformer that uses it:
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
return IntegrationFlows.from(MessageChannels.queue("pollable"))
.transform(transformer) // No 'poller' attribute because there is a default global poller
.channel("output")
.get();
}
@Bean(PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
@Bean
public QueueChannel pollable() {
return new QueueChannel();
}
// No 'poller' attribute because there is a default global poller
@Transformer(inputChannel = "pollable", outputChannel = "output")
public Object transform(Object payload) {
...
}
@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller() =
PollerMetadata()
.also {
it.maxMessagesPerPoll = 5
it.trigger = PeriodicTrigger(3000)
}
@Bean
fun convertFlow() =
integrationFlow(MessageChannels.queue("pollable")) {
transform(transformer) // No 'poller' attribute because there is a default global poller
channel("output")
}
<int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-delay="3000"/>
<!-- No <poller/> sub-element is necessary, because there is a default -->
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output"/>
Transaction Support
Spring Integration also provides transaction support for the pollers so that each receive-and-forward operation can be performed as an atomic unit of work.
To configure transactions for a poller, add the <transactional/>
sub-element.
The following example shows the available attributes:
<int:poller fixed-delay="1000">
<int:transactional transaction-manager="txManager"
propagation="REQUIRED"
isolation="REPEATABLE_READ"
timeout="10000"
read-only="false"/>
</int:poller>
For more information, see Poller Transaction Support.
AOP Advice chains
Since Spring transaction support depends on the proxy mechanism with TransactionInterceptor
(AOP Advice) handling transactional behavior of the message flow initiated by the poller, you must sometimes provide extra advices to handle other cross cutting behavior associated with the poller.
For that, the poller
defines an advice-chain
element that lets you add more advices in a class that implements the MethodInterceptor
interface.
The following example shows how to define an advice-chain
for a poller
:
<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
method="good" output-channel="output">
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<int:advice-chain>
<ref bean="adviceA" />
<beans:bean class="org.something.SampleAdvice" />
<ref bean="txAdvice" />
</int:advice-chain>
</int:poller>
</int:service-activator>
For more information on how to implement the MethodInterceptor
interface, see the AOP sections of the Spring Framework Reference Guide.
An advice chain can also be applied on a poller that does not have any transaction configuration, letting you enhance the behavior of the message flow initiated by the poller.
When using an advice chain, the <transactional/> child element cannot be specified.
Instead, declare a <tx:advice/> bean and add it to the <advice-chain/> .
See Poller Transaction Support for complete configuration details.
|
TaskExecutor Support
The polling threads may be executed by any instance of Spring’s TaskExecutor
abstraction.
This enables concurrency for an endpoint or group of endpoints.
As of Spring 3.0, the core Spring Framework has a task
namespace, and its <executor/>
element supports the creation of a simple thread pool executor.
That element accepts attributes for common concurrency settings, such as pool-size and queue-capacity.
Configuring a thread-pooling executor can make a substantial difference in how the endpoint performs under load.
These settings are available for each endpoint, since the performance of an endpoint is one of the major factors to consider (the other major factor being the expected volume on the channel to which the endpoint subscribes).
To enable concurrency for a polling endpoint that is configured with the XML namespace support, provide the task-executor
reference on its <poller/>
element and then provide one or more of the properties shown in the following example:
<int:poller task-executor="pool" fixed-rate="1000"/>
<task:executor id="pool"
pool-size="5-25"
queue-capacity="20"
keep-alive="120"/>
If you do not provide a task-executor, the consumer’s handler is invoked in the caller’s thread.
Note that the caller is usually the default TaskScheduler
(see Configuring the Task Scheduler).
You should also keep in mind that the task-executor
attribute can provide a reference to any implementation of Spring’s TaskExecutor
interface by specifying the bean name.
The executor
element shown earlier is provided for convenience.
As mentioned earlier in the background section for polling consumers, you can also configure a polling consumer in such a way as to emulate event-driven behavior.
With a long receive timeout and a short interval in the trigger, you can ensure a very timely reaction to arriving messages even on a polled message source.
Note that this applies only to sources that have a blocking wait call with a timeout.
For example, the file poller does not block.
Each receive()
call returns immediately and either contains new files or not.
Therefore, even if a poller contains a long receive-timeout
, that value would never be used in such a scenario.
On the other hand, when using Spring Integration’s own queue-based channels, the timeout value does have a chance to participate.
The following example shows how a polling consumer can receive messages nearly instantaneously:
<int:service-activator input-channel="someQueueChannel"
output-channel="output">
<int:poller receive-timeout="30000" fixed-rate="10"/>
</int:service-activator>
Using this approach does not carry much overhead, since, internally, it is nothing more then a timed-wait thread, which does not require nearly as much CPU resource usage as (for example) a thrashing, infinite while loop.
Changing Polling Rate at Runtime
When configuring a poller with a fixed-delay
or a fixed-rate
attribute, the default implementation uses a PeriodicTrigger
instance.
The PeriodicTrigger
is part of the core Spring Framework.
It accepts the interval only as a constructor argument.
Therefore, it cannot be changed at runtime.
However, you can define your own implementation of the org.springframework.scheduling.Trigger
interface.
You could even use the PeriodicTrigger
as a starting point.
Then you can add a setter for the interval (period), or you can even embed your own throttling logic within the trigger itself.
The period
property is used with each call to nextExecutionTime
to schedule the next poll.
To use this custom trigger within pollers, declare the bean definition of the custom trigger in your application context and inject the dependency into your poller configuration by using the trigger
attribute, which references the custom trigger bean instance.
You can now obtain a reference to the trigger bean and change the polling interval between polls.
For an example, see the Spring Integration Samples project.
It contains a sample called dynamic-poller
, which uses a custom trigger and demonstrates the ability to change the polling interval at runtime.
The sample provides a custom trigger that implements the org.springframework.scheduling.Trigger
interface.
The sample’s trigger is based on Spring’s PeriodicTrigger
implementation.
However, the fields of the custom trigger are not final, and the properties have explicit getters and setters, letting you dynamically change the polling period at runtime.
It is important to note, though, that because the Trigger method is nextExecutionTime() , any changes to a dynamic trigger do not take effect until the next poll, based on the existing configuration.
It is not possible to force a trigger to fire before its currently configured next execution time.
|
Payload Type Conversion
Throughout this reference manual, you can also see specific configuration and implementation examples of various endpoints that accept a message or any arbitrary Object
as an input parameter.
In the case of an Object
, such a parameter is mapped to a message payload or part of the payload or header (when using the Spring Expression Language).
However, the type of input parameter of the endpoint method sometimes does not match the type of the payload or its part.
In this scenario, we need to perform type conversion.
Spring Integration provides a convenient way for registering type converters (by using the Spring ConversionService
) within its own instance of a conversion service bean named integrationConversionService
.
That bean is automatically created as soon as the first converter is defined by using the Spring Integration infrastructure.
To register a converter, you can implement org.springframework.core.convert.converter.Converter
, org.springframework.core.convert.converter.GenericConverter
, or org.springframework.core.convert.converter.ConverterFactory
.
The Converter
implementation is the simplest and converts from a single type to another.
For more sophistication, such as converting to a class hierarchy, you can implement a GenericConverter
and possibly a ConditionalConverter
.
These give you complete access to the from
and to
type descriptors, enabling complex conversions.
For example, if you have an abstract class called Something
that is the target of your conversion (parameter type, channel data type, and so on), you have two concrete implementations called Thing1
and Thing
, and you wish to convert to one or the other based on the input type, the GenericConverter
would be a good fit.
For more information, see the Javadoc for these interfaces:
When you have implemented your converter, you can register it with convenient namespace support, as the following example shows:
<int:converter ref="sampleConverter"/>
<bean id="sampleConverter" class="foo.bar.TestConverter"/>
Alternately, you can use an inner bean, as the following example shows:
<int:converter>
<bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>
Starting with Spring Integration 4.0, you can use annotations to create the preceding configuration, as the following example shows:
@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {
public Number convert(Boolean source) {
return source ? 1 : 0;
}
}
Alternately, you can use the @Configuration
annotation, as the following example shows:
@Configuration
@EnableIntegration
public class ContextConfiguration {
@Bean
@IntegrationConverter
public SerializingConverter serializingConverter() {
return new SerializingConverter();
}
}
When configuring an application context, the Spring Framework lets you add a In contrast, the However, if you do want to use the Spring
In this case, the converters provided by the |
Content Type Conversion
Starting with version 5.0, by default, the method invocation mechanism is based on the org.springframework.messaging.handler.invocation.InvocableHandlerMethod
infrastructure.
Its HandlerMethodArgumentResolver
implementations (such as PayloadArgumentResolver
and MessageMethodArgumentResolver
) can use the MessageConverter
abstraction to convert an incoming payload
to the target method argument type.
The conversion can be based on the contentType
message header.
For this purpose, Spring Integration provides the ConfigurableCompositeMessageConverter
, which delegates to a list of registered converters to be invoked until one of them returns a non-null result.
By default, this converter provides (in strict order):
-
MappingJackson2MessageConverter
if the Jackson processor is present on the classpath
See the Javadoc (linked in the preceding list) for more information about their purpose and appropriate contentType
values for conversion.
The ConfigurableCompositeMessageConverter
is used because it can be supplied with any other MessageConverter
implementations, including or excluding the previously mentioned default converters.
It can also be registered as an appropriate bean in the application context, overriding the default converter, as the following example shows:
@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
List<MessageConverter> converters =
Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
new JavaSerializationMessageConverter());
return new ConfigurableCompositeMessageConverter(converters);
}
Those two new converters are registered in the composite before the defaults.
You can also not use a ConfigurableCompositeMessageConverter
but provide your own MessageConverter
by registering a bean with the name, integrationArgumentResolverMessageConverter
(by setting the IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME
property).
The MessageConverter -based (including contentType header) conversion is not available when using SpEL method invocation.
In this case, only the regular class-to-class conversion mentioned above in the Payload Type Conversion is available.
|
Asynchronous Polling
If you want the polling to be asynchronous, a poller can optionally specify a task-executor
attribute that points to an existing instance of any TaskExecutor
bean (Spring 3.0 provides a convenient namespace configuration through the task
namespace).
However, there are certain things you must understand when configuring a poller with a TaskExecutor
.
The problem is that there are two configurations in place, the poller and the TaskExecutor
.
They must be in tune with each other.
Otherwise, you might end up creating an artificial memory leak.
Consider the following configuration:
<int:channel id="publishChannel">
<int:queue />
</int:channel>
<int:service-activator input-channel="publishChannel" ref="myService">
<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>
<task:executor id="taskExecutor" pool-size="20" />
The preceding configuration demonstrates an out-of-tune configuration.
By default, the task executor has an unbounded task queue. The poller keeps scheduling new tasks even though all the threads are blocked, waiting for either a new message to arrive or the timeout to expire. Given that there are 20 threads executing tasks with a five-second timeout, they are executed at a rate of 4 per second. However, new tasks are being scheduled at a rate of 20 per second, so the internal queue in the task executor grows at a rate of 16 per second (while the process is idle), so we have a memory leak.
One of the ways to handle this is to set the queue-capacity
attribute of the task executor.
Even 0 is a reasonable value.
You can also manage it by specifying what to do with messages that can not be queued by setting the rejection-policy
attribute of the Task Executor (for example, to DISCARD
).
In other words, there are certain details you must understand when configuring TaskExecutor
.
See “Task Execution and Scheduling” in the Spring reference manual for more detail on the subject.
Endpoint Inner Beans
Many endpoints are composite beans.
This includes all consumers and all polled inbound channel adapters.
Consumers (polled or event-driven) delegate to a MessageHandler
.
Polled adapters obtain messages by delegating to a MessageSource
.
Often, it is useful to obtain a reference to the delegate bean, perhaps to change configuration at runtime or for testing.
These beans can be obtained from the ApplicationContext
with well known names.
MessageHandler
instances are registered with the application context with bean IDs similar to someConsumer.handler
(where 'consumer' is the value of the endpoint’s id
attribute).
MessageSource
instances are registered with bean IDs similar to somePolledAdapter.source
, where 'somePolledAdapter' is the ID of the adapter.
The preceding only applies to the framework component itself. You can instead use an inner bean definition, as the following example shows:
<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
output-channel = "outChannel" method="foo">
<beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>
The bean is treated like any inner bean declared and is not registered with the application context.
If you wish to access this bean in some other manner, declare it at the top level with an id
and use the ref
attribute instead.
See the Spring Documentation for more information.
Endpoint Roles
Starting with version 4.2, endpoints can be assigned to roles.
Roles let endpoints be started and stopped as a group.
This is particularly useful when using leadership election, where a set of endpoints can be started or stopped when leadership is granted or revoked, respectively.
For this purpose the framework registers a SmartLifecycleRoleController
bean in the application context with the name IntegrationContextUtils.INTEGRATION_LIFECYCLE_ROLE_CONTROLLER
.
Whenever it is necessary to control lifecycles, this bean can be injected or @Autowired
:
<bean class="com.some.project.SomeLifecycleControl">
<property name="roleController" ref="integrationLifecycleRoleController"/>
</bean>
You can assign endpoints to roles using XML, Java configuration, or programmatically. The following example shows how to configure endpoint roles with XML:
<int:inbound-channel-adapter id="ica" channel="someChannel" expression="'foo'" role="cluster"
auto-startup="false">
<int:poller fixed-rate="60000" />
</int:inbound-channel-adapter>
The following example shows how to configure endpoint roles for a bean created in Java:
@Bean
@ServiceActivator(inputChannel = "sendAsyncChannel", autoStartup="false")
@Role("cluster")
public MessageHandler sendAsyncHandler() {
return // some MessageHandler
}
The following example shows how to configure endpoint roles on a method in Java:
@Payload("#args[0].toLowerCase()")
@Role("cluster")
public String handle(String payload) {
return payload.toUpperCase();
}
The following example shows how to configure endpoint roles by using the SmartLifecycleRoleController
in Java:
@Autowired
private SmartLifecycleRoleController roleController;
...
this.roleController.addSmartLifeCycleToRole("cluster", someEndpoint);
...
The following example shows how to configure endpoint roles by using an IntegrationFlow
in Java:
IntegrationFlow flow -> flow
.handle(..., e -> e.role("cluster"));
Each of these adds the endpoint to the cluster
role.
Invoking roleController.startLifecyclesInRole("cluster")
and the corresponding stop…
method starts and stops the endpoints.
Any object that implements SmartLifecycle can be programmatically added — not just endpoints.
|
The SmartLifecycleRoleController
implements ApplicationListener<AbstractLeaderEvent>
and it automatically starts and stops its configured SmartLifecycle
objects when leadership is granted or revoked (when some bean publishes OnGrantedEvent
or OnRevokedEvent
, respectively).
When using leadership election to start and stop components, it is important to set the auto-startup XML attribute (autoStartup bean property) to false so that the application context does not start the components during context initialization.
|
Starting with version 4.3.8, the SmartLifecycleRoleController
provides several status methods:
public Collection<String> getRoles() (1)
public boolean allEndpointsRunning(String role) (2)
public boolean noEndpointsRunning(String role) (3)
public Map<String, Boolean> getEndpointsRunningStatus(String role) (4)
1 | Returns a list of the roles being managed. |
2 | Returns true if all endpoints in the role are running. |
3 | Returns true if none of the endpoints in the role are running. |
4 | Returns a map of component name : running status .
The component name is usually the bean name. |
Leadership Event Handling
Groups of endpoints can be started and stopped based on leadership being granted or revoked, respectively. This is useful in clustered scenarios where shared resources must be consumed by only a single instance. An example of this is a file inbound channel adapter that is polling a shared directory. (See Reading Files).
To participate in a leader election and be notified when elected leader, when leadership is revoked, or on failure to acquire the resources to become leader, an application creates a component in the application context called a “leader initiator”.
Normally, a leader initiator is a SmartLifecycle
, so it starts (optionally) when the context starts and then publishes notifications when leadership changes.
You can also receive failure notifications by setting the publishFailedEvents
to true
(starting with version 5.0), for cases when you want to take a specific action if a failure occurs.
By convention, you should provide a Candidate
that receives the callbacks.
You can also revoke the leadership through a Context
object provided by the framework.
Your code can also listen for o.s.i.leader.event.AbstractLeaderEvent
instances (the super class of OnGrantedEvent
and OnRevokedEvent
) and respond accordingly (for instance, by using a SmartLifecycleRoleController
).
The events contain a reference to the Context
object.
The following listing shows the definition of the Context
interface:
public interface Context {
boolean isLeader();
void yield();
String getRole();
}
Starting with version 5.0.6, the context provides a reference to the candidate’s role.
Spring Integration provides a basic implementation of a leader initiator that is based on the LockRegistry
abstraction.
To use it, you need to create an instance as a bean, as the following example shows:
@Bean
public LockRegistryLeaderInitiator leaderInitiator(LockRegistry locks) {
return new LockRegistryLeaderInitiator(locks);
}
If the lock registry is implemented correctly, there is only ever at most one leader.
If the lock registry also provides locks that throw exceptions (ideally, InterruptedException
) when they expire or are broken, the duration of the leaderless periods can be as short as is allowed by the inherent latency in the lock implementation.
By default, the busyWaitMillis
property adds some additional latency to prevent CPU starvation in the (more usual) case that the locks are imperfect, and you only know they expired when you try to obtain one again.
See Zookeeper Leadership Event Handling for more information about leadership election and events that use Zookeeper.