Message Channel Implementations

Spring Integration provides different message channel implementations. The following sections briefly describe each one.

PublishSubscribeChannel

The PublishSubscribeChannel implementation broadcasts any Message sent to it to all of its subscribed handlers. This is most often used for sending event messages, whose primary role is notification (as opposed to document messages, which are generally intended to be processed by a single handler). Note that the PublishSubscribeChannel is intended for sending only. Since it broadcasts to its subscribers directly when its send(Message) method is invoked, consumers cannot poll for messages (it does not implement PollableChannel and therefore has no receive() method). Instead, any subscriber must itself be a MessageHandler, and the subscriber’s handleMessage(Message) method is invoked in turn.

Prior to version 3.0, invoking the send method on a PublishSubscribeChannel that had no subscribers returned false. When used in conjunction with a MessagingTemplate, a MessageDeliveryException was thrown. Starting with version 3.0, the behavior has changed such that a send is always considered successful if at least the minimum subscribers are present (and successfully handle the message). This behavior can be modified by setting the minSubscribers property, which defaults to 0.

If you use a TaskExecutor, only the presence of the correct number of subscribers is used for this determination, because the actual handling of the message is performed asynchronously.

QueueChannel

The QueueChannel implementation wraps a queue. Unlike the PublishSubscribeChannel, the QueueChannel has point-to-point semantics. In other words, even if the channel has multiple consumers, only one of them should receive any Message sent to that channel. It provides a default no-argument constructor (providing an essentially unbounded capacity of Integer.MAX_VALUE) as well as a constructor that accepts the queue capacity, as the following listing shows:

public QueueChannel(int capacity)

A channel that has not reached its capacity limit stores messages in its internal queue, and the send(Message<?>) method returns immediately, even if no receiver is ready to handle the message. If the queue has reached capacity, the sender blocks until room is available in the queue. Alternatively, if you use the send method that has an additional timeout parameter, the queue blocks until either room is available or the timeout period elapses, whichever occurs first. Similarly, a receive() call returns immediately if a message is available on the queue, but, if the queue is empty, then a receive call may block until either a message is available or the timeout, if provided, elapses. In either case, it is possible to force an immediate return regardless of the queue’s state by passing a timeout value of 0. Note, however, that calls to the versions of send() and receive() with no timeout parameter block indefinitely.

PriorityChannel

Whereas the QueueChannel enforces first-in-first-out (FIFO) ordering, the PriorityChannel is an alternative implementation that allows for messages to be ordered within the channel based upon a priority. By default, the priority is determined by the priority header within each message. However, for custom priority determination logic, a comparator of type Comparator<Message<?>> can be provided to the PriorityChannel constructor.

RendezvousChannel

The RendezvousChannel enables a “direct-handoff” scenario, wherein a sender blocks until another party invokes the channel’s receive() method. The other party blocks until the sender sends the message. Internally, this implementation is quite similar to the QueueChannel, except that it uses a SynchronousQueue (a zero-capacity implementation of BlockingQueue). This works well in situations where the sender and receiver operate in different threads, but asynchronously dropping the message in a queue is not appropriate. In other words, with a RendezvousChannel, the sender knows that some receiver has accepted the message, whereas with a QueueChannel, the message would have been stored to the internal queue and potentially never received.

Keep in mind that all of these queue-based channels are storing messages in-memory only by default. When persistence is required, you can either provide a 'message-store' attribute within the 'queue' element to reference a persistent MessageStore implementation or you can replace the local channel with one that is backed by a persistent broker, such as a JMS-backed channel or channel adapter. The latter option lets you take advantage of any JMS provider’s implementation for message persistence, as discussed in JMS Support. However, when buffering in a queue is not necessary, the simplest approach is to rely upon the DirectChannel, discussed in the next section.

