3. Message Channels

While the Message plays the crucial role of encapsulating data, it is the MessageChannel that decouples message producers from message consumers.

3.1 The MessageChannel Interface

Spring Integration's top-level MessageChannel interface is defined as follows.

public interface MessageChannel {

    String getName();

    boolean send(Message<?> message);

    boolean send(Message<?> message, long timeout);
}

When sending a message, the return value will be true if the message is sent successfully. If the send call times out or is interrupted, then it will return false.

3.1.1 PollableChannel

Since Message Channels may or may not buffer Messages (as discussed in the overview), there are two sub-interfaces defining the buffering (pollable) and non-buffering (subscribable) channel behavior. Here is the definition of PollableChannel.

public interface PollableChannel extends MessageChannel {

    Message<?> receive();

    Message<?> receive(long timeout);

    List<Message<?>> clear();

    List<Message<?>> purge(MessageSelector selector);

}

Similar to the send methods, when receiving a message, the return value will be null in the case of a timeout or interrupt.

3.1.2 SubscribableChannel

The SubscribableChannel base interface is implemented by channels that send Messages directly to their subscribed MessageHandlers. Therefore, they do not provide receive methods for polling, but instead define methods for managing those subscribers:

public interface SubscribableChannel extends MessageChannel {

    boolean subscribe(MessageHandler handler);

    boolean unsubscribe(MessageHandler handler);

}

3.2 Message Channel Implementations

Spring Integration provides several different Message Channel implementations. Each is briefly described in the sections below.

3.2.1 PublishSubscribeChannel

The PublishSubscribeChannel implementation broadcasts any Message sent to it to all of its subscribed handlers. This is most often used for sending Event Messages whose primary role is notification as opposed to Document Messages which are generally intended to be processed by a single handler. Note that the PublishSubscribeChannel is intended for sending only. Since it broadcasts to its subscribers directly when its send(Message) method is invoked, consumers cannot poll for Messages (it does not implement PollableChannel and therefore has no receive() method). Instead, any subscriber must be a MessageHandler itself, and the subscriber's handleMessage(Message) method will be invoked in turn.
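
The broadcast behavior described above can be sketched with plain JDK types. This is only an illustration, not the framework's implementation: BroadcastSketch and its use of Consumer<String> in place of MessageHandler are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for a publish-subscribe channel; not a Spring Integration class.
class BroadcastSketch {

    private final List<Consumer<String>> handlers = new CopyOnWriteArrayList<>();

    boolean subscribe(Consumer<String> handler) {
        return handlers.add(handler);
    }

    // Every subscribed handler is invoked in turn, in the caller's thread.
    boolean send(String payload) {
        for (Consumer<String> handler : handlers) {
            handler.accept(payload);
        }
        return true;
    }

    // Sends one payload to two subscribers and returns what each one saw.
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        BroadcastSketch channel = new BroadcastSketch();
        channel.subscribe(p -> seen.add("audit:" + p));
        channel.subscribe(p -> seen.add("mail:" + p));
        channel.send("orderPlaced");
        return seen;
    }
}
```

Note that there is no receive() method in this sketch: a consumer participates only by subscribing.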

3.2.2 QueueChannel

The QueueChannel implementation wraps a queue. Unlike the PublishSubscribeChannel, the QueueChannel has point-to-point semantics. In other words, even if the channel has multiple consumers, only one of them should receive any Message sent to that channel. It provides a default no-argument constructor (providing an essentially unbounded capacity of Integer.MAX_VALUE) as well as a constructor that accepts the queue capacity:

public QueueChannel(int capacity)

A channel that has not reached its capacity limit will store messages in its internal queue, and the send() method will return immediately even if no receiver is ready to handle the message. If the queue has reached capacity, then the sender will block until room is available. Or, if using the send call that accepts a timeout, it will block until either room is available or the timeout period elapses, whichever occurs first. Likewise, a receive call will return immediately if a message is available on the queue, but if the queue is empty, then a receive call may block until either a message is available or the timeout elapses. In either case, it is possible to force an immediate return regardless of the queue's state by passing a timeout value of 0. Note, however, that calls to the versions of send() and receive() that do not accept a timeout argument will block indefinitely.
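
The capacity and timeout semantics can be approximated with a bounded java.util.concurrent.BlockingQueue. The following is a minimal sketch under that assumption. BoundedQueueSketch is a hypothetical class, and String payloads stand in for Message instances.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a bounded, queue-backed channel; not a Spring Integration class.
class BoundedQueueSketch {

    private final BlockingQueue<String> queue;

    BoundedQueueSketch(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    // Returns false if no room became available within the timeout,
    // or if the calling thread was interrupted while waiting.
    boolean send(String payload, long timeoutMillis) {
        try {
            return queue.offer(payload, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Returns null if nothing arrived within the timeout,
    // or if the calling thread was interrupted while waiting.
    String receive(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

With a capacity of 1 and a timeout of 0, a second send returns false immediately while the first payload is still queued, and a receive on an empty queue returns null immediately.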

3.2.3 PriorityChannel

Whereas the QueueChannel enforces first-in/first-out (FIFO) ordering, the PriorityChannel is an alternative implementation that allows for messages to be ordered within the channel based upon a priority. By default the priority is determined by the 'priority' header within each message. However, for custom priority determination logic, a comparator of type Comparator<Message<?>> can be provided to the PriorityChannel's constructor.
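
To illustrate comparator-based ordering, here is a small sketch built on java.util.concurrent.PriorityBlockingQueue. The Msg class and the "higher value is received first" rule are assumptions made for this example only. They are not taken from the framework.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical illustration of priority ordering; not a Spring Integration class.
class PrioritySketch {

    // Minimal message stand-in: a payload plus an integer priority.
    static final class Msg {
        final String payload;
        final int priority;

        Msg(String payload, int priority) {
            this.payload = payload;
            this.priority = priority;
        }
    }

    // Assumption of this sketch: a higher priority value is received first.
    static final Comparator<Msg> HIGHEST_FIRST =
            (a, b) -> Integer.compare(b.priority, a.priority);

    // Queues the messages, then concatenates their payloads in receive order.
    static String drain(Msg... messages) {
        PriorityBlockingQueue<Msg> queue = new PriorityBlockingQueue<>(11, HIGHEST_FIRST);
        for (Msg message : messages) {
            queue.offer(message);
        }
        StringBuilder order = new StringBuilder();
        Msg next;
        while ((next = queue.poll()) != null) {
            order.append(next.payload);
        }
        return order.toString();
    }
}
```

Messages sent in the order low, high, mid are received as high, mid, low, regardless of arrival order.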

3.2.4 RendezvousChannel

The RendezvousChannel enables a "direct-handoff" scenario where a sender will block until another party invokes the channel's receive() method or vice-versa. Internally, this implementation is quite similar to the QueueChannel except that it uses a SynchronousQueue (a zero-capacity implementation of BlockingQueue). This works well in situations where the sender and receiver are operating in different threads but simply dropping the message in a queue asynchronously is not appropriate. In other words, with a RendezvousChannel at least the sender knows that some receiver has accepted the message, whereas with a QueueChannel, the message would have been stored to the internal queue and potentially never received.

[Tip]Tip

Keep in mind that all of these queue-based channels are storing messages in-memory only. When persistence is required, you can either invoke a database operation within a handler or use Spring Integration's support for JMS-based Channel Adapters. The latter option allows you to take advantage of any JMS provider's implementation for message persistence, and it will be discussed in Chapter 22, JMS Support. However, when buffering in a queue is not necessary, the simplest approach is to rely upon the DirectChannel discussed next.

The RendezvousChannel is also useful for implementing request-reply operations. The sender can create a temporary, anonymous instance of RendezvousChannel which it then sets as the 'replyChannel' header when building a Message. After sending that Message, the sender can immediately call receive (optionally providing a timeout value) in order to block while waiting for a reply Message. This is very similar to the implementation used internally by many of Spring Integration's request-reply components.
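
The request-reply handoff can be sketched with java.util.concurrent.SynchronousQueue, the queue type mentioned above. This is an illustration only: the Request class, its replyChannel field (standing in for the 'replyChannel' header), and the responder thread are hypothetical.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of request-reply over zero-capacity queues.
class RendezvousReplySketch {

    // A request that carries its own temporary reply channel.
    static final class Request {
        final String payload;
        final SynchronousQueue<String> replyChannel = new SynchronousQueue<>();

        Request(String payload) {
            this.payload = payload;
        }
    }

    // Hands a request to a responder thread and blocks for the reply.
    // Returns null if either handoff times out or the thread is interrupted.
    static String requestReply(String payload, long timeoutMillis) {
        final SynchronousQueue<Request> requests = new SynchronousQueue<>();
        Thread responder = new Thread(() -> {
            try {
                Request request = requests.take();
                // put() blocks until the requester is waiting in poll().
                request.replyChannel.put("reply:" + request.payload);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        responder.setDaemon(true);
        responder.start();
        try {
            Request request = new Request(payload);
            if (!requests.offer(request, timeoutMillis, TimeUnit.MILLISECONDS)) {
                return null; // no receiver accepted the request in time
            }
            return request.replyChannel.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

Because neither queue has any capacity, a successful offer means that the responder has actually accepted the request, which is the guarantee that distinguishes a rendezvous from an ordinary queue.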

3.2.5 DirectChannel

The DirectChannel has point-to-point semantics but otherwise is more similar to the PublishSubscribeChannel than any of the queue-based channel implementations described above. It implements the SubscribableChannel interface instead of the PollableChannel interface, so it dispatches Messages directly to a subscriber. As a point-to-point channel, however, it differs from the PublishSubscribeChannel in that it will only send each Message to a single subscribed MessageHandler.

In addition to being the simplest point-to-point channel option, one of its most important features is that it enables a single thread to perform the operations on "both sides" of the channel. For example, if a handler is subscribed to a DirectChannel, then sending a Message to that channel will trigger invocation of that handler's handleMessage(Message) method directly in the sender's thread, before the send() method invocation can return.

The key motivation for providing a channel implementation with this behavior is to support transactions that must span across the channel while still benefiting from the abstraction and loose coupling that the channel provides. If the send call is invoked within the scope of a transaction, then the outcome of the handler's invocation (e.g. updating a database record) will play a role in determining the ultimate result of that transaction (commit or rollback).
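
The single-threaded behavior can be demonstrated with a few lines of plain Java. In this sketch, SameThreadDispatchSketch is a hypothetical stand-in for a direct channel with one subscriber, and Consumer<String> stands in for MessageHandler.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for a direct, same-thread channel; not a Spring Integration class.
class SameThreadDispatchSketch {

    private Consumer<String> handler;

    void subscribe(Consumer<String> handler) {
        this.handler = handler;
    }

    // The handler runs, and may throw, before send() returns to the caller.
    boolean send(String payload) {
        handler.accept(payload);
        return true;
    }

    // True if the handler was invoked on the thread that called send().
    static boolean handlerRanInSenderThread() {
        final Thread[] handlerThread = new Thread[1];
        SameThreadDispatchSketch channel = new SameThreadDispatchSketch();
        channel.subscribe(p -> handlerThread[0] = Thread.currentThread());
        channel.send("test");
        return handlerThread[0] == Thread.currentThread();
    }

    // True if an exception thrown by the handler propagates out of send().
    static boolean handlerFailureReachesSender() {
        SameThreadDispatchSketch channel = new SameThreadDispatchSketch();
        channel.subscribe(p -> {
            throw new IllegalStateException("update failed");
        });
        try {
            channel.send("test");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }
}
```

Because the handler's exception surfaces in the sender's call stack, a transaction started by the sender can observe the failure and roll back.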

[Note]Note
Since the DirectChannel is the simplest option and does not add any additional overhead that would be required for scheduling and managing the threads of a poller, it is the default channel type within Spring Integration. The general idea is to define the channels for an application and then to consider which of those need to provide buffering or to throttle input, and then modify those to be queue-based PollableChannels. Likewise, if a channel needs to broadcast messages, it should not be a DirectChannel but rather a PublishSubscribeChannel. Below you will see how each of these can be configured.

The DirectChannel internally delegates to a Message Dispatcher to invoke its subscribed Message Handlers, and that dispatcher can have a load-balancing strategy. The load-balancer determines how invocations will be ordered in the case that there are multiple handlers subscribed to the same channel. When using the namespace support described below, the default strategy is "round-robin" which essentially load-balances across the handlers in rotation.

[Note]Note
The "round-robin" strategy is currently the only implementation available out-of-the-box in Spring Integration. Other strategy implementations may be added in future versions.

The load-balancer also works in combination with a boolean failover property. If the "failover" value is true (the default), then the dispatcher will fall back to any subsequent handlers as necessary when preceding handlers throw Exceptions. The order is determined by an optional order value defined on the handlers themselves or, if no such value exists, the order in which the handlers are subscribed.

If a certain situation requires that the dispatcher always try to invoke the first handler and then fall back in the same fixed order every time an error occurs, no load-balancing strategy should be provided. In other words, the dispatcher still supports the failover boolean property even when no load-balancing is enabled. Without load-balancing, however, the invocation of handlers will always begin with the first according to their order. For example, this approach works well when there is a clear definition of primary, secondary, tertiary, and so on. When using the namespace support, the "order" attribute on any endpoint will determine that order.
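
The interplay between round-robin load-balancing and failover can be sketched as follows. This is an approximation under stated assumptions rather than the framework's dispatcher. RoundRobinFailoverSketch is hypothetical, handlers are modeled as Consumer<String>, and the rule that failover continues from the round-robin starting point and wraps around is an assumption of this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical dispatcher sketch; not the Spring Integration implementation.
class RoundRobinFailoverSketch {

    private final List<Consumer<String>> handlers = new ArrayList<>();
    private final AtomicInteger next = new AtomicInteger();
    private final boolean loadBalance;
    private final boolean failover;

    RoundRobinFailoverSketch(boolean loadBalance, boolean failover) {
        this.loadBalance = loadBalance;
        this.failover = failover;
    }

    void subscribe(Consumer<String> handler) {
        handlers.add(handler);
    }

    // Delivers the payload to exactly one handler (point-to-point).
    void dispatch(String payload) {
        int size = handlers.size();
        if (size == 0) {
            throw new IllegalStateException("no subscribers");
        }
        // With load-balancing, rotate the starting handler; otherwise always start at the first.
        int start = loadBalance ? Math.floorMod(next.getAndIncrement(), size) : 0;
        RuntimeException last = null;
        for (int i = 0; i < size; i++) {
            try {
                handlers.get((start + i) % size).accept(payload);
                return;
            } catch (RuntimeException e) {
                last = e;
                if (!failover) {
                    break; // fail fast: do not try the remaining handlers
                }
            }
        }
        throw last;
    }

    // Three handlers (A, B, C) where B always fails; returns the delivery log.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        RoundRobinFailoverSketch dispatcher = new RoundRobinFailoverSketch(true, true);
        dispatcher.subscribe(p -> log.add("A:" + p));
        dispatcher.subscribe(p -> {
            throw new IllegalStateException("B is down");
        });
        dispatcher.subscribe(p -> log.add("C:" + p));
        dispatcher.dispatch("1"); // starts at A
        dispatcher.dispatch("2"); // starts at B, fails over to C
        dispatcher.dispatch("3"); // starts at C
        return log;
    }
}
```

Constructing the sketch with loadBalance set to false gives the fixed primary, secondary, tertiary sequence described above, since every dispatch then starts with the first handler.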

[Note]Note
Keep in mind that load-balancing and failover only apply when a channel has more than one subscribed Message Handler. When using the namespace support, this means that more than one endpoint shares the same channel reference in the "input-channel" attribute.

3.2.6 ExecutorChannel

The ExecutorChannel is a point-to-point channel that supports the same dispatcher configuration as DirectChannel (load-balancing strategy and the failover boolean property). The key difference between these two dispatching channel types is that the ExecutorChannel delegates to an instance of TaskExecutor to perform the dispatch. This means that the send method typically will not block, but it also means that the handler invocation may not occur in the sender's thread. It therefore does not support transactions spanning the sender and receiving handler.

[Tip]Tip
Note that there are occasions where the sender may block. For example, when using a TaskExecutor with a rejection policy that throttles back on the client (such as the ThreadPoolExecutor.CallerRunsPolicy), the sender's thread will execute the method directly any time the thread pool is at its maximum capacity and the executor's work queue is full. Because that situation occurs unpredictably, it cannot be relied upon for transactions.
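
For contrast with same-thread dispatch, the following sketch hands the handler invocation to a java.util.concurrent.Executor. ExecutorDispatchSketch is a hypothetical stand-in, and a plain Executor is used here in place of Spring's TaskExecutor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for an executor-backed channel; not a Spring Integration class.
class ExecutorDispatchSketch {

    private final Executor executor;
    private Consumer<String> handler;

    ExecutorDispatchSketch(Executor executor) {
        this.executor = executor;
    }

    void subscribe(Consumer<String> handler) {
        this.handler = handler;
    }

    // Returns as soon as the task is handed off; the handler may not have run yet.
    boolean send(String payload) {
        executor.execute(() -> handler.accept(payload));
        return true;
    }

    // True if the handler was invoked on a thread other than the sender's.
    static boolean handlerRanInOtherThread() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            final Thread[] handlerThread = new Thread[1];
            final CountDownLatch done = new CountDownLatch(1);
            ExecutorDispatchSketch channel = new ExecutorDispatchSketch(pool);
            channel.subscribe(p -> {
                handlerThread[0] = Thread.currentThread();
                done.countDown();
            });
            channel.send("test");
            if (!done.await(2, TimeUnit.SECONDS)) {
                return false;
            }
            return handlerThread[0] != Thread.currentThread();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Since send() has already returned true by the time the handler runs, an exception thrown by the handler cannot influence a transaction that is bound to the sender's thread.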

3.2.7 ThreadLocalChannel

The final channel implementation type is ThreadLocalChannel. This channel also delegates to a queue internally, but the queue is bound to the current thread. That way the thread that sends to the channel will later be able to receive those same Messages, but no other thread would be able to access them. While probably the least common type of channel, this is useful for situations where DirectChannels are being used to enforce a single thread of operation but any reply Messages should be sent to a "terminal" channel. If that terminal channel is a ThreadLocalChannel, the original sending thread can collect its replies from it.
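
The thread-bound queue can be pictured with java.lang.ThreadLocal. This is a minimal sketch, assuming one queue per thread. ThreadBoundQueueSketch is a hypothetical class, not the framework's implementation.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for a thread-local channel; not a Spring Integration class.
class ThreadBoundQueueSketch {

    // Each thread lazily gets its own private queue.
    private final ThreadLocal<Queue<String>> queue = ThreadLocal.withInitial(ArrayDeque::new);

    boolean send(String payload) {
        return queue.get().offer(payload);
    }

    // Returns the next payload sent by the current thread, or null if there is none.
    String receive() {
        return queue.get().poll();
    }

    // Attempts a receive from a second thread and returns what that thread saw.
    static String receiveFromOtherThread(ThreadBoundQueueSketch channel) {
        final String[] result = new String[1];
        Thread other = new Thread(() -> result[0] = channel.receive());
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result[0];
    }
}
```

A payload sent by one thread is invisible to a receive() call made on any other thread, but the sending thread can collect it later.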

3.3 Channel Interceptors

One of the advantages of a messaging architecture is the ability to provide common behavior and capture meaningful information about the messages passing through the system in a non-invasive way. Since the Messages are being sent to and received from MessageChannels, those channels provide an opportunity for intercepting the send and receive operations. The ChannelInterceptor strategy interface provides methods for each of those operations:

public interface ChannelInterceptor {

    Message<?> preSend(Message<?> message, MessageChannel channel);

    void postSend(Message<?> message, MessageChannel channel, boolean sent);

    boolean preReceive(MessageChannel channel);

    Message<?> postReceive(Message<?> message, MessageChannel channel);
}

After implementing the interface, registering the interceptor with a channel is just a matter of calling:

channel.addInterceptor(someChannelInterceptor);

The methods that return a Message instance can be used for transforming the Message or can return 'null' to prevent further processing (of course, any of the methods can throw a RuntimeException). Also, the preReceive method can return 'false' to prevent the receive operation from proceeding.
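
The "transform or veto" contract of the Message-returning methods can be sketched as a simple chain. This is an illustration only: PreSendChainSketch is hypothetical, String payloads stand in for Messages, and UnaryOperator<String> stands in for the preSend method of an interceptor.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical illustration of a preSend chain; not a Spring Integration class.
class PreSendChainSketch {

    // Transforms the payload by trimming surrounding whitespace.
    static final UnaryOperator<String> TRIM = String::trim;

    // Vetoes empty payloads by returning null.
    static final UnaryOperator<String> DROP_EMPTY = s -> s.isEmpty() ? null : s;

    // Returns the payload that would be sent, or null if an interceptor vetoed the send.
    static String applyPreSend(String payload, List<UnaryOperator<String>> interceptors) {
        String current = payload;
        for (UnaryOperator<String> interceptor : interceptors) {
            current = interceptor.apply(current);
            if (current == null) {
                return null; // stop: no further interceptors run and nothing is sent
            }
        }
        return current;
    }

    // Applies TRIM followed by DROP_EMPTY.
    static String demo(String payload) {
        return applyPreSend(payload, Arrays.asList(TRIM, DROP_EMPTY));
    }
}
```

Here a payload of "  hello " is sent as "hello", whereas a payload consisting only of spaces is dropped.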

[Note]Note
Keep in mind that receive() calls are only relevant for PollableChannels. In fact the SubscribableChannel interface does not even define a receive() method. The reason for this is that when a Message is sent to a SubscribableChannel it will be sent directly to one or more subscribers depending on the type of channel (e.g. a PublishSubscribeChannel sends to all of its subscribers). Therefore, the preReceive(..) and postReceive(..) interceptor methods are only invoked when the interceptor is applied to a PollableChannel.

Spring Integration also provides an implementation of the Wire Tap pattern. It is a simple interceptor that sends the Message to another channel without otherwise altering the existing flow. It can be very useful for debugging and monitoring. An example is shown in Section 3.5.10, “Wire Tap”.

Because it is rarely necessary to implement all of the interceptor methods, a ChannelInterceptorAdapter class is also available for sub-classing. It provides no-op methods (the void method is empty, the Message returning methods return the Message as-is, and the boolean method returns true). Therefore, it is often easiest to extend that class and just implement the method(s) that you need as in the following example.

public class CountingChannelInterceptor extends ChannelInterceptorAdapter {

    private final AtomicInteger sendCount = new AtomicInteger();

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        sendCount.incrementAndGet();
        return message;
    }
}

[Tip]Tip
The order of invocation for the interceptor methods depends on the type of channel. As described above, the queue-based channels are the only ones where the receive method is intercepted in the first place. Additionally, the relationship between send and receive interception depends on the timing of separate sender and receiver threads. For example, if a receiver is already blocked while waiting for a message the order could be: preSend, preReceive, postReceive, postSend. However, if a receiver polls after the sender has placed a message on the channel and already returned, the order would be: preSend, postSend, (some-time-elapses) preReceive, postReceive. The time that elapses in such a case depends on a number of factors and is therefore generally unpredictable (in fact, the receive may never happen!). Obviously, the type of queue also plays a role (e.g. rendezvous vs. priority). The bottom line is that you cannot rely on the order beyond the fact that preSend will precede postSend and preReceive will precede postReceive.

3.4 MessagingTemplate

As you will see when the endpoints and their various configuration options are introduced, Spring Integration provides a foundation for messaging components that enables non-invasive invocation of your application code from the messaging system. However, sometimes it is necessary to invoke the messaging system from your application code. For convenience when implementing such use-cases, Spring Integration provides a MessagingTemplate that supports a variety of operations across the Message Channels, including request/reply scenarios. For example, it is possible to send a request and wait for a reply.

MessagingTemplate template = new MessagingTemplate();

Message<?> reply = template.sendAndReceive(new StringMessage("test"), someChannel);

In that example, a temporary anonymous channel would be created internally by the template. The 'sendTimeout' and 'receiveTimeout' properties may also be set on the template, and other exchange types are also supported.

public boolean send(final Message<?> message, final MessageChannel channel) { ... }

public Message<?> sendAndReceive(final Message<?> request, final MessageChannel channel) { ... }

public Message<?> receive(final PollableChannel channel) { ... }

[Note]Note

A less invasive approach that allows you to invoke simple interfaces with payload and/or header values instead of Message instances is described in Section 16.1, “GatewayProxyFactoryBean”.

3.5 Configuring Message Channels

To create a Message Channel instance, you can use the 'channel' element:

<channel id="exampleChannel"/>

The default channel type is Point to Point. To create a Publish Subscribe channel, use the "publish-subscribe-channel" element:

<publish-subscribe-channel id="exampleChannel"/>

To create a Datatype Channel that only accepts messages containing a certain payload type, provide the fully-qualified class name in the channel element's datatype attribute:

<channel id="numberChannel" datatype="java.lang.Number"/>

Note that the type check passes for any type that is assignable to the channel's datatype. In other words, the "numberChannel" above would accept messages whose payload is java.lang.Integer or java.lang.Double. Multiple types can be provided as a comma-delimited list:

<channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>
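
The assignability rule amounts to an instance-of test against each configured type. The following sketch shows the idea with plain Java reflection. DatatypeCheckSketch and its accepts method are hypothetical names, not part of the framework.

```java
// Hypothetical illustration of a datatype check; not a Spring Integration class.
class DatatypeCheckSketch {

    // True if the payload is an instance of at least one of the accepted types.
    static boolean accepts(Object payload, Class<?>... datatypes) {
        for (Class<?> type : datatypes) {
            if (type.isInstance(payload)) {
                return true;
            }
        }
        return false;
    }
}
```

For example, accepts(Integer.valueOf(1), Number.class) is true because Integer is assignable to Number, while accepts("text", Number.class) is false.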

When using the "channel" element without any sub-elements, it will create a DirectChannel instance (a SubscribableChannel).

However, you can alternatively provide a variety of "queue" sub-elements to create any of the pollable channel types (as described in Section 3.2, “Message Channel Implementations”). Examples of each are shown below.

3.5.1 DirectChannel Configuration

As mentioned above, DirectChannel is the default type.

<channel id="directChannel"/>

A default channel will have a round-robin load-balancer and will also have failover enabled (See the discussion in Section 3.2.5, “DirectChannel” for more detail). To disable one or both of these, add a <dispatcher/> sub-element and configure the attributes:

<channel id="failFastChannel">
    <dispatcher failover="false"/>
</channel>

<channel id="channelWithFixedOrderSequenceFailover">
    <dispatcher load-balancer="none"/>
</channel>

3.5.2 QueueChannel Configuration

To create a QueueChannel, use the "queue" sub-element. You may specify the channel's capacity:

<channel id="queueChannel">
    <queue capacity="25"/>
</channel>

[Note]Note
If you do not provide a value for the 'capacity' attribute on this <queue/> sub-element, the resulting queue will be unbounded. To avoid issues such as OutOfMemoryErrors, it is highly recommended to set an explicit value for a bounded queue.

3.5.3 PublishSubscribeChannel Configuration

To create a PublishSubscribeChannel, use the "publish-subscribe-channel" element. When using this element, you can also specify the "task-executor" used for publishing Messages (if none is specified it simply publishes in the sender's thread):

<publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>

If you are providing a Resequencer or Aggregator downstream from a PublishSubscribeChannel, then you can set the 'apply-sequence' property on the channel to true. That will indicate that the channel should set the sequence-size and sequence-number Message headers as well as the correlation id prior to passing the Messages along. For example, if there are 5 subscribers, the sequence-size would be set to 5, and the Messages would have sequence-number header values ranging from 1 to 5.

<publish-subscribe-channel id="pubsubChannel" apply-sequence="true"/>

[Note]Note
The 'apply-sequence' value is false by default so that a Publish Subscribe Channel can send the exact same Message instances to multiple outbound channels. Since Spring Integration enforces immutability of the payload and header references, the channel creates new Message instances with the same payload reference but different header values when the flag is set to true.
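
To make the numbering concrete, the sketch below builds the per-subscriber header values that such a channel would need to produce. It is an illustration under assumptions: ApplySequenceSketch is hypothetical, and the map keys "correlationId", "sequenceNumber", and "sequenceSize" are placeholder names chosen for this example rather than the framework's actual header constants.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of sequence headers; not a Spring Integration class.
class ApplySequenceSketch {

    // Builds one header map per subscriber; the payload reference would be shared.
    static List<Map<String, Object>> headersFor(int subscriberCount, Object correlationId) {
        List<Map<String, Object>> result = new ArrayList<>();
        for (int i = 1; i <= subscriberCount; i++) {
            Map<String, Object> headers = new LinkedHashMap<>();
            headers.put("correlationId", correlationId);  // same for every copy
            headers.put("sequenceNumber", i);             // 1..subscriberCount
            headers.put("sequenceSize", subscriberCount); // total number of copies
            result.add(headers);
        }
        return result;
    }
}
```

Since each copy needs different header values, a separate Message instance per subscriber is unavoidable when the flag is enabled.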

3.5.4 ExecutorChannel

To create an ExecutorChannel, add the <dispatcher> sub-element along with a 'task-executor' attribute. Its value can reference any TaskExecutor within the context. For example, this enables configuration of a thread-pool for dispatching messages to subscribed handlers. As mentioned above, this does break the "single-threaded" execution context between sender and receiver so that any active transaction context will not be shared by the invocation of the handler (i.e. the handler may throw an Exception, but the send invocation has already returned successfully).

<channel id="executorChannel">
    <dispatcher task-executor="someExecutor"/>
</channel>

[Note]Note
The "load-balancer" and "failover" options are also both available on the dispatcher sub-element as described above in Section 3.5.1, “DirectChannel Configuration”. The same defaults apply as well. So, the channel will have a round-robin load-balancing strategy with failover enabled unless explicit configuration is provided for one or both of those attributes.

<channel id="executorChannelWithoutFailover">
    <dispatcher task-executor="someExecutor" failover="false"/>
</channel>

3.5.5 PriorityChannel Configuration

To create a PriorityChannel, use the "priority-queue" sub-element:

<channel id="priorityChannel">
    <priority-queue capacity="20"/>
</channel>

By default, the channel will consult the MessagePriority header of the message. However, a custom Comparator reference may be provided instead. Also, note that the PriorityChannel (like the other types) does support the "datatype" attribute. As with the QueueChannel, it also supports a "capacity" attribute. The following example demonstrates all of these:

<channel id="priorityChannel" datatype="example.Widget">
    <priority-queue comparator="widgetComparator"
                    capacity="10"/>
</channel>

3.5.6 RendezvousChannel Configuration

A RendezvousChannel is created when the queue sub-element is a <rendezvous-queue>. It does not provide any configuration options beyond those described above, and its queue does not accept any capacity value since it is a 0-capacity direct handoff queue.

<channel id="rendezvousChannel">
    <rendezvous-queue/>
</channel>

3.5.7 ThreadLocalChannel Configuration

The ThreadLocalChannel does not provide any additional configuration options.

<thread-local-channel id="threadLocalChannel"/>

3.5.8 Channel Interceptor Configuration

Message channels may also have interceptors as described in Section 3.3, “Channel Interceptors”. The <interceptors> sub-element can be added within <channel> (or the more specific element types). Within it, add a <ref> element whose "bean" attribute references any Spring-managed object that implements the ChannelInterceptor interface:

<channel id="exampleChannel">
    <interceptors>
        <ref bean="trafficMonitoringInterceptor"/>
    </interceptors>
</channel>

In general, it is a good idea to define the interceptor implementations in a separate location since they usually provide common behavior that can be reused across multiple channels.

3.5.9 Global Channel Interceptor Configuration

Channel Interceptors provide a clean and concise way of applying cross-cutting behavior to an individual channel. However, if the same behavior should be applied to multiple channels, configuring the same set of interceptors for each channel is not the most efficient approach. A better way is to configure the interceptors globally and apply them to multiple channels in one shot. Spring Integration provides the capability to configure Global Interceptors and apply them to multiple channels, as in the following example:

<int:channel-interceptor pattern="input*, bar*, foo" order="3">
    <bean class="foo.barSampleInterceptor"/>
</int:channel-interceptor>

or

<int:channel-interceptor ref="myInterceptor" pattern="input*, bar*, foo" order="3"/>

<bean id="myInterceptor" class="foo.barSampleInterceptor"/>

The <channel-interceptor> element allows you to define a global interceptor which will be applied to all channels that match the patterns defined via the pattern attribute. In the above case the global interceptor will be applied to the 'foo' channel and to all other channels whose names begin with 'bar' or 'input'. The order attribute allows you to manage where this interceptor will be injected. For example, channel 'inputChannel' could have individual interceptors configured locally (see below):

<int:channel id="inputChannel"> 
  <int:interceptors>
    <int:wire-tap channel="logger"/> 
  </int:interceptors>
</int:channel>

A reasonable question is how a global interceptor will be injected in relation to other interceptors configured locally or through other global interceptor definitions. The current implementation provides a very simple mechanism for handling this. A positive number in the order attribute ensures that the interceptor is injected after the existing interceptors, and a negative number ensures that it is injected before them. This means that in the above example the global interceptor will be injected AFTER the locally configured 'wire-tap' interceptor (since its order is greater than 0). If there were another global interceptor with a matching pattern, their relative order would be determined by comparing the values of their order attributes. To inject a global interceptor BEFORE the existing interceptors, use a negative value for the order attribute.

[Note]Note
Note that the order and pattern attributes are optional. The default value for order is 0, and the default value for pattern is '*'.
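
The matching and ordering rules can be approximated in a few lines of Java. This sketch makes several assumptions that are not taken from the framework: GlobalInterceptorSketch is hypothetical, only exact names and a trailing '*' wildcard are handled, and an order of 0 is treated the same as a positive value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of global interceptor matching and ordering.
class GlobalInterceptorSketch {

    // True if the channel name matches any entry of the comma-delimited pattern list.
    // Only exact names and a trailing '*' are supported in this sketch.
    static boolean matches(String patterns, String channelName) {
        for (String raw : patterns.split(",")) {
            String pattern = raw.trim();
            boolean matched = pattern.endsWith("*")
                    ? channelName.startsWith(pattern.substring(0, pattern.length() - 1))
                    : channelName.equals(pattern);
            if (matched) {
                return true;
            }
        }
        return false;
    }

    // Places globals with a negative order before the local interceptors and the rest after,
    // keeping the globals sorted by their order value.
    static List<String> merge(List<String> local, Map<String, Integer> globalOrders) {
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(globalOrders.entrySet());
        sorted.sort(Map.Entry.comparingByValue());
        List<String> before = new ArrayList<>();
        List<String> after = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : sorted) {
            (entry.getValue() < 0 ? before : after).add(entry.getKey());
        }
        List<String> chain = new ArrayList<>(before);
        chain.addAll(local);
        chain.addAll(after);
        return chain;
    }

    // One local wire tap plus two globals with orders 3 and -1.
    static List<String> demo() {
        Map<String, Integer> globals = new HashMap<>();
        globals.put("globalAudit", 3);
        globals.put("globalSecurity", -1);
        return merge(Arrays.asList("wireTap"), globals);
    }
}
```

With the pattern "input*, bar*, foo", the sketch matches 'inputChannel', 'barQueue', and 'foo', but not 'foobar'.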

3.5.10 Wire Tap

As mentioned above, Spring Integration provides a simple Wire Tap interceptor out of the box. You can configure a Wire Tap on any channel within an 'interceptors' element. This is especially useful for debugging, and can be used in conjunction with Spring Integration's logging Channel Adapter as follows:

 <channel id="in">
     <interceptors>
         <wire-tap channel="logger"/>
     </interceptors>
 </channel>

 <logging-channel-adapter id="logger" level="DEBUG"/>

[Tip]Tip
The 'logging-channel-adapter' also accepts a boolean attribute: 'log-full-message'. That is false by default so that only the payload is logged. Setting that to true enables logging of all headers in addition to the payload.

[Note]Note

If namespace support is enabled, there are also two special channels defined within the context by default: errorChannel and nullChannel. The 'nullChannel' acts like /dev/null, simply logging any Message sent to it at DEBUG level and returning immediately. Any time you face channel resolution errors for a reply that you don't care about, you can set the affected component's 'output-channel' to reference 'nullChannel' (the name 'nullChannel' is reserved within the context). The 'errorChannel' is used internally for sending error messages, and it can be overridden with a custom configuration. It is discussed in greater detail in Section B.4, “Error Handling”.