Configuring Message Channels

To create a message channel instance, you can use the <channel/> element for xml or DirectChannel instance for Java configuration, as follows:

  • Java

  • XML

@Bean
public MessageChannel exampleChannel() {
    return new DirectChannel();
}
<int:channel id="exampleChannel"/>

When you use the <channel/> element without any sub-elements, it creates a DirectChannel instance (a SubscribableChannel).

To create a publish-subscribe channel, use the <publish-subscribe-channel/> element (the PublishSubscribeChannel in Java), as follows:

  • Java

  • XML

@Bean
public MessageChannel exampleChannel() {
    return new PublishSubscribeChannel();
}
<int:publish-subscribe-channel id="exampleChannel"/>

You can alternatively provide a variety of <queue/> sub-elements to create any of the pollable channel types (as described in Message Channel Implementations). The following sections shows examples of each channel type.

DirectChannel Configuration

As mentioned earlier, DirectChannel is the default type. The following listing shows who to define one:

  • Java

  • XML

@Bean
public MessageChannel directChannel() {
    return new DirectChannel();
}
<int:channel id="directChannel"/>

A default channel has a round-robin load-balancer and also has failover enabled (see DirectChannel for more detail). To disable one or both of these, add a <dispatcher/> sub-element (a LoadBalancingStrategy constructor of the DirectChannel) and configure the attributes as follows:

  • Java

  • XML

@Bean
public MessageChannel failFastChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setFailover(false);
    return channel;
}

@Bean
public MessageChannel failFastChannel() {
    return new DirectChannel(null);
}
<int:channel id="failFastChannel">
    <int:dispatcher failover="false"/>
</channel>

<int:channel id="channelWithFixedOrderSequenceFailover">
    <int:dispatcher load-balancer="none"/>
</int:channel>

Starting with version 6.3, all the MessageChannel implementations based on the UnicastingDispatcher can be configured with a Predicate<Exception> failoverStrategy instead of plain failover option. This predicate makes a decision to failover or not to the next MessageHandler based on an exception thrown from the current one. The more complex error analysis should be done using ErrorMessageExceptionTypeRouter.

Datatype Channel Configuration

Sometimes, a consumer can process only a particular type of payload, forcing you to ensure the payload type of the input messages. The first thing that comes to mind may be to use a message filter. However, all that message filter can do is filter out messages that are not compliant with the requirements of the consumer. Another way would be to use a content-based router and route messages with non-compliant data-types to specific transformers to enforce transformation and conversion to the required data type. This would work, but a simpler way to accomplish the same thing is to apply the Datatype Channel pattern. You can use separate datatype channels for each specific payload data type.

To create a datatype channel that accepts only messages that contain a certain payload type, provide the data type’s fully-qualified class name in the channel element’s datatype attribute, as the following example shows:

  • Java

  • XML

@Bean
public MessageChannel numberChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setDatatypes(Number.class);
    return channel;
}
<int:channel id="numberChannel" datatype="java.lang.Number"/>

Note that the type check passes for any type that is assignable to the channel’s datatype. In other words, the numberChannel in the preceding example would accept messages whose payload is java.lang.Integer or java.lang.Double. Multiple types can be provided as a comma-delimited list, as the following example shows:

  • Java

  • XML

@Bean
public MessageChannel numberChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setDatatypes(String.class, Number.class);
    return channel;
}
<int:channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>

So the 'numberChannel' in the preceding example accepts only messages with a data type of java.lang.Number. But what happens if the payload of the message is not of the required type? It depends on whether you have defined a bean named integrationConversionService that is an instance of Spring’s Conversion Service. If not, then an Exception would be thrown immediately. However, if you have defined an integrationConversionService bean, it is used in an attempt to convert the message’s payload to the acceptable type.

You can even register custom converters. For example, suppose you send a message with a String payload to the 'numberChannel' we configured above. You might handle the message as follows:

MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class);
inChannel.send(new GenericMessage<String>("5"));