The RendezvousChannel is also useful for implementing request-reply operations. The sender can create a temporary, anonymous instance of RendezvousChannel, which it then sets as the 'replyChannel' header when building a Message. After sending that Message, the sender can immediately call receive (optionally providing a timeout value) in order to block while waiting for a reply Message. This is very similar to the implementation used internally by many of Spring Integration’s request-reply components.

DirectChannel

The DirectChannel has point-to-point semantics but otherwise is more similar to the PublishSubscribeChannel than any of the queue-based channel implementations described earlier. It implements the SubscribableChannel interface instead of the PollableChannel interface, so it dispatches messages directly to a subscriber. As a point-to-point channel, however, it differs from the PublishSubscribeChannel in that it sends each Message to a single subscribed MessageHandler.

In addition to being the simplest point-to-point channel option, one of its most important features is that it enables a single thread to perform the operations on “both sides” of the channel. For example, if a handler subscribes to a DirectChannel, then sending a Message to that channel triggers invocation of that handler’s handleMessage(Message) method directly in the sender’s thread, before the send() method invocation can return.

The key motivation for providing a channel implementation with this behavior is to support transactions that must span across the channel while still benefiting from the abstraction and loose coupling that the channel provides. If the send() call is invoked within the scope of a transaction, the outcome of the handler’s invocation (for example, updating a database record) plays a role in determining the ultimate result of that transaction (commit or rollback).

Since the DirectChannel is the simplest option and does not add any additional overhead that would be required for scheduling and managing the threads of a poller, it is the default channel type within Spring Integration. The general idea is to define the channels for an application, consider which of those need to provide buffering or to throttle input, and modify those to be queue-based PollableChannels. Likewise, if a channel needs to broadcast messages, it should not be a DirectChannel but rather a PublishSubscribeChannel. Later, we show how each of these channels can be configured.

The DirectChannel internally delegates to a message dispatcher to invoke its subscribed message handlers, and that dispatcher can have a load-balancing strategy exposed by load-balancer or load-balancer-ref attributes (mutually exclusive). The load balancing strategy is used by the message dispatcher to help determine how messages are distributed amongst message handlers when multiple message handlers subscribe to the same channel. As a convenience, the load-balancer attribute exposes an enumeration of values pointing to pre-existing implementations of LoadBalancingStrategy. A round-robin (load-balances across the handlers in rotation) and none (for the cases where one wants to explicitly disable load balancing) are the only available values. Other strategy implementations may be added in future versions. However, since version 3.0, you can provide your own implementation of the LoadBalancingStrategy and inject it by using the load-balancer-ref attribute, which should point to a bean that implements LoadBalancingStrategy, as the following example shows:

A FixedSubscriberChannel is a SubscribableChannel that only supports a single MessageHandler subscriber that cannot be unsubscribed. This is useful for high-throughput performance use-cases when no other subscribers are involved and no channel interceptors are needed.

<int:channel id="lbRefChannel">
  <int:dispatcher load-balancer-ref="lb"/>
</int:channel>

<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>

Note that the load-balancer and load-balancer-ref attributes are mutually exclusive.

The load-balancing also works in conjunction with a boolean failover property. If the failover value is true (the default), the dispatcher falls back to any subsequent handlers (as necessary) when preceding handlers throw exceptions. The order is determined by an optional order value defined on the handlers themselves or, if no such value exists, the order in which the handlers subscribed.

If a certain situation requires that the dispatcher always try to invoke the first handler and then fall back in the same fixed order sequence every time an error occurs, no load-balancing strategy should be provided. In other words, the dispatcher still supports the failover boolean property even when no load-balancing is enabled. Without load-balancing, however, the invocation of handlers always begins with the first, according to their order. For example, this approach works well when there is a clear definition of primary, secondary, tertiary, and so on. When using the namespace support, the order attribute on any endpoint determines the order.

Keep in mind that load-balancing and failover apply only when a channel has more than one subscribed message handler. When using the namespace support, this means that more than one endpoint shares the same channel reference defined in the input-channel attribute.

