There is a rather obvious gap in what we have reviewed thus far. The
MessageChannel
provides a receive()
method that returns
a Message
, and the MessageHandler
provides a
handle()
method that accepts a Message
, but how do the
messages get passed from the channel to the handler? As mentioned earlier, the MessageBus
provides a runtime form of inversion of control, and so the short answer is: you don't need to worry about it.
Nevertheless since this is a reference guide, we will explore this in a bit of detail.
The MessageBus
is an example of a mediator. It performs a number of roles - mostly
by delegating to other strategies. One of its fundamental responsibilities is to manage registration of the
MessageChannels
and MessageHandlers
. It provides
the following methods:
public void registerChannel(String name, MessageChannel channel) public void registerHandler(String name, MessageHandler handler, Subscription subscription) public void registerHandler(String name, MessageHandler handler, Subscription subscription, ConcurrencyPolicy concurrencyPolicy)
As those method signatures reveal, the message bus is handling several of the concerns here so that the channel and handler objects can be as simple as possible. These responsibilities include the creation and lifecycle management of message dispatchers, the activation of handler subscriptions, and the configuration of thread pools. The bus coordinates all of that behavior based upon the metadata provided via these registration methods, and typically developers will not even use this API directly since the metadata can be provided in XML and/or annotations. We will briefly take a look at each of those metadata objects.
The bus creates and manages dispatchers that pull messages from a channel in order to push those messages to
handlers subscribed to that channel. Each channel has a DispatcherPolicy
that contains
metadata for configuring those dispatchers:
Table 2.2. Properties of the DispatcherPolicy
Property Name | Default Value | Description |
---|---|---|
publishSubscribe | false | whether the dispatcher should attempt to publish to all of its handlers (rather than just one) |
maxMessagesPerTask | 1 | maximum number of messages to retrieve per poll |
receiveTimeout | 1000 (milliseconds) | how long to block on the receive call (0 for no blocking, -1 for indefinite block) |
rejectionLimit | 5 | maximum number of attempts to invoke handlers (e.g. no threads available) |
retryInterval | 1000 (milliseconds) | amount of time to wait between successive attempts to invoke handlers |
shouldFailOnRejectionLimit | true | whether to throw a MessageDeliveryException if the 'rejectionLimit' is
reached - if this is set to 'false', then such undeliverable messages would be dropped silently |
The bus registers handlers with a channel's dispatcher based upon the Subscription
metadata provided to the registerHandler()
method.
Table 2.3. Properties of the Subscription
Property Name | Description |
---|---|
channel | the channel instance to subscribe to (an object reference) |
channelName | the name of the channel to subscribe to - only used as a fallback if 'channel' is null |
schedule | the scheduling metadata (see below) |
The scheduling metadata is provided as an implementation of the Schedule
interface. This is an abstraction designed to allow extensibility of schedulers for messaging tasks. Currently,
there is a single implementation named PollingSchedule
that provides the following
properties:
Table 2.4. Properties of the PollingSchedule
Property Name | Default Value | Description |
---|---|---|
period | N/A | the delay interval between each poll |
initialDelay | 0 | the delay prior to the first poll |
timeUnit | TimeUnit.MILLISECONDS | time unit for 'period' and 'initialDelay' |
fixedRate | false | 'false' indicates fixed-delay (no backlog) |
The PollingSchedule
constructor requires the 'period' value.
The ConcurrencyPolicy
is an optional parameter to provide when registering a handler.
When the MessageBus
registers a handler, it will use these properties to configure
that handler's thread pool. These parameters are configurable on a per-handler basis since handlers may have
different performance characteristics and may have different expectations with regard to the volume of
throughput. The following table lists the available properties and their default values:
Table 2.5. Properties of the ConcurrencyPolicy
Property Name | Default Value | Description |
---|---|---|
coreSize | 1 | the core size of the thread pool |
maxSize | 10 | the maximum size the thread pool can reach when under demand |
queueCapacity | 0 | capacity of the queue which defers an increase of the pool size |
keepAliveSeconds | 60 | how long added threads (beyond core size) should remain idle before being removed from the pool |