Spring Integration components can be configured with XML elements that map directly to the terminology and concepts of enterprise integration. In many cases, the element names match those of the Enterprise Integration Patterns.
To enable Spring Integration's namespace support within your Spring configuration files, add the following namespace reference and schema mapping in your top-level 'beans' element:
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:integration="http://www.springframework.org/schema/integration" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-1.0.xsd">
You can choose any name after "xmlns:"; integration is used here for clarity, but you might prefer a shorter abbreviation. Of course if you are using an XML-editor or IDE support, then the availability of auto-completion may convince you to keep the longer name for clarity. Alternatively, you can create configuration files that use the Spring Integration schema as the primary namespace:
<beans:beans xmlns="http://www.springframework.org/schema/integration" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-1.0.xsd">
When using this alternative, no prefix is necessary for the Spring Integration elements. On the other hand, if you want to define a generic Spring "bean" within the same configuration file, then a prefix would be required for the bean element (<beans:bean ... />). Since it is generally a good idea to modularize the configuration files themselves based on responsibility and/or architectural layer, you may find it appropriate to use the latter approach in the integration-focused configuration files, since generic beans are seldom necessary within those same files. For purposes of this documentation, we will assume the "integration" namespace is primary.
To create a Message Channel instance, you can use the generic 'channel' element:
<channel id="exampleChannel"/>
The default channel type is Point to Point. To create a Publish Subscribe channel, use the "publish-subscribe-channel" element:
<publish-subscribe-channel id="exampleChannel"/>
To create a Datatype Channel that only
accepts messages containing a certain payload type, provide the fully-qualified class name in the
channel element's datatype
attribute:
<channel id="numberChannel" datatype="java.lang.Number"/>
Note that the type check passes for any type that is assignable to the channel's
datatype. In other words, the "numberChannel" above would accept messages whose payload is
java.lang.Integer
or java.lang.Double
. Multiple types can be
provided as a comma-delimited list:
<channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>
When using the "channel" element, the creation of the channel instances will be deferred to the ChannelFactory
bean whose name is "channelFactory" if defined within the ApplicationContext. If no such bean is defined, the default factory will
be used. The default implementation is QueueChannelFactory
.
It is also possible to use more specific elements for the various channel types (as described in Section 2.4, “MessageChannel”). Depending on the channel, these may provide additional configuration options. Examples of each are shown below.
To create a QueueChannel
, use the "queue-channel" element.
By using this element, you can also specify the channel's capacity:
<queue-channel id="exampleChannel" capacity="25"/>
To create a PublishSubscribeChannel
, use the "publish-subscribe-channel" element.
When using this element, you can also specify the "task-executor" used for publishing
Messages (if none is specified it simply publishes in the sender's thread):
<publish-subscribe-channel id="exampleChannel" task-executor="someTaskExecutor"/>
To create a PriorityChannel
, use the "priority-channel" element:
<priority-channel id="exampleChannel"/>
By default, the channel will consult the MessagePriority
header of the
message. However, a custom Comparator
reference may be
provided instead. Also, note that the PriorityChannel
(like the other types)
does support the "datatype" attribute. As with the "queue-channel", it also supports a "capacity" attribute.
The following example demonstrates all of these:
<priority-channel id="exampleChannel" datatype="example.Widget" comparator="widgetComparator" capacity="10"/>
The RendezvousChannel
does not provide any additional configuration options.
<rendezvous-channel id="exampleChannel"/>
The DirectChannel
does not provide any additional configuration options.
<direct-channel id="exampleChannel"/>
The ThreadLocalChannel
does not provide any additional configuration options.
<thread-local-channel id="exampleChannel"/>
Message channels may also have interceptors as described in Section 2.5, “ChannelInterceptor”. One or
more <interceptor> elements can be added as sub-elements of <channel> (or the more specific element
types). Provide the "ref" attribute to reference any Spring-managed object that implements the
ChannelInterceptor
interface:
<channel id="exampleChannel"> <interceptor ref="trafficMonitoringInterceptor"/> </channel>
In general, it is a good idea to define the interceptor implementations in a separate location since they usually provide common behavior that can be reused across multiple channels.
Each of the endpoint types (channel-adapter, service-activator, etc) has its own element in the namespace.
A "channel-adapter" element can connect any implementation of the MessageSource
interface to a MessageChannel
. When the MessageBus
registers the endpoint, it will activate the subscription and if necessary create a poller for the endpoint.
The Message Bus delegates to a TaskScheduler
for scheduling the poller based
on its schedule. To configure the polling 'period' or 'cronExpression' for an individual channel-adapter's
schedule, provide a 'poller' sub-element with the 'period' (in milliseconds) or 'cron' attribute:
<channel-adapter source="source1" channel="channel1"> <poller period="5000"/> </channel-adapter> <channel-adapter source="source2" channel="channel2"> <poller cron="30 * * * * ?"/> </channel-adapter>
Note | |
---|---|
Cron support does require the Quartz JAR and its transitive dependencies. Also, keep in mind that pollers only
apply for |
A "channel-adapter" element can also connect a MessageChannel
to any implementation
of the MessageTarget
interface.
<channel-adapter channel="exampleChannel" target="exampleTarget"/>
Again, it is possible to provide a poller:
<channel-adapter channel="exampleChannel" target="exampleTarget"> <poller period="3000"/> </channel-adapter>
To create a Service Activator, use the 'service-activator' element with the 'input-channel' and 'ref' attributes:
<service-activator input-channel="exampleChannel" ref="exampleHandler"/>
The configuration above assumes that "exampleHandler" is an actual implementation of the
MessageHandler
interface as described in Section 2.6, “MessageHandler”.
To delegate to an arbitrary method of any object, simply add the "method" attribute.
<service-activator input-channel="exampleChannel" ref="somePojo" method="someMethod"/>
In either case (MessageHandler
or arbitrary object/method), when the handling
method returns a non-null value, the endpoint will attempt to send the reply message to an appropriate reply
channel. To determine the reply channel, it will first check if the NEXT_TARGET
header contains
a non-null value, next it will check if an "output-channel" was provided in the endpoint configuration:
<service-activator input-channel="exampleChannel" output-channel="replyChannel" ref="somePojo" method="someMethod"/>
If no "output-channel" is available, it will finally check the message header's RETURN_ADDRESS
property. If that value is available, it will then check its type. If it is a MessageTarget
,
the reply message will be sent to that target. If it is a String
, then the endpoint will
attempt to resolve the channel by performing a lookup in the ChannelRegistry
.
If the target cannot be resolved, then a MessageHandlingException
will be thrown.
Message Endpoints also support MessageSelectors
as described in
Section 2.9, “MessageSelector”. To configure a selector with namespace support, simply add the
"selector" attribute to the endpoint definition and reference an implementation of the
MessageSelector
interface.
<service-activator id="endpoint" input-channel="channel" ref="handler" selector="exampleSelector"/>
Another important configuration option for message endpoints is the inclusion of
EndpointInterceptors
. The interface is defined as follows:
public interface EndpointInterceptor { Message<?> preHandle(Message<?> requestMessage); Message<?> aroundHandle(Message<?> requestMessage, MessageHandler handler); Message<?> postHandle(Message<?> replyMessage); }
There is also an EndpointInterceptorAdapter that provides no-op methods for convenience when subclassing. Within an endpoint configuration, interceptors can be added within the <interceptors> sub-element. It accepts either "ref" elements or inner "beans":
<service-activator id="exampleEndpoint" input-channel="requestChannel" ref="someObject" method="someMethod" output-channel="replyChannel"> <poller period="1000"/> <interceptors> <ref bean="someInterceptor"/> <beans:bean class="example.AnotherInterceptor"/> </interceptors> </service-activator>
Spring Integration also provides transaction support for the pollers so that each receive-and-forward operation can be performed as an atomic unit-of-work. To configure transactions for a poller, simply add the <transactional/> sub-element. The attributes for this element should be familiar to anyone who has experience with Spring's Transaction management:
<service-activator id="exampleEndpoint" input-channel="requestChannel" ref="someObject" method="someMethod" output-channel="replyChannel"> <poller period="1000"> <transactional transaction-manager="txManager" propagation="REQUIRES_NEW" isolation="REPEATABLE_READ" timeout="10000" read-only="false"/> </poller> </service-activator>
Spring Integration also provides support for executing the pollers with a
TaskExceutor
. This enables concurrency for an endpoint or group of
endpoints. As a convenience, there is also namespace support for creating a simple thread pool executor.
The <pool-executor/> element defines attributes for common concurrency settings such as core-size,
max-size, and queue-capacity. Configuring a thread-pooling executor can make a substantial difference in
how the endpoint performs under load. These settings are available per-endpoint since the performance
characteristics of an endpoint's handler or is one of the major factors to consider (the other major factor
being the expected volume on the channel to which the endpoint subscribes). To enable concurrency for an
endpoint that is configured with the XML namespace support, provide the 'task-executor' reference on its
<poller/> element and then provide one or more of the properties shown below:
<service-activator input-channel="exampleChannel" ref="exampleHandler"> <poller period="5000" task-executor="pool"/> </service-activator> <pool-executor id="pool" core-size="5" max-size="25" queue-capacity="20" keep-alive-seconds="120"/>
If no 'task-executor' is provided, the endpoint's handler or target will be invoked in the caller's thread.
Note that the "caller" is usually the MessageBus' task scheduler except in the case of a subscribable channel.
Also, keep in mind that you the 'task-executor' attribute can provide a reference to any implementation of
Spring's TaskExecutor
interface.
As described in Section 2.7, “MessageBus”, the MessageBus
plays a central role.
Nevertheless, its configuration is quite simple since it is primarily concerned with managing internal details
based on the configuration of channels and endpoints. The bus is aware of its host application context, and
therefore is also capable of auto-detecting the channels and endpoints. Typically, the
MessageBus
can be configured with a single empty element:
<message-bus/>
The Message Bus provides default error handling for its components in the form of a configurable error channel, and it will first check for a channel bean named 'errorChannel' within the context:
<message-bus/> <channel id="errorChannel" capacity="500"/>
When exceptions occur in a scheduled poller task's execution, those exceptions will be wrapped in
ErrorMessages
and sent to the 'errorChannel' by default. To enable global error
handling, simply register a handler on that channel. For example, you can configure Spring Integration's
RootCauseErrorMessageRouter
as the handler of an endpoint that is subscribed to the
'errorChannel'. That router can then spread the error messages across multiple channels based on
Exception
type. However, since most of the errors will already have been wrapped in
MessageDeliveryException
or MessageHandlingException
,
the RootCauseErrorMessageRouter
is typically a better option.
The 'message-bus' element accepts several more optional attributes. First, you can control whether the
MessageBus
will be started automatically (the default) or will require explicit startup
by invoking its start()
method (MessageBus
implements
Spring's Lifecycle
interface):
<message-bus auto-startup="false"/>
Another configurable property is the size of the default dispatcher thread pool. The dispatcher threads are responsible for polling channels and then passing the messages to handlers.
<message-bus dispatcher-pool-size="25"/>
When the endpoints are concurrency-enabled as described in the previous section, the invocation of the handling methods will happen within the handler thread pool and not the dispatcher pool. However, when no task-executor is provided to an endpoint's poller, then it will be invoked in the dispatcher's thread (with the exception of subscribable channels).
Finally, the type of channel that gets created automatically by the bus can be customized by defining a bean that implements the ChannelFactory interface and whose name is "channelFactory".
<message-bus/> <beans:bean id="channelFactory" class="org.springframework.integration.channel.factory.PriorityChannelFactory"/>
With this definition, all the channels created automatically will be PriorityChannel
instances.
Without a "channelFactory" bean, the Message Bus will assume a default QueueChannelFactory
.
The most convenient way to configure Source and Target adapters is by using the namespace support. The following examples demonstrate the namespace-based configuration of several source, target, gateway, and handler adapters:
<jms-source id="jmsSource" connection-factory="connFactory" destination="inQueue"/> <!-- using the default "connectionFactory" reference --> <jms-target id="jmsTarget" destination="outQueue"/> <file-source id="fileSource" directory="/tmp/in"/> <file-target id="fileTarget" directory="/tmp/out"/> <rmi-gateway id="rmiSource" request-channel="rmiSourceInput"/> <rmi-handler id="rmiTarget" local-channel="rmiTargetOutput" remote-channel="someRemoteChannel" host="somehost"/> <httpinvoker-gateway id="httpSource" name="/some/path" request-channel="httpInvokerInput"/> <httpinvoker-handler id="httpTarget" channel="httpInvokerOutput" url="http://somehost/test"/> <mail-target id="mailTarget" host="somehost" username="someuser" password="somepassword"/> <ws-handler id="wsTarget" uri="http://example.org" channel="wsOutput"/> <ftp-source id="ftpSource" host="example.org" username="someuser" password="somepassword" local-working-directory="/some/path" remote-working-directory="/some/path"/>
In the examples above, notice that simple implementations of the MessageSource
and MessageTarget
interfaces do not accept any 'channel' references. To
connect such sources and targets to a channel, register them within a 'channel-adapter'. For example, here
is a File source with an endpoint whose polling will be scheduled to execute every 30 seconds by the
MessageBus
.
<channel-adapter source="fileSource" channel="exampleChannel"> <poller period="30000"/> </channel-adapter> <file-source id="fileSource" directory="/tmp/in"/>
Likewise, here is an example of a JMS target that is registered within a 'channel-adapter' and whose Messages will be received from the "exampleChannel" that is polled every 500 milliseconds.
<channel-adapter channel="exampleChannel" target="jmsTarget"> <poller period="500"/> </channel-adapter> <jms-target id="jmsTarget" destination="targetDestination"/>
Any Channel Adapter can be created without a "channel" reference in which case it will implicitly create an
instance of DirectChannel
. The created channel's name will match the "id" attribute
of the <channel-adapter/> element. Therefore, if the "channel" is not provided, the "id" is required.