Spring Integration components can be configured with XML elements that map directly to the terminology and concepts of enterprise integration. In many cases, the element names match those of the Enterprise Integration Patterns.
To enable Spring Integration's namespace support within your Spring configuration files, add the following namespace reference and schema mapping in your top-level 'beans' element:
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:integration="http://www.springframework.org/schema/integration" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-1.0.xsd">
You can choose any name after "xmlns:"; integration is used here for clarity, but you might prefer a shorter abbreviation. If you are using an XML editor or an IDE with auto-completion, however, you may prefer to keep the longer name for clarity. Alternatively, you can create configuration files that use the Spring Integration schema as the primary namespace:
<beans:beans xmlns="http://www.springframework.org/schema/integration" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-1.0.xsd">
When using this alternative, no prefix is necessary for the Spring Integration elements. On the other hand, if you want to define a generic Spring "bean" within the same configuration file, then a prefix would be required for the bean element (<beans:bean ... />). Since it is generally a good idea to modularize the configuration files themselves based on responsibility and/or architectural layer, you may find it appropriate to use the latter approach in the integration-focused configuration files, since generic beans are seldom necessary within those same files. For purposes of this documentation, we will assume the "integration" namespace is primary.
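As a minimal sketch of this layout, the following complete file uses the integration schema as the primary namespace. The bean id "somePojo" and the class example.SomePojo are hypothetical names chosen only for illustration:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:beans="http://www.springframework.org/schema/beans"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration-1.0.xsd">

    <!-- Spring Integration elements need no prefix -->
    <message-bus/>
    <channel id="exampleChannel"/>

    <!-- Generic Spring beans need the "beans" prefix.
         example.SomePojo is a hypothetical class. -->
    <beans:bean id="somePojo" class="example.SomePojo"/>

</beans:beans>
```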
To create a Message Channel instance, you can use the generic 'channel' element:
<channel id="exampleChannel"/>
The default channel type is Point to Point. To create a Publish Subscribe channel, provide a value of true for the 'publish-subscribe' attribute of the channel element:
<channel id="exampleChannel" publish-subscribe="true"/>
When the MessageBus detects and registers channels, it will establish a dispatcher for each channel. The default dispatcher settings were previously displayed in Table 2.2, “Properties of the DispatcherPolicy”. To customize these settings for a particular channel, add the 'dispatcher-policy' sub-element and provide one or more of the attributes shown below:
<channel id="exampleChannel" publish-subscribe="true"> <dispatcher-policy max-messages-per-task="25" receive-timeout="10" rejection-limit="3" retry-interval="500" should-fail-on-rejection-limit="false"/> </channel>
To create a Datatype Channel that only accepts messages containing a certain payload type, provide the fully-qualified class name in the channel element's datatype attribute:
<channel id="numberChannel" datatype="java.lang.Number"/>
Note that the type check passes for any type that is assignable to the channel's datatype. In other words, the "numberChannel" above would accept messages whose payload is java.lang.Integer or java.lang.Double. Multiple types can be provided as a comma-delimited list:
<channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>
When using the "channel" element, the creation of the channel instances will be deferred to the ChannelFactory defined on the MessageBus (see below).
It is also possible to use more specific elements for the various channel types (as described in Section 2.4, “MessageChannel”). Depending on the channel, these may provide additional configuration options. Examples of each are shown below.
To create a QueueChannel, use the "queue-channel" element. By using this element, you can also specify the channel's capacity:
<queue-channel id="exampleChannel" capacity="25"/>
To create a PriorityChannel, use the "priority-channel" element:
<priority-channel id="exampleChannel"/>
By default, the channel will consult the MessagePriority value in the message's header. However, a custom Comparator reference may be provided instead. Also, note that the PriorityChannel (like the other types) does support the "datatype" attribute. As with the "queue-channel", it also supports a "capacity" attribute. The following example demonstrates all of these:
<priority-channel id="exampleChannel" datatype="example.Widget" comparator="widgetComparator" capacity="10"/>
The RendezvousChannel does not provide any additional configuration options.
<rendezvous-channel id="exampleChannel"/>
The DirectChannel does not provide any additional configuration options.
<direct-channel id="exampleChannel"/>
The ThreadLocalChannel does not provide any additional configuration options.
<thread-local-channel id="exampleChannel"/>
Message channels may also have interceptors as described in Section 2.5, “ChannelInterceptor”. One or more <interceptor> elements can be added as sub-elements of <channel> (or the more specific element types). Provide the "ref" attribute to reference any Spring-managed object that implements the ChannelInterceptor interface:
<channel id="exampleChannel"> <interceptor ref="trafficMonitoringInterceptor"/> </channel>
In general, it is a good idea to define the interceptor implementations in a separate location since they usually provide common behavior that can be reused across multiple channels.
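The following sketch shows one interceptor bean shared by two channels. The channel ids and the class example.TrafficMonitoringInterceptor are hypothetical; the class is assumed to implement ChannelInterceptor:

```xml
<!-- Defined once. example.TrafficMonitoringInterceptor is a hypothetical
     ChannelInterceptor implementation. -->
<beans:bean id="trafficMonitoringInterceptor"
    class="example.TrafficMonitoringInterceptor"/>

<!-- The same interceptor instance is referenced from a generic channel... -->
<channel id="ordersChannel">
    <interceptor ref="trafficMonitoringInterceptor"/>
</channel>

<!-- ...and from one of the more specific channel elements -->
<queue-channel id="invoicesChannel" capacity="25">
    <interceptor ref="trafficMonitoringInterceptor"/>
</queue-channel>
```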
Each of the three endpoint types (source, target, and handler) has its own element in the namespace.
A SourceEndpoint connects an implementation of the Source interface to a MessageChannel. The <source-endpoint/> therefore requires these two references as well as the scheduling information so that the MessageBus can manage the message-receiving tasks.
<source-endpoint source="exampleSource" channel="exampleChannel"> <schedule period="5000"/> </source-endpoint>
A TargetEndpoint connects a MessageChannel to an implementation of the Target interface. The <target-endpoint/> requires these two references.
<target-endpoint input-channel="exampleChannel" target="exampleTarget"/>
When the MessageBus registers the endpoint, it will activate the subscription by assigning the endpoint to the input channel's dispatcher. The dispatcher is capable of handling multiple endpoint subscriptions for its channel and delegates to a scheduler for managing the tasks that pull messages from the channel and push them to the endpoints. To configure the polling period for an individual endpoint's schedule, provide a 'schedule' sub-element with the 'period' in milliseconds:
<target-endpoint input-channel="exampleChannel" target="exampleTarget"> <schedule period="3000"/> </target-endpoint>
Note: Individual endpoint schedules only apply for "Point-to-Point" channels, since in that case only a single subscriber needs to receive the message. On the other hand, when a Spring Integration channel is configured as a "Publish-Subscribe" channel, then the dispatcher will drive all endpoint notifications according to its own default schedule, and any 'schedule' element configured for those endpoints will be ignored.
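To illustrate the difference, here is a sketch that contrasts the two cases. The channel ids and the "auditTarget" reference are hypothetical names:

```xml
<!-- Point-to-Point (the default): the endpoint's own schedule applies -->
<channel id="pointToPointChannel"/>
<target-endpoint input-channel="pointToPointChannel" target="exampleTarget">
    <schedule period="3000"/>
</target-endpoint>

<!-- Publish-Subscribe: the dispatcher notifies every subscriber on its own
     default schedule, so no 'schedule' sub-element is configured here -->
<channel id="broadcastChannel" publish-subscribe="true"/>
<target-endpoint input-channel="broadcastChannel" target="exampleTarget"/>
<target-endpoint input-channel="broadcastChannel" target="auditTarget"/>
```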
The <target-endpoint/> accepts additional attributes and child elements, but since these configuration options are also available for the <handler-endpoint/> element, they will be discussed below.
To create a Handler Endpoint instance, use the 'handler-endpoint' element with the 'input-channel' and 'handler' attributes:
<handler-endpoint input-channel="exampleChannel" handler="exampleHandler"/>
The configuration above assumes that "exampleHandler" is an actual implementation of the MessageHandler interface as described in Section 2.6, “MessageHandler”.
To delegate to an arbitrary method of any object, simply add the "method" attribute.
<handler-endpoint input-channel="exampleChannel" handler="somePojo" method="someMethod"/>
In either case (MessageHandler or arbitrary object/method), when the handling method returns a non-null value, the endpoint will attempt to send the reply message to an appropriate reply channel. To determine the reply channel, it will first check if an "output-channel" was provided in the endpoint configuration:
<handler-endpoint input-channel="exampleChannel" output-channel="replyChannel" handler="somePojo" method="someMethod"/>
If no "output-channel" is available, it will next check the message header's 'returnAddress' property. If that value is available, it will then check its type. If it is a MessageChannel, the reply message will be sent to that channel. If it is a String, then the endpoint will attempt to resolve the channel by performing a lookup in the ChannelRegistry.
To reverse the order so that the 'returnAddress' is given priority over the endpoint's "output-channel", provide the "return-address-overrides" attribute with a value of 'true':
<handler-endpoint input-channel="exampleChannel" output-channel="replyChannel" handler="somePojo" method="someMethod" return-address-overrides="true"/>
If neither is available, then a MessageHandlingException will be thrown.
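Putting the reply handling together, the following sketch wires a handler's replies to a second endpoint. The channel id "requestChannel" is a hypothetical name, and "somePojo" and "exampleTarget" are assumed to be beans defined elsewhere:

```xml
<channel id="requestChannel"/>
<channel id="replyChannel"/>

<!-- Non-null return values of somePojo.someMethod are sent to replyChannel,
     because an "output-channel" is configured -->
<handler-endpoint input-channel="requestChannel" output-channel="replyChannel"
    handler="somePojo" method="someMethod"/>

<!-- A target endpoint that consumes the replies -->
<target-endpoint input-channel="replyChannel" target="exampleTarget"/>
```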
Handler and Target Endpoints also support MessageSelectors as described in Section 2.9, “MessageSelector”. To configure a selector with namespace support, simply add the "selector" attribute to the endpoint definition and reference an implementation of the MessageSelector interface.
<handler-endpoint id="endpoint" input-channel="channel" handler="handler" selector="exampleSelector"/>
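As a sketch of how the referenced selector might be defined, the bean below uses a hypothetical class, example.PayloadSizeSelector, that is assumed to implement MessageSelector. The same selector is referenced from a handler endpoint and a target endpoint:

```xml
<!-- example.PayloadSizeSelector is a hypothetical MessageSelector implementation -->
<beans:bean id="exampleSelector" class="example.PayloadSizeSelector"/>

<handler-endpoint input-channel="exampleChannel" handler="exampleHandler"
    selector="exampleSelector"/>

<target-endpoint input-channel="exampleChannel" target="exampleTarget"
    selector="exampleSelector"/>
```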
Another important configuration option for handler and target endpoints is the concurrency policy. Each endpoint is capable of managing a thread pool for its handler or target, and the values you provide for that pool's core and max size can make a substantial difference in how the handler or target performs under load. These settings are available per-endpoint since the performance characteristics of an endpoint's handler or target are one of the major factors to consider (the other major factor being the expected volume on the channel to which the endpoint subscribes). To enable concurrency for an endpoint that is configured with the XML namespace support, provide the 'concurrency' sub-element and one or more of the properties shown below:
<handler-endpoint input-channel="exampleChannel" handler="exampleHandler"> <concurrency core="5" max="25" queue-capacity="20" keep-alive="120"/> </handler-endpoint>
Recall the default concurrency policy values as listed in Table 2.5, “Properties of the ConcurrencyPolicy”. If no concurrency settings are provided (i.e. a null ConcurrencyPolicy), the endpoint's handler or target will be invoked in the caller's thread. Note that the "caller" is usually the dispatcher except in the case of a DirectChannel (see the section called “DirectChannel” for more detail).
Tip: For the concurrency settings, the default queue capacity of 0 triggers the creation of a
As described in Section 2.7, “MessageBus”, the MessageBus plays a central role. Nevertheless, its configuration is quite simple since it is primarily concerned with managing internal details based on the configuration of channels and endpoints. The bus is aware of its host application context, and therefore is also capable of auto-detecting the channels and endpoints. Typically, the MessageBus can be configured with a single empty element:
<message-bus/>
The Message Bus provides default error handling for its components in the form of a configurable error channel, and the 'message-bus' element accepts a reference with its 'error-channel' attribute:
<message-bus error-channel="errorChannel"/> <channel id="errorChannel" publish-subscribe="true" capacity="500"/>
When exceptions occur in a concurrent endpoint's execution of its MessageHandler callback, those exceptions will be wrapped in ErrorMessages and sent to the Message Bus' 'errorChannel' by default. To enable global error handling, simply register a handler on that channel. For example, you can configure Spring Integration's PayloadTypeRouter as the handler of an endpoint that is subscribed to the 'errorChannel'. That router can then spread the error messages across multiple channels based on Exception type. However, since most of the errors will already have been wrapped in MessageDeliveryException or MessageHandlingException, the RootCauseErrorMessageRouter is typically a better option.
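A minimal sketch of registering such a global error handler follows. The bean id "errorRouter" and the class example.ErrorRouter are hypothetical placeholders; in practice the bean could be a configured RootCauseErrorMessageRouter or any other MessageHandler:

```xml
<message-bus error-channel="errorChannel"/>

<channel id="errorChannel" publish-subscribe="true"/>

<!-- example.ErrorRouter is a hypothetical MessageHandler implementation
     that routes ErrorMessages to other channels -->
<beans:bean id="errorRouter" class="example.ErrorRouter"/>

<!-- Subscribes the handler to the bus' error channel -->
<handler-endpoint input-channel="errorChannel" handler="errorRouter"/>
```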
The 'message-bus' element accepts several more optional attributes. First, you can control whether the MessageBus will be started automatically (the default) or will require explicit startup by invoking its start() method (MessageBus implements Spring's Lifecycle interface):
<message-bus auto-startup="false"/>
Another configurable property is the size of the dispatcher thread pool. The dispatcher threads are responsible for polling channels and then passing the messages to handlers.
<message-bus dispatcher-pool-size="25"/>
When the endpoints are concurrency-enabled as described in the previous section, the invocation of the handling methods will happen within the handler thread pool and not the dispatcher pool. However, when no concurrency policy is provided to an endpoint, then it will be invoked in the dispatcher's thread (with the exception of DirectChannels).
Also, the Message Bus is capable of automatically creating channel instances if an endpoint registers a subscription by providing the name of a channel that the bus does not recognize.
<message-bus auto-create-channels="true"/>
Finally, the type of channel that gets created automatically by the bus can be customized by using the "channel-factory" attribute on the "message-bus" definition as in the following example:
<message-bus channel-factory="channelFactoryBean"/> <beans:bean id="channelFactoryBean" class="org.springframework.integration.channel.factory.PriorityChannelFactory"/>
With this definition, all the channels created automatically will be PriorityChannel instances. Without the "channel-factory" attribute, the Message Bus will assume a default QueueChannelFactory.
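The optional attributes described in this section are shown one at a time, but a single definition can presumably carry several of them. The following sketch assumes they can be combined freely on one 'message-bus' element and reuses the bean names from the earlier examples:

```xml
<message-bus error-channel="errorChannel"
    auto-startup="false"
    dispatcher-pool-size="25"
    auto-create-channels="true"
    channel-factory="channelFactoryBean"/>

<!-- Channels created automatically by the bus will be PriorityChannel instances -->
<beans:bean id="channelFactoryBean"
    class="org.springframework.integration.channel.factory.PriorityChannelFactory"/>

<channel id="errorChannel" publish-subscribe="true"/>
```

Because auto-startup is "false" in this sketch, the bus would not begin dispatching until its start() method is invoked.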
The most convenient way to configure Source and Target adapters is by using the namespace support. The following examples demonstrate the namespace-based configuration of several sources and targets:
<jms-source id="jmsSource" connection-factory="connFactory" destination="inQueue"/>
<!-- using the default "connectionFactory" reference -->
<jms-target id="jmsTarget" destination="outQueue"/>
<file-source id="fileSource" directory="/tmp/in"/>
<file-target id="fileTarget" directory="/tmp/out"/>
<rmi-source id="rmiSource" request-channel="rmiSourceInput"/>
<rmi-target id="rmiTarget" local-channel="rmiTargetOutput" remote-channel="someRemoteChannel" host="somehost"/>
<httpinvoker-source id="httpSource" name="/some/path" request-channel="httpInvokerInput"/>
<httpinvoker-target id="httpTarget" channel="httpInvokerOutput" url="http://somehost/test"/>
<mail-target id="mailTarget" host="somehost" username="someuser" password="somepassword"/>
<ws-target id="wsTarget" uri="http://example.org" channel="wsOutput"/>
<ftp-source id="ftpSource" host="example.org" username="someuser" password="somepassword" local-working-directory="/some/path" remote-working-directory="/some/path"/>
In the examples above, notice that simple implementations of the Source and Target interfaces do not accept any 'channel' references. To connect such sources and targets to a channel, register them within an endpoint. For example, here is a File source with an endpoint whose polling will be scheduled to execute every 30 seconds by the MessageBus.
<source-endpoint source="fileSource" channel="exampleChannel"> <schedule period="30000"/> </source-endpoint> <file-source id="fileSource" directory="/tmp/in"/>
Likewise, here is an example of a JMS target that is registered with a target-endpoint whose Messages will be received from the "exampleChannel" that is polled every 500 milliseconds.
<target-endpoint input-channel="exampleChannel" target="jmsTarget"> <schedule period="500"/> </target-endpoint> <jms-target id="jmsTarget" destination="targetDestination"/>
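Combining the two examples gives a small end-to-end flow from a directory to a JMS destination. This is only a sketch: it assumes that a JMS connection factory bean named "connectionFactory" is defined elsewhere, since the 'jms-target' below relies on the default reference:

```xml
<message-bus/>

<channel id="exampleChannel"/>

<!-- Inbound side: poll /tmp/in every 30 seconds and send to the channel -->
<file-source id="fileSource" directory="/tmp/in"/>
<source-endpoint source="fileSource" channel="exampleChannel">
    <schedule period="30000"/>
</source-endpoint>

<!-- Outbound side: poll the channel every 500 milliseconds and send to JMS.
     Uses the default "connectionFactory" reference, assumed to be defined elsewhere. -->
<jms-target id="jmsTarget" destination="targetDestination"/>
<target-endpoint input-channel="exampleChannel" target="jmsTarget">
    <schedule period="500"/>
</target-endpoint>
```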