This chapter covers the details of using Spring Integration to route messages.
This section covers how routers work. It includes the following topics:
Routers are a crucial element in many messaging architectures. They consume messages from a message channel and forward each consumed message to one or more different message channels depending on a set of conditions.
Spring Integration provides the following routers:
Router implementations share many configuration parameters. However, certain differences exist between routers. Furthermore, the availability of configuration parameters depends on whether routers are used inside or outside of a chain. In order to provide a quick overview, all available attributes are listed in the two following tables .
The following table shows the configuration parameters available for a router outside of a chain:
Table 8.1. Routers Outside of a Chain
Attribute | router | header value router | xpath router | payload type router | recipient list route | exception type router |
---|---|---|---|---|---|---|
apply-sequence | ||||||
default-output-channel | ||||||
resolution-required | ||||||
ignore-send-failures | ||||||
timeout | ||||||
id | ||||||
auto-startup | ||||||
input-channel | ||||||
order | ||||||
method | ||||||
ref | ||||||
expression | ||||||
header-name | ||||||
evaluate-as-string | ||||||
xpath-expression-ref | ||||||
converter |
The following table shows the configuration parameters available for a router inside of a chain:
Table 8.2. Routers Inside of a Chain
Attribute | router | header value router | xpath router | payload type router | recipient list router | exception type router |
---|---|---|---|---|---|---|
apply-sequence | ||||||
default-output-channel | ||||||
resolution-required | ||||||
ignore-send-failures | ||||||
timeout | ||||||
id | ||||||
auto-startup | ||||||
input-channel | ||||||
order | ||||||
method | ||||||
ref | ||||||
expression | ||||||
header-name | ||||||
evaluate-as-string | ||||||
xpath-expression-ref | ||||||
converter |
Important | |
---|---|
As of Spring Integration 2.1, router parameters have been more standardized across all router implementations. Consequently, a few minor changes may break older Spring Integration based applications. Since Spring Integration 2.1, the Prior to these changes, the If you do desire to drop messages silently, you can set |
This section describes the parameters common to all router parameters (the parameters with all their boxes ticked in the two tables shown earlier in this chapter).
The following parameters are valid for all routers inside and outside of chains.
apply-sequence
false
.
default-output-channel
If set, this attribute provides a reference to the channel where messages should be sent if channel resolution fails to return any channels.
If no default output channel is provided, the router throws an exception.
If you would like to silently drop those messages instead, set the default output channel attribute value to nullChannel
.
Note | |
---|---|
A message is sent only to the |
resolution-required
This attribute specifies whether channel names must always be successfully resolved to channel instances that exist.
If set to true
, a MessagingException
is raised when the channel cannot be resolved.
Setting this attribute to false
causes any unresovable channels to be ignored.
This optional attribute defaults to true
.
Note | |
---|---|
A Message is sent only to the |
ignore-send-failures
If set to true
, failures to send to a message channel is ignored.
If set to false
, a MessageDeliveryException
is thrown instead, and, if the router resolves more than one channel, any subsequent channels do not receive the message.
The exact behavior of this attribute depends on the type of the Channel
to which the messages are sent.
For example, when using direct channels (single threaded), send failures can be caused by exceptions thrown by components much further downstream.
However, when sending messages to a simple queue channel (asynchronous), the likelihood of an exception to be thrown is rather remote.
Note | |
---|---|
While most routers route to a single channel, they can return more than one channel name.
The |
This attribute defaults to false
.
timeout
timeout
attribute specifies the maximum amount of time in milliseconds to wait when sending messages to the target Message Channels.
By default, the send operation blocks indefinitely.
The following parameters are valid only across all top-level routers that are outside of chains.
id
EventDrivenConsumer
or PollingConsumer
, depending on whether the router’s input-channel
is a SubscribableChannel
or a PollableChannel
, respectively.
This is an optional attribute.
auto-startup
lifecycle
" attribute signaled whether this component should be started during startup of the application context.
This optional attribute defaults to true
.
input-channel
order
Since content-based routing often requires some domain-specific logic, most use cases require Spring Integration’s options for delegating to POJOs by using either the XML namespace support or annotations. Both of these are discussed later. However, we first present a couple of implementations that fulfill common requirements.
A PayloadTypeRouter
sends messages to the channel defined by payload-type mappings, as the following example shows:
<bean id="payloadTypeRouter" class="org.springframework.integration.router.PayloadTypeRouter"> <property name="channelMapping"> <map> <entry key="java.lang.String" value-ref="stringChannel"/> <entry key="java.lang.Integer" value-ref="integerChannel"/> </map> </property> </bean>
Configuration of the PayloadTypeRouter
is also supported by the namespace provided by Spring Integration (see <<configuration-namespace>>
), which essentially simplifies configuration by combining the <router/>
configuration and its corresponding implementation (defined by using a <bean/>
element) into a single and more concise configuration element.
The following example shows a PayloadTypeRouter
configuration that is equivalent to the one above but uses the namespace support:
<int:payload-type-router input-channel="routingChannel"> <int:mapping type="java.lang.String" channel="stringChannel" /> <int:mapping type="java.lang.Integer" channel="integerChannel" /> </int:payload-type-router>
The following example shows the equivalent router configured in Java:
@ServiceActivator(inputChannel = "routingChannel") @Bean public PayloadTypeRouter router() { PayloadTypeRouter router = new PayloadTypeRouter(); router.setChannelMapping(String.class.getName(), "stringChannel"); router.setChannelMapping(Integer.class.getName(), "integerChannel"); return router; }
When using the Java DSL, there are two options.
First, you can define the router object as shown in the preceding example:
@Bean public IntegrationFlow routerFlow1() { return IntegrationFlows.from("routingChannel") .route(router()) .get(); } public PayloadTypeRouter router() { PayloadTypeRouter router = new PayloadTypeRouter(); router.setChannelMapping(String.class.getName(), "stringChannel"); router.setChannelMapping(Integer.class.getName(), "integerChannel"); return router; }
Note that the router can be, but does not have to be, a @Bean
.
The flow registers it if it is not a @Bean
.
Second, you can define the routing function within the DSL flow itself, as the following example shows:
@Bean public IntegrationFlow routerFlow2() { return IntegrationFlows.from("routingChannel") .<Object, Class<?>>route(Object::getClass, m -> m .channelMapping(String.class, "stringChannel") .channelMapping(Integer.class, "integerChannel")) .get(); }
A HeaderValueRouter
sends Messages to the channel based on the individual header value mappings.
When a HeaderValueRouter
is created, it is initialized with the name of the header to be evaluated.
The value of the header could be one of two things:
If it is an arbitrary value, additional mappings for these header values to channel names are required. Otherwise, no additional configuration is needed.
Spring Integration provides a simple namespace-based XML configuration to configure a HeaderValueRouter
.
The following example demonstrates configuration for the HeaderValueRouter
when mapping of header values to channels is required:
<int:header-value-router input-channel="routingChannel" header-name="testHeader"> <int:mapping value="someHeaderValue" channel="channelA" /> <int:mapping value="someOtherHeaderValue" channel="channelB" /> </int:header-value-router>
During the resolution process, the router defined in the preceding example may encounter channel resolution failures, causing an exception.
If you want to suppress such exceptions and send unresolved messages to the default output channel (identified with the default-output-channel
attribute) set resolution-required
to false
.
Normally, messages for which the header value is not explicitly mapped to a channel are sent to the default-output-channel
.
However, when the header value is mapped to a channel name but the channel cannot be resolved, setting the resolution-required
attribute to false
results in routing such messages to the default-output-channel
.
Important | |
---|---|
As of Spring Integration 2.1, the attribute was changed from |
The following example shows the equivalent router configured in Java:
@ServiceActivator(inputChannel = "routingChannel") @Bean public HeaderValueRouter router() { HeaderValueRouter router = new HeaderValueRouter("testHeader"); router.setChannelMapping("someHeaderValue", "channelA"); router.setChannelMapping("someOtherHeaderValue", "channelB"); return router; }
When using the Java DSL, there are two options. First, you can define the router object as shown in the preceding example:
@Bean public IntegrationFlow routerFlow1() { return IntegrationFlows.from("routingChannel") .route(router()) .get(); } public HeaderValueRouter router() { HeaderValueRouter router = new HeaderValueRouter("testHeader"); router.setChannelMapping("someHeaderValue", "channelA"); router.setChannelMapping("someOtherHeaderValue", "channelB"); return router; }
Note that the router can be, but does not have to be, a @Bean
.
The flow registers it if it is not a @Bean
.
Second, you can define the routing function within the DSL flow itself, as the following example shows:
@Bean public IntegrationFlow routerFlow2() { return IntegrationFlows.from("routingChannel") .<Message<?>, String>route(m -> m.getHeaders().get("testHeader", String.class), m -> m .channelMapping("someHeaderValue", "channelA") .channelMapping("someOtherHeaderValue", "channelB"), e -> e.id("headerValueRouter")) .get(); }
Configuration where mapping of header values to channel names is not required, because header values themselves represent channel names. The following example shows a router that does not require mapping of header values to channel names:
<int:header-value-router input-channel="routingChannel" header-name="testHeader"/>
Note | |
---|---|
Since Spring Integration 2.1, the behavior of resolving channels is more explicit.
For example, if you omit the Basically, by default, the router must be able to route messages successfully to at least one channel.
If you really want to drop messages, you must also have |
A RecipientListRouter
sends each received message to a statically defined list of message channels.
The following example creates a RecipientListRouter
:
<bean id="recipientListRouter" class="org.springframework.integration.router.RecipientListRouter"> <property name="channels"> <list> <ref bean="channel1"/> <ref bean="channel2"/> <ref bean="channel3"/> </list> </property> </bean>
Spring Integration also provides namespace support for the RecipientListRouter
configuration (see Section E.1, “Namespace Support”) as the following example shows:
<int:recipient-list-router id="customRouter" input-channel="routingChannel" timeout="1234" ignore-send-failures="true" apply-sequence="true"> <int:recipient channel="channel1"/> <int:recipient channel="channel2"/> </int:recipient-list-router>
The following example shows the equivalent router configured in Java:
@ServiceActivator(inputChannel = "routingChannel") @Bean public RecipientListRouter router() { RecipientListRouter router = new RecipientListRouter(); router.setSendTimeout(1_234L); router.setIgnoreSendFailures(true); router.setApplySequence(true); router.addRecipient("channel1"); router.addRecipient("channel2"); router.addRecipient("channel3"); return router; }
The following example shows the equivalent router configured by using the Java DSL:
@Bean public IntegrationFlow routerFlow() { return IntegrationFlows.from("routingChannel") .routeToRecipients(r -> r .applySequence(true) .ignoreSendFailures(true) .recipient("channel1") .recipient("channel2") .recipient("channel3") .sendTimeout(1_234L)) .get(); }
Note | |
---|---|
The apply-sequence flag here has the same effect as it does for a publish-subscribe-channel, and, as with a publish-subscribe-channel, it is disabled by default on the |
Another convenient option when configuring a RecipientListRouter
is to use Spring Expression Language (SpEL) support as selectors for individual recipient channels.
Doing so is similar to using a filter at the beginning of a chain to act as a "selective consumer
".
However, in this case, it is all combined rather concisely into the router’s configuration, as the following example shows:
<int:recipient-list-router id="customRouter" input-channel="routingChannel"> <int:recipient channel="channel1" selector-expression="payload.equals('foo')"/> <int:recipient channel="channel2" selector-expression="headers.containsKey('bar')"/> </int:recipient-list-router>
In the preceding configuration, a SpEL expression identified by the selector-expression
attribute is evaluated to determine whether this recipient should be included in the recipient list for a given input message.
The evaluation result of the expression must be a boolean.
If this attribute is not defined, the channel is always among the list of recipients.
Starting with version 4.1, the RecipientListRouter
provides several operations to manipulate recipients dynamically at runtime.
These management operations are presented by RecipientListRouterManagement
through the @ManagedResource
annotation.
They are available by using Section 12.6, “Control Bus” as well as by using JMX, as the following example shows:
<control-bus input-channel="controlBus"/> <recipient-list-router id="simpleRouter" input-channel="routingChannelA"> <recipient channel="channel1"/> </recipient-list-router> <channel id="channel2"/>
messagingTemplate.convertAndSend(controlBus, "@'simpleRouter.handler'.addRecipient('channel2')");
From the application start up the simpleRouter
, has only one channel1
recipient.
But after the addRecipient
command, channel2
recipient is added.
It is a "registering an interest in something that is part of the message
" use case, when we may be interested in messages from the router at some time period, so we are subscribing to the the recipient-list-router
and, at some point, decide to unsubscribe.
Because of the runtime management operation for the <recipient-list-router>
, it can be configured without any <recipient>
from the start.
In this case, the behavior of RecipientListRouter
is the same when there is no one matching recipient for the message.
If defaultOutputChannel
is configured, the message is sent there.
Otherwise the MessageDeliveryException
is thrown.
The XPath Router is part of the XML Module. See Section 38.5, “Routing XML Messages with XPath”.
Spring Integration also provides a special type-based router called ErrorMessageExceptionTypeRouter
for routing error messages (defined as messages whose payload
is a Throwable
instance).
ErrorMessageExceptionTypeRouter
is similar to the PayloadTypeRouter
.
In fact, they are almost identical.
The only difference is that, while PayloadTypeRouter
navigates the instance hierarchy of a payload instance (for example, payload.getClass().getSuperclass()
) to find the most specific type and channel mappings,
the ErrorMessageExceptionTypeRouter
navigates the hierarchy of exception causes (for example, payload.getCause()
)
to find the most specific Throwable
type or channel mappings and uses mappingClass.isInstance(cause)
to match the
cause
to the class or any super class.
Note | |
---|---|
Since version 4.3 the |
The following example shows a sample configuration for ErrorMessageExceptionTypeRouter
:
<int:exception-type-router input-channel="inputChannel" default-output-channel="defaultChannel"> <int:mapping exception-type="java.lang.IllegalArgumentException" channel="illegalChannel"/> <int:mapping exception-type="java.lang.NullPointerException" channel="npeChannel"/> </int:exception-type-router> <int:channel id="illegalChannel" /> <int:channel id="npeChannel" />
Spring Integration provides a generic router. You can use it for general-purpose routing (as opposed to the other routers provided by Spring Integration, each of which has some form of specialization).
The router
element provides a way to connect a router to an input channel and also accepts the optional default-output-channel
attribute.
The ref
attribute references the bean name of a custom router implementation (which must extend AbstractMessageRouter
).
The following example shows three generic routers:
<int:router ref="payloadTypeRouter" input-channel="input1" default-output-channel="defaultOutput1"/> <int:router ref="recipientListRouter" input-channel="input2" default-output-channel="defaultOutput2"/> <int:router ref="customRouter" input-channel="input3" default-output-channel="defaultOutput3"/> <beans:bean id="customRouterBean" class="org.foo.MyCustomRouter"/>
Alternatively, ref
may point to a POJO that contains the @Router
annotation (shown later), or you can combine the ref
with an explicit method name.
Specifying a method applies the same behavior described in the @Router
annotation section, later in this document.
The following example defines a router that points to a POJO in its ref
attribute:
<int:router input-channel="input" ref="somePojo" method="someMethod"/>
We generally recommend using a ref
attribute if the custom router implementation is referenced in other <router>
definitions.
However if the custom router implementation should be scoped to a single definition of the <router>
, you can provide an inner bean definition, as the following example shows:
<int:router method="someMethod" input-channel="input3" default-output-channel="defaultOutput3"> <beans:bean class="org.foo.MyCustomRouter"/> </int:router>
Note | |
---|---|
Using both the |
Important | |
---|---|
If the |
The following example shows the equivalent router configured in Java:
@Bean @Router(inputChannel = "routingChannel") public AbstractMessageRouter myCustomRouter() { return new AbstractMessageRouter() { @Override protected Collection<MessageChannel> determineTargetChannels(Message<?> message) { return // determine channel(s) for message } }; }
The following example shows the equivalent router configured by using the Java DSL:
@Bean public IntegrationFlow routerFlow() { return IntegrationFlows.from("routingChannel") .route(myCustomRouter()) .get(); } public AbstractMessageRouter myCustomRouter() { return new AbstractMessageRouter() { @Override protected Collection<MessageChannel> determineTargetChannels(Message<?> message) { return // determine channel(s) for message } }; }
Alternately, you can route on data from the message payload, as the following example shows:
@Bean public IntegrationFlow routerFlow() { return IntegrationFlows.from("routingChannel") .route(String.class, p -> p.contains("foo") ? "fooChannel" : "barChannel") .get(); }
Sometimes, the routing logic may be simple, and writing a separate class for it and configuring it as a bean may seem like overkill. As of Spring Integration 2.0, we offer an alternative that lets you use SpEL to implement simple computations that previously required a custom POJO router.
Note | |
---|---|
For more information about the Spring Expression Language, see the relevant chapter in the Spring Framework Reference Guide: |
Generally, a SpEL expression is evaluated and its result is mapped to a channel, as the following example shows:
<int:router input-channel="inChannel" expression="payload.paymentType"> <int:mapping value="CASH" channel="cashPaymentChannel"/> <int:mapping value="CREDIT" channel="authorizePaymentChannel"/> <int:mapping value="DEBIT" channel="authorizePaymentChannel"/> </int:router>
The following example shows the equivalent router configured in Java:
@Router(inputChannel = "routingChannel") @Bean public ExpressionEvaluatingRouter router() { ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter("payload.paymentType"); router.setChannelMapping("CASH", "cashPaymentChannel"); router.setChannelMapping("CREDIT", "authorizePaymentChannel"); router.setChannelMapping("DEBIT", "authorizePaymentChannel"); return router; }
The following example shows the equivalent router configured in the Java DSL:
@Bean public IntegrationFlow routerFlow() { return IntegrationFlows.from("routingChannel") .route("payload.paymentType", r -> r .channelMapping("CASH", "cashPaymentChannel") .channelMapping("CREDIT", "authorizePaymentChannel") .channelMapping("DEBIT", "authorizePaymentChannel")) .get(); }
To simplify things even more, the SpEL expression may evaluate to a channel name, as the following expression shows:
<int:router input-channel="inChannel" expression="payload + 'Channel'"/>
In the preceding configuration, the result channel is computed by the SpEL expression, which concatenates the value of the payload
with the literal String
, Channel.
Another virtue of SpEL for configuring routers is that an expression can return a Collection
, effectively making every <router>
a recipient list router.
Whenever the expression returns multiple channel values, the message is forwarded to each channel.
The following example shows such an expression:
<int:router input-channel="inChannel" expression="headers.channels"/>
In the above configuration, if the message includes a header with a name of channels and the value of that header is a List
of channel names, the message is sent to each channel in the list.
You may also find collection projection and collection selection expressions useful when you need to select multiple channels.
For further information, see:
When using @Router
to annotate a method, the method may return either a MessageChannel
or a String
type.
In the latter case, the endpoint resolves the channel name as it does for the default output channel.
Additionally, the method may return either a single value or a collection.
If a collection is returned, the reply message is sent to multiple channels.
To summarize, the following method signatures are all valid:
@Router public MessageChannel route(Message message) {...} @Router public List<MessageChannel> route(Message message) {...} @Router public String route(Foo payload) {...} @Router public List<String> route(Foo payload) {...}
In addition to payload-based routing, a message may be routed based on metadata available within the message header as either a property or an attribute.
In this case, a method annotated with @Router
may include a parameter annotated with @Header
, which is mapped to a header value as the following example shows and documented in Section E.5, “Annotation Support”:
@Router public List<String> route(@Header("orderStatus") OrderStatus status)
Note | |
---|---|
For routing of XML-based Messages, including XPath support, see Chapter 38, XML Support - Dealing with XML Payloads. |
See also Section 11.7, “Message Routers” in the Java DSL chapter for more information about router configuration.
Spring Integration provides quite a few different router configurations for common content-based routing use cases as well as the option of implementing custom routers as POJOs.
For example, PayloadTypeRouter
provides a simple way to configure a router that computes channels based on the payload type of the incoming message while HeaderValueRouter
provides the same convenience in configuring a router that computes channels by evaluating the value of a particular message Header.
There are also expression-based (SpEL) routers, in which the channel is determined based on evaluating an expression.
All of these type of routers exhibit some dynamic characteristics.
However, these routers all require static configuration. Even in the case of expression-based routers, the expression itself is defined as part of the router configuration, which means that the same expression operating on the same value always results in the computation of the same channel. This is acceptable in most cases, since such routes are well defined and therefore predictable. But there are times when we need to change router configurations dynamically so that message flows may be routed to a different channel.
For example, you might want to bring down some part of your system for maintenance and temporarily re-reroute messages to a different message flow.
As another example, you may want to introduce more granularity to your message flow by adding another route to handle a more concrete type of java.lang.Number
(in the case of PayloadTypeRouter
).
Unfortunately, with static router configuration to accomplish either of those goals, you would have to bring down your entire application, change the configuration of the router (change routes), and bring the application back up. This is obviously not a solution anyone wants.
The dynamic router pattern describes the mechanisms by which you can change or configure routers dynamically without bringing down the system or individual routers.
Before we get into the specifics of how Spring Integration supports dynamic routing, we need to consider the typical flow of a router:
MessageChannel
.
MessageChannel
There is not much that can be done with regard to dynamic routing if Step 1 results in the actual instance of the MessageChannel
, because the MessageChannel
is the final product of any router’s job.
However, if the first step results in a channel identifier that is not an instance of MessageChannel
, you have quite a few possible ways to influence the process of deriving the MessageChannel
.
Consider the following example of a payload type router:
<int:payload-type-router input-channel="routingChannel"> <int:mapping type="java.lang.String" channel="channel1" /> <int:mapping type="java.lang.Integer" channel="channel2" /> </int:payload-type-router>
Within the context of a payload type router, the three steps mentioned earlier would be realized as follows:
java.lang.String
).
mapping
element.
MessageChannel
as a reference to a bean within the application context (which is hopefully a MessageChannel
) identified by the result of the previous step.
In other words, each step feeds the next step until the process completes.
Now consider an example of a header value router:
<int:header-value-router input-channel="inputChannel" header-name="testHeader"> <int:mapping value="foo" channel="fooChannel" /> <int:mapping value="bar" channel="barChannel" /> </int:header-value-router>
Now we can consider how the three steps work for a header value router:
header-name
attribute.
mapping
element.
MessageChannel
as a reference to a bean within the application context (which is hopefully a MessageChannel
) identified by the result of the previous step.
The preceding two configurations of two different router types look almost identical.
However, if you look at the alternate configuration of the HeaderValueRouter
we clearly see that there is no mapping
sub element, as the following listing shows:
<int:header-value-router input-channel="inputChannel" header-name="testHeader">
However, the configuration is still perfectly valid. So the natural question is what about the mapping in the second step?
The second step is now optional.
If mapping
is not defined, then the channel identifier value computed in the first step is automatically treated as the channel name
, which is now resolved to the actual MessageChannel
, as in the third step.
What it also means is that the second step is one of the key steps to providing dynamic characteristics to the routers, since it introduces a process that lets you change the way channel identifier resolves to the channel name, thus influencing the process of determining the final instance of the MessageChannel
from the initial channel identifier.
For example, in the preceding configuration, assume that the testHeader
value is kermit, which is now a channel identifier (the first step).
Since there is no mapping in this router, resolving this channel identifier to a channel name (the second step) is impossible and this channel identifier is now treated as the channel name.
However, what if there was a mapping but for a different value?
The end result would still be the same, because, if a new value cannot be determined through the process of resolving the channel identifier to a channel name, the channel identifier becomes the channel name.
All that is left is for the third step to resolve the channel name (kermit) to an actual instance of the MessageChannel
identified by this name.
That basically involves a bean lookup for the provided name.
Now all messages that contain the header-value pair as testHeader=kermit
are going to be routed to a MessageChannel
whose bean name (its id
) is kermit.
But what if you want to route these messages to the simpson channel? Obviously changing a static configuration works, but doing so also requires bringing your system down.
However, if you had access to the channel identifier map, you could introduce a new mapping where the header-value pair is now kermit=simpson
, thus letting the second step treat kermit as a channel identifier while resolving it to simpson as the channel name.
The same obviously applies for PayloadTypeRouter
, where you can now remap or remove a particular payload type mapping.
In fact, it applies to every other router, including expression-based routers, since their computed values now have a chance to go through the second step to be resolved to the actual channel name
.
Any router that is a subclass of the AbstractMappingMessageRouter
(which includes most framework-defined routers) is a dynamic router, because the channelMapping
is defined at the AbstractMappingMessageRouter
level.
That map’s setter method is exposed as a public method along with the setChannelMapping and removeChannelMapping methods.
These let you change, add, and remove router mappings at runtime, as long as you have a reference to the router itself.
It also means that you could expose these same configuration options through JMX (see Section 12.2, “JMX Support”) or the Spring Integration control bus (see Section 12.6, “Control Bus”) functionality.
One way to manage the router mappings is through the control bus pattern, which exposes a control channel to which you can send control messages to manage and monitor Spring Integration components, including routers.
Note | |
---|---|
For more information about the control bus, see Section 12.6, “Control Bus”. |
Typically, you would send a control message asking to invoke a particular operation on a particular managed component (such as a router). The following managed operations (methods) are specific to changing the router resolution process:
public void setChannelMapping(String key, String channelName)
: Lets you add a new or modify an existing mapping between channel identifier
and channel name
public void removeChannelMapping(String key)
: Lets you remove a particular channel mapping, thus disconnecting the relationship between channel identifier
and channel name
Note that these methods can be used for simple changes (such as updating a single route or adding or removing a route). However, if you want to remove one route and add another, the updates are not atomic. This means that the routing table may be in an indeterminate state between the updates. Starting with version 4.0, you can now use the control bus to update the entire routing table atomically. The following methods let you do so:
public Map<String, String>getChannelMappings()
: Returns the current mappings.
public void replaceChannelMappings(Properties channelMappings)
: Updates the mappings.
Note that the channelMappings
parameter is a Properties
object.
This arrangement lets a control bus command use the built-in StringToPropertiesConverter
, as the following example shows:
"@'router.handler'.replaceChannelMappings('foo=qux \n baz=bar')"
Note that each mapping is separated by a newline character (\n
).
For programmatic changes to the map, we recommend that you use the setChannelMappings
method, due to type-safety concerns.
replaceChannelMappings
ignores keys or values that are not String
objects.
You can also use Spring’s JMX support to expose a router instance and then use your favorite JMX client (for example, JConsole) to manage those operations (methods) for changing the router’s configuration.
Note | |
---|---|
For more information about Spring Integration’s JMX support, see Section 12.2, “JMX Support”. |
Starting with version 4.1, Spring Integration provides an implementation of the routing slip enterprise integration pattern.
It is implemented as a routingSlip
message header, which is used to determine the next channel in AbstractMessageProducingHandler
instances, when an outputChannel
is not specified for the endpoint.
This pattern is useful in complex, dynamic cases, when it can become difficult to configure multiple routers to determine message flow.
When a message arrives at an endpoint that has no output-channel
, the routingSlip
is consulted to determine the next channel to which the message is sent.
When the routing slip is exhausted, normal replyChannel
processing resumes.
Configuration for the routing slip is presented as a HeaderEnricher
option — a semicolon-separated routing slip that contains path
entries, as the following example shows:
<util:properties id="properties"> <beans:prop key="myRoutePath1">channel1</beans:prop> <beans:prop key="myRoutePath2">request.headers[myRoutingSlipChannel]</beans:prop> </util:properties> <context:property-placeholder properties-ref="properties"/> <header-enricher input-channel="input" output-channel="process"> <routing-slip value="${myRoutePath1}; @routingSlipRoutingPojo.get(request, reply); routingSlipRoutingStrategy; ${myRoutePath2}; finishChannel"/> </header-enricher>
The preceding example has:
<context:property-placeholder>
configuration to demonstrate that the entries in the routing slip path
can be specified as resolvable keys.
<header-enricher>
<routing-slip>
sub-element is used to populate the RoutingSlipHeaderValueMessageProcessor
to the HeaderEnricher
handler.
RoutingSlipHeaderValueMessageProcessor
accepts a String
array of resolved routing slip path
entries and returns (from processMessage()
) a singletonMap
with the path
as key
and 0
as initial routingSlipIndex
.
Routing Slip path
entries can contain MessageChannel
bean names, RoutingSlipRouteStrategy
bean names, and Spring expressions (SpEL).
The RoutingSlipHeaderValueMessageProcessor
checks each routing slip path
entry against the BeanFactory
on the first processMessage
invocation.
It converts entries (which are not bean names in the application context) to ExpressionEvaluatingRoutingSlipRouteStrategy
instances.
RoutingSlipRouteStrategy
entries are invoked multiple times, until they return null or an empty String
.
Since the routing slip is involved in the getOutputChannel
process, we have a request-reply context.
The RoutingSlipRouteStrategy
has been introduced to determine the next outputChannel
that uses the requestMessage
and the reply
object.
An implementation of this strategy should be registered as a bean in the application context, and its bean name is used in the routing slip path
.
The ExpressionEvaluatingRoutingSlipRouteStrategy
implementation is provided.
It accepts a SpEL expression and an internal ExpressionEvaluatingRoutingSlipRouteStrategy.RequestAndReply
object is used as the root object of the evaluation context.
This is to avoid the overhead of EvaluationContext
creation for each ExpressionEvaluatingRoutingSlipRouteStrategy.getNextPath()
invocation.
It is a simple Java bean with two properties: Message<?> request
and Object reply
.
With this expression implementation, we can specify routing slip path
entries by using SpEL (for example, @routingSlipRoutingPojo.get(request, reply)
and request.headers[myRoutingSlipChannel]
) and avoid defining a bean for the RoutingSlipRouteStrategy
.
Note | |
---|---|
The |
Important | |
---|---|
If a routing slip is involved in a distributed environment, we recommend not using inline expressions for the Routing Slip |
For Java configuration, you can add a RoutingSlipHeaderValueMessageProcessor
instance to the HeaderEnricher
bean definition, as the following example shows:
@Bean @Transformer(inputChannel = "routingSlipHeaderChannel") public HeaderEnricher headerEnricher() { return new HeaderEnricher(Collections.singletonMap(IntegrationMessageHeaderAccessor.ROUTING_SLIP, new RoutingSlipHeaderValueMessageProcessor("myRoutePath1", "@routingSlipRoutingPojo.get(request, reply)", "routingSlipRoutingStrategy", "request.headers[myRoutingSlipChannel]", "finishChannel"))); }
The routing slip algorithm works as follows when an endpoint produces a reply and no outputChannel
has been defined:
routingSlipIndex
is used to get a value from the routing slip path
list.
routingSlipIndex
is String
, it is used to get a bean from BeanFactory
.
MessageChannel
, it is used as the next outputChannel
and the routingSlipIndex
is incremented in the reply message header (the routing slip path
entries remain unchanged).
RoutingSlipRouteStrategy
and its getNextPath
does not return an empty String
, that result is used as a bean name for the next outputChannel
.
The routingSlipIndex
remains unchanged.
RoutingSlipRouteStrategy.getNextPath
returns an empty String
, the routingSlipIndex
is incremented and the getOutputChannelFromRoutingSlip
is invoked recursively for the next Routing Slip path
item.
path
entry is not a String
, it must be an instance of RoutingSlipRouteStrategy
.
routingSlipIndex
exceeds the size of the routing slip path
list, the algorithm moves to the default behavior for the standard replyChannel
header.
Enterprise integration patterns include the process manager pattern.
You can now easily implement this pattern by using custom process manager logic encapsulated in a RoutingSlipRouteStrategy
within the routing slip.
In addition to a bean name, the RoutingSlipRouteStrategy
can return any MessageChannel
object, and there is no requirement that this MessageChannel
instance be a bean in the application context.
This way, we can provide powerful dynamic routing logic when there is no way to predict which channel should be used.
A MessageChannel
can be created within the RoutingSlipRouteStrategy
and returned.
A FixedSubscriberChannel
with an associated MessageHandler
implementation is a good combination for such cases.
For example, you can route to a reactor stream, as the following example shows:
@Bean public PollableChannel resultsChannel() { return new QueueChannel(); } @Bean public RoutingSlipRouteStrategy routeStrategy() { return (requestMessage, reply) -> requestMessage.getPayload() instanceof String ? new FixedSubscriberChannel(m -> Mono.just((String) m.getPayload()) .map(String::toUpperCase) .subscribe(v -> messagingTemplate().convertAndSend(resultsChannel(), v))) : new FixedSubscriberChannel(m -> Mono.just((Integer) m.getPayload()) .map(v -> v * 2) .subscribe(v -> messagingTemplate().convertAndSend(resultsChannel(), v))); }
Message filters are used to decide whether a Message
should be passed along or dropped based on some criteria, such as a message header value or message content itself.
Therefore, a message filter is similar to a router, except that, for each message received from the filter’s input channel, that same message may or may not be sent to the filter’s output channel.
Unlike the router, it makes no decision regarding which message channel to send the message to but decides only whether to send the message at all.
Note | |
---|---|
As we describe later in this section, the filter also supports a discard channel.
In certain cases, it can play the role of a very simple router (or " |
In Spring Integration, you can configure a message filter as a message endpoint that delegates to an implementation of the MessageSelector
interface.
That interface is itself quite simple, as the following listing shows:
public interface MessageSelector { boolean accept(Message<?> message); }
The MessageFilter
constructor accepts a selector instance, as the following example shows:
MessageFilter filter = new MessageFilter(someSelector);
In combination with the namespace and SpEL, you can configure powerful filters with very little Java code.
You can use the <filter>
element is used to create a message-selecting endpoint.
In addition to input-channel
and output-channel
attributes, it requires a ref
attribute.
The ref
can point to a MessageSelector
implementation, as the following example shows:
<int:filter input-channel="input" ref="selector" output-channel="output"/> <bean id="selector" class="example.MessageSelectorImpl"/>
Alternatively, you can add the method
attribute.
In that case, the ref
attribute may refer to any object.
The referenced method may expect either the Message
type or the payload type of inbound messages.
The method must return a boolean value.
If the method returns true, the message is sent to the output channel.
The following example shows how to configure a filter that uses the method
attribute:
<int:filter input-channel="input" output-channel="output" ref="exampleObject" method="someBooleanReturningMethod"/> <bean id="exampleObject" class="example.SomeObject"/>
If the selector or adapted POJO method returns false
, a few settings control the handling of the rejected message.
By default (if configured as in the preceding example), rejected messages are silently dropped.
If rejection should instead result in an error condition, set the throw-exception-on-rejection
attribute to true
, as the following example shows:
<int:filter input-channel="input" ref="selector" output-channel="output" throw-exception-on-rejection="true"/>
If you want rejected messages to be routed to a specific channel, provide that reference as the discard-channel
, as the following example shows:
<int:filter input-channel="input" ref="selector" output-channel="output" discard-channel="rejectedMessages"/>
See also Section 10.9.6, “Advising Filters”.
Note | |
---|---|
Message filters are commonly used in conjunction with a publish-subscribe channel. Many filter endpoints may be subscribed to the same channel, and they decide whether or not to pass the message to the next endpoint, which could be any of the supported types (such as a service activator). This provides a reactive alternative to the more proactive approach of using a message router with a single point-to-point input channel and multiple output channels. |
We recommend using a ref
attribute if the custom filter implementation is referenced in other <filter>
definitions.
However, if the custom filter implementation is scoped to a single <filter>
element, you should provide an inner bean definition, as the following example shows:
<int:filter method="someMethod" input-channel="inChannel" output-channel="outChannel"> <beans:bean class="org.foo.MyCustomFilter"/> </filter>
Note | |
---|---|
Using both the |
Important | |
---|---|
If the |
With the introduction of SpEL support, Spring Integration added the expression
attribute to the filter element.
It can be used to avoid Java entirely for simple filters, as the following example shows:
<int:filter input-channel="input" expression="payload.equals('nonsense')"/>
The string passed as the value of the expression attribute is evaluated as a SpEL expression with the message available in the evaluation context.
If you must include the result of an expression in the scope of the application context, you can use the #{}
notation, as defined in the SpEL reference documentation, as the following example shows:
<int:filter input-channel="input" expression="payload.matches(#{filterPatterns.nonsensePattern})"/>
If the expression itself needs to be dynamic, you can use an expression sub-element.
That provides a level of indirection for resolving the expression by its key from an ExpressionSource
.
That is a strategy interface that you can implement directly, or you can rely upon a version available in Spring Integration that loads expressions from a "resource bundle
" and can check for modifications after a given number of seconds.
All of this is demonstrated in the following configuration example, where the expression could be reloaded within one minute if the underlying file had been modified:
<int:filter input-channel="input" output-channel="output"> <int:expression key="filterPatterns.example" source="myExpressions"/> </int:filter> <beans:bean id="myExpressions" id="myExpressions" class="o.s.i.expression.ReloadableResourceBundleExpressionSource"> <beans:property name="basename" value="config/integration/expressions"/> <beans:property name="cacheSeconds" value="60"/> </beans:bean>
If the ExpressionSource
bean is named expressionSource
, you need not provide the` source` attribute on the <expression>
element. However, in the preceding example, we show it for completeness.
The config/integration/expressions.properties file (or any more-specific version with a locale extension to be resolved in the typical way that resource-bundles are loaded) can contain a key/value pair, as the following example shows:
filterPatterns.example=payload > 100
Note | |
---|---|
All of these examples that use |
The following example shows how to configure a filter by using annotations:
public class PetFilter { ... @Filter public boolean dogsOnly(String input) { ... } }
An annotation indicating that this method is to be used as a filter. It must be specified if this class is to be used as a filter. |
All of the configuration options provided by the XML element are also available for the @Filter
annotation.
The filter can be either referenced explicitly from XML or, if the @MessageEndpoint
annotation is defined on the class, detected automatically through classpath scanning.
See also Section 10.9.7, “Advising Endpoints Using Annotations”.
The splitter is a component whose role is to partition a message into several parts and send the resulting messages to be processed independently. Very often, they are upstream producers in a pipeline that includes an aggregator.
The API for performing splitting consists of one base class, AbstractMessageSplitter
.
It is a MessageHandler
implementation that encapsulates features common to splitters, such as filling in the appropriate message headers (CORRELATION_ID
, SEQUENCE_SIZE
, and SEQUENCE_NUMBER
) on the messages that are produced.
This filling enables tracking down the messages and the results of their processing (in a typical scenario, these headers get copied to the messages that are produced by the various transforming endpoints).
The values can then be used, for example, by a composed message processor.
The following example shows an excerpt from AbstractMessageSplitter
:
public abstract class AbstractMessageSplitter extends AbstractReplyProducingMessageConsumer { ... protected abstract Object splitMessage(Message<?> message); }
To implement a specific splitter in an application, you can extend AbstractMessageSplitter
and implement the splitMessage
method, which contains logic for splitting the messages.
The return value can be one of the following:
Collection
or an array of messages or an Iterable
(or Iterator
) that iterates over messages.
In this case, the messages are sent as messages (after the CORRELATION_ID
, SEQUENCE_SIZE
and SEQUENCE_NUMBER
are populated).
Using this approach gives you more control — for example, to populate custom message headers as part of the splitting process.
Collection
or an array of non-message objects or an Iterable
(or Iterator
) that iterates over non-message objects.
It works like the prior case, except that each collection element is used as a message payload.
Using this approach lets you focus on the domain objects without having to consider the messaging system and produces code that is easier to test.
Message
or non-message object (but not a collection or an array).
It works like the previous cases, except that a single message is sent out.
In Spring Integration, any POJO can implement the splitting algorithm, provided that it defines a method that accepts a single argument and has a return value.
In this case, the return value of the method is interpreted as described earlier.
The input argument might either be a Message
or a simple POJO.
In the latter case, the splitter receives the payload of the incoming message.
We recommend this approach, because it decouples the code from the Spring Integration API and is typically easier to test.
Starting with version 4.1, the AbstractMessageSplitter
supports the Iterator
type for the value
to split.
Note, in the case of an Iterator
(or Iterable
), we don’t have access to the number of underlying items and the SEQUENCE_SIZE
header is set to 0
.
This means that the default SequenceSizeReleaseStrategy
of an <aggregator>
won’t work and the group for the CORRELATION_ID
from the splitter
won’t be released; it will remain as incomplete
.
In this case you should use an appropriate custom ReleaseStrategy
or rely on send-partial-result-on-expiry
together with group-timeout
or a MessageGroupStoreReaper
.
Starting with version 5.0, the AbstractMessageSplitter
provides protected obtainSizeIfPossible()
methods to allow the determination of the size of the Iterable
and Iterator
objects if that is possible.
For example XPathMessageSplitter
can determine the size of the underlying NodeList
object.
And starting with version 5.0.9, this method also properly returns a size of the com.fasterxml.jackson.core.TreeNode
.
An Iterator
object is useful to avoid the need for building an entire collection in the memory before splitting.
For example, when underlying items are populated from some external system (e.g. DataBase or FTP MGET
) using iterations or streams.
Starting with version 5.0, the AbstractMessageSplitter
supports the Java Stream
and Reactive Streams Publisher
types for the value
to split.
In this case, the target Iterator
is built on their iteration functionality.
In addition, if the splitter’s output channel is an instance of a ReactiveStreamsSubscribableChannel
, the AbstractMessageSplitter
produces a Flux
result instead of an Iterator
, and the output channel is subscribed to this Flux
for back-pressure-based splitting on downstream flow demand.
A splitter can be configured through XML as follows:
<int:channel id="inputChannel"/> <int:splitter id="splitter" ref="splitterBean" method="split" input-channel="inputChannel" output-channel="outputChannel" /> <int:channel id="outputChannel"/> <beans:bean id="splitterBean" class="sample.PojoSplitter"/>
The ID of the splitter is optional. | |
A reference to a bean defined in the application context.
The bean must implement the splitting logic, as described in the earlier section.
Optional.
If a reference to a bean is not provided, it is assumed that the payload of the message that arrived on the | |
The method (defined on the bean) that implements the splitting logic. Optional. | |
The input channel of the splitter. Required. | |
The channel to which the splitter sends the results of splitting the incoming message. Optional (because incoming messages can specify a reply channel themselves). |
We recommend using a ref
attribute if the custom splitter implementation can be referenced in other <splitter>
definitions.
However if the custom splitter handler implementation should be scoped to a single definition of the <splitter>
, you can configure an inner bean definition, as the following example follows:
<int:splitter id="testSplitter" input-channel="inChannel" method="split" output-channel="outChannel"> <beans:bean class="org.foo.TestSplitter"/> </int:splitter>
Note | |
---|---|
Using both a |
Important | |
---|---|
If the |
The @Splitter
annotation is applicable to methods that expect either the Message
type or the message payload type, and the return values of the method should be a Collection
of any type.
If the returned values are not actual Message
objects, each item is wrapped in a Message
as the payload of the Message
.
Each resulting Message
is sent to the designated output channel for the endpoint on which the @Splitter
is defined.
The following example shows how to configure a splitter by using the @Splitter
annotation:
@Splitter List<LineItem> extractItems(Order order) { return order.getItems() }
See also Section 10.9.7, “Advising Endpoints Using Annotations”.
See also Section 11.8, “Splitters” in the Java DSL chapter.
Basically a mirror-image of the splitter, the aggregator is a type of message handler that receives multiple messages and combines them into a single message. In fact, an aggregator is often a downstream consumer in a pipeline that includes a splitter.
Technically, the aggregator is more complex than a splitter, because it is stateful.
It must hold the messages to be aggregated and determine when the complete group of messages is ready to be aggregated.
In order to do so, it requires a MessageStore
.
The Aggregator combines a group of related messages, by correlating and storing them, until the group is deemed to be complete. At that point, the aggregator creates a single message by processing the whole group and sends the aggregated message as output.
Implementing an aggregator requires providing the logic to perform the aggregation (that is, the creation of a single message from many). Two related concepts are correlation and release.
Correlation determines how messages are grouped for aggregation.
In Spring Integration, correlation is done by default, based on the IntegrationMessageHeaderAccessor.CORRELATION_ID
message header.
Messages with the same IntegrationMessageHeaderAccessor.CORRELATION_ID
are grouped together.
However, you can customize the correlation strategy to allow other ways of specifying how the messages should be grouped together.
To do so, you can implement a CorrelationStrategy
(covered later in this chapter).
To determine the point at which a group of messages is ready to be processed, a ReleaseStrategy
is consulted.
The default release strategy for the aggregator releases a group when all messages included in a sequence are present, based on the IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
header.
You can override this default strategy by providing a reference to a custom ReleaseStrategy
implementation.
The Aggregation API consists of a number of classes:
MessageGroupProcessor
, and its subclasses: MethodInvokingAggregatingMessageGroupProcessor
and ExpressionEvaluatingMessageGroupProcessor
ReleaseStrategy
interface and its default implementation: SimpleSequenceSizeReleaseStrategy
CorrelationStrategy
interface and its default implementation: HeaderAttributeCorrelationStrategy
The AggregatingMessageHandler
(a subclass of AbstractCorrelatingMessageHandler
) is a MessageHandler
implementation, encapsulating the common functionality of an aggregator (and other correlating use cases), which are as follows:
MessageStore
until the group can be released
The responsibility for deciding how the messages should be grouped together is delegated to a CorrelationStrategy
instance.
The responsibility for deciding whether the message group can be released is delegated to a ReleaseStrategy
instance.
The following listing shows a brief highlight of the base AbstractAggregatingMessageGroupProcessor
(the responsibility for implementing the aggregatePayloads
method is left to the developer):
public abstract class AbstractAggregatingMessageGroupProcessor implements MessageGroupProcessor { protected Map<String, Object> aggregateHeaders(MessageGroup group) { // default implementation exists } protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders); }
The CorrelationStrategy
is owned by the AbstractCorrelatingMessageHandler
and has a default value based on the IntegrationMessageHeaderAccessor.CORRELATION_ID
message header, as the following example shows:
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store, CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) { ... this.correlationStrategy = correlationStrategy == null ? new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy; this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy; ... }
As for the actual processing of the message group, the default implementation is the DefaultAggregatingMessageGroupProcessor
.
It creates a single Message
whose payload is a List
of the payloads received for a given group.
This works well for simple scatter-gather implementations with a splitter, a publish-subscribe channel, or a recipient list router upstream.
Note | |
---|---|
When using a publish-subscribe channel or a recipient list router in this type of scenario, be sure to enable the |
When implementing a specific aggregator strategy for an application, you can extend AbstractAggregatingMessageGroupProcessor
and implement the aggregatePayloads
method.
However, there are better solutions, less coupled to the API, for implementing the aggregation logic, which can be configured either through XML or through annotations.
In general, any POJO can implement the aggregation algorithm if it provides a method that accepts a single java.util.List
as an argument (parameterized lists are supported as well).
This method is invoked for aggregating messages as follows:
java.util.Collection<T>
and the parameter type T is assignable to Message
, the whole list of messages accumulated for aggregation is sent to the aggregator.
java.util.Collection
or the parameter type is not assignable to Message
, the method receives the payloads of the accumulated messages.
Message
, it is treated as the payload for a Message
that is automatically created by the framework.
Note | |
---|---|
In the interest of code simplicity and promoting best practices such as low coupling, testability, and others, the preferred way of implementing the aggregation logic is through a POJO and using the XML or annotation support for configuring it in the application. |
Starting with version 5.1, after processing message group, an AbstractCorrelatingMessageHandler
performs a MessageBuilder.popSequenceDetails()
message headers modification for the proper splitter-aggregator scenario with several nested levels.
It is done only if the message group release result is not a message or collection of messages.
In that case a target MessageGroupProcessor
is responsible for the MessageBuilder.popSequenceDetails()
call while building those messages.
This functionality can be controlled by a new popSequence
boolean
property, so the MessageBuilder.popSequenceDetails()
can be disabled in some scenarios when correlation details have not been populated by the standard splitter.
This property, essentially, undoes what has been done by the nearest upstream applySequence = true
in the AbstractMessageSplitter
.
See Section 8.3, “Splitter” for more information.
If the processMessageGroup
method of the MessageGroupProcessor
returns a collection, it must be a collection of Message<?>
objects.
In this case, the messages are individually released.
Prior to version 4.2, it was not possible to provide a MessageGroupProcessor
by using XML configuration.
Only POJO methods could be used for aggregation.
Now, if the framework detects that the referenced (or inner) bean implements MessageProcessor
, it is used as the aggregator’s output processor.
If you wish to release a collection of objects from a custom MessageGroupProcessor
as the payload of a message, your class should extend AbstractAggregatingMessageGroupProcessor
and implement aggregatePayloads()
.
Also, since version 4.2, a SimpleMessageGroupProcessor
is provided.
It returns the collection of messages from the group, which, as indicated earlier, causes the released messages to be sent individually.
This lets the aggregator work as a message barrier, where arriving messages are held until the release strategy fires and the group is released as a sequence of individual messages.
The ReleaseStrategy
interface is defined as follows:
public interface ReleaseStrategy { boolean canRelease(MessageGroup group); }
In general, any POJO can implement the completion decision logic if it provides a method that accepts a single java.util.List
as an argument (parameterized lists are supported as well) and returns a boolean value.
This method is invoked after the arrival of each new message, to decide whether the group is complete or not, as follows:
java.util.List<T>
and the parameter type T
is assignable to Message
, the whole list of messages accumulated in the group is sent to the method.
java.util.List
or the parameter type is not assignable to Message
, the method receives the payloads of the accumulated messages.
true
if the message group is ready for aggregation or false otherwise.
The following example shows how to use the @ReleaseStrategy
annotation for a List
of type Message
:
public class MyReleaseStrategy { @ReleaseStrategy public boolean canMessagesBeReleased(List<Message<?>>) {...} }
The following example shows how to use the @ReleaseStrategy
annotation for a List
of type String
:
public class MyReleaseStrategy { @ReleaseStrategy public boolean canMessagesBeReleased(List<String>) {...} }
Based on the signatures in the preceding two examples, the POJO-based release strategy is passed a Collection
of not-yet-released messages (if you need access to the whole Message
) or a Collection
of payload objects (if the type parameter is anything other than Message
).
This satisfies the majority of use cases.
However if, for some reason, you need to access the full MessageGroup
, you should provide an implementation of the ReleaseStrategy
interface.
Warning | |
---|---|
When handling potentially large groups, you should understand how these methods are invoked, because the release strategy may be invoked multiple times before the group is released.
The most efficient is an implementation of For these reasons, for large groups, we recommended that you implement |
When the group is released for aggregation, all its not-yet-released messages are processed and removed from the group.
If the group is also complete (that is, if all messages from a sequence have arrived or if there is no sequence defined), then the group is marked as complete.
Any new messages for this group are sent to the discard channel (if defined).
Setting expire-groups-upon-completion
to true
(the default is false
) removes the entire group, and any new messages (with the same correlation ID as the removed group) form a new group.
You can release partial sequences by using a MessageGroupStoreReaper
together with send-partial-result-on-expiry
being set to true
.
Important | |
---|---|
To facilitate discarding of late-arriving messages, the aggregator must maintain state about the group after it has been released.
This can eventually cause out-of-memory conditions.
To avoid such situations, you should consider configuring a |
Spring Integration provides an implementation for ReleaseStrategy
: SimpleSequenceSizeReleaseStrategy
.
This implementation consults the SEQUENCE_NUMBER
and SEQUENCE_SIZE
headers of each arriving message to decide when a message group is complete and ready to be aggregated.
As shown earlier, it is also the default strategy.
Note | |
---|---|
Before version 5.0, the default release strategy was |
If you are aggregating large groups, you don’t need to release partial groups, and you don’t need to detect/reject duplicate sequences, consider using the SimpleSequenceSizeReleaseStrategy
instead - it is much more efficient for these use cases, and is the default since version 5.0 when partial group release is not specified.
The 4.3 release changed the default Collection
for messages in a SimpleMessageGroup
to HashSet
(it was previously a BlockingQueue
).
This was expensive when removing individual messages from large groups (an O(n) linear scan was required).
Although the hash set is generally much faster to remove, it can be expensive for large messages, because the hash has to be calculated on both inserts and removes.
If you have messages that are expensive to hash, consider using some other collection type.
As discussed in Section 12.4.1, “Using MessageGroupFactory
”, a SimpleMessageGroupFactory
is provided so that you can select the Collection
that best suits your needs.
You can also provide your own factory implementation to create some other Collection<Message<?>>
.
The following example shows how to configure an aggregator with the previous implementation and a SimpleSequenceSizeReleaseStrategy
:
<int:aggregator input-channel="aggregate" output-channel="out" message-store="store" release-strategy="releaser" /> <bean id="store" class="org.springframework.integration.store.SimpleMessageStore"> <property name="messageGroupFactory"> <bean class="org.springframework.integration.store.SimpleMessageGroupFactory"> <constructor-arg value="BLOCKING_QUEUE"/> </bean> </property> </bean> <bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />
The CorrelationStrategy
interface is defined as follows:
public interface CorrelationStrategy { Object getCorrelationKey(Message<?> message); }
The method returns an Object
that represents the correlation key used for associating the message with a message group.
The key must satisfy the criteria used for a key in a Map
with respect to the implementation of equals()
and hashCode()
.
In general, any POJO can implement the correlation logic, and the rules for mapping a message to a method’s argument (or arguments) are the same as for a ServiceActivator
(including support for @Header
annotations).
The method must return a value, and the value must not be null
.
Spring Integration provides an implementation for CorrelationStrategy
: HeaderAttributeCorrelationStrategy
.
This implementation returns the value of one of the message headers (whose name is specified by a constructor argument) as the correlation key.
By default, the correlation strategy is a HeaderAttributeCorrelationStrategy
that returns the value of the CORRELATION_ID
header attribute.
If you have a custom header name you would like to use for correlation, you can configure it on an instance of HeaderAttributeCorrelationStrategy
and provide that as a reference for the aggregator’s correlation strategy.
Changes to groups are thread safe.
So, when you send messages for the same correlation ID concurrently, only one of them will be processed in the aggregator, making it effectively as a single-threaded per message group.
A LockRegistry
is used to obtain a lock for the resolved correlation ID.
A DefaultLockRegistry
is used by default (in-memory).
For synchronizing updates across servers where a shared MessageGroupStore
is being used, you must configure a shared lock registry.
As discussed above, when message groups are mutated (messages added or released) a lock is held.
Consider the following flow:
...->aggregator1-> ... ->aggregator2-> ...
If there are multiple threads, and the aggregators share a common lock registry, it is possible to get a deadlock.
This will cause hung threads and jstack <pid>
might present a result such as:
Found one Java-level deadlock: ============================= "t2": waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync), which is held by "t1" "t1": waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync), which is held by "t2"
There are several ways to avoid this problem:
ExecutorChannel
or QueueChannel
as the output channel of the aggregator so that the downstream flow runs on a new thread
releaseLockBeforeSend
aggregator property to true
Note | |
---|---|
This problem can also be caused if, for some reason, the output of a single aggregator is eventually routed back to the same aggregator. Of course, the first solution above does not apply in this case. |
See Section 11.9, “Aggregators and Resequencers” for how to configure an aggregator in Java DSL.
Spring Integration supports the configuration of an aggregator with XML through the <aggregator/>
element.
The following example shows an example of an aggregator:
<channel id="inputChannel"/> <int:aggregator id="myAggregator" auto-startup="true" input-channel="inputChannel" output-channel="outputChannel" discard-channel="throwAwayChannel" message-store="persistentMessageStore" order="1" send-partial-result-on-expiry="false" send-timeout="1000" correlation-strategy="correlationStrategyBean" correlation-strategy-method="correlate" correlation-strategy-expression="headers['foo']" ref="aggregatorBean" method="aggregate" release-strategy="releaseStrategyBean" release-strategy-method="release" (16) release-strategy-expression="size() == 5" (17) expire-groups-upon-completion="false" (18) empty-group-min-timeout="60000" (19) lock-registry="lockRegistry" (20) group-timeout="60000" (21) group-timeout-expression="size() ge 2 ? 100 : -1" (22) expire-groups-upon-timeout="true" (23) scheduler="taskScheduler" > (24) <expire-transactional/> (25) <expire-advice-chain/> (26) </aggregator> <int:channel id="outputChannel"/> <int:channel id="throwAwayChannel"/> <bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore"> <constructor-arg ref="dataSource"/> </bean> <bean id="aggregatorBean" class="sample.PojoAggregator"/> <bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/> <bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
The id of the aggregator is optional. | |
Lifecycle attribute signaling whether the aggregator should be started during application context startup. Optional (the default is true). | |
The channel from which where aggregator receives messages. Required. | |
The channel to which the aggregator sends the aggregation results. Optional (because incoming messages can themselves specify a reply channel in the replyChannel message header). | |
The channel to which the aggregator sends the messages that timed out (if | |
A reference to a | |
The order of this aggregator when more than one handle is subscribed to the same | |
Indicates that expired messages should be aggregated and sent to the output-channel or replyChannel once their containing | |
The timeout interval to wait when sending a reply | |
A reference to a bean that implements the message correlation (grouping) algorithm.
The bean can be an implementation of the | |
A method defined on the bean referenced by | |
A SpEL expression representing the correlation strategy.
Example: | |
A reference to a bean defined in the application context. The bean must implement the aggregation logic, as described earlier. Optional (by default, the list of aggregated messages becomes a payload of the output message). | |
A method defined on the bean referenced by the | |
A reference to a bean that implements the release strategy.
The bean can be an implementation of the | |
A method defined on the bean referenced by the | |
A SpEL expression representing the release strategy.
The root object for the expression is a | |
When set to | |
Applies only if a | |
A reference to a | |
A timeout (in milliseconds) to force the | |
The SpEL expression that evaluates to a | |
When a group is completed due to a timeout (or by a | |
A | |
Since version 4.1.
It lets a transaction be started for the | |
Since version 4.1.
It allows the configuration of any |
Expiring Groups | |||
---|---|---|---|
There are two attributes related to expiring (completely removing) groups.
When a group is expired, there is no record of it, and, if a new message arrives with the same correlation, a new group is started.
When a group is completed (without expiry), the empty group remains and late-arriving messages are discarded.
Empty groups can be removed later by using a
If a group is not completed normally but is released or discarded because of a timeout, the group is normally expired.
Since version 4.1, you can control this behavior by using
Since version 5.0, empty groups are also scheduled for removal after |
We generally recommend using a ref
attribute if a custom aggregator handler implementation may be referenced in other <aggregator>
definitions.
However, if a custom aggregator implementation is only being used by a single definition of the <aggregator>
, you can use an inner bean definition (starting with version 1.0.3) to configure the aggregation POJO within the <aggregator>
element, as the following example shows:
<aggregator input-channel="input" method="sum" output-channel="output"> <beans:bean class="org.foo.PojoAggregator"/> </aggregator>
Note | |
---|---|
Using both a |
The following example shows an implementation of the aggregator bean:
public class PojoAggregator { public Long add(List<Long> results) { long total = 0l; for (long partialResult: results) { total += partialResult; } return total; } }
An implementation of the completion strategy bean for the preceding example might be as follows:
public class PojoReleaseStrategy { ... public boolean canRelease(List<Long> numbers) { int sum = 0; for (long number: numbers) { sum += number; } return sum >= maxValue; } }
Note | |
---|---|
Wherever it makes sense to do so, the release strategy method and the aggregator method can be combined into a single bean. |
An implementation of the correlation strategy bean for the example above might be as follows:
public class PojoCorrelationStrategy { ... public Long groupNumbersByLastDigit(Long number) { return number % 10; } }
The aggregator in the preceding example would group numbers by some criterion (in this case, the remainder after dividing by ten) and hold the group until the sum of the numbers provided by the payloads exceeds a certain value.
Note | |
---|---|
Wherever it makes sense to do so, the release strategy method, the correlation strategy method, and the aggregator method can be combined in a single bean. (Actually, all of them or any two of them can be combined.) |
Since Spring Integration 2.0, you can handle the various strategies (correlation, release, and aggregation) with SpEL, which we recommend if the logic behind such a release strategy is relatively simple.
Suppose you have a legacy component that was designed to receive an array of objects.
We know that the default release strategy assembles all aggregated messages in the List
.
Now we have two problems.
First, we need to extract individual messages from the list. Second, we need to extract the payload of each message and assemble the array of objects.
The following example solves both problems:
public String[] processRelease(List<Message<String>> messages){ List<String> stringList = new ArrayList<String>(); for (Message<String> message : messages) { stringList.add(message.getPayload()); } return stringList.toArray(new String[]{}); }
However, with SpEL, such a requirement could actually be handled relatively easily with a one-line expression, thus sparing you from writing a custom class and configuring it as a bean. The following example shows how to do so:
<int:aggregator input-channel="aggChannel" output-channel="replyChannel" expression="#this.![payload].toArray()"/>
In the preceding configuration, we use a collection projection expression to assemble a new collection from the payloads of all the messages in the list and then transform it to an array, thus achieving the same result as the earlier Java code.
You can apply the same expression-based approach when dealing with custom release and correlation strategies.
Instead of defining a bean for a custom CorrelationStrategy
in the correlation-strategy
attribute, you can implement your simple correlation logic as a SpEL expression and configure it in the correlation-strategy-expression
attribute, as the following example shows:
correlation-strategy-expression="payload.person.id"
In the preceding example, we assume that the payload has a person
attribute with an id
, which is going to be used to correlate messages.
Likewise, for the ReleaseStrategy
, you can implement your release logic as a SpEL expression and configure it in the release-strategy-expression
attribute.
The root object for evaluation context is the MessageGroup
itself.
The List
of messages can be referenced by using the message
property of the group within the expression.
Note | |
---|---|
In releases prior to version 5.0, the root object was the collection of |
release-strategy-expression="!messages.?[payload==5].empty"
In the preceding example, the root object of the SpEL evaluation context is the MessageGroup
itself, and you are stating that, as soon as there is a message with payload of 5
in this group, the group should be released.
Starting with version 4.0, two new mutually exclusive attributes have been introduced: group-timeout
and group-timeout-expression
(see the earlier description).
See the section called “Configuring an Aggregator with XML”.
In some cases, you may need to emit the aggregator result (or discard the group) after a timeout if the ReleaseStrategy
does not release when the current message arrives.
For this purpose, the groupTimeout
option lets scheduling the MessageGroup
be forced to complete, as the following example shows:
<aggregator input-channel="input" output-channel="output" send-partial-result-on-expiry="true" group-timeout-expression="size() ge 2 ? 10000 : -1" release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>
With this example, the normal release is possible if the aggregator receives the last message in sequence as defined by the release-strategy-expression
.
If that specific message does not arrive, the groupTimeout
forces the group to complete after ten seconds, as long as the group contains at least two Messages.
The results of forcing the group to complete depends on the ReleaseStrategy
and the send-partial-result-on-expiry
.
First, the release strategy is again consulted to see if a normal release is to be made.
While the group has not changed, the ReleaseStrategy
can decide to release the group at this time.
If the release strategy still does not release the group, it is expired.
If send-partial-result-on-expiry
is true
, existing messages in the (partial) MessageGroup
are released as a normal aggregator reply message to the output-channel
.
Otherwise, it is discarded.
There is a difference between groupTimeout
behavior and MessageGroupStoreReaper
(see the section called “Configuring an Aggregator with XML”).
The reaper initiates forced completion for all MessageGroup
s in the MessageGroupStore
periodically.
The groupTimeout
does it for each MessageGroup
individually if a new message does not arrive during the groupTimeout
.
Also, the reaper can be used to remove empty groups (empty groups are retained in order to discard late messages if expire-groups-upon-completion
is false).
The following example shows an aggregator configured with annotations:
public class Waiter { ... @Aggregator public Delivery aggregatingMethod(List<OrderItem> items) { ... } @ReleaseStrategy public boolean releaseChecker(List<Message<?>> messages) { ... } @CorrelationStrategy public String correlateBy(OrderItem item) { ... } }
An annotation indicating that this method should be used as an aggregator. It must be specified if this class is used as an aggregator. | |
An annotation indicating that this method is used as the release strategy of an aggregator.
If not present on any method, the aggregator uses the | |
An annotation indicating that this method should be used as the correlation strategy of an aggregator.
If no correlation strategy is indicated, the aggregator uses the |
All of the configuration options provided by the XML element are also available for the @Aggregator
annotation.
The aggregator can be either referenced explicitly from XML or, if the @MessageEndpoint
is defined on the class, detected automatically through classpath scanning.
Annotation configuration (@Aggregator
and others) for the Aggregator component covers only simple use cases, where most default options are sufficient.
If you need more control over those options when using annotation configuration, consider using a @Bean
definition for the AggregatingMessageHandler
and mark its @Bean
method with @ServiceActivator
, as the following example shows:
@ServiceActivator(inputChannel = "aggregatorChannel") @Bean public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) { AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(), jdbcMessageGroupStore); aggregator.setOutputChannel(resultsChannel()); aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L)); aggregator.setTaskScheduler(this.taskScheduler); return aggregator; }
See Section 8.4.2, “Programming Model” and Section E.6.1, “Annotations on @Bean
Methods” for more information.
Note | |
---|---|
Starting with version 4.2, the |
Aggregator (and some other patterns in Spring Integration) is a stateful pattern that requires decisions to be made based on a group of messages that have arrived over a period of time, all with the same correlation key.
The design of the interfaces in the stateful patterns (such as ReleaseStrategy
) is driven by the principle that the components (whether defined by the framework or by a user) should be able to remain stateless.
All state is carried by the MessageGroup
and its management is delegated to the MessageGroupStore
.
The MessageGroupStore
interface is defined as follows:
public interface MessageGroupStore { int getMessageCountForAllMessageGroups(); int getMarkedMessageCountForAllMessageGroups(); int getMessageGroupCount(); MessageGroup getMessageGroup(Object groupId); MessageGroup addMessageToGroup(Object groupId, Message<?> message); MessageGroup markMessageGroup(MessageGroup group); MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove); MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark); void removeMessageGroup(Object groupId); void registerMessageGroupExpiryCallback(MessageGroupCallback callback); int expireMessageGroups(long timeout); }
For more information, see the Javadoc.
The MessageGroupStore
accumulates state information in MessageGroups
while waiting for a release strategy to be triggered, and that event might not ever happen.
So, to prevent stale messages from lingering, and for volatile stores to provide a hook for cleaning up when the application shuts down, the MessageGroupStore
lets you register callbacks to apply to its MessageGroups
when they expire.
The interface is very straightforward, as the following listing shows:
public interface MessageGroupCallback { void execute(MessageGroupStore messageGroupStore, MessageGroup group); }
The callback has direct access to the store and the message group so that it can manage the persistent state (for example, by entirely removing the group from the store).
The MessageGroupStore
maintains a list of these callbacks, which it applies, on demand, to all messages whose timestamps are earlier than a time supplied as a parameter (see the registerMessageGroupExpiryCallback(..)
and expireMessageGroups(..)
methods, described earlier).
For more detail, see Section 8.4.4, “Managing State in an Aggregator: MessageGroupStore
”.
Important | |
---|---|
It is important not to use the same |
You can call the expireMessageGroups
method with a timeout value.
Any message older than the current time minus this value is expired and has the callbacks applied.
Thus, it is the user of the store that defines what is meant by message group "expiry
".
As a convenience for users, Spring Integration provides a wrapper for the message expiry in the form of a MessageGroupStoreReaper
, as the following example shows:
<bean id="reaper" class="org...MessageGroupStoreReaper"> <property name="messageGroupStore" ref="messageStore"/> <property name="timeout" value="30000"/> </bean> <task:scheduled-tasks scheduler="scheduler"> <task:scheduled ref="reaper" method="run" fixed-rate="10000"/> </task:scheduled-tasks>
The reaper is a Runnable
.
In the preceding example, the message group store’s expire method is called every ten seconds.
The timeout itself is 30 seconds.
Note | |
---|---|
It is important to understand that the timeout property of |
In addition to the reaper, the expiry callbacks are invoked when the application shuts down through a lifecycle callback in the AbstractCorrelatingMessageHandler
.
The AbstractCorrelatingMessageHandler
registers its own expiry callback, and this is the link with the boolean flag send-partial-result-on-expiry
in the XML configuration of the aggregator.
If the flag is set to true
, then, when the expiry callback is invoked, any unmarked messages in groups that are not yet released can be sent on to the output channel.
Important | |
---|---|
When a shared Some For more information about the |
The resequencer is related to the aggregator but serves a different purpose. While the aggregator combines messages, the resequencer passes messages through without changing them.
The resequencer works in a similar way to the aggregator, in the sense that it uses the CORRELATION_ID
to store messages in groups. he difference is that the Resequencer does not process the messages in any way.
Instead, it releases them in the order of their SEQUENCE_NUMBER
header values.
With respect to that, you can opt to release all messages at once (after the whole sequence, according to the SEQUENCE_SIZE
, and other possibilities) or as soon as a valid sequence is available. (We cover what we mean by "a valid sequence" later in this chapter.)
Important | |
---|---|
The resequencer is intended to resequence relatively short sequences of messages with small gaps. If you have a large number of disjoint sequences with many gaps, you may experience performance issues. |
See Section 11.9, “Aggregators and Resequencers” for configuring a resequencer in Java DSL.
Configuring a resequencer requires only including the appropriate element in XML.
The following example shows a resequencer configuration:
<int:channel id="inputChannel"/> <int:channel id="outputChannel"/> <int:resequencer id="completelyDefinedResequencer" input-channel="inputChannel" output-channel="outputChannel" discard-channel="discardChannel" release-partial-sequences="true" message-store="messageStore" send-partial-result-on-expiry="true" send-timeout="86420000" correlation-strategy="correlationStrategyBean" correlation-strategy-method="correlate" correlation-strategy-expression="headers['something']" release-strategy="releaseStrategyBean" release-strategy-method="release" release-strategy-expression="size() == 10" empty-group-min-timeout="60000" lock-registry="lockRegistry" (16) group-timeout="60000" (17) group-timeout-expression="size() ge 2 ? 100 : -1" (18) scheduler="taskScheduler" /> (19) expire-group-upon-timeout="false" /> (20)
The id of the resequencer is optional. | |
The input channel of the resequencer. Required. | |
The channel to which the resequencer sends the reordered messages. Optional. | |
The channel to which the resequencer sends the messages that timed out (if | |
Whether to send out ordered sequences as soon as they are available or only after the whole message group arrives.
Optional. (The default is | |
A reference to a | |
Whether, upon the expiration of the group, the ordered group should be sent out (even if some of the messages are missing).
Optional. (The default is false.)
See Section 8.4.4, “Managing State in an Aggregator: | |
The timeout interval to wait when sending a reply | |
A reference to a bean that implements the message correlation (grouping) algorithm.
The bean can be an implementation of the | |
A method that is defined on the bean referenced by | |
A SpEL expression representing the correlation strategy.
Example: | |
A reference to a bean that implements the release strategy.
The bean can be an implementation of the | |
A method that is defined on the bean referenced by | |
A SpEL expression representing the release strategy.
The root object for the expression is a | |
Only applies if a | |
See the section called “Configuring an Aggregator with XML”. | |
See the section called “Configuring an Aggregator with XML”. | |
See the section called “Configuring an Aggregator with XML”. | |
See the section called “Configuring an Aggregator with XML”. | |
By default, when a group is completed due to a timeout (or by a |
Note | |
---|---|
Since there is no custom behavior to be implemented in Java classes for resequencers, there is no annotation support for it. |
The MessageHandlerChain
is an implementation of MessageHandler
that can be configured as a single message endpoint while actually delegating to a chain of other handlers, such as filters, transformers, splitters, and so on.
When several handlers need to be connected in a fixed, linear progression, this can lead to a much simpler configuration.
For example, it is fairly common to provide a transformer before other components.
Similarly, when you provide a filter before some other component in a chain, you essentially create a selective consumer.
In either case, the chain requires only a single input-channel
and a single output-channel
, eliminating the need to define channels for each individual component.
Tip | |
---|---|
Spring Integration’s |
The handler chain simplifies configuration while internally maintaining the same degree of loose coupling between components, and it is trivial to modify the configuration if at some point a non-linear arrangement is required.
Internally, the chain is expanded into a linear setup of the listed endpoints, separated by anonymous channels.
The reply channel header is not taken into account within the chain.
Only after the last handler is invoked is the resulting message forwarded to the reply channel or the chain’s output channel.
Because of this setup, all handlers except the last must implement the MessageProducer
interface (which provides a setOutputChannel() method).
If the outputChannel
on the MessageHandlerChain
is set, the last handler needs only an output channel.
Note | |
---|---|
As with other endpoints, the |
In most cases, you need not implement MessageHandler
yourself.
The next section focuses on namespace support for the chain element.
Most Spring Integration endpoints, such as service activators and transformers, are suitable for use within a MessageHandlerChain
.
The <chain>
element provides an input-channel
attribute.
If the last element in the chain is capable of producing reply messages (optional), it also supports an output-channel
attribute.
The sub-elements are then filters, transformers, splitters, and service-activators.
The last element may also be a router or an outbound channel adapter.
The following example shows a chain definition:
<int:chain input-channel="input" output-channel="output"> <int:filter ref="someSelector" throw-exception-on-rejection="true"/> <int:header-enricher> <int:header name="thing1" value="thing2"/> </int:header-enricher> <int:service-activator ref="someService" method="someMethod"/> </int:chain>
The <header-enricher>
element used in the preceding example sets a message header named thing1
with a value of thing2
on the message.
A header enricher is a specialization of Transformer
that touches only header values.
You could obtain the same result by implementing a MessageHandler
that did the header modifications and wiring that as a bean, but the header-enricher is a simpler option.
The <chain>
can be configured as the last black-box consumer of the message flow.
For this solution, you can to put it at the end of the <chain> some <outbound-channel-adapter>, as the following example shows:
<int:chain input-channel="input"> <int-xml:marshalling-transformer marshaller="marshaller" result-type="StringResult" /> <int:service-activator ref="someService" method="someMethod"/> <int:header-enricher> <int:header name="thing1" value="thing2"/> </int:header-enricher> <int:logging-channel-adapter level="INFO" log-full-message="true"/> </int:chain>
Disallowed Attributes and Elements | |
---|---|
Certain attributes, such as For the Spring Integration core components, the XML schema itself enforces some of these constraints. However, for non-core components or your own custom components, these constraints are enforced by the XML namespace parser, not by the XML schema. These XML namespace parser constraints were added with Spring Integration 2.2.
If you try to use disallowed attributes and elements, the XML namespace parser throws a |
Beginning with Spring Integration 3.0, if a chain element is given an id
attribute, the bean name for the element is a combination of the chain’s id
and the id
of the element itself.
Elements without id
attributes are not registered as beans, but each one is given a componentName
that includes the chain id
.
Consider the following example:
<int:chain id="somethingChain" input-channel="input"> <int:service-activator id="somethingService" ref="someService" method="someMethod"/> <int:object-to-json-transformer/> </int:chain>
In the preceding example:
<chain>
root element has an id
of somethingChain.
Consequently, the AbstractEndpoint
implementation (PollingConsumer
or EventDrivenConsumer
, depending on the input-channel
type) bean takes this value as its bean name.
MessageHandlerChain
bean acquires a bean alias (somethingChain.handler), which allows direct access to this bean from the BeanFactory
.
<service-activator>
is not a fully fledged messaging endpoint (it is not a PollingConsumer
or EventDrivenConsumer
).
It is a MessageHandler
within the <chain>
.
In this case, the bean name registered with the BeanFactory
is somethingChain$child.somethingService.handler.
componentName
of this ServiceActivatingHandler
takes the same value but without the .handler suffix.
It becomes somethingChain$child.somethingService.
<chain>
sub-component, <object-to-json-transformer>
, does not have an id
attribute.
Its componentName
is based on its position in the <chain>
.
In this case, it is somethingChain$child#1.
(The final element of the name is the order within the chain, beginning with #0).
Note, this transformer is not registered as a bean within the application context, so it does not get a beanName
.
However its componentName
has a value that is useful for logging and other purposes.
The id
attribute for <chain>
elements lets them be eligible for JMX export, and they are trackable in the message history.
You can access them from the BeanFactory
by using the appropriate bean name, as discussed earlier.
Tip | |
---|---|
It is useful to provide an explicit |
Sometimes, you need to make a nested call to another chain from within a chain and then come back and continue execution within the original chain. To accomplish this, you can use a messaging gateway by including a <gateway> element, as the following example shows:
<int:chain id="main-chain" input-channel="in" output-channel="out"> <int:header-enricher> <int:header name="name" value="Many" /> </int:header-enricher> <int:service-activator> <bean class="org.foo.SampleService" /> </int:service-activator> <int:gateway request-channel="inputA"/> </int:chain> <int:chain id="nested-chain-a" input-channel="inputA"> <int:header-enricher> <int:header name="name" value="Moe" /> </int:header-enricher> <int:gateway request-channel="inputB"/> <int:service-activator> <bean class="org.foo.SampleService" /> </int:service-activator> </int:chain> <int:chain id="nested-chain-b" input-channel="inputB"> <int:header-enricher> <int:header name="name" value="Jack" /> </int:header-enricher> <int:service-activator> <bean class="org.foo.SampleService" /> </int:service-activator> </int:chain>
In the preceding example, nested-chain-a
is called at the end of main-chain
processing by the gateway element configured there.
While in nested-chain-a
, a call to a nested-chain-b
is made after header enrichment.
Then the flow comes back to finish execution in nested-chain-b
.
Finally, the flow returns to main-chain
.
When the nested version of a <gateway>
element is defined in the chain, it does not require the service-interface
attribute.
Instead, it takes the message in its current state and places it on the channel defined in the request-channel
attribute.
When the downstream flow initiated by that gateway completes, a Message
is returned to the gateway and continues its journey within the current chain.
Starting with version 4.1, Spring Integration provides an implementation of the scatter-gather enterprise integration pattern.
It is a compound endpoint for which the goal is to send a message to the recipients and aggregate the results.
As noted in Enterprise Integration Patterns, it is a component for scenarios such as "best quote
", where we need to request information from several suppliers and decide which one provides us with the best term for the requested item.
Previously, the pattern could be configured by using discrete components. This enhancement brings more convenient configuration.
The ScatterGatherHandler
is a request-reply endpoint that combines a PublishSubscribeChannel
(or a RecipientListRouter
) and an AggregatingMessageHandler
.
The request message is sent to the scatter
channel, and the ScatterGatherHandler
waits for the reply that the aggregator sends to the outputChannel
.
The Scatter-Gather
pattern suggests two scenarios: "auction
" and "distribution
".
In both cases, the aggregation
function is the same and provides all the options available for the AggregatingMessageHandler
.
(Actually, the ScatterGatherHandler
requires only an AggregatingMessageHandler
as a constructor argument.)
See Section 8.4, “Aggregator” for more information.
The auction Scatter-Gather
variant uses "publish-subscribe
" logic for the request message, where the "scatter
" channel is a PublishSubscribeChannel
with apply-sequence="true"
.
However, this channel can be any MessageChannel
implementation (as is the case with the request-channel
in the ContentEnricher
— see Section 9.2, “Content Enricher”).
However, in this case, you should create your own custom correlationStrategy
for the aggregation
function.
The distribution Scatter-Gather
variant is based on the RecipientListRouter
(see the section called “RecipientListRouter
”) with all available options for the RecipientListRouter
.
This is the second ScatterGatherHandler
constructor argument.
If you want to rely on only the default correlationStrategy
for the recipient-list-router
and the aggregator
, you should specify apply-sequence="true"
.
Otherwise, you should supply a custom correlationStrategy
for the aggregator
.
Unlike the PublishSubscribeChannel
variant (the auction variant), having a recipient-list-router
selector
option lets filter target suppliers based on the message.
With apply-sequence="true"
, the default sequenceSize
is supplied, and the aggregator
can release the group correctly.
The distribution option is mutually exclusive with the auction option.
For both the auction and the distribution variants, the request (scatter) message is enriched with the gatherResultChannel
header to wait for a reply message from the aggregator
.
By default, all suppliers should send their result to the replyChannel
header (usually by omitting the output-channel
from the ultimate endpoint).
However, the gatherChannel
option is also provided, letting suppliers send their reply to that channel for the aggregation.
The following example shows Java configuration for the bean definition for Scatter-Gather
:
@Bean public MessageHandler distributor() { RecipientListRouter router = new RecipientListRouter(); router.setApplySequence(true); router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(), distributionChannel3())); return router; } @Bean public MessageHandler gatherer() { return new AggregatingMessageHandler( new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"), new SimpleMessageStore(), new HeaderAttributeCorrelationStrategy( IntegrationMessageHeaderAccessor.CORRELATION_ID), new ExpressionEvaluatingReleaseStrategy("size() == 2")); } @Bean @ServiceActivator(inputChannel = "distributionChannel") public MessageHandler scatterGatherDistribution() { ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer()); handler.setOutputChannel(output()); return handler; }
In the preceding example, we configure the RecipientListRouter
distributor
bean with applySequence="true"
and the list of recipient channels.
The next bean is for an AggregatingMessageHandler
.
Finally, we inject both those beans into the ScatterGatherHandler
bean definition and mark it as a @ServiceActivator
to wire the scatter-gather component into the integration flow.
The following example shows how to configure the <scatter-gather>
endpoint by using the XML namespace:
<scatter-gather id="" auto-startup="" input-channel="" output-channel="" scatter-channel="" gather-channel="" order="" phase="" send-timeout="" gather-timeout="" requires-reply="" > <scatterer/> <gatherer/> </scatter-gather>
The id of the endpoint.
The | |
Lifecycle attribute signaling whether the endpoint should be started during application context initialization.
In addition, the | |
The channel on which to receive request messages to handle them in the | |
The channel to which the | |
The channel to which to send the scatter message for the auction scenario.
Optional.
Mutually exclusive with the | |
The channel on which to receive replies from each supplier for the aggregation.
It is used as the | |
The order of this component when more than one handler is subscribed to the same | |
Specifies the phase in which the endpoint should be started and stopped.
The startup order proceeds from lowest to highest, and the shutdown order is from highest to lowest.
By default, this value is | |
The timeout interval to wait when sending a reply | |
Lets you specify how long the scatter-gather waits for the reply message before returning.
By default, it waits indefinitely.
null is returned if the reply times out.
Optional.
It defaults to | |
Specifies whether the scatter-gather must return a non-null value.
This value is | |
The | |
The |
Sometimes, we need to suspend a message flow thread until some other asynchronous event occurs. For example, consider an HTTP request that publishes a message to RabbitMQ. We might wish to not reply to the user until the RabbitMQ broker has issued an acknowledgment that the message was received.
In version 4.2, Spring Integration introduced the <barrier/>
component for this purpose.
The underlying MessageHandler
is the BarrierMessageHandler
.
This class also implements
MessageTriggerAction
, in which a message passed to the trigger()
method releases a corresponding thread in the
handleRequestMessage()
method (if present).
The suspended thread and trigger thread are correlated by invoking a CorrelationStrategy
on the messages.
When a message is sent to the input-channel
, the thread is suspended for up to timeout
milliseconds, waiting for
a corresponding trigger message.
The default correlation strategy uses the IntegrationMessageHeaderAccessor.CORRELATION_ID
header.
When a trigger message arrives with the same correlation, the thread is released.
The message sent to the output-channel
after release is constructed by using a MessageGroupProcessor
.
By default, the message is a Collection<?>
of the two payloads, and the headers are merged by using a
DefaultAggregatingMessageGroupProcessor
.
Caution | |
---|---|
If the |
The requires-reply
property determines the action to take if the suspended thread times out before the trigger message arrives.
By default, it is false
, which means the endpoint returns null
, the flow ends, and the thread returns to the
caller.
When true
, a ReplyRequiredException
is thrown.
You can call the trigger()
method programmatically (obtain the bean reference by using the name, barrier.handler
— where barrier
is the bean name of the barrier endpoint).
Alternatively, you can configure an <outbound-channel-adapter/>
to trigger the release.
Important | |
---|---|
Only one thread can be suspended with the same correlation. The same correlation can be used multiple times but only once concurrently. An exception is thrown if a second thread arrives with the same correlation. |
The following example shows how to use a custom header for correlation:
<int:barrier id="barrier1" input-channel="in" output-channel="out" correlation-strategy-expression="headers['myHeader']" output-processor="myOutputProcessor" discard-channel="lateTriggerChannel" timeout="10000"> </int:barrier> <int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />
Depending on which one has a message arrive first, either the thread sending a message to in
or the thread sending a message to release
waits for up to ten seconds until the other message arrives.
When the message is released, the out
channel is sent a message that combines the result of invoking the custom MessageGroupProcessor
bean, named myOutputProcessor
.
If the main thread times out and a trigger arrives later, you can configure a discard channel to which the late trigger is sent.
The following example shows the Java configuration to do so:
@Configuration @EnableIntegration public class Config { @ServiceActivator(inputChannel="in") @Bean public BarrierMessageHandler barrier() { BarrierMessageHandler barrier = new BarrierMessageHandler(10000); barrier.setOutputChannel(out()); barrier.setDiscardChannel(lateTriggers()); return barrier; } @ServiceActivator (inputChannel="release") @Bean public MessageHandler releaser() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { barrier().trigger(message); } }; } }
For an example of this component, see the barrier sample application.