4. Message Routing

4.1 Router

4.1.1 Router Implementations

Since content-based routing often requires some domain-specific logic, most use cases will rely on Spring Integration's options for delegating to POJOs using the XML namespace support and/or annotations. Both are discussed below, but first we present a couple of implementations that are available out of the box because they fulfill generic but common requirements.

PayloadTypeRouter

A PayloadTypeRouter will send Messages to the channel as defined by payload-type mappings.

<bean id="payloadTypeRouter" class="org.springframework.integration.router.PayloadTypeRouter">
    <property name="payloadTypeChannelMap">
        <map>
            <entry key="java.lang.String" value-ref="stringChannel"/>
            <entry key="java.lang.Integer" value-ref="integerChannel"/>
        </map>
    </property>
</bean>

Configuration of PayloadTypeRouter is also supported via the namespace provided by Spring Integration (see Section B.2, “Namespace Support”), which simplifies configuration by combining the <router/> configuration and its corresponding implementation, defined using a <bean/> element, into a single, more concise element. The example below shows a PayloadTypeRouter configuration, equivalent to the one above, that uses the namespace support:

<payload-type-router input-channel="routingChannel">
  <mapping type="java.lang.String" channel="stringChannel" />
  <mapping type="java.lang.Integer" channel="integerChannel" />
</payload-type-router>

HeaderValueRouter

A HeaderValueRouter will send Messages to the channel based on the individual header value mappings. When HeaderValueRouter is created it is initialized with the name of the header to be evaluated, using constructor-arg. The value of the header could be one of two things:

1. Arbitrary value

2. Channel name

If the header holds an arbitrary value, then a channelResolver should be provided to map header values to channel names. The example below uses MapBasedChannelResolver to set up a map of header values to channel names.

 <bean id="myHeaderValueRouter"
    class="org.springframework.integration.router.HeaderValueRouter">
  <constructor-arg value="someHeaderName" />
  <property name="channelResolver">
    <bean class="org.springframework.integration.channel.MapBasedChannelResolver">
      <property name="channelMap">
        <map>
          <entry key="someHeaderValue" value-ref="channelA" />
          <entry key="someOtherHeaderValue" value-ref="channelB" />
        </map>
      </property>
    </bean>
  </property>
</bean>

If channelResolver is not specified, then the header value is treated as a channel name, which makes the configuration much simpler:

<bean id="myHeaderValueRouter"
  class="org.springframework.integration.router.HeaderValueRouter">
  <constructor-arg value="someHeaderName" />
</bean>

Similar to the PayloadTypeRouter, configuration of HeaderValueRouter is also supported via the namespace provided by Spring Integration (see Section B.2, “Namespace Support”). The examples below show the two kinds of namespace-based HeaderValueRouter configuration, corresponding to the two bean definitions above:

1. Configuration where mapping of header values to channels is required

<header-value-router input-channel="routingChannel" header-name="testHeader">
  <mapping value="someHeaderValue" channel="channelA" />
  <mapping value="someOtherHeaderValue" channel="channelB" />
</header-value-router>

2. Configuration where mapping of header values is not required if header values themselves represent the channel names

<header-value-router input-channel="routingChannel" header-name="testHeader"/>

[Note]Note
The two router implementations shown above share some common properties, such as "defaultOutputChannel" and "resolutionRequired". If "resolutionRequired" is set to "true", and the router is unable to determine a target channel (e.g. there is no matching payload for a PayloadTypeRouter and no "defaultOutputChannel" has been specified), then an Exception will be thrown.

RecipientListRouter

A RecipientListRouter will send each received Message to a statically-defined list of Message Channels:

<bean id="recipientListRouter" class="org.springframework.integration.router.RecipientListRouter">
    <property name="channels">
        <list>
            <ref bean="channel1"/>
            <ref bean="channel2"/>
            <ref bean="channel3"/>
        </list>
    </property>
</bean>

Configuration for RecipientListRouter is also supported via namespace support provided by Spring Integration (see Section B.2, “Namespace Support”). The example below demonstrates namespace-based configuration of RecipientListRouter and all the supported attributes using Spring Integration namespace support:

<recipient-list-router id="customRouter" input-channel="routingChannel"
        timeout="1234"
        ignore-send-failures="true"
        apply-sequence="true">
  <recipient channel="channel1"/>
  <recipient channel="channel2"/>
</recipient-list-router>

[Note]Note
The 'apply-sequence' flag here has the same effect as it does for a publish-subscribe-channel, and like publish-subscribe-channel it is disabled by default on the recipient-list-router. Refer to the section called “PublishSubscribeChannel Configuration” for more information.

Another convenient option for configuring a Recipient List Router is to use Spring Expression Language (SpEL) support:

<int:recipient-list-router id="customRouter" input-channel="routingChannel">
	<int:recipient channel="channel1" selector-expression="payload.equals('foo')"/>
	<int:recipient channel="channel2" selector-expression="headers.containsKey('bar')"/>
</int:recipient-list-router>

In the above configuration a SpEL expression identified by selector-expression attribute will be evaluated to determine if this recipient should be included in the recipient list for a given input Message. The evaluation result of the expression must be a boolean. If this attribute is not defined, the channel will always be among the list of recipients.
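The selection rule described above can be sketched in plain Java. This is only an illustration of the idea, not the framework's RecipientListRouter: the class RecipientSelectionSketch, its methods, and the use of a BiPredicate in place of a SpEL expression are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical stand-in for recipient selection; not the framework's RecipientListRouter.
final class RecipientSelectionSketch {

    // Channel name -> selector over (payload, headers); a null selector means "always include".
    private final Map<String, BiPredicate<Object, Map<String, Object>>> recipients =
            new LinkedHashMap<>();

    void addRecipient(String channelName, BiPredicate<Object, Map<String, Object>> selector) {
        recipients.put(channelName, selector);
    }

    // Returns the channels whose selector accepts the message, in declaration order.
    List<String> selectChannels(Object payload, Map<String, Object> headers) {
        List<String> selected = new ArrayList<>();
        for (Map.Entry<String, BiPredicate<Object, Map<String, Object>>> entry
                : recipients.entrySet()) {
            BiPredicate<Object, Map<String, Object>> selector = entry.getValue();
            if (selector == null || selector.test(payload, headers)) {
                selected.add(entry.getKey());
            }
        }
        return selected;
    }
}
```

A recipient registered with a null selector plays the role of a recipient element without a selector-expression attribute: it is part of every recipient list.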

4.1.2 Namespace support for Router - <router> element

The "router" element provides a simple way to connect a router to an input channel, and also accepts the optional default output channel. The "ref" may provide the bean name of a custom Router implementation (extending AbstractMessageRouter):

<router ref="payloadTypeRouter" input-channel="input1" default-output-channel="defaultOutput1"/>

<router ref="recipientListRouter" input-channel="input2" default-output-channel="defaultOutput2"/>

<router ref="customRouter" input-channel="input3" default-output-channel="defaultOutput3"/>

<beans:bean id="customRouter" class="org.foo.MyCustomRouter"/>

Alternatively, the "ref" may point to a simple Object that contains the @Router annotation (see below), or the "ref" may be combined with an explicit "method" name. When specifying a "method", the same behavior applies as described in the @Router annotation section below.

<router input-channel="input" ref="somePojo" method="someMethod"/>
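A POJO referenced this way might look like the following minimal sketch. The class name SomePojo, the String payload type, the routing rule, and the channel names "priorityChannel" and "standardChannel" are all hypothetical; only the method name is taken from the configuration above.

```java
// Hypothetical POJO for ref="somePojo" method="someMethod"; the routing rule is illustrative.
class SomePojo {

    // Returns a channel name; the router endpoint resolves it to a MessageChannel.
    public String someMethod(String payload) {
        return payload.startsWith("urgent:") ? "priorityChannel" : "standardChannel";
    }
}
```

Because the method depends only on the payload type, it can be unit tested without any messaging infrastructure.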

Using a "ref" attribute is generally recommended if the custom router implementation can be reused in other <router> definitions. However if the custom router implementation should be scoped to a concrete definition of the <router>, you can provide an inner bean definition:

<router method="someMethod" input-channel="input3" default-output-channel="defaultOutput3">
  <beans:bean class="org.foo.MyCustomRouter"/>
</router>

[Note]Note

Using both the "ref" attribute and an inner handler definition in the same <router> configuration is not allowed, as it creates an ambiguous condition and will result in an Exception being thrown.

Routers and Spring Expression Language (SpEL)

Sometimes the routing logic may be simple, and writing a separate class for it and configuring it as a bean may seem like overkill. Since Spring Integration 2.0 we offer an alternative: you can use SpEL (http://static.springsource.org/spring/docs/3.0.x/spring-framework-reference/html/expressions.html) to implement simple computations that previously required a custom POJO router.

<int:router input-channel="inChannel" expression="payload + 'Channel'"/>

In the above configuration the result channel will be computed by the SpEL expression, which simply concatenates the value of the payload with the literal 'Channel'.

Another benefit of SpEL when configuring routers is that an expression can return a Collection, effectively making every <router> a Recipient List Router. Whenever the expression returns multiple channel values, the Message will be forwarded to all of those channels.

<int:router input-channel="inChannel" expression="headers.channels"/>

In the above configuration, let's assume that the message has a header named 'channels' whose value is a List of channel names. The message will be sent to all channels in this list. You may also find Collection Projection and Collection Selection expressions useful for selecting multiple channels. See (http://static.springsource.org/spring/docs/3.0.x/spring-framework-reference/html/expressions.html#d0e12084).

4.1.3 The @Router Annotation

When using the @Router annotation, the annotated method can return either the MessageChannel or String type. In the case of the latter, the endpoint will resolve the channel name as it does for the default output. Additionally, the method can return either a single value or a collection. When a collection is returned, the reply message will be sent to multiple channels. To summarize, the following method signatures are all valid.

@Router
public MessageChannel route(Message message) {...}

@Router
public List<MessageChannel> route(Message message) {...}

@Router
public String route(Foo payload) {...}

@Router
public List<String> route(Foo payload) {...}

In addition to payload-based routing, a common requirement is to route based on metadata available within the message header as either a property or attribute. Rather than requiring use of the Message type as the method parameter, the @Router annotation may also use the @Header parameter annotation that is documented in Section B.5, “Annotation Support”.

@Router
public List<String> route(@Header("orderStatus") OrderStatus status) {...}

[Note]Note
For routing of XML-based Messages, including XPath support, see Chapter 23, XML Support - Dealing with XML Payloads.

4.1.4 Dynamic Routers

As you can see, Spring Integration provides quite a few router configurations for the most common content-based routing use cases, as well as the option of implementing custom routers as POJOs. For example, the Payload Type Router provides a simple way to configure a router that computes channels based on the payload type of the incoming Message, while the Header Value Router provides the same convenience for a router that computes channels by evaluating the value of a particular Message Header. There are also expression-based (SpEL) routers, where the channel is determined by evaluating an expression, which gives this type of router some dynamic characteristics.

However these routers share one common attribute - static configuration. Even in the case of expression-based routers, the expression itself is defined as part of the router configuration which means that the same expression operating on the same value will always result in the computation of the same channel. This is good in most cases since such routes are well defined and therefore predictable. But there are times when we need to change router configurations dynamically so message flows could be routed to a different channel.

For example:

You might want to bring down some part of your system for maintenance, so you temporarily want to reroute messages to a different message flow. Or you may want to introduce more granularity to your message flow by adding another route to handle a more concrete type of java.lang.Number (in the case of the Payload Type Router).

Unfortunately, with static router configuration, accomplishing this would require bringing down your entire application, changing the configuration of the router (changing routes), and bringing it back up. This is obviously not a practical solution.

The Dynamic Router pattern describes the mechanisms by which one can change or configure routers dynamically without bringing down the system or individual routers.

Before we get into the specifics of how this is accomplished in Spring Integration, let's quickly summarize the typical flow of a router, which consists of 3 simple steps:

  • Step 1 - Compute the channel identifier, which is a value calculated by the router once it receives the Message. Typically it is a String or an instance of the actual MessageChannel.

  • Step 2 - Resolve the channel identifier to a channel name. We'll describe the specifics of this process in a moment.

  • Step 3 - Resolve the channel name to the actual MessageChannel.

There is not much that can be done with regard to router dynamics if Step 1 results in an actual instance of MessageChannel, simply because the MessageChannel is the final product of any router's job. However, if Step 1 results in a channel identifier that is not an instance of MessageChannel, then there are quite a few possibilities to influence the process of determining the final MessageChannel. Let's look at a couple of examples in the context of the 3 steps mentioned above:

Payload Type Router

<payload-type-router input-channel="routingChannel">
  <mapping type="java.lang.String" channel="channel1" />
  <mapping type="java.lang.Integer" channel="channel2" />
</payload-type-router>

Within the context of the Payload Type Router the 3 steps mentioned above would be realized as:

  • Step 1 - Compute channel identifier which is the fully qualified name of the payload type (e.g., java.lang.String).

  • Step 2 - Resolve channel identifier to channel name where the result of the previous step is used to select the appropriate value from the payload type mapping defined via mapping element.

  • Step 3 - Resolve channel name to the actual instance of the MessageChannel: using a ChannelResolver, the router obtains a reference to a bean (which is hopefully a MessageChannel) identified by the result of the previous step.

In other words, each step feeds the next step until the process completes.

Header Value Router

<header-value-router input-channel="inputChannel" header-name="testHeader">
  <mapping value="foo" channel="fooChannel" />
  <mapping value="bar" channel="barChannel" />
</header-value-router>

Similar to the PayloadTypeRouter:

  • Step 1 - Compute channel identifier which is the value of the header identified by the header-name attribute.

  • Step 2 - Resolve channel identifier to channel name where the result of the previous step is used to select the appropriate value from the general mapping defined via mapping element.

  • Step 3 - Resolve channel name to the actual instance of the MessageChannel: using a ChannelResolver, the router obtains a reference to a bean (which is hopefully a MessageChannel) identified by the result of the previous step.

The above two configurations of two different router types look almost identical. However, if we look at a different configuration of the HeaderValueRouter, we clearly see that there is no mapping sub-element:

<header-value-router input-channel="inputChannel" header-name="testHeader"/>

But this configuration is still perfectly valid. So the natural question is: what about the mapping in Step 2?

What this means is that Step 2 is now an optional step. If no mapping is defined, then the channel identifier value computed in Step 1 will automatically be treated as the channel name, which will then be resolved to the actual MessageChannel in Step 3. It also means that Step 2 is one of the key steps in providing dynamic characteristics to the routers, since it introduces a process that allows you to change the way a 'channel identifier' resolves to a 'channel name', thus influencing the process of determining the final instance of the MessageChannel from the initial channel identifier.

For example:

In the above configuration, let's assume that the testHeader value is 'kermit', which is now the channel identifier (Step 1). Since there is no mapping in this router, resolving this channel identifier to a channel name (Step 2) is impossible, and the channel identifier is treated as the channel name. The end result would be the same if there were a mapping, but for a different value: if a new value cannot be determined through the process of resolving the 'channel identifier' to a 'channel name', the 'channel identifier' becomes the 'channel name'.

So all that is left is for Step 3 to resolve channel name ('kermit') to an actual instance of the MessageChannel identified by this name. That will be done via default ChannelResolver implementation which is BeanFactoryChannelResolver which basically does a bean lookup by the name provided. So now all messages which contain the header/value pair as testHeader=kermit are going to be routed to a 'kermit' MessageChannel.

But what if you want to route these messages to the 'simpson' channel? Obviously changing the static configuration would work, but it would also require bringing your system down. However, if you had access to the channel identifier map, then you could just introduce a new mapping, kermit=simpson, thus allowing Step 2 to treat 'kermit' as a channel identifier while resolving it to 'simpson' as the channel name.

The same obviously applies to the PayloadTypeRouter, where you can now remap or remove a particular payload type mapping, and to every other router, including expression-based routers, since their computed value will now have a chance to go through Step 2 to be additionally resolved to the actual channel name.
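The interplay of Step 2 and Step 3 in the 'kermit' example can be sketched in plain Java. This is a simplified illustration under stated assumptions, not the AbstractMessageRouter source: the class ThreeStepRouterSketch is hypothetical, a plain Map stands in for the bean lookup performed by the ChannelResolver, and channels are represented as arbitrary objects.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified illustration of the routing steps; not the AbstractMessageRouter source.
final class ThreeStepRouterSketch {

    // Step 2 data: channel identifier -> channel name, modifiable at runtime.
    private final Map<String, String> channelMappings = new ConcurrentHashMap<>();

    // Step 3 data: stands in for a bean lookup by name (assumption for this sketch).
    private final Map<String, Object> channelBeans;

    ThreeStepRouterSketch(Map<String, Object> channelBeans) {
        this.channelBeans = channelBeans;
    }

    void setChannelMapping(String channelIdentifier, String channelName) {
        channelMappings.put(channelIdentifier, channelName);
    }

    void removeChannelMapping(String channelIdentifier) {
        channelMappings.remove(channelIdentifier);
    }

    // Step 2: an unmapped identifier is used as the channel name unchanged.
    String resolveChannelName(String channelIdentifier) {
        return channelMappings.getOrDefault(channelIdentifier, channelIdentifier);
    }

    // Steps 2 and 3 combined; Step 1 (computing the identifier) is left to the caller.
    Object route(String channelIdentifier) {
        String channelName = resolveChannelName(channelIdentifier);
        Object channel = channelBeans.get(channelName);
        if (channel == null) {
            throw new IllegalStateException("No channel named '" + channelName + "'");
        }
        return channel;
    }
}
```

With no mapping present, route("kermit") looks up the channel named 'kermit'. After setChannelMapping("kermit", "simpson") the same identifier resolves to the 'simpson' channel, and removing the mapping restores the original behavior, all without restarting anything.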

In Spring Integration 2.0 the router hierarchy underwent a major refactoring, and now any router that is a subclass of AbstractMessageRouter (which includes all framework-defined routers) is a Dynamic Router, simply because the channelIdentifierMap is defined in AbstractMessageRouter with convenient accessors and modifiers exposed as public methods. These allow you to change, add, or remove router mappings at runtime via JMX (see section 29) or the Control Bus (see section 29.7).

Control Bus

One way to manage the router mappings is through the Control Bus, which exposes a Control Channel to which you can send control messages to manage and monitor Spring Integration components, including routers. For more information about the Control Bus see section 29.7. Typically you would send a control message asking to invoke a particular JMX operation on a particular managed component (e.g., a router). The two managed operations (methods) that are specific to changing the router resolution process are:

  • public void setChannelMapping(String channelIdentifier, String channelName) - will allow you to add new or modify existing mapping of channel identifier to channel name

  • public void removeChannelMapping(String channelIdentifier) - will allow you to remove a particular channel mapping, thus disconnecting the relationship between channel identifier and channel name

There are obviously other managed operations, so please refer to AbstractMessageRouter for more detail.

You can also use your favorite JMX client (e.g., JConsole) and use those operations (methods) to change router configuration. For more information on Spring Integration management and monitoring please visit section 29 of this manual.

4.2 Filter

4.2.1 Introduction

Message Filters are used to decide whether a Message should be passed along or dropped based on some criteria such as a Message Header value or even content within the Message itself. Therefore, a Message Filter is similar to a router, except that for each Message received from the filter's input channel, that same Message may or may not be sent to the filter's output channel. Unlike the router, it makes no decision regarding which Message Channel to send to but only decides whether to send.

[Note]Note
As you will see momentarily, the Filter does also support a discard channel, so in certain cases it can play the role of a very simple router (or "switch") based on a boolean condition.

In Spring Integration, a Message Filter may be configured as a Message Endpoint that delegates to some implementation of the MessageSelector interface. That interface is itself quite simple:

 public interface MessageSelector {

     boolean accept(Message<?> message);

 }

The MessageFilter constructor accepts a selector instance:

 MessageFilter filter = new MessageFilter(someSelector);

In combination with the namespace and SpEL, very powerful filters can be configured with very little Java code.

4.2.2 Namespace support for Filter - <filter> Element

The <filter> element is used to create a Message-selecting endpoint. In addition to "input-channel" and "output-channel" attributes, it requires a "ref". The "ref" may point to a MessageSelector implementation:

 <filter input-channel="input" ref="selector" output-channel="output"/>

 <bean id="selector" class="example.MessageSelectorImpl"/>

Alternatively, the "method" attribute can be added at which point the "ref" may refer to any object. The referenced method may expect either the Message type or the payload type of inbound Messages. The return value of the method must be a boolean value. Any time the method returns 'true', the Message will be passed along to the output-channel.

 <filter input-channel="input" output-channel="output"
         ref="exampleObject" method="someBooleanReturningMethod"/>

 <bean id="exampleObject" class="example.SomeObject"/>
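The referenced object can be an ordinary class with a boolean-returning method. The sketch below is a hypothetical body for example.SomeObject: the String payload type and the 140-character rule are assumptions chosen only to make the example concrete.

```java
// Hypothetical body for example.SomeObject; the length rule is illustrative only.
class SomeObject {

    // Receives the payload; returning true passes the Message to the output channel.
    public boolean someBooleanReturningMethod(String payload) {
        return payload != null && payload.length() <= 140;
    }
}
```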

If the selector or adapted POJO method returns false, there are a few settings that control the fate of the rejected Message. By default (if configured like the example above), the rejected Messages will be silently dropped. If rejection should instead indicate an error condition, then set the 'throw-exception-on-rejection' flag to true:

 <filter input-channel="input" ref="selector"
      output-channel="output" throw-exception-on-rejection="true"/> 

If you want the rejected messages to go to a specific channel, provide that reference as the 'discard-channel':

 <filter input-channel="input" ref="selector"
      output-channel="output" discard-channel="rejectedMessages"/> 

[Note]Note
A common usage for Message Filters is in conjunction with a Publish Subscribe Channel. Many filter endpoints may be subscribed to the same channel, and they decide whether or not to pass the Message for the next endpoint which could be any of the supported types (e.g. Service Activator). This provides a reactive alternative to the more proactive approach of using a Message Router with a single Point-to-Point input channel and multiple output channels.

Using a "ref" attribute is generally recommended if the custom filter implementation can be reused in other <filter> definitions. However if the custom filter implementation should be scoped to a single <filter> element, provide an inner bean definition:

<filter method="someMethod" input-channel="inChannel" output-channel="outChannel">
  <beans:bean class="org.foo.MyCustomFilter"/>
</filter>

[Note]Note

Using both the "ref" attribute and an inner handler definition in the same <filter> configuration is not allowed, as it creates an ambiguous condition, and it will therefore result in an Exception being thrown.

With the introduction of SpEL, Spring Integration has added the expression attribute to the filter element. It can be used to avoid Java entirely for simple filters.

<filter input-channel="input" expression="payload.equals('nonsense')"/>

The string passed as the expression attribute will be evaluated as a SpEL expression in the context of the message. If you need to include the result of an expression evaluated in the scope of the application context, you can use the #{} notation as defined in the SpEL reference documentation.

          <filter input-channel="input" expression="payload.matches(#{filterPatterns.nonsensePattern})"/>
      

If the Expression itself needs to be dynamic, then an 'expression' sub-element may be used. That provides a level of indirection for resolving the Expression by its key from an ExpressionSource. That is a strategy interface that you can implement directly, or you can rely upon a version available in Spring Integration that loads Expressions from a "resource bundle" and can check for modifications after a given number of seconds. All of this is demonstrated in the following configuration sample where the Expression could be reloaded within one minute if the underlying file had been modified. If the ExpressionSource bean is named "expressionSource", then it is not necessary to provide the "source" attribute on the <expression> element, but in this case it's shown for completeness.

<filter input-channel="input" output-channel="output">
    <expression key="filterPatterns.example" source="myExpressions"/>
</filter>

<beans:bean id="myExpressions"
        class="org.springframework.integration.expression.ReloadableResourceBundleExpressionSource">
    <beans:property name="basename" value="config/integration/expressions"/>
    <beans:property name="cacheSeconds" value="60"/>
</beans:bean>

Then, the 'config/integration/expressions.properties' file (or any more specific version with a locale extension to be resolved in the typical way that resource-bundles are loaded) would contain a key/value pair:

          filterPatterns.example=payload > 100

[Note]Note
All of the examples that use "expression" as an attribute or sub-element can also be applied within transformer, router, splitter, service-activator, and header-enricher elements. Of course, the semantics/role of the given component type would affect the interpretation of the evaluation result in the same way that the return value of a method invocation would be interpreted. For example, an expression can return Strings that are to be treated as Message Channel names by a router component.

4.3 Splitter

4.3.1 Introduction

The Splitter is a component whose role is to partition a message into several parts and send the resulting messages to be processed independently. Very often, Splitters are upstream producers in a pipeline that includes an Aggregator.

4.3.2 Programming model

The API for performing splitting consists of one base class, AbstractMessageSplitter, which is a MessageHandler implementation that encapsulates features common to splitters, such as filling in the appropriate message headers CORRELATION_ID, SEQUENCE_SIZE, and SEQUENCE_NUMBER on the messages that are produced. This makes it possible to track the messages and the results of their processing (in a typical scenario, these headers would be copied over to the messages produced by the various transforming endpoints), and to use them, for example, in a Composed Message Processor scenario.

An excerpt from AbstractMessageSplitter can be seen below:

public abstract class AbstractMessageSplitter
                extends AbstractReplyProducingMessageConsumer {
  ...
  protected abstract Object splitMessage(Message<?> message);

}

For implementing a specific Splitter in an application, a developer can extend AbstractMessageSplitter and implement the splitMessage method, thus defining the actual logic for splitting the messages. The return value can be one of the following:

  • a Collection (or subclass thereof) or an array of Message objects - in this case the messages will be sent as such (after the CORRELATION_ID, SEQUENCE_SIZE and SEQUENCE_NUMBER are populated). Using this approach gives more control to the developer, for example for populating custom message headers as part of the splitting process.

  • a Collection (or subclass thereof) or an array of non-Message objects - works like the prior case, except that each collection element will be used as a Message payload. Using this approach allows developers to focus on the domain objects without having to consider the Messaging system and produces code that is easier to test.

  • a Message or non-Message object (but not a Collection or an Array) - it works like the previous cases, except that there is a single message to be sent out.
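To make the role of the three sequence headers concrete, the sketch below wraps each element of a split result in a simple header map. It is illustrative only and does not use the framework's Message type: the class SequenceHeaderSketch is hypothetical, the correlation id is passed in by the caller, and numbering the parts from 1 is an assumption of this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: shows the three sequence headers, not the framework's implementation.
final class SequenceHeaderSketch {

    // Wraps each part in a map of headers plus payload, numbering parts from 1 (an assumption).
    static List<Map<String, Object>> toMessages(Object correlationId, List<?> parts) {
        List<Map<String, Object>> messages = new ArrayList<>();
        int sequenceNumber = 0;
        for (Object part : parts) {
            Map<String, Object> message = new LinkedHashMap<>();
            message.put("CORRELATION_ID", correlationId);      // same value for every part
            message.put("SEQUENCE_SIZE", parts.size());        // total number of parts
            message.put("SEQUENCE_NUMBER", ++sequenceNumber);  // position of this part
            message.put("payload", part);
            messages.add(message);
        }
        return messages;
    }
}
```

A downstream component that sees the same CORRELATION_ID on SEQUENCE_SIZE distinct messages knows that it has received every part of the original message.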

In Spring Integration, any POJO can implement the splitting algorithm, provided that it defines a method that accepts a single argument and has a return value. In this case, the return value of the method will be interpreted as described above. The input argument might either be a Message or a simple POJO. In the latter case, the splitter will receive the payload of the incoming message. Since this decouples the code from the Spring Integration API and will typically be easier to test, it is the recommended approach.
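A POJO splitter of this kind might look like the following minimal sketch. The class name PojoSplitter, the method name split, and the comma-separated String payload are assumptions made for illustration; the method takes the payload rather than the Message, in line with the recommendation above.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical POJO splitter: takes the payload and returns one element per output message.
class PojoSplitter {

    // Splits a comma-separated payload; each returned String becomes a message payload.
    public List<String> split(String payload) {
        return Arrays.asList(payload.split("\\s*,\\s*"));
    }
}
```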

4.3.3 Configuring a Splitter using XML

A splitter can be configured through XML as follows:

<channel id="inputChannel"/>

<splitter id="splitter" 1
  ref="splitterBean" 2
  method="split" 3
  input-channel="inputChannel" 4
  output-channel="outputChannel" 5/>

<channel id="outputChannel"/>

<beans:bean id="splitterBean" class="sample.PojoSplitter"/>

1. The id of the splitter is optional.

2. A reference to a bean defined in the application context. The bean must implement the splitting logic as described in the section above. Optional. If reference to a bean is not provided, then it is assumed that the payload of the Message that arrived on the input-channel is an implementation of java.util.Collection and the default splitting logic will be applied on such Collection, incorporating each individual element into a Message and depositing it on the output-channel.

3. The method (defined on the bean specified above) that implements the splitting logic. Optional.

4. The input channel of the splitter. Required.

5. The channel where the splitter will send the results of splitting the incoming message. Optional (because incoming messages can specify a reply channel themselves).

Using a "ref" attribute is generally recommended if the custom splitter handler implementation can be reused in other <splitter> definitions. However if the custom splitter handler implementation should be scoped to a single definition of the <splitter>, you can configure an inner bean definition:

<splitter id="testSplitter" input-channel="inChannel" method="split"
                output-channel="outChannel">
  <beans:bean class="org.foo.TestSplitter"/>
</splitter>

[Note]Note

Using both a "ref" attribute and an inner handler definition in the same <splitter> configuration is not allowed, as it creates an ambiguous condition and will result in an Exception being thrown.

4.3.4 Configuring a Splitter with Annotations

The @Splitter annotation is applicable to methods that expect either the Message type or the message payload type, and the return values of the method should be a collection of any type. If the returned values are not actual Message objects, then each of them will be sent as the payload of a message. Those messages will be sent to the output channel as designated for the endpoint on which the @Splitter is defined.

@Splitter
List<LineItem> extractItems(Order order) {
    return order.getItems();
}

4.4 Aggregator

4.4.1 Introduction

Basically a mirror-image of the Splitter, the Aggregator is a type of Message Handler that receives multiple Messages and combines them into a single Message. In fact, Aggregators are often downstream consumers in a pipeline that includes a Splitter.

Technically, the Aggregator is more complex than a Splitter, because it is required to maintain state (the Messages to be aggregated) and to decide when the complete group of Messages is available. In order to do this it requires a MessageStore.

4.4.2 Functionality

The Aggregator combines a group of related messages, by correlating and storing them, until the group is deemed complete. At that point, the Aggregator will create a single message by processing the whole group, and will send that aggregated message as output.

A main aspect of implementing an Aggregator is providing the logic that has to be executed when the aggregation (creation of a single message out of many) takes place. The other two aspects are correlation and release.

In Spring Integration, the grouping of the messages for aggregation (correlation) is done by default based on their CORRELATION_ID message header (i.e. the messages with the same CORRELATION_ID will be grouped together). However, this can be customized, and the users can opt for other ways of specifying how the messages should be grouped together, by using a CorrelationStrategy (see below).

To determine whether or not a group of messages may be processed, a ReleaseStrategy is consulted. The default release strategy for an aggregator will release groups that have all messages from the sequence, but this can be entirely customized.
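
The interplay of correlation, storage, release and aggregation can be sketched in plain Java as follows. This is an illustration only and does not use the framework: SeqMessage and AggregatorSketch are hypothetical names, the map stands in for a MessageStore, and the release rule assumes the default behavior described above (release once all messages of the sequence have arrived).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical message type carrying only the headers this sketch needs.
final class SeqMessage {
    final String correlationId;
    final int sequenceSize;
    final String payload;

    SeqMessage(String correlationId, int sequenceSize, String payload) {
        this.correlationId = correlationId;
        this.sequenceSize = sequenceSize;
        this.payload = payload;
    }
}

final class AggregatorSketch {

    // Plays the role of the message store: one pending group per correlation key.
    private final Map<String, List<SeqMessage>> groups = new HashMap<>();

    // Returns the aggregated payloads when the group completes, otherwise null.
    List<String> handle(SeqMessage message) {
        // Correlation: messages with the same correlation id share a group.
        List<SeqMessage> group = groups.get(message.correlationId);
        if (group == null) {
            group = new ArrayList<>();
            groups.put(message.correlationId, group);
        }
        group.add(message);

        // Release: wait until every message of the sequence has arrived.
        if (group.size() < message.sequenceSize) {
            return null;
        }

        // Aggregation: combine the payloads of the group into a single result.
        groups.remove(message.correlationId);
        List<String> payloads = new ArrayList<>();
        for (SeqMessage m : group) {
            payloads.add(m.payload);
        }
        return payloads;
    }
}
```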

4.4.3 Programming model

The Aggregation API consists of a number of classes:

  • The interface MessageGroupProcessor and related base class AbstractAggregatingMessageGroupProcessor and its subclass MethodInvokingAggregatingMessageGroupProcessor

  • The ReleaseStrategy interface and its default implementation SequenceSizeReleaseStrategy

  • The CorrelationStrategy interface and its default implementation HeaderAttributeCorrelationStrategy

CorrelatingMessageHandler

The CorrelatingMessageHandler is a MessageHandler implementation, encapsulating the common functionalities of an Aggregator (and other correlating use cases), which are:

  • correlating messages into a group to be aggregated

  • maintaining those messages in a MessageStore until the group may be released

  • deciding when the group may in fact be released

  • processing the released group into a single aggregated message

  • recognizing and responding to an expired group

The responsibility of deciding how the messages should be grouped together is delegated to a CorrelationStrategy instance. The responsibility of deciding whether the message group can be released is delegated to a ReleaseStrategy instance.

Here is a brief highlight of the base AbstractAggregatingMessageGroupProcessor (the responsibility of implementing the aggregatePayloads method is left to the developer):

public abstract class AbstractAggregatingMessageGroupProcessor
              implements MessageGroupProcessor {

    protected Map<String, Object> aggregateHeaders(MessageGroup group) {
        ....
    }

    protected abstract Object aggregatePayloads(MessageGroup group);

}

The CorrelationStrategy is owned by the CorrelatingMessageHandler and it has a default value based on the correlation ID message header:

private volatile CorrelationStrategy correlationStrategy =
          new HeaderAttributeCorrelationStrategy(MessageHeaders.CORRELATION_ID);

When appropriate, the simplest option is the DefaultAggregatingMessageGroupProcessor. It creates a single Message whose payload is a List of the payloads received for a given group. It uses the default CorrelationStrategy and ReleaseStrategy as shown above. This works well for simple Scatter-Gather implementations with either a Splitter, Publish Subscribe Channel, or Recipient List Router upstream.

[Note]Note

When using a Publish Subscribe Channel or Recipient List Router in this type of scenario, be sure to enable the apply-sequence flag. That will add the necessary headers (correlation id, sequence number and sequence size). That behavior is enabled by default for Splitters in Spring Integration, but it is not enabled for the Publish Subscribe Channel or Recipient List Router because those components may be used in a variety of contexts where those headers are not necessary.

When implementing a specific aggregator object for an application, a developer can extend AbstractAggregatingMessageGroupProcessor and implement the aggregatePayloads method. However, there are better-suited solutions (that is, solutions less coupled to the API) for implementing the aggregation logic, which can be configured easily either through XML or through annotations.

In general, any ordinary Java class (i.e. POJO) can implement the aggregation algorithm. For doing so, it must provide a method that accepts as an argument a single java.util.List (parametrized lists are supported as well). This method will be invoked for aggregating messages, as follows:

  • if the argument is a parametrized java.util.List, and the parameter type is assignable to Message, then the whole list of messages accumulated for aggregation will be sent to the aggregator

  • if the argument is a non-parametrized java.util.List or the parameter type is not assignable to Message, then the method will receive the payloads of the accumulated messages

  • if the return type is not assignable to Message, then it will be treated as the payload for a Message that will be created automatically by the framework.

[Note]Note

In the interest of code simplicity, and promoting best practices such as low coupling, testability, etc., the preferred way of implementing the aggregation logic is through a POJO, and using the XML or annotation support for setting it up in the application.

ReleaseStrategy

The ReleaseStrategy interface is defined as follows:

public interface ReleaseStrategy {

  boolean canRelease(MessageGroup messages);

}

In general, any ordinary Java class (i.e. POJO) can implement the completion decision mechanism. For doing so, it must provide a method that accepts as an argument a single java.util.List (parametrized lists are supported as well), and returns a boolean value. This method will be invoked after the arrival of a new message, to decide whether the group is complete or not, as follows:

  • if the argument is a parametrized java.util.List, and the parameter type is assignable to Message, then the whole list of messages accumulated in the group will be sent to the method

  • if the argument is a non-parametrized java.util.List or the parameter type is not assignable to Message, then the method will receive the payloads of the accumulated messages

  • the method must return true if the message group is ready for aggregation, and false otherwise.

When the group is released for aggregation, all its unmarked messages are processed and then marked so they will not be processed again. If the group is also complete (i.e. if all messages from a sequence have arrived or if there is no sequence defined) then the group is removed from the message store. Partial sequences can be released, in which case the next time the ReleaseStrategy is called it will be presented with a group containing marked messages (already processed) and unmarked messages (a potential new partial sequence).

Spring Integration provides an out-of-the-box implementation for ReleaseStrategy, the SequenceSizeReleaseStrategy. This implementation uses the SEQUENCE_NUMBER and SEQUENCE_SIZE of the arriving messages for deciding when a message group is complete and ready to be aggregated. As shown above, it is also the default strategy.

CorrelationStrategy

The CorrelationStrategy interface is defined as follows:

public interface CorrelationStrategy {

  Object getCorrelationKey(Message<?> message);

}

The method shall return an Object which represents the correlation key used for grouping messages together. The key must satisfy the criteria used for a key in a Map with respect to the implementation of equals() and hashCode().

In general, any ordinary Java class (i.e. POJO) can implement the correlation decision mechanism, and the rules for mapping a message to a method's argument (or arguments) are the same as for a ServiceActivator (including support for @Header annotations). The method must return a value, and the value must not be null.

Spring Integration provides an out-of-the-box implementation for CorrelationStrategy, the HeaderAttributeCorrelationStrategy. This implementation returns the value of one of the message headers (whose name is specified by a constructor argument) as the correlation key. By default, the correlation strategy is a HeaderAttributeCorrelationStrategy returning the value of the CORRELATION_ID header attribute.
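
The requirement that a correlation key behave correctly as a Map key matters most when the key is a custom type rather than a String or a number. The following plain-Java sketch shows a composite key with equals() and hashCode() implemented consistently. RegionCustomerKey and CorrelationKeySketch are hypothetical names made up for this illustration, and the HashMap merely imitates how a store would group items under equal keys.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical composite correlation key; not part of Spring Integration.
final class RegionCustomerKey {
    private final String region;
    private final long customerId;

    RegionCustomerKey(String region, long customerId) {
        this.region = region;
        this.customerId = customerId;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof RegionCustomerKey)) {
            return false;
        }
        RegionCustomerKey that = (RegionCustomerKey) other;
        return customerId == that.customerId && region.equals(that.region);
    }

    @Override
    public int hashCode() {
        // Must be derived from the same fields that equals() compares.
        return Objects.hash(region, customerId);
    }
}

final class CorrelationKeySketch {

    // Counts how many items end up under each key, the way a store would group them.
    static Map<RegionCustomerKey, Integer> countByKey(RegionCustomerKey... keys) {
        Map<RegionCustomerKey, Integer> counts = new HashMap<>();
        for (RegionCustomerKey key : keys) {
            Integer current = counts.get(key);
            counts.put(key, current == null ? 1 : current + 1);
        }
        return counts;
    }
}
```

Without the equals() and hashCode() overrides, two keys built from the same region and customer id would be treated as different keys, and the corresponding messages would never land in the same group.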

4.4.4 Configuring an Aggregator with XML

Spring Integration supports the configuration of an aggregator via XML through the <aggregator/> element. Below you can see an example of an aggregator with all optional parameters defined.

<channel id="inputChannel"/>

<aggregator id="completelyDefinedAggregator" 1
    input-channel="inputChannel" 2
    output-channel="outputChannel"  3
    discard-channel="discardChannel"  4
    ref="aggregatorBean" 5
    method="add" 6
    release-strategy="releaseStrategyBean"  7
    release-strategy-method="canRelease" 8
    correlation-strategy="correlationStrategyBean" 9
    correlation-strategy-method="groupNumbersByLastDigit" 10
    message-store="messageStore" 11
    send-partial-result-on-expiry="true" 12
    send-timeout="86420000" 13 
    expression="#this.toArray()" 14/> 

<channel id="outputChannel"/>

<bean id="aggregatorBean" class="sample.PojoAggregator"/>

<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>

<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>

1

The id of the aggregator is optional.

2

The input channel of the aggregator. Required.

3

The channel where the aggregator will send the aggregation results. Optional (because incoming messages can specify a reply channel themselves).

4

The channel where the aggregator will send the messages that timed out (if send-partial-result-on-expiry is false). Optional.

5

A reference to a bean defined in the application context. The bean must implement the aggregation logic as described above. Required.

6

A method defined on the bean referenced by ref, that implements the message aggregation algorithm. Optional, with restrictions (see above).

7

A reference to a bean that implements the decision algorithm as to whether a given message group is complete. The bean can be an implementation of the ReleaseStrategy interface or a POJO. In the latter case the release-strategy-method attribute must be defined as well. Optional (by default, the aggregator will use sequence size).

8

A method defined on the bean referenced by release-strategy, that implements the completion decision algorithm. Optional, with restrictions (requires release-strategy to be present).

9

A reference to a bean that implements the correlation strategy. The bean can be an implementation of the CorrelationStrategy interface or a POJO. In the latter case the correlation-strategy-method attribute must be defined as well. Optional (by default, the aggregator will use the correlation id header attribute).

10

A method defined on the bean referenced by correlation-strategy, that implements the correlation key algorithm. Optional, with restrictions (requires correlation-strategy to be present).

11

A reference to a MessageGroupStore that can be used to store groups of messages under their correlation key until they are complete. Optional (by default, a volatile in-memory store is used).

12

Whether upon the expiration of the message group, the aggregator will try to aggregate the messages that have already arrived. Optional (false by default).

13

The timeout for sending the aggregated messages to the output or reply channel. Optional.

14

A SpEL expression to handle the release strategy. Optional.

Using a "ref" attribute is generally recommended if a custom aggregator handler implementation can be reused in other <aggregator> definitions. However if a custom aggregator handler implementation should be scoped to a concrete definition of the <aggregator>, you can use an inner bean definition (starting with version 1.0.3) for custom aggregator handlers within the <aggregator> element:

<aggregator input-channel="input" method="sum" output-channel="output">
    <beans:bean class="org.foo.ExampleAggregator"/>
</aggregator>

[Note]Note

Using both a "ref" attribute and an inner bean definition in the same <aggregator> configuration is not allowed, as it creates an ambiguous condition. In such cases, an Exception will be thrown.

An example implementation of the aggregator bean looks as follows:

public class PojoAggregator {

  public Long add(List<Long> results) {
    long total = 0L;
    for (long partialResult: results) {
      total += partialResult;
    }
    return total;
  }

}

An implementation of the release strategy bean for the example above may be as follows:

public class PojoReleaseStrategy {
...
  public boolean canRelease(List<Long> numbers) {
    long sum = 0;
    for (long number: numbers) {
      sum += number;
    }
    return sum >= maxValue;
  }
}

[Note]Note

Wherever it makes sense, the release strategy method and the aggregator method can be combined in a single bean.

An implementation of the correlation strategy bean for the example above may be as follows:

public class PojoCorrelationStrategy {
...
  public Long groupNumbersByLastDigit(Long number) {
    return number % 10;
  }
}

For example, this aggregator would group numbers by some criterion (in our case the remainder after dividing by 10) and will hold each group until the sum of the numbers carried as payloads reaches or exceeds a certain value.
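
To make that behavior concrete, the following plain-Java sketch simulates the combined effect of the three POJO methods without any framework classes. LastDigitAggregationSketch and its accept method are hypothetical, the maxValue threshold is passed in explicitly because the listing above elides where it comes from, and removing a group once it is released is a simplification of what the message store does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class LastDigitAggregationSketch {

    private final long maxValue; // hypothetical threshold supplied by the caller
    private final Map<Long, List<Long>> groups = new HashMap<>();

    LastDigitAggregationSketch(long maxValue) {
        this.maxValue = maxValue;
    }

    // Correlation: same rule as groupNumbersByLastDigit.
    static long correlationKey(long number) {
        return number % 10;
    }

    // Aggregation: same rule as add.
    static long sum(List<Long> numbers) {
        long total = 0L;
        for (long number : numbers) {
            total += number;
        }
        return total;
    }

    // Returns the aggregated sum if the group is released, otherwise null.
    Long accept(long number) {
        long key = correlationKey(number);
        List<Long> group = groups.get(key);
        if (group == null) {
            group = new ArrayList<>();
            groups.put(key, group);
        }
        group.add(number);

        // Release: same rule as canRelease (sum >= maxValue).
        long total = sum(group);
        if (total < maxValue) {
            return null;
        }
        groups.remove(key);
        return total;
    }
}
```

With a threshold of 50, feeding 13, 21, 23 and 33 releases nothing for the first three numbers. The fourth completes the group for last digit 3 with a sum of 69, while 21 remains held in its own group.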

[Note]Note

Wherever it makes sense, the release strategy method, correlation strategy method and the aggregator method can be combined in a single bean (all of them or any two).

Aggregators and Spring Expression Language (SpEL)

Since Spring Integration 2.0, the release strategy may be handled with SpEL (http://static.springsource.org/spring/docs/3.0.x/spring-framework-reference/html/expressions.html), which is recommended if the logic behind such a release strategy is relatively simple. Let's say you have a legacy component which was designed to receive an array of objects. We know that the default release strategy will assemble all aggregated messages in a List. This leaves some work to do: we need to extract the individual messages from that list, extract the payload of each message, and assemble the payloads into an array of objects (see the code below).

public String[] processRelease(List<Message<String>> messages) {
    List<String> strList = new ArrayList<String>();
    for (Message<String> message : messages) {
        strList.add(message.getPayload());
    }
    return strList.toArray(new String[]{});
}

However, with SpEL such a requirement can be handled relatively easily with a simple one-line expression, thus sparing you from writing a custom class and configuring it as a bean.

<int:aggregator input-channel="aggrChannel" 
	output-channel="replyChannel" 
	expression="#this.![payload].toArray()"/>

In the above configuration we are using a Collection Projection expression (http://static.springsource.org/spring/docs/3.0.x/spring-framework-reference/html/expressions.html#d0e12113) to assemble a new collection from the payloads of all messages in the list and then transforming it to an array, thus achieving the same result as the Java code above.

4.4.5 Managing State in an Aggregator: MessageGroupStore

Aggregator (and some other patterns in Spring Integration) is a stateful pattern that requires decisions to be made based on a group of messages that have arrived over a period of time, all with the same correlation key. The design of the interfaces in the stateful patterns (e.g. ReleaseStrategy) is driven by the principle that the components (framework and user) should be able to remain stateless. All state is carried by the MessageGroup and its management is delegated to the MessageGroupStore.

The MessageGroupStore accumulates state information in MessageGroups, potentially forever. So to prevent stale state from hanging around, and for volatile stores to provide a hook for cleaning up when the application shuts down, the MessageGroupStore allows the user to register callbacks to apply to MessageGroups when they expire. The interface is very straightforward:

public interface MessageGroupCallback {

  void execute(MessageGroupStore messageGroupStore, MessageGroup group);

}

The callback has access directly to the store and the message group so it can manage the persistent state (e.g. by removing the group from the store entirely).

The MessageGroupStore maintains a list of these callbacks, which it applies, when asked, to all message groups whose timestamp is earlier than a time supplied as a parameter:

public interface MessageGroupStore {
   void registerMessageGroupExpiryCallback(MessageGroupCallback callback);
   int expireMessageGroups(long timeout);
}

The expireMessageGroups method can be called with a timeout value: any message older than the current time minus this value will be expired, and have the callbacks applied. Thus it is the user of the store that defines what is meant by message group "expiry".
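
The expiry mechanism can be pictured with the following plain-Java sketch. It is not the framework's implementation: ExpiryCallbackSketch and ExpiringStoreSketch are hypothetical names, groups are reduced to a key and a creation timestamp, and the current time is passed in explicitly (unlike the real expireMessageGroups, which takes only the timeout) so that the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical callback, modeled loosely on MessageGroupCallback.
interface ExpiryCallbackSketch {
    void execute(ExpiringStoreSketch store, String groupKey);
}

final class ExpiringStoreSketch {

    private final Map<String, Long> groupTimestamps = new HashMap<>();
    private final List<ExpiryCallbackSketch> callbacks = new ArrayList<>();

    void registerExpiryCallback(ExpiryCallbackSketch callback) {
        callbacks.add(callback);
    }

    void addGroup(String key, long createdAtMillis) {
        groupTimestamps.put(key, createdAtMillis);
    }

    void removeGroup(String key) {
        groupTimestamps.remove(key);
    }

    boolean contains(String key) {
        return groupTimestamps.containsKey(key);
    }

    // Applies the callbacks to every group older than (now - timeout)
    // and returns the number of groups that expired.
    int expireGroups(long nowMillis, long timeoutMillis) {
        long threshold = nowMillis - timeoutMillis;
        // Collect the keys first so that callbacks may remove groups safely.
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> entry : groupTimestamps.entrySet()) {
            if (entry.getValue() < threshold) {
                expired.add(entry.getKey());
            }
        }
        for (String key : expired) {
            for (ExpiryCallbackSketch callback : callbacks) {
                callback.execute(this, key);
            }
        }
        return expired.size();
    }
}
```

A callback registered as (store, key) -> store.removeGroup(key) would drop each expired group, which corresponds to the "removing the group from the store entirely" option mentioned above.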

As a convenience for users, Spring Integration provides a wrapper for the message expiry in the form of a MessageGroupStoreReaper:

<bean id="reaper" class="org...MessageGroupStoreReaper">
  <property name="messageGroupStore" ref="messageStore"/>
  <property name="timeout" value="10"/>
</bean>

<task:scheduled-tasks scheduler="scheduler">	
  <task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>

The reaper is a Runnable, and all that is happening is that the message group store's expire method is being called in the sample above once every 10 seconds. In addition to the reaper, the expiry callbacks are invoked when the application shuts down via a lifecycle callback in the CorrelatingMessageHandler.

The CorrelatingMessageHandler registers its own expiry callback, and this is the link with the boolean flag send-partial-result-on-expiry in the XML configuration of the aggregator. If the flag is set to true, then when the expiry callback is invoked, any unmarked messages in groups that are not yet released can be sent on to the downstream channel.

4.4.6 Configuring an Aggregator with Annotations

An aggregator configured using annotations can look like this.

public class Waiter {
  ... 

  @Aggregator 1 
  public Delivery aggregatingMethod(List<OrderItem> items) {
    ...
  }

  @ReleaseStrategy 2
  public boolean releaseChecker(List<Message<?>> messages) {
    ...
  }

  @CorrelationStrategy 3
  public String correlateBy(OrderItem item) {
    ...
  }

}

1

An annotation indicating that this method shall be used as an aggregator. Must be specified if this class will be used as an aggregator.

2

An annotation indicating that this method shall be used as the release strategy of an aggregator. If not present on any method, the aggregator will use the SequenceSizeReleaseStrategy.

3

An annotation indicating that this method shall be used as the correlation strategy of an aggregator. If no correlation strategy is indicated, the aggregator will use the HeaderAttributeCorrelationStrategy based on CORRELATION_ID.

All of the configuration options provided by the XML element are also available for the @Aggregator annotation.

The aggregator can be either referenced explicitly from XML or, if the @MessageEndpoint is defined on the class, detected automatically through classpath scanning.

4.5 Resequencer

4.5.1 Introduction

Related to the Aggregator, albeit different from a functional standpoint, is the Resequencer.

4.5.2 Functionality

The Resequencer works in a similar way to the Aggregator, in the sense that it uses the CORRELATION_ID to store messages in groups, the difference being that the Resequencer does not process the messages in any way. It simply releases them in the order of their SEQUENCE_NUMBER header values.

With respect to that, the user might opt to release all messages at once (after the whole sequence, according to the SEQUENCE_SIZE, has arrived), or as soon as a valid sequence is available.
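
The two release modes can be compared with a small plain-Java sketch. It is only an illustration under simplifying assumptions: ResequencerSketch is a hypothetical class, it handles a single message group, sequence numbers start at 1 and are unique, and the boolean flag mirrors the idea of releasing partial sequences rather than any specific framework API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

final class ResequencerSketch {

    // Pending payloads for one group, ordered by sequence number.
    private final TreeMap<Integer, String> pending = new TreeMap<>();
    private final int sequenceSize;
    private final boolean releasePartialSequences;
    private int nextExpected = 1;

    ResequencerSketch(int sequenceSize, boolean releasePartialSequences) {
        this.sequenceSize = sequenceSize;
        this.releasePartialSequences = releasePartialSequences;
    }

    // Accepts one message and returns the payloads that can be released now, in order.
    List<String> accept(int sequenceNumber, String payload) {
        pending.put(sequenceNumber, payload);
        List<String> released = new ArrayList<>();

        // Without partial release, wait until the whole sequence has arrived.
        if (!releasePartialSequences && pending.size() < sequenceSize) {
            return released;
        }

        // Release the run of consecutive sequence numbers starting at nextExpected.
        while (pending.containsKey(nextExpected)) {
            released.add(pending.remove(nextExpected));
            nextExpected++;
        }
        return released;
    }
}
```

With partial release enabled and messages arriving in the order 2, 1, 3, nothing is released for message 2, then messages 1 and 2 are released together, and message 3 follows on its own. With partial release disabled, all three are released in order once the last one arrives.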

4.5.3 Configuring a Resequencer with XML

Configuring a resequencer requires only including the appropriate element in XML.

A sample resequencer configuration is shown below.

<channel id="inputChannel"/>

<channel id="outputChannel"/>

<resequencer id="completelyDefinedResequencer" 1
  input-channel="inputChannel" 2
  output-channel="outputChannel" 3
  discard-channel="discardChannel" 4
  release-partial-sequences="true" 5
  message-store="messageStore" 6
  send-partial-result-on-expiry="true" 7
  send-timeout="86420000" 8 /> 

1

The id of the resequencer is optional.

2

The input channel of the resequencer. Required.

3

The channel where the resequencer will send the reordered messages. Optional.

4

The channel where the resequencer will send the messages that timed out (if send-partial-result-on-expiry is false). Optional.

5

Whether to send out ordered sequences as soon as they are available, or only after the whole message group arrives. Optional (false by default).

If this flag is not specified (so a complete sequence is defined by the sequence headers) then it can make sense to provide a custom Comparator to be used to order the messages when sending (use the XML attribute comparator to point to a bean definition). If release-partial-sequences is true then there is no way with a custom comparator to define a partial sequence. To do that you would have to provide a release-strategy (also a reference to another bean definition, either a POJO or a ReleaseStrategy).

6

A reference to a MessageGroupStore that can be used to store groups of messages under their correlation key until they are complete. Optional (by default, a volatile in-memory store is used).

7

Whether, upon the expiration of the group, the ordered group should be sent out (even if some of the messages are missing). Optional (false by default). See Section 4.4.5, “Managing State in an Aggregator: MessageGroupStore”.

8

The timeout for sending out messages. Optional.

[Note]Note
Since there is no custom behavior to be implemented in Java classes for resequencers, there is no annotation support for them.

4.6 Message Handler Chain

4.6.1 Introduction

The MessageHandlerChain is an implementation of MessageHandler that can be configured as a single Message Endpoint while actually delegating to a chain of other handlers, such as Filters, Transformers, Splitters, and so on. This can lead to a much simpler configuration when several handlers need to be connected in a fixed, linear progression. For example, it is fairly common to provide a Transformer before other components. Similarly, when providing a Filter before some other component in a chain, you are essentially creating a Selective Consumer. In either case, the chain only requires a single input-channel and a single output-channel as opposed to the configuration of channels for each individual component.

[Tip]Tip
Spring Integration's Filter provides a boolean property 'throwExceptionOnRejection'. When providing multiple Selective Consumers on the same point-to-point channel with different acceptance criteria, this value should be set to 'true' (the default is false) so that the dispatcher will know that the Message was rejected and as a result will attempt to pass the Message on to other subscribers. If the Exception were not thrown, then it would appear to the dispatcher as if the Message had been passed on successfully even though the Filter had dropped the Message to prevent further processing.

The handler chain simplifies configuration while internally maintaining the same degree of loose coupling between components, and it is trivial to modify the configuration if at some point a non-linear arrangement is required.

Internally, the chain will be expanded into a linear setup of the listed endpoints, separated by direct channels. The reply channel header will not be taken into account within the chain: only after the last handler is invoked will the resulting message be forwarded on to the reply channel or the chain's output channel. Because of this setup all handlers except the last require a setOutputChannel implementation. The last handler only needs an output channel if the outputChannel on the MessageHandlerChain is set.
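
The linear hand-off described above can be pictured with a few lines of plain Java. This is a conceptual sketch rather than the MessageHandlerChain implementation: HandlerChainSketch is a hypothetical class, handlers are modeled as functions over a String payload, and a handler returning null stands in for a filter that drops the message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class HandlerChainSketch {

    private final List<Function<String, String>> handlers = new ArrayList<>();

    // Appends a handler; the order of registration is the order of invocation.
    HandlerChainSketch add(Function<String, String> handler) {
        handlers.add(handler);
        return this;
    }

    // Passes the payload through each handler in turn. Only the output of the
    // last handler leaves the chain. A null result (for example, from a filter
    // rejecting the message) ends processing early.
    String handle(String payload) {
        String current = payload;
        for (Function<String, String> handler : handlers) {
            current = handler.apply(current);
            if (current == null) {
                return null;
            }
        }
        return current;
    }
}
```

For example, a chain built with add(p -> p.isEmpty() ? null : p), add(String::trim) and add(String::toUpperCase) turns " hello " into "HELLO" and drops an empty payload at the first step.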

[Note]Note

As with other endpoints, the output-channel is optional. If there is a reply Message at the end of the chain, the output-channel takes precedence, but if not available, the chain handler will check for a reply channel header on the inbound Message.

In most cases there is no need to implement MessageHandlers yourself. The next section will focus on namespace support for the chain element. Most Spring Integration endpoints, like Service Activators and Transformers, are suitable for use within a MessageHandlerChain.

4.6.2 The <chain> Element

The <chain> element provides an 'input-channel' attribute, and if the last element in the chain is capable of producing reply messages (optional), it also supports an 'output-channel' attribute. The sub-elements are then filters, transformers, splitters, and service-activators. The last element may also be a router.

 <chain input-channel="input" output-channel="output">
     <filter ref="someSelector" throw-exception-on-rejection="true"/>
     <header-enricher error-channel="customErrorChannel">
         <header name="foo" value="bar"/>
     </header-enricher>
     <service-activator ref="someService" method="someMethod"/>
 </chain>

The <header-enricher> element used in the above example will set a message header with name "foo" and value "bar" on the message. A header enricher is a specialization of Transformer that touches only header values. You could obtain the same result by implementing a MessageHandler that did the header modifications and wiring that as a bean.

Sometimes you need to make a nested call to another chain from within a chain and then come back and continue execution within the original chain. To accomplish this you can use a Messaging Gateway by including a light configuration via the <gateway> element. For example:

   <si:chain id="main-chain" input-channel="inputA" output-channel="inputB">
    <si:header-enricher>
      <si:header name="name" value="Many" />
    </si:header-enricher>
    <si:service-activator>
      <bean class="org.foo.SampleService" />
    </si:service-activator>
    <si:gateway request-channel="inputC"/>  
  </si:chain>
  <si:chain id="nested-chain-a" input-channel="inputC">
    <si:header-enricher>
      <si:header name="name" value="Moe" />
    </si:header-enricher>
    <si:gateway request-channel="inputD"/> 
    <si:service-activator>
      <bean class="org.foo.SampleService" />
    </si:service-activator>
  </si:chain>
  <si:chain id="nested-chain-b" input-channel="inputD">
    <si:header-enricher>
      <si:header name="name" value="Jack" />
    </si:header-enricher>
    <si:service-activator>
      <bean class="org.foo.SampleService" />
    </si:service-activator>
  </si:chain>

In the above example, nested-chain-a will be called at the end of main-chain processing by the 'gateway' element configured there. While in nested-chain-a, a call to nested-chain-b will be made after header enrichment, and then the flow will come back to finish execution in nested-chain-a, finally getting back to main-chain. When the light version of the <gateway> element is defined in the chain, Spring Integration will construct an instance of SimpleMessagingGateway (no need to provide 'service-interface' configuration), which will take the message in its current state and place it on the channel defined via the 'request-channel' attribute. Upon processing, the Message will be returned to the gateway and continue its journey within the current chain.