6. Message Routing

6.1 Routers

6.1.1 Overview

Routers are a crucial element in many messaging architectures. They consume Messages from a Message Channel and forward each consumed message to one or more different Message Channels depending on a set of conditions.

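Spring Integration provides the following routers out-of-the-box:

  • Payload Type Router
  • Header Value Router
  • Recipient List Router
  • XPath Router (part of the XML Module)
  • Error Message Exception Type Router
  • (Generic) Router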

Router implementations share many configuration parameters, but certain differences exist between routers. Furthermore, the availability of configuration parameters depends on whether Routers are used inside or outside of a chain. To provide a quick overview, all available attributes are listed in the two tables below.

Table 6.1. Routers Outside of a Chain

Attribute              | router | header value router | xpath router | payload type router | recipient list router | exception type router
-----------------------+--------+---------------------+--------------+---------------------+-----------------------+----------------------
apply-sequence         | ✓      | ✓                   | ✓            | ✓                   | ✓                     | ✓
default-output-channel | ✓      | ✓                   | ✓            | ✓                   | ✓                     | ✓
resolution-required    | ✓      | ✓                   | ✓            | ✓                   | ✓                     | ✓
ignore-send-failures   | ✓      | ✓                   | ✓            | ✓                   | ✓                     | ✓
timeout                | ✓      | ✓                   | ✓            | ✓                   | ✓                     | ✓
id                     | ✓      | ✓                   | ✓            | ✓                   | ✓                     | ✓
auto-startup           | ✓      | ✓                   | ✓            | ✓                   | ✓                     | ✓
input-channel          | ✓      | ✓                   | ✓            | ✓                   | ✓                     | ✓
order                  | ✓      | ✓                   | ✓            | ✓                   | ✓                     | ✓
method                 | ✓      |                     |              |                     |                       |
ref                    | ✓      |                     |              |                     |                       |
expression             | ✓      |                     |              |                     |                       |
header-name            |        | ✓                   |              |                     |                       |
evaluate-as-string     |        |                     | ✓            |                     |                       |
xpath-expression-ref   |        |                     | ✓            |                     |                       |
converter              |        |                     | ✓            |                     |                       |

Table 6.2. Routers Inside of a Chain

Attribute              | router | header value router | xpath router | payload type router | recipient list router | exception type router
-----------------------+--------+---------------------+--------------+---------------------+-----------------------+----------------------
apply-sequence         | ✓      | ✓                   | ✓            | ✓                   | ✓                     | ✓
default-output-channel | ✓      | ✓                   | ✓            | ✓                   | ✓                     | ✓
resolution-required    | ✓      | ✓                   | ✓            | ✓                   | ✓                     | ✓
ignore-send-failures   | ✓      | ✓                   | ✓            | ✓                   | ✓                     | ✓
timeout                | ✓      | ✓                   | ✓            | ✓                   | ✓                     | ✓
id                     |        |                     |              |                     |                       |
auto-startup           |        |                     |              |                     |                       |
input-channel          |        |                     |              |                     |                       |
order                  |        |                     |              |                     |                       |
method                 | ✓      |                     |              |                     |                       |
ref                    | ✓      |                     |              |                     |                       |
expression             | ✓      |                     |              |                     |                       |
header-name            |        | ✓                   |              |                     |                       |
evaluate-as-string     |        |                     | ✓            |                     |                       |
xpath-expression-ref   |        |                     | ✓            |                     |                       |
converter              |        |                     | ✓            |                     |                       |

Important

Router parameters were standardized across all router implementations in Spring Integration 2.1. Consequently, a few minor changes may break applications based on older versions of Spring Integration.

Since Spring Integration 2.1, the ignore-channel-name-resolution-failures attribute has been removed in favor of consolidating its behavior with the resolution-required attribute. Also, the resolution-required attribute now defaults to true.

Prior to these changes, the resolution-required attribute defaulted to false, causing messages to be silently dropped when no channel was resolved and no default-output-channel was set. The new behavior requires at least one resolved channel and, by default, throws a MessageDeliveryException if no channel was determined (or an attempt to send was not successful).

If you do want to drop messages silently, set default-output-channel="nullChannel".
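The interplay of resolution-required and default-output-channel can be easier to follow as a decision procedure. The following is a minimal sketch in plain Java of one reading of the rules described in this chapter; it is not Spring Integration code. The class ResolutionSketch, the method target, and the exception RoutingFailure are hypothetical names introduced only for illustration, and a Set of names stands in for the channels known to the application context.

```java
import java.util.Set;

/** Simplified model of the channel-resolution rules; not Spring Integration code. */
final class ResolutionSketch {

    /** Hypothetical stand-in for the exceptions the framework raises. */
    static final class RoutingFailure extends RuntimeException {
        RoutingFailure(String message) {
            super(message);
        }
    }

    /**
     * Returns the name of the channel a message would be sent to.
     *
     * @param channelName          name computed by the router, or null if none was computed
     * @param knownChannels        names of channels that can be resolved
     * @param resolutionRequired   models the resolution-required attribute
     * @param defaultOutputChannel models default-output-channel, or null if unset
     */
    static String target(String channelName, Set<String> knownChannels,
                         boolean resolutionRequired, String defaultOutputChannel) {
        if (channelName != null) {
            if (knownChannels.contains(channelName)) {
                return channelName; // resolved: route normally
            }
            if (resolutionRequired) {
                // an unresolvable name is an error when resolution is required
                throw new RoutingFailure("Cannot resolve channel '" + channelName + "'");
            }
        }
        if (defaultOutputChannel != null) {
            return defaultOutputChannel; // e.g. "nullChannel" to drop the message silently
        }
        throw new RoutingFailure("No channel resolved and no default output channel set");
    }
}
```

In this model, the only way to drop a message silently is to reach the fallback branch with "nullChannel" configured as the default output channel.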

6.1.2 Common Router Parameters

Inside and Outside of a Chain

The following parameters are valid for all routers inside and outside of chains.

apply-sequence
This attribute specifies whether sequence number and size headers should be added to each Message. This optional attribute defaults to false.
default-output-channel
If set, this attribute provides a reference to the channel where Messages should be sent if channel resolution fails to return any channels. If no default output channel is provided, the router will throw an Exception. If you would like to silently drop those messages instead, set nullChannel as the default-output-channel attribute value.
Note

A Message will only be sent to the default-output-channel if resolution-required is false and the channel is not resolved.

resolution-required
If true, this attribute specifies that channel names must always be successfully resolved to channel instances that exist. If set to true, a MessagingException will be raised if the channel cannot be resolved. Setting this attribute to false will cause any unresolvable channels to be ignored. This optional attribute defaults to true if not explicitly set.
Note

A Message will only be sent to the default-output-channel, if specified, when resolution-required is false and the channel is not resolved.

ignore-send-failures
If set to true, failures to send to a message channel will be ignored. If set to false, a MessageDeliveryException will be thrown instead, and if the router resolves more than one channel, any subsequent channels will not receive the message.

The exact behavior of this attribute depends on the type of the Channel messages are sent to. For example, when using direct channels (single threaded), send failures can be caused by exceptions thrown by components much further downstream. However, when sending messages to a simple queue channel (asynchronous), the likelihood of an exception being thrown is rather remote.

Note

While most routers will route to a single channel, they are allowed to return more than one channel name. The recipient-list-router, for instance, does exactly that. If you set this attribute to true on a router that only routes to a single channel, any resulting exception is simply swallowed, which usually makes little sense. In that case it would be better to catch the exception in an error flow at the flow entry point. Therefore, setting the ignore-send-failures attribute to true usually makes more sense when the router implementation returns more than one channel name, because the other channel(s) following the one that fails would still receive the Message.

This attribute defaults to false.

timeout
The timeout attribute specifies the maximum amount of time in milliseconds to wait, when sending Messages to the target Message Channels. By default the send operation will block indefinitely.
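The effect of ignore-send-failures on a router that resolves several channels can be sketched in plain Java. This is a simplified model under stated assumptions, not the framework's implementation: SendFailureSketch and sendToAll are hypothetical names, and each channel is represented by a Consumer that may throw to simulate a failed send.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified model of ignore-send-failures; not Spring Integration code. */
final class SendFailureSketch {

    /**
     * Sends the payload to each channel in order and returns the indexes
     * of the channels that accepted it.
     */
    static List<Integer> sendToAll(String payload, List<Consumer<String>> channels,
                                   boolean ignoreSendFailures) {
        List<Integer> delivered = new ArrayList<>();
        for (int i = 0; i < channels.size(); i++) {
            try {
                channels.get(i).accept(payload);
                delivered.add(i);
            } catch (RuntimeException ex) {
                if (!ignoreSendFailures) {
                    // propagate: channels after this one never receive the message
                    throw ex;
                }
                // failure ignored: continue with the next channel
            }
        }
        return delivered;
    }
}
```

With three channels where the second one fails, the model delivers to the first and third when failures are ignored, and only to the first (followed by an exception) when they are not.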

Top-Level (Outside of a Chain)

The following parameters are valid only for top-level routers that are outside of chains.

id
Identifies the underlying Spring bean definition which in case of Routers is an instance of EventDrivenConsumer or PollingConsumer depending on whether the Router’s input-channel is a SubscribableChannel or PollableChannel, respectively. This is an optional attribute.
auto-startup
This Lifecycle attribute signals whether this component should be started during startup of the Application Context. This optional attribute defaults to true.
input-channel
The receiving Message channel of this endpoint.
order
This attribute defines the order for invocation when this endpoint is connected as a subscriber to a channel. This is particularly relevant when that channel is using a failover dispatching strategy. It has no effect when this endpoint itself is a Polling Consumer for a channel with a queue.

6.1.3 Router Implementations

Since content-based routing often requires some domain-specific logic, most use cases will require Spring Integration’s options for delegating to POJOs using the XML namespace support and/or Annotations. Both of these are discussed below, but first we present a couple of implementations that are available out-of-the-box since they fulfill common requirements.

PayloadTypeRouter

A PayloadTypeRouter will send Messages to the channel as defined by payload-type mappings.

<bean id="payloadTypeRouter"
      class="org.springframework.integration.router.PayloadTypeRouter">
    <property name="channelMapping">
        <map>
            <entry key="java.lang.String" value-ref="stringChannel"/>
            <entry key="java.lang.Integer" value-ref="integerChannel"/>
        </map>
    </property>
</bean>

Configuration of the PayloadTypeRouter is also supported via the namespace provided by Spring Integration (see Section E.2, “Namespace Support”), which essentially simplifies configuration by combining the <router/> configuration and its corresponding implementation defined using a <bean/> element into a single and more concise configuration element. The example below demonstrates a PayloadTypeRouter configuration which is equivalent to the one above using the namespace support:

<int:payload-type-router input-channel="routingChannel">
    <int:mapping type="java.lang.String" channel="stringChannel" />
    <int:mapping type="java.lang.Integer" channel="integerChannel" />
</int:payload-type-router>

The equivalent router, using Java configuration:

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public PayloadTypeRouter router() {
    PayloadTypeRouter router = new PayloadTypeRouter();
    router.setChannelMapping(String.class.getName(), "stringChannel");
    router.setChannelMapping(Integer.class.getName(), "integerChannel");
    return router;
}

When using the Java DSL, there are two options. 1) Define the router object as above:

@Bean
public IntegrationFlow routerFlow1() {
    return IntegrationFlows.from("routingChannel")
            .route(router())
            .get();
}

public PayloadTypeRouter router() {
    PayloadTypeRouter router = new PayloadTypeRouter();
    router.setChannelMapping(String.class.getName(), "stringChannel");
    router.setChannelMapping(Integer.class.getName(), "integerChannel");
    return router;
}

Note that the router can be, but doesn’t have to be, a @Bean - the flow will register it if it is not.

2) Define the routing function within the DSL flow itself:

@Bean
public IntegrationFlow routerFlow2() {
    return IntegrationFlows.from("routingChannel")
            .<Object, Class<?>>route(Object::getClass, m -> m
                    .channelMapping(String.class, "stringChannel")
                    .channelMapping(Integer.class, "integerChannel"))
            .get();
}

HeaderValueRouter

A HeaderValueRouter will send Messages to the channel based on the individual header value mappings. When a HeaderValueRouter is created it is initialized with the name of the header to be evaluated. The value of the header could be one of two things:

1. Arbitrary value

2. Channel name

If the value is arbitrary, additional mappings from these header values to channel names are required; otherwise, no additional configuration is needed.

Spring Integration provides a simple namespace-based XML configuration to configure a HeaderValueRouter. The example below demonstrates two types of namespace-based configuration for the HeaderValueRouter.

1. Configuration where mapping of header values to channels is required

<int:header-value-router input-channel="routingChannel" header-name="testHeader">
    <int:mapping value="someHeaderValue" channel="channelA" />
    <int:mapping value="someOtherHeaderValue" channel="channelB" />
</int:header-value-router>

During the resolution process this router may encounter channel resolution failures, causing an exception. If you want to suppress such exceptions and send unresolved messages to the default output channel (identified with the default-output-channel attribute) set resolution-required to false.

Normally, messages for which the header value is not explicitly mapped to a channel will be sent to the default-output-channel. However, in cases where the header value is mapped to a channel name but the channel cannot be resolved, setting the resolution-required attribute to false will result in routing such messages to the default-output-channel.

Important

With Spring Integration 2.1 the attribute was changed from ignore-channel-name-resolution-failures to resolution-required. Attribute resolution-required will default to true.

The equivalent router, using Java configuration:

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public HeaderValueRouter router() {
    HeaderValueRouter router = new HeaderValueRouter("testHeader");
    router.setChannelMapping("someHeaderValue", "channelA");
    router.setChannelMapping("someOtherHeaderValue", "channelB");
    return router;
}

When using the Java DSL, there are two options. 1) Define the router object as above:

@Bean
public IntegrationFlow routerFlow1() {
    return IntegrationFlows.from("routingChannel")
            .route(router())
            .get();
}

public HeaderValueRouter router() {
    HeaderValueRouter router = new HeaderValueRouter("testHeader");
    router.setChannelMapping("someHeaderValue", "channelA");
    router.setChannelMapping("someOtherHeaderValue", "channelB");
    return router;
}

Note that the router can be, but doesn’t have to be, a @Bean - the flow will register it if it is not.

2) Define the routing function within the DSL flow itself:

@Bean
public IntegrationFlow routerFlow2() {
    return IntegrationFlows.from("routingChannel")
            .<Message<?>, String>route(m -> m.getHeaders().get("testHeader", String.class), m -> m
                    .channelMapping("someHeaderValue", "channelA")
                    .channelMapping("someOtherHeaderValue", "channelB"),
                e -> e.id("headerValueRouter"))
            .get();
}

2. Configuration where mapping of header values to channel names is not required since header values themselves represent channel names

<int:header-value-router input-channel="routingChannel" header-name="testHeader"/>

Note

Since Spring Integration 2.1 the behavior of resolving channels is more explicit. For example, if you omit the default-output-channel attribute, the Router is unable to resolve at least one valid channel, and any channel name resolution failures are ignored by setting resolution-required to false, then a MessageDeliveryException is thrown.

Basically, by default the Router must be able to route messages successfully to at least one channel. If you really want to drop messages, you must also have default-output-channel set to nullChannel.

RecipientListRouter

A RecipientListRouter will send each received Message to a statically defined list of Message Channels:

<bean id="recipientListRouter"
      class="org.springframework.integration.router.RecipientListRouter">
    <property name="channels">
        <list>
            <ref bean="channel1"/>
            <ref bean="channel2"/>
            <ref bean="channel3"/>
        </list>
    </property>
</bean>

Spring Integration also provides namespace support for the RecipientListRouter configuration (see Section E.2, “Namespace Support”) as the example below demonstrates.

<int:recipient-list-router id="customRouter" input-channel="routingChannel"
        timeout="1234"
        ignore-send-failures="true"
        apply-sequence="true">
  <int:recipient channel="channel1"/>
  <int:recipient channel="channel2"/>
</int:recipient-list-router>

The equivalent router, using Java configuration:

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public RecipientListRouter router() {
    RecipientListRouter router = new RecipientListRouter();
    router.setSendTimeout(1_234L);
    router.setIgnoreSendFailures(true);
    router.setApplySequence(true);
    router.addRecipient("channel1");
    router.addRecipient("channel2");
    router.addRecipient("channel3");
    return router;
}

The equivalent router, using the Java DSL:

@Bean
public IntegrationFlow routerFlow() {
    return IntegrationFlows.from("routingChannel")
            .routeToRecipients(r -> r
                    .applySequence(true)
                    .ignoreSendFailures(true)
                    .recipient("channel1")
                    .recipient("channel2")
                    .recipient("channel3")
                    .sendTimeout(1_234L))
            .get();
}

Note

The apply-sequence flag here has the same effect as it does for a publish-subscribe-channel, and like a publish-subscribe-channel, it is disabled by default on the recipient-list-router. Refer to the section called “PublishSubscribeChannel Configuration” for more information.

Another convenient option when configuring a RecipientListRouter is to use Spring Expression Language (SpEL) support as selectors for individual recipient channels. This is similar to using a Filter at the beginning of a chain to act as a "Selective Consumer". However, in this case, it’s all combined rather concisely into the router’s configuration.

<int:recipient-list-router id="customRouter" input-channel="routingChannel">
    <int:recipient channel="channel1" selector-expression="payload.equals('foo')"/>
    <int:recipient channel="channel2" selector-expression="headers.containsKey('bar')"/>
</int:recipient-list-router>

In the above configuration a SpEL expression identified by the selector-expression attribute will be evaluated to determine if this recipient should be included in the recipient list for a given input Message. The evaluation result of the expression must be a boolean. If this attribute is not defined, the channel will always be among the list of recipients.
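The selection rule just described (a recipient is included when its selector evaluates to true, or always when it has no selector) can be modeled in plain Java. This is a minimal sketch, not the router's implementation: RecipientSelectorSketch, Msg, recipient, and recipientsFor are hypothetical names, and a java.util.function.Predicate stands in for the SpEL selector-expression.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Simplified model of per-recipient selectors; not Spring Integration code. */
final class RecipientSelectorSketch {

    /** Minimal message stand-in: a payload plus headers. */
    static final class Msg {
        final Object payload;
        final Map<String, Object> headers;

        Msg(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    /** Recipients in declaration order; a null selector means "always include". */
    private final Map<String, Predicate<Msg>> recipients = new LinkedHashMap<>();

    RecipientSelectorSketch recipient(String channel, Predicate<Msg> selector) {
        recipients.put(channel, selector);
        return this;
    }

    /** Returns the channel names that should receive the given message. */
    List<String> recipientsFor(Msg message) {
        List<String> selected = new ArrayList<>();
        for (Map.Entry<String, Predicate<Msg>> entry : recipients.entrySet()) {
            Predicate<Msg> selector = entry.getValue();
            if (selector == null || selector.test(message)) {
                selected.add(entry.getKey());
            }
        }
        return selected;
    }
}
```

For example, a recipient registered with the predicate m -> "foo".equals(m.payload) corresponds to the selector-expression payload.equals('foo') in the XML above.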

RecipientListRouterManagement

Starting with version 4.1, the RecipientListRouter provides several operations to manipulate recipients dynamically at runtime. These management operations are exposed by the RecipientListRouterManagement @ManagedResource. They are available through the Control Bus (see Section 10.6, “Control Bus”) as well as via JMX:

<control-bus input-channel="controlBus"/>

<recipient-list-router id="simpleRouter" input-channel="routingChannelA">
   <recipient channel="channel1"/>
</recipient-list-router>

<channel id="channel2"/>

messagingTemplate.convertAndSend(controlBus, "@'simpleRouter.handler'.addRecipient('channel2')");

At application startup, the simpleRouter has only one recipient, channel1. After the addRecipient command above, channel2 is added as a second recipient. This is a "registering an interest in something that is part of the Message" use case: we may be interested in messages from the router for some period of time, so we subscribe to the recipient-list-router and at some point decide to unsubscribe.

Because recipients can be managed at runtime, the <recipient-list-router> can be configured without any <recipient> from the start. In this case, the RecipientListRouter behaves the same as when no recipient matches the message: if defaultOutputChannel is configured, the message is sent there; otherwise a MessageDeliveryException is thrown.

XPath Router

The XPath Router is part of the XML Module. See Section 37.6, “Routing XML Messages Using XPath”.

Routing and Error handling

Spring Integration also provides a special type-based router called ErrorMessageExceptionTypeRouter for routing Error Messages (Messages whose payload is a Throwable instance). ErrorMessageExceptionTypeRouter is very similar to the PayloadTypeRouter. In fact they are almost identical. The only difference is that while PayloadTypeRouter navigates the instance hierarchy of a payload instance (e.g., payload.getClass().getSuperclass()) to find the most specific type/channel mappings, the ErrorMessageExceptionTypeRouter navigates the hierarchy of exception causes (e.g., payload.getCause()) to find the most specific Throwable type/channel mappings and uses mappingClass.isInstance(cause) to match the cause to the class or any super class.

Note

Since version 4.3 the ErrorMessageExceptionTypeRouter loads all mapping classes during the initialization phase to fail-fast for a ClassNotFoundException.

Below is a sample configuration for ErrorMessageExceptionTypeRouter.

<int:exception-type-router input-channel="inputChannel"
                           default-output-channel="defaultChannel">
    <int:mapping exception-type="java.lang.IllegalArgumentException"
                 channel="illegalChannel"/>
    <int:mapping exception-type="java.lang.NullPointerException"
                 channel="npeChannel"/>
</int:exception-type-router>

<int:channel id="illegalChannel" />
<int:channel id="npeChannel" />
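The cause-chain lookup described for the ErrorMessageExceptionTypeRouter can be illustrated in plain Java. This is a sketch under stated assumptions, not the router's source: ExceptionTypeLookupSketch and channelFor are hypothetical names, and the sketch assumes that when several causes in the chain match a mapping, the match closest to the root cause wins.

```java
import java.util.Map;

/** Simplified model of an exception-type lookup; not Spring Integration code. */
final class ExceptionTypeLookupSketch {

    /**
     * Walks the cause chain of the payload (payload, payload.getCause(), ...)
     * and returns the channel of a mapping whose class matches a cause via
     * isInstance, or null if nothing matches.
     */
    static String channelFor(Throwable payload,
                             Map<Class<? extends Throwable>, String> mappings) {
        String match = null;
        for (Throwable cause = payload; cause != null; cause = cause.getCause()) {
            for (Map.Entry<Class<? extends Throwable>, String> entry : mappings.entrySet()) {
                if (entry.getKey().isInstance(cause)) {
                    // assumption: a match deeper in the chain overrides an earlier one
                    match = entry.getValue();
                }
            }
        }
        return match;
    }
}
```

With the mappings from the XML above, a RuntimeException that wraps an IllegalArgumentException resolves to illegalChannel, because the wrapped cause matches even though the outer exception does not. An exception with no matching cause yields null, which in the XML configuration corresponds to falling back to the default-output-channel.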

6.1.4 Configuring a Generic Router

Configuring a Content Based Router with XML

The "router" element provides a simple way to connect a router to an input channel and also accepts the optional default-output-channel attribute. The ref attribute references the bean name of a custom Router implementation (extending AbstractMessageRouter):

<int:router ref="payloadTypeRouter" input-channel="input1"
            default-output-channel="defaultOutput1"/>

<int:router ref="recipientListRouter" input-channel="input2"
            default-output-channel="defaultOutput2"/>

<int:router ref="customRouter" input-channel="input3"
            default-output-channel="defaultOutput3"/>

<beans:bean id="customRouter" class="org.foo.MyCustomRouter"/>

Alternatively, ref may point to a simple POJO that contains the @Router annotation (see below), or the ref may be combined with an explicit method name. Specifying a method applies the same behavior described in the @Router annotation section below.

<int:router input-channel="input" ref="somePojo" method="someMethod"/>

Using a ref attribute is generally recommended if the custom router implementation is referenced in other <router> definitions. However if the custom router implementation should be scoped to a single definition of the <router>, you may provide an inner bean definition:

<int:router method="someMethod" input-channel="input3"
            default-output-channel="defaultOutput3">
    <beans:bean class="org.foo.MyCustomRouter"/>
</int:router>

Note

Using both the ref attribute and an inner handler definition in the same <router> configuration is not allowed, as it creates an ambiguous condition, and an Exception will be thrown.

Important

If the "ref" attribute references a bean that extends AbstractMessageProducingHandler (such as routers provided by the framework itself), the configuration is optimized by referencing the router directly. In this case, each "ref" must be to a separate bean instance (or a prototype-scoped bean), or use the inner <bean/> configuration type. However, this optimization only applies if you don’t provide any router-specific attributes in the router XML definition. If you inadvertently reference the same message handler from multiple beans, you will get a configuration exception.

The equivalent router, using Java Configuration:

@Bean
@Router(inputChannel = "routingChannel")
public AbstractMessageRouter myCustomRouter() {
    return new AbstractMessageRouter() {

        @Override
        protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
            return // determine channel(s) for message
        }

    };
}

The equivalent router, using the Java DSL:

@Bean
public IntegrationFlow routerFlow() {
    return IntegrationFlows.from("routingChannel")
            .route(myCustomRouter())
            .get();
}

public AbstractMessageRouter myCustomRouter() {
    return new AbstractMessageRouter() {

        @Override
        protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
            return // determine channel(s) for message
        }

    };
}

or, if you can route on just some message payload data:

@Bean
public IntegrationFlow routerFlow() {
    return IntegrationFlows.from("routingChannel")
            .route(String.class, p -> p.contains("foo") ? "fooChannel" : "barChannel")
            .get();
}

Routers and the Spring Expression Language (SpEL)

Sometimes the routing logic may be simple and writing a separate class for it and configuring it as a bean may seem like overkill. As of Spring Integration 2.0 we offer an alternative where you can now use SpEL to implement simple computations that previously required a custom POJO router.

Note

For more information about the Spring Expression Language, please refer to the respective chapter in the Spring Framework Reference Documentation.

Generally a SpEL expression is evaluated and the result is mapped to a channel:

<int:router input-channel="inChannel" expression="payload.paymentType">
    <int:mapping value="CASH" channel="cashPaymentChannel"/>
    <int:mapping value="CREDIT" channel="authorizePaymentChannel"/>
    <int:mapping value="DEBIT" channel="authorizePaymentChannel"/>
</int:router>

The equivalent router, using Java Configuration:

@Router(inputChannel = "routingChannel")
@Bean
public ExpressionEvaluatingRouter router() {
    ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter("payload.paymentType");
    router.setChannelMapping("CASH", "cashPaymentChannel");
    router.setChannelMapping("CREDIT", "authorizePaymentChannel");
    router.setChannelMapping("DEBIT", "authorizePaymentChannel");
    return router;
}

The equivalent router, using the Java DSL:

@Bean
public IntegrationFlow routerFlow() {
    return IntegrationFlows.from("routingChannel")
        .route("payload.paymentType", r -> r
            .channelMapping("CASH", "cashPaymentChannel")
            .channelMapping("CREDIT", "authorizePaymentChannel")
            .channelMapping("DEBIT", "authorizePaymentChannel"))
        .get();
}

To simplify things even more, the SpEL expression may evaluate to a channel name:

<int:router input-channel="inChannel" expression="payload + 'Channel'"/>

In the above configuration, the result channel is computed by the SpEL expression, which simply concatenates the value of the payload with the literal String 'Channel'.

Another value of SpEL for configuring routers is that an expression can actually return a Collection, effectively making every <router> a Recipient List Router. Whenever the expression returns multiple channel values the Message will be forwarded to each channel.

<int:router input-channel="inChannel" expression="headers.channels"/>

In the above configuration, if the Message includes a header named channels whose value is a List of channel names, the Message will be sent to each channel in the list. You may also find Collection Projection and Collection Selection expressions useful to select multiple channels. For further information, see the Spring Expression Language chapter of the Spring Framework Reference Documentation.

Configuring a Router with Annotations

When using @Router to annotate a method, the method may return either a MessageChannel or String type. In the latter case, the endpoint will resolve the channel name as it does for the default output channel. Additionally, the method may return either a single value or a collection. If a collection is returned, the reply message will be sent to multiple channels. To summarize, the following method signatures are all valid.

@Router
public MessageChannel route(Message message) {...}

@Router
public List<MessageChannel> route(Message message) {...}

@Router
public String route(Foo payload) {...}

@Router
public List<String> route(Foo payload) {...}

In addition to payload-based routing, a Message may be routed based on metadata available within the message header as either a property or attribute. In this case, a method annotated with @Router may include a parameter annotated with @Header which is mapped to a header value as illustrated below and documented in Section E.6, “Annotation Support”.

@Router
public List<String> route(@Header("orderStatus") OrderStatus status) {...}

Note

For routing of XML-based Messages, including XPath support, see Chapter 37, XML Support - Dealing with XML Payloads.

Also see Section 9.9, “Message Routers” in the Java DSL chapter for more information about router configuration.

6.1.5 Dynamic Routers

As you can see, Spring Integration provides quite a few different router configurations for common content-based routing use cases, as well as the option of implementing custom routers as POJOs. For example, PayloadTypeRouter provides a simple way to configure a router which computes channels based on the payload type of the incoming Message, while HeaderValueRouter provides the same convenience in configuring a router which computes channels by evaluating the value of a particular Message Header. There are also expression-based (SpEL) routers where the channel is determined based on evaluating an expression. Thus, these types of routers exhibit some dynamic characteristics.

However, these routers all require static configuration. Even in the case of expression-based routers, the expression itself is defined as part of the router configuration, which means that the same expression operating on the same value will always result in the computation of the same channel. This is acceptable in most cases since such routes are well defined and therefore predictable. But there are times when we need to change router configurations dynamically so message flows may be routed to a different channel.

Example:

You might want to bring down some part of your system for maintenance and temporarily reroute messages to a different message flow. Or you may want to introduce more granularity to your message flow by adding another route to handle a more concrete type of java.lang.Number (in the case of PayloadTypeRouter).

Unfortunately with static router configuration to accomplish this, you would have to bring down your entire application, change the configuration of the router (change routes) and bring it back up. This is obviously not the solution.

The Dynamic Router pattern describes the mechanisms by which one can change/configure routers dynamically without bringing down the system or individual routers. 

Before we get into the specifics of how this is accomplished in Spring Integration, let’s quickly summarize the typical flow of the router, which consists of 3 simple steps:

  • Step 1 - Compute channel identifier which is a value calculated by the router once it receives the Message. Typically it is a String or an instance of the actual MessageChannel.
  • Step 2 - Resolve channel identifier to channel name. We’ll describe specifics of this process in a moment.
  • Step 3 - Resolve channel name to the actual MessageChannel

There is not much that can be done with regard to dynamic routing if Step 1 results in the actual instance of the MessageChannel, simply because the MessageChannel is the final product of any router’s job. However, if Step 1 results in a channel identifier that is not an instance of MessageChannel, then there are quite a few possibilities to influence the process of deriving the Message Channel. Let’s look at a couple of examples in the context of the 3 steps mentioned above:

Payload Type Router

<int:payload-type-router input-channel="routingChannel">
    <int:mapping type="java.lang.String"  channel="channel1" />
    <int:mapping type="java.lang.Integer" channel="channel2" />
</int:payload-type-router>

Within the context of the Payload Type Router the 3 steps mentioned above would be realized as:

  • Step 1 - Compute channel identifier which is the fully qualified name of the payload type (e.g., java.lang.String).
  • Step 2 - Resolve channel identifier to channel name where the result of the previous step is used to select the appropriate value from the payload type mapping defined via mapping element.
  • Step 3 - Resolve channel name to the actual instance of the MessageChannel as a reference to a bean within the Application Context (which is hopefully a MessageChannel) identified by the result of the previous step.

In other words, each step feeds the next step until the process completes.

Header Value Router

<int:header-value-router input-channel="inputChannel" header-name="testHeader">
    <int:mapping value="foo" channel="fooChannel" />
    <int:mapping value="bar" channel="barChannel" />
</int:header-value-router>

Similar to the PayloadTypeRouter:

  • Step 1 - Compute channel identifier which is the value of the header identified by the header-name attribute.
  • Step 2 - Resolve channel identifier to channel name where the result of the previous step is used to select the appropriate value from the general mapping defined via mapping element.
  • Step 3 - Resolve channel name to the actual instance of the MessageChannel as a reference to a bean within the Application Context (which is hopefully a MessageChannel) identified by the result of the previous step.

The above two configurations of two different router types look almost identical. However if we look at the alternate configuration of the HeaderValueRouter we clearly see that there is no mapping sub element:

<int:header-value-router input-channel="inputChannel" header-name="testHeader"/>

But the configuration is still perfectly valid. So the natural question is what about the mapping in the Step 2?

What this means is that Step 2 is now optional. If no mapping is defined, the channel identifier computed in Step 1 is automatically treated as the channel name, which is then resolved to the actual MessageChannel in Step 3. It also means that Step 2 is one of the key steps in giving routers their dynamic characteristics, since it lets you change the way a channel identifier resolves to a channel name, thus influencing how the final MessageChannel instance is determined from the initial channel identifier.

For example:

In the above configuration let’s assume that the testHeader value is kermit which is now a channel identifier (Step 1). Since there is no mapping in this router, resolving this channel identifier to a channel name (Step 2) is impossible and this channel identifier is now treated as channel name. However what if there was a mapping but for a different value? The end result would still be the same and that is: if a new value cannot be determined through the process of resolving the channel identifier to a channel name, such channel identifier becomes channel name.

So all that is left is for Step 3 to resolve the channel name (kermit) to an actual instance of the MessageChannel identified by this name. That basically involves a bean lookup for the name provided. So now all messages which contain the header/value pair as testHeader=kermit are going to be routed to a MessageChannel whose bean name (id) is kermit.

But what if you want to route these messages to the simpson channel? Obviously changing a static configuration will work, but will also require bringing your system down. However, if you had access to the channel identifier map, you could just introduce a new mapping where the header/value pair is now kermit=simpson, thus allowing Step 2 to treat kermit as a channel identifier while resolving it to simpson as the channel name.
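The three steps, including the fallback when no mapping exists, can be sketched in plain Java. This is only an illustration under stated assumptions, not the framework's implementation: the class MappingRouterSketch, its route and registerChannel methods, and the map standing in for the application context are all hypothetical. Only the names setChannelMapping and removeChannelMapping mirror methods mentioned in this section.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not framework code: a map of named "channels"
// stands in for the application context.
class MappingRouterSketch {

    // Step 2 data: channel identifier -> channel name (may be empty).
    private final Map<String, String> channelMappings = new ConcurrentHashMap<>();

    // Step 3 data: channel name -> channel (a list that collects payloads).
    private final Map<String, List<Object>> channels = new ConcurrentHashMap<>();

    void registerChannel(String name) {
        channels.put(name, new ArrayList<>());
    }

    void setChannelMapping(String key, String channelName) {
        channelMappings.put(key, channelName);
    }

    void removeChannelMapping(String key) {
        channelMappings.remove(key);
    }

    List<Object> received(String channelName) {
        return channels.get(channelName);
    }

    // Routes a payload, given the channel identifier computed in Step 1.
    // Returns the name of the channel that received the payload.
    String route(String channelIdentifier, Object payload) {
        // Step 2: use the mapping if present, else fall back to the identifier itself.
        String channelName = channelMappings.getOrDefault(channelIdentifier, channelIdentifier);
        // Step 3: look the channel up by name.
        List<Object> channel = channels.get(channelName);
        if (channel == null) {
            throw new IllegalStateException("No channel named '" + channelName + "'");
        }
        channel.add(payload);
        return channelName;
    }

    public static void main(String[] args) {
        MappingRouterSketch router = new MappingRouterSketch();
        router.registerChannel("kermit");
        router.registerChannel("simpson");
        System.out.println(router.route("kermit", "first"));  // no mapping defined yet
        router.setChannelMapping("kermit", "simpson");        // remap at runtime
        System.out.println(router.route("kermit", "second"));
    }
}
```

In this sketch the remapping takes effect on the next routed message, without restarting anything; removing the mapping restores the fallback to the identifier.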

The same obviously applies for PayloadTypeRouter, where you can now remap or remove a particular payload type mapping. In fact, it applies to every other router, including expression-based routers, since their computed values will now have a chance to go through Step 2 to be additionally resolved to the actual channel name.

Any router that is a subclass of the AbstractMappingMessageRouter (which includes most framework defined routers) is a Dynamic Router simply because the channelMapping is defined at the AbstractMappingMessageRouter level. That map’s setter method is exposed as a public method along with setChannelMapping and removeChannelMapping methods. These allow you to change/add/remove router mappings at runtime as long as you have a reference to the router itself. It also means that you could expose these same configuration options via JMX (see Section 10.2, “JMX Support”) or the Spring Integration ControlBus (see Section 10.6, “Control Bus”) functionality. 

Manage Router Mappings using the Control Bus

One way to manage the router mappings is through the Control Bus pattern which exposes a Control Channel where you can send control messages to manage and monitor Spring Integration components, including routers.

[Note]Note

For more information about the Control Bus, see Section 10.6, “Control Bus”.

Typically you would send a control message asking to invoke a particular operation on a particular managed component (e.g. router). Two managed operations (methods) that are specific to changing the router resolution process are:

  • public void setChannelMapping(String key, String channelName) - will allow you to add a new or modify an existing mapping between channel identifier and channel name
  • public void removeChannelMapping(String key) - will allow you to remove a particular channel mapping, thus disconnecting the relationship between channel identifier and channel name

Note that these methods can be used for simple changes (updating a single route or adding/removing a route). However, if you want to remove one route and add another, the updates are not atomic. This means the routing table may be in an indeterminate state between the updates. Starting with version 4.0, you can now use the control bus to update the entire routing table atomically.

  • public Map<String, String> getChannelMappings() - returns the current mappings.
  • public void replaceChannelMappings(Properties channelMappings) - updates the mappings. Notice that the parameter is a Properties object; this allows the use of the inbuilt StringToPropertiesConverter by a control bus command, for example:

"@'router.handler'.replaceChannelMappings('foo=qux \n baz=bar')"

Note that each mapping is separated by a newline character (\n). For programmatic changes to the map, it is recommended that the setChannelMappings method is used instead, for type-safety. Any non-String keys or values passed into replaceChannelMappings are ignored.
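The difference between incremental updates and an atomic replacement can be sketched with the standard library alone. This is a sketch under assumptions, not the framework's code: RoutingTableSketch and its parse helper are hypothetical, and the trimming of keys and values is a choice made for this example. The idea shown is to build a complete new table first and then publish it with a single reference swap, so that readers never observe a half-updated table.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch of atomic routing-table replacement; not framework code.
class RoutingTableSketch {

    // Readers always see either the old table or the new one, never a mix.
    private volatile Map<String, String> channelMappings = Collections.emptyMap();

    Map<String, String> getChannelMappings() {
        return channelMappings;
    }

    // Builds a complete new table, then publishes it with one reference swap.
    void replaceChannelMappings(Properties newMappings) {
        Map<String, String> table = new HashMap<>();
        // stringPropertyNames() only returns entries whose key and value are Strings.
        for (String key : newMappings.stringPropertyNames()) {
            // Trimming is an assumption of this sketch; it tidies input such as "foo=qux ".
            table.put(key.trim(), newMappings.getProperty(key).trim());
        }
        this.channelMappings = Collections.unmodifiableMap(table);
    }

    // Parses newline-separated key=value pairs, as in the control bus example above.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        RoutingTableSketch router = new RoutingTableSketch();
        router.replaceChannelMappings(parse("foo=qux \n baz=bar"));
        System.out.println(router.getChannelMappings());
    }
}
```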

Manage Router Mappings using JMX

You can also expose a router instance with Spring’s JMX support, and then use your favorite JMX client (e.g., JConsole) to manage those operations (methods) for changing the router’s configuration.

[Note]Note

For more information about Spring Integration’s JMX support, see Section 10.2, “JMX Support”.

Routing Slip

Starting with version 4.1, Spring Integration provides an implementation of the Routing Slip Enterprise Integration Pattern. It is implemented as a routingSlip message header, which is used to determine the next channel in AbstractMessageProducingHandler instances when an outputChannel isn’t specified for the endpoint. This pattern is useful in complex, dynamic cases where it can become difficult to configure multiple routers to determine the message flow. When a message arrives at an endpoint that has no output-channel, the routingSlip is consulted to determine the next channel to which the message will be sent. When the routing slip is exhausted, normal replyChannel processing resumes.

Configuration for the Routing Slip is presented as a HeaderEnricher option: a semicolon-separated list of Routing Slip path entries:

<util:properties id="properties">
    <beans:prop key="myRoutePath1">channel1</beans:prop>
    <beans:prop key="myRoutePath2">request.headers[myRoutingSlipChannel]</beans:prop>
</util:properties>

<context:property-placeholder properties-ref="properties"/>

<header-enricher input-channel="input" output-channel="process">
    <routing-slip
        value="${myRoutePath1}; @routingSlipRoutingPojo.get(request, reply);
               routingSlipRoutingStrategy; ${myRoutePath2}; finishChannel"/>
</header-enricher>

In this sample we have:

  • A <context:property-placeholder> configuration to demonstrate that the entries in the Routing Slip path can be specified as resolvable keys.
  • The <header-enricher> <routing-slip> sub-element is used to populate the RoutingSlipHeaderValueMessageProcessor to the HeaderEnricher handler.
  • The RoutingSlipHeaderValueMessageProcessor accepts a String array of resolved Routing Slip path entries and returns (from processMessage()) a singletonMap with the path as key and 0 as initial routingSlipIndex.

Routing Slip path entries can contain MessageChannel bean names, RoutingSlipRouteStrategy bean names and also Spring expressions (SpEL). The RoutingSlipHeaderValueMessageProcessor checks each Routing Slip path entry against the BeanFactory on the first processMessage invocation. It converts entries, which aren’t bean names in the application context, to ExpressionEvaluatingRoutingSlipRouteStrategy instances. RoutingSlipRouteStrategy entries are invoked multiple times, until they return null, or an empty String.

Since the Routing Slip is involved in the getOutputChannel process we have a request-reply context. The RoutingSlipRouteStrategy has been introduced to determine the next outputChannel using the requestMessage, as well as the reply object. An implementation of this strategy should be registered as a bean in the application context and its bean name is used in the Routing Slip path. The ExpressionEvaluatingRoutingSlipRouteStrategy implementation is provided. It accepts a SpEL expression, and an internal ExpressionEvaluatingRoutingSlipRouteStrategy.RequestAndReply object is used as the root object of the evaluation context. This is to avoid the overhead of EvaluationContext creation for each ExpressionEvaluatingRoutingSlipRouteStrategy.getNextPath() invocation. It is a simple Java Bean with two properties - Message<?> request and Object reply. With this expression implementation, we can specify Routing Slip path entries using SpEL (@routingSlipRoutingPojo.get(request, reply), request.headers[myRoutingSlipChannel]) avoiding a bean definition for the RoutingSlipRouteStrategy.

[Note]Note

The requestMessage argument is always a Message<?>; depending on context, the reply object may be a Message<?>, an AbstractIntegrationMessageBuilder or an arbitrary application domain object (if, for example, it is returned by a POJO method invoked by a service activator). In the first two cases, the usual "message" properties (payload and headers) are available when using SpEL (or a Java implementation). When the reply is an arbitrary domain object, these properties are not available. Care should be taken when using routing slips in conjunction with POJO methods if the result is used to determine the next path.

[Important]Important

If a Routing Slip is involved in a distributed environment (a cross-JVM application, request-reply through a Message Broker such as Chapter 12, AMQP Support or Chapter 21, JMS Support, or an integration flow that uses a persistent MessageStore, see Section 10.4, “Message Store”), it is recommended not to use inline expressions for the Routing Slip path. The framework (RoutingSlipHeaderValueMessageProcessor) converts them to ExpressionEvaluatingRoutingSlipRouteStrategy objects, and these are used in the routingSlip message header. Since this class isn’t Serializable (and it can’t be, because it depends on the BeanFactory), the entire Message becomes non-serializable, and any distributed operation ends with a NotSerializableException. To overcome this limitation, register an ExpressionEvaluatingRoutingSlipRouteStrategy bean with the desired SpEL and use its bean name in the Routing Slip path configuration.

For Java configuration, simply add a RoutingSlipHeaderValueMessageProcessor instance to the HeaderEnricher bean definition:

@Bean
@Transformer(inputChannel = "routingSlipHeaderChannel")
public HeaderEnricher headerEnricher() {
    return new HeaderEnricher(Collections.singletonMap(IntegrationMessageHeaderAccessor.ROUTING_SLIP,
            new RoutingSlipHeaderValueMessageProcessor("myRoutePath1",
                                                       "@routingSlipRoutingPojo.get(request, reply)",
                                                       "routingSlipRoutingStrategy",
                                                       "request.headers[myRoutingSlipChannel]",
                                                       "finishChannel")));
}

The Routing Slip algorithm works as follows when an endpoint produces a reply and there is no outputChannel defined:

  • The routingSlipIndex is used to get a value from the Routing Slip path list.
  • If the value by routingSlipIndex is String, it is used to get a bean from BeanFactory.
  • If a returned bean is an instance of MessageChannel, it is used as the next outputChannel and the routingSlipIndex is incremented in the reply message header (the Routing Slip path entries remain unchanged).
  • If a returned bean is an instance of RoutingSlipRouteStrategy and its getNextPath doesn’t return an empty String, that result is used as a bean name for the next outputChannel. The routingSlipIndex remains unchanged.
  • If RoutingSlipRouteStrategy.getNextPath returns an empty String, the routingSlipIndex is incremented and getOutputChannelFromRoutingSlip is invoked recursively for the next Routing Slip path item.
  • If the next Routing Slip path entry isn’t a String, it must be an instance of RoutingSlipRouteStrategy.
  • When the routingSlipIndex exceeds the size of the Routing Slip path list, the algorithm moves to the default behavior for the standard replyChannel header.
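The steps above can be sketched as a small, framework-free walk over the path list. Everything in this sketch is a stand-in chosen for illustration: RoutingSlipSketch, its nested RouteStrategy and Channel types, and the map that plays the role of the BeanFactory are hypothetical, and the index is held in a field here rather than in a message header.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the routing slip walk; not framework code.
class RoutingSlipSketch {

    // Stand-in for RoutingSlipRouteStrategy: returns a channel bean name, or "" when done.
    interface RouteStrategy {
        String getNextPath(Object request, Object reply);
    }

    // Stand-in for a MessageChannel bean.
    static final class Channel {
        final String name;
        Channel(String name) { this.name = name; }
    }

    private final Map<String, Object> beans = new HashMap<>(); // stand-in for the BeanFactory
    private final List<Object> path;                            // bean names or strategies
    private int routingSlipIndex = 0;                           // a header value in the framework

    RoutingSlipSketch(List<Object> path) { this.path = path; }

    void registerBean(String name, Object bean) { beans.put(name, bean); }

    // Returns the next channel, or null when the slip is exhausted
    // (the caller would then fall back to replyChannel handling).
    Channel nextChannel(Object request, Object reply) {
        if (routingSlipIndex >= path.size()) {
            return null;
        }
        Object entry = path.get(routingSlipIndex);
        if (entry instanceof String) {
            entry = beans.get((String) entry);
        }
        if (entry instanceof Channel) {
            routingSlipIndex++;                 // a plain channel entry is used once
            return (Channel) entry;
        }
        if (entry instanceof RouteStrategy) {
            String next = ((RouteStrategy) entry).getNextPath(request, reply);
            if (next != null && !next.isEmpty()) {
                Object channel = beans.get(next);
                if (!(channel instanceof Channel)) {
                    throw new IllegalStateException("No channel named '" + next + "'");
                }
                return (Channel) channel;       // index unchanged: the strategy is asked again
            }
            routingSlipIndex++;                 // strategy is finished: try the next entry
            return nextChannel(request, reply);
        }
        throw new IllegalStateException("Unsupported routing slip entry: " + entry);
    }

    public static void main(String[] args) {
        RoutingSlipSketch slip =
                new RoutingSlipSketch(Arrays.<Object>asList("channel1", "finishChannel"));
        slip.registerBean("channel1", new Channel("channel1"));
        slip.registerBean("finishChannel", new Channel("finishChannel"));
        for (Channel c = slip.nextChannel("request", "reply"); c != null;
                c = slip.nextChannel("request", "reply")) {
            System.out.println(c.name);
        }
    }
}
```

Note how a strategy entry keeps its position in the path until it returns an empty String, which is what allows one entry to produce several hops.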

Process Manager Enterprise Integration Pattern

The EIP also defines the Process Manager pattern. This pattern can now easily be implemented using custom Process Manager logic encapsulated in a RoutingSlipRouteStrategy within the routing slip. In addition to a bean name, the RoutingSlipRouteStrategy can return any MessageChannel object, and there is no requirement that this MessageChannel instance is a bean in the application context. This way, we can provide powerful dynamic routing logic when the channel to use cannot be predicted in advance; a MessageChannel can be created within the RoutingSlipRouteStrategy and returned. A FixedSubscriberChannel with an associated MessageHandler implementation is a good combination for such cases. For example, we can route to a Reactor Stream:

@Bean
public PollableChannel resultsChannel() {
    return new QueueChannel();
}

@Bean
public RoutingSlipRouteStrategy routeStrategy() {
    return (requestMessage, reply) -> requestMessage.getPayload() instanceof String
            ? new FixedSubscriberChannel(m ->
                    Mono.just((String) m.getPayload())
                            .map(String::toUpperCase)
                            .subscribe(v -> messagingTemplate().convertAndSend(resultsChannel(), v)))
            : new FixedSubscriberChannel(m ->
                    Mono.just((Integer) m.getPayload())
                            .map(v -> v * 2)
                            .subscribe(v -> messagingTemplate().convertAndSend(resultsChannel(), v)));
}

6.2 Filter

6.2.1 Introduction

Message Filters are used to decide whether a Message should be passed along or dropped based on some criteria such as a Message Header value or Message content itself. Therefore, a Message Filter is similar to a router, except that for each Message received from the filter’s input channel, that same Message may or may not be sent to the filter’s output channel. Unlike the router, it makes no decision regarding which Message Channel to send the Message to but only decides whether to send.

[Note]Note

As you will see momentarily, the Filter also supports a discard channel, so in certain cases it can play the role of a very simple router (or "switch") based on a boolean condition.

In Spring Integration, a Message Filter may be configured as a Message Endpoint that delegates to an implementation of the MessageSelector interface. That interface is itself quite simple:

public interface MessageSelector {

    boolean accept(Message<?> message);

}

The MessageFilter constructor accepts a selector instance:

MessageFilter filter = new MessageFilter(someSelector);

In combination with the namespace and SpEL, very powerful filters can be configured with very little Java code.

6.2.2 Configuring Filter

Configuring a Filter with XML

The <filter> element is used to create a Message-selecting endpoint. In addition to input-channel and output-channel attributes, it requires a ref. The ref may point to a MessageSelector implementation:

<int:filter input-channel="input" ref="selector" output-channel="output"/>

<bean id="selector" class="example.MessageSelectorImpl"/>

Alternatively, the method attribute can be added at which point the ref may refer to any object. The referenced method may expect either the Message type or the payload type of inbound Messages. The method must return a boolean value. If the method returns true, the Message will be sent to the output-channel.

<int:filter input-channel="input" output-channel="output"
    ref="exampleObject" method="someBooleanReturningMethod"/>

<bean id="exampleObject" class="example.SomeObject"/>

If the selector or adapted POJO method returns false, there are a few settings that control the handling of the rejected Message. By default (if configured like the example above), rejected Messages will be silently dropped. If rejection should instead result in an error condition, then set the throw-exception-on-rejection attribute to true:

<int:filter input-channel="input" ref="selector"
    output-channel="output" throw-exception-on-rejection="true"/>

If you want rejected messages to be routed to a specific channel, provide that reference as the discard-channel:

<int:filter input-channel="input" ref="selector"
    output-channel="output" discard-channel="rejectedMessages"/>
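The three rejection behaviors (silent drop, exception, discard channel) can be sketched without the framework. This is a hypothetical illustration: FilterSketch is not a Spring Integration class, a java.util.function.Predicate plays the role of MessageSelector.accept, and plain lists stand in for channels. Sending to the discard channel before throwing when both options are set is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of filter rejection handling; not framework code.
class FilterSketch<T> {

    private final Predicate<T> selector;            // plays the role of MessageSelector.accept
    private final List<T> outputChannel = new ArrayList<>();
    private List<T> discardChannel;                 // optional, like discard-channel
    private boolean throwExceptionOnRejection;      // like throw-exception-on-rejection

    FilterSketch(Predicate<T> selector) { this.selector = selector; }

    void setDiscardChannel(List<T> discardChannel) { this.discardChannel = discardChannel; }

    void setThrowExceptionOnRejection(boolean flag) { this.throwExceptionOnRejection = flag; }

    List<T> getOutputChannel() { return outputChannel; }

    void handle(T message) {
        if (selector.test(message)) {
            outputChannel.add(message);             // accepted: pass the same message along
            return;
        }
        if (discardChannel != null) {
            discardChannel.add(message);            // rejected: route to the discard channel
        }
        if (throwExceptionOnRejection) {
            throw new IllegalStateException("Message rejected: " + message);
        }
        // Otherwise the rejected message is silently dropped.
    }

    public static void main(String[] args) {
        FilterSketch<Integer> filter = new FilterSketch<>(n -> n > 100);
        List<Integer> rejected = new ArrayList<>();
        filter.setDiscardChannel(rejected);
        filter.handle(150);
        filter.handle(7);
        System.out.println("passed=" + filter.getOutputChannel() + " rejected=" + rejected);
    }
}
```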

Also see Section 8.9.7, “Advising Filters”.

[Note]Note

Message Filters are commonly used in conjunction with a Publish Subscribe Channel. Many filter endpoints may be subscribed to the same channel, and they decide whether or not to pass the Message to the next endpoint which could be any of the supported types (e.g. Service Activator). This provides a reactive alternative to the more proactive approach of using a Message Router with a single Point-to-Point input channel and multiple output channels.

Using a ref attribute is generally recommended if the custom filter implementation is referenced in other <filter> definitions. However if the custom filter implementation is scoped to a single <filter> element, provide an inner bean definition:

<int:filter method="someMethod" input-channel="inChannel" output-channel="outChannel">
  <beans:bean class="org.foo.MyCustomFilter"/>
</int:filter>
[Note]Note

Using both the ref attribute and an inner handler definition in the same <filter> configuration is not allowed, as it creates an ambiguous condition, and an Exception will be thrown.

[Important]Important

If the "ref" attribute references a bean that extends MessageFilter (such as filters provided by the framework itself), the configuration is optimized by injecting the output channel into the filter bean directly. In this case, each "ref" must be to a separate bean instance (or a prototype-scoped bean), or use the inner <bean/> configuration type. However, this optimization only applies if you don’t provide any filter-specific attributes in the filter XML definition. If you inadvertently reference the same message handler from multiple beans, you will get a configuration exception.

With the introduction of SpEL support, Spring Integration added the expression attribute to the filter element. It can be used to avoid Java entirely for simple filters.

<int:filter input-channel="input" expression="payload.equals('nonsense')"/>

The string passed as the expression attribute will be evaluated as a SpEL expression with the Message available in the evaluation context. If it is necessary to include the result of an expression in the scope of the application context you can use the #{} notation as defined in the SpEL reference documentation.

<int:filter input-channel="input"
            expression="payload.matches(#{filterPatterns.nonsensePattern})"/>

If the Expression itself needs to be dynamic, then an expression sub-element may be used. That provides a level of indirection for resolving the Expression by its key from an ExpressionSource. That is a strategy interface that you can implement directly, or you can rely upon a version available in Spring Integration that loads Expressions from a "resource bundle" and can check for modifications after a given number of seconds. All of this is demonstrated in the following configuration sample where the Expression could be reloaded within one minute if the underlying file had been modified. If the ExpressionSource bean is named "expressionSource", then it is not necessary to provide the source attribute on the <expression> element, but in this case it’s shown for completeness.

<int:filter input-channel="input" output-channel="output">
    <int:expression key="filterPatterns.example" source="myExpressions"/>
</int:filter>

<beans:bean id="myExpressions"
    class="o.s.i.expression.ReloadableResourceBundleExpressionSource">
    <beans:property name="basename" value="config/integration/expressions"/>
    <beans:property name="cacheSeconds" value="60"/>
</beans:bean>

Then, the config/integration/expressions.properties file (or any more specific version with a locale extension to be resolved in the typical way that resource-bundles are loaded) would contain a key/value pair:

filterPatterns.example=payload > 100
[Note]Note

All of these examples that use expression as an attribute or sub-element can also be applied within transformer, router, splitter, service-activator, and header-enricher elements. Of course, the semantics/role of the given component type would affect the interpretation of the evaluation result in the same way that the return value of a method-invocation would be interpreted. For example, an expression can return Strings that are to be treated as Message Channel names by a router component. However, the underlying functionality of evaluating the expression against the Message as the root object, and resolving bean names if prefixed with @ is consistent across all of the core EIP components within Spring Integration.

Configuring a Filter with Annotations

A filter configured using annotations would look like this.

public class PetFilter {
    ...
    @Filter  1
    public boolean dogsOnly(String input) {
        ...
    }
}

1

An annotation indicating that this method shall be used as a filter. Must be specified if this class will be used as a filter.

All of the configuration options provided by the XML element are also available for the @Filter annotation.

The filter can be either referenced explicitly from XML or, if the @MessageEndpoint annotation is defined on the class, detected automatically through classpath scanning.

Also see Section 8.9.8, “Advising Endpoints Using Annotations”.

6.3 Splitter

6.3.1 Introduction

The Splitter is a component whose role is to partition a message into several parts and send the resulting messages to be processed independently. Very often, splitters are upstream producers in a pipeline that includes an Aggregator.

6.3.2 Programming model

The API for performing splitting consists of one base class, AbstractMessageSplitter, which is a MessageHandler implementation, encapsulating features which are common to splitters, such as filling in the appropriate message headers CORRELATION_ID, SEQUENCE_SIZE, and SEQUENCE_NUMBER on the messages that are produced. This enables tracking down the messages and the results of their processing (in a typical scenario, these headers are copied over to the messages produced by the various transforming endpoints) and using them, for example, in a Composed Message Processor scenario.

An excerpt from AbstractMessageSplitter can be seen below:

public abstract class AbstractMessageSplitter
    extends AbstractReplyProducingMessageConsumer {
    ...
    protected abstract Object splitMessage(Message<?> message);

}

To implement a specific Splitter in an application, extend AbstractMessageSplitter and implement the splitMessage method, which contains logic for splitting the messages. The return value may be one of the following:

  • A Collection or an array of Messages, or an Iterable (or Iterator) that iterates over Messages - in this case the messages will be sent as such (after the CORRELATION_ID, SEQUENCE_SIZE and SEQUENCE_NUMBER are populated). Using this approach gives more control to the developer, for example for populating custom message headers as part of the splitting process.
  • A Collection or an array of non-Message objects, or an Iterable (or Iterator) that iterates over non-Message objects - works like the prior case, except that each collection element will be used as a Message payload. Using this approach allows developers to focus on the domain objects without having to consider the Messaging system and produces code that is easier to test.
  • A Message or non-Message object (but not a Collection or an array) - works like the previous cases, except that a single message will be sent out.

In Spring Integration, any POJO can implement the splitting algorithm, provided that it defines a method that accepts a single argument and has a return value. In this case, the return value of the method will be interpreted as described above. The input argument might either be a Message or a simple POJO. In the latter case, the splitter will receive the payload of the incoming message. Since this decouples the code from the Spring Integration API and will typically be easier to test, it is the recommended approach.

Iterators

Starting with version 4.1, the AbstractMessageSplitter supports the Iterator type for the value to split. Note, in the case of an Iterator (or Iterable), we don’t have access to the number of underlying items and the SEQUENCE_SIZE header is set to 0. This means that the default SequenceSizeReleaseStrategy of an <aggregator> won’t work and the group for the CORRELATION_ID from the splitter won’t be released; it will remain as incomplete. In this case you should use an appropriate custom ReleaseStrategy or rely on send-partial-result-on-expiry together with group-timeout or a MessageGroupStoreReaper.

Starting with version 5.0, the AbstractMessageSplitter provides protected obtainSizeIfPossible() methods that determine the size of Iterable and Iterator objects when that is possible. For example, XPathMessageSplitter can determine the size of the underlying NodeList object. Starting with version 5.0.9, these methods also properly return the size of a com.fasterxml.jackson.core.TreeNode.

An Iterator object is useful to avoid building an entire collection in memory before splitting, for example when the underlying items are populated from some external system (e.g. a database or FTP MGET) using iterations or streams.
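To make the sequence headers concrete, the following framework-free sketch shows how they might be populated for a Collection (size known) and for an Iterator (size unknown, so the sequence size is 0). SplitterSketch and its Msg type are hypothetical, and the header keys are plain strings chosen for this example. For brevity the sketch collects its output into a list, whereas a real iterator-based splitter would emit each message as it iterates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of sequence-header population; not framework code.
class SplitterSketch {

    // Minimal stand-in for a message: a payload plus headers.
    static final class Msg {
        final Object payload;
        final Map<String, Object> headers;
        Msg(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Splits a collection: the size is known up front.
    static List<Msg> split(Collection<?> items, Object correlationId) {
        return split(items.iterator(), items.size(), correlationId);
    }

    // Splits an iterator: the size is unknown, so the sequence size is 0.
    static List<Msg> split(Iterator<?> items, Object correlationId) {
        return split(items, 0, correlationId);
    }

    private static List<Msg> split(Iterator<?> items, int sequenceSize, Object correlationId) {
        List<Msg> out = new ArrayList<>();
        int sequenceNumber = 0;
        while (items.hasNext()) {
            Map<String, Object> headers = new HashMap<>();
            headers.put("correlationId", correlationId);      // same for every part
            headers.put("sequenceNumber", ++sequenceNumber);  // 1-based position
            headers.put("sequenceSize", sequenceSize);        // 0 when unknown
            out.add(new Msg(items.next(), headers));
        }
        return out;
    }

    public static void main(String[] args) {
        for (Msg m : split(Arrays.asList("a", "b", "c"), "order-1")) {
            System.out.println(m.payload + " " + m.headers);
        }
    }
}
```

With a sequence size of 0, a downstream release check that compares the group size with the sequence size never succeeds, which is why the iterator case needs a custom release strategy or a timeout.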

Stream and Flux

Starting with version 5.0, the AbstractMessageSplitter supports the Java Stream and Reactive Streams Publisher types for the value to split. In this case the target Iterator is built on their iteration functionality.

In addition, if Splitter’s output channel is an instance of a ReactiveStreamsSubscribableChannel, the AbstractMessageSplitter produces a Flux result instead of an Iterator and the output channel is subscribed to this Flux for back-pressure based splitting on downstream flow demand.

6.3.3 Configuring Splitter

Configuring a Splitter using XML

A splitter can be configured through XML as follows:

<int:channel id="inputChannel"/>

<int:splitter id="splitter"  1
  ref="splitterBean"  2
  method="split"  3
  input-channel="inputChannel"  4
  output-channel="outputChannel" /> 5

<int:channel id="outputChannel"/>

<beans:bean id="splitterBean" class="sample.PojoSplitter"/>

1

The id of the splitter is optional.

2

A reference to a bean defined in the application context. The bean must implement the splitting logic as described in the section above. Optional. If a reference to a bean is not provided, then it is assumed that the payload of the Message that arrived on the input-channel is an implementation of java.util.Collection and the default splitting logic will be applied to the Collection, incorporating each individual element into a Message and sending it to the output-channel.

3

The method (defined on the bean specified above) that implements the splitting logic. Optional.

4

The input channel of the splitter. Required.

5

The channel to which the splitter will send the results of splitting the incoming message. Optional (because incoming messages can specify a reply channel themselves).

Using a ref attribute is generally recommended if the custom splitter implementation may be referenced in other <splitter> definitions. However if the custom splitter handler implementation should be scoped to a single definition of the <splitter>, configure an inner bean definition:

<int:splitter id="testSplitter" input-channel="inChannel" method="split"
                output-channel="outChannel">
  <beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
[Note]Note

Using both a ref attribute and an inner handler definition in the same <int:splitter> configuration is not allowed, as it creates an ambiguous condition and will result in an Exception being thrown.

[Important]Important

If the "ref" attribute references a bean that extends AbstractMessageProducingHandler (such as splitters provided by the framework itself), the configuration is optimized by injecting the output channel into the handler directly. In this case, each "ref" must be to a separate bean instance (or a prototype-scoped bean), or use the inner <bean/> configuration type. However, this optimization only applies if you don’t provide any splitter-specific attributes in the splitter XML definition. If you inadvertently reference the same message handler from multiple beans, you will get a configuration exception.

Configuring a Splitter with Annotations

The @Splitter annotation is applicable to methods that expect either the Message type or the message payload type, and the return values of the method should be a Collection of any type. If the returned values are not actual Message objects, then each item will be wrapped in a Message as its payload. Each message will be sent to the designated output channel for the endpoint on which the @Splitter is defined.

@Splitter
List<LineItem> extractItems(Order order) {
    return order.getItems();
}

Also see Section 8.9.8, “Advising Endpoints Using Annotations”.

Also see Section 9.10, “Splitters” in the Java DSL chapter.

6.4 Aggregator

6.4.1 Introduction

Basically a mirror-image of the Splitter, the Aggregator is a type of Message Handler that receives multiple Messages and combines them into a single Message. In fact, an Aggregator is often a downstream consumer in a pipeline that includes a Splitter.

Technically, the Aggregator is more complex than a Splitter, because it is stateful as it must hold the Messages to be aggregated and determine when the complete group of Messages is ready to be aggregated. In order to do this it requires a MessageStore.

6.4.2 Functionality

The Aggregator combines a group of related messages, by correlating and storing them, until the group is deemed complete. At that point, the Aggregator will create a single message by processing the whole group, and will send the aggregated message as output.

Implementing an Aggregator requires providing the logic to perform the aggregation (i.e., the creation of a single message from many). Two related concepts are correlation and release.

Correlation determines how messages are grouped for aggregation. In Spring Integration correlation is done by default based on the IntegrationMessageHeaderAccessor.CORRELATION_ID message header. Messages with the same IntegrationMessageHeaderAccessor.CORRELATION_ID will be grouped together. However, the correlation strategy may be customized to allow other ways of specifying how the messages should be grouped together by implementing a CorrelationStrategy (see below).

To determine the point at which a group of messages is ready to be processed, a ReleaseStrategy is consulted. The default release strategy for the Aggregator will release a group when all messages included in a sequence are present, based on the IntegrationMessageHeaderAccessor.SEQUENCE_SIZE header. This default strategy may be overridden by providing a reference to a custom ReleaseStrategy implementation.
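Correlation and release can be sketched together in a few lines of plain Java. This is an illustration under assumptions rather than the framework's implementation: AggregatorSketch is hypothetical, a HashMap stands in for the MessageStore, the correlation key and sequence size are passed as arguments instead of being read from headers, and "aggregation" simply collects the payloads into a list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of correlation and release; not framework code.
class AggregatorSketch {

    // In-memory stand-in for a MessageStore: correlation key -> collected payloads.
    private final Map<Object, List<Object>> groups = new HashMap<>();

    // Adds a payload to its group. Returns the aggregated payloads when the
    // group is complete, or null while the group is still waiting for messages.
    List<Object> handle(Object correlationId, int sequenceSize, Object payload) {
        // Correlation: messages with the same key land in the same group.
        List<Object> group = groups.computeIfAbsent(correlationId, k -> new ArrayList<>());
        group.add(payload);
        // Release: the group is complete once it holds sequenceSize messages.
        if (group.size() == sequenceSize) {
            groups.remove(correlationId);
            return group;
        }
        return null;
    }

    int pendingGroups() {
        return groups.size();
    }

    public static void main(String[] args) {
        AggregatorSketch aggregator = new AggregatorSketch();
        System.out.println(aggregator.handle("order-1", 2, "a"));
        System.out.println(aggregator.handle("order-1", 2, "b"));
    }
}
```

The map of pending groups is the state that makes an aggregator more complex than a splitter: a group that never completes stays in the store until something expires it.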

6.4.3 Programming model

The Aggregation API consists of a number of classes:

  • The interface MessageGroupProcessor, and its implementations: MethodInvokingAggregatingMessageGroupProcessor and ExpressionEvaluatingMessageGroupProcessor
  • The ReleaseStrategy interface and its default implementation SimpleSequenceSizeReleaseStrategy
  • The CorrelationStrategy interface and its default implementation HeaderAttributeCorrelationStrategy

AggregatingMessageHandler

The AggregatingMessageHandler (subclass of AbstractCorrelatingMessageHandler) is a MessageHandler implementation, encapsulating the common functionalities of an Aggregator (and other correlating use cases), which are:

  • correlating messages into a group to be aggregated
  • maintaining those messages in a MessageStore until the group can be released
  • deciding when the group can be released
  • aggregating the released group into a single message
  • recognizing and responding to an expired group

The responsibility of deciding how the messages should be grouped together is delegated to a CorrelationStrategy instance. The responsibility of deciding whether the message group can be released is delegated to a ReleaseStrategy instance.

Here is a brief highlight of the base AbstractAggregatingMessageGroupProcessor (the responsibility of implementing the aggregatePayloads method is left to the developer):

public abstract class AbstractAggregatingMessageGroupProcessor
              implements MessageGroupProcessor {

    protected Map<String, Object> aggregateHeaders(MessageGroup group) {
        // default implementation exists
    }

    protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);

}

The CorrelationStrategy is owned by the AbstractCorrelatingMessageHandler and it has a default value based on the IntegrationMessageHeaderAccessor.CORRELATION_ID message header:

public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
        CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
    ...
    this.correlationStrategy = correlationStrategy == null ?
        new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
    this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
    ...
}

As for actual processing of the message group, the default implementation is the DefaultAggregatingMessageGroupProcessor. It creates a single Message whose payload is a List of the payloads received for a given group. This works well for simple Scatter Gather implementations with either a Splitter, Publish Subscribe Channel, or Recipient List Router upstream.

[Note]Note

When using a Publish Subscribe Channel or Recipient List Router in this type of scenario, be sure to enable the apply-sequence flag. Doing so adds the necessary headers (CORRELATION_ID, SEQUENCE_NUMBER and SEQUENCE_SIZE). That behavior is enabled by default for Splitters in Spring Integration, but it is not enabled for the Publish Subscribe Channel or Recipient List Router because those components may be used in a variety of contexts in which these headers are not necessary.

When implementing a specific aggregator strategy for an application, a developer can extend AbstractAggregatingMessageGroupProcessor and implement the aggregatePayloads method. However, there are better solutions, less coupled to the API, for implementing the aggregation logic which can be configured easily either through XML or through annotations.

In general, any POJO can implement the aggregation algorithm if it provides a method that accepts a single java.util.List as an argument (parameterized lists are supported as well). This method will be invoked for aggregating messages as follows:

  • if the argument is a java.util.Collection<T>, and the parameter type T is assignable to Message, then the whole list of messages accumulated for aggregation will be sent to the aggregator
  • if the argument is a non-parameterized java.util.Collection or the parameter type is not assignable to Message, then the method will receive the payloads of the accumulated messages
  • if the return type is not assignable to Message, then it will be treated as the payload for a Message that will be created automatically by the framework.

[Note]Note

In the interest of code simplicity, and promoting best practices such as low coupling, testability, etc., the preferred way of implementing the aggregation logic is through a POJO, and using the XML or annotation support for configuring it in the application.

[Important]Important

The SimpleMessageGroup.getMessages() method returns an unmodifiableCollection. Therefore, if your aggregating POJO method has a Collection<Message> parameter, the argument passed in is exactly that Collection instance and, when a SimpleMessageStore is used for the Aggregator, that original Collection<Message> is cleared after the group is released. Hence the Collection<Message> variable in the POJO is cleared too, if it is passed out of the aggregator. If you wish to simply release that collection as-is for further processing, you must build a new Collection (e.g. new ArrayList<Message>(messages)). Starting with version 4.3, the framework no longer copies the messages to a new collection, to avoid undesired extra object creation.
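
The aliasing effect can be reproduced with plain collections. The following is a minimal sketch, not framework code: ReleasedCollectionSketch and its methods are hypothetical, String values stand in for messages, and clearing the backing list stands in for what the store does after release.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

final class ReleasedCollectionSketch {

    // Hands back the very view it was given; later clearing of the group shows through.
    static Collection<String> passThrough(Collection<String> messages) {
        return messages;
    }

    // Builds an independent copy that survives clearing of the group.
    static Collection<String> copy(Collection<String> messages) {
        return new ArrayList<>(messages);
    }

    // Returns { size of the passed-through view, size of the copy } after the group is cleared.
    static int[] sizesAfterClear() {
        List<String> stored = new ArrayList<>();
        stored.add("first");
        stored.add("second");
        Collection<String> view = Collections.unmodifiableCollection(stored);

        Collection<String> aliased = passThrough(view);
        Collection<String> copied = copy(view);

        stored.clear(); // stands in for the store clearing the released group

        return new int[] { aliased.size(), copied.size() };
    }
}
```

Here the passed-through view ends up empty while the copy still holds both elements, which is why an aggregating method that returns the collection itself should return a copy.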

If the MessageGroupProcessor's processMessageGroup method returns a collection, it must be a collection of Message<?>s. In this case, the messages are released individually. Prior to version 4.2, it was not possible to provide a MessageGroupProcessor using XML configuration; only POJO methods could be used for aggregation. Now, if the framework detects that the referenced (or inner) bean implements MessageProcessor, it is used as the aggregator’s output processor.

If you wish to release a collection of objects from a custom MessageGroupProcessor as the payload of a message, your class should extend AbstractAggregatingMessageGroupProcessor and implement aggregatePayloads().

Also, since version 4.2, a SimpleMessageGroupProcessor is provided. It simply returns the collection of messages from the group which, as indicated above, causes the released messages to be sent individually.

This allows the aggregator to work as a message barrier where arriving messages are held until the release strategy fires, and the group is released, as a sequence of individual messages.

ReleaseStrategy

The ReleaseStrategy interface is defined as follows:

public interface ReleaseStrategy {

  boolean canRelease(MessageGroup group);

}

In general, any POJO can implement the completion decision logic if it provides a method that accepts a single java.util.List as an argument (parameterized lists are supported as well), and returns a boolean value. This method will be invoked after the arrival of each new message, to decide whether the group is complete or not, as follows:

  • if the argument is a java.util.List<T>, and the parameter type T is assignable to Message, then the whole list of messages accumulated in the group will be sent to the method
  • if the argument is a non-parameterized java.util.List or the parameter type is not assignable to Message, then the method will receive the payloads of the accumulated messages
  • the method must return true if the message group is ready for aggregation, and false otherwise.

For example:

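public class MyReleaseStrategy {

    // Receives the whole Message objects accumulated in the group
    @ReleaseStrategy
    public boolean canMessagesBeReleased(List<Message<?>> messages) {...}
}

// Alternative: receives only the payloads of the accumulated messages
public class MyReleaseStrategy {

    @ReleaseStrategy
    public boolean canMessagesBeReleased(List<String> payloads) {...}
}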

As you can see based on the above signatures, the POJO-based Release Strategy will be passed a Collection of not-yet-released Messages (if you need access to the whole Message) or a Collection of payload objects (if the type parameter is anything other than Message). Typically this would satisfy the majority of use cases. However if, for some reason, you need to access the full MessageGroup then you should simply provide an implementation of the ReleaseStrategy interface.

[Warning]Warning

When handling potentially large groups, it is important to understand how these methods are invoked because the release strategy may be invoked multiple times before the group is released. The most efficient is an implementation of ReleaseStrategy because the aggregator can invoke it directly. The second most efficient is a POJO method with a Collection<Message<?>> parameter type. The least efficient is a POJO method with a Collection<Foo> type - the framework has to copy the payloads from the messages in the group into a new collection (and possibly attempt conversion on the payloads to Foo) every time the release strategy is called. Collection<?> avoids the conversion but still requires creating the new Collection.

For these reasons, for large groups, it is recommended that you implement ReleaseStrategy.

When the group is released for aggregation, all its not-yet-released messages are processed and removed from the group. If the group is also complete (i.e. if all messages from a sequence have arrived or if there is no sequence defined), then the group is marked as complete. Any new messages for this group will be sent to the discard channel (if defined). Setting expire-groups-upon-completion to true (default is false) removes the entire group and any new messages, with the same correlation id as the removed group, will form a new group. Partial sequences can be released by using a MessageGroupStoreReaper together with send-partial-result-on-expiry being set to true.

[Important]Important

To facilitate discarding of late-arriving messages, the aggregator must maintain state about the group after it has been released. This can eventually cause out-of-memory conditions. To avoid such situations, you should consider configuring a MessageGroupStoreReaper to remove the group metadata; the expiry parameters should be set to expire groups once late messages are no longer expected to arrive. For information about configuring a reaper, see Section 6.4.5, “Managing State in an Aggregator: MessageGroupStore”.

Spring Integration provides an out-of-the-box implementation for ReleaseStrategy, the SimpleSequenceSizeReleaseStrategy. This implementation consults the SEQUENCE_NUMBER and SEQUENCE_SIZE headers of each arriving message to decide when a message group is complete and ready to be aggregated. As shown above, it is also the default strategy.

[Note]Note

Before version 5.0, the default release strategy was SequenceSizeReleaseStrategy which does not perform well with large groups. With that strategy, duplicate sequence numbers are detected and rejected; this operation can be expensive.

If you are aggregating large groups, you don’t need to release partial groups, and you don’t need to detect/reject duplicate sequences, consider using the SimpleSequenceSizeReleaseStrategy instead - it is much more efficient for these use cases, and is the default since version 5.0 when partial group release is not specified.

Aggregating Large Groups

The 4.3 release changed the default Collection for messages in a SimpleMessageGroup to HashSet (it was previously a BlockingQueue). The queue was expensive when removing individual messages from large groups (an O(n) linear scan was required). Although the hash set is generally much faster for removal, it can be expensive for large messages because the hash has to be calculated (on both inserts and removes). If you have messages that are expensive to hash, consider using some other collection type. As discussed in Section 10.4.1, “MessageGroupFactory”, a SimpleMessageGroupFactory is provided so you can select the Collection that best suits your needs. You can also provide your own factory implementation to create some other Collection<Message<?>>.

Here is an example of how to configure an aggregator with the previous implementation and a SimpleSequenceSizeReleaseStrategy.

<int:aggregator input-channel="aggregate"
    output-channel="out" message-store="store" release-strategy="releaser" />

<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
    <property name="messageGroupFactory">
        <bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
            <constructor-arg value="BLOCKING_QUEUE"/>
        </bean>
    </property>
</bean>

<bean id="releaser" class="org.springframework.integration.aggregator.SimpleSequenceSizeReleaseStrategy" />

CorrelationStrategy

The CorrelationStrategy interface is defined as follows:

public interface CorrelationStrategy {

  Object getCorrelationKey(Message<?> message);

}

The method returns an Object which represents the correlation key used for associating the message with a message group. The key must satisfy the criteria used for a key in a Map with respect to the implementation of equals() and hashCode().
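
The equals() and hashCode() requirement matters most for composite keys. The sketch below is illustrative only: CustomerBatchKey and KeyGroupingSketch are hypothetical names, and a HashMap stands in for a Map-backed group store. It shows that two separately created keys with equal field values must land in the same group.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical composite correlation key: equal field values mean the same group.
final class CustomerBatchKey {
    private final String customerId;
    private final int batch;

    CustomerBatchKey(String customerId, int batch) {
        this.customerId = customerId;
        this.batch = batch;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof CustomerBatchKey)) {
            return false;
        }
        CustomerBatchKey that = (CustomerBatchKey) other;
        return batch == that.batch && Objects.equals(customerId, that.customerId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(customerId, batch);
    }
}

final class KeyGroupingSketch {

    // Groups each payload under the key at the same index, as a Map-backed store would.
    static Map<CustomerBatchKey, List<String>> group(List<CustomerBatchKey> keys, List<String> payloads) {
        Map<CustomerBatchKey, List<String>> groups = new HashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            groups.computeIfAbsent(keys.get(i), key -> new ArrayList<>()).add(payloads.get(i));
        }
        return groups;
    }
}
```

Without the two overrides, each new key instance would be treated as distinct and every message would form its own group.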

In general, any POJO can implement the correlation logic, and the rules for mapping a message to a method’s argument (or arguments) are the same as for a ServiceActivator (including support for @Header annotations). The method must return a value, and the value must not be null.

Spring Integration provides an out-of-the-box implementation for CorrelationStrategy, the HeaderAttributeCorrelationStrategy. This implementation returns the value of one of the message headers (whose name is specified by a constructor argument) as the correlation key. By default, the correlation strategy is a HeaderAttributeCorrelationStrategy returning the value of the CORRELATION_ID header attribute. If you have a custom header name you would like to use for correlation, simply configure it on an instance of HeaderAttributeCorrelationStrategy and provide that instance as a reference for the Aggregator’s correlation-strategy.

LockRegistry

Changes to groups are thread safe; a LockRegistry is used to obtain a lock for the resolved correlation id. A DefaultLockRegistry is used by default (in-memory). For synchronizing updates across servers, where a shared MessageGroupStore is being used, a shared lock registry must be configured. See Section 6.4.4, “Configuring an Aggregator” below for more information.

6.4.4 Configuring an Aggregator

See Section 9.11, “Aggregators and Resequencers” for configuring an Aggregator in Java DSL.

Configuring an Aggregator with XML

Spring Integration supports the configuration of an aggregator via XML through the <aggregator/> element. Below you can see an example of an aggregator.

<int:channel id="inputChannel"/>

<int:aggregator id="myAggregator"  (1)
        auto-startup="true"  (2)
        input-channel="inputChannel"  (3)
        output-channel="outputChannel"  (4)
        discard-channel="throwAwayChannel"  (5)
        message-store="persistentMessageStore"  (6)
        order="1"  (7)
        send-partial-result-on-expiry="false"  (8)
        send-timeout="1000"  (9)

        correlation-strategy="correlationStrategyBean"  (10)
        correlation-strategy-method="correlate"  (11)
        correlation-strategy-expression="headers['foo']"  (12)

        ref="aggregatorBean"  (13)
        method="aggregate"  (14)

        release-strategy="releaseStrategyBean"  (15)
        release-strategy-method="release"  (16)
        release-strategy-expression="size() == 5"  (17)

        expire-groups-upon-completion="false"  (18)
        empty-group-min-timeout="60000"  (19)

        lock-registry="lockRegistry"  (20)

        group-timeout="60000"  (21)
        group-timeout-expression="size() ge 2 ? 100 : -1"  (22)
        expire-groups-upon-timeout="true"  (23)

        scheduler="taskScheduler" >  (24)
            <int:expire-transactional/>  (25)
            <int:expire-advice-chain/>  (26)
</int:aggregator>

<int:channel id="outputChannel"/>

<int:channel id="throwAwayChannel"/>

<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
    <constructor-arg ref="dataSource"/>
</bean>

<bean id="aggregatorBean" class="sample.PojoAggregator"/>

<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>

<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>

(1)

The id of the aggregator. Optional.

(2)

Lifecycle attribute signaling whether the aggregator should be started during Application Context startup. Optional (default is true).

(3)

The channel from which the aggregator will receive messages. Required.

(4)

The channel to which the aggregator will send the aggregation results. Optional (because incoming messages can specify a reply channel themselves via the replyChannel message header).

(5)

The channel to which the aggregator will send the messages that timed out (if send-partial-result-on-expiry is false). Optional.

(6)

A reference to a MessageGroupStore used to store groups of messages under their correlation key until they are complete. Optional, by default a volatile in-memory store.

(7)

Order of this aggregator when more than one handler is subscribed to the same DirectChannel (used for load-balancing purposes). Optional.

(8)

Indicates that expired messages should be aggregated and sent to the output-channel or replyChannel once their containing MessageGroup is expired (see MessageGroupStore.expireMessageGroups(long)). One way of expiring MessageGroups is by configuring a MessageGroupStoreReaper. However, MessageGroups can alternatively be expired by simply calling MessageGroupStore.expireMessageGroups(timeout). That could be accomplished via a Control Bus operation or by simply invoking that method if you have a reference to the MessageGroupStore instance. Otherwise, by itself this attribute has no behavior. It only serves as an indicator of what to do (discard or send to the output/reply channel) with Messages that are still in the MessageGroup that is about to be expired. Optional. Default - false. NOTE: This attribute is more properly send-partial-result-on-timeout because the group may not actually expire if expire-groups-upon-timeout is set to false.

(9)

The timeout interval to wait when sending a reply Message to the output-channel or discard-channel. Defaults to -1 (blocking indefinitely). It is applied only if the output channel has some sending limitations, e.g. a QueueChannel with a fixed capacity. In that case, a MessageDeliveryException is thrown if the message cannot be sent within the timeout. The send-timeout is ignored for AbstractSubscribableChannel implementations. When group-timeout(-expression) is used, a MessageDeliveryException from the scheduled expire task causes that task to be rescheduled. Optional.

(10)

A reference to a bean that implements the message correlation (grouping) algorithm. The bean can be an implementation of the CorrelationStrategy interface or a POJO. In the latter case the correlation-strategy-method attribute must be defined as well. Optional (by default, the aggregator will use the IntegrationMessageHeaderAccessor.CORRELATION_ID header).

(11)

A method defined on the bean referenced by correlation-strategy, that implements the correlation decision algorithm. Optional, with restrictions (requires correlation-strategy to be present).

(12)

A SpEL expression representing the correlation strategy. Example: "headers['foo']". Only one of correlation-strategy or correlation-strategy-expression is allowed.

(13)

A reference to a bean defined in the application context. The bean must implement the aggregation logic as described above. Optional (by default the list of aggregated Messages will become a payload of the output message).

(14)

A method defined on the bean referenced by ref, that implements the message aggregation algorithm. Optional, depends on ref attribute being defined.

(15)

A reference to a bean that implements the release strategy. The bean can be an implementation of the ReleaseStrategy interface or a POJO. In the latter case the release-strategy-method attribute must be defined as well. Optional (by default, the aggregator will use the IntegrationMessageHeaderAccessor.SEQUENCE_SIZE header attribute).

(16)

A method defined on the bean referenced by release-strategy, that implements the completion decision algorithm. Optional, with restrictions (requires release-strategy to be present).

(17)

A SpEL expression representing the release strategy; the root object for the expression is a MessageGroup. Example: "size() == 5". Only one of release-strategy or release-strategy-expression is allowed.

(18)

When set to true (default false), completed groups are removed from the message store, allowing subsequent messages with the same correlation to form a new group. The default behavior is to send messages with the same correlation as a completed group to the discard-channel.

(19)

Only applies if a MessageGroupStoreReaper is configured for the <aggregator>'s MessageStore. By default, when a MessageGroupStoreReaper is configured to expire partial groups, empty groups are also removed. Empty groups exist after a group is released normally. This is to enable the detection and discarding of late-arriving messages. If you wish to expire empty groups on a longer schedule than expiring partial groups, set this property. Empty groups will then not be removed from the MessageStore until they have not been modified for at least this number of milliseconds. Note that the actual time to expire an empty group will also be affected by the reaper’s timeout property and it could be as much as this value plus the timeout.

(20)

A reference to an org.springframework.integration.util.LockRegistry bean; used to obtain a Lock based on the groupId for concurrent operations on the MessageGroup. By default, an internal DefaultLockRegistry is used. Use of a distributed LockRegistry, such as the ZookeeperLockRegistry, ensures only one instance of the aggregator will operate on a group concurrently. See Section 25.11, “Redis Lock Registry”, Section 17.6, “Gemfire Lock Registry”, Section 39.3, “Zookeeper Lock Registry” for more information.

(21)

A timeout in milliseconds to force the MessageGroup complete when the ReleaseStrategy doesn’t release the group when the current Message arrives. This attribute provides a built-in time-based release strategy for the aggregator, for cases where a partial result must be emitted (or the group discarded) if a new Message does not arrive for the MessageGroup within the timeout. When a new Message arrives at the aggregator, any existing ScheduledFuture<?> for its MessageGroup is canceled. If the ReleaseStrategy returns false (don’t release) and the groupTimeout > 0, a new task is scheduled to expire the group. Setting this attribute to zero is not advised because it effectively disables the aggregator: every message group is completed immediately. It is possible, however, to conditionally set it to zero using an expression; see group-timeout-expression for information. The action taken during the completion depends on the ReleaseStrategy and the send-partial-result-on-expiry attribute. See the section called “Aggregator and Group Timeout” for more information. Mutually exclusive with the group-timeout-expression attribute.

(22)

The SpEL expression that evaluates to a groupTimeout with the MessageGroup as the #root evaluation context object. Used for scheduling the MessageGroup to be forced complete. If the expression evaluates to null or < 0, the completion is not scheduled. If it evaluates to zero, the group is completed immediately on the current thread. In effect, this provides a dynamic group-timeout property. See group-timeout for more information. Mutually exclusive with group-timeout attribute.

(23)

When a group is completed due to a timeout (or by a MessageGroupStoreReaper), the group is expired (completely removed) by default. Late arriving messages will start a new group. Set this to false to complete the group but have its metadata remain so that late arriving messages will be discarded. Empty groups can be expired later using a MessageGroupStoreReaper together with the empty-group-min-timeout attribute. Default: true.

(24)

A TaskScheduler bean reference to schedule the MessageGroup to be forced complete if no new message arrives for the MessageGroup within the groupTimeout. If not provided, the default scheduler taskScheduler, registered in the ApplicationContext (ThreadPoolTaskScheduler) will be used. This attribute does not apply if group-timeout or group-timeout-expression is not specified.

(25)

Since version 4.1. Allows a transaction to be started for the forceComplete operation. It is initiated from a group-timeout(-expression) or by a MessageGroupStoreReaper and is not applied to the normal add/release/discard operations. Only this sub-element or <expire-advice-chain/> is allowed.

(26)

Since version 4.1. Allows the configuration of any Advice for the forceComplete operation. It is initiated from a group-timeout(-expression) or by a MessageGroupStoreReaper and is not applied to the normal add/release/discard operations. Only this sub-element or <expire-transactional/> is allowed. A transaction Advice can also be configured here using the Spring tx namespace.

[Important]Expiring Groups

There are two attributes related to expiring (completely removing) groups. When a group is expired, there is no record of it and if a new message arrives with the same correlation, a new group is started. When a group is completed (without expiry), the empty group remains and late arriving messages are discarded. Empty groups can be removed later using a MessageGroupStoreReaper in combination with the empty-group-min-timeout attribute.

expire-groups-upon-completion relates to "normal" completion - when the ReleaseStrategy releases the group. This defaults to false.

If a group is not completed normally, but is released or discarded because of a timeout, the group is normally expired. Since version 4.1, you can now control this behavior using expire-groups-upon-timeout; this defaults to true for backwards compatibility.

[Note]Note

When a group is timed out, the ReleaseStrategy is given one more opportunity to release the group; if it does so, and expire-groups-upon-timeout is false, then expiration is controlled by expire-groups-upon-completion. If the group is not released by the release strategy during timeout, then the expiration is controlled by the expire-groups-upon-timeout. Timed-out groups are either discarded, or a partial release occurs (based on send-partial-result-on-expiry).
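
The rule in this note can be written as a small decision function. This is a sketch of the documented rule only, not the framework's implementation; ExpiryDecisionSketch and its parameter names are hypothetical. The text does not spell out the case where the release strategy releases the group and expire-groups-upon-timeout is true, so the sketch assumes that expire-groups-upon-timeout governs there as well.

```java
final class ExpiryDecisionSketch {

    // Returns true if the timed-out group is expired (completely removed),
    // false if its empty metadata remains so that late messages are discarded.
    static boolean expiredAfterTimeout(boolean releasedByStrategy,
                                       boolean expireGroupsUponTimeout,
                                       boolean expireGroupsUponCompletion) {
        if (releasedByStrategy && !expireGroupsUponTimeout) {
            // The strategy released the group on its last chance:
            // expire-groups-upon-completion decides.
            return expireGroupsUponCompletion;
        }
        // Not released by the strategy: expire-groups-upon-timeout decides.
        // Assumption: the same attribute also decides when the strategy did
        // release the group and expire-groups-upon-timeout is true.
        return expireGroupsUponTimeout;
    }
}
```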

Starting with version 5.0, empty groups are also scheduled for removal after empty-group-min-timeout. If expireGroupsUponCompletion == false and minimumTimeoutForEmptyGroups > 0, the task to remove the group is scheduled when a normal or partial sequence release happens.

Using a ref attribute is generally recommended if a custom aggregator handler implementation may be referenced in other <aggregator> definitions. However if a custom aggregator implementation is only being used by a single definition of the <aggregator>, you can use an inner bean definition (starting with version 1.0.3) to configure the aggregation POJO within the <aggregator> element:

<aggregator input-channel="input" method="add" output-channel="output">
    <beans:bean class="org.foo.PojoAggregator"/>
</aggregator>

[Note]Note

Using both a ref attribute and an inner bean definition in the same <aggregator> configuration is not allowed, as it creates an ambiguous condition. In such cases, an Exception will be thrown.

An example implementation of the aggregator bean looks as follows:

public class PojoAggregator {

  // Sums the payloads of all messages in the released group
  public Long add(List<Long> results) {
    long total = 0L;
    for (long partialResult : results) {
      total += partialResult;
    }
    return total;
  }
}

An implementation of the completion strategy bean for the example above may be as follows:

public class PojoReleaseStrategy {
...
  // Releases the group once the payloads add up to at least maxValue
  public boolean canRelease(List<Long> numbers) {
    long sum = 0;
    for (long number : numbers) {
      sum += number;
    }
    return sum >= maxValue;
  }
}

[Note]Note

Wherever it makes sense, the release strategy method and the aggregator method can be combined in a single bean.

An implementation of the correlation strategy bean for the example above may be as follows:

public class PojoCorrelationStrategy {
...
  public Long groupNumbersByLastDigit(Long number) {
    return number % 10;
  }
}

Together, these beans make the aggregator group numbers by some criterion (in our case, the remainder after dividing by 10) and hold each group until the sum of the numbers provided by the payloads reaches or exceeds a certain value.
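
To see the three pieces working together without any framework wiring, the following sketch simulates the flow with plain collections. It is a model under stated assumptions, not how the aggregator is implemented: LastDigitSumSketch is a hypothetical name, a HashMap stands in for the message store, the threshold is passed in by the caller, and only non-negative numbers are considered.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class LastDigitSumSketch {

    private final long maxValue;
    // Held groups keyed by the last digit of their numbers.
    private final Map<Long, List<Long>> groups = new HashMap<>();

    LastDigitSumSketch(long maxValue) {
        this.maxValue = maxValue;
    }

    // Returns the aggregated sum when the group is released, or null while it is still held.
    Long accept(long number) {
        // Correlation: same logic as groupNumbersByLastDigit.
        long key = number % 10;
        List<Long> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(number);

        // Release check: same logic as canRelease.
        long sum = 0;
        for (long held : group) {
            sum += held;
        }
        if (sum < maxValue) {
            return null;
        }

        // Aggregation: same logic as add; the released group is removed.
        groups.remove(key);
        return sum;
    }
}
```

With a threshold of 30, feeding 11, 12 and then 21 keeps the first two numbers held in separate groups, and the arrival of 21 releases the group for digit 1 with a sum of 32.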

[Note]Note

Wherever it makes sense, the release strategy method, correlation strategy method and the aggregator method can be combined in a single bean (all of them or any two).

Aggregators and Spring Expression Language (SpEL)

Since Spring Integration 2.0, the various strategies (correlation, release, and aggregation) may be handled with SpEL, which is recommended if the logic behind such a strategy is relatively simple. Let’s say you have a legacy component that was designed to receive an array of objects. We know that the default release strategy will assemble all aggregated messages in the List. So now we have two problems. First, we need to extract the individual messages from the list; then we need to extract the payload of each message and assemble the array of objects (see code below).

public String[] processRelease(List<Message<String>> messages){
    List<String> stringList = new ArrayList<String>();
    for (Message<String> message : messages) {
        stringList.add(message.getPayload());
    }
    return stringList.toArray(new String[]{});
}

However, with SpEL such a requirement could actually be handled relatively easily with a one-line expression, thus sparing you from writing a custom class and configuring it as a bean.

<int:aggregator input-channel="aggChannel"
    output-channel="replyChannel"
    expression="#this.![payload].toArray()"/>

In the above configuration we are using a Collection Projection expression to assemble a new collection from the payloads of all messages in the list and then transforming it to an Array, thus achieving the same result as the java code above.

The same expression-based approach can be applied when dealing with custom Release and Correlation strategies.

Instead of defining a bean for a custom CorrelationStrategy via the correlation-strategy attribute, you can implement your simple correlation logic via a SpEL expression and configure it via the correlation-strategy-expression attribute.

For example:

correlation-strategy-expression="payload.person.id"

In the above example it is assumed that the payload has an attribute person with an id which is going to be used to correlate messages.

Likewise, for the ReleaseStrategy you can implement your release logic as a SpEL expression and configure it via the release-strategy-expression attribute. The root object for the evaluation context is the MessageGroup itself. The List of messages can be referenced using the messages property of the group within the expression.

[Note]Note

In releases prior to version 5.0, the root object was the collection of Message<?>.

For example:

release-strategy-expression="!messages.?[payload==5].empty"

In this example the root object of the SpEL Evaluation Context is the MessageGroup itself, and you are simply stating that as soon as there is a message with a payload of 5 in this group, it should be released.
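
For readers less familiar with SpEL collection selection, a rough plain-Java reading of that expression is sketched below. PayloadFiveReleaseSketch is a hypothetical name, and the method works on a list of Integer payloads rather than on a MessageGroup, which is a simplification.

```java
import java.util.ArrayList;
import java.util.List;

final class PayloadFiveReleaseSketch {

    // Reads "!messages.?[payload==5].empty" as: select the matching payloads,
    // then release if the selection is not empty.
    static boolean canRelease(List<Integer> payloads) {
        List<Integer> selected = new ArrayList<>();
        for (Integer payload : payloads) {
            if (payload != null && payload == 5) {
                selected.add(payload);
            }
        }
        return !selected.isEmpty();
    }
}
```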

Aggregator and Group Timeout

Starting with version 4.0, two new mutually exclusive attributes have been introduced: group-timeout and group-timeout-expression (see the description above). In some cases, the aggregator result must be emitted (or the group discarded) after a timeout if the ReleaseStrategy doesn’t release when the current Message arrives. For this purpose, the groupTimeout option allows scheduling the MessageGroup to be forced complete:

<aggregator input-channel="input" output-channel="output"
        send-partial-result-on-expiry="true"
        group-timeout-expression="size() ge 2 ? 10000 : -1"
        release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>

With this example, the normal release will be possible if the aggregator receives the last message in sequence as defined by the release-strategy-expression. If that specific message does not arrive, the groupTimeout will force the group complete after 10 seconds as long as the group contains at least 2 Messages.

The result of forcing the group complete depends on the ReleaseStrategy and on send-partial-result-on-expiry. First, the release strategy is consulted again to see if a normal release is to be made - while the group won’t have changed, the ReleaseStrategy can decide to release the group at this time. If the release strategy still does not release the group, it will be expired. If send-partial-result-on-expiry is true, the existing messages in the (partial) MessageGroup will be released as a normal aggregator reply Message to the output-channel; otherwise they will be discarded.
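The decision sequence described above can be sketched in plain Java as follows. This is a simplified model, not the framework's code: the class, enum and method names are hypothetical, and the treatment of a negative timeout as "nothing scheduled" is an assumption taken from the example expression.

```java
// Simplified model of the forced-completion decision; the class, enum and
// method names are hypothetical and are not Spring Integration types.
class ForcedCompletionSketch {

    enum Outcome { RELEASED_NORMALLY, PARTIAL_RESULT_SENT, DISCARDED }

    // Mirrors group-timeout-expression="size() ge 2 ? 10000 : -1".
    // Assumption: a negative value means that no forced completion is scheduled.
    static long groupTimeoutMillis(int groupSize) {
        return groupSize >= 2 ? 10_000L : -1L;
    }

    // Decides what happens to the group once the timeout fires.
    static Outcome onTimeout(boolean releaseStrategyAccepts, boolean sendPartialResultOnExpiry) {
        if (releaseStrategyAccepts) {
            return Outcome.RELEASED_NORMALLY;   // the strategy is consulted again first
        }
        return sendPartialResultOnExpiry
                ? Outcome.PARTIAL_RESULT_SENT   // partial group goes to the output-channel
                : Outcome.DISCARDED;            // group is discarded
    }

    public static void main(String[] args) {
        System.out.println(groupTimeoutMillis(1));
        System.out.println(onTimeout(false, true));
    }
}
```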

There is a difference between groupTimeout behavior and the MessageGroupStoreReaper (see Section 6.4.4, “Configuring an Aggregator”). The reaper periodically initiates forced completion for all MessageGroups in the MessageGroupStore. The groupTimeout does so for each MessageGroup individually, if a new Message doesn’t arrive during the groupTimeout. Also, the reaper can be used to remove empty groups (empty groups are retained in order to discard late messages, if expire-groups-upon-completion is false).

Configuring an Aggregator with Annotations

An aggregator configured using annotations would look like this.

public class Waiter {
  ...

  @Aggregator  // 1
  public Delivery aggregatingMethod(List<OrderItem> items) {
    ...
  }

  @ReleaseStrategy  // 2
  public boolean releaseChecker(List<Message<?>> messages) {
    ...
  }

  @CorrelationStrategy  // 3
  public String correlateBy(OrderItem item) {
    ...
  }
}

1

An annotation indicating that this method shall be used as an aggregator. Must be specified if this class will be used as an aggregator.

2

An annotation indicating that this method shall be used as the release strategy of an aggregator. If not present on any method, the aggregator will use the SimpleSequenceSizeReleaseStrategy.

3

An annotation indicating that this method shall be used as the correlation strategy of an aggregator. If no correlation strategy is indicated, the aggregator will use the HeaderAttributeCorrelationStrategy based on CORRELATION_ID.

All of the configuration options provided by the xml element are also available for the @Aggregator annotation.

The aggregator can be either referenced explicitly from XML or, if the @MessageEndpoint is defined on the class, detected automatically through classpath scanning.

Annotation configuration (@Aggregator and others) for the Aggregator component covers only simple use cases, where most default options are sufficient. If you need more control over those options using Annotation configuration, consider using a @Bean definition for the AggregatingMessageHandler and mark its @Bean method with @ServiceActivator:

@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
     AggregatingMessageHandler aggregator =
                       new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
                                                 jdbcMessageGroupStore);
     aggregator.setOutputChannel(resultsChannel());
     aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
     aggregator.setTaskScheduler(this.taskScheduler);
     return aggregator;
}

See Section 6.4.3, “Programming model” and Section E.6.2, “Annotations on @Beans” for more information.

[Note]Note

Starting with version 4.2, the AggregatorFactoryBean is available to simplify Java configuration for the AggregatingMessageHandler.

6.4.5 Managing State in an Aggregator: MessageGroupStore

Aggregator (and some other patterns in Spring Integration) is a stateful pattern that requires decisions to be made based on a group of messages that have arrived over a period of time, all with the same correlation key. The design of the interfaces in the stateful patterns (e.g. ReleaseStrategy) is driven by the principle that the components (whether defined by the framework or a user) should be able to remain stateless. All state is carried by the MessageGroup and its management is delegated to the MessageGroupStore.

public interface MessageGroupStore {

    int getMessageCountForAllMessageGroups();

    int getMarkedMessageCountForAllMessageGroups();

    int getMessageGroupCount();

    MessageGroup getMessageGroup(Object groupId);

    MessageGroup addMessageToGroup(Object groupId, Message<?> message);

    MessageGroup markMessageGroup(MessageGroup group);

    MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);

    MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);

    void removeMessageGroup(Object groupId);

    void registerMessageGroupExpiryCallback(MessageGroupCallback callback);

    int expireMessageGroups(long timeout);
}

For more information please refer to the JavaDoc.

The MessageGroupStore accumulates state information in MessageGroups while waiting for a release strategy to be triggered, and that event might not ever happen. So to prevent stale messages from lingering, and for volatile stores to provide a hook for cleaning up when the application shuts down, the MessageGroupStore allows the user to register callbacks to apply to its MessageGroups when they expire. The interface is very straightforward:

public interface MessageGroupCallback {

    void execute(MessageGroupStore messageGroupStore, MessageGroup group);

}

The callback has direct access to the store and the message group so it can manage the persistent state (e.g. by removing the group from the store entirely).

The MessageGroupStore maintains a list of these callbacks, which it applies, on demand, to all message groups whose timestamp is earlier than a time supplied as a parameter (see the registerMessageGroupExpiryCallback(..) and expireMessageGroups(..) methods above).

The expireMessageGroups method can be called with a timeout value. Any message group older than the current time minus this value will be expired and have the callbacks applied. Thus it is the user of the store that defines what is meant by message group "expiry".
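To make the expiry mechanism concrete, here is a toy in-memory model of a store that applies registered callbacks to groups older than the current time minus a timeout. It is a sketch only and does not implement the real MessageGroupStore interface: every name below is hypothetical, and the current time is passed in explicitly so that the behavior is easy to follow.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Toy model of the expiry mechanism; NOT the real MessageGroupStore API.
class ExpirySketchStore {

    // A group is modelled as an id, a creation timestamp and its payloads.
    static final class Group {
        final Object id;
        final long timestamp;
        final List<Object> payloads = new ArrayList<>();

        Group(Object id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }
    }

    private final Map<Object, Group> groups = new LinkedHashMap<>();
    private final List<BiConsumer<ExpirySketchStore, Group>> callbacks = new ArrayList<>();

    void add(Object groupId, Object payload, long now) {
        groups.computeIfAbsent(groupId, id -> new Group(id, now)).payloads.add(payload);
    }

    void remove(Object groupId) {
        groups.remove(groupId);
    }

    int groupCount() {
        return groups.size();
    }

    void registerExpiryCallback(BiConsumer<ExpirySketchStore, Group> callback) {
        callbacks.add(callback);
    }

    // Applies every callback to each group older than (now - timeout)
    // and returns the number of expired groups.
    int expire(long timeout, long now) {
        int expired = 0;
        // Iterate over a copy, because callbacks may remove groups from the store.
        for (Group group : new ArrayList<>(groups.values())) {
            if (group.timestamp < now - timeout) {
                expired++;
                callbacks.forEach(callback -> callback.accept(this, group));
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        ExpirySketchStore store = new ExpirySketchStore();
        store.registerExpiryCallback((s, g) -> s.remove(g.id));
        store.add("a", "x", 0L);
        store.add("b", "y", 50L);
        System.out.println(store.expire(30L, 60L) + " group(s) expired");
    }
}
```

Here the callback removes the expired group from the store entirely, which is the kind of persistent-state management the callback is meant for.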

As a convenience for users, Spring Integration provides a wrapper for the message expiry in the form of a MessageGroupStoreReaper:

<bean id="reaper" class="org...MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="messageStore"/>
    <property name="timeout" value="30000"/>
</bean>

<task:scheduled-tasks scheduler="scheduler">
    <task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>

The reaper is a Runnable, and all that is happening in the example above is that the message group store’s expire method is being called once every 10 seconds. The timeout itself is 30 seconds.

[Note]Note

It is important to understand that the timeout property of the MessageGroupStoreReaper is an approximate value and is affected by the rate of the task scheduler, since this property is only checked on the next scheduled execution of the MessageGroupStoreReaper task. For example, if the timeout is set to 10 min, but the MessageGroupStoreReaper task is scheduled to run every 60 min and the last execution of the MessageGroupStoreReaper task happened 1 min before the timeout, the MessageGroup will not expire for the next 59 min. It is therefore recommended to schedule the reaper with a period equal to or shorter than the timeout.
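The arithmetic in this note can be checked with a small calculation. The sketch below assumes, purely for illustration, that the reaper runs at times 0, period, 2 x period and so on, and that a group expires on the first run that happens strictly after its timeout has elapsed; the class and method names are hypothetical.

```java
// Hypothetical worked example of reaper timing; all times are in minutes.
class ReaperTimingSketch {

    // Time of the first reaper run at which the group is considered expired,
    // assuming runs at 0, period, 2 * period, ...
    static long expiryRunTime(long groupTimestamp, long timeout, long period) {
        long due = groupTimestamp + timeout;                 // moment the timeout elapses
        return (Math.floorDiv(due, period) + 1) * period;    // first run strictly after 'due'
    }

    // How long the group outlives its nominal timeout.
    static long delayAfterTimeout(long groupTimestamp, long timeout, long period) {
        return expiryRunTime(groupTimestamp, timeout, period) - (groupTimestamp + timeout);
    }

    public static void main(String[] args) {
        // Group created at minute 51, timeout 10 min, reaper every 60 min:
        // at the run at minute 60 the group is only 9 min old.
        System.out.println(delayAfterTimeout(51, 10, 60));
        // Same group with the reaper running every 10 min.
        System.out.println(delayAfterTimeout(51, 10, 10));
    }
}
```

With a 60 min period the group survives 59 min past its timeout, as in the example in the note; with a 10 min period the delay drops to 9 min.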

In addition to the reaper, the expiry callbacks are invoked when the application shuts down via a lifecycle callback in the AbstractCorrelatingMessageHandler.

The AbstractCorrelatingMessageHandler registers its own expiry callback, and this is the link with the boolean flag send-partial-result-on-expiry in the XML configuration of the aggregator. If the flag is set to true, then when the expiry callback is invoked, any unmarked messages in groups that are not yet released can be sent on to the output channel.

[Important]Important

When a shared MessageStore is used for different correlation endpoints, it is necessary to configure a proper CorrelationStrategy to ensure uniqueness of group ids. Otherwise, one correlation endpoint may release or expire messages that belong to another, because messages with the same correlation key are stored in the same message group.

Some MessageStore implementations allow using the same physical resources, by partitioning the data; for example, the JdbcMessageStore has a region property; the MongoDbMessageStore has a collectionName property.

For more information about MessageStore interface and its implementations, please read Section 10.4, “Message Store”.

6.5 Resequencer

6.5.1 Introduction

Related to the Aggregator, albeit different from a functional standpoint, is the Resequencer.

6.5.2 Functionality

The Resequencer works in a similar way to the Aggregator, in the sense that it uses the CORRELATION_ID to store messages in groups, the difference being that the Resequencer does not process the messages in any way. It simply releases them in the order of their SEQUENCE_NUMBER header values.

With respect to that, the user might opt to release all messages at once (after the whole sequence, according to the SEQUENCE_SIZE, has arrived), or as soon as a valid sequence is available.
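The two release modes can be illustrated with a minimal plain-Java model of a single correlation group. This is a sketch under stated assumptions, not the framework's implementation: the class name is hypothetical, sequence numbers are assumed to start at 1, and strings stand in for messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of resequencing within one correlation group.
class ResequencerSketch {

    private final TreeMap<Integer, String> buffer = new TreeMap<>();
    private final int sequenceSize;
    private final boolean releasePartialSequences;
    private int nextExpected = 1;   // assumption: sequence numbers start at 1

    ResequencerSketch(int sequenceSize, boolean releasePartialSequences) {
        this.sequenceSize = sequenceSize;
        this.releasePartialSequences = releasePartialSequences;
    }

    // Accepts one message and returns whatever can be released now, in order.
    List<String> accept(int sequenceNumber, String payload) {
        buffer.put(sequenceNumber, payload);
        List<String> released = new ArrayList<>();
        if (releasePartialSequences) {
            // Release the longest gap-free run starting at the next expected number.
            while (buffer.containsKey(nextExpected)) {
                released.add(buffer.remove(nextExpected));
                nextExpected++;
            }
        } else if (buffer.size() == sequenceSize) {
            // Release everything at once when the whole sequence has arrived.
            released.addAll(buffer.values());
            buffer.clear();
        }
        return released;
    }

    public static void main(String[] args) {
        ResequencerSketch partial = new ResequencerSketch(3, true);
        System.out.println(partial.accept(2, "b"));
        System.out.println(partial.accept(1, "a"));
        System.out.println(partial.accept(3, "c"));
    }
}
```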

[Important]Important

The resequencer is intended to resequence relatively short sequences of messages with small gaps. If you have a large number of disjoint sequences with many gaps, you may experience performance issues.

6.5.3 Configuring a Resequencer

See Section 9.11, “Aggregators and Resequencers” for configuring a Resequencer in Java DSL.

Configuring a resequencer requires only including the appropriate element in XML.

A sample resequencer configuration is shown below.

<int:channel id="inputChannel"/>

<int:channel id="outputChannel"/>

<int:resequencer id="completelyDefinedResequencer"  1
  input-channel="inputChannel"  2
  output-channel="outputChannel"  3
  discard-channel="discardChannel"  4
  release-partial-sequences="true"  5
  message-store="messageStore"  6
  send-partial-result-on-expiry="true"  7
  send-timeout="86420000"  8
  correlation-strategy="correlationStrategyBean"  9
  correlation-strategy-method="correlate"  10
  correlation-strategy-expression="headers['foo']"  11
  release-strategy="releaseStrategyBean"  12
  release-strategy-method="release"  13
  release-strategy-expression="size() == 10"  14
  empty-group-min-timeout="60000"  15

  lock-registry="lockRegistry"  (16)

  group-timeout="60000"  (17)
  group-timeout-expression="size() ge 2 ? 100 : -1"  (18)
  scheduler="taskScheduler"  (19)
  expire-group-upon-timeout="false" />  (20)

1

The id of the resequencer is optional.

2

The input channel of the resequencer. Required.

3

The channel to which the resequencer will send the reordered messages. Optional.

4

The channel to which the resequencer will send the messages that timed out (if send-partial-result-on-expiry is false). Optional.

5

Whether to send out ordered sequences as soon as they are available, or only after the whole message group arrives. Optional (false by default).

6

A reference to a MessageGroupStore that can be used to store groups of messages under their correlation key until they are complete. Optional; by default, a volatile in-memory store is used.

7

Whether, upon the expiration of the group, the ordered group should be sent out (even if some of the messages are missing). Optional (false by default). See Section 6.4.5, “Managing State in an Aggregator: MessageGroupStore”.

8

The timeout interval to wait when sending a reply Message to the output-channel or discard-channel. Defaults to -1 - blocking indefinitely. It is applied only if the output channel has some sending limitations, e.g. QueueChannel with a fixed capacity. In this case a MessageDeliveryException is thrown. The send-timeout is ignored in case of AbstractSubscribableChannel implementations. In case of group-timeout(-expression) the MessageDeliveryException from the scheduled expire task leads this task to be rescheduled. Optional.

9

A reference to a bean that implements the message correlation (grouping) algorithm. The bean can be an implementation of the CorrelationStrategy interface or a POJO. In the latter case the correlation-strategy-method attribute must be defined as well. Optional (by default, the resequencer will use the IntegrationMessageHeaderAccessor.CORRELATION_ID header).

10

A method defined on the bean referenced by correlation-strategy, that implements the correlation decision algorithm. Optional, with restrictions (requires correlation-strategy to be present).

11

A SpEL expression representing the correlation strategy. Example: "headers['foo']". Only one of correlation-strategy or correlation-strategy-expression is allowed.

12

A reference to a bean that implements the release strategy. The bean can be an implementation of the ReleaseStrategy interface or a POJO. In the latter case the release-strategy-method attribute must be defined as well. Optional (by default, the resequencer will use the IntegrationMessageHeaderAccessor.SEQUENCE_SIZE header attribute).

13

A method defined on the bean referenced by release-strategy, that implements the completion decision algorithm. Optional, with restrictions (requires release-strategy to be present).

14

A SpEL expression representing the release strategy; the root object for the expression is a MessageGroup. Example: "size() == 5". Only one of release-strategy or release-strategy-expression is allowed.

15

Only applies if a MessageGroupStoreReaper is configured for the <resequencer> MessageStore. By default, when a MessageGroupStoreReaper is configured to expire partial groups, empty groups are also removed. Empty groups exist after a group is released normally. This is to enable the detection and discarding of late-arriving messages. If you wish to expire empty groups on a longer schedule than expiring partial groups, set this property. Empty groups will then not be removed from the MessageStore until they have not been modified for at least this number of milliseconds. Note that the actual time to expire an empty group will also be affected by the reaper’s timeout property and it could be as much as this value plus the timeout.

(16)

See the section called “Configuring an Aggregator with XML”.

(17)

See the section called “Configuring an Aggregator with XML”.

(18)

See the section called “Configuring an Aggregator with XML”.

(19)

See the section called “Configuring an Aggregator with XML”.

(20)

When a group is completed due to a timeout (or by a MessageGroupStoreReaper), the empty group’s metadata is retained by default. Late arriving messages will be immediately discarded. Set this to true to remove the group completely; then, late arriving messages will start a new group and won’t be discarded until the group again times out. The new group will never be released normally because of the "hole" in the sequence range that caused the timeout. Empty groups can be expired (completely removed) later using a MessageGroupStoreReaper together with the empty-group-min-timeout attribute. Starting with version 5.0 empty groups are also scheduled for removal after empty-group-min-timeout. Default: false.

[Note]Note

Since there is no custom behavior to be implemented in Java classes for resequencers, there is no annotation support for it.

6.6 Message Handler Chain

6.6.1 Introduction

The MessageHandlerChain is an implementation of MessageHandler that can be configured as a single Message Endpoint while actually delegating to a chain of other handlers, such as Filters, Transformers, Splitters, and so on. This can lead to a much simpler configuration when several handlers need to be connected in a fixed, linear progression. For example, it is fairly common to provide a Transformer before other components. Similarly, when providing a Filter before some other component in a chain, you are essentially creating a Selective Consumer. In either case, the chain only requires a single input-channel and a single output-channel eliminating the need to define channels for each individual component.

[Tip]Tip

Spring Integration’s Filter provides a boolean property throwExceptionOnRejection. When providing multiple Selective Consumers on the same point-to-point channel with different acceptance criteria, this value should be set to true (the default is false) so that the dispatcher will know that the Message was rejected and as a result will attempt to pass the Message on to other subscribers. If the Exception were not thrown, then it would appear to the dispatcher as if the Message had been passed on successfully even though the Filter had dropped the Message to prevent further processing. If you do indeed want to "drop" the Messages, then the Filter’s discard-channel might be useful since it does give you a chance to perform some operation with the dropped message (e.g. send to a JMS queue or simply write to a log).

The handler chain simplifies configuration while internally maintaining the same degree of loose coupling between components, and it is trivial to modify the configuration if at some point a non-linear arrangement is required.

Internally, the chain will be expanded into a linear setup of the listed endpoints, separated by anonymous channels. The reply channel header will not be taken into account within the chain: only after the last handler is invoked will the resulting message be forwarded on to the reply channel or the chain’s output channel. Because of this setup, all handlers except the last are required to implement the MessageProducer interface (which provides a setOutputChannel() method). The last handler only needs an output channel if the outputChannel on the MessageHandlerChain is set.
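The linear hand-off between handlers can be pictured with a small plain-Java model in which each handler either produces a reply for the next handler or stops the flow, as a filter does when it drops a message. This is an illustrative sketch only: the class and its methods are hypothetical and do not use the MessageHandlerChain API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical model of a fixed, linear chain of handlers.
class HandlerChainSketch {

    // Passes the payload through the handlers in order; an empty Optional
    // from any handler ends the flow without a reply.
    static Optional<String> process(String payload,
                                    List<Function<String, Optional<String>>> handlers) {
        Optional<String> current = Optional.of(payload);
        for (Function<String, Optional<String>> handler : handlers) {
            if (!current.isPresent()) {
                break;
            }
            current = handler.apply(current.get());
        }
        return current;
    }

    // Builds a filter -> transformer -> service chain and runs one payload through it.
    static Optional<String> demo(String payload) {
        Function<String, Optional<String>> filter =
                p -> p.trim().isEmpty() ? Optional.<String>empty() : Optional.of(p);
        Function<String, Optional<String>> transformer =
                p -> Optional.of(p.trim().toUpperCase(Locale.ROOT));
        Function<String, Optional<String>> service =
                p -> Optional.of(p + "!");
        return process(payload, Arrays.asList(filter, transformer, service));
    }

    public static void main(String[] args) {
        System.out.println(demo("  hi "));
        System.out.println(demo("   "));
    }
}
```

Only the result of the last handler leaves the chain, which corresponds to forwarding the final message to the output channel or reply channel.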

[Note]Note

As with other endpoints, the output-channel is optional. If there is a reply Message at the end of the chain, the output-channel takes precedence, but if not available, the chain handler will check for a reply channel header on the inbound Message as a fallback.

In most cases there is no need to implement MessageHandlers yourself. The next section will focus on namespace support for the chain element. Most Spring Integration endpoints, like Service Activators and Transformers, are suitable for use within a MessageHandlerChain.

6.6.2 Configuring a Chain

The <chain> element provides an input-channel attribute, and if the last element in the chain is capable of producing reply messages (optional), it also supports an output-channel attribute. The sub-elements are then filters, transformers, splitters, and service-activators. The last element may also be a router or an outbound-channel-adapter.

<int:chain input-channel="input" output-channel="output">
    <int:filter ref="someSelector" throw-exception-on-rejection="true"/>
    <int:header-enricher>
        <int:header name="foo" value="bar"/>
    </int:header-enricher>
    <int:service-activator ref="someService" method="someMethod"/>
</int:chain>

The <header-enricher> element used in the above example will set a message header named "foo" with a value of "bar" on the message. A header enricher is a specialization of Transformer that touches only header values. You could obtain the same result by implementing a MessageHandler that did the header modifications and wiring that as a bean, but the header-enricher is obviously a simpler option.

The <chain> can be configured as the last black-box consumer of the message flow. To do so, it is enough to place an <outbound-channel-adapter> at the end of the <chain>:

<int:chain input-channel="input">
    <int-xml:marshalling-transformer marshaller="marshaller" result-type="StringResult" />
    <int:service-activator ref="someService" method="someMethod"/>
    <int:header-enricher>
        <int:header name="foo" value="bar"/>
    </int:header-enricher>
    <int:logging-channel-adapter level="INFO" log-full-message="true"/>
</int:chain>

Disallowed Attributes and Elements

It is important to note that certain attributes, such as order and input-channel, are not allowed to be specified on components used within a chain. The same is true for the poller sub-element.

[Important]Important

For the Spring Integration core components, the XML Schema itself will enforce some of these constraints. However, for non-core components or your own custom components, these constraints are enforced by the XML namespace parser, not by the XML Schema.

These XML namespace parser constraints were added with Spring Integration 2.2. The XML namespace parser will throw a BeanDefinitionParsingException if you try to use disallowed attributes and elements.

'id' Attribute

Beginning with Spring Integration 3.0, if a chain element is given an id, the bean name for the element is a combination of the chain’s id and the id of the element itself. Elements without an id are not registered as beans, but they are given componentNames that include the chain id. For example:

<int:chain id="fooChain" input-channel="input">
    <int:service-activator id="fooService" ref="someService" method="someMethod"/>
    <int:object-to-json-transformer/>
</int:chain>
  • The <chain> root element has an id fooChain. So, the AbstractEndpoint implementation (PollingConsumer or EventDrivenConsumer, depending on the input-channel type) bean takes this value as its bean name.
  • The MessageHandlerChain bean acquires a bean alias fooChain.handler, which allows direct access to this bean from the BeanFactory.
  • The <service-activator> is not a fully-fledged Messaging Endpoint (PollingConsumer or EventDrivenConsumer) - it is simply a MessageHandler within the <chain>. In this case, the bean name registered with the BeanFactory is fooChain$child.fooService.handler.
  • The componentName of this ServiceActivatingHandler takes the same value, but without the .handler suffix - fooChain$child.fooService.
  • The last <chain> sub-component, <object-to-json-transformer>, doesn’t have an id attribute. Its componentName is based on its position in the <chain>. In this case, it is fooChain$child#1. (The final element of the name is the order within the chain, beginning with #0). Note, this transformer isn’t registered as a bean within the application context, so, it doesn’t get a beanName, however its componentName has a value which is useful for logging etc.
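The naming rules in the list above can be summarized as simple string concatenation. The helper below is a hypothetical illustration of those rules as stated here, not code taken from the namespace parser.

```java
// Hypothetical helper that reproduces the naming rules listed above.
class ChainNamingSketch {

    // componentName of a chain sub-component: based on its id if it has one,
    // otherwise on its position within the chain (starting at 0).
    static String componentName(String chainId, String childId, int index) {
        return childId != null
                ? chainId + "$child." + childId
                : chainId + "$child#" + index;
    }

    // Bean name of a sub-component handler; only elements with an id are registered.
    static String handlerBeanName(String chainId, String childId) {
        return chainId + "$child." + childId + ".handler";
    }

    // Alias under which the MessageHandlerChain itself is registered.
    static String chainHandlerAlias(String chainId) {
        return chainId + ".handler";
    }

    public static void main(String[] args) {
        System.out.println(handlerBeanName("fooChain", "fooService"));
        System.out.println(componentName("fooChain", "fooService", 0));
        System.out.println(componentName("fooChain", null, 1));
    }
}
```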

The id attribute for <chain> elements allows them to be eligible for JMX export and they are trackable via Message History. They can also be accessed from the BeanFactory using the appropriate bean name as discussed above.

[Tip]Tip

It is useful to provide an explicit id attribute on <chain>s to simplify the identification of sub-components in logs, and to provide access to them from the BeanFactory etc.

Calling a Chain from within a Chain

Sometimes you need to make a nested call to another chain from within a chain and then come back and continue execution within the original chain. To accomplish this you can utilize a Messaging Gateway by including a <gateway> element. For example:

<int:chain id="main-chain" input-channel="in" output-channel="out">
    <int:header-enricher>
      <int:header name="name" value="Many" />
    </int:header-enricher>
    <int:service-activator>
      <bean class="org.foo.SampleService" />
    </int:service-activator>
    <int:gateway request-channel="inputA"/>  
</int:chain>

<int:chain id="nested-chain-a" input-channel="inputA">
    <int:header-enricher>
        <int:header name="name" value="Moe" />
    </int:header-enricher>
    <int:gateway request-channel="inputB"/> 
    <int:service-activator>
        <bean class="org.foo.SampleService" />
    </int:service-activator>
</int:chain>

<int:chain id="nested-chain-b" input-channel="inputB">
    <int:header-enricher>
        <int:header name="name" value="Jack" />
    </int:header-enricher>
    <int:service-activator>
        <bean class="org.foo.SampleService" />
    </int:service-activator>
</int:chain>

In the above example, nested-chain-a is called at the end of main-chain processing by the gateway element configured there. Within nested-chain-a, a call to nested-chain-b is made after header enrichment; the flow then comes back to finish execution in nested-chain-a. Finally, the flow returns to main-chain. When the nested version of a <gateway> element is defined in the chain, it does not require the service-interface attribute. Instead, it simply takes the message in its current state and places it on the channel defined via the request-channel attribute. When the downstream flow initiated by that gateway completes, a Message is returned to the gateway and continues its journey within the current chain.
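The resulting order of execution can be traced with ordinary nested method calls, where each gateway is modelled as a call that returns before the calling chain continues. This is a sketch of the control flow only; the class and method names are hypothetical and no messaging infrastructure is involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical trace of the nested-chain example: each <gateway> is modelled
// as a synchronous method call into the chain behind its request-channel.
class NestedChainSketch {

    static List<String> run() {
        List<String> trace = new ArrayList<>();
        mainChain(trace);
        return trace;
    }

    static void mainChain(List<String> trace) {
        trace.add("main-chain: header-enricher");
        trace.add("main-chain: service-activator");
        nestedChainA(trace);                 // gateway with request-channel="inputA"
    }

    static void nestedChainA(List<String> trace) {
        trace.add("nested-chain-a: header-enricher");
        nestedChainB(trace);                 // gateway with request-channel="inputB"
        trace.add("nested-chain-a: service-activator");
    }

    static void nestedChainB(List<String> trace) {
        trace.add("nested-chain-b: header-enricher");
        trace.add("nested-chain-b: service-activator");
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```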

6.7 Scatter-Gather

6.7.1 Introduction

Starting with version 4.1, Spring Integration provides an implementation of the Scatter-Gather Enterprise Integration Pattern. It is a compound endpoint, where the goal is to send a message to the recipients and aggregate the results. Quoting the EIP Book, it is a component for scenarios like best quote, when we need to request information from several suppliers and decide which one provides us with the best term for the requested item.

Previously, the pattern could be configured using discrete components; this enhancement brings more convenient configuration.

The ScatterGatherHandler is a request-reply endpoint that combines a PublishSubscribeChannel (or RecipientListRouter) and an AggregatingMessageHandler. The request message is sent to the scatter channel, and the ScatterGatherHandler waits for the reply from the aggregator to send to the outputChannel.

6.7.2 Functionality

The Scatter-Gather pattern suggests two scenarios - Auction and Distribution. In both cases, the aggregation function is the same and provides all options available for the AggregatingMessageHandler. Actually the ScatterGatherHandler just requires an AggregatingMessageHandler as a constructor argument. See Section 6.4, “Aggregator” for more information.

Auction

The Auction Scatter-Gather variant uses publish-subscribe logic for the request message, where the scatter channel is a PublishSubscribeChannel with apply-sequence="true". However, this channel can be any MessageChannel implementation, as is the case with the request-channel in the ContentEnricher (see Section 7.2, “Content Enricher”); in that case, you must supply your own custom correlationStrategy for the aggregation function.

Distribution

The Distribution Scatter-Gather variant is based on the RecipientListRouter (see the section called “RecipientListRouter”) with all available options for the RecipientListRouter. This is the second ScatterGatherHandler constructor argument. If you want to rely just on the default correlationStrategy for the recipient-list-router and the aggregator, you should specify apply-sequence="true". Otherwise, a custom correlationStrategy should be supplied for the aggregator. Unlike the PublishSubscribeChannel (Auction) variant, the recipient-list-router offers a selector option, so target suppliers can be filtered based on the message. With apply-sequence="true", the default sequenceSize will be supplied and the aggregator will be able to release the group correctly. The Distribution option is mutually exclusive with the Auction option.

In both cases, the request (scatter) message is enriched with the gatherResultChannel QueueChannel header, to wait for a reply message from the aggregator.

By default, all suppliers should send their result to the replyChannel header (usually by omitting the output-channel from the ultimate endpoint). However, the gatherChannel option is also provided, allowing suppliers to send their reply to that channel for the aggregation.

6.7.3 Configuring a Scatter-Gather Endpoint

For Java and Annotation configuration, the bean definition for the Scatter-Gather is:

@Bean
public MessageHandler distributor() {
    RecipientListRouter router = new RecipientListRouter();
    router.setApplySequence(true);
    router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
            distributionChannel3()));
    return router;
}

@Bean
public MessageHandler gatherer() {
	return new AggregatingMessageHandler(
			new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
			new SimpleMessageStore(),
			new HeaderAttributeCorrelationStrategy(
			       IntegrationMessageHeaderAccessor.CORRELATION_ID),
			new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}

@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
	ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
	handler.setOutputChannel(output());
	return handler;
}

Here, we configure the RecipientListRouter distributor bean, with applySequence="true" and the list of recipient channels. The next bean is for an AggregatingMessageHandler. Finally, we inject both those beans into the ScatterGatherHandler bean definition and mark it as a @ServiceActivator to wire the Scatter-Gather component into the integration flow.
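To show what the gatherer in this configuration computes, the following plain-Java sketch models only the gather side: the group is released once two replies have arrived (size() == 2), and the result is the first payload greater than 5, or -1 if there is none. It is a simplified, hypothetical model; the scatter step, message headers and correlation are left out, and replies are given in arrival order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the gatherer configured above; not Spring Integration code.
class GatherSketch {

    // Returns the aggregated result, or null if the group was never released
    // because fewer than two replies arrived.
    static Double gather(double... repliesInArrivalOrder) {
        List<Double> group = new ArrayList<>();
        for (double reply : repliesInArrivalOrder) {
            group.add(reply);
            if (group.size() == 2) {
                break;                       // release strategy: size() == 2
            }
        }
        if (group.size() < 2) {
            return null;                     // group not released
        }
        for (double quote : group) {
            if (quote > 5) {
                return quote;                // first payload greater than 5
            }
        }
        return -1D;                          // fallback value of the expression
    }

    public static void main(String[] args) {
        System.out.println(gather(3.0, 7.5, 9.0));
        System.out.println(gather(1.0, 2.0, 9.0));
    }
}
```

Note that in this model the third reply never influences the result, because the group is already released after the second one.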

Configuring the <scatter-gather> endpoint using the XML namespace:

<scatter-gather
		id=""  1
		auto-startup=""  2
		input-channel=""  3
		output-channel=""  4
		scatter-channel=""  5
		gather-channel=""  6
		order=""  7
		phase=""  8
		send-timeout=""  9
		gather-timeout=""  10
		requires-reply="" > 11
			<scatterer/>  12
			<gatherer/>  13
</scatter-gather>

1

The id of the Endpoint. The ScatterGatherHandler bean is registered with an id + '.handler' alias, the RecipientListRouter with id + '.scatterer', and the AggregatingMessageHandler with id + '.gatherer'. Optional (a default id value is generated by the BeanFactory).

2

Lifecycle attribute signaling if the Endpoint should be started during Application Context initialization. In addition, the ScatterGatherHandler also implements Lifecycle and starts/stops the gatherEndpoint, which is created internally if a gather-channel is provided. Optional (default is true).

3

The channel to receive request messages to handle them in the ScatterGatherHandler. Required.

4

The channel to which the Scatter-Gather will send the aggregation results. Optional (because incoming messages can specify a reply channel themselves via replyChannel Message Header).

5

The channel to send the scatter message for the Auction scenario. Optional. Mutually exclusive with <scatterer> sub-element.

6

The channel to receive replies from each supplier for the aggregation. It is used as the replyChannel header in the scatter message. Optional. By default the FixedSubscriberChannel is created.

7

Order of this component when more than one handler is subscribed to the same DirectChannel (used for load balancing purposes). Optional.

8

Specify the phase in which the endpoint should be started and stopped. The startup order proceeds from lowest to highest, and the shutdown order is the reverse of that. By default this value is Integer.MAX_VALUE meaning that this container starts as late as possible and stops as soon as possible. Optional.

9

The timeout interval to wait when sending a reply Message to the output-channel. By default the send will block for one second. It applies only if the output channel has some sending limitations, e.g. a QueueChannel with a fixed capacity that is full. In this case, a MessageDeliveryException is thrown. The send-timeout is ignored in case of AbstractSubscribableChannel implementations. In case of group-timeout(-expression) the MessageDeliveryException from the scheduled expire task leads this task to be rescheduled. Optional.

10

Specifies how long the Scatter-Gather waits for the reply message before returning. null is returned if the reply times out. Optional. Defaults to -1, which means it waits indefinitely.

11

Specify whether the Scatter-Gather must return a non-null value. This value is true by default, hence a ReplyRequiredException will be thrown when the underlying aggregator returns a null value after gather-timeout. Note, if null is a possibility, the gather-timeout should be specified to avoid an indefinite wait.

12

The <recipient-list-router> options. Optional. Mutually exclusive with the scatter-channel attribute.

13

The <aggregator> options. Required.
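
The interaction between gather-timeout and requires-reply described above can be modeled in plain Java. The following is only a minimal sketch under stated assumptions, not the ScatterGatherHandler itself: the class GatherTimeoutSketch and its scatterGather method are hypothetical names, the aggregation simply waits for every supplier, and an IllegalStateException stands in for the ReplyRequiredException.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/** Hypothetical stand-alone model of gather-timeout and requires-reply semantics. */
public class GatherTimeoutSketch {

    /**
     * Sends the "request" to all suppliers in parallel and waits for all replies.
     * A negative timeout waits indefinitely, mirroring the -1 default.
     */
    public static List<Integer> scatterGather(List<Supplier<Integer>> suppliers,
            long gatherTimeoutMillis, boolean requiresReply) {
        // Scatter: each supplier computes its reply asynchronously.
        List<CompletableFuture<Integer>> replies = suppliers.stream()
                .map(supplier -> CompletableFuture.supplyAsync(supplier))
                .collect(Collectors.toList());

        // Gather: complete once every reply is available (assumed release rule).
        CompletableFuture<List<Integer>> gathered = CompletableFuture
                .allOf(replies.toArray(new CompletableFuture[0]))
                .thenApply(done -> replies.stream()
                        .map(reply -> reply.join())
                        .collect(Collectors.toList()));
        try {
            return gatherTimeoutMillis < 0
                    ? gathered.get()
                    : gathered.get(gatherTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException timeout) {
            if (requiresReply) {
                // Stand-in for ReplyRequiredException in this sketch.
                throw new IllegalStateException("No reply within gather-timeout", timeout);
            }
            return null; // requires-reply=false: the caller simply sees null
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(interrupted);
        } catch (ExecutionException failed) {
            throw new IllegalStateException(failed.getCause());
        }
    }
}
```

In this model, a timeout with requiresReply set to false yields null, whereas the same timeout with requiresReply set to true raises an exception. This is why a finite gather-timeout matters whenever a missing reply is possible.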

6.8 Thread Barrier

Sometimes, we need to suspend a message flow thread until some other asynchronous event occurs. For example, consider an HTTP request that publishes a message to RabbitMQ. We might wish to not reply to the user until the RabbitMQ broker has issued an acknowledgment that the message was received.

Spring Integration version 4.2 introduced the <barrier/> component for this purpose. The underlying MessageHandler is the BarrierMessageHandler. This class also implements MessageTriggerAction: a message passed to the trigger() method releases the corresponding thread suspended in the handleRequestMessage() method, if there is one.

The suspended thread and trigger thread are correlated by invoking a CorrelationStrategy on the messages. When a message is sent to the input-channel, the thread is suspended for up to timeout milliseconds, waiting for a corresponding trigger message. The default correlation strategy uses the IntegrationMessageHeaderAccessor.CORRELATION_ID header. When a trigger message arrives with the same correlation, the thread is released. The message sent to the output-channel after release is constructed using a MessageGroupProcessor. By default, the message is a Collection<?> of the two payloads and the headers are merged, using a DefaultAggregatingMessageGroupProcessor.
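
The correlation and hand-off described above can be illustrated with a small plain-Java model. This is a simplified sketch, not the BarrierMessageHandler implementation: the class SimpleBarrier and its suspend and trigger methods are hypothetical names, the correlation key is passed in directly instead of being computed by a CorrelationStrategy, and the released "message" is just a list of the two payloads.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified model of a correlation-keyed thread barrier. */
public class SimpleBarrier {

    // One rendezvous point per correlation key.
    private final ConcurrentMap<Object, SynchronousQueue<Object>> suspensions =
            new ConcurrentHashMap<>();

    private final long timeoutMillis;

    public SimpleBarrier(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /**
     * Suspends the calling thread until a trigger with the same key arrives.
     * Returns both payloads, or null if the timeout elapses first.
     */
    public List<Object> suspend(Object correlationKey, Object payload) {
        SynchronousQueue<Object> queue =
                suspensions.computeIfAbsent(correlationKey, key -> new SynchronousQueue<>());
        try {
            Object triggerPayload = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (triggerPayload == null) {
                return null; // no trigger arrived in time
            }
            return Arrays.asList(payload, triggerPayload);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            suspensions.remove(correlationKey, queue);
        }
    }

    /**
     * Hands the trigger payload to the suspended thread. The trigger thread itself
     * waits up to the timeout if the suspending thread has not arrived yet.
     */
    public boolean trigger(Object correlationKey, Object payload) {
        SynchronousQueue<Object> queue =
                suspensions.computeIfAbsent(correlationKey, key -> new SynchronousQueue<>());
        try {
            boolean released = queue.offer(payload, timeoutMillis, TimeUnit.MILLISECONDS);
            if (!released) {
                suspensions.remove(correlationKey, queue); // late or unmatched trigger
            }
            return released;
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A SynchronousQueue is used here because it has no capacity: a payload is transferred only when both threads meet, which matches the rendezvous behavior of a barrier.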

Caution

If the trigger() method is invoked first (or after the main thread times out), the trigger thread is suspended for up to timeout milliseconds, waiting for the suspending message to arrive. If you do not want to suspend the trigger thread, consider handing off to a TaskExecutor so that its thread is suspended instead.

The requires-reply property determines the action taken if the suspended thread times out before the trigger message arrives. By default, it is false, which means the endpoint simply returns null, the flow ends, and the thread returns to the caller. When true, a ReplyRequiredException is thrown.

You can call the trigger() method programmatically (obtain the bean reference using the name barrier.handler - where barrier is the bean name of the barrier endpoint) or you can configure an <outbound-channel-adapter/> to trigger the release.

Important

Only one thread can be suspended with the same correlation; the same correlation can be used multiple times but only once concurrently. An exception is thrown if a second thread arrives with the same correlation.
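
The one-suspension-per-correlation rule can be sketched with an atomic map operation. This is an illustration only: SuspensionGuard, enter, and exit are hypothetical names, and the IllegalStateException stands in for whatever exception the barrier actually throws.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical guard allowing one suspended thread per correlation key at a time. */
public class SuspensionGuard {

    private final ConcurrentMap<Object, Thread> inProcess = new ConcurrentHashMap<>();

    /** Registers the current thread for the key; fails if a suspension is already active. */
    public void enter(Object correlationKey) {
        Thread existing = inProcess.putIfAbsent(correlationKey, Thread.currentThread());
        if (existing != null) {
            throw new IllegalStateException(
                    "Correlation key " + correlationKey + " is already in use by " + existing.getName());
        }
    }

    /** Frees the key after release or timeout, so the same correlation can be used again. */
    public void exit(Object correlationKey) {
        inProcess.remove(correlationKey);
    }
}
```

A caller would invoke enter() before suspending and exit() in a finally block, so that the key is freed even when the wait times out.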

<int:barrier id="barrier1" input-channel="in" output-channel="out"
        correlation-strategy-expression="headers['myHeader']"
        output-processor="myOutputProcessor"
        discard-channel="lateTriggerChannel"
        timeout="10000">
</int:barrier>

<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />

In this example, a custom header is used for correlation. Either the thread sending a message to in or the one sending a message to release waits for up to 10 seconds until the other arrives. When the thread is released, the message sent to the out channel is the result of invoking the custom MessageGroupProcessor bean myOutputProcessor. If the main thread times out and a trigger arrives later, the late trigger is sent to the discard channel, if one is configured (lateTriggerChannel here). Java configuration is shown below.

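import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aggregator.BarrierMessageHandler;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

@Configuration
@EnableIntegration
public class Config {

    // out() and lateTriggers() are MessageChannel @Bean methods that are
    // assumed to be defined elsewhere in this class (not shown here).

    // Suspends the thread that sends to "in" for up to 10 seconds.
    @ServiceActivator(inputChannel="in")
    @Bean
    public BarrierMessageHandler barrier() {
        BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
        barrier.setOutputChannel(out());
        barrier.setDiscardChannel(lateTriggers());
        return barrier;
    }

    // Releases the suspended thread when a message arrives on "release".
    @ServiceActivator(inputChannel="release")
    @Bean
    public MessageHandler releaser() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                barrier().trigger(message);
            }

        };
    }

}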

See the barrier sample application for an example of this component.