Starting with version 5.2, when failover is true, a failure of the current handler together with the failed message is logged under debug or info if configured respectively.

ExecutorChannel

The ExecutorChannel is a point-to-point channel that supports the same dispatcher configuration as DirectChannel (load-balancing strategy and the failover boolean property). The key difference between these two dispatching channel types is that the ExecutorChannel delegates to an instance of TaskExecutor to perform the dispatch. This means that the send method typically does not block, but it also means that the handler invocation may not occur in the sender’s thread. It therefore does not support transactions that span the sender and receiving handler.

The sender can sometimes block. For example, when using a TaskExecutor with a rejection policy that throttles the client (such as the ThreadPoolExecutor.CallerRunsPolicy), the sender’s thread can execute the method any time the thread pool is at its maximum capacity and the executor’s work queue is full. Since that situation would only occur in a non-predictable way, you should not rely upon it for transactions.

PartitionedChannel

Starting with version 6.1, a PartitionedChannel implementation is provided. This is an extension of AbstractExecutorChannel and represents point-to-point dispatching logic where the actual consumption is processed on a specific thread, determined by the partition key evaluated from a message sent to this channel. This channel is similar to the ExecutorChannel mentioned above, but with the difference that messages with the same partition key are always handled in the same thread, preserving ordering. It does not require an external TaskExecutor, but can be configured with a custom ThreadFactory (e.g. Thread.ofVirtual().name("partition-", 0).factory()). This factory is used to populate single-thread executors into a MessageDispatcher delegate, per partition. By default, the IntegrationMessageHeaderAccessor.CORRELATION_ID message header is used as the partition key. This channel can be configured as a simple bean:

@Bean
PartitionedChannel somePartitionedChannel() {
    return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
}

The channel will have 3 partitions - dedicated threads; will use the partitionKey header to determine in which partition the message will be handled. See PartitionedChannel class Javadocs for more information.

FluxMessageChannel

The FluxMessageChannel is an org.reactivestreams.Publisher implementation for "sinking" sent messages into an internal reactor.core.publisher.Flux for on demand consumption by reactive subscribers downstream. This channel implementation is neither a SubscribableChannel, nor a PollableChannel, so only org.reactivestreams.Subscriber instances can be used to consume from this channel honoring back-pressure nature of reactive streams. On the other hand, the FluxMessageChannel implements a ReactiveStreamsSubscribableChannel with its subscribeTo(Publisher<Message<?>>) contract allowing receiving events from reactive source publishers, bridging a reactive stream into the integration flow. To achieve fully reactive behavior for the whole integration flow, such a channel must be placed between all the endpoints in the flow.

See Reactive Streams Support for more information about interaction with Reactive Streams.

Scoped Channel

Spring Integration 1.0 provided a ThreadLocalChannel implementation, but that has been removed as of 2.0. Now the more general way to handle the same requirement is to add a scope attribute to a channel. The value of the attribute can be the name of a scope that is available within the context. For example, in a web environment, certain scopes are available, and any custom scope implementations can be registered with the context. The following example shows a thread-local scope being applied to a channel, including the registration of the scope itself:

<int:channel id="threadScopedChannel" scope="thread">
     <int:queue />
</int:channel>

<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
    <property name="scopes">
        <map>
            <entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
        </map>
    </property>
</bean>

The channel defined in the previous example also delegates to a queue internally, but the channel is bound to the current thread, so the contents of the queue are similarly bound. That way, the thread that sends to the channel can later receive those same messages, but no other thread would be able to access them. While thread-scoped channels are rarely needed, they can be useful in situations where DirectChannel instances are being used to enforce a single thread of operation but any reply messages should be sent to a “terminal” channel. If that terminal channel is thread-scoped, the original sending thread can collect its replies from the terminal channel.

Now, since any channel can be scoped, you can define your own scopes in addition to thread-Local.