2.5 MessageBus

There is a rather obvious gap in what we have reviewed thus far. The MessageChannel provides a receive() method that returns a Message, and the MessageHandler provides a handle() method that accepts a Message, but how do the messages get passed from the channel to the handler? As mentioned earlier, the MessageBus provides a runtime form of inversion of control, so the short answer is: you don't need to worry about it. Nevertheless, since this is a reference guide, we will explore the mechanism in some detail.

The MessageBus is an example of a mediator. It performs a number of roles - mostly by delegating to other strategies. One of its fundamental responsibilities is to manage registration of the MessageChannels and MessageHandlers. It provides the following methods:

public void registerChannel(String name, MessageChannel channel)
public void registerHandler(String name, MessageHandler handler, Subscription subscription)
public void registerHandler(String name, MessageHandler handler, Subscription subscription,
                            ConcurrencyPolicy concurrencyPolicy)

As those method signatures reveal, the message bus takes on several concerns so that the channel and handler objects can remain as simple as possible. These responsibilities include the creation and lifecycle management of message dispatchers, the activation of handler subscriptions, and the configuration of thread pools. The bus coordinates all of that behavior based upon the metadata provided via these registration methods. Developers will typically not use this API directly, since the same metadata can be provided in XML and/or annotations. We will briefly take a look at each of those metadata objects.
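The mediator role described above can be illustrated with a minimal sketch. Every type here (SketchChannel, SketchHandler, SketchBus, MediatorSketch) is a hypothetical stand-in written for this example, not one of the framework's classes; messages are plain strings, and a channel name stands in for the Subscription argument. The point is only that neither the channel nor the handler knows about the other, and the bus does the pulling and pushing.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-ins for illustration only, NOT the framework's types.
interface SketchChannel { String receive(); }            // returns null when empty
interface SketchHandler { void handle(String message); }

class SketchBus {
    private final Map<String, SketchChannel> channels = new LinkedHashMap<>();
    private final Map<String, List<SketchHandler>> handlersByChannel = new LinkedHashMap<>();

    void registerChannel(String name, SketchChannel channel) {
        channels.put(name, channel);
    }

    // Assumption: a plain channel name replaces the Subscription metadata.
    void registerHandler(String name, SketchHandler handler, String channelName) {
        handlersByChannel.computeIfAbsent(channelName, k -> new ArrayList<>()).add(handler);
    }

    // One dispatch pass: pull each available message from every channel
    // and push it to the first handler subscribed to that channel.
    int dispatchOnce() {
        int delivered = 0;
        for (Map.Entry<String, SketchChannel> entry : channels.entrySet()) {
            List<SketchHandler> handlers = handlersByChannel.get(entry.getKey());
            if (handlers == null || handlers.isEmpty()) {
                continue;
            }
            String message;
            while ((message = entry.getValue().receive()) != null) {
                handlers.get(0).handle(message);
                delivered++;
            }
        }
        return delivered;
    }
}

class MediatorSketch {
    static List<String> run() {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        queue.add("hello");
        queue.add("world");
        List<String> handled = new ArrayList<>();

        SketchBus bus = new SketchBus();
        bus.registerChannel("input", queue::poll);               // channel side
        bus.registerHandler("collector", handled::add, "input"); // handler side
        bus.dispatchOnce();
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

A real bus would run such passes from scheduled dispatcher tasks rather than from a single explicit call; the metadata objects described in the remainder of this section govern that behavior.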

The bus creates and manages dispatchers that pull messages from a channel in order to push those messages to handlers subscribed to that channel. Each channel has a DispatcherPolicy that contains metadata for configuring those dispatchers:

Table 2.2. Properties of the DispatcherPolicy

Property Name               Default Value        Description
publishSubscribe            false                whether the dispatcher should attempt to publish to all of its handlers (rather than just one)
maxMessagesPerTask          1                    maximum number of messages to retrieve per poll
receiveTimeout              1000 (milliseconds)  how long to block on the receive call (0 for no blocking, -1 for indefinite block)
rejectionLimit              5                    maximum number of attempts to invoke handlers (e.g. no threads available)
retryInterval               1000 (milliseconds)  amount of time to wait between successive attempts to invoke handlers
shouldFailOnRejectionLimit  true                 whether to throw a MessageDeliveryException if the 'rejectionLimit' is reached - if this is set to 'false', then such undeliverable messages would be dropped silently
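The interplay of 'rejectionLimit', 'retryInterval', and 'shouldFailOnRejectionLimit' can be sketched as a simple retry loop. This is an illustration of the semantics in the table, not the dispatcher's actual code: the RejectionLimitSketch class, its Delivery interface, and the deliver() method are hypothetical, java.util.concurrent.RejectedExecutionException stands in for "no threads available", and IllegalStateException stands in for the MessageDeliveryException.

```java
import java.util.concurrent.RejectedExecutionException;

class RejectionLimitSketch {

    // Hypothetical stand-in for one attempt to invoke a handler.
    interface Delivery {
        void attempt();
    }

    // Returns true if the delivery succeeded, false if it was dropped silently.
    static boolean deliver(Delivery delivery, int rejectionLimit,
                           long retryIntervalMillis, boolean shouldFailOnRejectionLimit) {
        for (int attempt = 1; attempt <= rejectionLimit; attempt++) {
            try {
                delivery.attempt();
                return true;
            } catch (RejectedExecutionException rejected) {
                if (attempt < rejectionLimit) {
                    pause(retryIntervalMillis); // wait before the next attempt
                }
            }
        }
        if (shouldFailOnRejectionLimit) {
            // Assumption: stands in for the framework's MessageDeliveryException.
            throw new IllegalStateException("rejection limit of " + rejectionLimit + " reached");
        }
        return false; // the undeliverable message is dropped
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting to retry", e);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Rejected twice, accepted on the third attempt.
        boolean delivered = deliver(() -> {
            if (++calls[0] < 3) {
                throw new RejectedExecutionException("pool saturated");
            }
        }, 5, 10, true);
        System.out.println("delivered=" + delivered + " after " + calls[0] + " attempts");
    }
}
```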


The bus registers handlers with a channel's dispatcher based upon the Subscription metadata provided to the registerHandler() method.

Table 2.3. Properties of the Subscription

Property Name  Description
channel        the channel instance to subscribe to (an object reference)
channelName    the name of the channel to subscribe to - only used as a fallback if 'channel' is null
schedule       the scheduling metadata (see below)
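The fallback rule for 'channel' and 'channelName' amounts to a two-step lookup, sketched below. The SubscriptionSketch class, the resolve() method, and the map-based registry are hypothetical names for this example. Throwing an exception for an unknown name is also an assumption; the text above does not say how the bus reacts in that case.

```java
import java.util.HashMap;
import java.util.Map;

class SubscriptionSketch {

    // The explicit channel reference wins; the name is consulted only when
    // the reference is null.
    static <C> C resolve(C channel, String channelName, Map<String, C> registry) {
        if (channel != null) {
            return channel;
        }
        C byName = registry.get(channelName);
        if (byName == null) {
            // Assumption: the behavior for an unknown name is not documented here.
            throw new IllegalArgumentException("no channel registered as '" + channelName + "'");
        }
        return byName;
    }

    public static void main(String[] args) {
        Map<String, String> registry = new HashMap<>();
        registry.put("orders", "ordersChannel");
        System.out.println(resolve(null, "orders", registry));
    }
}
```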


The scheduling metadata is provided as an implementation of the Schedule interface. This is an abstraction designed to allow extensibility of schedulers for messaging tasks. Currently, there is a single implementation called PollingSchedule that provides the following properties:

Table 2.4. Properties of the PollingSchedule

Property Name  Default Value          Description
period         N/A                    the delay interval between each poll
initialDelay   0                      the delay prior to the first poll
timeUnit       TimeUnit.MILLISECONDS  time unit for 'period' and 'initialDelay'
fixedRate      false                  'false' indicates fixed-delay (no backlog)


The PollingSchedule constructor requires the 'period' value.
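The difference between fixed-rate and fixed-delay scheduling is easiest to see in terms of the JDK's ScheduledExecutorService, which offers both modes. The sketch below assumes that mapping purely for illustration; the text above does not state which scheduler the bus uses internally. The nested Schedule class is a hypothetical holder that mirrors the table's properties, not the framework's PollingSchedule.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class PollingScheduleSketch {

    // Hypothetical holder mirroring Table 2.4; NOT the framework's class.
    static final class Schedule {
        final long period;                          // required by the constructor
        long initialDelay = 0;
        TimeUnit timeUnit = TimeUnit.MILLISECONDS;
        boolean fixedRate = false;

        Schedule(long period) {
            this.period = period;
        }
    }

    static ScheduledFuture<?> start(ScheduledExecutorService executor, Runnable poll, Schedule s) {
        if (s.fixedRate) {
            // Start times are spaced 'period' apart; a slow poll causes later
            // runs to start late and catch up (a backlog).
            return executor.scheduleAtFixedRate(poll, s.initialDelay, s.period, s.timeUnit);
        }
        // 'period' elapses between the end of one poll and the start of the
        // next, so no backlog can build up.
        return executor.scheduleWithFixedDelay(poll, s.initialDelay, s.period, s.timeUnit);
    }

    // Runs the schedule until 'expected' polls have happened or the timeout
    // expires, and returns the number of polls observed (capped at 'expected').
    static int countPolls(Schedule s, int expected, long timeoutMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(expected);
        ScheduledFuture<?> future = start(executor, latch::countDown, s);
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            future.cancel(false);
            executor.shutdownNow();
        }
        return expected - (int) latch.getCount();
    }

    public static void main(String[] args) {
        Schedule s = new Schedule(50);
        s.initialDelay = 10;
        System.out.println("polls observed: " + countPolls(s, 3, 2000));
    }
}
```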

The ConcurrencyPolicy is an optional parameter that may be provided when registering a handler. When the MessageBus registers a handler, it will use these properties to configure that handler's thread pool. These parameters are configurable on a per-handler basis since handlers may have different performance characteristics and may be expected to sustain different volumes of throughput. The following table lists the available properties and their default values:

Table 2.5. Properties of the ConcurrencyPolicy

Property Name     Default Value  Description
coreSize          1              the core size of the thread pool
maxSize           10             the maximum size the thread pool can reach when under demand
queueCapacity     0              capacity of the queue which defers an increase of the pool size
keepAliveSeconds  60             how long added threads (beyond core size) should remain idle before being removed from the pool
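These four properties correspond closely to the constructor arguments of the JDK's ThreadPoolExecutor. The following sketch shows one plausible mapping; it is an assumption made for illustration, and the class and method names (ConcurrencyPolicySketch, createPool) are hypothetical. In particular, treating a 'queueCapacity' of 0 as a direct hand-off via SynchronousQueue is a choice made here, not something the table specifies.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ConcurrencyPolicySketch {

    static ThreadPoolExecutor createPool(int coreSize, int maxSize,
                                         int queueCapacity, int keepAliveSeconds) {
        // Assumption: capacity 0 means "no queue", modeled as a direct hand-off.
        BlockingQueue<Runnable> queue = (queueCapacity > 0)
                ? new LinkedBlockingQueue<Runnable>(queueCapacity)
                : new SynchronousQueue<Runnable>();
        return new ThreadPoolExecutor(coreSize, maxSize,
                keepAliveSeconds, TimeUnit.SECONDS, queue);
    }

    public static void main(String[] args) {
        // The default values from Table 2.5.
        ThreadPoolExecutor pool = createPool(1, 10, 0, 60);
        System.out.println("core=" + pool.getCorePoolSize()
                + " max=" + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```

With a ThreadPoolExecutor, threads beyond the core size are created only when the queue cannot accept another task. A capacity of 0 therefore lets the pool grow toward 'maxSize' as soon as the core threads are busy, while a larger capacity defers that growth until the queue is full, which matches the description of 'queueCapacity' in the table.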