Typically, this would be a perfectly legal operation. However, since we use Datatype Channel, the result of such operation would generate an exception similar to the following:

Exception in thread "main" org.springframework.integration.MessageDeliveryException:
Channel 'numberChannel'
expected one of the following datataypes [class java.lang.Number],
but received [class java.lang.String]
…

The exception happens because we require the payload type to be a Number, but we sent a String. So we need something to convert a String to a Number. For that, we can implement a converter similar to the following example:

public static class StringToIntegerConverter implements Converter<String, Integer> {
    public Integer convert(String source) {
        return Integer.parseInt(source);
    }
}

Then we can register it as a converter with the Integration Conversion Service, as the following example shows:

  • Java

  • XML

@Bean
@IntegrationConverter
public StringToIntegerConverter strToInt {
    return new StringToIntegerConverter();
}
<int:converter ref="strToInt"/>

<bean id="strToInt" class="org.springframework.integration.util.Demo.StringToIntegerConverter"/>

Or on the StringToIntegerConverter class when it is marked with the @Component annotation for auto-scanning.

When the 'converter' element is parsed, it creates the integrationConversionService bean if one is not already defined. With that converter in place, the send operation would now be successful, because the datatype channel uses that converter to convert the String payload to an Integer.

For more information regarding payload type conversion, see Payload Type Conversion.

Beginning with version 4.0, the integrationConversionService is invoked by the DefaultDatatypeChannelMessageConverter, which looks up the conversion service in the application context. To use a different conversion technique, you can specify the message-converter attribute on the channel. This must be a reference to a MessageConverter implementation. Only the fromMessage method is used. It provides the converter with access to the message headers (in case the conversion might need information from the headers, such as content-type). The method can return only the converted payload or a full Message object. If the latter, the converter must be careful to copy all the headers from the inbound message.

Alternatively, you can declare a <bean/> of type MessageConverter with an ID of datatypeChannelMessageConverter, and that converter is used by all channels with a datatype.

QueueChannel Configuration

To create a QueueChannel, use the <queue/> sub-element. You may specify the channel’s capacity as follows:

  • Java

  • XML

@Bean
public PollableChannel queueChannel() {
    return new QueueChannel(25);
}
<int:channel id="queueChannel">
    <queue capacity="25"/>
</int:channel>
If you do not provide a value for the 'capacity' attribute on this <queue/> sub-element, the resulting queue is unbounded. To avoid issues such as running out of memory, we highly recommend that you set an explicit value for a bounded queue.

Persistent QueueChannel Configuration

Since a QueueChannel provides the capability to buffer messages but does so in-memory only by default, it also introduces a possibility that messages could be lost in the event of a system failure. To mitigate this risk, a QueueChannel may be backed by a persistent implementation of the MessageGroupStore strategy interface. For more details on MessageGroupStore and MessageStore, see Message Store.

The capacity attribute is not allowed when the message-store attribute is used.

When a QueueChannel receives a Message, it adds the message to the message store. When a Message is polled from a QueueChannel, it is removed from the message store.

By default, a QueueChannel stores its messages in an in-memory queue, which can lead to the lost message scenario mentioned earlier. However, Spring Integration provides persistent stores, such as the JdbcChannelMessageStore.

You can configure a message store for any QueueChannel by adding the message-store attribute, as the following example shows:

<int:channel id="dbBackedChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

(See samples below for Java/Kotlin Configuration options.)

The Spring Integration JDBC module also provides a schema Data Definition Language (DDL) for a number of popular databases. These schemas are located in the org.springframework.integration.jdbc.store.channel package of that module (spring-integration-jdbc).

One important feature is that, with any transactional persistent store (such as JdbcChannelMessageStore), as long as the poller has a transaction configured, a message removed from the store can be permanently removed only if the transaction completes successfully. Otherwise, the transaction rolls back, and the Message is not lost.

Many other implementations of the message store are available as the growing number of Spring projects related to “NoSQL” data stores come to provide underlying support for these stores. You can also provide your own implementation of the MessageGroupStore interface if you cannot find one that meets your particular needs.

Since version 4.0, we recommend that QueueChannel instances be configured to use a ChannelMessageStore, if possible. These are generally optimized for this use, as compared to a general message store. If the ChannelMessageStore is a ChannelPriorityMessageStore, the messages are received in FIFO within priority order. The notion of priority is determined by the message store implementation. For example, the following example shows the Java configuration for the MongoDB Channel Message Store:

  • Java

  • Java DSL

  • Kotlin DSL

@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
    MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
    store.setPriorityEnabled(true);
    return store;
}

@Bean
public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) {
    return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue"));
}
@Bean
public IntegrationFlow priorityFlow(PriorityCapableChannelMessageStore mongoDbChannelMessageStore) {
    return IntegrationFlow.from((Channels c) ->
            c.priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup"))
            ....
            .get();
}
@Bean
fun priorityFlow(mongoDbChannelMessageStore: PriorityCapableChannelMessageStore) =
    integrationFlow {
        channel { priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup") }
    }
Pay attention to the MessageGroupQueue class. That is a BlockingQueue implementation to use the MessageGroupStore operations.

Another option to customize the QueueChannel environment is provided by the ref attribute of the <int:queue> sub-element or its particular constructor. This attribute supplies the reference to any java.util.Queue implementation. For example, a Hazelcast distributed IQueue can be configured as follows:

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance(new Config()
                                           .setProperty("hazelcast.logging.type", "log4j"));
}

@Bean
public PollableChannel distributedQueue() {
    return new QueueChannel(hazelcastInstance()
                              .getQueue("springIntegrationQueue"));
}

PublishSubscribeChannel Configuration

To create a PublishSubscribeChannel, use the <publish-subscribe-channel/> element. When using this element, you can also specify the task-executor used for publishing messages (if none is specified, it publishes in the sender’s thread), as follows:

  • Java

  • XML

@Bean
public MessageChannel pubsubChannel() {
    return new PublishSubscribeChannel(someExecutor());
}
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>

If you provide a resequencer or aggregator downstream from a PublishSubscribeChannel, you can set the 'apply-sequence' property on the channel to true. Doing so indicates that the channel should set the sequence-size and sequence-number message headers as well as the correlation ID prior to passing along the messages. For example, if there are five subscribers, the sequence-size would be set to 5, and the messages would have sequence-number header values ranging from 1 to 5.

Along with the Executor, you can also configure an ErrorHandler. By default, the PublishSubscribeChannel uses a MessagePublishingErrorHandler implementation to send an error to the MessageChannel from the errorChannel header or into the global errorChannel instance. If an Executor is not configured, the ErrorHandler is ignored and exceptions are thrown directly to the caller’s thread.

If you provide a Resequencer or Aggregator downstream from a PublishSubscribeChannel, you can set the 'apply-sequence' property on the channel to true. Doing so indicates that the channel should set the sequence-size and sequence-number message headers as well as the correlation ID prior to passing along the messages. For example, if there are five subscribers, the sequence-size would be set to 5, and the messages would have sequence-number header values ranging from 1 to 5.

The following example shows how to set the apply-sequence header to true:

  • Java

  • XML

@Bean
public MessageChannel pubsubChannel() {
    PublishSubscribeChannel channel = new PublishSubscribeChannel();
    channel.setApplySequence(true);
    return channel;
}
<int:publish-subscribe-channel id="pubsubChannel" apply-sequence="true"/>
The apply-sequence value is false by default so that a publish-subscribe channel can send the exact same message instances to multiple outbound channels. Since Spring Integration enforces immutability of the payload and header references, when the flag is set to true, the channel creates new Message instances with the same payload reference but different header values.

Starting with version 5.4.3, the PublishSubscribeChannel can also be configured with the requireSubscribers option of its BroadcastingDispatcher to indicate that this channel will not ignore a message silently when it has no subscribers. A MessageDispatchingException with a Dispatcher has no subscribers message is thrown when there are no subscribers and this option is set to true.

ExecutorChannel

To create an ExecutorChannel, add the <dispatcher> sub-element with a task-executor attribute. The attribute’s value can reference any TaskExecutor within the context. For example, doing so enables configuration of a thread pool for dispatching messages to subscribed handlers. As mentioned earlier, doing so breaks the single-threaded execution context between sender and receiver so that any active transaction context is not shared by the invocation of the handler (that is, the handler may throw an Exception, but the send invocation has already returned successfully). The following example shows how to use the dispatcher element and specify an executor in the task-executor attribute:

  • Java

  • XML

@Bean
public MessageChannel executorChannel() {
    return new ExecutorChannel(someExecutor());
}
<int:channel id="executorChannel">
    <int:dispatcher task-executor="someExecutor"/>
</int:channel>

The load-balancer and failover options are also both available on the <dispatcher/> sub-element, as described earlier in DirectChannel Configuration. The same defaults apply. Consequently, the channel has a round-robin load-balancing strategy with failover enabled unless explicit configuration is provided for one or both of those attributes, as the following example shows:

<int:channel id="executorChannelWithoutFailover">
    <int:dispatcher task-executor="someExecutor" failover="false"/>
</int:channel>

PriorityChannel Configuration

To create a PriorityChannel, use the <priority-queue/> sub-element, as the following example shows:

  • Java

  • XML

@Bean
public PollableChannel priorityChannel() {
    return new PriorityChannel(20);
}
<int:channel id="priorityChannel">
    <int:priority-queue capacity="20"/>
</int:channel>

By default, the channel consults the priority header of the message. However, you can instead provide a custom Comparator reference. Also, note that the PriorityChannel (like the other types) does support the datatype attribute. As with the QueueChannel, it also supports a capacity attribute. The following example demonstrates all of these:

  • Java

  • XML

@Bean
public PollableChannel priorityChannel() {
    PriorityChannel channel = new PriorityChannel(20, widgetComparator());
    channel.setDatatypes(example.Widget.class);
    return channel;
}
<int:channel id="priorityChannel" datatype="example.Widget">
    <int:priority-queue comparator="widgetComparator"
                    capacity="10"/>
</int:channel>

Since version 4.0, the priority-channel child element supports the message-store option (comparator and capacity are not allowed in that case). The message store must be a PriorityCapableChannelMessageStore. Implementations of the PriorityCapableChannelMessageStore are currently provided for Redis, JDBC, and MongoDB. See QueueChannel Configuration and Message Store for more information. You can find sample configuration in Backing Message Channels.

RendezvousChannel Configuration

A RendezvousChannel is created when the queue sub-element is a <rendezvous-queue>. It does not provide any additional configuration options to those described earlier, and its queue does not accept any capacity value, since it is a zero-capacity direct handoff queue. The following example shows how to declare a RendezvousChannel:

  • Java

  • XML

@Bean
public PollableChannel rendezvousChannel() {
    return new RendezvousChannel();
}
<int:channel id="rendezvousChannel"/>
    <int:rendezvous-queue/>
</int:channel>

Scoped Channel Configuration

Any channel can be configured with a scope attribute, as the following example shows:

<int:channel id="threadLocalChannel" scope="thread"/>

Channel Interceptor Configuration

Message channels may also have interceptors, as described in Channel Interceptors. The <interceptors/> sub-element can be added to a <channel/> (or the more specific element types). You can provide the ref attribute to reference any Spring-managed object that implements the ChannelInterceptor interface, as the following example shows:

<int:channel id="exampleChannel">
    <int:interceptors>
        <ref bean="trafficMonitoringInterceptor"/>
    </int:interceptors>
</int:channel>

In general, we recommend defining the interceptor implementations in a separate location, since they usually provide common behavior that can be reused across multiple channels.

Global Channel Interceptor Configuration

Channel interceptors provide a clean and concise way of applying cross-cutting behavior per individual channel. If the same behavior should be applied on multiple channels, configuring the same set of interceptors for each channel would not be the most efficient way. To avoid repeated configuration while also enabling interceptors to apply to multiple channels, Spring Integration provides global interceptors. Consider the following pair of examples:

<int:channel-interceptor pattern="input*, thing2*, thing1, !cat*" order="3">
    <bean class="thing1.thing2SampleInterceptor"/>
</int:channel-interceptor>
<int:channel-interceptor ref="myInterceptor" pattern="input*, thing2*, thing1, !cat*" order="3"/>

<bean id="myInterceptor" class="thing1.thing2SampleInterceptor"/>

Each <channel-interceptor/> element lets you define a global interceptor, which is applied on all channels that match any patterns defined by the pattern attribute. In the preceding case, the global interceptor is applied on the 'thing1' channel and all other channels that begin with 'thing2' or 'input' but not to channels starting with 'thing3' (since version 5.0).

The addition of this syntax to the pattern causes one possible (though perhaps unlikely) problem. If you have a bean named !thing1 and you included a pattern of !thing1 in your channel interceptor’s pattern patterns, it no longer matches. The pattern now matches all beans not named thing1. In this case, you can escape the ! in the pattern with \. The pattern \!thing1 matches a bean named !thing1.

The order attribute lets you manage where this interceptor is injected when there are multiple interceptors on a given channel. For example, channel 'inputChannel' could have individual interceptors configured locally (see below), as the following example shows:

<int:channel id="inputChannel">
  <int:interceptors>
    <int:wire-tap channel="logger"/>
  </int:interceptors>
</int:channel>

A reasonable question is “how is a global interceptor injected in relation to other interceptors configured locally or through other global interceptor definitions?” The current implementation provides a simple mechanism for defining the order of interceptor execution. A positive number in the order attribute ensures interceptor injection after any existing interceptors, while a negative number ensures that the interceptor is injected before existing interceptors. This means that, in the preceding example, the global interceptor is injected after (since its order is greater than 0) the 'wire-tap' interceptor configured locally. If there were another global interceptor with a matching pattern, its order would be determined by comparing the values of both interceptors' order attributes. To inject a global interceptor before the existing interceptors, use a negative value for the order attribute.

Note that both the order and pattern attributes are optional. The default value for order will be 0 and for pattern, the default is '*' (to match all channels).

Wire Tap

As mentioned earlier, Spring Integration provides a simple wire tap interceptor. You can configure a wire tap on any channel within an <interceptors/> element. Doing so is especially useful for debugging and can be used in conjunction with Spring Integration’s logging channel adapter as follows:

<int:channel id="in">
    <int:interceptors>
        <int:wire-tap channel="logger"/>
    </int:interceptors>
</int:channel>

<int:logging-channel-adapter id="logger" level="DEBUG"/>
The 'logging-channel-adapter' also accepts an 'expression' attribute so that you can evaluate a SpEL expression against the 'payload' and 'headers' variables. Alternatively, to log the full message toString() result, provide a value of true for the 'log-full-message' attribute. By default, it is false so that only the payload is logged. Setting it to true enables logging of all headers in addition to the payload. The 'expression' option provides the most flexibility (for example, expression="payload.user.name").

One of the common misconceptions about the wire tap and other similar components (Message Publishing Configuration) is that they are automatically asynchronous in nature. By default, wire tap as a component is not invoked asynchronously. Instead, Spring Integration focuses on a single unified approach to configuring asynchronous behavior: the message channel. What makes certain parts of the message flow synchronous or asynchronous is the type of Message Channel that has been configured within that flow. That is one of the primary benefits of the message channel abstraction. From the inception of the framework, we have always emphasized the need and the value of the message channel as a first-class citizen of the framework. It is not just an internal, implicit realization of the EIP pattern. It is fully exposed as a configurable component to the end user. So, the wire tap component is only responsible for performing the following tasks:

  • Intercept a message flow by tapping into a channel (for example, channelA)

  • Grab each message

  • Send the message to another channel (for example, channelB)

It is essentially a variation of the bridge pattern, but it is encapsulated within a channel definition (and hence easier to enable and disable without disrupting a flow). Also, unlike the bridge, it basically forks another message flow. Is that flow synchronous or asynchronous? The answer depends on the type of message channel that 'channelB' is. We have the following options: direct channel, pollable channel, and executor channel. The last two break the thread boundary, making communication over such channels asynchronous, because the dispatching of the message from that channel to its subscribed handlers happens on a different thread than the one used to send the message to that channel. That is what is going to make your wire-tap flow synchronous or asynchronous. It is consistent with other components within the framework (such as message publisher) and adds a level of consistency and simplicity by sparing you from worrying in advance (other than writing thread-safe code) about whether a particular piece of code should be implemented as synchronous or asynchronous. The actual wiring of two pieces of code (say, component A and component B) over a message channel is what makes their collaboration synchronous or asynchronous. You may even want to change from synchronous to asynchronous in the future, and message channel lets you do it swiftly without ever touching the code.

One final point regarding the wire tap is that, despite the rationale provided above for not being asynchronous by default, you should keep in mind that it is usually desirable to hand off the message as soon as possible. Therefore, it would be quite common to use an asynchronous channel option as the wire tap’s outbound channel. However, the asynchronous behavior is not enforced by default. There are a number of use cases that would break if we did, including that you might not want to break a transactional boundary. Perhaps you use the wire tap pattern for auditing purposes, and you do want the audit messages to be sent within the original transaction. As an example, you might connect the wire tap to a JMS outbound channel adapter. That way, you get the best of both worlds: 1) the sending of a JMS Message can occur within the transaction while 2) it is still a “fire-and-forget” action, thereby preventing any noticeable delay in the main message flow.

Starting with version 4.0, it is important to avoid circular references when an interceptor (such as the WireTap class) references a channel. You need to exclude such channels from those being intercepted by the current interceptor. This can be done with appropriate patterns or programmatically. If you have a custom ChannelInterceptor that references a channel, consider implementing VetoCapableInterceptor. That way, the framework asks the interceptor if it is OK to intercept each channel that is a candidate, based on the supplied pattern. You can also add runtime protection in the interceptor methods to ensure that the channel is not one that is referenced by the interceptor. The WireTap uses both of these techniques.

Starting with version 4.3, the WireTap has additional constructors that take a channelName instead of a MessageChannel instance. This can be convenient for Java configuration and when channel auto-creation logic is being used. The target MessageChannel bean is resolved from the provided channelName later, on the first interaction with the interceptor.

Channel resolution requires a BeanFactory, so the wire tap instance must be a Spring-managed bean.

This late-binding approach also allows simplification of typical wire-tapping patterns with Java DSL configuration, as the following example shows:

@Bean
public PollableChannel myChannel() {
    return MessageChannels.queue()
            .wireTap("loggingFlow.input")
            .get();
}

@Bean
public IntegrationFlow loggingFlow() {
    return f -> f.log();
}

Conditional Wire Taps

Wire taps can be made conditional by using the selector or selector-expression attributes. The selector references a MessageSelector bean, which can determine at runtime whether the message should go to the tap channel. Similarly, the selector-expression is a boolean SpEL expression that performs the same purpose: If the expression evaluates to true, the message is sent to the tap channel.

Global Wire Tap Configuration

It is possible to configure a global wire tap as a special case of the Global Channel Interceptor Configuration. To do so, configure a top level wire-tap element. Now, in addition to the normal wire-tap namespace support, the pattern and order attributes are supported and work in exactly the same way as they do for the channel-interceptor. The following example shows how to configure a global wire tap:

  • Java

  • XML

@Bean
@GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3)
public WireTap wireTap(MessageChannel wiretapChannel) {
    return new WireTap(wiretapChannel);
}
<int:wire-tap pattern="input*, thing2*, thing1" order="3" channel="wiretapChannel"/>
A global wire tap provides a convenient way to configure a single-channel wire tap externally without modifying the existing channel configuration. To do so, set the pattern attribute to the target channel name. For example, you can use this technique to configure a test case to verify messages on a channel.