Spring Integration provides Channel Adapters for receiving and sending messages over internet protocols. Both UDP (User Datagram Protocol) and TCP (Transmission Control Protocol) adapters are provided. Each adapter provides for one-way communication over the underlying protocol. In addition, simple inbound and outbound TCP gateways are provided. These are used when two-way communication is needed.
Two flavors each of UDP inbound and outbound channel adapters are provided:
UnicastSendingMessageHandler sends a datagram packet to a single destination.
UnicastReceivingChannelAdapter receives incoming datagram packets.
MulticastSendingMessageHandler sends (broadcasts) datagram packets to a multicast address.
MulticastReceivingChannelAdapter receives incoming datagram packets by joining to a multicast address.
TCP inbound and outbound channel adapters are provided:
TcpSendingMessageHandler sends messages over TCP.
TcpReceivingChannelAdapter receives messages over TCP.
An inbound TCP gateway is provided; this allows for simple request/response processing. While the gateway can support any number of connections, each connection can only process serially. The thread that reads from the socket waits for, and sends, the response before reading again. If the connection factory is configured for single use connections, the connection is closed after the socket times out.
An outbound TCP gateway is provided; this allows for simple request/response processing. If the associated connection factory is configured for single use connections, a new connection is immediately created for each new request. Otherwise, if the connection is in use, the calling thread blocks on the connection until either a response is received or a timeout or I/O error occurs.
The TCP and UDP inbound channel adapters, and the TCP inbound gateway, support the "error-channel" attribute. This provides the same basic functionality as described in Section 8.4.1, “Enter the GatewayProxyFactoryBean”.
<int-ip:udp-outbound-channel-adapter id="udpOut" host="somehost" port="11111" multicast="false" channel="exampleChannel"/>
A simple UDP outbound channel adapter.
Tip | |
---|---|
When setting multicast to true, provide the multicast address in the host attribute. |
UDP is an efficient but unreliable protocol. Two attributes are added to improve reliability. When check-length is set to true, the adapter precedes the message data with a length field (4 bytes in network byte order). This enables the receiving side to verify the length of the packet received. If a receiving system uses a buffer that is too short to contain the packet, the packet can be truncated. The length header provides a mechanism to detect this.
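To make the length check concrete, the following is a minimal sketch in plain Java of a 4-byte, network-byte-order length prefix and the verification a receiver can perform. It is not the adapter's implementation; the class and method names (LengthCheckedDatagram, encode, decode) are hypothetical, and only JDK classes are used.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Illustrative only: mimics a 4-byte, big-endian length prefix on a datagram payload. */
final class LengthCheckedDatagram {

    /** Prepends the payload length as a 4-byte integer in network byte order. */
    static byte[] encode(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length) // ByteBuffer is big-endian by default
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /**
     * Checks the length prefix against the number of bytes actually received
     * (for a real packet this would be DatagramPacket.getLength()).
     */
    static byte[] decode(byte[] buffer, int received) {
        if (received < 4) {
            throw new IllegalArgumentException("Packet too short to hold a length header");
        }
        int declared = ByteBuffer.wrap(buffer, 0, 4).getInt();
        if (declared != received - 4) {
            throw new IllegalArgumentException(
                    "Length mismatch: header says " + declared + ", received " + (received - 4));
        }
        return Arrays.copyOfRange(buffer, 4, received);
    }

    public static void main(String[] args) {
        byte[] wire = encode("hello".getBytes(StandardCharsets.UTF_8));
        try {
            decode(wire, 6); // simulate a receiver whose buffer held only 6 bytes
        } catch (IllegalArgumentException e) {
            System.out.println("Truncation detected: " + e.getMessage());
        }
    }
}
```

A receiver whose buffer is smaller than the datagram sees fewer bytes than the header declares, which is the condition the check reports.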
Starting with version 4.3, the port can be set to 0, in which case the Operating System chooses the port; the chosen port can be discovered by invoking getPort() after the adapter is started and isListening() returns true.
<int-ip:udp-outbound-channel-adapter id="udpOut" host="somehost" port="11111" multicast="false" check-length="true" channel="exampleChannel"/>
An outbound channel adapter that adds length checking to the datagram packets.
Tip | |
---|---|
The recipient of the packet must also be configured to expect a length to precede the actual data.
For a Spring Integration UDP inbound channel adapter, set its |
The second reliability improvement allows an application-level acknowledgment protocol to be used. The receiver must send an acknowledgment to the sender within a specified time.
<int-ip:udp-outbound-channel-adapter id="udpOut" host="somehost" port="11111" multicast="false" check-length="true" acknowledge="true" ack-host="thishost" ack-port="22222" ack-timeout="10000" channel="exampleChannel"/>
An outbound channel adapter that adds length checking to the datagram packets and waits for an acknowledgment.
Tip | |
---|---|
Setting acknowledge to true implies the recipient of the packet can interpret the header added to the packet containing acknowledgment data (host and port). Most likely, the recipient will be a Spring Integration inbound channel adapter. |
Tip | |
---|---|
When multicast is true, an additional attribute min-acks-for-success specifies how many acknowledgments must be received within the ack-timeout. |
For even more reliable networking, TCP can be used.
Starting with version 4.3, the ackPort can be set to 0, in which case the Operating System chooses the port.
Also starting with version 4.3, the destination-expression and socket-expression options are available for the <int-ip:udp-outbound-channel-adapter> (UnicastSendingMessageHandler).
The destination-expression can be used as a runtime alternative to the hardcoded host/port pair. It determines the destination address for the outgoing datagram packet and is evaluated with the requestMessage as the root object of the evaluation context. The expression must evaluate to a URI, a String in URI style (see RFC-2396), or a SocketAddress. The new IpHeaders.PACKET_ADDRESS header can be used for this expression as well. In the framework, this header is populated by the DatagramPacketMessageMapper when datagrams are received in the UnicastReceivingChannelAdapter and converted to messages. The header value is exactly the result of DatagramPacket.getSocketAddress() for the incoming datagram.
With the socket-expression, the outbound channel adapter can use, for example, an inbound channel adapter's socket to send datagrams through the same port on which they were received. This is useful when the application works as a UDP server and clients operate behind NAT. This expression must evaluate to a DatagramSocket. The requestMessage is used as the root object of the evaluation context. The socket-expression parameter cannot be used with parameters such as multicast and acknowledge.
<int-ip:udp-inbound-channel-adapter id="inbound" port="0" channel="in" />
<int:channel id="in" />
<int:transformer expression="new String(payload).toUpperCase()" input-channel="in" output-channel="out"/>
<int:channel id="out" />
<int-ip:udp-outbound-channel-adapter id="outbound" socket-expression="@inbound.socket" destination-expression="headers['ip_packetAddress']" channel="out" />
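The socket-level behavior behind this configuration can be sketched with plain JDK classes: the server receives a datagram, upper-cases it, and replies through the same socket to the address returned by DatagramPacket.getSocketAddress(). This is only an illustration under the assumption of a loopback round trip; UdpSameSocketReply and its methods are hypothetical names, and no Spring Integration classes are involved.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

/** Illustrative only: reply to a UDP client through the socket the request arrived on. */
final class UdpSameSocketReply {

    /** Receives one datagram on 'server' and sends the upper-cased text back to its sender. */
    static void replyOnce(DatagramSocket server) throws IOException {
        byte[] buffer = new byte[512];
        DatagramPacket request = new DatagramPacket(buffer, buffer.length);
        server.receive(request);
        String text = new String(request.getData(), request.getOffset(),
                request.getLength(), StandardCharsets.UTF_8);
        byte[] reply = text.toUpperCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8);
        // The sender's address plays the role of the packet-address header;
        // sending via 'server' keeps the source port the client already knows.
        server.send(new DatagramPacket(reply, reply.length, request.getSocketAddress()));
    }

    /** Sends 'text' from a client socket on loopback and returns the server's reply. */
    static String roundTrip(String text) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket server = new DatagramSocket(new InetSocketAddress(loopback, 0));
             DatagramSocket client = new DatagramSocket(new InetSocketAddress(loopback, 0))) {
            server.setSoTimeout(2000); // avoid blocking forever if a packet is lost
            client.setSoTimeout(2000);
            byte[] out = text.getBytes(StandardCharsets.UTF_8);
            client.send(new DatagramPacket(out, out.length, server.getLocalSocketAddress()));
            replyOnce(server);
            byte[] buffer = new byte[512];
            DatagramPacket reply = new DatagramPacket(buffer, buffer.length);
            client.receive(reply);
            return new String(reply.getData(), reply.getOffset(),
                    reply.getLength(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("hello"));
    }
}
```

Because the reply leaves from the port the client originally contacted, a NAT device between client and server can usually map it back to the client.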
<int-ip:udp-inbound-channel-adapter id="udpReceiver" channel="udpOutChannel" port="11111" receive-buffer-size="500" multicast="false" check-length="true"/>
A basic unicast inbound UDP channel adapter.
<int-ip:udp-inbound-channel-adapter id="udpReceiver" channel="udpOutChannel" port="11111" receive-buffer-size="500" multicast="true" multicast-address="225.6.7.8" check-length="true"/>
A basic multicast inbound UDP channel adapter.
By default, reverse DNS lookups are done on inbound packets to convert IP addresses to hostnames for use in message headers. In environments where DNS is not configured, this can cause delays. This default behavior can be overridden by setting the lookup-host attribute to "false".
For TCP, the configuration of the underlying connection is provided using a Connection Factory. Two types of connection factory are provided: a client connection factory and a server connection factory. Client connection factories are used to establish outgoing connections; server connection factories listen for incoming connections.
A client connection factory is used by an outbound channel adapter. A reference to a client connection factory can also be provided to an inbound channel adapter; that adapter then receives any incoming messages received on connections created by the outbound adapter.
A server connection factory is used by an inbound channel adapter or gateway (in fact the connection factory will not function without one). A reference to a server connection factory can also be provided to an outbound adapter; that adapter can then be used to send replies to incoming messages to the same connection.
Tip | |
---|---|
Reply messages will only be routed to the connection if the reply contains the header |
Tip | |
---|---|
This is the extent of message correlation performed when sharing connection factories between inbound and outbound adapters. Such sharing allows for asynchronous two-way communication over TCP. By default, only payload information is transferred using TCP; therefore any message correlation must be performed by downstream components such as aggregators or other endpoints. Support for transferring selected headers was introduced in version 3.0. For more information refer to Section 31.3.2, “TCP Failover Client Connection Factory”. |
A maximum of one adapter of each type may be given a reference to a connection factory.
Connection factories using java.net.Socket and java.nio.channels.SocketChannel are provided.
<int-ip:tcp-connection-factory id="server" type="server" port="1234"/>
A simple server connection factory that uses java.net.Socket connections.
<int-ip:tcp-connection-factory id="server" type="server" port="1234" using-nio="true"/>
A simple server connection factory that uses java.nio.channels.SocketChannel connections.
Note | |
---|---|
Starting with Spring Integration version 4.2, if the server is configured to listen on a random port (0),
the actual port chosen by the OS can be obtained using |
<int-ip:tcp-connection-factory id="client" type="client" host="localhost" port="1234" single-use="true" so-timeout="10000"/>
A client connection factory that uses java.net.Socket connections and creates a new connection for each message.
<int-ip:tcp-connection-factory id="client" type="client" host="localhost" port="1234" single-use="true" so-timeout="10000" using-nio="true"/>
A client connection factory that uses java.nio.channels.SocketChannel connections and creates a new connection for each message.
TCP is a streaming protocol; this means that some structure has to be provided to data transported over TCP, so the receiver can demarcate the data into discrete messages. Connection factories are configured to use (de)serializers to convert between the message payload and the bits that are sent over TCP. This is accomplished by providing a deserializer and serializer for inbound and outbound messages respectively. A number of standard (de)serializers are provided.
The ByteArrayCrLfSerializer* converts a byte array to a stream of bytes followed by carriage return and linefeed characters (\r\n). This is the default (de)serializer and can be used with telnet as a client, for example.
The ByteArraySingleTerminatorSerializer* converts a byte array to a stream of bytes followed by a single termination character (default 0x00).
The ByteArrayLfSerializer* converts a byte array to a stream of bytes followed by a single linefeed character (0x0a).
The ByteArrayStxEtxSerializer* converts a byte array to a stream of bytes preceded by an STX (0x02) and followed by an ETX (0x03).
The ByteArrayLengthHeaderSerializer converts a byte array to a stream of bytes preceded by a binary length in network byte order (big endian). This is a very efficient deserializer because it does not have to parse every byte looking for a termination character sequence. It can also be used for payloads containing binary data; the above serializers only support text in the payload. The default size of the length header is 4 bytes (Integer), allowing for messages up to (2^31 - 1) bytes. However, the length header can be a single byte (unsigned) for messages up to 255 bytes, or an unsigned short (2 bytes) for messages up to (2^16 - 1) bytes. If you need any other format for the header, you can subclass this class and provide implementations for the readHeader and writeHeader methods. The absolute maximum data size supported is (2^31 - 1) bytes.
The ByteArrayRawSerializer* converts a byte array to a stream of bytes and adds no additional message demarcation data; with this (de)serializer, the end of a message is indicated by the client closing the socket in an orderly fashion. When using this serializer, message reception will hang until the client closes the socket, or a timeout occurs; a timeout will NOT result in a message. When this serializer is being used, and the client is a Spring Integration application, the client must use a connection factory that is configured with single-use=true. This causes the adapter to close the socket after sending the message; the serializer will not, itself, close the connection. This serializer should only be used with connection factories used by channel adapters (not gateways), and the connection factories should be used by either an inbound or outbound adapter, and not both.
Note | |
---|---|
Before version 4.2.2, when using NIO, this serializer treated a timeout (during read) as an end of file and the
data read so far was emitted as a message.
This is unreliable and should not be used to delimit messages; it now treats such conditions as an exception.
In the unlikely event you are using it this way, the previous behavior can be restored by setting the
|
Each of these is a subclass of AbstractByteArraySerializer which implements both org.springframework.core.serializer.Serializer and org.springframework.core.serializer.Deserializer. For backwards compatibility, connections using any subclass of AbstractByteArraySerializer for serialization will also accept a String which will be converted to a byte array first. Each of these (de)serializers converts an input stream containing the corresponding format to a byte array payload.
To avoid memory exhaustion due to a badly behaved client (one that does not adhere to the protocol of the configured serializer), these serializers impose a maximum message size. If the size is exceeded by an incoming message, an exception will be thrown. The default maximum message size is 2048 bytes, and can be increased by setting the maxMessageSize property.
If you are using the default (de)serializer and wish to increase the maximum message size, you must declare it as an explicit bean with the property set and configure the connection factory to use that bean.
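As a rough illustration of how stream framing and a maximum message size interact, the sketch below reads CRLF-terminated and length-header-framed messages from an InputStream using only JDK classes. It is not the framework's code: FramingSketch, readCrLf, and readLengthHeader are hypothetical names, and the real serializers may differ in detail (for example in how they report errors).

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

/** Illustrative only: two framing schemes, each guarded by a maximum message size. */
final class FramingSketch {

    /** Reads one message terminated by CRLF; the terminator is not part of the result. */
    static byte[] readCrLf(InputStream in, int maxMessageSize) throws IOException {
        ByteArrayOutputStream message = new ByteArrayOutputStream();
        boolean pendingCr = false; // a CR was read but is not yet known to be a terminator
        int b;
        while ((b = in.read()) >= 0) {
            if (pendingCr && b == '\n') {
                return message.toByteArray();
            }
            if (pendingCr) {
                message.write('\r'); // the earlier CR turned out to be data
            }
            pendingCr = (b == '\r');
            if (!pendingCr) {
                message.write(b);
            }
            if (message.size() > maxMessageSize) {
                throw new IOException("CRLF not found within " + maxMessageSize + " bytes");
            }
        }
        throw new EOFException("Stream ended before CRLF was found");
    }

    /** Reads one message preceded by a 4-byte big-endian length. */
    static byte[] readLengthHeader(InputStream in, int maxMessageSize) throws IOException {
        DataInputStream data = new DataInputStream(in);
        int length = data.readInt(); // network byte order
        if (length < 0 || length > maxMessageSize) {
            throw new IOException("Declared length " + length + " exceeds limit " + maxMessageSize);
        }
        byte[] payload = new byte[length];
        data.readFully(payload); // throws EOFException if the stream ends early
        return payload;
    }
}
```

The delimiter-based reader has to inspect every byte, while the length-header reader can reject an oversized message from the header alone, before allocating a buffer for the payload.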
The classes marked with * above use an intermediate buffer and copy the decoded data to a final buffer of the correct size. Starting with version 4.3, these can be configured with a poolSize property to allow these raw buffers to be reused instead of being allocated and discarded for each message, which is the default behavior. Setting the property to a negative value will create a pool that has no bounds. If the pool is bounded, you can also set the poolWaitTimeout property (milliseconds) after which an exception is thrown if no buffer becomes available; it defaults to infinity. Such an exception will cause the socket to be closed.
If you wish to use the same mechanism in custom deserializers, subclass AbstractPooledBufferByteArraySerializer instead of its super class AbstractByteArraySerializer, and implement doDeserialize() instead of deserialize(). The buffer will be returned to the pool automatically. AbstractPooledBufferByteArraySerializer also provides a convenient utility method copyToSizedArray().
The MapJsonSerializer uses a Jackson ObjectMapper to convert between a Map and JSON. This can be used in conjunction with a MessageConvertingTcpMessageMapper and a MapMessageConverter to transfer selected headers and the payload in a JSON format.
Note | |
---|---|
The Jackson |
The final standard serializer is org.springframework.core.serializer.DefaultSerializer, which can be used to convert Serializable objects using Java serialization. org.springframework.core.serializer.DefaultDeserializer is provided for inbound deserialization of streams containing Serializable objects.
To implement a custom (de)serializer pair, implement the org.springframework.core.serializer.Deserializer and org.springframework.core.serializer.Serializer interfaces.
If you do not wish to use the default (de)serializer (ByteArrayCrLfSerializer), you must supply serializer and deserializer attributes on the connection factory (example below).
<bean id="javaSerializer" class="org.springframework.core.serializer.DefaultSerializer" />
<bean id="javaDeserializer" class="org.springframework.core.serializer.DefaultDeserializer" />
<int-ip:tcp-connection-factory id="server" type="server" port="1234" deserializer="javaDeserializer" serializer="javaSerializer"/>
A server connection factory that uses java.net.Socket connections and uses Java serialization on the wire.
For full details of the attributes available on connection factories, see the reference at the end of this section.
By default, reverse DNS lookups are done on inbound packets to convert IP addresses to hostnames for use in message headers.
In environments where DNS is not configured, this can cause connection delays.
This default behavior can be overridden by setting the lookup-host attribute to "false".
Note | |
---|---|
It is possible to modify the creation of and/or attributes of sockets - see Section 31.3.2, “TCP Failover Client Connection Factory”. As is noted there, such modifications are possible whether or not SSL is being used. |
As noted above, TCP sockets can be single-use (one request/response) or shared. Shared sockets do not perform well with outbound gateways, in high-volume environments, because the socket can only process one request/response at a time.
To improve performance, users could use collaborating channel adapters instead of gateways, but that requires application-level message correlation. See Section 31.3.2, “TCP Failover Client Connection Factory” for more information.
Spring Integration 2.2 introduced a caching client connection factory, where a pool of shared sockets is used, allowing a gateway to process multiple concurrent requests with a pool of shared connections.
It is now possible to configure a TCP connection factory that supports failover to one or more other servers. When sending a message, the factory will iterate over all its configured factories until either the message can be sent, or no connection can be found. Initially, the first factory in the configured list is used; if a connection subsequently fails the next factory will become the current factory.
<bean id="failCF" class="o.s.i.ip.tcp.connection.FailoverClientConnectionFactory">
    <constructor-arg>
        <list>
            <ref bean="clientFactory1"/>
            <ref bean="clientFactory2"/>
        </list>
    </constructor-arg>
</bean>
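The iteration described above can be sketched in a few lines of plain Java. This is a simplified model, not the FailoverClientConnectionFactory itself: FailoverSketch and its nested Connector interface are hypothetical, and the sketch deliberately ignores any fail-back or refresh behavior.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: try each delegate in turn and remember the one that worked. */
final class FailoverSketch<C> {

    /** Hypothetical stand-in for a client connection factory. */
    interface Connector<T> {
        T connect() throws IOException;
    }

    private final List<Connector<C>> delegates;
    private int current; // index of the delegate that most recently produced a connection

    FailoverSketch(List<Connector<C>> delegates) {
        if (delegates.isEmpty()) {
            throw new IllegalArgumentException("At least one delegate is required");
        }
        this.delegates = new ArrayList<>(delegates);
    }

    /** Tries every delegate once, starting with the current one. */
    synchronized C getConnection() throws IOException {
        IOException last = null;
        for (int attempt = 0; attempt < delegates.size(); attempt++) {
            int index = (current + attempt) % delegates.size();
            try {
                C connection = delegates.get(index).connect();
                current = index; // this delegate becomes the current one
                return connection;
            } catch (IOException e) {
                last = e; // remember the failure and move on to the next delegate
            }
        }
        throw new IOException("No delegate could provide a connection", last);
    }

    synchronized int currentIndex() {
        return current;
    }
}
```

With two delegates where the first always fails, getConnection() returns the second delegate's connection and currentIndex() reports 1; if every delegate fails, the last failure is attached as the cause of the thrown exception.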
The connection factory has two properties when used with a shared connection (singleUse=false):
refreshSharedInterval
closeOnRefresh
These are 0 and false to retain the same behavior that existed before the properties were added.
Consider the following scenario based on the above configuration: Let’s say clientFactory1 cannot establish a connection but clientFactory2 can. Each time the failCF getConnection() method is called, we will again attempt to connect using clientFactory1; if successful, the "old" connection will remain open and may be reused in future if the first factory fails once more.
Set refreshSharedInterval to only attempt to reconnect with the first factory after that time has expired; set it to Long.MAX_VALUE if you only want to fail back to the first factory when the current connection fails.
Set closeOnRefresh to close the "old" connection after a refresh actually creates a new connection.
Important | |
---|---|
These properties do not apply if any of the delegate factories is a |
Note | |
---|---|
When using the failover connection factory, the singleUse property must be consistent between the factory itself and the list of factories it is configured to use. |
=== TCP Connection Interceptors
Connection factories can be configured with a reference to a TcpConnectionInterceptorFactoryChain. Interceptors can be used to add behavior to connections, such as negotiation, security, and other setup. No interceptors are currently provided by the framework but, for an example, see the InterceptedSharedConnectionTests in the source repository.
The HelloWorldInterceptor used in the test case works as follows:
When configured with a client connection factory, when the first message is sent over a connection that is intercepted, the interceptor sends Hello over the connection, and expects to receive world!. When that occurs, the negotiation is complete and the original message is sent; further messages that use the same connection are sent without any additional negotiation.
When configured with a server connection factory, the interceptor requires the first message to be Hello and, if it is, returns world!. Otherwise it throws an exception causing the connection to be closed.
All TcpConnection methods are intercepted. Interceptor instances are created for each connection by an interceptor factory. If an interceptor is stateful, the factory should create a new instance for each connection; if there is no state, the same interceptor can wrap each connection. Interceptor factories are added to the configuration of an interceptor factory chain, which is provided to a connection factory using the interceptor-factory-chain attribute. Interceptors must extend TcpConnectionInterceptorSupport; factories must implement the TcpConnectionInterceptorFactory interface. TcpConnectionInterceptorSupport is provided with passthrough methods; by extending this class, you only need to implement those methods you wish to intercept.
<bean id="helloWorldInterceptorFactory" class="o.s.i.ip.tcp.connection.TcpConnectionInterceptorFactoryChain">
    <property name="interceptors">
        <array>
            <bean class="o.s.i.ip.tcp.connection.HelloWorldInterceptorFactory"/>
        </array>
    </property>
</bean>
<int-ip:tcp-connection-factory id="server" type="server" port="12345" using-nio="true" single-use="true" interceptor-factory-chain="helloWorldInterceptorFactory"/>
<int-ip:tcp-connection-factory id="client" type="client" host="localhost" port="12345" single-use="true" so-timeout="100000" using-nio="true" interceptor-factory-chain="helloWorldInterceptorFactory"/>
Configuring a connection interceptor factory chain.
Beginning with version 3.0, changes to TcpConnections are reported by TcpConnectionEvents. TcpConnectionEvent is a subclass of ApplicationEvent and thus can be received by any ApplicationListener defined in the ApplicationContext.
Note | |
---|---|
The following is deprecated as of version 4.2; use the generic Event Inbound Channel Adapter instead. See Section 12.1, “Receiving Spring Application Events”. For convenience, a |
TcpConnectionEvents have the following properties:
connectionId - the connection identifier which can be used in a message header to send data to the connection
connectionFactoryName - the bean name of the connection factory the connection belongs to
throwable - the Throwable (for TcpConnectionExceptionEvent events only)
source - the TcpConnection; this can be used, for example, to determine the remote IP Address with getHostAddress() (cast required)
In addition, since version 4.0 the standard deserializers discussed in Section 31.3, “TCP Connection Factories” now emit TcpDeserializationExceptionEvents when problems are encountered decoding the data stream. These events contain the exception, the buffer that was in the process of being built, and an offset into the buffer (if available) at the point the exception occurred. Applications can use a normal ApplicationListener, or see Section 12.1, “Receiving Spring Application Events”, to capture these events, allowing analysis of the problem.
Starting with versions 4.0.7, 4.1.3, TcpConnectionServerExceptionEvents are published whenever an unexpected exception occurs on a server socket (such as a BindException when the server socket is in use). These events have a reference to the connection factory and the cause.
Starting with version 4.2, TcpConnectionFailedCorrelationEvents are published whenever an endpoint (inbound gateway or collaborating outbound channel adapter) receives a message that cannot be routed to a connection because the ip_connectionId header is invalid. Outbound gateways also publish this event when a late reply is received (the sender thread has timed out). The event contains the connection id as well as an exception in the cause property that contains the failed message.
Starting with version 4.3, a TcpConnectionServerListeningEvent is emitted when a server connection factory is started. This is useful when the factory is configured to listen on port 0, meaning that the operating system chooses the port. It can also be used instead of polling isListening(), if you need to wait before starting some other process that will connect to the socket.
Important | |
---|---|
To avoid delaying the listening thread from accepting connections, the event is published on a separate thread. |
Starting with version 4.3.2, a TcpConnectionFailedEvent is emitted whenever a client connection can’t be created. The source of the event is the connection factory, which can be used to determine the host and port to which the connection could not be established.
TCP inbound and outbound channel adapters that utilize the above connection factories are provided. These adapters have attributes connection-factory and channel. The channel attribute specifies the channel on which messages arrive at an outbound adapter and on which messages are placed by an inbound adapter. The connection-factory attribute indicates which connection factory is to be used to manage connections for the adapter. While both inbound and outbound adapters can share a connection factory, server connection factories are always owned by an inbound adapter; client connection factories are always owned by an outbound adapter. One, and only one, adapter of each type may get a reference to a connection factory.
<bean id="javaSerializer" class="org.springframework.core.serializer.DefaultSerializer"/>
<bean id="javaDeserializer" class="org.springframework.core.serializer.DefaultDeserializer"/>
<int-ip:tcp-connection-factory id="server" type="server" port="1234" deserializer="javaDeserializer" serializer="javaSerializer" using-nio="true" single-use="true"/>
<int-ip:tcp-connection-factory id="client" type="client" host="localhost" port="#{server.port}" single-use="true" so-timeout="10000" deserializer="javaDeserializer" serializer="javaSerializer"/>
<int:channel id="input" />
<int:channel id="replies">
    <int:queue/>
</int:channel>
<int-ip:tcp-outbound-channel-adapter id="outboundClient" channel="input" connection-factory="client"/>
<int-ip:tcp-inbound-channel-adapter id="inboundClient" channel="replies" connection-factory="client"/>
<int-ip:tcp-inbound-channel-adapter id="inboundServer" channel="loop" connection-factory="server"/>
<int-ip:tcp-outbound-channel-adapter id="outboundServer" channel="loop" connection-factory="server"/>
<int:channel id="loop"/>
In this configuration, messages arriving in channel input are serialized over connections created by client, received at the server, and placed on channel loop. Since loop is the input channel for outboundServer, the message is simply looped back over the same connection, received by inboundClient, and deposited in channel replies. Java serialization is used on the wire.
Normally, inbound adapters use a type="server" connection factory, which listens for incoming connection requests. In some cases, it is desirable to establish the connection in reverse, whereby the inbound adapter connects to an external server and then waits for inbound messages on that connection.
This topology is supported by using client-mode="true" on the inbound adapter. In this case, the connection factory must be of type client and must have single-use set to false.
Two additional attributes are used to support this mechanism: retry-interval specifies (in milliseconds) how often the framework will attempt to reconnect after a connection failure. scheduler is used to supply a TaskScheduler used to schedule the connection attempts, and to test that the connection is still active.
For an outbound adapter, the connection is normally established when the first message is sent. client-mode="true" on an outbound adapter will cause the connection to be established when the adapter is started. Adapters are automatically started by default. Again, the connection factory must be of type client and have single-use set to false and retry-interval and scheduler are also supported. If a connection fails, it will be re-established either by the scheduler or when the next message is sent.
For both inbound and outbound, if the adapter is started, you may force the adapter to establish a connection by sending a <control-bus /> command: @adapter_id.retryConnection() and examine the current state with @adapter_id.isClientModeConnected().
The inbound TCP gateway TcpInboundGateway and outbound TCP gateway TcpOutboundGateway use a server and client connection factory respectively.
Each connection can process a single request/response at a time.
The inbound gateway, after constructing a message with the incoming payload and sending it to the requestChannel, waits for a response and sends the payload from the response message by writing it to the connection.
Note | |
---|---|
For the inbound gateway, care must be taken to retain, or populate, the ip_connectionId header because it is used to correlate the message to a connection. Messages that originate at the gateway will automatically have the header set. If the reply is constructed as a new message, you will need to set the header. The header value can be captured from the incoming message. |
As with inbound adapters, inbound gateways normally use a type="server" connection factory, which listens for incoming connection requests. In some cases, it is desirable to establish the connection in reverse, whereby the inbound gateway connects to an external server and then waits for, and replies to, inbound messages on that connection.
This topology is supported by using client-mode="true" on the inbound gateway. In this case, the connection factory must be of type client and must have single-use set to false.
Two additional attributes are used to support this mechanism: retry-interval specifies (in milliseconds) how often the framework will attempt to reconnect after a connection failure. scheduler is used to supply a TaskScheduler used to schedule the connection attempts, and to test that the connection is still active.
If the gateway is started, you may force the gateway to establish a connection by sending a <control-bus /> command: @adapter_id.retryConnection() and examine the current state with @adapter_id.isClientModeConnected().
The outbound gateway, after sending a message over the connection, waits for a response, constructs a response message, and puts it on the reply channel. Communications over the connections are single-threaded. Users should be aware that only one message can be handled at a time and, if another thread attempts to send a message before the current response has been received, it will block until any previous requests are complete (or time out). If, however, the client connection factory is configured for single-use connections, each new request gets its own connection and is processed immediately.
<int-ip:tcp-inbound-gateway id="inGateway" request-channel="tcpChannel" reply-channel="replyChannel" connection-factory="cfServer" reply-timeout="10000"/>
A simple inbound TCP gateway; if a connection factory configured with the default (de)serializer is used, messages will be \r\n delimited data and the gateway can be used by a simple client such as telnet.
<int-ip:tcp-outbound-gateway id="outGateway" request-channel="tcpChannel" reply-channel="replyChannel" connection-factory="cfClient" request-timeout="10000" remote-timeout="10000"/> <!-- or e.g. remote-timeout-expression="headers['timeout']" -->
A simple outbound TCP gateway.
client-mode is not currently available with the outbound gateway.
==== Overview
One goal of the IP Endpoints is to provide communication with systems other than another Spring Integration application.
For this reason, only message payloads are sent and received, by default.
Since 3.0, headers can be transferred, using JSON, Java serialization, or with custom Serializers and Deserializers; see Section 31.3.2, “TCP Failover Client Connection Factory” for more information.
No message correlation is provided by the framework, except when using the gateways, or collaborating channel adapters on the server side.
In the paragraphs below we discuss the various correlation techniques available to applications.
In most cases, this requires specific application-level correlation of messages, even when message payloads contain some natural correlation data (such as an order number).
==== Gateways
The gateways will automatically correlate messages. However, an outbound gateway should only be used for relatively low-volume use. When the connection factory is configured for a single shared connection to be used for all message pairs (single-use="false"), only one message can be processed at a time. A new message will have to wait until the reply to the previous message has been received. When a connection factory is configured for each new message to use a new connection (single-use="true"), the above restriction does not apply. While this may give higher throughput than a shared connection environment, it comes with the overhead of opening and closing a new connection for each message pair.
Therefore, for high-volume messages, consider using a collaborating pair of channel adapters. However, you will need to provide collaboration logic.
Another solution, introduced in Spring Integration 2.2, is to use a CachingClientConnectionFactory
, which allows the use of a pool of shared connections.
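The shared-connection restriction described above can be pictured with plain JDK primitives. The following is a minimal sketch, not the framework's implementation: the class SharedConnectionGate and its methods are hypothetical names, and a single semaphore permit stands in for the one shared connection. A caller that cannot obtain the permit within its request timeout gives up, much as a request would time out waiting for the connection.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration only; this is not a Spring Integration class. */
final class SharedConnectionGate {

    // One permit models the single shared connection (single-use="false").
    private final Semaphore permit = new Semaphore(1);

    /**
     * Tries to gain exclusive use of the connection, waiting at most
     * requestTimeoutMillis. Returns false if the wait timed out or the
     * calling thread was interrupted.
     */
    boolean tryBegin(long requestTimeoutMillis) {
        try {
            return permit.tryAcquire(requestTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Called once the reply has arrived or the exchange has failed. */
    void end() {
        permit.release();
    }
}
```

With single-use connections there would be no gate at all, since every request gets its own connection; a connection pool corresponds to a semaphore with more than one permit.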
==== Collaborating Outbound and Inbound Channel Adapters
To achieve high-volume throughput (avoiding the pitfalls of using gateways as mentioned above) you may consider configuring a pair of collaborating outbound and inbound channel adapters. Collaborating adapters can also be used (server-side or client-side) for totally asynchronous communication (rather than with request/reply semantics). On the server side, message correlation is automatically handled by the adapters because the inbound adapter adds a header allowing the outbound adapter to determine which connection to use to send the reply message.
Note | |
---|---|
On the server side, care must be taken to populate the ip_connectionId header because it is used to correlate the message to a connection. Messages that originate at the inbound adapter will automatically have the header set. If you wish to construct other messages to send, you will need to set the header. The header value can be captured from an incoming message. |
On the client side, the application will have to provide its own correlation logic, if needed. This can be done in a number of ways.
If the message payload has some natural correlation data, such as a transaction id or an order number, AND there is no need to retain any information (such as a reply channel header) from the original outbound message, the correlation is simple and would be done at the application level in any case.
If the message payload has some natural correlation data, such as a transaction id or an order number, but there is a need to retain some information (such as a reply channel header) from the original outbound message, you may need to retain a copy of the original outbound message (perhaps by using a publish-subscribe channel) and use an aggregator to recombine the necessary data.
For either of the previous two paragraphs, if the payload has no natural correlation data, you may need to provide a transformer upstream of the outbound channel adapter to enhance the payload with such data. Such a transformer may transform the original payload to a new object containing both the original payload and some subset of the message headers. Of course, live objects (such as reply channels) from the headers cannot be included in the transformed payload.
If such a strategy is chosen, you will need to ensure the connection factory has an appropriate serializer/deserializer pair to handle such a payload, such as the DefaultSerializer/Deserializer, which use Java serialization, or a custom serializer and deserializer.
The ByteArray*Serializer options mentioned in Section 31.3, “TCP Connection Factories”, including the default ByteArrayCrLfSerializer, do not support such payloads, unless the transformed payload is a String or byte[].
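As a rough illustration of the transformer strategy described above, the following sketch wraps a payload together with a chosen subset of headers in a serializable envelope and round-trips it with standard Java serialization. The class CorrelatedPayload and its methods are hypothetical and use only the JDK; header values that are not Serializable (which is typically the case for live objects such as reply channels) are simply dropped.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical envelope: original payload plus selected, serializable headers. */
final class CorrelatedPayload implements Serializable {

    private static final long serialVersionUID = 1L;

    final Serializable payload;
    final HashMap<String, Serializable> headers;

    private CorrelatedPayload(Serializable payload, HashMap<String, Serializable> headers) {
        this.payload = payload;
        this.headers = headers;
    }

    /** Copies only the named headers whose values can be serialized. */
    static CorrelatedPayload wrap(Serializable payload, Map<String, ?> allHeaders, Set<String> namesToKeep) {
        HashMap<String, Serializable> kept = new HashMap<>();
        for (String name : namesToKeep) {
            Object value = allHeaders.get(name);
            if (value instanceof Serializable) {
                kept.put(name, (Serializable) value);
            }
        }
        return new CorrelatedPayload(payload, kept);
    }

    /** Serializes the envelope with standard Java serialization. */
    byte[] toBytes() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(this);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Restores an envelope previously produced by toBytes(). */
    static CorrelatedPayload fromBytes(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (CorrelatedPayload) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("envelope class not available", e);
        }
    }
}
```

On the receiving side, the application would unwrap the envelope and use the retained headers (for example an order id) to correlate the reply with the original request.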
Note | |
---|---|
Before the 2.2 release, when a client connection factory was used by collaborating channel adapters, the so-timeout attribute defaulted to the default reply timeout (10 seconds). This meant that if no data were received by the inbound adapter for this period of time, the socket was closed. This default behavior was not appropriate in a truly asynchronous environment, so it now defaults to an infinite timeout. You can reinstate the previous default behavior by setting the so-timeout attribute on the client connection factory to 10000 milliseconds. |
TCP is a streaming protocol; Serializers
and Deserializers
are used to demarcate messages within the stream.
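To make the idea of demarcation concrete, the following sketch shows \r\n framing over a byte stream using only the JDK. It is an illustration of the technique, not the framework's ByteArrayCrLfSerializer; the class CrLfFraming and its methods are hypothetical names, and UTF-8 is assumed for the text encoding.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper showing CRLF-delimited messages on a stream. */
final class CrLfFraming {

    /** Appends the \r\n terminator that marks the end of one message. */
    static byte[] frame(String payload) {
        return (payload + "\r\n").getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Reads bytes until \r\n is seen and returns the message without the
     * terminator. Returns null if the stream ends before any byte is read.
     */
    static String readMessage(InputStream in) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try {
            int previous = -1;
            int current;
            while ((current = in.read()) != -1) {
                if (previous == '\r' && current == '\n') {
                    byte[] bytes = buffer.toByteArray();
                    // The '\r' is already in the buffer; leave it out of the result.
                    return new String(bytes, 0, bytes.length - 1, StandardCharsets.UTF_8);
                }
                buffer.write(current);
                previous = current;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (buffer.size() == 0) {
            return null;
        }
        throw new IllegalStateException("stream ended in the middle of a message");
    }
}
```

Because TCP delivers a continuous stream, two messages written back to back may arrive in a single read; the terminator is what lets the reader split them apart again.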
Prior to 3.0, only message payloads (String or byte[]) could be transferred over TCP.
Beginning with 3.0, you can now transfer selected headers as well as the payload.
It is important to understand, though, that "live" objects, such as the replyChannel
header cannot be serialized.
Sending header information over TCP requires some additional configuration.
The first step is to provide the ConnectionFactory
with a MessageConvertingTcpMessageMapper
using the mapper
attribute.
This mapper delegates to any MessageConverter
implementation to convert the message to/from some object that can be (de)serialized by the configured serializer
and deserializer
.
A MapMessageConverter
is provided, which allows the specification of a list of headers that will be added to a Map
object, along with the payload.
The generated Map has two entries: payload
and headers
.
The headers
entry is itself a Map
containing the selected headers.
The second step is to provide a (de)serializer that can convert between a Map
and some wire format.
This can be a custom (de)Serializer
, which would typically be needed if the peer system is not a Spring Integration application.
A MapJsonSerializer
is provided that will convert a Map to/from JSON.
This uses a Spring Integration JsonObjectMapper
to perform this function.
You can provide a custom JsonObjectMapper
if needed.
By default, the serializer inserts a linefeed (0x0a) character between objects.
See the JavaDocs for more information.
Note | |
---|---|
At the time of writing, the |
You can also use standard Java serialization of the Map, using the DefaultSerializer
and DefaultDeserializer
.
The following example shows the configuration of a connection factory that transfers the correlationId
, sequenceNumber
, and sequenceSize
headers using JSON.
<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="12345"
    mapper="mapper"
    serializer="jsonSerializer"
    deserializer="jsonSerializer"/>

<bean id="mapper"
      class="o.sf.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
    <constructor-arg name="messageConverter">
        <bean class="o.sf.integration.support.converter.MapMessageConverter">
            <property name="headerNames">
                <list>
                    <value>correlationId</value>
                    <value>sequenceNumber</value>
                    <value>sequenceSize</value>
                </list>
            </property>
        </bean>
    </constructor-arg>
</bean>

<bean id="jsonSerializer" class="o.sf.integration.ip.tcp.serializer.MapJsonSerializer" />
A message sent with the above configuration, with payload foo would appear on the wire like so:
{"headers":{"correlationId":"bar","sequenceSize":5,"sequenceNumber":1},"payload":"foo"}
Using NIO (see using-nio
in Section 31.3.2, “TCP Failover Client Connection Factory”) avoids dedicating a thread to read from each socket.
For a small number of sockets, you will likely find that not using NIO, together with an async handoff (e.g.
to a QueueChannel
), will perform as well as, or better than, using NIO.
Consider using NIO when handling a large number of connections. However, the use of NIO has some other ramifications. A pool of threads (in the task executor) is shared across all the sockets; each incoming message is assembled and sent to the configured channel as a separate unit of work on a thread selected from that pool. Two sequential messages arriving on the same socket might be processed by different threads. This means that the order in which the messages are sent to the channel is indeterminate; the strict ordering of the messages arriving on the socket is not maintained.
For some applications, this is not an issue; for others it is.
If strict ordering is required, consider setting using-nio
to false and using async handoff.
Alternatively, you may choose to insert a resequencer downstream of the inbound endpoint to return the messages to their proper sequence. Set apply-sequence to true on the connection factory, and messages arriving on a TCP connection will have sequenceNumber and correlationId headers set. The resequencer uses these headers to return the messages to their proper sequence.
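The resequencing idea can be sketched in a few lines of plain Java. This is a simplified illustration, not the framework's resequencer: the class SimpleResequencer is hypothetical, it handles a single connection (one correlationId), and it assumes that sequence numbers start at 1 and increase by one per message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical single-connection resequencer keyed on sequenceNumber. */
final class SimpleResequencer<T> {

    // Messages that arrived ahead of their turn, ordered by sequence number.
    private final TreeMap<Integer, T> pending = new TreeMap<>();

    // Assumption: the first message on a connection carries sequence number 1.
    private int nextExpected = 1;

    /**
     * Accepts a message and returns every message that can now be released
     * in order; the list is empty while a gap remains.
     */
    synchronized List<T> offer(int sequenceNumber, T message) {
        pending.put(sequenceNumber, message);
        List<T> released = new ArrayList<>();
        while (pending.containsKey(nextExpected)) {
            released.add(pending.remove(nextExpected));
            nextExpected++;
        }
        return released;
    }
}
```

An application handling many connections would keep one such buffer per correlationId, and would also need a policy for gaps that never fill (for example, when a connection closes).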
Pool Size
The pool size attribute is no longer used; previously, it specified the size of the default thread pool when a task-executor was not specified. It was also used to set the connection backlog on server sockets. The first function is no longer needed (see below); the second function is replaced by the backlog attribute.
Previously, when using a fixed thread pool task executor (which was the default), with NIO, it was possible to get a deadlock and processing would stop. The problem occurred when a buffer was full, a thread reading from the socket was trying to add more data to the buffer, and there were no threads available to make space in the buffer. This only occurred with a very small pool size, but it could be possible under extreme conditions. Since 2.2, two changes have eliminated this problem. First, the default task executor is a cached thread pool executor. Second, deadlock detection logic has been added such that if thread starvation occurs, instead of deadlocking, an exception is thrown, thus releasing the deadlocked resources.
Important | |
---|---|
Now that the default task executor is unbounded, it is possible that an out of memory condition might occur with high rates of incoming messages, if message processing takes extended time. If your application exhibits this type of behavior, you are advised to use a pooled task executor with an appropriate pool size, but see the next section. |
==== Thread Pool Task Executor with CALLER_RUNS Policy
There are some important considerations when using a fixed thread pool with the CallerRunsPolicy
(CALLER_RUNS
when using the <task/>
namespace) and the queue capacity is small.
The following does not apply if you are not using a fixed thread pool.
With NIO connections there are 3 distinct task types; the IO Selector processing is performed on one dedicated thread - detecting events, accepting new connections, and dispatching the IO read operations to other threads, using the task executor. When an IO reader thread (to which the read operation is dispatched) reads data, it hands off to another thread to assemble the incoming message; large messages may take several reads to complete. These "assembler" threads can block waiting for data. When a new read event occurs, the reader determines if this socket already has an assembler and runs a new one if not. When the assembly process is complete, the assembler thread is returned to the pool.
This can cause a deadlock when the pool is exhausted and the CALLER_RUNS rejection policy is in use, and the task queue is full.
When the pool is empty and there is no room in the queue, the IO selector thread receives an OP_READ
event and dispatches the read using the executor; the queue is full, so the selector thread itself starts the read process; now, it detects that there is not an assembler for this socket and, before it does the read, fires off an assembler; again, the queue is full, and the selector thread becomes the assembler.
The assembler is now blocked awaiting the data to be read, which will never happen.
The connection factory is now deadlocked because the selector thread can’t handle new events.
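The root of the problem is that, with the CALLER_RUNS policy, a rejected task runs on the thread that submitted it. The following JDK-only sketch demonstrates that behavior; the class CallerRunsDemo is a hypothetical name, and a one-thread pool with a SynchronousQueue stands in for an exhausted pool with no queue capacity.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical demonstration of CallerRunsPolicy on a saturated pool. */
final class CallerRunsDemo {

    /** Returns the name of the thread that ends up running the overflow task. */
    static String threadThatRunsOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            // Occupy the only worker thread until the latch is released.
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // No free worker and no queue capacity: the policy runs this task
            // synchronously on the submitting thread.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
            return ranOn.get();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

If the submitting thread is the IO selector and the task it inherits blocks waiting for data that only the selector can deliver, nothing can make progress, which is the deadlock described above.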
We must avoid the selector (or reader) threads performing the assembly task to avoid this deadlock. It is desirable to use separate pools for the IO and assembly operations.
The framework provides a CompositeExecutor
, which allows the configuration of two distinct executors; one for performing IO operations, and one for message assembly.
In this environment, an IO thread can never become an assembler thread, and the deadlock cannot occur.
In addition, the task executors should be configured to use an AbortPolicy
(ABORT when using <task>
).
When an IO cannot be completed, it is deferred for a short time and retried continually until it can be completed and an assembler allocated.
By default, the delay is 100ms but it can be changed using the readDelay
property on the connection factory (read-delay
when configuring with the XML namespace).
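The defer-and-retry behavior described above can be sketched as follows. This is an illustration under stated assumptions rather than the framework's code: the class DeferredDispatch is hypothetical, the executor is assumed to signal saturation with RejectedExecutionException (as an AbortPolicy does), and the sketch stops after a maximum number of attempts, whereas the framework is described as retrying continually.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

/** Hypothetical helper: retry a rejected submission after a fixed delay. */
final class DeferredDispatch {

    /**
     * Submits the task; if the executor rejects it, waits readDelayMillis and
     * tries again. Returns the number of attempts that were needed.
     */
    static int executeWithRetry(Executor executor, Runnable task, long readDelayMillis, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                executor.execute(task);
                return attempt;
            } catch (RejectedExecutionException rejected) {
                // No thread available right now: defer, then retry.
                try {
                    Thread.sleep(readDelayMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting to retry", e);
                }
            }
        }
        throw new RejectedExecutionException("no thread available after " + maxAttempts + " attempts");
    }
}
```

The key difference from CALLER_RUNS is that the submitting thread never executes the task itself; it only waits and resubmits, so it remains free to handle other events between attempts.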
Example configuration of the composite executor is shown below.
@Bean
public CompositeExecutor compositeExecutor() {
    // Executor for the IO (read) operations
    ThreadPoolTaskExecutor ioExec = new ThreadPoolTaskExecutor();
    ioExec.setCorePoolSize(4);
    ioExec.setMaxPoolSize(10);
    ioExec.setQueueCapacity(0);
    ioExec.setThreadNamePrefix("io-");
    ioExec.setRejectedExecutionHandler(new AbortPolicy());
    ioExec.initialize();

    // Executor for message assembly
    ThreadPoolTaskExecutor assemblerExec = new ThreadPoolTaskExecutor();
    assemblerExec.setCorePoolSize(4);
    assemblerExec.setMaxPoolSize(10);
    assemblerExec.setQueueCapacity(0);
    assemblerExec.setThreadNamePrefix("assembler-");
    assemblerExec.setRejectedExecutionHandler(new AbortPolicy());
    assemblerExec.initialize();

    return new CompositeExecutor(ioExec, assemblerExec);
}
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
    <constructor-arg ref="io"/>
    <constructor-arg ref="assembler"/>
</bean>

<task:executor id="io" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />

<task:executor id="assembler" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
    <constructor-arg>
        <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="threadNamePrefix" value="io-" />
            <property name="corePoolSize" value="4" />
            <property name="maxPoolSize" value="8" />
            <property name="queueCapacity" value="0" />
            <property name="rejectedExecutionHandler">
                <bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
            </property>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="threadNamePrefix" value="assembler-" />
            <property name="corePoolSize" value="4" />
            <property name="maxPoolSize" value="10" />
            <property name="queueCapacity" value="0" />
            <property name="rejectedExecutionHandler">
                <bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
            </property>
        </bean>
    </constructor-arg>
</bean>
==== Overview
Secure Sockets Layer/Transport Layer Security is supported.
When using NIO, the JDK 5+ SSLEngine
feature is used to handle handshaking after the connection is established.
When not using NIO, standard SSLSocketFactory
and SSLServerSocketFactory
objects are used to create connections.
A number of strategy interfaces are provided to allow significant customization; default implementations of these interfaces provide for the simplest way to get started with secure communications.
==== Getting Started
Regardless of whether NIO is being used, you need to configure the ssl-context-support
attribute on the connection factory.
This attribute references a <bean/> definition that describes the location and passwords for the required key stores.
SSL/TLS peers require two keystores each: a keystore containing the private/public key pairs that identify the peer, and a truststore containing the public keys of the peers that are trusted.
See the documentation for the keytool
utility provided with the JDK.
The essential steps are
Repeat for the other peer.
Note | |
---|---|
It is common in test cases to use the same key stores on both peers, but this should be avoided for production. |
After establishing the key stores, the next step is to indicate their locations to the TcpSSLContextSupport
bean, and provide a reference to that bean to the connection factory.
<bean id="sslContextSupport"
      class="o.sf.integration.ip.tcp.connection.support.DefaultTcpSSLContextSupport">
    <constructor-arg value="client.ks"/>
    <constructor-arg value="client.truststore.ks"/>
    <constructor-arg value="secret"/>
    <constructor-arg value="secret"/>
</bean>

<ip:tcp-connection-factory id="clientFactory"
    type="client"
    host="localhost"
    port="1234"
    ssl-context-support="sslContextSupport" />
The DefaultTcpSSLContextSupport
class also has an optional protocol
property, which can be SSL
or TLS
(default).
The keystore file names (first two constructor arguments) use the Spring Resource
abstraction; by default the files will be located on the classpath, but this can be overridden by using the file:
prefix, to find the files on the filesystem instead.
Starting with version 4.3.6, when using NIO, you can specify an ssl-handshake-timeout
(seconds) on the connection factory.
This timeout (default 30) is used during SSL handshake when waiting for data; if the timeout is exceeded, the process is aborted and the socket closed.
==== Strategy Interfaces
In many cases, the configuration described above is all that is needed to enable secure communication over TCP/IP. However, a number of strategy interfaces are provided to allow customization and modification of socket factories and sockets.
TcpSSLContextSupport
TcpSocketFactorySupport
TcpSocketSupport
TcpNioConnectionSupport
public interface TcpSSLContextSupport {

    SSLContext getSSLContext() throws Exception;

}
Implementations of this interface are responsible for creating an SSLContext.
The implementation provided by the framework is the DefaultTcpSSLContextSupport
described above.
If you require different behavior, implement this interface and provide the connection factory with a reference to a bean of your class' implementation.
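For orientation, the following sketch shows the kind of work a custom implementation would typically do inside getSSLContext(): building an SSLContext from a keystore and a truststore with the standard JSSE classes. It uses only the JDK and does not implement the framework interface; the class SslContextSketch and its methods are hypothetical names, and the empty in-memory keystores exist only so that the sketch runs without key material on disk.

```java
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

/** Hypothetical helper: builds an SSLContext from already-loaded stores. */
final class SslContextSketch {

    /** Creates an SSLContext for the given protocol, for example "TLS". */
    static SSLContext build(KeyStore keyStore, char[] keyPassword, KeyStore trustStore, String protocol) {
        try {
            // Key managers present this peer's identity.
            KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            kmf.init(keyStore, keyPassword);

            // Trust managers decide which peers are trusted.
            TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            tmf.init(trustStore);

            SSLContext context = SSLContext.getInstance(protocol);
            context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
            return context;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("could not create SSLContext", e);
        }
    }

    /** Creates an empty in-memory keystore; for experimentation only. */
    static KeyStore emptyStore() {
        try {
            KeyStore store = KeyStore.getInstance(KeyStore.getDefaultType());
            store.load(null, null);
            return store;
        } catch (GeneralSecurityException | IOException e) {
            throw new IllegalStateException("could not create keystore", e);
        }
    }
}
```

A real implementation would load the keystores from the configured locations instead of using empty ones; an empty truststore trusts no peer, so handshakes against it would fail.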
public interface TcpSocketFactorySupport {

    ServerSocketFactory getServerSocketFactory();

    SocketFactory getSocketFactory();

}
Implementations of this interface are responsible for obtaining references to ServerSocketFactory
and SocketFactory
.
Two implementations are provided; the first is DefaultTcpNetSocketFactorySupport
for non-SSL sockets (when no ssl-context-support
attribute is defined); this simply uses the JDK’s default factories.
The second implementation is DefaultTcpNetSSLSocketFactorySupport
; this is used, by default, when an ssl-context-support
attribute is defined; it uses the SSLContext
created by that bean to create the socket factories.
Note | |
---|---|
This interface only applies if |
public interface TcpSocketSupport {

    void postProcessServerSocket(ServerSocket serverSocket);

    void postProcessSocket(Socket socket);

}
Implementations of this interface can modify sockets after they are created, and after all configured attributes have been applied, but before the sockets are used.
This applies whether or not NIO is being used.
For example, you could use an implementation of this interface to modify the supported cipher suites on an SSL socket, or you could add a listener that gets notified after SSL handshaking is complete.
The sole implementation provided by the framework is the DefaultTcpSocketSupport
which does not modify the sockets in any way.
To supply your own implementation of TcpSocketFactorySupport
or TcpSocketSupport
, provide the connection factory with references to beans of your custom type using the socket-factory-support
and socket-support
attributes, respectively.
public interface TcpNioConnectionSupport {

    TcpNioConnection createNewConnection(SocketChannel socketChannel,
            boolean server, boolean lookupHost,
            ApplicationEventPublisher applicationEventPublisher,
            String connectionFactoryName) throws Exception;

}
This interface is invoked to create TcpNioConnection
objects (or subclasses).
Two implementations are provided: DefaultTcpNioSSLConnectionSupport
and DefaultTcpNioConnectionSupport
which are used depending on whether SSL is in use or not.
A common use case would be to subclass DefaultTcpNioSSLConnectionSupport
and override postProcessSSLEngine
; see the example below.
==== Example: Enabling SSL Client Authentication
To enable client certificate authentication when using SSL, the technique depends on whether NIO is in use or not.
When NIO is not being used, provide a custom TcpSocketSupport
implementation to post-process the server socket:
serverFactory.setTcpSocketSupport(new DefaultTcpSocketSupport() {

    @Override
    public void postProcessServerSocket(ServerSocket serverSocket) {
        ((SSLServerSocket) serverSocket).setNeedClientAuth(true);
    }

});
(When using XML configuration, provide a reference to your bean using the socket-support
attribute).
When using NIO, provide a custom TcpNioConnectionSupport
implementation to post-process the SSLEngine
.
@Bean
public DefaultTcpNioSSLConnectionSupport tcpNioConnectionSupport() {
    return new DefaultTcpNioSSLConnectionSupport(serverSslContextSupport) {

        @Override
        protected void postProcessSSLEngine(SSLEngine sslEngine) {
            sslEngine.setNeedClientAuth(true);
        }

    };
}

@Bean
public TcpNioServerConnectionFactory server() {
    ...
    serverFactory.setTcpNioConnectionSupport(tcpNioConnectionSupport());
    ...
}
(When using XML configuration, since version 4.3.7, provide a reference to your bean using the nio-connection-support
attribute).
=== IP Configuration Attributes
Attribute Name | Client? | Server? | Allowed Values | Attribute Description |
---|---|---|---|---|
type | Y | Y | client, server | Determines whether the connection factory is a client or server. |
host | Y | N | The host name or ip address of the destination. | |
port | Y | Y | The port. | |
serializer | Y | Y | An implementation of | |
deserializer | Y | Y | An implementation of | |
using-nio | Y | Y | true, false | Whether or not connection uses NIO. Refer to the java.nio package for more information. See Section 31.3.2, “TCP Failover Client Connection Factory”. Default false. |
using-direct-buffers | Y | N | true, false | When using NIO, whether or not the connection uses direct buffers.
Refer to |
apply-sequence | Y | Y | true, false | When using NIO, it may be necessary to resequence messages. When this attribute is set to true, correlationId and sequenceNumber headers will be added to received messages. See Section 31.3.2, “TCP Failover Client Connection Factory”. Default false. |
so-timeout | Y | Y | Defaults to 0 (infinity), except for server connection factories with single-use="true". In that case, it defaults to the default reply timeout (10 seconds). | |
so-send-buffer-size | Y | Y | See | |
so-receive-buffer-size | Y | Y | See | |
so-keep-alive | Y | Y | true, false | See |
so-linger | Y | Y | Sets linger to true with supplied value.
See | |
so-tcp-no-delay | Y | Y | true, false | See |
so-traffic-class | Y | Y | See | |
local-address | N | Y | On a multi-homed system, specifies an IP address for the interface to which the socket will be bound. | |
task-executor | Y | Y | Specifies a specific Executor to be used for socket handling. If not supplied, an internal cached thread executor will be used. Needed on some platforms that require the use of specific task executors such as a WorkManagerTaskExecutor. | |
single-use | Y | Y | true, false | Specifies whether a connection can be used for multiple messages. If true, a new connection will be used for each message. |
pool-size | N | N | This attribute is no longer used. For backward compatibility, it sets the backlog but users should use backlog to specify the connection backlog in server factories | |
backlog | N | Y | Sets the connection backlog for server factories. | |
lookup-host | Y | Y | true, false | Specifies whether reverse lookups are done on IP addresses to convert to host names for use in message headers. If false, the IP address is used instead. Defaults to true. |
interceptor-factory-chain | Y | Y | See Section 31.3.2, “TCP Failover Client Connection Factory” | |
ssl-context-support | Y | Y | See Section 31.3.2, “TCP Failover Client Connection Factory” | |
socket-factory-support | Y | Y | See Section 31.3.2, “TCP Failover Client Connection Factory” | |
socket-support | Y | Y | See Section 31.3.2, “TCP Failover Client Connection Factory” | |
nio-connection-support | Y | Y | See Section 31.3.2, “TCP Failover Client Connection Factory” | |
read-delay | Y | Y | long > 0 | The delay (in milliseconds) before retrying a read after the previous attempt failed due to insufficient threads.
Default 100.
Only applies if |
Table 31.1. UDP Inbound Channel Adapter Attributes
Attribute Name | Allowed Values | Attribute Description |
---|---|---|
port | The port on which the adapter listens. | |
multicast | true, false | Whether or not the udp adapter uses multicast. |
multicast-address | When multicast is true, the multicast address to which the adapter joins. | |
pool-size | Specifies the concurrency. Specifies how many packets can be handled concurrently. It only applies if task-executor is not configured. Defaults to 5. | |
task-executor | Specifies a specific Executor to be used for socket handling. If not supplied, an internal pooled executor will be used. Needed on some platforms that require the use of specific task executors such as a WorkManagerTaskExecutor. See pool-size for thread requirements. | |
receive-buffer-size | The size of the buffer used to receive DatagramPackets. Usually set to the MTU size. If a smaller buffer is used than the size of the sent packet, truncation can occur. This can be detected by means of the check-length attribute. | |
check-length | true, false | Whether or not a udp adapter expects a data length field in the packet received. Used to detect packet truncation. |
so-timeout | See | |
so-send-buffer-size | Used for udp acknowledgment packets.
See | |
so-receive-buffer-size | See | |
local-address | On a multi-homed system, specifies an IP address for the interface to which the socket will be bound. | |
error-channel | If an Exception is thrown by a downstream component, the MessagingException message containing the exception and failed message is sent to this channel. | |
lookup-host | true, false | Specifies whether reverse lookups are done on IP addresses to convert to host names for use in message headers. If false, the IP address is used instead. Defaults to true. |
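The truncation check enabled by check-length can be illustrated with a length-prefixed packet. The sketch below is a hypothetical JDK-only example that assumes a 4-byte big-endian length field in front of the payload; the exact layout used by the framework should be confirmed against its documentation. The class LengthCheckedDatagram and its methods are illustrative names.

```java
import java.nio.ByteBuffer;

/** Hypothetical length-prefixed packet format for detecting truncation. */
final class LengthCheckedDatagram {

    /** Prepends a 4-byte big-endian length field to the payload. */
    static byte[] encode(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /**
     * Validates the length field against the number of bytes actually
     * received and returns the payload, or throws if the packet was cut short.
     */
    static byte[] decode(byte[] received, int receivedLength) {
        if (receivedLength < 4) {
            throw new IllegalArgumentException("packet too short to contain a length field");
        }
        ByteBuffer buffer = ByteBuffer.wrap(received, 0, receivedLength);
        int declared = buffer.getInt();
        if (declared != buffer.remaining()) {
            throw new IllegalStateException("length field says " + declared
                    + " bytes but " + buffer.remaining() + " were received");
        }
        byte[] payload = new byte[declared];
        buffer.get(payload);
        return payload;
    }
}
```

A receive buffer smaller than the sent packet delivers fewer bytes than the length field declares, so the mismatch is detected instead of a shortened payload being passed downstream silently.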
Table 31.2. UDP Outbound Channel Adapter Attributes
Attribute Name | Allowed Values | Attribute Description |
---|---|---|
host | The host name or ip address of the destination. For multicast udp adapters, the multicast address. | |
port | The port on the destination. | |
multicast | true, false | Whether or not the udp adapter uses multicast. |
acknowledge | true, false | Whether or not a udp adapter requires an acknowledgment from the destination. When enabled, requires setting the following 4 attributes. |
ack-host | When acknowledge is true, indicates the host or ip address to which the acknowledgment should be sent. Usually the current host, but may be different, for example when Network Address Translation (NAT) is being used. | |
ack-port | When acknowledge is true, indicates the port to which the acknowledgment should be sent. The adapter listens on this port for acknowledgments. | |
ack-timeout | When acknowledge is true, indicates the time in milliseconds that the adapter will wait for an acknowledgment. If an acknowledgment is not received in time, the adapter will throw an exception. | |
min-acks-for-success | Defaults to 1. For multicast adapters, you can set this to a larger value, requiring acknowledgments from multiple destinations. | |
check-length | true, false | Whether or not a udp adapter includes a data length field in the packet sent to the destination. |
time-to-live | For multicast adapters, specifies the time to live attribute for the | |
so-timeout | See | |
so-send-buffer-size | See | |
so-receive-buffer-size | Used for udp acknowledgment packets.
See | |
local-address | On a multi-homed system, for the UDP adapter, specifies an IP address for the interface to which the socket will be bound for reply messages. For a multicast adapter it is also used to determine which interface the multicast packets will be sent over. | |
task-executor | Specifies a specific Executor to be used for acknowledgment handling. If not supplied, an internal single threaded executor will be used. Needed on some platforms that require the use of specific task executors such as a WorkManagerTaskExecutor. One thread will be dedicated to handling acknowledgments (if the acknowledge option is true). | |
destination-expression | SpEL expression | A SpEL expression to be evaluated to determine which |
socket-expression | SpEL expression | A SpEL expression to be evaluated to determine which datagram socket to use for sending outgoing UDP packets. |
Table 31.3. TCP Inbound Channel Adapter Attributes
Attribute Name | Allowed Values | Attribute Description |
---|---|---|
channel | The channel to which inbound messages will be sent. | |
connection-factory | If the connection factory has a type server, the factory is owned by this adapter. If it has a type client, it is owned by an outbound channel adapter and this adapter will receive any incoming messages on the connection created by the outbound adapter. | |
error-channel | If an Exception is thrown by a downstream component, the MessagingException message containing the exception and failed message is sent to this channel. | |
client-mode | true, false | When true, the inbound adapter will act as a client, with respect to establishing the connection and then receive incoming messages on that connection. Default = false. Also see retry-interval and scheduler. The connection factory must be of type client and have single-use set to false. |
retry-interval | When in client-mode, specifies the number of milliseconds to wait between connection attempts, or after a connection failure. Default 60,000 (60 seconds). | |
scheduler | true, false | Specifies a |
Table 31.4. TCP Outbound Channel Adapter Attributes
Attribute Name | Allowed Values | Attribute Description |
---|---|---|
channel | The channel on which outbound messages arrive. | |
connection-factory | If the connection factory has a type client, the factory is owned by this adapter. If it has a type server, it is owned by an inbound channel adapter and this adapter will attempt to correlate messages to the connection on which an original inbound message was received. | |
client-mode | true, false | When true, the outbound adapter will attempt to establish the connection as soon as it is started. When false, the connection is established when the first message is sent. Default = false. Also see retry-interval and scheduler. The connection factory must be of type client and have single-use set to false. |
retry-interval | When in client-mode, specifies the number of milliseconds to wait between connection attempts, or after a connection failure. Default 60,000 (60 seconds). | |
scheduler | true, false | Specifies a |
Table 31.5. TCP Inbound Gateway Attributes
Attribute Name | Allowed Values | Attribute Description |
---|---|---|
connection-factory | The connection factory must be of type server. | |
request-channel | The channel to which incoming messages will be sent. | |
reply-channel | The channel on which reply messages may arrive. Usually replies will arrive on a temporary reply channel added to the inbound message header | |
reply-timeout | The time in milliseconds for which the gateway will wait for a reply. Default 1000 (1 second). | |
error-channel | If an Exception is thrown by a downstream component, the MessagingException message containing the exception and failed message is sent to this channel; any reply from that flow will then be returned as a response by the gateway. | |
client-mode | true, false | When true, the inbound gateway acts as a client with respect to establishing the connection, and then receives (and replies to) incoming messages on that connection. Default = false. Also see retry-interval and scheduler. The connection factory must be of type client and have single-use set to false. |
retry-interval | When in client-mode, specifies the number of milliseconds to wait between connection attempts, or after a connection failure. Default 60,000 (60 seconds). | |
scheduler | true, false | Specifies a |
Table 31.6. TCP Outbound Gateway Attributes
Attribute Name | Allowed Values | Attribute Description |
---|---|---|
connection-factory | The connection factory must be of type client. | |
request-channel | The channel on which outgoing messages will arrive. | |
reply-channel | Optional. The channel to which reply messages may be sent. | |
remote-timeout | The time in milliseconds for which the gateway will wait for a reply from the remote system.
Mutually exclusive with | |
remote-timeout-expression | A SpEL expression, evaluated against the message to determine the time in milliseconds for which the gateway will wait for a reply from the remote system.
Mutually exclusive with | |
request-timeout | If a single-use connection factory is not being used, the time in milliseconds for which the gateway will wait to get access to the shared connection. | |
reply-timeout | The time in milliseconds for which the gateway will wait when sending the reply to the reply-channel. Only applies if the reply-channel might block, such as a bounded QueueChannel that is currently full. |
=== IP Message Headers
The following MessageHeaders are used by this module:
Header Name | IpHeaders Constant | Description |
---|---|---|
ip_hostname | HOSTNAME | The host name from which a TCP message or UDP packet was received.
If |
ip_address | IP_ADDRESS | The ip address from which a TCP message or UDP packet was received. |
ip_port | PORT | The remote port for a UDP packet. |
ip_localInetAddress | IP_LOCAL_ADDRESS | The local |
ip_ackTo | ACK_ADDRESS | The remote ip address to which UDP application-level acks will be sent. The framework includes acknowledgment information in the data packet. |
ip_ackId | ACK_ID | A correlation id for UDP application-level acks. The framework includes acknowledgment information in the data packet. |
ip_tcp_remotePort | REMOTE_PORT | The remote port for a TCP connection. |
ip_connectionId | CONNECTION_ID | A unique identifier for a TCP connection; set by the framework for inbound messages; when sending to a server-side inbound channel adapter, or replying to an inbound gateway, this header is required so the endpoint can determine which connection to send the message to. |
ip_actualConnectionId | ACTUAL_CONNECTION_ID | For information only - when using a cached or failover client connection factory, contains the actual underlying connection id. |
contentType | MessageHeaders.CONTENT_TYPE | An optional content type for inbound messages; see below.
Note that, unlike the other header constants, this constant is in the class |
For inbound messages, ip_hostname, ip_address, ip_tcp_remotePort and ip_connectionId are mapped by the default TcpHeaderMapper.
Set the mapper’s addContentTypeHeader property to true and the mapper will set the contentType header (application/octet-stream;charset="UTF-8") by default.
You can change the default by setting the contentType property.
Users can add additional headers by subclassing TcpHeaderMapper and overriding the method supplyCustomHeaders.
For example, when using SSL, properties of the SSLSession can be added by obtaining the session object from the TcpConnection object which is provided as an argument to the supplyCustomHeaders method.
For outbound messages, String payloads are converted to byte[] using the default (UTF-8) charset.
Set the charset property to change the default.
When customizing the mapper properties, or subclassing, declare the mapper as a bean and provide an instance to the connection factory using the mapper property.
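The outbound String to byte[] conversion described above can be illustrated with plain JDK code. The following is a minimal sketch, not the mapper's actual implementation: the class and method names (`PayloadEncodingSketch`, `toBytes`) are hypothetical, and only `String.getBytes(Charset)` is real API. It shows why the charset setting matters when payloads contain non-ASCII characters.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Spring Integration: it only mimics the
// String -> byte[] step performed for outbound messages.
final class PayloadEncodingSketch {

    private PayloadEncodingSketch() {
    }

    // Encodes a String payload with the given charset.
    static byte[] toBytes(String payload, Charset charset) {
        return payload.getBytes(charset);
    }

    public static void main(String[] args) {
        String payload = "caf\u00e9"; // "cafe" with an accented final letter
        // UTF-8 needs two bytes for the accented character.
        System.out.println(toBytes(payload, StandardCharsets.UTF_8).length);
        // ISO-8859-1 needs only one byte for the same character.
        System.out.println(toBytes(payload, StandardCharsets.ISO_8859_1).length);
    }
}
```

If the remote system expects a charset other than UTF-8, the byte counts (and therefore any length-based framing) differ, which is one reason to set the charset property explicitly.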
=== Annotation-Based Configuration
The following example from the samples repository is used to illustrate some of the configuration options when using annotations instead of XML.
@EnableIntegration
@IntegrationComponentScan
@Configuration
public static class Config {

    @Value("${some.port}")
    private int port;

    @MessagingGateway(defaultRequestChannel="toTcp")
    public interface Gateway {

        String viaTcp(String in);

    }

    @Bean
    @ServiceActivator(inputChannel="toTcp")
    public MessageHandler tcpOutGate(AbstractClientConnectionFactory connectionFactory) {
        TcpOutboundGateway gate = new TcpOutboundGateway();
        gate.setConnectionFactory(connectionFactory);
        gate.setOutputChannelName("resultToString");
        return gate;
    }

    @Bean
    public TcpInboundGateway tcpInGate(AbstractServerConnectionFactory connectionFactory) {
        TcpInboundGateway inGate = new TcpInboundGateway();
        inGate.setConnectionFactory(connectionFactory);
        inGate.setRequestChannel(fromTcp());
        return inGate;
    }

    @Bean
    public MessageChannel fromTcp() {
        return new DirectChannel();
    }

    @MessageEndpoint
    public static class Echo {

        @Transformer(inputChannel="fromTcp", outputChannel="toEcho")
        public String convert(byte[] bytes) {
            return new String(bytes);
        }

        @ServiceActivator(inputChannel="toEcho")
        public String upCase(String in) {
            return in.toUpperCase();
        }

        @Transformer(inputChannel="resultToString")
        public String convertResult(byte[] bytes) {
            return new String(bytes);
        }

    }

    @Bean
    public AbstractClientConnectionFactory clientCF() {
        return new TcpNetClientConnectionFactory("localhost", this.port);
    }

    @Bean
    public AbstractServerConnectionFactory serverCF() {
        return new TcpNetServerConnectionFactory(this.port);
    }

}
Standard Spring Integration annotation enabling the infrastructure for an integration application. | |
Searches for | |
The entry point to the client-side of the flow. The calling application can | |
Outbound endpoints consist of a | |
Inbound endpoints (in the TCP/UDP module) are all message-driven so just need to be declared as simple | |
This class provides a number of POJO methods for use in this sample flow (a | |
The client-side connection factory. | |
The server-side connection factory. |
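To make the request/response round trip of the sample above concrete, the following sketch imitates it with plain `java.net` sockets instead of Spring Integration components. It is only an illustration under stated assumptions: the class `UpperCaseEchoSketch` is hypothetical, an OS-assigned port replaces `${some.port}`, and CRLF-terminated framing is assumed for both directions.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical plain-JDK stand-in for the sample flow; no Spring classes are used.
final class UpperCaseEchoSketch {

    private UpperCaseEchoSketch() {
    }

    // Client side: sends one CRLF-terminated request and returns the reply line.
    static String viaTcp(String in) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 asks the OS for any free port.
        try (ServerSocket server = new ServerSocket(0, 1, loopback)) {
            Thread serverSide = new Thread(() -> serveOnce(server));
            serverSide.setDaemon(true);
            serverSide.start();

            try (Socket client = new Socket(loopback, server.getLocalPort());
                 Writer out = new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8);
                 BufferedReader reply = new BufferedReader(
                         new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8))) {
                out.write(in + "\r\n");
                out.flush();
                return reply.readLine();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Server side: accepts one connection, reads one line, replies in upper case.
    private static void serveOnce(ServerSocket server) {
        try (Socket socket = server.accept();
             BufferedReader request = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
             Writer response = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8)) {
            String line = request.readLine();
            if (line != null) {
                response.write(line.toUpperCase(Locale.ROOT) + "\r\n");
                response.flush();
            }
        } catch (IOException e) {
            // The sketch gives up silently; a real endpoint would report the error.
        }
    }

    public static void main(String[] args) {
        System.out.println(viaTcp("hello"));
    }
}
```

In the annotated configuration, the gateways and connection factories take over the socket handling shown here, so application code only deals with the `Gateway` interface and the `Echo` methods.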
Spring Integration provides support for interacting with Twitter. With the Twitter adapters you can both receive and send Twitter messages. You can also perform a Twitter search based on a schedule and publish the search results within Messages. Since version 4.0, a search outbound gateway is provided to perform dynamic searches.
Twitter is a social networking and micro-blogging service that enables its users to send and read messages known as tweets. Tweets are text-based posts of up to 140 characters displayed on the author’s profile page and delivered to the author’s subscribers who are known as followers.
Important | |
---|---|
Versions of Spring Integration prior to 2.1 were dependent upon the Twitter4J API, but with the release of Spring Social 1.0 GA, Spring Integration, as of version 2.1, now builds directly upon Spring Social’s Twitter support, instead of Twitter4J.
All Twitter endpoints require the configuration of a |
Spring Integration provides a convenient namespace configuration to define Twitter artifacts. You can enable it by adding the following within your XML header.
xmlns:int-twitter="http://www.springframework.org/schema/integration/twitter"
xsi:schemaLocation="http://www.springframework.org/schema/integration/twitter
    https://www.springframework.org/schema/integration/twitter/spring-integration-twitter.xsd"
=== Twitter OAuth Configuration
For authenticated operations, Twitter uses OAuth - an authentication protocol that allows users to approve an application to act on their behalf without sharing their password. More information can be found at https://oauth.net or in this article https://hueniverse.com/oauth from Hueniverse. Please also see OAuth FAQ for more information about OAuth and Twitter.
In order to use OAuth authentication/authorization with Twitter you must create a new Application on the Twitter Developers site. Follow the directions below to create a new application and obtain consumer keys and an access token:
Click the Register an app link and fill out all required fields on the form provided; set Application Type to Client and, depending on the nature of your application, select Default Access Type as Read & Write or Read-only, then submit the form.
If everything is successful you’ll be presented with the Consumer Key and Consumer Secret. Copy both values to a safe place.
Find the My Access Token button on the side bar (right). Click on it and you’ll be presented with two more values: Access Token and Access Token Secret. Copy these values to a safe place as well.
=== Twitter Template
As mentioned above, Spring Integration relies upon Spring Social, and that library provides an implementation of the template pattern, o.s.social.twitter.api.impl.TwitterTemplate, to interact with Twitter.
For anonymous operations (e.g., search), you don’t have to define an instance of TwitterTemplate explicitly, since a default instance will be created and injected into the endpoint.
However, for authenticated operations (update status, send direct message, etc.), you must configure a TwitterTemplate as a bean and inject it explicitly into the endpoint, because the authentication configuration is required.
Below is a sample configuration of TwitterTemplate:
<bean id="twitterTemplate" class="o.s.social.twitter.api.impl.TwitterTemplate">
    <constructor-arg value="4XzBPacJQxyBzzzH"/>
    <constructor-arg value="AbRxUAvyCtqQtvxFK8w5ZMtMj20KFhB6o"/>
    <constructor-arg value="21691649-4YZY5iJEOfz2A9qCFd9SjBRGb3HLmIm4HNE"/>
    <constructor-arg value="AbRxUAvyNCtqQtxFK8w5ZMtMj20KFhB6o"/>
</bean>
Note | |
---|---|
The values above are not real. |
As you can see from the configuration above, all we need to do is to provide OAuth `attributes` as constructor arguments. The values would be those you obtained in the previous step. The order of constructor arguments is: 1) `consumerKey`, 2) `consumerSecret`, 3) `accessToken`, and 4) `accessTokenSecret`.
A more practical way to manage OAuth connection attributes would be via Spring’s property placeholder support by simply creating a property file (e.g., oauth.properties):
twitter.oauth.consumerKey=4XzBPacJQxyBzzzH
twitter.oauth.consumerSecret=AbRxUAvyCtqQtvxFK8w5ZMtMj20KFhB6o
twitter.oauth.accessToken=21691649-4YZY5iJEOfz2A9qCFd9SjBRGb3HLmIm4HNE
twitter.oauth.accessTokenSecret=AbRxUAvyNCtqQtxFK8w5ZMtMj20KFhB6o
Then, you can configure a property-placeholder to point to the above property file:
<context:property-placeholder location="classpath:oauth.properties"/>

<bean id="twitterTemplate" class="o.s.social.twitter.api.impl.TwitterTemplate">
    <constructor-arg value="${twitter.oauth.consumerKey}"/>
    <constructor-arg value="${twitter.oauth.consumerSecret}"/>
    <constructor-arg value="${twitter.oauth.accessToken}"/>
    <constructor-arg value="${twitter.oauth.accessTokenSecret}"/>
</bean>
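The constructor argument order matters, so it can help to see the lookup spelled out. The sketch below uses only `java.util.Properties` as a stand-in for Spring's placeholder resolution; the class `OAuthSettingsSketch` and its methods are hypothetical, and the values in `main` are made up. It reads the four keys in the order the TwitterTemplate constructor expects and fails fast when one is missing.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Properties;

// Hypothetical helper: resolves the OAuth values in constructor order.
final class OAuthSettingsSketch {

    // Same order as the constructor arguments described above.
    private static final String[] KEYS_IN_CONSTRUCTOR_ORDER = {
        "twitter.oauth.consumerKey",
        "twitter.oauth.consumerSecret",
        "twitter.oauth.accessToken",
        "twitter.oauth.accessTokenSecret"
    };

    private OAuthSettingsSketch() {
    }

    // Parses property-file text, e.g. the content of oauth.properties.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns the four values in constructor order, or throws if a key is absent.
    static String[] constructorArgs(Properties props) {
        String[] args = new String[KEYS_IN_CONSTRUCTOR_ORDER.length];
        for (int i = 0; i < args.length; i++) {
            String value = props.getProperty(KEYS_IN_CONSTRUCTOR_ORDER[i]);
            if (value == null) {
                throw new IllegalStateException("Missing property: " + KEYS_IN_CONSTRUCTOR_ORDER[i]);
            }
            args[i] = value;
        }
        return args;
    }

    public static void main(String[] args) {
        // Made-up values; the order of lines in the file does not matter.
        Properties props = parse(
                "twitter.oauth.accessToken=token\n"
                + "twitter.oauth.consumerKey=key\n"
                + "twitter.oauth.accessTokenSecret=tokenSecret\n"
                + "twitter.oauth.consumerSecret=secret\n");
        System.out.println(Arrays.toString(constructorArgs(props)));
    }
}
```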
Twitter inbound adapters allow you to receive Twitter Messages. There are several types of twitter messages, or tweets.
Spring Integration version 2.0 and above provides support for receiving tweets as Timeline Updates, Direct Messages, Mention Messages as well as Search Results.
Important | |
---|---|
Every Inbound Twitter Channel Adapter is a Polling Consumer which means you have to provide a poller configuration. Twitter defines a concept of Rate Limiting. You can read more about it here: Rate Limiting. In a nutshell, Rate Limiting is a mechanism that Twitter uses to manage how often an application can poll for updates. You should consider this when setting your poller intervals so that the adapter polls in compliance with the Twitter policies. With Spring Integration prior to version 3.0, a hard-coded limit within the adapters was used to ensure the polling interval could not be less than 15 seconds. This is no longer the case and the poller configuration is applied directly. |
Another issue that we need to worry about is handling duplicate Tweets.
The same adapter (e.g., Search or Timeline Update) while polling on Twitter may receive the same values more than once.
For example, if you keep searching on Twitter with the same search criteria, you’ll end up with the same set of tweets unless a new tweet that matches your search criteria was posted in between your searches.
In that situation you’ll get all the tweets you had before plus the new one.
But what you really want is only the new tweet(s).
Spring Integration provides an elegant mechanism for handling these situations.
The latest Tweet id is stored in an instance of the org.springframework.integration.metadata.MetadataStore strategy, which stores metadata (e.g., the last retrieved tweet in this case).
For more information see Section 9.5, “Metadata Store”.
Note | |
---|---|
The key used to persist the latest twitter id is the value of the (required) |
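The duplicate handling described above boils down to remembering the highest tweet id seen so far and dropping anything at or below it on the next poll. The following is a minimal sketch of that idea only: a `HashMap` stands in for a MetadataStore, tweets are reduced to their numeric ids, and the class `TweetDeduplicatorSketch` is hypothetical rather than the adapter's actual code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: none of these names are Spring Integration API.
final class TweetDeduplicatorSketch {

    // Stand-in for a metadata store holding String keys and values.
    private final Map<String, String> metadataStore = new HashMap<>();
    private final String metadataKey;

    TweetDeduplicatorSketch(String metadataKey) {
        this.metadataKey = metadataKey;
    }

    // Returns only the ids newer than the last one recorded, then records the
    // highest id so that the next poll skips everything returned so far.
    List<Long> filterNew(List<Long> polledIds) {
        long lastSeen = Long.parseLong(metadataStore.getOrDefault(metadataKey, "-1"));
        long highest = lastSeen;
        List<Long> fresh = new ArrayList<>();
        for (long id : polledIds) {
            if (id > lastSeen) {
                fresh.add(id);
                highest = Math.max(highest, id);
            }
        }
        metadataStore.put(metadataKey, Long.toString(highest));
        return fresh;
    }

    public static void main(String[] args) {
        TweetDeduplicatorSketch dedup = new TweetDeduplicatorSketch("searchAdapter");
        System.out.println(dedup.filterNew(Arrays.asList(1L, 2L, 3L)));
        // The second poll repeats the old results plus one new tweet.
        System.out.println(dedup.filterNew(Arrays.asList(1L, 2L, 3L, 4L)));
    }
}
```

Because the state lives under a key in the store, a persistent store implementation would let the adapter keep skipping already-seen tweets across restarts.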
Prior to version 4.0, the page size was hard-coded to 20.
This is now configurable using the page-size attribute (defaults to 20).
==== Inbound Message Channel Adapter
This adapter allows you to receive updates from everyone you follow. It’s essentially the "Timeline Update" adapter.
<int-twitter:inbound-channel-adapter
        twitter-template="twitterTemplate"
        channel="inChannel">
    <int:poller fixed-rate="5000" max-messages-per-poll="3"/>
</int-twitter:inbound-channel-adapter>
==== Direct Inbound Message Channel Adapter
This adapter allows you to receive Direct Messages that were sent to you from other Twitter users.
<int-twitter:dm-inbound-channel-adapter
        twitter-template="twitterTemplate"
        channel="inboundDmChannel">
    <int:poller fixed-rate="5000" max-messages-per-poll="3"/>
</int-twitter:dm-inbound-channel-adapter>
==== Mentions Inbound Message Channel Adapter
This adapter allows you to receive Twitter Messages that Mention you via @user syntax.
<int-twitter:mentions-inbound-channel-adapter
        twitter-template="twitterTemplate"
        channel="inboundMentionsChannel">
    <int:poller fixed-rate="5000" max-messages-per-poll="3"/>
</int-twitter:mentions-inbound-channel-adapter>
==== Search Inbound Message Channel Adapter
This adapter allows you to perform searches. It is not necessary to define twitter-template, since a search can be performed anonymously; however, you must define a search query.
<int-twitter:search-inbound-channel-adapter
        query="#springintegration"
        channel="inboundMentionsChannel">
    <int:poller fixed-rate="5000" max-messages-per-poll="3"/>
</int-twitter:search-inbound-channel-adapter>
Refer to https://dev.twitter.com/docs/using-search to learn more about Twitter queries.
As you can see, the configuration of all of these adapters is very similar to other inbound adapters, with one exception: some may need to be injected with the twitter-template.
Once received, each Twitter Message is encapsulated in a Spring Integration Message and sent to the channel specified by the channel attribute.
Currently the Payload type of any Message is org.springframework.integration.twitter.core.Tweet which is very similar to the object with the same name in Spring Social.
As we migrate to Spring Social we’ll be depending on their API and some of the artifacts that are currently in use will be obsolete; however, we’ve already made sure that the impact of such migration is minimal by aligning our API with the current state (at the time of writing) of Spring Social.
To get the text from the org.springframework.social.twitter.api.Tweet simply invoke the getText() method.
Twitter outbound channel adapters allow you to send Twitter Messages, or tweets.
Spring Integration version 2.0 and above supports sending Status Update Messages and Direct Messages. Twitter outbound channel adapters will take the Message payload and send it as a Twitter message. Currently the only supported payload type is `String`, so consider adding a transformer if the payload of the incoming message is not a String.
==== Twitter Outbound Update Channel Adapter
This adapter allows you to send regular status updates by simply sending a Message to the channel identified by the channel attribute.
<int-twitter:outbound-channel-adapter twitter-template="twitterTemplate" channel="twitterChannel"/>
The only extra configuration that is required for this adapter is the `twitter-template` reference.
Starting with version 4.0 the <int-twitter:outbound-channel-adapter> supports a tweet-data-expression to populate the TweetData argument (Spring Social Twitter) using the message as the root object of the expression evaluation context.
The result can be a String, which will be used for the TweetData message; a Tweet object, the text of which will be used for the TweetData message; or an entire TweetData object.
For convenience, the TweetData can be built from the expression directly without needing a fully qualified class name:
<int-twitter:outbound-channel-adapter
    twitter-template="twitterTemplate"
    channel="twitterChannel"
    tweet-data-expression="new TweetData(payload).withMedia(headers.media).displayCoordinates(true)"/>
This allows, for example, attaching an image to the tweet.
==== Twitter Outbound Direct Message Channel Adapter
This adapter allows you to send Direct Twitter Messages (i.e., @user) by simply sending a Message to the channel identified by the channel attribute.
<int-twitter:dm-outbound-channel-adapter twitter-template="twitterTemplate" channel="twitterChannel"/>
The only extra configuration that is required for this adapter is the `twitter-template` reference.
When it comes to Twitter Direct Messages, you must specify who you are sending the message to - the target userid.
The Twitter Outbound Direct Message Channel Adapter will look for a target userid in the Message headers under the name twitter_dmTargetUserId, which is also identified by the following constant: TwitterHeaders.DM_TARGET_USER_ID.
So when creating a Message all you need to do is add a value for that header.
Message<String> message = MessageBuilder.withPayload("hello")
        .setHeader(TwitterHeaders.DM_TARGET_USER_ID, "z_oleg")
        .build();
The above approach works well if you are creating the Message programmatically. However it’s more common to provide the header value within a messaging flow. The value can be provided by an upstream <header-enricher>.
<int:header-enricher input-channel="in" output-channel="out">
    <int:header name="twitter_dmTargetUserId" value="z_oleg"/>
</int:header-enricher>
It’s quite common that the value must be determined dynamically. For those cases you can take advantage of SpEL support within the <header-enricher>.
<int:header-enricher input-channel="in" output-channel="out">
    <int:header name="twitter_dmTargetUserId"
        expression="@twitterIdService.lookup(headers.username)"/>
</int:header-enricher>
Important | |
---|---|
Twitter does not allow you to post duplicate Messages. This is a common problem during testing when the same code works the first time but does not work the second time. So, make sure to change the content of the Message each time. Another thing that works well for testing is to append a timestamp to the end of each message. |
=== Twitter Search Outbound Gateway
In Spring Integration, an outbound gateway is used for two-way request/response communication with an external service.
The Twitter Search Outbound Gateway allows you to issue dynamic twitter searches.
The reply message payload is a collection of Tweet objects.
If the search returns no results, the payload is an empty collection.
You can limit the number of tweets and you can page through a larger set of tweets by making multiple calls.
To facilitate this, search reply messages contain a header twitter_searchMetadata with its value being a SearchMetadata object.
For more information on the Tweet, SearchParameters and SearchMetadata classes, refer to the Spring Social Twitter documentation.
Configuring the Outbound Gateway
<int-twitter:search-outbound-gateway id="twitter"
    request-channel="in"
    twitter-template="twitterTemplate"
    search-args-expression="payload"
    reply-channel="out"
    reply-timeout="123"
    order="1"
    auto-startup="false"
    phase="100" />
The channel used to send search requests to this gateway. | |
A reference to a | |
A SpEL expression that evaluates to argument(s) for the search.
Default: "payload" - in which case the payload can be a | |
The channel to which to send the reply; if omitted, the | |
The timeout when sending the reply message to the reply channel; only applies if the reply channel can block, for example a bounded queue channel that is full. | |
When subscribed to a publish/subscribe channel, the order in which this endpoint will be invoked. | |
Starting with version 4.1 Spring Integration has introduced WebSocket support.
It is based on architecture, infrastructure and API from the Spring Framework’s web-socket module.
Therefore, many of Spring WebSocket’s components (e.g. SubProtocolHandler or WebSocketClient) and configuration options (e.g. @EnableWebSocketMessageBroker) can be reused within Spring Integration.
For more information, please refer to the Spring Framework WebSocket Support chapter in the Spring Framework reference manual.
Note | |
---|---|
Since the Spring Framework WebSocket infrastructure is based on the Spring Messaging foundation and provides a basic Messaging framework based on the same |
@MessagingGateway
@Controller
public interface WebSocketGateway {

    @MessageMapping("/greeting")
    @SendToUser("/queue/answer")
    @Gateway(requestChannel = "greetingChannel")
    String greeting(String payload);

}
Since the WebSocket protocol is streaming by definition and we can send and receive messages to/from a WebSocket at the same time, we can simply deal with an appropriate WebSocketSession, regardless of being on the client or server side.
To encapsulate the connection management and WebSocketSession registry, the IntegrationWebSocketContainer is provided with ClientWebSocketContainer and ServerWebSocketContainer implementations.
Thanks to the WebSocket API and its implementation in the Spring Framework, with many extensions, the same classes are used on the server side as well as the client side (from a Java perspective, of course).
Hence most connection and WebSocketSession registry options are the same on both sides.
That allows us to reuse many configuration items and infrastructure hooks to build WebSocket applications on the server side as well as on the client side:
//Client side
@Bean
public WebSocketClient webSocketClient() {
    return new SockJsClient(Collections.singletonList(new WebSocketTransport(new JettyWebSocketClient())));
}

@Bean
public IntegrationWebSocketContainer clientWebSocketContainer() {
    return new ClientWebSocketContainer(webSocketClient(), "ws://my.server.com/endpoint");
}

//Server side
@Bean
public IntegrationWebSocketContainer serverWebSocketContainer() {
    return new ServerWebSocketContainer("/endpoint").withSockJs();
}
The IntegrationWebSocketContainer is designed to achieve bidirectional messaging and can be shared between Inbound and Outbound Channel Adapters (see below), or it can be referenced from only one of them (when using one-way WebSocket messaging, that is, only sending or only receiving).
It can be used without any Channel Adapter, but in this case the IntegrationWebSocketContainer only plays a role as the WebSocketSession registry.
Note | |
---|---|
The |
=== WebSocket Inbound Channel Adapter
The WebSocketInboundChannelAdapter implements the receiving part of WebSocketSession interaction.
It must be supplied with an IntegrationWebSocketContainer, and the adapter registers itself as a WebSocketListener to handle incoming messages and WebSocketSession events.
Note | |
---|---|
Only one |
For WebSocket sub-protocols, the WebSocketInboundChannelAdapter can be configured with a SubProtocolHandlerRegistry as the second constructor argument.
The adapter delegates to the SubProtocolHandlerRegistry to determine the appropriate SubProtocolHandler for the accepted WebSocketSession and to convert a WebSocketMessage to a Message according to the sub-protocol implementation.
Note | |
---|---|
By default, the |
The WebSocketInboundChannelAdapter accepts and sends to the underlying integration flow only Messages with SimpMessageType.MESSAGE or an empty simpMessageType header.
All other Message types are handled through the ApplicationEvents emitted from a SubProtocolHandler implementation (e.g. StompSubProtocolHandler).
On the server side, the WebSocketInboundChannelAdapter can be configured with the useBroker = true option, if the @EnableWebSocketMessageBroker configuration is present.
In this case all non-MESSAGE Message types are delegated to the provided AbstractBrokerMessageHandler.
In addition, if the Broker Relay is configured with destination prefixes, those Messages that match the Broker destinations are routed to the AbstractBrokerMessageHandler, instead of to the outputChannel of the WebSocketInboundChannelAdapter.
If useBroker = false and the received message is of type SimpMessageType.CONNECT, the WebSocketInboundChannelAdapter sends a SimpMessageType.CONNECT_ACK message to the WebSocketSession immediately, without sending it to the channel.
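The rules in the preceding paragraph can be read as a small decision table. The sketch below is one interpretation of that prose in plain Java, not the adapter's actual code: the class `InboundRoutingSketch`, its enums and the `route` method are all hypothetical names introduced for illustration.

```java
// Hypothetical decision table for the routing rules described above.
final class InboundRoutingSketch {

    // A few message types; NONE models an empty simpMessageType header.
    enum Type { CONNECT, MESSAGE, SUBSCRIBE, DISCONNECT, NONE }

    // Where an incoming message ends up.
    enum Route { OUTPUT_CHANNEL, BROKER, CONNECT_ACK, APPLICATION_EVENT }

    private InboundRoutingSketch() {
    }

    static Route route(Type type, String destination, boolean useBroker, String... brokerPrefixes) {
        boolean isMessage = type == Type.MESSAGE || type == Type.NONE;
        if (useBroker) {
            if (!isMessage) {
                // All non-MESSAGE types are delegated to the broker handler.
                return Route.BROKER;
            }
            for (String prefix : brokerPrefixes) {
                if (destination != null && destination.startsWith(prefix)) {
                    // Messages for broker destinations bypass the output channel.
                    return Route.BROKER;
                }
            }
            return Route.OUTPUT_CHANNEL;
        }
        if (isMessage) {
            return Route.OUTPUT_CHANNEL;
        }
        if (type == Type.CONNECT) {
            // Answered immediately; nothing is sent to the channel.
            return Route.CONNECT_ACK;
        }
        // Everything else surfaces as application events from the sub-protocol handler.
        return Route.APPLICATION_EVENT;
    }

    public static void main(String[] args) {
        System.out.println(route(Type.MESSAGE, "/app/greeting", false));
        System.out.println(route(Type.CONNECT, null, false));
        System.out.println(route(Type.MESSAGE, "/topic/news", true, "/topic"));
    }
}
```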
Note | |
---|---|
Spring’s WebSocket Support allows the configuration of only one Broker Relay, hence we don’t require an |
For more configuration options see the WebSockets Namespace Support section below.
=== WebSocket Outbound Channel Adapter
The WebSocketOutboundChannelAdapter accepts Spring Integration messages from its MessageChannel, determines the WebSocketSession id from the MessageHeaders, retrieves the WebSocketSession from the provided IntegrationWebSocketContainer and delegates the conversion and sending WebSocketMessage work to the appropriate SubProtocolHandler from the provided SubProtocolHandlerRegistry.
On the client side, the WebSocketSession id message header isn’t required, because ClientWebSocketContainer deals only with a single connection and its WebSocketSession respectively.
To use the STOMP sub-protocol, this adapter should be configured with a StompSubProtocolHandler.
Then you can send any STOMP message type to this adapter, using StompHeaderAccessor.create(StompCommand...) and a MessageBuilder, or just using a HeaderEnricher (see Section 7.2.2, “Header Enricher”).
For more configuration options see below.
=== WebSockets Namespace Support
The Spring Integration WebSocket namespace includes several components described below. To include it in your configuration, simply provide the following namespace declaration in your application context configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-websocket="http://www.springframework.org/schema/integration/websocket"
    xsi:schemaLocation="
      http://www.springframework.org/schema/beans
      https://www.springframework.org/schema/beans/spring-beans.xsd
      http://www.springframework.org/schema/integration
      https://www.springframework.org/schema/integration/spring-integration.xsd
      http://www.springframework.org/schema/integration/websocket
      https://www.springframework.org/schema/integration/websocket/spring-integration-websocket.xsd">
    ...
</beans>
<int-websocket:client-container>
<int-websocket:client-container
    id=""
    client=""
    uri=""
    uri-variables=""
    origin=""
    send-time-limit=""
    send-buffer-size-limit=""
    auto-startup=""
    phase="">
    <int-websocket:http-headers>
        <entry key="" value=""/>
    </int-websocket:http-headers>
</int-websocket:client-container>
The component bean name. | |
The | |
The | |
Comma-separated values for the URI variable placeholders within the | |
The | |
The WebSocket session send timeout limit.
Defaults to | |
The WebSocket session send message size limit.
Defaults to | |
Boolean value indicating whether this endpoint should start automatically.
Defaults to | |
The lifecycle phase within which this endpoint should start and stop.
The lower the value the earlier this endpoint will start and the later it will stop.
The default is | |
A |
<int-websocket:server-container>
<int-websocket:server-container
    id=""
    path=""
    handshake-handler=""
    handshake-interceptors=""
    decorator-factories=""
    send-time-limit=""
    send-buffer-size-limit=""
    allowed-origins="">
    <int-websocket:sockjs
        client-library-url=""
        stream-bytes-limit=""
        session-cookie-needed=""
        heartbeat-time=""
        disconnect-delay=""
        message-cache-size=""
        websocket-enabled=""
        scheduler=""
        message-codec=""
        transport-handlers=""
        suppress-cors=""/>
</int-websocket:server-container>
The component bean name. | |
A path (or comma-separated paths) that maps a particular request to a | |
The | |
List of | |
Configure one or more factories ( | |
See the same option on the | |
See the same option on the | |
Configure allowed Origin header values. Multiple origins may be specified as a comma-separated list. This check is mostly designed for browser clients. There is nothing preventing other types of clients from modifying the Origin header value. When SockJS is enabled and allowed origins are restricted, transport types that do not use Origin headers for cross origin requests (jsonp-polling, iframe-xhr-polling, iframe-eventsource and iframe-htmlfile) are disabled. As a consequence, IE6/IE7 are not supported and IE8/IE9 will only be supported without cookies. By default, all origins are allowed. | |
Transports with no native cross-domain communication (e.g.
"eventsource", "htmlfile") must get a simple page from the "foreign" domain in an invisible iframe so that code in the iframe can run from a domain local to the SockJS server.
Since the iframe needs to load the SockJS javascript client library, this property allows specifying where to load it from.
By default this is set to point to | |
Minimum number of bytes that can be sent over a single HTTP streaming request before it will be closed.
Defaults to | |
The "cookie_needed" value in the response from the SockJs | |
The amount of time in milliseconds when the server has not sent any messages and after which the server should
send a heartbeat frame to the client in order to keep the connection from breaking.
The default value is | |
The amount of time in milliseconds before a client is considered disconnected after not having a receiving
connection, i.e.
an active connection over which the server can send data to the client.
The default value is | |
The number of server-to-client messages that a session can cache while waiting for the next HTTP polling request
from the client.
The default size is | |
Some load balancers don’t support websockets.
Set this option to | |
The | |
The | |
List of | |
The option to disable automatic addition of CORS headers for SockJS requests.
The default value is |
<int-websocket:outbound-channel-adapter>
<int-websocket:outbound-channel-adapter
    id=""
    channel=""
    container=""
    default-protocol-handler=""
    protocol-handlers=""
    message-converters=""
    merge-with-default-converters=""
    auto-startup=""
    phase=""/>
The component bean name.
If the | |
Identifies the channel attached to this adapter. | |
The reference to the | |
Optional reference to a | |
List of | |
List of | |
Flag to indicate if the default converters should be registered after any custom converters.
This flag is used only if | |
Boolean value indicating whether this endpoint should start automatically.
Defaults to | |
The lifecycle phase within which this endpoint should start and stop.
The lower the value the earlier this endpoint will start and the later it will stop.
The default is |
<int-websocket:inbound-channel-adapter>
<int-websocket:inbound-channel-adapter
    id=""
    channel=""
    error-channel=""
    container=""
    default-protocol-handler=""
    protocol-handlers=""
    message-converters=""
    merge-with-default-converters=""
    send-timeout=""
    payload-type=""
    use-broker=""
    auto-startup=""
    phase=""/>
The component bean name.
If the | |
Identifies the channel attached to this adapter. | |
The | |
See the same option on the | |
See the same option on the | |
See the same option on the | |
See the same option on the | |
See the same option on the | |
Maximum amount of time in milliseconds to wait when sending a message to the channel if the channel may block.
For example, a | |
Fully qualified name of the java type for the target | |
Flag to indicate if this adapter will send | |
See the same option on the | |
See the same option on the |
Starting with version 4.3.13, the ClientStompEncoder is provided as an extension of the standard StompEncoder for use on the client side of the WebSocket Channel Adapters.
An instance of the ClientStompEncoder must be injected into the StompSubProtocolHandler for proper client-side message preparation.
One problem with the default StompSubProtocolHandler is that it was designed for the server side, so it changes the SEND stompCommand header to MESSAGE, as the STOMP protocol requires on the server side.
If the client does not send its messages in a proper SEND web socket frame, some STOMP brokers will not accept them.
The purpose of the ClientStompEncoder, in this case, is to override the stompCommand header and set it to the SEND value before encoding the message to a byte[].
=== Outbound Web Service Gateways
To invoke a Web Service upon sending a message to a channel, there are two options, both of which build upon the Spring Web Services project: SimpleWebServiceOutboundGateway and MarshallingWebServiceOutboundGateway.
The former accepts either a String or a javax.xml.transform.Source as the message payload.
The latter supports any implementation of the Marshaller and Unmarshaller interfaces.
Both require a Spring Web Services DestinationProvider for determining the URI of the Web Service to be called.
simpleGateway = new SimpleWebServiceOutboundGateway(destinationProvider);
marshallingGateway = new MarshallingWebServiceOutboundGateway(destinationProvider, marshaller);
Note | |
---|---|
When using the namespace support described below, you will only need to set a URI.
Internally, the parser will configure a fixed URI |
For more detail on the inner workings, see the Spring Web Services reference guide’s chapter covering client access as well as the chapter covering Object/XML mapping.
=== Inbound Web Service Gateways
To send a message to a channel upon receiving a Web Service invocation, there are again two options: SimpleWebServiceInboundGateway and MarshallingWebServiceInboundGateway.
The former extracts a javax.xml.transform.Source from the WebServiceMessage and sets it as the message payload.
The latter supports any implementation of the Marshaller and Unmarshaller interfaces.
If the incoming web service message is a SOAP message, the SOAP Action header is added to the headers of the `Message` that is forwarded to the request channel.
simpleGateway = new SimpleWebServiceInboundGateway();
simpleGateway.setRequestChannel(forwardOntoThisChannel);
simpleGateway.setReplyChannel(listenForResponseHere); // optional

marshallingGateway = new MarshallingWebServiceInboundGateway(marshaller);
// set the request channel and, optionally, the reply channel
Both gateways implement the Spring Web Services MessageEndpoint interface, so they can be configured with a MessageDispatcherServlet as per standard Spring Web Services configuration.
For more detail on how to use these components, see the Spring Web Services reference guide’s chapter covering creating a Web Service. The chapter covering Object/XML mapping is also applicable again.
To include the SimpleWebServiceInboundGateway and MarshallingWebServiceInboundGateway configurations in the Spring WS infrastructure, add an EndpointMapping definition between the MessageDispatcherServlet and the target MessageEndpoint implementations, as you would in a normal Spring WS application.
For this purpose (from the Spring Integration perspective), Spring WS provides these convenient EndpointMapping implementations:
o.s.ws.server.endpoint.mapping.UriEndpointMapping
o.s.ws.server.endpoint.mapping.PayloadRootQNameEndpointMapping
o.s.ws.soap.server.endpoint.mapping.SoapActionEndpointMapping
o.s.ws.server.endpoint.mapping.XPathPayloadEndpointMapping
The beans for these classes must be declared in the application context and must reference the SimpleWebServiceInboundGateway and/or MarshallingWebServiceInboundGateway bean definitions according to the WS mapping algorithm.
Refer to the Endpoint mappings documentation for more information.
=== Web Service Namespace Support
To configure an outbound Web Service Gateway, use the "outbound-gateway" element from the "ws" namespace:
<int-ws:outbound-gateway id="simpleGateway" request-channel="inputChannel" uri="https://example.org"/>
Note | |
---|---|
Notice that this example does not provide a reply-channel.
If the Web Service were to return a non-empty response, the Message containing that response would be sent to the reply
channel provided in the request Message’s |
Tip | |
---|---|
When invoking a Web Service that returns an empty response after using a String payload for the request Message, no reply Message will be sent by default. Therefore you don’t need to set a reply-channel or have a REPLY_CHANNEL header in the request Message. If for any reason you actually do want to receive the empty response as a Message, then provide the ignore-empty-responses attribute with a value of false (this only applies for Strings, because using a Source or Document object simply leads to a NULL response and will therefore never generate a reply Message). |
To set up an inbound Web Service Gateway, use the "inbound-gateway":
<int-ws:inbound-gateway id="simpleGateway" request-channel="inputChannel"/>
To use Spring OXM Marshallers and/or Unmarshallers, provide bean references. For outbound:
<int-ws:outbound-gateway id="marshallingGateway" request-channel="requestChannel" uri="https://example.org" marshaller="someMarshaller" unmarshaller="someUnmarshaller"/>
And for inbound:
<int-ws:inbound-gateway id="marshallingGateway" request-channel="requestChannel" marshaller="someMarshaller" unmarshaller="someUnmarshaller"/>
Note | |
---|---|
Most |
For either outbound gateway type, a "destination-provider" attribute can be specified instead of the "uri" (exactly one of them is required). You can then reference any Spring Web Services DestinationProvider implementation (e.g. to lookup the URI at runtime from a registry).
For either outbound gateway type, the "message-factory" attribute can also be configured with a reference to any Spring Web Services WebServiceMessageFactory
implementation.
For the simple inbound gateway type, the "extract-payload" attribute can be set to false to forward the entire WebServiceMessage
instead of just its payload as a Message
to the request channel.
This might be useful, for example, when a custom Transformer works against the WebServiceMessage
directly.
=== Outbound URI Configuration
For all URI schemes supported by Spring Web Services (URIs and Transports), <uri-variable/> substitution is provided:
<ws:outbound-gateway id="gateway" request-channel="input"
        uri="https://springsource.org/{foo}-{bar}">
    <ws:uri-variable name="foo" expression="payload.substring(1,7)"/>
    <ws:uri-variable name="bar" expression="headers.x"/>
</ws:outbound-gateway>

<ws:outbound-gateway request-channel="inputJms"
        uri="jms:{destination}?deliveryMode={deliveryMode}&amp;priority={priority}"
        message-sender="jmsMessageSender">
    <ws:uri-variable name="destination" expression="headers.jmsQueue"/>
    <ws:uri-variable name="deliveryMode" expression="headers.deliveryMode"/>
    <ws:uri-variable name="priority" expression="headers.jms_priority"/>
</ws:outbound-gateway>
If a DestinationProvider is supplied, variable substitution is not supported and a configuration error will result if variables are provided.
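To make the substitution concrete, the following is a minimal Java sketch of `{name}` placeholder expansion. It uses only the JDK and is not the Spring Web Services or Spring Integration implementation: the `UriVariableSketch` class and its `expand` method are hypothetical names, and the variable values are passed in directly instead of being evaluated from SpEL expressions against a Message.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that expands {name} placeholders in a URI template. */
class UriVariableSketch {

    // Matches {name}, where name contains no braces.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{([^{}]+)\\}");

    static String expand(String template, Map<String, String> variables) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String name = matcher.group(1);
            String value = variables.get(name);
            if (value == null) {
                throw new IllegalArgumentException("No value for URI variable '" + name + "'");
            }
            // quoteReplacement keeps '$' and '\' in values from being treated as group references
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        String payload = "0123456789"; // made-up payload
        Map<String, String> variables = new LinkedHashMap<>();
        // Stands in for expression="payload.substring(1,7)"
        variables.put("foo", payload.substring(1, 7));
        // Stands in for expression="headers.x"; the header value is made up
        variables.put("bar", "abc");
        System.out.println(expand("https://example.org/{foo}-{bar}", variables));
        // prints https://example.org/123456-abc
    }
}
```

The sketch performs no encoding of the substituted values; in the gateway, encoding is a separate step.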
Controlling URI Encoding
By default, the URL string is encoded (see UriComponentsBuilder) to the URI object before the request is sent.
In some scenarios with a non-standard URI, performing the encoding is undesirable.
Since version 4.1, the <ws:outbound-gateway/> provides an encode-uri attribute.
To disable encoding of the URL, set this attribute to false (by default it is true).
If you wish to partially encode some of the URL, you can do so by using an expression within a <uri-variable/>:
<ws:outbound-gateway uri="http://somehost/%2f/fooApps?bar={param}" encode-uri="false">
    <ws:uri-variable name="param"
        expression="T(org.apache.commons.httpclient.util.URIUtil).encodeWithinQuery('Hello World!')"/>
</ws:outbound-gateway>
Note that encode-uri is ignored if a DestinationProvider is supplied.
The Spring Integration WebService Gateways map the SOAP Action header automatically.
By default, it is copied to and from Spring Integration MessageHeaders using the DefaultSoapHeaderMapper.
You can also pass in your own implementation of a SOAP-specific header mapper, as the gateways have respective properties to support that.
Any user-defined SOAP headers will NOT be copied to or from a SOAP Message unless explicitly specified by the requestHeaderNames and/or replyHeaderNames properties of the DefaultSoapHeaderMapper.
When using the XML namespace for configuration, these properties can be set using the mapped-request-headers and mapped-reply-headers attributes, or a custom mapper can be provided using the header-mapper attribute.
Tip | |
---|---|
When mapping user-defined headers, the values can also contain simple wildcard patterns (e.g. "foo*" or "*foo") to be matched.
For example, if you need to copy all user-defined headers simply use the wildcard character |
Starting with version 4.1, the AbstractHeaderMapper (a DefaultSoapHeaderMapper superclass) allows the NON_STANDARD_HEADERS token to be configured for the requestHeaderNames and/or replyHeaderNames properties (in addition to the existing STANDARD_REQUEST_HEADERS and STANDARD_REPLY_HEADERS tokens) to map all user-defined headers.
Note that it is recommended to use a combination such as STANDARD_REPLY_HEADERS, NON_STANDARD_HEADERS instead of a *, to avoid mapping request headers to the reply.
Starting with version 4.3, patterns in the header mappings can be negated by preceding the pattern with !.
Negated patterns get priority, so a list such as STANDARD_REQUEST_HEADERS,foo,ba*,!bar,!baz,qux,!foo will NOT map foo (nor bar nor baz); the standard headers plus bad and qux will be mapped.
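The priority rule can be illustrated with a small, JDK-only Java sketch. It is an assumption-laden illustration of the rule as stated above ("negated patterns get priority"), not the code of AbstractHeaderMapper: the `HeaderPatternSketch` class and its methods are hypothetical, only a single `*` per pattern is handled, and the STANDARD_REQUEST_HEADERS token is left out.

```java
/** Hypothetical illustration of header-name patterns where negated patterns win. */
class HeaderPatternSketch {

    /** Matches exact names and patterns containing a single '*', such as "ba*" or "*foo". */
    static boolean simpleMatch(String pattern, String name) {
        int star = pattern.indexOf('*');
        if (star < 0) {
            return pattern.equals(name);
        }
        String prefix = pattern.substring(0, star);
        String suffix = pattern.substring(star + 1);
        return name.length() >= prefix.length() + suffix.length()
                && name.startsWith(prefix)
                && name.endsWith(suffix);
    }

    /** Returns true if the header should be mapped; any matching negated pattern vetoes it. */
    static boolean shouldMap(String headerName, String... patterns) {
        boolean matched = false;
        for (String pattern : patterns) {
            if (pattern.startsWith("!")) {
                if (simpleMatch(pattern.substring(1), headerName)) {
                    return false; // a negated match wins regardless of its position in the list
                }
            } else if (simpleMatch(pattern, headerName)) {
                matched = true;
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        String[] patterns = {"foo", "ba*", "!bar", "!baz", "qux", "!foo"};
        for (String header : new String[] {"foo", "bar", "baz", "bad", "qux"}) {
            System.out.println(header + " -> " + shouldMap(header, patterns));
        }
        // prints: foo -> false, bar -> false, baz -> false, bad -> true, qux -> true (one per line)
    }
}
```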
Important | |
---|---|
If you have a user defined header that begins with |
Inbound SOAP headers (request headers for the inbound gateway, reply headers for the outbound gateway) are mapped as SoapHeaderElement objects.
The contents can be explored by accessing the Source:
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
    <soapenv:Header>
        <auth>
            <username>user</username>
            <password>pass</password>
        </auth>
        <bar>BAR</bar>
        <baz>BAZ</baz>
        <qux>qux</qux>
    </soapenv:Header>
    <soapenv:Body>
        ...
    </soapenv:Body>
</soapenv:Envelope>
If mapped-request-headers is "auth, ba*", the auth, bar and baz headers are mapped but qux is not.
...
SoapHeaderElement header = (SoapHeaderElement) headers.get("auth");
DOMSource source = (DOMSource) header.getSource();
NodeList nodeList = source.getNode().getChildNodes();
assertEquals("username", nodeList.item(0).getNodeName());
assertEquals("user", nodeList.item(0).getFirstChild().getNodeValue());
...
== XML Support - Dealing with XML Payloads
Spring Integration’s XML support extends the core of Spring Integration with the following components:
These components are designed to make working with XML messages in Spring Integration simple.
The provided messaging components are designed to work with XML represented in a range of formats, including instances of java.lang.String, org.w3c.dom.Document and javax.xml.transform.Source.
Note, however, that where a DOM representation is required, for example in order to evaluate an XPath expression, the String payload is converted into the required type and then converted back to a String.
Components that require an instance of DocumentBuilder create a namespace-aware instance if one is not provided.
Where you require greater control over document creation, you can provide an appropriately configured instance of DocumentBuilder.
All components within the Spring Integration XML module provide namespace support. In order to enable namespace support, you need to import the respective schema for the Spring Integration XML Module. A typical setup is shown below:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-xml="http://www.springframework.org/schema/integration/xml" xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/integration/xml https://www.springframework.org/schema/integration/xml/spring-integration-xml.xsd"> </beans>
Many of the components within the Spring Integration XML module work with XPath Expressions.
Each of those components either references an XPath Expression that has been defined as a top-level element or uses a nested <xpath-expression/> element.
All forms of XPath expressions result in the creation of an XPathExpression using the Spring org.springframework.xml.xpath.XPathExpressionFactory.
When XPath expressions are created, the best XPath implementation available on the classpath is used, either JAXP 1.3+ or Jaxen, with JAXP preferred.
Note | |
---|---|
Under the covers, Spring Integration uses the XPath functionality provided by the Spring Web Services project (https://www.spring.io/spring-ws). Specifically, Spring Web Services' XML module (spring-xml-x.x.x.jar) is used. For a deeper understanding, refer to the respective documentation as well: https://docs.spring.io/spring-ws/docs/current/reference/html/common.html#xpath |
Here is an overview of all available configuration parameters of the xpath-expression element:
<int-xml:xpath-expression expression="" id="" namespace-map="" ns-prefix="" ns-uri=""> <map></map> </int-xml:xpath-expression>
Defines an XPath expression. Required. | |
The identifier of the underlying bean definition. Will be an instance of `org.springframework.xml.xpath.XPathExpression`. Optional. | |
Reference to a map containing namespaces.
The key of the map defines the namespace prefix and the value of the map sets the namespace URI.
It is not valid to specify both this attribute and the | |
Allows you to set the namespace prefix directly as an attribute on the XPath expression element.
If you set | |
Allows you to set the namespace URI directly as an attribute on the XPath expression element.
If you set | |
Defines a map containing namespaces. Only one map child element is allowed. The key of the map defines the namespace prefix and the value of the map sets the namespace URI. |
It is not valid to specify this sub-element together with the namespace-map attribute or with the ns-prefix and ns-uri attributes.
Optional.
===== Providing Namespaces (Optional) to XPath Expressions
For the XPath Expression Element, namespace information can optionally be provided as configuration parameters. Namespaces can be defined using one of the following three choices:
namespace-map attribute
map sub-element
ns-prefix and ns-uri attributes
All three options are mutually exclusive. Only one option can be set.
The following examples show several ways to use XPath expressions with the XML namespace support, including the various options for setting the XML namespaces discussed above.
<int-xml:xpath-filter id="filterReferencingXPathExpression"
                      xpath-expression-ref="refToXpathExpression"/>

<int-xml:xpath-expression id="refToXpathExpression" expression="/name"/>

<int-xml:xpath-filter id="filterWithoutNamespace">
    <int-xml:xpath-expression expression="/name"/>
</int-xml:xpath-filter>

<int-xml:xpath-filter id="filterWithOneNamespace">
    <int-xml:xpath-expression expression="/ns1:name"
                              ns-prefix="ns1" ns-uri="www.example.org"/>
</int-xml:xpath-filter>

<int-xml:xpath-filter id="filterWithTwoNamespaces">
    <int-xml:xpath-expression expression="/ns1:name/ns2:type">
        <map>
            <entry key="ns1" value="www.example.org/one"/>
            <entry key="ns2" value="www.example.org/two"/>
        </map>
    </int-xml:xpath-expression>
</int-xml:xpath-filter>

<int-xml:xpath-filter id="filterWithNamespaceMapReference">
    <int-xml:xpath-expression expression="/ns1:name/ns2:type"
                              namespace-map="defaultNamespaces"/>
</int-xml:xpath-filter>

<util:map id="defaultNamespaces">
    <util:entry key="ns1" value="www.example.org/one"/>
    <util:entry key="ns2" value="www.example.org/two"/>
</util:map>
===== Using XPath Expressions with Default Namespaces
When working with default namespaces, you may run into situations that behave differently than originally expected. Let’s assume we have the following XML document:
<?xml version="1.0" encoding="UTF-8"?> <order> <orderItem> <isbn>0321200683</isbn> <quantity>2</quantity> </orderItem> <orderItem> <isbn>1590596439</isbn> <quantity>1</quantity> </orderItem> </order>
This document is not declaring any namespace. Therefore, applying the following XPath Expression will work as expected:
<int-xml:xpath-expression expression="/order/orderItem" />
You might expect that the same expression will also work for the following XML file. It looks exactly the same as the previous example but in addition it also declares a default namespace:
<?xml version="1.0" encoding="UTF-8"?> <order xmlns="http://www.example.org/orders"> <orderItem> <isbn>0321200683</isbn> <quantity>2</quantity> </orderItem> <orderItem> <isbn>1590596439</isbn> <quantity>1</quantity> </orderItem> </order>
However, the XPath Expression used previously will fail in this case.
To solve this issue, you must provide a namespace prefix and a namespace URI, either by using the ns-prefix and ns-uri attributes or by providing a namespace-map attribute instead. The namespace URI must match the namespace declared in your XML document, which in this example is http://www.example.org/orders.
The namespace prefix, however, can be chosen arbitrarily. In fact, just providing an empty String works (null is not allowed). In the case of a namespace prefix consisting of an empty String, your XPath Expression uses a colon (":") to indicate the default namespace. If you leave the colon off, the XPath expression does not match. The following XPath Expression matches against the XML document above:
<int-xml:xpath-expression expression="/:order/:orderItem" ns-prefix="" ns-uri="http://www.example.org/orders"/>
Of course, you can also provide any other arbitrarily chosen namespace prefix. The following XPath expression, using the myorder namespace prefix, also matches:
<int-xml:xpath-expression expression="/myorder:order/myorder:orderItem" ns-prefix="myorder" ns-uri="http://www.example.org/orders"/>
It is important to remember that the namespace URI is the really important piece of information to declare, not the prefix itself. The Jaxen FAQ summarizes the point very well:
In XPath 1.0, all unprefixed names are unqualified. There is no requirement that the prefixes used in the XPath expression are the same as the prefixes used in the document being queried. Only the namespace URIs need to match, not the prefixes.
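The effect of a default namespace can be reproduced with plain JAXP, outside of Spring Integration. The following sketch is only an illustration under that assumption: the `DefaultNamespaceXPathSketch` class and its `countMatches` helper are hypothetical names, and the order document is a shortened copy of the one above.

```java
import java.io.StringReader;
import java.util.Collections;
import java.util.Iterator;
import javax.xml.XMLConstants;
import javax.xml.namespace.NamespaceContext;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

/** Hypothetical JAXP-only demonstration of XPath matching against a default namespace. */
class DefaultNamespaceXPathSketch {

    static final String ORDERS_NS = "http://www.example.org/orders";

    static final String XML =
            "<order xmlns=\"" + ORDERS_NS + "\">"
            + "<orderItem><isbn>0321200683</isbn><quantity>2</quantity></orderItem>"
            + "<orderItem><isbn>1590596439</isbn><quantity>1</quantity></orderItem>"
            + "</order>";

    /** Counts the nodes selected by the expression; prefix may be null for an unprefixed expression. */
    static int countMatches(String expression, String prefix, String namespaceUri) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true); // required for namespace-sensitive matching
            Document document = factory.newDocumentBuilder()
                    .parse(new InputSource(new StringReader(XML)));
            XPath xpath = XPathFactory.newInstance().newXPath();
            if (prefix != null) {
                xpath.setNamespaceContext(new NamespaceContext() {
                    public String getNamespaceURI(String p) {
                        return prefix.equals(p) ? namespaceUri : XMLConstants.NULL_NS_URI;
                    }
                    public String getPrefix(String uri) {
                        return namespaceUri.equals(uri) ? prefix : null;
                    }
                    public Iterator<String> getPrefixes(String uri) {
                        return Collections.singletonList(prefix).iterator();
                    }
                });
            }
            NodeList nodes = (NodeList) xpath.evaluate(expression, document, XPathConstants.NODESET);
            return nodes.getLength();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Unprefixed names are unqualified, so nothing in the namespaced document matches.
        System.out.println(countMatches("/order/orderItem", null, null)); // prints 0
        // Any prefix works as long as it is bound to the document's namespace URI.
        System.out.println(countMatches("/myorder:order/myorder:orderItem", "myorder", ORDERS_NS)); // prints 2
    }
}
```

Binding the prefix to a URI that differs from the one declared in the document, even slightly, yields no matches, which is why the URI is the piece of information that matters.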
==== Configuring Transformers as Beans
This section will explain the workings of the following transformers and how to configure them as beans:
All of the provided XML transformers extend AbstractTransformer or AbstractPayloadTransformer and therefore implement Transformer. When configuring XML transformers as beans in Spring Integration, you would normally configure the Transformer in conjunction with a MessageTransformingHandler. This allows the transformer to be used as an Endpoint. Finally, the namespace support will be discussed, which allows for the simple configuration of the transformers as elements in XML.
===== UnmarshallingTransformer
An UnmarshallingTransformer allows an XML Source to be unmarshalled using implementations of the Spring OXM Unmarshaller.
Spring’s Object/XML Mapping support provides several implementations supporting marshalling and unmarshalling using JAXB, Castor and JiBX, amongst others.
The unmarshaller requires an instance of Source.
If the message payload is not an instance of Source, conversion is attempted.
Currently, String, File and org.w3c.dom.Document payloads are supported.
Custom conversion to a Source is also supported by injecting an implementation of a SourceFactory.
Note | |
---|---|
If a |
<bean id="unmarshallingTransformer" class="o.s.i.xml.transformer.UnmarshallingTransformer"> <constructor-arg> <bean class="org.springframework.oxm.jaxb.Jaxb2Marshaller"> <property name="contextPath" value="org.example" /> </bean> </constructor-arg> </bean>
The MarshallingTransformer allows an object graph to be converted into XML using a Spring OXM Marshaller.
By default, the MarshallingTransformer returns a DomResult.
However, the type of result can be controlled by configuring an alternative ResultFactory, such as StringResultFactory.
In many cases it is more convenient to transform the payload into an alternative XML format.
To achieve this, configure a ResultTransformer.
Two implementations are provided, one which converts to String and another which converts to Document.
<bean id="marshallingTransformer" class="o.s.i.xml.transformer.MarshallingTransformer"> <constructor-arg> <bean class="org.springframework.oxm.jaxb.Jaxb2Marshaller"> <property name="contextPath" value="org.example"/> </bean> </constructor-arg> <constructor-arg> <bean class="o.s.i.xml.transformer.ResultToDocumentTransformer"/> </constructor-arg> </bean>
By default, the MarshallingTransformer passes the payload Object to the Marshaller, but if its boolean extractPayload property is set to false, the entire Message instance is passed to the Marshaller instead.
That may be useful for certain custom implementations of the Marshaller interface, but typically the payload is the appropriate source Object for marshalling when delegating to any of the various out-of-the-box Marshaller implementations.
XsltPayloadTransformer transforms XML payloads using Extensible Stylesheet Language Transformations (XSLT).
The transformer’s constructor requires an instance of either Resource or Templates to be passed in.
Passing in a Templates instance allows for greater configuration of the TransformerFactory used to create the template instance.
As with the UnmarshallingTransformer, the XsltPayloadTransformer does the actual XSLT transformation using instances of Source.
Therefore, if the message payload is not an instance of Source, conversion is attempted.
String and Document payloads are supported directly.
Custom conversion to a Source is also supported by injecting an implementation of a SourceFactory.
Note | |
---|---|
If a |
By default, the XsltPayloadTransformer creates a message with a Result payload, similar to the MarshallingTransformer.
This can be customized by providing a ResultFactory and/or a ResultTransformer.
<bean id="xsltPayloadTransformer" class="o.s.i.xml.transformer.XsltPayloadTransformer"> <constructor-arg value="classpath:org/example/xsl/transform.xsl"/> <constructor-arg> <bean class="o.s.i.xml.transformer.ResultToDocumentTransformer"/> </constructor-arg> </bean>
Starting with Spring Integration 3.0, you can specify the transformer factory class name using a constructor argument.
This is configured using the transformer-factory-class attribute when using the namespace.
Both the MarshallingTransformer and the XsltPayloadTransformer allow you to specify a ResultTransformer.
Thus, if the marshalling or XSLT transformation returns a Result, then you also have the option of using a ResultTransformer to transform the Result into another format.
Spring Integration provides two concrete `ResultTransformer` implementations: ResultToDocumentTransformer and ResultToStringTransformer.
Using ResultTransformers with the MarshallingTransformer
By default, the MarshallingTransformer always returns a Result.
By specifying a ResultTransformer, you can customize the type of payload returned.
Using ResultTransformers with the XsltPayloadTransformer
The behavior is slightly more complex for the XsltPayloadTransformer.
By default, if the input payload is an instance of String or Document, the resultTransformer property is ignored.
However, if the input payload is a Source or any other type, the resultTransformer property is applied.
Additionally, you can set the alwaysUseResultFactory property to true, which also causes the specified resultTransformer to be used.
For more information and examples, see the section "Namespace Configuration and ResultTransformers".
==== Namespace Support for XML Transformers
Namespace support for all XML transformers is provided in the Spring Integration XML namespace, a template for which can be seen below.
The namespace support for transformers creates an instance of either `EventDrivenConsumer` or PollingConsumer, according to the type of the provided input channel.
The namespace support is designed to reduce the amount of XML configuration by allowing the creation of an endpoint and a transformer using one element.
UnmarshallingTransformer
The namespace support for the UnmarshallingTransformer is shown below.
Since the namespace now creates an endpoint instance rather than a transformer, a poller can also be nested within the element to control the polling of the input channel.
<int-xml:unmarshalling-transformer id="defaultUnmarshaller"
    input-channel="input" output-channel="output"
    unmarshaller="unmarshaller"/>

<int-xml:unmarshalling-transformer id="unmarshallerWithPoller"
    input-channel="input" output-channel="output"
    unmarshaller="unmarshaller">
    <int:poller fixed-rate="2000"/>
</int-xml:unmarshalling-transformer>
MarshallingTransformer
The namespace support for the marshalling transformer requires an input-channel, an output-channel and a reference to a marshaller.
The optional result-type attribute can be used to control the type of result created.
Valid values are StringResult or DomResult (the default).
<int-xml:marshalling-transformer input-channel="marshallingTransformerStringResultFactory" output-channel="output" marshaller="marshaller" result-type="StringResult" /> <int-xml:marshalling-transformer input-channel="marshallingTransformerWithResultTransformer" output-channel="output" marshaller="marshaller" result-transformer="resultTransformer" /> <bean id="resultTransformer" class="o.s.i.xml.transformer.ResultToStringTransformer"/>
Where the provided result types are not sufficient, a reference to a custom implementation of ResultFactory can be provided as an alternative to setting the result-type attribute, using the result-factory attribute.
The attributes result-type and result-factory are mutually exclusive.
Note | |
---|---|
Internally, the result types |
XsltPayloadTransformer
Namespace support for the XsltPayloadTransformer allows you either to pass in a Resource, in order to create the Templates instance, or to pass in a precreated Templates instance as a reference.
As with the marshalling transformer, the type of the result output can be controlled by specifying either the result-factory or the result-type attribute.
A result-transformer attribute can also be used to reference an implementation of ResultTransformer where conversion of the result is required before sending.
Important | |
---|---|
If you specify the |
<int-xml:xslt-transformer id="xsltTransformerWithResource" input-channel="withResourceIn" output-channel="output" xsl-resource="org/springframework/integration/xml/config/test.xsl"/> <int-xml:xslt-transformer id="xsltTransformerWithTemplatesAndResultTransformer" input-channel="withTemplatesAndResultTransformerIn" output-channel="output" xsl-templates="templates" result-transformer="resultTransformer"/>
Often you may need to have access to Message data, such as the Message Headers, in order to assist with transformation.
For example, you may need to get access to certain Message Headers and pass them on as parameters to a transformer (e.g., transformer.setParameter(..)).
Spring Integration provides two convenient ways to accomplish this, as illustrated in the following example:
<int-xml:xslt-transformer id="paramHeadersCombo"
    input-channel="paramHeadersComboChannel" output-channel="output"
    xsl-resource="classpath:transformer.xslt"
    xslt-param-headers="testP*, *foo, bar, baz">
    <int-xml:xslt-param name="helloParameter" value="hello"/>
    <int-xml:xslt-param name="firstName" expression="headers.fname"/>
</int-xml:xslt-transformer>
If message header names match 1:1 to parameter names, you can simply use the xslt-param-headers attribute.
There you can also use wildcards for simple pattern matching, which supports the following simple pattern styles: "xxx*", "*xxx*", "*xxx" and "xxx*yyy".
You can also configure individual XSLT parameters via the <xslt-param/> sub-element.
There you can use either the expression or the value attribute.
The expression attribute should be any valid SpEL expression, with the Message being the root object of the expression evaluation context.
The value attribute, just like any value in Spring beans, allows you to specify simple scalar values.
You can also use property placeholders (e.g., ${some.value}).
So, with the expression and value attributes, XSLT parameters can be mapped to any accessible part of the Message as well as any literal value.
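At the JAXP level, XSLT parameters end up as calls to `Transformer.setParameter`. The following JDK-only sketch shows that mechanism in isolation; it is not the XsltPayloadTransformer code. The `XsltParameterSketch` class, its inline stylesheet and the parameter values are assumptions made for illustration, with `helloParameter` and `firstName` borrowed from the configuration above.

```java
import java.io.StringReader;
import java.io.StringWriter;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

/** Hypothetical JAXP-only demonstration of passing parameters into an XSLT stylesheet. */
class XsltParameterSketch {

    // Made-up stylesheet that writes both parameters as plain text.
    static final String STYLESHEET =
            "<xsl:stylesheet version=\"1.0\" xmlns:xsl=\"http://www.w3.org/1999/XSL/Transform\">"
            + "<xsl:output method=\"text\"/>"
            + "<xsl:param name=\"helloParameter\"/>"
            + "<xsl:param name=\"firstName\"/>"
            + "<xsl:template match=\"/\">"
            + "<xsl:value-of select=\"$helloParameter\"/>"
            + "<xsl:text> </xsl:text>"
            + "<xsl:value-of select=\"$firstName\"/>"
            + "</xsl:template>"
            + "</xsl:stylesheet>";

    static String transform(String xml, Map<String, Object> parameters) {
        try {
            Transformer transformer = TransformerFactory.newInstance()
                    .newTransformer(new StreamSource(new StringReader(STYLESHEET)));
            for (Map.Entry<String, Object> parameter : parameters.entrySet()) {
                transformer.setParameter(parameter.getKey(), parameter.getValue());
            }
            StringWriter out = new StringWriter();
            transformer.transform(new StreamSource(new StringReader(xml)), new StreamResult(out));
            return out.toString();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> parameters = new LinkedHashMap<>();
        parameters.put("helloParameter", "hello"); // literal, like value="hello"
        parameters.put("firstName", "John");       // stands in for expression="headers.fname"
        System.out.println(transform("<doc/>", parameters)); // prints hello John
    }
}
```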
Starting with Spring Integration 3.0, you can specify the transformer factory class name using the transformer-factory-class attribute.
==== Namespace Configuration and ResultTransformers
The usage of ResultTransformers was introduced earlier, in "Configuring Transformers as Beans".
The following example illustrates several special use cases using XML namespace configuration.
First, we define the ResultTransformer:
<beans:bean id="resultToDoc" class="o.s.i.xml.transformer.ResultToDocumentTransformer"/>
This ResultTransformer accepts either a StringResult or a DOMResult as input and converts the input into a Document.
Now, let’s declare the transformer:
<int-xml:xslt-transformer input-channel="in" output-channel="fahrenheitChannel" xsl-resource="classpath:noop.xslt" result-transformer="resultToDoc"/>
If the incoming message’s payload is of type Source, then, as a first step, the Result is determined using the ResultFactory.
As we did not specify a ResultFactory, the default DomResultFactory is used, meaning that the transformation yields a DomResult.
However, as we specified a ResultTransformer, it is used and the resulting Message payload is of type `Document`.
Important | |
---|---|
If the incoming message’s payload is of type |
If the message payload is not a Source, a String or a Document, then, as a fallback option, an attempt is made to create a `Source` using the default SourceFactory.
As we did not specify a SourceFactory explicitly using the source-factory attribute, the default DomSourceFactory is used.
If successful, the XSLT transformation is executed as if the payload were of type Source, as described in the previous paragraphs.
Note | |
---|---|
The |
The next transformer declaration adds a result-type attribute using StringResult as its value.
The result-type is internally represented by the StringResultFactory.
Thus, you could also have added a reference to a StringResultFactory using the result-factory attribute, which would have had the same effect.
<int-xml:xslt-transformer input-channel="in" output-channel="fahrenheitChannel" xsl-resource="classpath:noop.xslt" result-transformer="resultToDoc" result-type="StringResult"/>
Because we are using a ResultFactory
, the alwaysUseResultFactory property of the XsltPayloadTransformer
class will be implicitly set to true
.
Consequently, the referenced ResultToDocumentTransformer
will be used.
Therefore, if you transform a payload of type String
, the resulting payload will be of type Document.
XsltPayloadTransformer and <xsl:output method="text"/>
<xsl:output method="text"/>
tells the XSLT template to only produce text content from the input source.
In this particular case there is no reason to have a DomResult
.
Therefore, the XsltPayloadTransformer defaults to StringResult
if the output property called method
of the underlying javax.xml.transform.Transformer
returns "text"
.
This coercion is performed independently of the inbound payload type.
Keep in mind that this "smart" behavior is only available if neither the result-type nor the result-factory attribute is provided for the respective <int-xml:xslt-transformer> component.
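The check described above can be illustrated with plain JDK classes. The following is a minimal sketch, not the XsltPayloadTransformer implementation: the class name TextOutputSketch and its methods are hypothetical, and it only assumes that the JDK's built-in XSLT processor reports the stylesheet's output method through Transformer.getOutputProperty.

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMResult;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

// Hypothetical illustration class; it is not part of Spring Integration.
final class TextOutputSketch {

    // True when the stylesheet declares <xsl:output method="text"/>.
    static boolean producesText(Transformer transformer) {
        return "text".equals(transformer.getOutputProperty(OutputKeys.METHOD));
    }

    // Returns a String for text stylesheets and a DOM Node otherwise.
    static Object transform(String xslt, String xml) {
        try {
            Transformer transformer = TransformerFactory.newInstance()
                    .newTransformer(new StreamSource(new StringReader(xslt)));
            StreamSource input = new StreamSource(new StringReader(xml));
            if (producesText(transformer)) {
                // Text output: a DOM tree would be pointless, so collect a String.
                StringWriter out = new StringWriter();
                transformer.transform(input, new StreamResult(out));
                return out.toString();
            }
            DOMResult dom = new DOMResult();
            transformer.transform(input, dom);
            return dom.getNode();
        } catch (TransformerException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String xslt = "<xsl:stylesheet version='1.0' "
                + "xmlns:xsl='http://www.w3.org/1999/XSL/Transform'>"
                + "<xsl:output method='text'/>"
                + "<xsl:template match='/'><xsl:value-of select='/person/@name'/></xsl:template>"
                + "</xsl:stylesheet>";
        System.out.println(transform(xslt, "<person name='John Doe'/>"));
    }
}
```

The decision is made from the stylesheet alone, which mirrors the statement that the coercion does not depend on the inbound payload type.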
=== Transforming XML Messages Using XPath
XPath is a convenient way to transform Messages that have XML payloads. You define XPath transformers with the <xpath-transformer/> element.
Simple XPath transformation
Let’s look at the following transformer configuration:
<int-xml:xpath-transformer input-channel="inputChannel" output-channel="outputChannel" xpath-expression="/person/@name" />
...and Message
Message<?> message =
MessageBuilder.withPayload("<person name='John Doe' age='42' married='true'/>").build();
After this message is sent to the inputChannel, the XPath transformer configured above transforms the XML Message into a simple Message whose payload is John Doe, based on the XPath expression specified in the xpath-expression attribute.
XPath also has the capability to perform simple conversion of extracted elements to a desired type.
Valid return types are defined in javax.xml.xpath.XPathConstants and follow the conversion rules specified by the javax.xml.xpath.XPath interface.
The following constants are defined by the XPathConstants
class: BOOLEAN, DOM_OBJECT_MODEL, NODE, NODESET, NUMBER, STRING
You can configure the desired type by simply using the evaluation-type
attribute of the <xpath-transformer/>
element.
<int-xml:xpath-transformer input-channel="numberInput"
        xpath-expression="/person/@age"
        evaluation-type="NUMBER_RESULT"
        output-channel="output"/>

<int-xml:xpath-transformer input-channel="booleanInput"
        xpath-expression="/person/@married = 'true'"
        evaluation-type="BOOLEAN_RESULT"
        output-channel="output"/>
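To see what the different evaluation types return, the same expressions can be evaluated directly with the JDK's javax.xml.xpath API. This is only an illustrative sketch using the sample payload from above; the class EvaluationTypeSketch is hypothetical and no Spring Integration classes are involved.

```java
import java.io.StringReader;
import javax.xml.namespace.QName;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

// Hypothetical illustration class; it is not part of Spring Integration.
final class EvaluationTypeSketch {

    // Evaluates the expression against the XML text and converts the
    // result to the requested XPathConstants return type.
    static Object evaluate(String xml, String expression, QName returnType) {
        try {
            return XPathFactory.newInstance().newXPath()
                    .evaluate(expression, new InputSource(new StringReader(xml)), returnType);
        } catch (XPathExpressionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String xml = "<person name='John Doe' age='42' married='true'/>";
        // STRING yields a java.lang.String
        System.out.println(evaluate(xml, "/person/@name", XPathConstants.STRING));
        // NUMBER yields a java.lang.Double
        System.out.println(evaluate(xml, "/person/@age", XPathConstants.NUMBER));
        // BOOLEAN yields a java.lang.Boolean
        System.out.println(evaluate(xml, "/person/@married = 'true'", XPathConstants.BOOLEAN));
    }
}
```

Note that a NUMBER evaluation produces a Double, so a downstream component that expects an integer still needs a conversion step.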
Node Mappers
If you need to provide custom mapping for the node extracted by the XPath expression, provide a reference to an implementation of org.springframework.xml.xpath.NodeMapper, an interface used by XPathOperations implementations for mapping Node objects on a per-node basis.
To provide a reference to a NodeMapper, use the node-mapper attribute:
<int-xml:xpath-transformer input-channel="nodeMapperInput" xpath-expression="/person/@age" node-mapper="testNodeMapper" output-channel="output"/>
...and Sample NodeMapper implementation:
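import org.springframework.xml.xpath.NodeMapper;
import org.w3c.dom.DOMException;
import org.w3c.dom.Node;

class TestNodeMapper implements NodeMapper {

    // Appends "-mapped" to the text content of each extracted node
    public Object mapNode(Node node, int nodeNum) throws DOMException {
        return node.getTextContent() + "-mapped";
    }
}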
XML Payload Converter
You can also use an implementation of the org.springframework.integration.xml.XmlPayloadConverter
to provide more granular transformation:
<int-xml:xpath-transformer input-channel="customConverterInput" output-channel="output" xpath-expression="/test/@type" converter="testXmlPayloadConverter" />
...and Sample XmlPayloadConverter implementation:
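import java.io.StringReader;

import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.Source;

import org.springframework.integration.xml.XmlPayloadConverter;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.xml.sax.InputSource;

class TestXmlPayloadConverter implements XmlPayloadConverter {

    public Source convertToSource(Object object) {
        throw new UnsupportedOperationException();
    }

    // Ignores the payload and always returns a fixed document
    public Node convertToNode(Object object) {
        try {
            return DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(
                    new InputSource(new StringReader("<test type='custom'/>")));
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public Document convertToDocument(Object object) {
        throw new UnsupportedOperationException();
    }
}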
The DefaultXmlPayloadConverter
is used if this reference is not provided, and it should be sufficient in most cases since it can convert from Node
, Document
, Source
, File
, String
, InputStream
and byte[]
typed payloads.
If you need to extend beyond the capabilities of that default implementation, then an upstream Transformer
is probably a better option than providing a reference to a custom implementation of this strategy here.
XPathMessageSplitter
supports messages with either String
or Document
payloads.
The splitter uses the provided XPath expression to split the payload into a number of nodes.
By default this will result in each Node
instance becoming the payload of a new message.
If you prefer each message to be a Document, set the createDocuments flag.
When a String payload is passed in, it is converted and split, and each resulting node is then converted back, producing a number of String messages.
The XPath splitter implements MessageHandler
and should therefore be configured in conjunction with an appropriate endpoint (see the namespace support below for a simpler configuration alternative).
<bean id="splittingEndpoint" class="org.springframework.integration.endpoint.EventDrivenConsumer"> <constructor-arg ref="orderChannel" /> <constructor-arg> <bean class="org.springframework.integration.xml.splitter.XPathMessageSplitter"> <constructor-arg value="/order/items" /> <property name="documentBuilder" ref="customisedDocumentBuilder" /> <property name="outputChannel" ref="orderItemsChannel" /> </bean> </constructor-arg> </bean>
XPath splitter namespace support allows the creation of a Message Endpoint with an input channel and output channel.
<!-- Split the order into items creating a new message for each item node --> <int-xml:xpath-splitter id="orderItemSplitter" input-channel="orderChannel" output-channel="orderItemsChannel"> <int-xml:xpath-expression expression="/order/items"/> </int-xml:xpath-splitter> <!-- Split the order into items creating a new document for each item--> <int-xml:xpath-splitter id="orderItemDocumentSplitter" input-channel="orderChannel" output-channel="orderItemsChannel" create-documents="true"> <int-xml:xpath-expression expression="/order/items"/> <int:poller fixed-rate="2000"/> </int-xml:xpath-splitter>
Starting with version 4.2, the XPathMessageSplitter exposes an outputProperties property (for example, OutputKeys.OMIT_XML_DECLARATION) that is applied to the javax.xml.transform.Transformer instances used when the request payload is not of type org.w3c.dom.Node:
<util:properties id="outputProperties"> <beans:prop key="#{T (javax.xml.transform.OutputKeys).OMIT_XML_DECLARATION}">yes</beans:prop> </util:properties> <xpath-splitter input-channel="input" output-properties="outputProperties"> <xpath-expression expression="/orders/order"/> </xpath-splitter>
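The effect of such an output property can be sketched with the JDK alone: split a String payload by an XPath expression, then serialize each node back to a String while omitting the XML declaration. This is an assumption-laden illustration of the idea, not the XPathMessageSplitter code; the class SplitSketch is hypothetical.

```java
import java.io.StringReader;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical illustration class; it is not part of Spring Integration.
final class SplitSketch {

    // Splits the XML text into the nodes selected by the expression and
    // serializes each node to a String without an XML declaration.
    static List<String> split(String xml, String expression) {
        try {
            NodeList nodes = (NodeList) XPathFactory.newInstance().newXPath()
                    .evaluate(expression, new InputSource(new StringReader(xml)),
                            XPathConstants.NODESET);
            // Identity transformer used purely as a serializer
            Transformer serializer = TransformerFactory.newInstance().newTransformer();
            serializer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
            List<String> parts = new ArrayList<>();
            for (int i = 0; i < nodes.getLength(); i++) {
                StringWriter out = new StringWriter();
                serializer.transform(new DOMSource(nodes.item(i)), new StreamResult(out));
                parts.add(out.toString());
            }
            return parts;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String xml = "<orders><order id='1'/><order id='2'/></orders>";
        split(xml, "/orders/order").forEach(System.out::println);
    }
}
```

Without the OMIT_XML_DECLARATION setting, each serialized fragment would start with an XML declaration, which is usually unwanted when the fragments are sent on as individual messages.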
Starting with version 4.2, the XPathMessageSplitter exposes an iterator option as a boolean flag (defaults to true).
This allows the "streaming" of split nodes in the downstream flow.
In iterator mode, each node is transformed while iterating. When the flag is false, all entries are transformed first, before the split nodes start being sent to the output channel. In other words, the sequence is transform, send, transform, send rather than transform, transform, send, send.
See Section 6.3, “Splitter” for more information.
=== Routing XML Messages Using XPath
Similar to SpEL-based routers, Spring Integration provides support for routing messages based on XPath expressions, allowing you to create a Message Endpoint with an input channel but no output channel. Instead, one or more output channels are determined dynamically.
<int-xml:xpath-router id="orderTypeRouter" input-channel="orderChannel"> <int-xml:xpath-expression expression="/order/type"/> </int-xml:xpath-router>
Note | |
---|---|
For an overview of attributes that are common among Routers, please see chapter: Section 6.1.2, “Common Router Parameters” |
Internally, XPath expressions are evaluated as type NODESET and converted to a List<String> representing channel names.
Typically, such a list contains a single channel name.
However, based on the results of an XPath expression, the XPath router can also take on the characteristics of a Recipient List Router if the XPath expression returns more than one value.
In that case, the List<String> contains more than one channel name, and consequently Messages are sent to all channels in the list.
Thus, assuming that the XML file passed to the router configured below contains many responder sub-elements representing channel names, the message is sent to all of those channels.
<!-- route the order to all responders--> <int-xml:xpath-router id="responderRouter" input-channel="orderChannel"> <int-xml:xpath-expression expression="/request/responders"/> </int-xml:xpath-router>
If the returned values do not represent the channel names directly, additional mapping parameters can be specified, in order to map those returned values to actual channel names.
For example if the /request/responders
expression results in two values responderA
and responderB
but you don’t want to couple the responder names to channel names, you may provide additional mapping configuration such as the following:
<!-- route the order to all responders--> <int-xml:xpath-router id="responderRouter" input-channel="orderChannel"> <int-xml:xpath-expression expression="/request/responders"/> <int-xml:mapping value="responderA" channel="channelA"/> <int-xml:mapping value="responderB" channel="channelB"/> </int-xml:xpath-router>
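The resolution of channel names can be sketched as follows: evaluate the expression as a NODESET, take the text of each node, and translate it through the optional mappings. This is a simplified illustration under the assumption that unmapped values are used as channel names directly; the class ChannelMappingSketch is hypothetical and is not the router implementation.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical illustration class; it is not part of Spring Integration.
final class ChannelMappingSketch {

    // Evaluates the expression as a NODESET and maps each node's text to a
    // channel name, falling back to the raw value when no mapping exists.
    static List<String> resolveChannels(String xml, String expression,
            Map<String, String> mappings) {
        try {
            NodeList nodes = (NodeList) XPathFactory.newInstance().newXPath()
                    .evaluate(expression, new InputSource(new StringReader(xml)),
                            XPathConstants.NODESET);
            List<String> channels = new ArrayList<>();
            for (int i = 0; i < nodes.getLength(); i++) {
                String value = nodes.item(i).getTextContent().trim();
                channels.add(mappings.getOrDefault(value, value));
            }
            return channels;
        } catch (XPathExpressionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With a payload containing two responders elements (responderA and responderB) and the mappings shown above, the sketch yields channelA and channelB; with no mappings, it yields the responder names themselves.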
As already mentioned, the default evaluation type for XPath expressions is NODESET, which is converted to a List<String> of channel names and therefore handles single-channel scenarios as well as multiple-channel ones.
Nonetheless, certain XPath expressions may evaluate as String type from the very beginning. Take, for example, the following XPath expression:
name(./node())
This expression returns the name of the root node. It results in an exception if the default evaluation type NODESET is used.
For these scenarios, you may use the evaluate-as-string attribute, which lets you manage the evaluation type.
It is FALSE by default. If set to TRUE, the String evaluation type is used.
Note | |
---|---|
To provide some background information: XPath 1.0 specifies 4 data types:
When the XPath Router evaluates expressions using the optional For further information, please see: |
For example if we want to route based on the name of the root node, we can use the following configuration:
<int-xml:xpath-router id="xpathRouterAsString" input-channel="xpathStringChannel" evaluate-as-string="true"> <int-xml:xpath-expression expression="name(./node())"/> </int-xml:xpath-router>
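The difference between the two evaluation types can be reproduced with the JDK's XPath API. In this sketch (the class EvaluateAsStringSketch is hypothetical), the same expression is evaluated once as STRING and once as NODESET; the second attempt is expected to be rejected, because a string result cannot be converted to a node list.

```java
import java.io.StringReader;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

// Hypothetical illustration class; it is not part of Spring Integration.
final class EvaluateAsStringSketch {

    private static final String EXPRESSION = "name(./node())";

    // STRING evaluation: returns the name of the first child node of the
    // document, which is the root element for a document without a prolog.
    static String rootName(String xml) {
        try {
            return (String) XPathFactory.newInstance().newXPath()
                    .evaluate(EXPRESSION, new InputSource(new StringReader(xml)),
                            XPathConstants.STRING);
        } catch (XPathExpressionException e) {
            throw new IllegalStateException(e);
        }
    }

    // NODESET evaluation of the same expression: reports whether the
    // XPath implementation refuses the conversion.
    static boolean failsAsNodeSet(String xml) {
        try {
            XPathFactory.newInstance().newXPath()
                    .evaluate(EXPRESSION, new InputSource(new StringReader(xml)),
                            XPathConstants.NODESET);
            return false;
        } catch (XPathExpressionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        String xml = "<order><type>book</type></order>";
        System.out.println(rootName(xml));
        System.out.println(failsAsNodeSet(xml));
    }
}
```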
For XPath Routers, you can also specify the Converter to use when converting payloads prior to XPath evaluation.
As such, the XPath Router supports custom implementations of the XmlPayloadConverter
strategy, and when configuring an xpath-router
element in XML, a reference to such an implementation may be provided via the converter
attribute.
If this reference is not explicitly provided, the DefaultXmlPayloadConverter
is used.
It should be sufficient in most cases, since it can convert from Node, Document, Source, File, and String typed payloads.
If you need to extend beyond the capabilities of that default implementation, an upstream Transformer is generally a better option than providing a reference to a custom implementation of this strategy here.
The XPath Header Enricher defines a Header Enricher Message Transformer that evaluates XPath expressions against the message payload and inserts the result of the evaluation into a message header.
Please see below for an overview of all available configuration parameters:
<int-xml:xpath-header-enricher default-overwrite="true" id="" input-channel="" output-channel="" should-skip-nulls="true"> <int:poller></int:poller> <int-xml:header name="" evaluation-type="STRING_RESULT" header-type="int" overwrite="true" xpath-expression="" xpath-expression-ref=""/> </int-xml:xpath-header-enricher>
default-overwrite: Specify the default boolean value for whether to overwrite existing header values. This only takes effect for sub-elements that do not provide their own overwrite attribute. If the default-overwrite attribute is not provided, the specified header values do NOT overwrite any existing ones with the same header names. Optional.
id: Id for the underlying bean definition. Optional.
input-channel: The receiving Message channel of this endpoint. Optional.
output-channel: Channel to which enriched messages are sent. Optional.
should-skip-nulls: Specify whether null values, such as might be returned from an expression evaluation, should be skipped. The default value is true. Set this to false if a null value should trigger removal of the corresponding header instead. Optional.
poller (sub-element): Optional.
name (on the header sub-element): The name of the header to be enriched. Mandatory.
evaluation-type: The result type expected from the XPath evaluation. This is the type of the header value if no header-type attribute is provided.
header-type: The fully qualified class name for the header value type. The result of the XPath evaluation is converted to this type.
overwrite: Boolean value to indicate whether this header value should overwrite an existing header value for the same name if already present on the input Message.
xpath-expression: The XPath Expression as a String. Either this attribute or xpath-expression-ref must be set.
xpath-expression-ref: The XPath Expression reference. Either this attribute or xpath-expression must be set.
This component defines an XPath-based Message Filter.
Under the covers, this component uses a MessageFilter that wraps an instance of AbstractXPathMessageSelector.
Note | |
---|---|
Please also refer to the chapter on Message Filters for further details. |
To use the XPath Filter, you must at a minimum provide an XPath expression, either by declaring the xpath-expression sub-element or by referencing an XPath expression with the xpath-expression-ref attribute.
If the provided XPath expression evaluates to a boolean value, no further configuration parameters are necessary.
However, if the XPath expression evaluates to a String, specify the match-value attribute, against which the evaluation result is matched.
There are three options for the match-type:
exact: corresponds to equals on java.lang.String. The underlying implementation uses a StringValueTestXPathMessageSelector.
case-insensitive: corresponds to equals-ignore-case on java.lang.String. The underlying implementation uses a StringValueTestXPathMessageSelector.
regex: corresponds to a matches operation on java.lang.String. The underlying implementation uses a RegexTestXPathMessageSelector.
When providing a match-type value of regex, the value provided with the match-value
attribute must be a valid Regular Expression.
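The three comparisons described above boil down to three java.lang.String operations. The following sketch shows them side by side; the enum and class names are hypothetical and only stand in for the selectors that the filter uses internally.

```java
import java.util.regex.Pattern;

// Hypothetical illustration class; it is not part of Spring Integration.
final class MatchTypeSketch {

    // Illustrative names for the three kinds of comparison
    enum MatchType { EXACT, CASE_INSENSITIVE, REGEX }

    // Decides whether the XPath evaluation result passes the filter.
    static boolean accepts(String evaluated, String matchValue, MatchType type) {
        switch (type) {
            case EXACT:
                return evaluated.equals(matchValue);
            case CASE_INSENSITIVE:
                return evaluated.equalsIgnoreCase(matchValue);
            case REGEX:
                // The match value must be a valid regular expression and
                // has to match the whole evaluation result.
                return Pattern.matches(matchValue, evaluated);
            default:
                throw new IllegalArgumentException("Unknown match type: " + type);
        }
    }

    public static void main(String[] args) {
        System.out.println(accepts("Book", "book", MatchType.EXACT));
        System.out.println(accepts("Book", "book", MatchType.CASE_INSENSITIVE));
        System.out.println(accepts("Book", "B.*k", MatchType.REGEX));
    }
}
```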
<int-xml:xpath-filter discard-channel="" id="" input-channel="" match-type="exact" match-value="" output-channel="" throw-exception-on-rejection="false" xpath-expression-ref=""> <int-xml:xpath-expression ... /> <int:poller ... /> </int-xml:xpath-filter>
discard-channel: Message Channel where you want rejected messages to be sent. Optional.
id: Id for the underlying bean definition. Optional.
input-channel: The receiving Message channel of this endpoint. Optional.
match-type: Type of match to apply between the XPath evaluation result and the match-value. Default is exact. Optional.
match-value: String value to be matched against the XPath evaluation result. If this attribute is not provided, then the XPath evaluation MUST produce a boolean result directly. Optional.
output-channel: The channel to which Messages that matched the filter criteria are dispatched. Optional.
throw-exception-on-rejection: By default, this property is set to false and rejected Messages (Messages that did not match the filter criteria) are silently dropped. However, if set to true, message rejection results in an error condition and the exception is propagated upstream to the caller. Optional.
xpath-expression-ref: Reference to an XPath expression instance to evaluate.
xpath-expression (sub-element): This sub-element sets the XPath expression to be evaluated. If this is not defined, you MUST define the xpath-expression-ref attribute.
poller (sub-element): Optional.
Spring Integration, since version 3.0, provides the #xpath
built-in SpEL function, which invokes the static method XPathUtils.evaluate(...)
.
This method delegates to an org.springframework.xml.xpath.XPathExpression
.
The following shows some usage examples:
<transformer expression="#xpath(payload, '/name')"/> <filter expression="#xpath(payload, headers.xpath, 'boolean')"/> <splitter expression="#xpath(payload, '//book', 'document_list')"/> <router expression="#xpath(payload, '/person/@age', 'number')"> <mapping channel="output1" value="16"/> <mapping channel="output2" value="45"/> </router>
#xpath
also supports a third optional parameter for converting the result of the xpath evaluation.
It can be one of the String constants 'string'
, 'boolean'
, 'number'
, 'node'
, 'node_list'
and 'document_list'
or an org.springframework.xml.xpath.NodeMapper
instance.
By default the #xpath
SpEL function returns a String representation of the xpath evaluation.
Note | |
---|---|
To enable the |
For more information see Section 31.3.2, “TCP Failover Client Connection Factory”.
The XML Validating Filter allows you to validate incoming messages against provided schema instances. The following schema types are supported:
Messages that fail validation can either be silently dropped or they can be forwarded to a definable discard-channel
.
Furthermore you can configure this filter to throw an Exception
in case validation fails.
Please see below for an overview of all available configuration parameters:
<int-xml:validating-filter discard-channel="" id="" input-channel="" output-channel="" schema-location="" schema-type="xml-schema" throw-exception-on-rejection="false" xml-validator=""> <int:poller .../> </int-xml:validating-filter>
discard-channel: Message Channel where you want rejected messages to be sent. Optional.
id: Id for the underlying bean definition. Optional.
input-channel: The receiving Message channel of this endpoint. Optional.
output-channel: Message Channel where you want accepted messages to be sent. Optional.
schema-location: Sets the location of the schema to validate the Message’s payload against.
schema-type: Sets the schema type.
throw-exception-on-rejection: If true, an exception is thrown when validation fails.
xml-validator: Reference to a custom validator.
poller (sub-element): Optional.
Spring Integration provides Channel Adapters for XMPP.
XMPP describes a way for multiple agents to communicate with each other in a distributed system. The canonical use case is to send and receive chat messages, though XMPP can be, and is, used for far more applications. XMPP is used to describe a network of actors. Within that network, actors may address each other directly, as well as broadcast status changes (e.g. "presence").
XMPP provides the messaging fabric that underlies some of the biggest Instant Messaging networks in the world, including Google Talk (GTalk) - which is also available from within GMail - and Facebook Chat. There are many good open-source XMPP servers available. Two popular implementations are Openfire and ejabberd.
Spring Integration provides support for XMPP via XMPP adapters, which support sending and receiving both XMPP chat messages and presence changes from other entries in your roster. As with other adapters, the XMPP adapters come with support for a convenient namespace-based configuration. To configure the XMPP namespace, include the following elements in the headers of your XML configuration file:
xmlns:int-xmpp="http://www.springframework.org/schema/integration/xmpp" xsi:schemaLocation="http://www.springframework.org/schema/integration/xmpp https://www.springframework.org/schema/integration/xmpp/spring-integration-xmpp.xsd"
Before using inbound or outbound XMPP adapters to participate in the XMPP network, an actor must establish its XMPP connection.
This connection object can be shared by all XMPP adapters connected to a particular account.
Typically, this requires, at a minimum, user, password, and host.
To create a basic XMPP connection, you can use the namespace support.
<int-xmpp:xmpp-connection id="myConnection" user="user" password="password" host="host" port="port" resource="theNameOfTheResource" subscription-mode="accept_all"/>
Note | |
---|---|
For added convenience you can rely on the default naming convention and omit the |
If the XMPP Connection goes stale, reconnection attempts are made with an automatic login, as long as the previous connection state was logged in (authenticated).
We also register a ConnectionListener, which logs connection events if the DEBUG logging level is enabled.
The subscription-mode
initiates the Roster listener to deal with incoming subscriptions from other users.
This functionality isn’t always available for the target XMPP servers.
For example GCM/FCM fully disables it.
To switch off the Roster listener for subscriptions you should configure it with an empty string when using XML configuration: subscription-mode=""
, or with XmppConnectionFactoryBean.setSubscriptionMode(null)
when using Java Configuration. Doing so will disable Roster at the login phase as well.
See Roster.setRosterLoadedAtLogin(Boolean)
for more information.
==== Inbound Message Channel Adapter
The Spring Integration adapters support receiving chat messages from other users in the system.
To do this, the Inbound Message Channel Adapter "logs in" as a user on your behalf and receives the messages sent to that user.
Those messages are then forwarded to your Spring Integration client.
Configuration support for the XMPP Inbound Message Channel Adapter is provided via the inbound-channel-adapter
element.
<int-xmpp:inbound-channel-adapter id="xmppInboundAdapter" channel="xmppInbound" xmpp-connection="testConnection" payload-expression="getExtension('google:mobile:data').json" stanza-filter="stanzaFilter" auto-startup="true"/>
As you can see, in addition to the usual attributes, this adapter also requires a reference to an XMPP Connection.
Note also that the XMPP inbound adapter is an event-driven adapter and a Lifecycle implementation.
When started, it registers a PacketListener that listens for incoming XMPP chat messages.
It forwards any received messages to the underlying adapter, which converts them to Spring Integration Messages and sends them to the specified channel.
It unregisters the PacketListener when it is stopped.
Starting with version 4.3, the ChatMessageListeningEndpoint (and its <int-xmpp:inbound-channel-adapter>) supports injection of an org.jivesoftware.smack.filter.StanzaFilter, which is registered on the provided XMPPConnection together with an internal StanzaListener implementation.
See their JavaDocs for more information.
Version 4.3 also introduced the payload-expression for the ChatMessageListeningEndpoint.
The incoming org.jivesoftware.smack.packet.Message is the root object of the evaluation context.
This option is useful when working with XMPP extensions.
For example, for the GCM protocol we can extract the body using expression:
payload-expression="getExtension('google:mobile:data').json"
for the XHTML protocol:
payload-expression="getExtension(T(org.jivesoftware.smackx.xhtmlim.packet.XHTMLExtension).NAMESPACE).bodies[0]"
To simplify access to the Extension in the XMPP Message, the extension variable is added to the EvaluationContext.
Note that this is done only when exactly one Extension is present in the Message.
The samples above with the namespace manipulations can be simplified to something like:
payload-expression="#extension.json"
payload-expression="#extension.bodies[0]"
Note | |
---|---|
The |
==== Outbound Message Channel Adapter
You may also send chat messages to other users on XMPP using the Outbound Message Channel Adapter.
Configuration support for the XMPP Outbound Message Channel Adapter is provided via the outbound-channel-adapter
element.
<int-xmpp:outbound-channel-adapter id="outboundEventAdapter" channel="outboundEventChannel" xmpp-connection="testConnection"/>
The adapter expects as its input, at a minimum, a payload of type java.lang.String and a header value for XmppHeaders.CHAT_TO that specifies the user to whom the Message should be sent.
To create a message, you might use the following Java code:
Message<String> xmppOutboundMsg = MessageBuilder.withPayload("Hello, XMPP!")
        .setHeader(XmppHeaders.CHAT_TO, "userhandle")
        .build();
Another mechanism of setting the header is by using the XMPP header-enricher support. Here is an example.
<int-xmpp:header-enricher input-channel="input" output-channel="output"> <int-xmpp:chat-to value="[email protected]"/> </int-xmpp:header-enricher>
Starting with version 4.3, packet extension support has been added to the ChatMessageSendingMessageHandler (<int-xmpp:outbound-channel-adapter>).
In addition to the regular String and org.jivesoftware.smack.packet.Message payloads, you can now send a message whose payload is an org.jivesoftware.smack.packet.ExtensionElement, which is added to the org.jivesoftware.smack.packet.Message by using addExtension() instead of setBody().
For convenience, an extension-provider option has been added to the ChatMessageSendingMessageHandler. It lets you inject an org.jivesoftware.smack.provider.ExtensionElementProvider, which builds an ExtensionElement from the payload at runtime.
In this case, the payload must be a String in JSON or XML format, depending on the XEP protocol.
XMPP also supports broadcasting state. You can use this capability to let people who have you on their roster see your state changes. This happens all the time with your IM clients; you change your away status, and then set an away message, and everybody who has you on their roster sees your icon or username change to reflect this new state, and additionally might see your new "away" message. If you would like to receive notification, or notify others, of state changes, you can use Spring Integration’s "presence" adapters.
==== Inbound Presence Message Channel Adapter
Spring Integration provides an Inbound Presence Message Channel Adapter which supports receiving Presence events from other users in the system who happen to be on your Roster.
To do this, the adapter "logs in" as a user on your behalf, registers a RosterListener
and forwards received Presence update events as Messages to the channel identified by the channel
attribute.
The payload of the Message will be a org.jivesoftware.smack.packet.Presence
object (see https://www.igniterealtime.org/builds/smack/docs/latest/javadoc/org/jivesoftware/smack/packet/Presence.html).
Configuration support for the XMPP Inbound Presence Message Channel Adapter is provided via the presence-inbound-channel-adapter
element.
<int-xmpp:presence-inbound-channel-adapter channel="outChannel" xmpp-connection="testConnection" auto-startup="false"/>
As you can see, in addition to the usual attributes, this adapter also requires a reference to an XMPP Connection.
Note also that this adapter is an event-driven adapter and a Lifecycle implementation.
It registers a RosterListener when started and unregisters that RosterListener when stopped.
==== Outbound Presence Message Channel Adapter
Spring Integration also supports sending Presence events to be seen by other users in the network who happen to have you on their Roster.
When you send a Message to the Outbound Presence Message Channel Adapter it extracts the payload, which is expected to be of type org.jivesoftware.smack.packet.Presence
and sends it to the XMPP Connection, thus advertising your presence events to the rest of the network.
Configuration support for the XMPP Outbound Presence Message Channel Adapter is provided via the presence-outbound-channel-adapter
element.
<int-xmpp:presence-outbound-channel-adapter id="eventOutboundPresenceChannel" xmpp-connection="testConnection"/>
It can also be a Polling Consumer (if it receives Messages from a Pollable Channel) in which case you would need to register a Poller.
<int-xmpp:presence-outbound-channel-adapter id="pollingOutboundPresenceAdapter" xmpp-connection="testConnection" channel="pollingChannel"> <int:poller fixed-rate="1000" max-messages-per-poll="1"/> </int-xmpp:presence-outbound-channel-adapter>
Like its inbound counterpart, it requires a reference to an XMPP Connection.
Note | |
---|---|
If you are relying on the default naming convention for an XMPP Connection bean (described earlier), and you have only one XMPP Connection bean configured in your Application Context, you may omit the xmpp-connection attribute. |
Since Spring Integration XMPP support is based on the Smack 4.0 API (https://www.igniterealtime.org/projects/smack/), it is important to know a few details related to more complex configuration of the XMPP Connection object.
As stated earlier, the xmpp-connection namespace support is designed to simplify basic connection configuration and only supports a few common configuration attributes.
However, the org.jivesoftware.smack.ConnectionConfiguration object defines about 20 attributes, and there is no real value in adding namespace support for all of them.
So, for more complex connection configurations, configure an instance of our XmppConnectionFactoryBean as a regular bean and inject an org.jivesoftware.smack.ConnectionConfiguration as a constructor argument to that FactoryBean.
Every property you need can be specified directly on that ConnectionConfiguration instance (a bean definition with the p namespace would work well).
This way, SSL or any other attributes can be set directly.
Here’s an example:
<bean id="xmppConnection" class="o.s.i.xmpp.XmppConnectionFactoryBean"> <constructor-arg> <bean class="org.jivesoftware.smack.ConnectionConfiguration"> <constructor-arg value="myServiceName"/> <property name="socketFactory" ref="..."/> </bean> </constructor-arg> </bean> <int:channel id="outboundEventChannel"/> <int-xmpp:outbound-channel-adapter id="outboundEventAdapter" channel="outboundEventChannel" xmpp-connection="xmppConnection"/>
Another important aspect of the Smack API is static initializers.
For more complex cases (e.g., registering a SASL Mechanism), you may need to execute certain static initializers.
One of those static initializers is SASLAuthentication
, which allows you to register supported SASL mechanisms.
For that level of complexity, we recommend configuring the XMPP Connection in Spring JavaConfig style.
Then you can configure the entire component through Java code and execute all other necessary Java code, including static initializers, at the appropriate time.
@Configuration
public class CustomConnectionConfiguration {

    @Bean
    public XMPPConnection xmppConnection() {
        SASLAuthentication.supportSASLMechanism("EXTERNAL", 0); // static initializer

        ConnectionConfiguration config = new ConnectionConfiguration("localhost", 5223);
        config.setTruststorePath("path_to_truststore.jks");
        config.setSecurityEnabled(true);
        config.setSocketFactory(SSLSocketFactory.getDefault());
        return new XMPPConnection(config);
    }
}
For more information on the JavaConfig style of Application Context configuration, refer to the following section in the Spring Reference Manual.
The Spring Integration XMPP Adapters will map standard XMPP properties automatically.
These properties will be copied by default to and from Spring Integration MessageHeaders
using the
DefaultXmppHeaderMapper.
Any user-defined headers will NOT be copied to or from an XMPP Message, unless explicitly specified by the
requestHeaderNames and/or replyHeaderNames properties of the DefaultXmppHeaderMapper
.
Tip | |
---|---|
When mapping user-defined headers, the values can also contain simple wildcard patterns (e.g. "foo*" or "*foo") to be matched. |
Starting with version 4.1, the AbstractHeaderMapper
(a DefaultXmppHeaderMapper
superclass) allows the
NON_STANDARD_HEADERS
token to be configured for the requestHeaderNames property (in addition to existing
STANDARD_REQUEST_HEADERS
) to map all user-defined headers.
Class org.springframework.integration.xmpp.XmppHeaders identifies the default headers that will be used by the DefaultXmppHeaderMapper:
Starting with version 4.3, patterns in the header mappings can be negated by preceding the pattern with !.
Negated patterns get priority, so a list such as STANDARD_REQUEST_HEADERS,foo,ba*,!bar,!baz,qux,!foo will NOT map foo (nor bar nor baz); the standard headers plus bad and qux will be mapped.
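The precedence rule can be illustrated with a small, self-contained sketch. This is not the framework's DefaultXmppHeaderMapper logic: the class HeaderPatternSketch and its methods are hypothetical, the STANDARD_REQUEST_HEADERS token is left out, and only the * wildcard is handled.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical illustration only; not the framework's header mapper.
class HeaderPatternSketch {

    // Returns true if the header name matches a simple pattern in which '*' is a wildcard.
    static boolean matches(String pattern, String name) {
        StringBuilder regex = new StringBuilder();
        for (char c : pattern.toCharArray()) {
            regex.append(c == '*' ? ".*" : Pattern.quote(String.valueOf(c)));
        }
        return name.matches(regex.toString());
    }

    // A header is mapped only if no negated pattern matches it
    // and at least one positive pattern does.
    static boolean shouldMap(List<String> patterns, String name) {
        for (String p : patterns) {
            if (p.startsWith("!") && matches(p.substring(1), name)) {
                return false; // negated patterns take priority, wherever they appear
            }
        }
        for (String p : patterns) {
            if (!p.startsWith("!") && matches(p, name)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> patterns = Arrays.asList("foo", "ba*", "!bar", "!baz", "qux", "!foo");
        for (String header : Arrays.asList("foo", "bar", "baz", "bad", "qux")) {
            System.out.println(header + " -> " + shouldMap(patterns, header));
        }
    }
}
```

In this sketch foo, bar and baz are rejected by their negated entries even though foo and ba* would otherwise match, while bad and qux are accepted.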
Important | |
---|---|
If you have a user defined header that begins with |
XMPP stands for eXtensible Messaging and Presence Protocol. The "extensible" part is important. XMPP is based around XML, a data format that supports a concept known as namespacing.
Through namespacing, you can add bits to XMPP that are not defined in the original specifications. This is important because the XMPP specification deliberately describes only a set of core things like:
Once you have implemented this, you have an XMPP client and can send any kind of data you like. But that’s not the end.
For example, perhaps you decide that you want to include formatting in a message (bold, italic, etc.) which is not defined in the core XMPP specification. Well, you can make up a way to do that, but unless everyone else does it the same way as you, no other software will be able to interpret it (they will just ignore namespaces they don’t understand).
So the XMPP Standards Foundation (XSF) publishes a series of extra documents, known as XMPP Enhancement Proposals (XEPs). In general each XEP describes a particular activity (from message formatting, to file transfers, multi-user chats and many more), and they provide a standard format for everyone to use for that activity.
The Smack API provides many XEP implementations in its extensions and experimental projects.
Starting with Spring Integration version 4.3, any XEP can be used with the existing XMPP channel adapters.
To be able to process XEPs or any other custom XMPP extensions, Smack’s ProviderManager must be pre-configured.
This can be done directly from static Java code:
ProviderManager.addIQProvider("element", "namespace", new MyIQProvider()); ProviderManager.addExtensionProvider("element", "namespace", new MyExtProvider());
or through a .providers configuration file for the specific instance, referenced by a JVM argument:
-Dsmack.provider.file=file:///c:/my/provider/mycustom.providers
where mycustom.providers might look like this:
<?xml version="1.0"?>
<smackProviders>
    <iqProvider>
        <elementName>query</elementName>
        <namespace>jabber:iq:time</namespace>
        <className>org.jivesoftware.smack.packet.Time</className>
    </iqProvider>
    <iqProvider>
        <elementName>query</elementName>
        <namespace>http://jabber.org/protocol/disco#items</namespace>
        <className>org.jivesoftware.smackx.provider.DiscoverItemsProvider</className>
    </iqProvider>
    <extensionProvider>
        <elementName>subscription</elementName>
        <namespace>http://jabber.org/protocol/pubsub</namespace>
        <className>org.jivesoftware.smackx.pubsub.provider.SubscriptionProvider</className>
    </extensionProvider>
</smackProviders>
For example, the most popular XMPP messaging extension is Google Cloud Messaging (GCM).
Smack provides org.jivesoftware.smackx.gcm.provider.GcmExtensionProvider for it and, when the smack-experimental jar is on the classpath, registers it by default through the experimental.providers resource:
<!-- GCM JSON payload --> <extensionProvider> <elementName>gcm</elementName> <namespace>google:mobile:data</namespace> <className>org.jivesoftware.smackx.gcm.provider.GcmExtensionProvider</className> </extensionProvider>
The GcmPacketExtension is also available for the target messaging protocol, to parse incoming packets and build outgoing ones:
GcmPacketExtension gcmExtension = (GcmPacketExtension) xmppMessage.getExtension(GcmPacketExtension.NAMESPACE);
String message = gcmExtension.getJson();

GcmPacketExtension packetExtension = new GcmPacketExtension(gcmJson);
Message smackMessage = new Message();
smackMessage.addExtension(packetExtension);
See Section 31.3.2, “TCP Failover Client Connection Factory” above for more information.
=== Introduction
Zookeeper support was added to the framework in version 4.2, comprised of:
The ZookeeperMetadataStore can be used where any MetadataStore is needed, such as persistent file list filters, etc.
See Section 9.5, “Metadata Store” for more information.
<bean id="client" class="org.springframework.integration.zookeeper.config.CuratorFrameworkFactoryBean"> <constructor-arg value="${connect.string}" /> </bean> <bean id="meta" class="org.springframework.integration.zookeeper.metadata.ZookeeperMetadataStore"> <constructor-arg ref="client" /> </bean>
@Bean public MetadataStore zkStore(CuratorFramework client) { return new ZookeeperMetadataStore(client); }
The ZookeeperLockRegistry
can be used where any LockRegistry
is needed, such as when using an Aggregator
in a
clustered environment, with a shared MessageStore
.
A LockRegistry is used to "look up" a lock based on a key (the aggregator uses the correlationId).
By default, locks in the ZookeeperLockRegistry
are maintained in zookeeper under the path
/SpringIntegration-LockRegistry/
.
You can customize the path by providing an implementation of ZookeeperLockRegistry.KeyToPathStrategy
.
public interface KeyToPathStrategy { String pathFor(String key); boolean bounded(); }
If the strategy returns true from bounded, unused locks do not need to be harvested.
For unbounded strategies (such as the default) you will need to invoke expireUnusedOlderThan(long age) from time to time, to remove old unused locks from memory.
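As an illustration of a bounded strategy, the following sketch hashes every key into a fixed number of paths. It is only a sketch under stated assumptions: the class BucketedPathStrategy, the /myLocks root and the bucket count are hypothetical, and the nested-free KeyToPathStrategy interface is a stand-in copy of the one shown above so that the example compiles without the framework.

```java
// Hypothetical strategy: hashes every key into one of a fixed number of lock paths.
class BucketedPathStrategy implements KeyToPathStrategy {

    private final String root;
    private final int buckets;

    BucketedPathStrategy(String root, int buckets) {
        if (buckets <= 0) {
            throw new IllegalArgumentException("buckets must be positive");
        }
        this.root = root.endsWith("/") ? root : root + "/";
        this.buckets = buckets;
    }

    @Override
    public String pathFor(String key) {
        // floorMod keeps the bucket index non-negative for negative hash codes.
        int bucket = Math.floorMod(key.hashCode(), buckets);
        return root + "bucket-" + bucket;
    }

    @Override
    public boolean bounded() {
        return true; // at most 'buckets' distinct paths can ever be produced
    }

    public static void main(String[] args) {
        KeyToPathStrategy strategy = new BucketedPathStrategy("/myLocks", 16);
        System.out.println(strategy.pathFor("some-correlation-id"));
    }
}

// Stand-in copy of the interface shown above; in an application you would
// implement ZookeeperLockRegistry.KeyToPathStrategy instead.
interface KeyToPathStrategy {
    String pathFor(String key);
    boolean bounded();
}
```

The trade-off in this design is that different keys can land on the same path, so the number of paths stays fixed at the cost of some keys sharing one.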
=== Zookeeper Leadership Event Handling
To configure an application for leader election using Zookeeper in XML:
<int-zk:leader-listener client="client" path="/siNamespace" role="cluster" />
client
is a reference to a CuratorFramework
bean; a CuratorFrameworkFactoryBean
is available.
When a leader is elected, an OnGrantedEvent
will be published for the role cluster
; any endpoints in that role
will be started.
When leadership is revoked, an OnRevokedEvent
will be published for the role cluster
; any endpoints in that role
will be stopped.
See Section 8.2, “Endpoint Roles” for more information.
In Java configuration you can create an instance of the leader initiator like this:
@Bean public LeaderInitiatorFactoryBean leaderInitiator(CuratorFramework client) { return new LeaderInitiatorFactoryBean() .setClient(client) .setPath("/siTest/") .setRole("cluster"); }
= Appendices
Advanced Topics and Additional Resources
== Spring Expression Language (SpEL)
Many Spring Integration components can be configured using expressions. These expressions are written in the Spring Expression Language.
In most cases, the #root object is the Message
which, of course, has two properties - headers
and payload
- allowing such expressions as payload
, payload.foo
, headers['my.header']
etc.
In some cases, additional variables are provided, for example the <int-http:inbound-gateway/>
provides #requestParams
(parameters from the HTTP request) and #pathVariables
(values from path placeholders in the URI).
For all SpEL expressions, a BeanResolver
is available, enabling references to any bean in the application context.
For example @myBean.foo(payload)
.
In addition, two PropertyAccessors are available: a MapAccessor enables accessing values in a Map using a key, and a ReflectivePropertyAccessor allows access to fields and/or JavaBean compliant properties (using getters and setters).
This is how the Message headers and payload properties are accessible.
=== SpEL Evaluation Context Customization
Starting with Spring Integration 3.0, it is possible to add additional PropertyAccessor
s to the SpEL evaluation contexts used by the framework.
The framework provides the JsonPropertyAccessor
which can be used (read-only) to access fields from a JsonNode
, or JSON in a String
.
Or you can create your own PropertyAccessor
if you have specific needs.
In addition, custom functions can be added.
Custom functions are static
methods declared on a class.
Functions and property accessors are available in any SpEL expression used throughout the framework.
The following configuration shows how to directly configure the IntegrationEvaluationContextFactoryBean
with custom property accessors and functions.
However, for convenience, namespace support is provided for both, as described in the following sections, and the framework will automatically configure the factory bean on your behalf.
<bean id="integrationEvaluationContext" class="org.springframework.integration.config.IntegrationEvaluationContextFactoryBean"> <property name="propertyAccessors"> <util:map> <entry key="foo"> <bean class="foo.MyCustomPropertyAccessor"/> </entry> </util:map> </property> <property name="functions"> <map> <entry key="barcalc" value="#{T(foo.MyFunctions).getMethod('calc', T(foo.MyBar))}"/> </map> </property> </bean>
This factory bean definition will override the default integrationEvaluationContext
bean definition, adding the custom accessor to the list (which also includes the standard accessors mentioned above), and one custom function.
Note that custom functions are static methods.
In the above example, the custom function is a static method calc
on class MyFunctions
and takes a single parameter of type MyBar
.
Say you have a Message
with a payload that has a type MyFoo
on which you need to perform some action to create a MyBar
object from it, and you then want to invoke a custom function calc
on that object.
The standard property accessors wouldn’t know how to get a MyBar
from a MyFoo
so you could write and configure a custom property accessor to do so.
So, your final expression might be "#barcalc(payload.myBar)"
.
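The functions entry in the configuration above resolves to a java.lang.reflect.Method through a plain reflective lookup. The following sketch shows the equivalent lookup in Java. MyBar and MyFunctions here are hypothetical stand-ins for foo.MyBar and foo.MyFunctions, and the body of calc is invented purely for illustration.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

class SpelFunctionLookupSketch {

    // Hypothetical domain type standing in for foo.MyBar.
    static class MyBar {
        final int value;

        MyBar(int value) {
            this.value = value;
        }
    }

    // Hypothetical function holder standing in for foo.MyFunctions.
    static class MyFunctions {
        public static int calc(MyBar bar) {
            return bar.value * 2; // invented logic, for illustration only
        }
    }

    // Mirrors T(foo.MyFunctions).getMethod('calc', T(foo.MyBar)).
    static Method lookup() {
        try {
            return MyFunctions.class.getMethod("calc", MyBar.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    // Invokes the looked-up function; static methods take a null target.
    static Object callCalc(int value) {
        try {
            return lookup().invoke(null, new MyBar(value));
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Method calc = lookup();
        System.out.println(calc.getName() + " is static: " + Modifier.isStatic(calc.getModifiers()));
        System.out.println("calc(21) = " + callCalc(21));
    }
}
```

Because the lookup uses getMethod, the function must be a public method, and it must be static to be callable without a target object.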
The factory bean has another property typeLocator
which allows you to customize the TypeLocator
used during SpEL evaluation.
This might be necessary when running in some environments that use a non-standard ClassLoader
.
In the following example, SpEL expressions will always use the bean factory’s class loader:
<bean id="integrationEvaluationContext" class="org.springframework.integration.config.IntegrationEvaluationContextFactoryBean"> <property name="typeLocator"> <bean class="org.springframework.expression.spel.support.StandardTypeLocator"> <constructor-arg value="#{beanFactory.beanClassLoader}"/> </bean> </property> </bean>
Namespace support is provided for easy addition of SpEL custom functions.
You can specify <spel-function/>
components to provide custom SpEL functions to the EvaluationContext
used throughout the framework.
Instead of configuring the factory bean above, simply add one or more of these components and the framework will automatically add them to the default integrationEvaluationContext factory bean.
For example, assuming we have a useful static method to evaluate XPath:
<int:spel-function id="xpath" class="com.foo.test.XPathUtils" method="evaluate(java.lang.String, java.lang.Object)"/> <int:transformer input-channel="in" output-channel="out" expression="#xpath('//foo/@bar', payload)" />
With this sample:
An IntegrationEvaluationContextFactoryBean bean with id integrationEvaluationContext is registered with the application context.
The <spel-function/> is parsed and added to the functions Map of integrationEvaluationContext as a map entry with its id as the key and the static Method as the value.
A StandardEvaluationContext instance is created, and it is configured with the default PropertyAccessors, BeanResolver and the custom functions.
That EvaluationContext instance is injected into the ExpressionEvaluatingTransformer bean.
To provide a SpEL Function via Java Configuration you should declare a SpelFunctionFactoryBean
bean for each function.
The sample above can be configured as follows:
@Bean public SpelFunctionFactoryBean xpath() { return new SpelFunctionFactoryBean(XPathUtils.class, "evaluate"); }
Note | |
---|---|
SpEL functions declared in a parent context are also made available in any child context(s).
Each context has its own instance of the integrationEvaluationContext factory bean because each needs a different |
Built-in SpEL Functions
Spring Integration provides some standard functions, which are registered with the application context automatically on start up:
#jsonPath - to evaluate a jsonPath on some provided object.
This function invokes JsonPathUtils.evaluate(...)
.
This static method delegates to the Jayway JsonPath library.
The following shows some usage examples:
<transformer expression="#jsonPath(payload, '$.store.book[0].author')"/> <filter expression="#jsonPath(payload,'$..book[2].isbn') matches '\d-\d{3}-\d{5}-\d'"/> <splitter expression="#jsonPath(payload, '$.store.book')"/> <router expression="#jsonPath(payload, headers.jsonPath)"> <mapping channel="output1" value="reference"/> <mapping channel="output2" value="fiction"/> </router>
#jsonPath also supports the third optional parameter - an array of com.jayway.jsonpath.Filter
, which could be provided by a reference to a bean or bean method, for example.
Note | |
---|---|
Using this function requires the Jayway JsonPath library (json-path.jar) to be on the classpath; otherwise the #jsonPath SpEL function won’t be registered. |
For more information regarding JSON see JSON Transformers in Section 7.1, “Transformer”.
#xpath - to evaluate an xpath on some provided object. For more information regarding xml and xpath see Section 31.3.2, “TCP Failover Client Connection Factory”.
Namespace support is provided for the easy addition of SpEL custom PropertyAccessor
implementations.
You can specify the <spel-property-accessors/>
component to provide a list of custom PropertyAccessor
s to the EvaluationContext
used throughout the framework.
Instead of configuring the factory bean above, simply add one or more of these components, and the framework will automatically add the accessors to the default integrationEvaluationContext factory bean:
<int:spel-property-accessors> <bean id="jsonPA" class="org.springframework.integration.json.JsonPropertyAccessor"/> <ref bean="fooPropertyAccessor"/> </int:spel-property-accessors>
With this sample, two custom PropertyAccessor
s will be injected to the EvaluationContext
in the order that they are declared.
To provide PropertyAccessor
s via Java Configuration you should declare SpelPropertyAccessorRegistrar
bean with the spelPropertyAccessorRegistrar
(the IntegrationContextUtils.SPEL_PROPERTY_ACCESSOR_REGISTRAR_BEAN_NAME
constant) name.
The sample above can be configured like:
@Bean public SpelPropertyAccessorRegistrar spelPropertyAccessorRegistrar() { return new SpelPropertyAccessorRegistrar(new JsonPropertyAccessor()) .add(fooPropertyAccessor()); }
Note | |
---|---|
Custom |
The AOP Message Publishing feature allows you to construct and send a message as a by-product of a method invocation. For example, imagine you have a component and every time the state of this component changes you would like to be notified via a Message. The easiest way to send such notifications would be to send a message to a dedicated channel, but how would you connect the method invocation that changes the state of the object to a message sending process, and how should the notification Message be structured? The AOP Message Publishing feature handles these responsibilities with a configuration-driven approach.
=== Message Publishing Configuration
Spring Integration provides two approaches: XML and Annotation-driven.
==== Annotation-driven approach via @Publisher annotation
The annotation-driven approach allows you to annotate any method with the @Publisher
annotation, specifying a channel attribute.
The Message will be constructed from the return value of the method invocation and sent to a channel specified by the channel attribute.
To further manage message structure, you can also use a combination of both @Payload
and @Header
annotations.
Internally this message publishing feature of Spring Integration uses both Spring AOP by defining PublisherAnnotationAdvisor
and Spring 3.0’s Expression Language (SpEL) support, giving you considerable flexibility and control over the structure of the Message it will publish.
The PublisherAnnotationAdvisor
defines and binds the following variables:
Let’s look at a couple of examples:
@Publisher public String defaultPayload(String fname, String lname) { return fname + " " + lname; }
In the above example the Message will be constructed with the following structure:
@Publisher(channel="testChannel") public String defaultPayload(String fname, @Header("last") String lname) { return fname + " " + lname; }
In this example everything is the same as above, except that we are not using a default publishing channel.
Instead we are specifying the publishing channel via the channel attribute of the @Publisher
annotation.
We are also adding a @Header
annotation which results in the Message header named last having the same value as the lname method parameter.
That header will be added to the newly constructed Message.
@Publisher(channel="testChannel") @Payload public String defaultPayloadButExplicitAnnotation(String fname, @Header String lname) { return fname + " " + lname; }
The above example is almost identical to the previous one.
The only difference here is that we are using a @Payload
annotation on the method, thus explicitly specifying that the return value of the method should be used as the payload of the Message.
@Publisher(channel="testChannel") @Payload("#return + #args.lname") public String setName(String fname, String lname, @Header("x") int num) { return fname + " " + lname; }
Here we are expanding on the previous configuration by using the Spring Expression Language in the @Payload
annotation to further instruct the framework how the message should be constructed.
In this particular case the message will be a concatenation of the return value of the method invocation and the lname input argument.
The Message header named x will have its value determined by the num input argument.
That header will be added to the newly constructed Message.
@Publisher(channel="testChannel") public String argumentAsPayload(@Payload String fname, @Header String lname) { return fname + " " + lname; }
In the above example you see another usage of the @Payload
annotation.
Here we are annotating a method argument which will become the payload of the newly constructed message.
As with most other annotation-driven features in Spring, you will need to register a post-processor (PublisherAnnotationBeanPostProcessor
).
<bean class="org.springframework.integration.aop.PublisherAnnotationBeanPostProcessor"/>
You can instead use namespace support for a more concise configuration:
<int:annotation-config default-publisher-channel="defaultChannel"/>
Similar to other Spring annotations (@Component, @Scheduled, etc.), @Publisher
can also be used as a meta-annotation.
That means you can define your own annotations that will be treated in the same way as the @Publisher
itself.
@Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Publisher(channel="auditChannel") public @interface Audit { }
Here we defined the @Audit
annotation which itself is annotated with @Publisher
.
Also note that you can define a channel
attribute on the meta-annotation thus encapsulating the behavior of where messages will be sent inside of this annotation.
Now you can annotate any method:
@Audit public String test() { return "foo"; }
In the above example every invocation of the test()
method will result in a Message with a payload created from its return value.
Each Message will be sent to the channel named auditChannel.
One of the benefits of this technique is that you can avoid the duplication of the same channel name across multiple annotations.
You also can provide a level of indirection between your own, potentially domain-specific annotations and those provided by the framework.
You can also annotate the class which would mean that the properties of this annotation will be applied on every public method of that class.
@Audit static class BankingOperationsImpl implements BankingOperations { public String debit(String amount) { . . . } public String credit(String amount) { . . . } }
==== XML-based approach via the <publishing-interceptor> element
The XML-based approach allows you to configure the same AOP-based Message Publishing functionality with simple namespace-based configuration of a MessagePublishingInterceptor
.
It certainly has some benefits over the annotation-driven approach since it allows you to use AOP pointcut expressions, thus possibly intercepting multiple methods at once or intercepting and publishing methods to which you don’t have the source code.
To configure Message Publishing via XML, you only need to do the following two things:
Provide configuration for the MessagePublishingInterceptor via the <publishing-interceptor> XML element.
Provide AOP configuration to apply the MessagePublishingInterceptor to managed objects.
<aop:config> <aop:advisor advice-ref="interceptor" pointcut="bean(testBean)" /> </aop:config> <publishing-interceptor id="interceptor" default-channel="defaultChannel"> <method pattern="echo" payload="'Echoing: ' + #return" channel="echoChannel"> <header name="foo" value="bar"/> </method> <method pattern="repl*" payload="'Echoing: ' + #return" channel="echoChannel"> <header name="foo" expression="'bar'.toUpperCase()"/> </method> <method pattern="echoDef*" payload="#return"/> </publishing-interceptor>
As you can see the <publishing-interceptor>
configuration looks rather similar to the Annotation-based approach, and it also utilizes the power of the Spring 3.0 Expression Language.
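In the above example the execution of the echo method of a testBean will render a Message with the following structure:
The Message payload will be a String with the content "Echoing: [value]", where value is the value returned by the executed method.
The Message will have a header named foo with the value bar.
The Message will be sent to echoChannel.
The second method is very similar to the first. Here every method that begins with repl will render a Message with the following structure:
The Message payload will be the same as in the above sample.
The Message will have a header named foo whose value is the result of the SpEL expression 'bar'.toUpperCase().
The Message will be sent to echoChannel.
The third method, mapping the execution of any method that begins with echoDef of testBean, will produce a Message with the following structure:
The Message payload will be the value returned by the executed method.
Since the channel attribute is not provided explicitly, the Message will be sent to the defaultChannel defined by the publisher.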
For simple mapping rules you can rely on the publisher defaults. For example:
<publishing-interceptor id="anotherInterceptor"/>
This will map the return value of every method that matches the pointcut expression to a payload, which will be sent to the default channel. If the default-channel is not specified (as above), the messages will be sent to the global nullChannel.
Async Publishing
One important thing to understand is that publishing occurs in the same thread as your component’s execution. So by default it is synchronous. This means that the entire message flow would have to wait until the publisher’s flow completes. However, quite often you want the complete opposite, and that is to use this Message publishing feature to initiate asynchronous sub-flows. For example, you might host a service (HTTP, WS etc.) which receives a remote request. You may want to send this request internally into a process that might take a while. However, you may also want to reply to the user right away. So, instead of sending inbound requests for processing via the output channel (the conventional way), you can simply use output-channel or a replyChannel header to send a simple acknowledgment-like reply back to the caller while using the Message publisher feature to initiate a complex flow.
EXAMPLE: Here is the simple service that receives a complex payload, which needs to be sent further for processing, but it also needs to reply to the caller with a simple acknowledgment.
public String echo(Object complexPayload) { return "ACK"; }
So instead of hooking up the complex flow to the output channel we use the Message publishing feature instead. We configure it to create a new Message using the input argument of the service method (above) and send that to the localProcessChannel. And to make sure this sub-flow is asynchronous all we need to do is send it to any type of asynchronous channel (ExecutorChannel in this example).
<int:service-activator input-channel="inputChannel" output-channel="outputChannel" ref="sampleservice"/> <bean id="sampleservice" class="test.SampleService"/> <aop:config> <aop:advisor advice-ref="interceptor" pointcut="bean(sampleservice)" /> </aop:config> <int:publishing-interceptor id="interceptor" > <int:method pattern="echo" payload="#args[0]" channel="localProcessChannel"> <int:header name="sample_header" expression="'some sample value'"/> </int:method> </int:publishing-interceptor> <int:channel id="localProcessChannel"> <int:dispatcher task-executor="executor"/> </int:channel> <task:executor id="executor" pool-size="5"/>
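The effect of dispatching through a task executor can be mimicked in plain Java. The sketch below is not how the framework wires the interceptor; AsyncHandOffSketch, process and lastSubFlow are hypothetical names, and the "sub-flow" is reduced to a single method so that only the thread hand-off is visible.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncHandOffSketch {

    private final ExecutorService executor;

    // Handle on the most recent background task, kept only so the demo can wait for it.
    volatile CompletableFuture<String> lastSubFlow;

    AsyncHandOffSketch(ExecutorService executor) {
        this.executor = executor;
    }

    // Plays the role of the service method: hands the payload off and replies at once.
    String echo(Object complexPayload) {
        lastSubFlow = CompletableFuture.supplyAsync(() -> process(complexPayload), executor);
        return "ACK";
    }

    // Stand-in for the long-running sub-flow behind localProcessChannel.
    private String process(Object payload) {
        return "processed " + payload;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            AsyncHandOffSketch service = new AsyncHandOffSketch(executor);
            System.out.println(service.echo("complex payload")); // reply to the caller
            System.out.println(service.lastSubFlow.join());       // result of the sub-flow
        } finally {
            executor.shutdown();
        }
    }
}
```

The caller receives the acknowledgment without waiting for process to finish, which is the same separation the executor-backed channel provides in the XML configuration.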
Another way of handling this type of scenario is with a wire-tap.
==== Producing and publishing messages based on a scheduled trigger
In the above sections we looked at the Message publishing feature of Spring Integration which constructs and publishes messages as by-products of Method invocations.
However in those cases, you are still responsible for invoking the method.
In Spring Integration 2.0 we’ve added another related useful feature: support for scheduled Message producers/publishers via the new "expression" attribute on the inbound-channel-adapter element.
Scheduling could be based on several triggers, any one of which may be configured on the poller sub-element.
Currently we support cron
, fixed-rate
, fixed-delay
as well as any custom trigger implemented by you and referenced by the trigger attribute value.
As mentioned above, support for scheduled producers/publishers is provided via the <inbound-channel-adapter> XML element. Let’s look at a couple of examples:
<int:inbound-channel-adapter id="fixedDelayProducer" expression="'fixedDelayTest'" channel="fixedDelayChannel"> <int:poller fixed-delay="1000"/> </int:inbound-channel-adapter>
In the above example an inbound Channel Adapter will be created which will construct a Message with its payload being the result of the expression defined in the expression
attribute.
Such messages will be created and sent every time the delay specified by the fixed-delay
attribute occurs.
<int:inbound-channel-adapter id="fixedRateProducer" expression="'fixedRateTest'" channel="fixedRateChannel"> <int:poller fixed-rate="1000"/> </int:inbound-channel-adapter>
This example is very similar to the previous one, except that we are using the fixed-rate
attribute which will allow us to send messages at a fixed rate (measuring from the start time of each task).
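The difference between the two attributes comes down to the reference point used for the next run. The following sketch models it as simple arithmetic, on the understanding that a fixed delay is measured from the completion of the previous task. It is a simplified model, not the scheduler's implementation: TriggerTimingSketch and the 300 ms task duration are assumptions, and tasks that outlast the period are not considered.

```java
class TriggerTimingSketch {

    // Next start time when the period is measured from the previous task's start.
    static long nextFixedRate(long lastStartMillis, long periodMillis) {
        return lastStartMillis + periodMillis;
    }

    // Next start time when the delay is measured from the previous task's completion.
    static long nextFixedDelay(long lastCompletionMillis, long delayMillis) {
        return lastCompletionMillis + delayMillis;
    }

    public static void main(String[] args) {
        long start = 0;      // the task started at t = 0 ms
        long duration = 300; // and ran for 300 ms (assumed figure)
        System.out.println("fixed-rate next start:  " + nextFixedRate(start, 1000));
        System.out.println("fixed-delay next start: " + nextFixedDelay(start + duration, 1000));
    }
}
```

With a 1000 ms setting, the fixed-rate producer would start again at 1000 ms, whereas the fixed-delay producer would start again at 1300 ms.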
<int:inbound-channel-adapter id="cronProducer" expression="'cronTest'" channel="cronChannel"> <int:poller cron="7 6 5 4 3 ?"/> </int:inbound-channel-adapter>
This example demonstrates how you can apply a Cron trigger with a value specified in the cron
attribute.
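The six fields of such an expression are, in order: second, minute, hour, day of month, month and day of week. The sketch below spells that out for expressions whose first five fields are plain numbers. CronFieldsSketch and fireTimeIn are hypothetical helpers written for this explanation; they are not part of Spring and do not handle ranges, lists or wildcards.

```java
import java.time.LocalDateTime;

class CronFieldsSketch {

    // Interprets a six-field expression whose first five fields are plain numbers:
    // second minute hour day-of-month month day-of-week.
    static LocalDateTime fireTimeIn(int year, String cron) {
        String[] f = cron.trim().split("\\s+");
        if (f.length != 6) {
            throw new IllegalArgumentException("expected 6 fields but got " + f.length);
        }
        int second = Integer.parseInt(f[0]);
        int minute = Integer.parseInt(f[1]);
        int hour = Integer.parseInt(f[2]);
        int dayOfMonth = Integer.parseInt(f[3]);
        int month = Integer.parseInt(f[4]);
        // f[5] is the day-of-week field; "?" leaves it unrestricted, so it is ignored here.
        return LocalDateTime.of(year, month, dayOfMonth, hour, minute, second);
    }

    public static void main(String[] args) {
        System.out.println(fireTimeIn(2024, "7 6 5 4 3 ?"));
    }
}
```

For the expression used above, this prints 2024-03-04T05:06:07, that is, the trigger fires once a year on March 4th at 05:06:07.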
<int:inbound-channel-adapter id="headerExpressionsProducer" expression="'headerExpressionsTest'" channel="headerExpressionsChannel" auto-startup="false"> <int:poller fixed-delay="5000"/> <int:header name="foo" expression="6 * 7"/> <int:header name="bar" value="x"/> </int:inbound-channel-adapter>
Here you can see that in a way very similar to the Message publishing feature we are enriching a newly constructed Message with extra Message headers which can take scalar values or the results of evaluating Spring expressions.
If you need to implement your own custom trigger you can use the trigger
attribute to provide a reference to any spring configured bean which implements the org.springframework.scheduling.Trigger
interface.
<int:inbound-channel-adapter id="triggerRefProducer" expression="'triggerRefTest'" channel="triggerRefChannel"> <int:poller trigger="customTrigger"/> </int:inbound-channel-adapter> <beans:bean id="customTrigger" class="o.s.scheduling.support.PeriodicTrigger"> <beans:constructor-arg value="9999"/> </beans:bean>
=== Understanding Transactions in Message flows
Spring Integration exposes several hooks to address the transactional needs of your message flows. But to better understand these hooks and how you can benefit from them, we must first revisit the 6 mechanisms that could be used to initiate Message flows and see how the transactional needs of these flows could be addressed within each of these mechanisms.
Here are the 6 mechanisms to initiate a Message flow and their short summary (details for each are provided throughout this manual):
These 6 could be split in 2 general categories:
Clearly the Gateway Proxy, MessageChannel.send(..) and MessagePublisher all belong to the 1st category and Inbound Adapters/Gateways, Scheduler and Poller belong to the 2nd.
So, how do we address transactional needs in various scenarios within each category, and is there a need for Spring Integration to provide something explicitly with regard to transactions for a particular scenario? Or can Spring’s Transaction Support be leveraged instead?
The first and most obvious goal is NOT to re-invent something that has already been invented unless you can provide a better solution. In our case Spring itself provides first class support for transaction management. So our goal here is not to provide something new but rather delegate/use Spring to benefit from the existing support for transactions. In other words as a framework we must expose hooks to the Transaction management functionality provided by Spring. But since Spring Integration configuration is based on Spring Configuration it is not always necessary to expose these hooks as they are already exposed via Spring natively. Remember every Spring Integration component is a Spring Bean after all.
With this goal in mind let’s look at the two scenarios.
If you think about it, Message flows that are initiated by the USER process (Category 1) and obviously configured in a Spring Application Context, are subject to transactional configuration of such processes and therefore don’t need to be explicitly configured by Spring Integration to support transactions.
The transaction could and should be initiated through standard Transaction support provided by Spring.
The Spring Integration message flow will honor the transactional semantics of the components naturally because it is Spring configured.
For example, a Gateway or ServiceActivator method could be annotated with @Transactional
or TransactionInterceptor
could be defined in an XML configuration with a point-cut expression pointing to specific methods that should be transactional.
The bottom line is that you have full control over transaction configuration and boundaries in these scenarios.
However, things are a bit different when it comes to Message flows initiated by the DAEMON process (Category 2). Although configured by the developer these flows do not directly involve a human or some other process to be initiated. These are trigger-based flows that are initiated by a trigger process (DAEMON process) based on the configuration of such process. For example, we could have a Scheduler initiating a message flow every Friday night of every week. We can also configure a trigger that initiates a Message flow every second, etc. So, we obviously need a way to let these trigger-based processes know of our intention to make the resulting Message flows transactional so that a Transaction context could be created whenever a new Message flow is initiated. In other words we need to expose some Transaction configuration, but ONLY enough to delegate to Transaction support already provided by Spring (as we do in other scenarios).
Spring Integration provides transactional support for Pollers. Pollers are a special type of component because we can call receive() within that poller task against a resource that is itself transactional, thus including the receive() call in the boundaries of the Transaction and allowing it to be rolled back in case of a task failure. If we were to add the same support for channels, the added transactions would affect all downstream components starting with that send() call. That would provide a rather wide scope for transaction demarcation without any strong reason, especially when Spring already provides several ways to address the transactional needs of any component downstream. However, the receive() method being included in a transaction boundary is the "strong reason" for pollers.
==== Poller Transaction Support
Any time you configure a Poller you can provide transactional configuration via the transactional sub-element and its attributes:
<int:poller max-messages-per-poll="1" fixed-rate="1000"> <transactional transaction-manager="txManager" isolation="DEFAULT" propagation="REQUIRED" read-only="true" timeout="1000"/> </int:poller>
As you can see this configuration looks very similar to native Spring transaction configuration.
You must still provide a reference to a Transaction manager and specify transaction attributes or rely on defaults (e.g., if the transaction-manager attribute is not specified, it will default to the bean with the name transactionManager).
Internally the process would be wrapped in Spring’s native Transaction where TransactionInterceptor
is responsible for handling transactions.
For more information on how to configure a Transaction Manager, the types of Transaction Managers (e.g., JTA, Datasource etc.) and other details related to transaction configuration please refer to Spring’s Reference manual (Chapter 10 - Transaction Management).
With the above configuration, all Message flows initiated by this poller will be transactional. For more information and details on a Poller’s transactional configuration, please refer to Section 21.1.1, “Polling and Transactions”.
Along with transactions, several more cross-cutting concerns might need to be addressed when running a Poller. To help with that, the Poller element accepts an <advice-chain> sub-element, which allows you to define a custom chain of Advice instances to be applied on the Poller (see section 4.4 for more details). In Spring Integration 2.0, the Poller went through a refactoring effort and now uses a proxy mechanism to address transactional concerns as well as other cross-cutting concerns. One of the significant changes resulting from this effort is that the <transactional> and <advice-chain> elements are now mutually exclusive. The rationale behind this is that if you need more than one advice, and one of them is Transaction advice, then you can simply include it in the <advice-chain> with the same convenience as before but with much more control, since you can now position any advice in the desired order.
<int:poller max-messages-per-poll="1" fixed-rate="10000"> <advice-chain> <ref bean="txAdvice"/> <ref bean="someOtherAdviceBean" /> <beans:bean class="foo.bar.SampleAdvice"/> </advice-chain> </int:poller> <tx:advice id="txAdvice" transaction-manager="txManager"> <tx:attributes> <tx:method name="get*" read-only="true"/> <tx:method name="*"/> </tx:attributes> </tx:advice>
As you can see from the example above, we have provided a very basic XML-based configuration of Spring Transaction advice - "txAdvice" and included it within the <advice-chain> defined by the Poller. If you only need to address transactional concerns of the Poller, then you can still use the <transactional> element as a convenience.
Another important factor is the boundaries of Transactions within a Message flow. When a transaction is started, the transaction context is bound to the current thread. So regardless of how many endpoints and channels you have in your Message flow your transaction context will be preserved as long as you are ensuring that the flow continues on the same thread. As soon as you break it by introducing a Pollable Channel or Executor Channel or initiate a new thread manually in some service, the Transactional boundary will be broken as well. Essentially the Transaction will END right there, and if a successful handoff has transpired between the threads, the flow would be considered a success and a COMMIT signal would be sent even though the flow will continue and might still result in an Exception somewhere downstream. If such a flow were synchronous, that Exception could be thrown back to the initiator of the Message flow who is also the initiator of the transactional context and the transaction would result in a ROLLBACK. The middle ground is to use transactional channels at any point where a thread boundary is being broken. For example, you can use a Queue-backed Channel that delegates to a transactional MessageStore strategy, or you could use a JMS-backed channel.
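The thread-bound nature of the transaction context can be illustrated with a minimal plain-Java sketch. It does not use Spring's transaction API: the `TX_CONTEXT` thread-local and the two handoff methods are hypothetical stand-ins, assumed here only to show why a handoff through an executor (as with an Executor Channel) leaves the downstream step without the caller's context.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadBoundContextSketch {

    // Hypothetical stand-in for a thread-bound transaction context.
    static final ThreadLocal<String> TX_CONTEXT = new ThreadLocal<>();

    // Same-thread handoff: the downstream step sees the caller's context.
    static String directHandoff() {
        return TX_CONTEXT.get();
    }

    // Handoff through an executor: the downstream step runs on another thread,
    // where nothing has been bound, so it sees no context.
    static String executorHandoff(ExecutorService executor) {
        Callable<String> downstream = TX_CONTEXT::get;
        try {
            return executor.submit(downstream).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            TX_CONTEXT.set("tx-1");
            System.out.println("same thread:  " + directHandoff());
            System.out.println("other thread: " + executorHandoff(executor));
        } finally {
            TX_CONTEXT.remove();
            executor.shutdown();
        }
    }
}
```

In this sketch the direct handoff observes the bound value, while the executor handoff observes nothing, which mirrors the point at which the transactional boundary ends.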
=== Transaction Synchronization
In some environments, it is advantageous to synchronize operations with a transaction that encompasses the entire flow. For example, consider a <file:inbound-channel-adapter/> at the start of a flow, that performs a number of database updates. If the transaction commits, we might want to move the file to a success directory, while we might want to move it to a failures directory if the transaction rolls back.
Spring Integration 2.2 introduces the capability of synchronizing these operations with a transaction.
In addition, you can configure a PseudoTransactionManager if you don’t have a real transaction but still want to perform different actions on success or failure.
For more information, see the discussion of pseudo transactions later in this section.
The key strategy interfaces for this feature are:
public interface TransactionSynchronizationFactory { TransactionSynchronization create(Object key); } public interface TransactionSynchronizationProcessor { void processBeforeCommit(IntegrationResourceHolder holder); void processAfterCommit(IntegrationResourceHolder holder); void processAfterRollback(IntegrationResourceHolder holder); }
The factory is responsible for creating a TransactionSynchronization object.
You can implement your own, or use the one provided by the framework: DefaultTransactionSynchronizationFactory.
This implementation returns a TransactionSynchronization that delegates to a default implementation of TransactionSynchronizationProcessor, the ExpressionEvaluatingTransactionSynchronizationProcessor.
This processor supports three SpEL expressions: beforeCommitExpression, afterCommitExpression, and afterRollbackExpression.
These actions should be self-explanatory to those familiar with transactions.
In each case, the #root variable is the original Message; in some cases, other SpEL variables are made available, depending on the MessageSource being polled by the poller.
For example, the MongoDbMessageSource provides the #mongoTemplate variable, which references the message source’s MongoTemplate; the RedisStoreMessageSource provides the #store variable, which references the RedisStore created by the poll.
To enable the feature for a particular poller, you provide a reference to the TransactionSynchronizationFactory on the poller’s <transactional/> element using the synchronization-factory attribute.
To simplify configuration of these components, namespace support for the default factory has been provided. Configuration is best described using an example:
<int-file:inbound-channel-adapter id="inputDirPoller" channel="someChannel" directory="/foo/bar" filter="filter" comparator="testComparator"> <int:poller fixed-rate="5000"> <int:transactional transaction-manager="transactionManager" synchronization-factory="syncFactory" /> </int:poller> </int-file:inbound-channel-adapter> <int:transaction-synchronization-factory id="syncFactory"> <int:after-commit expression="payload.renameTo('/success/' + payload.name)" channel="committedChannel" /> <int:after-rollback expression="payload.renameTo('/failed/' + payload.name)" channel="rolledBackChannel" /> </int:transaction-synchronization-factory>
The result of the SpEL evaluation is sent as the payload to either the committedChannel or rolledBackChannel (in this case, this would be Boolean.TRUE or Boolean.FALSE, the result of the java.io.File.renameTo() method call).
If you wish to send the entire payload for further Spring Integration processing, simply use the expression payload.
Important | |
---|---|
It is important to understand that this is simply synchronizing the actions with a transaction, it does not make a resource that is not inherently transactional actually transactional. Instead, the transaction (be it JDBC or otherwise) is started before the poll, and committed/rolled back when the flow completes, followed by the synchronized action. It is also important to understand that if you provide a custom |
In addition to the after-commit and after-rollback expressions, before-commit is also supported. In that case, if the evaluation (or downstream processing) throws an exception, the transaction will be rolled back instead of being committed.
Referring to the above section, you may be thinking it would be useful to take these success or failure actions when a flow completes, even if there are no real transactional resources (such as JDBC) downstream of the poller. For example, consider a <file:inbound-channel-adapter/> followed by an <ftp:outbound-channel-adapter/>. Neither of these components is transactional, but we might want to move the input file to different directories, based on the success or failure of the FTP transfer.
To provide this functionality, the framework provides a PseudoTransactionManager, enabling the above configuration even when there is no real transactional resource involved.
If the flow completes normally, the beforeCommit and afterCommit synchronizations will be called; on failure, the afterRollback synchronization will be called.
Of course, because it is not a real transaction, there will be no actual commit or rollback.
The pseudo transaction is simply a vehicle used to enable the synchronization features.
To use a PseudoTransactionManager, simply define it as a <bean/>, in the same way you would configure a real transaction manager:
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager" />
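The callback ordering described above can be sketched in plain Java. This is only an illustration under stated assumptions, not the framework's implementation: the `Synchronization` and `Flow` interfaces and the `runWithSynchronization` method are hypothetical names. As with a pseudo transaction, nothing is actually committed or rolled back; the outcome of the flow only decides which callbacks fire.

```java
import java.util.ArrayList;
import java.util.List;

final class PseudoTransactionSketch {

    // Hypothetical callback contract mirroring the three synchronization phases.
    interface Synchronization {
        void beforeCommit(String payload);
        void afterCommit(String payload);
        void afterRollback(String payload);
    }

    // Hypothetical flow abstraction: downstream processing that may fail.
    interface Flow {
        void process(String payload) throws Exception;
    }

    // Runs the flow. A failure in the flow or in beforeCommit leads to
    // afterRollback; otherwise afterCommit is called last.
    static void runWithSynchronization(String payload, Flow flow, Synchronization sync) {
        try {
            flow.process(payload);
            sync.beforeCommit(payload);
        } catch (Exception e) {
            sync.afterRollback(payload);
            return;
        }
        sync.afterCommit(payload);
    }

    // Records which callbacks fired, in order, so the outcome can be inspected.
    static List<String> trace(String payload, Flow flow) {
        List<String> events = new ArrayList<>();
        runWithSynchronization(payload, flow, new Synchronization() {
            public void beforeCommit(String p) { events.add("beforeCommit:" + p); }
            public void afterCommit(String p) { events.add("afterCommit:" + p); }
            public void afterRollback(String p) { events.add("afterRollback:" + p); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(trace("a.txt", p -> { }));
        System.out.println(trace("b.txt", p -> { throw new IllegalStateException("transfer failed"); }));
    }
}
```

In a file-to-FTP flow, the after-commit callback would be the place to move the file to a success directory and the after-rollback callback the place to move it to a failure directory.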
== Security in Spring Integration
Security is one of the important functions in any modern enterprise (or cloud) application. Moreover, it is critical for distributed systems, such as those built using Enterprise Integration Patterns.
Messaging independence and loose coupling allow target systems to communicate with each other with any type of data in the message’s payload.
We can either trust all those messages or secure our service against "infecting" messages.
Spring Integration, together with Spring Security, provides a simple and comprehensive way to secure message channels, as well as other parts of the integration solution.
Spring Integration provides the interceptor ChannelSecurityInterceptor, which extends AbstractSecurityInterceptor and intercepts send and receive calls on the channel.
Access decisions are then made with reference to a ChannelSecurityMetadataSource, which provides the metadata describing the send and receive access policies for certain channels.
The interceptor requires that a valid SecurityContext has been established by authenticating with Spring Security.
See the Spring Security reference documentation for details.
Namespace support is provided to allow easy configuration of security constraints.
This consists of the secured-channels tag, which allows definition of one or more channel name patterns in conjunction with a definition of the security configuration for send and receive.
The pattern is a java.util.regex.Pattern.
<?xml version="1.0" encoding="UTF-8"?> <beans:beans xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-security="http://www.springframework.org/schema/integration/security" xmlns:beans="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:security="http://www.springframework.org/schema/security" xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/security https://www.springframework.org/schema/security/spring-security.xsd http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/integration/security https://www.springframework.org/schema/integration/security/spring-integration-security.xsd"> <int-security:secured-channels> <int-security:access-policy pattern="admin.*" send-access="ROLE_ADMIN"/> <int-security:access-policy pattern="user.*" receive-access="ROLE_USER"/> </int-security:secured-channels>
By default, the secured-channels namespace element expects a bean named authenticationManager, which implements AuthenticationManager, and a bean named accessDecisionManager, which implements AccessDecisionManager.
Where this is not the case, references to the appropriate beans can be configured as attributes of the secured-channels element, as below.
<int-security:secured-channels access-decision-manager="customAccessDecisionManager" authentication-manager="customAuthenticationManager"> <int-security:access-policy pattern="admin.*" send-access="ROLE_ADMIN"/> <int-security:access-policy pattern="user.*" receive-access="ROLE_USER"/> </int-security:secured-channels>
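Because the access-policy patterns are regular expressions, a pattern such as admin.* matches any channel name that begins with "admin" (the dot is a regex wildcard, not a literal dot). The following plain-Java sketch illustrates that lookup under the assumption that the whole channel name must match; the `AccessPolicySketch` class and its policy table are hypothetical and are not part of the framework.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

final class AccessPolicySketch {

    // Hypothetical policy table: channel-name pattern -> role required to send.
    private static final Map<Pattern, String> SEND_POLICIES = new LinkedHashMap<>();

    static {
        SEND_POLICIES.put(Pattern.compile("admin.*"), "ROLE_ADMIN");
    }

    // Returns the role required to send to the channel, or null when no
    // policy pattern matches the full channel name.
    static String requiredSendRole(String channelName) {
        for (Map.Entry<Pattern, String> policy : SEND_POLICIES.entrySet()) {
            if (policy.getKey().matcher(channelName).matches()) {
                return policy.getValue();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println("adminChannel -> " + requiredSendRole("adminChannel"));
        System.out.println("userChannel  -> " + requiredSendRole("userChannel"));
    }
}
```

A receive-side table could be handled the same way; it is omitted here to keep the sketch short.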
Starting with version 4.2, the @SecuredChannel annotation is available for Java and annotation configuration in @Configuration classes.
With the @SecuredChannel annotation, the Java configuration variant of the XML configuration above is:
@Configuration @EnableIntegration public class ContextConfiguration { @Bean @SecuredChannel(interceptor = "channelSecurityInterceptor", sendAccess = "ROLE_ADMIN") public SubscribableChannel adminChannel() { return new DirectChannel(); } @Bean @SecuredChannel(interceptor = "channelSecurityInterceptor", receiveAccess = "ROLE_USER") public SubscribableChannel userChannel() { return new DirectChannel(); } @Bean public ChannelSecurityInterceptor channelSecurityInterceptor(AuthenticationManager authenticationManager, AccessDecisionManager accessDecisionManager) { ChannelSecurityInterceptor channelSecurityInterceptor = new ChannelSecurityInterceptor(); channelSecurityInterceptor.setAuthenticationManager(authenticationManager); channelSecurityInterceptor.setAccessDecisionManager(accessDecisionManager); return channelSecurityInterceptor; } }
=== SecurityContext Propagation
To be sure that our interaction with the application is secure according to its security system rules, we should supply a security context with an authentication (principal) object.
The Spring Security project provides a flexible, canonical mechanism to authenticate our application clients over HTTP, WebSocket, or SOAP protocols (as can be done for any other integration protocol with a simple Spring Security extension), and it provides a SecurityContext for further authorization checks on the application objects, such as message channels.
By default, the SecurityContext is tied to the current Thread's execution state using the ThreadLocalSecurityContextHolderStrategy.
It is accessed by an AOP interceptor on secured methods to check, for example, whether the principal of the invocation has sufficient permissions to call that method.
This works well with the current thread, but processing logic is often performed on another thread, on several threads, or even on some external system(s).
Standard thread-bound behavior is easy to configure if our application is built on the Spring Integration components and its message channels.
In this case, the secured objects may be any service activator or transformer, secured with a MethodSecurityInterceptor in their <request-handler-advice-chain> (see Section 8.9, “Adding Behavior to Endpoints”), or even a MessageChannel (see the secured channels configuration above).
When using DirectChannel communication, the SecurityContext is available automatically, because the downstream flow runs on the current thread.
But in the case of the QueueChannel, ExecutorChannel, and PublishSubscribeChannel with an Executor, messages are transferred from one thread to another (or several) by the nature of those channels.
To support such scenarios, we can either transfer an Authentication object within the message headers and extract and authenticate it on the other side before secured object access, or we can propagate the SecurityContext to the thread receiving the transferred message.
Starting with version 4.2, SecurityContext propagation has been introduced.
It is implemented as a SecurityContextPropagationChannelInterceptor, which can simply be added to any MessageChannel or configured as a @GlobalChannelInterceptor.
The logic of this interceptor is based on extracting the SecurityContext from the current thread in the preSend() method and populating it on another thread in the postReceive() (beforeHandle()) method.
Actually, this interceptor is an extension of the more generic ThreadStatePropagationChannelInterceptor, which wraps the message-to-send together with the state-to-propagate in an internal Message<?> extension, MessageWithThreadState<S>, on one side and extracts the original message and the state-to-propagate on the other.
The ThreadStatePropagationChannelInterceptor can be extended for any context propagation use case, and SecurityContextPropagationChannelInterceptor is a good example of this.
Important | |
---|---|
Since the logic of the |
Propagation and population of the SecurityContext is just one half of the work.
Since the message does not own the threads in the message flow, and we should be sure that we are secure against any incoming messages, we have to clean up the SecurityContext from the ThreadLocal.
The SecurityContextPropagationChannelInterceptor provides an implementation of the afterMessageHandled() interceptor method to perform the clean-up, freeing the Thread from the propagated principal at the end of the invocation.
This means that when the thread that processes the handed-off message completes the processing of the message (successfully or otherwise), the context is cleared so that it can’t inadvertently be used when processing another message.
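The capture, restore, and clean-up cycle described above can be sketched with plain Java. This is an illustrative model only, not the interceptor's actual code: the `PRINCIPAL` thread-local stands in for the security context, and `MessageWithState`, `preSend`, and `handle` are hypothetical names chosen to echo the steps in the text.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ContextPropagationSketch {

    // Hypothetical stand-in for the thread-bound security context.
    static final ThreadLocal<String> PRINCIPAL = new ThreadLocal<>();

    // Hypothetical wrapper pairing a payload with the state captured at send time.
    static final class MessageWithState {
        final String payload;
        final String principal;

        MessageWithState(String payload, String principal) {
            this.payload = payload;
            this.principal = principal;
        }
    }

    // Sending side: capture the sending thread's state alongside the payload.
    static MessageWithState preSend(String payload) {
        return new MessageWithState(payload, PRINCIPAL.get());
    }

    // Receiving side: restore the state, handle the message, always clear afterwards.
    static String handle(MessageWithState message) {
        PRINCIPAL.set(message.principal);
        try {
            return message.payload + " handled as " + PRINCIPAL.get();
        } finally {
            PRINCIPAL.remove();
        }
    }

    // Sends on the calling thread and handles on the worker thread. Returns the
    // handling result and whatever principal is left on the worker afterwards.
    static String[] sendAndHandle(String payload, ExecutorService worker) {
        MessageWithState message = preSend(payload);
        Callable<String> handleTask = () -> handle(message);
        Callable<String> leftoverTask = PRINCIPAL::get;
        try {
            String result = worker.submit(handleTask).get();
            String leftover = worker.submit(leftoverTask).get();
            return new String[] { result, leftover };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            PRINCIPAL.set("alice");
            String[] outcome = sendAndHandle("order-42", worker);
            System.out.println(outcome[0]);
            System.out.println("left on worker: " + outcome[1]);
        } finally {
            PRINCIPAL.remove();
            worker.shutdown();
        }
    }
}
```

The `finally` block in `handle` plays the role of the clean-up step: without it, the next message processed by the same worker thread would run with the previous sender's principal.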
As of Spring Integration 2.0, the samples are no longer included with the Spring Integration distribution. Instead we have switched to a much simpler collaborative model that should promote better community participation and, ideally, more contributions. Samples now have a dedicated Git repository and a dedicated JIRA Issue Tracking system. Sample development will also have its own lifecycle which is not dependent on the lifecycle of the framework releases, although the repository will still be tagged with each major release for compatibility reasons.
The great benefit to the community is that we can now add more samples and make them available to you right away without waiting for the next release. Having its own JIRA that is not tied to the actual framework is also a great benefit. You now have a dedicated place to suggest samples as well as report issues with existing samples. Or you may want to submit a sample to us as an attachment through the JIRA or, better, through the collaborative model that Git promotes. If we believe your sample adds value, we would be more than glad to add it to the samples repository, properly crediting you as the author.
The Spring Integration Samples project is hosted on GitHub. You can find the repository at:
https://github.com/SpringSource/spring-integration-samples
In order to check out or clone (Git parlance) the samples, please make sure you have a Git client installed on your system. There are several GUI-based products available for many platforms, e.g. EGit for the Eclipse IDE. A simple Google search will help you find them. Of course, you can also just use the command line interface for Git (https://git-scm.com/).
Note | |
---|---|
If you need more information on how to install and/or use Git, please visit: https://git-scm.com/. |
In order to check out (clone in Git terms) the Spring Integration samples repository using the Git command line tool, issue the following command:
$ git clone https://github.com/SpringSource/spring-integration-samples.git
That is all you need to do in order to clone the entire samples repository into a directory named spring-integration-samples within the working directory where you issued that git command. Since the samples repository is a live repository, you might want to perform periodic pulls (updates) to get new samples, as well as updates to the existing samples. In order to do so issue the following git pull command:
$ git pull
=== Submitting Samples or Sample Requests
How can I contribute my own Samples?
GitHub is for social coding: if you want to submit your own code examples to the Spring Integration Samples project, we encourage contributions through pull requests from forks of this repository. If you want to contribute code this way, please reference, if possible, a JIRA ticket (https://jira.springframework.org/browse/INTSAMPLES) that provides some details regarding the provided sample.
Sign the contributor license agreement | |
---|---|
Very important: before we can accept your Spring Integration sample, we will need you to sign the SpringSource contributor license agreement (CLA). Signing the contributor’s agreement does not grant anyone commit rights to the main repository, but it does mean that we can accept your contributions, and you will get an author credit if we do. In order to read and sign the CLA, please go to: https://support.springsource.com/spring_committer_signup From the Project drop down, please select Spring Integration. The Project Lead is Gary Russell. |
Code Contribution Process
For the actual code contribution process, please read the Contributor Guidelines for Spring Integration; they apply to this project as well:
https://github.com/spring-projects/spring-integration/blob/master/CONTRIBUTING.md
This process ensures that every commit gets peer-reviewed. As a matter of fact, the core committers follow the exact same rules. We are gratefully looking forward to your Spring Integration Samples!
Sample Requests
As mentioned earlier, the Spring Integration Samples project has a dedicated JIRA Issue tracking system. To submit new sample requests, please visit our JIRA Issue Tracking system:
https://jira.springframework.org/browse/INTSAMPLES.
Starting with Spring Integration 2.0, the structure of the samples changed as well. With plans for more samples we realized that some samples have different goals than others. While they all share the common goal of showing you how to apply and work with the Spring Integration framework, they also differ in areas where some samples are meant to concentrate on a technical use case while others focus on a business use case, and some samples are all about showcasing various techniques that could be applied to address certain scenarios (both technical and business). The new categorization of samples will allow us to better organize them based on the problem each sample addresses while giving you a simpler way of finding the right sample for your needs.
Currently there are 4 categories. Within the samples repository each category has its own directory which is named after the category name:
BASIC (samples/basic)
This is a good place to get started. The samples here are technically motivated and demonstrate the bare minimum with regard to configuration and code. These should help you to get started quickly by introducing you to the basic concepts, API and configuration of Spring Integration as well as Enterprise Integration Patterns (EIP). For example, if you are looking for an answer on how to implement and wire a Service Activator to a Message Channel or how to use a Messaging Gateway as a facade to your message exchange, or how to get started with using MAIL or TCP/UDP modules etc., this would be the right place to find a good sample. The bottom line is this is a good place to get started.
INTERMEDIATE (samples/intermediate)
This category targets developers who are already familiar with the Spring Integration framework (past getting started), but need some more guidance while resolving the more advanced technical problems one might deal with after switching to a Messaging architecture. For example, if you are looking for an answer on how to handle errors in various message exchange scenarios, or how to properly configure the Aggregator for situations where some messages might never arrive for aggregation, or any other issue that goes beyond a basic implementation and configuration of a particular component and addresses "what else" types of problems, this would be the right place to find these types of samples.
ADVANCED (samples/advanced)
This category targets developers who are very familiar with the Spring Integration framework but are looking to extend it to address a specific custom need by using Spring Integration’s public API. For example, if you are looking for samples showing you how to implement a custom Channel or Consumer (event-based or polling-based), or you are trying to figure out what is the most appropriate way to implement a custom Bean parser on top of the Spring Integration Bean parser hierarchy when implementing your own namespace and schema for a custom component, this would be the right place to look. Here you can also find samples that will help you with Adapter development. Spring Integration comes with an extensive library of adapters to allow you to connect remote systems with the Spring Integration messaging framework. However you might have a need to integrate with a system for which the core framework does not provide an adapter. So, you may decide to implement your own (and potentially contribute it). This category would include samples showing you how.
APPLICATIONS (samples/applications)
This category targets developers and architects who have a good understanding of Message-driven architecture and EIP, and an above average understanding of Spring and Spring Integration who are looking for samples that address a particular business problem. In other words the emphasis of samples in this category is business use cases and how they can be solved with a Message-Driven Architecture and Spring Integration in particular. For example, if you are interested to see how a Loan Broker or Travel Agent process could be implemented and automated via Spring Integration, this would be the right place to find these types of samples.
Important | |
---|---|
Remember: Spring Integration is a community driven framework, therefore community participation is IMPORTANT. That includes Samples; so, if you can’t find what you are looking for, let us know! |
Currently Spring Integration comes with quite a few samples and you can only expect more.
To help you better navigate through them, each sample comes with its own readme.txt
file which covers several details about the sample (e.g., what EIP patterns it addresses, what problem it is trying to solve, how to run sample etc.).
However, certain samples require a more detailed and sometimes graphical explanation.
In this section you’ll find details on samples that we believe require special attention.
In this section, we will review the Loan Broker sample application that is included in the Spring Integration samples. This sample is inspired by one of the samples featured in Gregor Hohpe and Bobby Woolf’s book, Enterprise Integration Patterns.
The diagram below represents the entire process
Now let’s look at this process in more detail.
At the core of an EIP architecture are the very simple yet powerful concepts of Pipes and Filters, and of course: Messages. Endpoints (Filters) are connected with one another via Channels (Pipes). The producing endpoint sends Message to the Channel, and the Message is retrieved by the Consuming endpoint. This architecture is meant to define various mechanisms that describe HOW information is exchanged between the endpoints, without any awareness of WHAT those endpoints are or what information they are exchanging. Thus, it provides for a very loosely coupled and flexible collaboration model while also decoupling Integration concerns from Business concerns. EIP extends this architecture by further defining:
The details and variations of this use case are very nicely described in Chapter 9 of the EIP Book, but here is a brief summary: a Consumer, while shopping for the best Loan Quote(s), subscribes to the services of a Loan Broker, which handles details such as:
Obviously the real process of obtaining a loan quote is a bit more complex, but since our goal here is to demonstrate how Enterprise Integration Patterns are realized and implemented within SI, the use case has been simplified to concentrate only on the Integration aspects of the process. It is not an attempt to give you advice on consumer finances.
As you can see, by hiring a Loan Broker, the consumer is isolated from the details of the Loan Broker’s operations, and each Loan Broker’s operations may differ from one another to maintain competitive advantage, so whatever we assemble or implement must be flexible enough that changes can be introduced quickly and painlessly. Speaking of change, the Loan Broker sample does not actually talk to any imaginary Banks or Credit bureaus. Those services are stubbed out. Our goal here is to assemble, orchestrate, and test the integration aspect of the process as a whole. Only then can we start thinking about wiring such a process to the real services. At that time the assembled process and its configuration will not change, regardless of the number of Banks a particular Loan Broker is dealing with or the type of communication media (or protocols) used (JMS, WS, TCP, etc.) to communicate with these Banks.
DESIGN
As you analyze the 6 requirements above you’ll quickly see that they all fall into the category of Integration concerns. For example, in the consumer pre-screening step we need to gather additional information about the consumer and the consumer’s desires and enrich the loan request with additional meta information. We then have to filter such information to select the most appropriate list of Banks, and so on. Enrich, filter, select – these are all integration concerns for which EIP defines a solution in the form of patterns. SI provides an implementation of these patterns.
The Messaging Gateway pattern provides a simple mechanism to access messaging systems, including our Loan Broker. In SI you define the Gateway as a Plain Old Java Interface (no need to provide an implementation), configure it via the XML <gateway> element or via annotation and use it as any other Spring bean. SI will take care of delegating and mapping method invocations to the Messaging infrastructure by generating a Message (payload is mapped to an input parameter of the method) and sending it to the designated channel.
<int:gateway id="loanBrokerGateway" default-request-channel="loanBrokerPreProcessingChannel" service-interface="org.springframework.integration.samples.loanbroker.LoanBrokerGateway"> <int:method name="getBestLoanQuote"> <int:header name="RESPONSE_TYPE" value="BEST"/> </int:method> </int:gateway>
Our current Gateway provides two methods that can be invoked: one returns the single best quote, and the other returns all quotes. Somewhere downstream we need to know what type of reply the caller is looking for. The best way to achieve this in a Messaging architecture is to enrich the content of the message with some metadata describing your intentions. Content Enricher is one of the patterns that addresses this. Although Spring Integration does provide a separate configuration element to enrich Message Headers with arbitrary data (we’ll see it later), the Gateway element, being responsible for constructing the initial Message, provides as a convenience an embedded capability to enrich the newly created Message with arbitrary Message Headers. In our example we add the header RESPONSE_TYPE with the value BEST whenever the getBestLoanQuote() method is invoked. For the other method we do not add any header. Now we can check downstream for the existence of this header and, based on its presence and its value, determine what type of reply the caller is looking for.
Based on the use case, we also know there are some pre-screening steps that need to be performed, such as getting and evaluating the consumer’s credit score, simply because some premier Banks will typically only accept quote requests from consumers that meet a minimum credit score requirement. So it would be nice if the Message were enriched with such information before it is forwarded to the Banks. It would also be nice if, when several processes need to be completed to provide such meta-information, those processes could be grouped in a single unit. In our use case we need to determine the credit score and, based on the credit score and some rule, select a list of Message Channels (Bank Channels) to which we will send the quote request.
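The idea of a gateway that turns a method call into a Message and adds a header for one particular method can be sketched with a JDK dynamic proxy. This is a simplified illustration, not how Spring Integration implements its gateway: the `LoanGateway` interface, the `getAllLoanQuotes` method name, and the map-based "message" are assumptions made for the sketch, and the proxy returns the constructed message itself so that it can be inspected.

```java
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

final class GatewayProxySketch {

    // Hypothetical gateway contract with two entry points.
    interface LoanGateway {
        Map<String, Object> getBestLoanQuote(String request);
        Map<String, Object> getAllLoanQuotes(String request);
    }

    // Builds a proxy that maps each invocation to a "message": the method
    // argument becomes the payload, and one method also adds a header.
    static LoanGateway createGateway() {
        return (LoanGateway) Proxy.newProxyInstance(
                LoanGateway.class.getClassLoader(),
                new Class<?>[] { LoanGateway.class },
                (proxy, method, args) -> {
                    Map<String, Object> headers = new HashMap<>();
                    if (method.getName().equals("getBestLoanQuote")) {
                        headers.put("RESPONSE_TYPE", "BEST");
                    }
                    Map<String, Object> message = new HashMap<>();
                    message.put("payload", args[0]);
                    message.put("headers", headers);
                    return message;
                });
    }

    public static void main(String[] args) {
        LoanGateway gateway = createGateway();
        System.out.println(gateway.getBestLoanQuote("loan request").get("headers"));
        System.out.println(gateway.getAllLoanQuotes("loan request").get("headers"));
    }
}
```

Downstream components only need to look at the headers to tell the two kinds of request apart; the caller never deals with messages directly.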
Composed Message Processor
The Composed Message Processor pattern describes rules around building endpoints that maintain control over a message flow that consists of multiple message processors. In Spring Integration, the Composed Message Processor pattern is implemented via the <chain> element.
As you can see from the above configuration, we have a chain with an inner header-enricher element, which further enriches the content of the Message with the header CREDIT_SCORE and a value determined by a call to a credit service (a simple POJO Spring bean identified by the name creditBureau). The chain then delegates to the Message Router.
There are several implementations of the Message Routing pattern available in Spring Integration. Here we are using a router that determines a list of channels by evaluating an expression (Spring Expression Language). The expression looks at the credit score that was determined in the previous step and selects the list of channels from the Map bean with id banks, whose values are premier or secondary, based on the value of the credit score. Once the list of Channels is selected, the Message is routed to those Channels.
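The selection step described above can be pictured in plain Java, without any Spring dependency. This is only a sketch of the idea, not the sample's actual SpEL expression: the class name BankChannelSelector, the cut-off of 700, and the bank channel names are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: models a "banks" Map whose keys are channel names and
// whose values are either "premier" or "secondary".
class BankChannelSelector {

    // Assumed cut-off; the real rule lives in the sample's router expression.
    static final int PREMIER_THRESHOLD = 700;

    // Returns the names of all channels whose category matches the credit score.
    static List<String> selectChannels(Map<String, String> banks, int creditScore) {
        String wanted = creditScore > PREMIER_THRESHOLD ? "premier" : "secondary";
        List<String> channels = new ArrayList<>();
        for (Map.Entry<String, String> bank : banks.entrySet()) {
            if (wanted.equals(bank.getValue())) {
                channels.add(bank.getKey());
            }
        }
        return channels;
    }

    // Example bank map with made-up channel names.
    static Map<String, String> sampleBanks() {
        Map<String, String> banks = new LinkedHashMap<>();
        banks.put("bankA", "premier");
        banks.put("bankB", "secondary");
        banks.put("bankC", "premier");
        return banks;
    }
}
```

With the sample map, a score above the assumed threshold selects bankA and bankC, and any other score selects bankB. In the real application the router then sends a copy of the Message to each selected channel.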
Now, one last thing the Loan Broker needs to do is receive the loan quotes from the banks, aggregate them by consumer (we don’t want to show quotes from one consumer to another), assemble the response based on the consumer’s selection criteria (single best quote or all quotes), and reply back to the consumer.
An Aggregator pattern describes an endpoint which groups related Messages into a single Message. Criteria and rules can be provided to determine an aggregation and correlation strategy. SI provides several implementations of the Aggregator pattern as well as a convenient name-space based configuration.
<int:aggregator id="quotesAggregator" input-channel="quotesAggregationChannel" method="aggregateQuotes"> <beans:bean class="org.springframework.integration.samples.loanbroker.LoanQuoteAggregator"/> </int:aggregator>
Our Loan Broker defines a quotesAggregator bean via the <aggregator> element which provides a default aggregation and correlation strategy.
The default correlation strategy correlates messages based on the correlationId header (see Correlation Identifier pattern).
What’s interesting is that we never provided the value for this header.
It was set earlier by the router automatically, when it generated a separate Message for each Bank channel.
Once the Messages are correlated they are released to the actual Aggregator implementation. Although default Aggregator is provided by SI, its strategy (gather the list of payloads from all Messages and construct a new Message with this List as payload) does not satisfy our requirement. The reason is that our consumer might require a single best quote or all quotes. To communicate the consumer’s intention, earlier in the process we set the RESPONSE_TYPE header. Now we have to evaluate this header and return either all the quotes (the default aggregation strategy would work) or the best quote (the default aggregation strategy will not work because we have to determine which loan quote is the best).
Obviously selecting the best quote could be based on complex criteria and would influence the complexity of the aggregator implementation and configuration, but for now we are making it simple.
If the consumer wants the best quote, we select the quote with the lowest interest rate.
To accomplish that, LoanQuoteAggregator.java sorts all the quotes and returns the first one.
LoanQuote.java implements Comparable, which compares quotes based on the rate attribute.
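The sort-and-pick-first logic can be sketched in plain Java as follows. These are simplified stand-ins, not the sample's actual LoanQuote and LoanQuoteAggregator classes: the names QuoteSketch and QuoteAggregatorSketch, the lender field, and the choice to always return a List are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified stand-in for a loan quote: only the fields needed here.
class QuoteSketch implements Comparable<QuoteSketch> {
    final String lender;
    final double rate;

    QuoteSketch(String lender, double rate) {
        this.lender = lender;
        this.rate = rate;
    }

    // A lower interest rate sorts first.
    @Override
    public int compareTo(QuoteSketch other) {
        return Double.compare(this.rate, other.rate);
    }
}

class QuoteAggregatorSketch {

    // responseType models the RESPONSE_TYPE header; it is null when the header is absent.
    static List<QuoteSketch> aggregate(List<QuoteSketch> quotes, String responseType) {
        List<QuoteSketch> sorted = new ArrayList<>(quotes);
        Collections.sort(sorted);
        if ("BEST".equals(responseType) && !sorted.isEmpty()) {
            // Only the quote with the lowest rate is returned.
            return Collections.singletonList(sorted.get(0));
        }
        // No header: return every quote, cheapest first.
        return sorted;
    }
}
```

Keeping the comparison inside the quote class means the aggregator needs no knowledge of what "best" means; a richer selection rule would only change compareTo (or a separate Comparator).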
Once the response Message is created it is sent to the default-reply-channel of the Messaging Gateway (thus the consumer) which started the process.
Our consumer got the Loan Quote!
Conclusion
As you can see, a rather complex process was assembled from POJOs (read: existing, legacy code) and a lightweight, embeddable messaging framework (Spring Integration) with a loosely coupled programming model. The framework is intended to simplify the integration of heterogeneous systems without requiring a heavyweight ESB-like engine or a proprietary development and deployment environment. As a developer, you should not have to port your Swing or console-based application to an ESB-like server or implement proprietary interfaces just because you have an integration concern.
This and other samples in this section are built on top of Enterprise Integration Patterns and can be considered "building blocks" for YOUR solution; they are not intended to be complete solutions. Integration concerns exist in all types of applications (whether server based or not). Integrating such applications should not require changes to their design, testing, and deployment strategy.
In this section, we will review a Cafe sample application that is included in the Spring Integration samples. This sample is inspired by another sample featured in Gregor Hohpe’s Ramblings (https://www.enterpriseintegrationpatterns.com/ramblings.html).
The domain is that of a Cafe, and the basic flow is depicted in the following diagram:
The Order object may contain multiple OrderItems.
Once the order is placed, a Splitter will break the composite order message into a single message per drink.
Each of these is then processed by a Router that determines whether the drink is hot or cold (checking the OrderItem object’s isIced property).
The Barista prepares each drink, but hot and cold drink preparation are handled by two distinct methods: prepareHotDrink and prepareColdDrink.
The prepared drinks are then sent to the Waiter where they are aggregated into a Delivery object.
Here is the XML configuration:
<?xml version="1.0" encoding="UTF-8"?> <beans:beans xmlns:int="http://www.springframework.org/schema/integration" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans" xmlns:int-stream="http://www.springframework.org/schema/integration/stream" xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/integration/stream https://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd"> <int:gateway id="cafe" service-interface="o.s.i.samples.cafe.Cafe"/> <int:channel id="orders"/> <int:splitter input-channel="orders" ref="orderSplitter" method="split" output-channel="drinks"/> <int:channel id="drinks"/> <int:router input-channel="drinks" ref="drinkRouter" method="resolveOrderItemChannel"/> <int:channel id="coldDrinks"><int:queue capacity="10"/></int:channel> <int:service-activator input-channel="coldDrinks" ref="barista" method="prepareColdDrink" output-channel="preparedDrinks"/> <int:channel id="hotDrinks"><int:queue capacity="10"/></int:channel> <int:service-activator input-channel="hotDrinks" ref="barista" method="prepareHotDrink" output-channel="preparedDrinks"/> <int:channel id="preparedDrinks"/> <int:aggregator input-channel="preparedDrinks" ref="waiter" method="prepareDelivery" output-channel="deliveries"/> <int-stream:stdout-channel-adapter id="deliveries"/> <beans:bean id="orderSplitter" class="org.springframework.integration.samples.cafe.xml.OrderSplitter"/> <beans:bean id="drinkRouter" class="org.springframework.integration.samples.cafe.xml.DrinkRouter"/> <beans:bean id="barista" class="o.s.i.samples.cafe.xml.Barista"/> <beans:bean id="waiter" class="o.s.i.samples.cafe.xml.Waiter"/> <int:poller id="poller" default="true" fixed-rate="1000"/> 
</beans:beans>
As you can see, each Message Endpoint is connected to input and/or output channels. Each endpoint will manage its own Lifecycle (by default endpoints start automatically upon initialization - to prevent that add the "auto-startup" attribute with a value of "false"). Most importantly, notice that the objects are simple POJOs with strongly typed method arguments. For example, here is the Splitter:
public class OrderSplitter { public List<OrderItem> split(Order order) { return order.getItems(); } }
In the case of the Router, the return value does not have to be a MessageChannel instance (although it can be).
As you see in this example, a String value representing the channel name is returned instead.
public class DrinkRouter { public String resolveOrderItemChannel(OrderItem orderItem) { return (orderItem.isIced()) ? "coldDrinks" : "hotDrinks"; } }
Now turning back to the XML, you see that there are two <service-activator> elements.
Each of these is delegating to the same Barista instance but different methods: prepareHotDrink or prepareColdDrink, corresponding to the two channels where order items have been routed.
public class Barista { private long hotDrinkDelay = 5000; private long coldDrinkDelay = 1000; private AtomicInteger hotDrinkCounter = new AtomicInteger(); private AtomicInteger coldDrinkCounter = new AtomicInteger(); public void setHotDrinkDelay(long hotDrinkDelay) { this.hotDrinkDelay = hotDrinkDelay; } public void setColdDrinkDelay(long coldDrinkDelay) { this.coldDrinkDelay = coldDrinkDelay; } public Drink prepareHotDrink(OrderItem orderItem) { try { Thread.sleep(this.hotDrinkDelay); System.out.println(Thread.currentThread().getName() + " prepared hot drink #" + hotDrinkCounter.incrementAndGet() + " for order #" + orderItem.getOrder().getNumber() + ": " + orderItem); return new Drink(orderItem.getOrder().getNumber(), orderItem.getDrinkType(), orderItem.isIced(), orderItem.getShots()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; } } public Drink prepareColdDrink(OrderItem orderItem) { try { Thread.sleep(this.coldDrinkDelay); System.out.println(Thread.currentThread().getName() + " prepared cold drink #" + coldDrinkCounter.incrementAndGet() + " for order #" + orderItem.getOrder().getNumber() + ": " + orderItem); return new Drink(orderItem.getOrder().getNumber(), orderItem.getDrinkType(), orderItem.isIced(), orderItem.getShots()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; } } }
As you can see from the code excerpt above, the barista methods have different delays (the hot drinks take 5 times as long to prepare).
This simulates work being completed at different rates.
When the CafeDemo main method runs, it loops 100 times, sending a single hot drink and a single cold drink each time.
It actually sends the messages by invoking the placeOrder method on the Cafe interface.
Above, you will see that the <gateway> element is specified in the configuration file.
This triggers the creation of a proxy that implements the given service-interface and connects it to a channel.
The channel name is provided on the @Gateway annotation of the Cafe interface.
public interface Cafe { @Gateway(requestChannel="orders") void placeOrder(Order order); }
Finally, have a look at the main() method of the CafeDemo itself.
public static void main(String[] args) { AbstractApplicationContext context = null; if (args.length > 0) { context = new FileSystemXmlApplicationContext(args); } else { context = new ClassPathXmlApplicationContext("cafeDemo.xml", CafeDemo.class); } Cafe cafe = context.getBean("cafe", Cafe.class); for (int i = 1; i <= 100; i++) { Order order = new Order(i); order.addItem(DrinkType.LATTE, 2, false); order.addItem(DrinkType.MOCHA, 3, true); cafe.placeOrder(order); } }
Tip | |
---|---|
To run this sample as well as 8 others, refer to the |
When you run cafeDemo, you will see that the cold drinks are initially prepared more quickly than the hot drinks. Because there is an aggregator, the cold drinks are effectively limited by the rate of the hot drink preparation. This is to be expected based on their respective delays of 1000 and 5000 milliseconds. However, by configuring a poller with a concurrent task executor, you can dramatically change the results. For example, you could use a thread pool executor with 5 workers for the hot drink barista while keeping the cold drink barista as it is:
<int:service-activator input-channel="hotDrinks" ref="barista" method="prepareHotDrink" output-channel="preparedDrinks"> <int:poller task-executor="pool" fixed-rate="1000"/> </int:service-activator> <task:executor id="pool" pool-size="5"/>
Also, notice that the worker thread name is displayed with each invocation. You will see that the hot drinks are prepared by the task-executor threads. If you provide a much shorter poller interval (such as 100 milliseconds), then you will notice that occasionally it throttles the input by forcing the task-scheduler (the caller) to invoke the operation.
Note | |
---|---|
In addition to experimenting with the poller’s concurrency settings, you can also add the transactional sub-element and then refer to any PlatformTransactionManager instance within the context. |
The XML messaging sample in basic/xml illustrates how to use some of the provided components that deal with XML payloads.
The sample uses the idea of processing an order for books represented as XML.
NOTE: This sample shows that the namespace prefix can be whatever you want; while we usually use int-xml for integration XML components, the sample uses si-xml.
First the order is split into a number of messages, each one representing a single order item using the XPath splitter component.
<si-xml:xpath-splitter id="orderItemSplitter" input-channel="ordersChannel" output-channel="stockCheckerChannel" create-documents="true"> <si-xml:xpath-expression expression="/orderNs:order/orderNs:orderItem" namespace-map="orderNamespaceMap" /> </si-xml:xpath-splitter>
A service activator is then used to pass the message into a stock checker POJO. The order item document is enriched with information from the stock checker about order item stock level. This enriched order item message is then used to route the message. In the case where the order item is in stock the message is routed to the warehouse.
<si-xml:xpath-router id="instockRouter" input-channel="orderRoutingChannel" resolution-required="true"> <si-xml:xpath-expression expression="/orderNs:orderItem/@in-stock" namespace-map="orderNamespaceMap" /> <si-xml:mapping value="true" channel="warehouseDispatchChannel"/> <si-xml:mapping value="false" channel="outOfStockChannel"/> </si-xml:xpath-router>
Where the order item is not in stock, the message is transformed using XSLT into a format suitable for sending to the supplier.
<si-xml:xslt-transformer input-channel="outOfStockChannel" output-channel="resupplyOrderChannel" xsl-resource="classpath:org/springframework/integration/samples/xml/bigBooksSupplierTransformer.xsl"/>
Spring Integration offers a number of configuration options. Which option you choose depends upon your particular needs and at what level you prefer to work. As with the Spring framework in general, it is also possible to mix and match the various techniques according to the particular problem at hand. For example, you may choose the XSD-based namespace for the majority of configuration combined with a handful of objects that are configured with annotations. As much as possible, the two provide consistent naming. XML elements defined by the XSD schema will match the names of annotations, and the attributes of those XML elements will match the names of annotation properties. Direct usage of the API is of course always an option, but we expect that most users will choose one of the higher-level options, or a combination of the namespace-based and annotation-driven configuration.
Spring Integration components can be configured with XML elements that map directly to the terminology and concepts of enterprise integration. In many cases, the element names match those of the Enterprise Integration Patterns.
To enable Spring Integration’s core namespace support within your Spring configuration files, add the following namespace reference and schema mapping in your top-level beans element:
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd">
You can choose any name after "xmlns:"; int is used here for clarity, but you might prefer a shorter abbreviation. Of course if you are using an XML-editor or IDE support, then the availability of auto-completion may convince you to keep the longer name for clarity. Alternatively, you can create configuration files that use the Spring Integration schema as the primary namespace:
<beans:beans xmlns="http://www.springframework.org/schema/integration" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd">
When using this alternative, no prefix is necessary for the Spring Integration elements.
On the other hand, if you want to define a generic Spring "bean" within the same configuration file, then a prefix would be required for the bean element (<beans:bean .../>).
Since it is generally a good idea to modularize the configuration files themselves based on responsibility and/or architectural layer, you may find it appropriate to use the latter approach in the integration-focused configuration files, since generic beans are seldom necessary within those same files.
For purposes of this documentation, we will assume the "integration" namespace is primary.
Many other namespaces are provided within the Spring Integration distribution. In fact, each adapter type (JMS, File, etc.) that provides namespace support defines its elements within a separate schema. In order to use these elements, simply add the necessary namespaces with an "xmlns" entry and the corresponding "schemaLocation" mapping. For example, the following root element shows several of these namespace declarations:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-mail="http://www.springframework.org/schema/integration/mail" xmlns:int-rmi="http://www.springframework.org/schema/integration/rmi" xmlns:int-ws="http://www.springframework.org/schema/integration/ws" xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/integration/file https://www.springframework.org/schema/integration/file/spring-integration-file.xsd http://www.springframework.org/schema/integration/jms https://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd http://www.springframework.org/schema/integration/mail https://www.springframework.org/schema/integration/mail/spring-integration-mail.xsd http://www.springframework.org/schema/integration/rmi https://www.springframework.org/schema/integration/rmi/spring-integration-rmi.xsd http://www.springframework.org/schema/integration/ws https://www.springframework.org/schema/integration/ws/spring-integration-ws.xsd"> ... </beans>
The reference manual provides specific examples of the various elements in their corresponding chapters. Here, the main thing to recognize is the consistency of the naming for each namespace URI and schema location.
Configuring the Task Scheduler
In Spring Integration, the ApplicationContext plays the central role of a Message Bus, and there are only a couple configuration options to consider. First, you may want to control the central TaskScheduler instance. You can do so by providing a single bean with the name "taskScheduler". This is also defined as a constant:
IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME
By default Spring Integration relies on an instance of ThreadPoolTaskScheduler as described in the Task Execution and Scheduling section of the Spring Framework reference manual. That default TaskScheduler will startup automatically with a pool of 10 threads, but see Section 31.3.2, “TCP Failover Client Connection Factory”. If you provide your own TaskScheduler instance instead, you can set the autoStartup property to false, and/or you can provide your own pool size value.
When Polling Consumers provide an explicit task-executor reference in their configuration, the invocation of the handler methods will happen within that executor’s thread pool and not the main scheduler pool. However, when no task-executor is provided for an endpoint’s poller, it will be invoked by one of the main scheduler’s threads.
Caution | |
---|---|
Do not run long-running tasks on poller threads; use a task executor instead.
If you have a lot of polling endpoints, you can cause thread starvation, unless you increase the pool size.
Also, polling consumers have a default |
Note | |
---|---|
An endpoint is a Polling Consumer if its input channel is one of the queue-based (i.e. pollable) channels. Event Driven Consumers are those having input channels that have dispatchers instead of queues (i.e. they are subscribable). Such endpoints have no poller configuration since their handlers will be invoked directly. |
Important | |
---|---|
When running in a JEE container, you may need to use Spring’s <bean id="taskScheduler" class="o.s.scheduling.commonj.TimerManagerTaskScheduler"> <property name="timerManagerName" value="tm/MyTimerManager" /> <property name="resourceRef" value="true" /> </bean> |
The next section will describe what happens if Exceptions occur within the asynchronous invocations.
As described in the overview at the very beginning of this manual, one of the main motivations behind a Message-oriented framework like Spring Integration is to promote loose-coupling between components. The Message Channel plays an important role in that producers and consumers do not have to know about each other. However, the advantages also have some drawbacks. Some things become more complicated in a very loosely coupled environment, and one example is error handling.
When sending a Message to a channel, the component that ultimately handles that Message may or may not be operating within the same thread as the sender.
If using a simple default DirectChannel (with the <channel> element that has no <queue> sub-element and no task-executor attribute), the Message-handling will occur in the same thread as the Message-sending.
In that case, if an Exception is thrown, it can be caught by the sender (or it may propagate past the sender if it is an uncaught RuntimeException).
So far, everything is fine.
This is the same behavior as an Exception-throwing operation in a normal call stack.
However, when adding the asynchronous aspect, things become much more complicated.
For instance, if the channel element does provide a queue sub-element, then the component that handles the Message will be operating in a different thread than the sender.
The sender may have dropped the Message into the channel and moved on to other things.
There is no way for the Exception to be thrown directly back to that sender using standard Exception throwing techniques.
Instead, handling errors for asynchronous processes requires an asynchronous error-handling mechanism as well.
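The difference between the two hand-off styles can be reproduced in plain Java. The following is a minimal sketch with no Spring classes involved: AsyncErrorHandlingSketch, its method names, and the use of a BlockingQueue as a stand-in for an error channel are assumptions made for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class AsyncErrorHandlingSketch {

    // Stand-in for an error channel: failures are published here as payloads.
    static final BlockingQueue<Throwable> errorQueue = new LinkedBlockingQueue<>();

    static void failingHandler(String payload) {
        throw new IllegalStateException("cannot handle " + payload);
    }

    // Same-thread delivery: the exception travels up the sender's call stack.
    static boolean sendDirect(String payload) {
        try {
            failingHandler(payload);
            return true;
        } catch (IllegalStateException e) {
            return false; // the sender itself observed the failure
        }
    }

    // Cross-thread delivery: the sender never sees the exception, so the
    // worker publishes it to the error queue instead.
    static boolean sendQueued(String payload) {
        Thread worker = new Thread(() -> {
            try {
                failingHandler(payload);
            } catch (RuntimeException e) {
                errorQueue.add(e);
            }
        });
        worker.start();
        try {
            worker.join(); // only to make the example deterministic
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return true; // from the sender's point of view the send succeeded
    }
}
```

In the direct case the sender can react immediately; in the queued case the only trace of the failure is the entry on the error queue, which a separate consumer has to process.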
Spring Integration supports error handling for its components by publishing errors to a Message Channel.
Specifically, the Exception will become the payload of a Spring Integration Message.
That Message will then be sent to a Message Channel that is resolved in a way that is similar to the replyChannel resolution.
First, if the request Message being handled at the time the Exception occurred contains an errorChannel header (the header name is defined in the constant: MessageHeaders.ERROR_CHANNEL), the ErrorMessage will be sent to that channel.
Otherwise, the error handler will send to a "global" channel whose bean name is "errorChannel" (this is also defined as a constant: IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME).
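The two-step lookup described above amounts to "header first, global channel second". The following plain Java sketch models only that decision; it is not framework code. The class name ErrorChannelResolutionSketch is hypothetical, and for simplicity the header value is treated as a channel name.

```java
import java.util.Map;

class ErrorChannelResolutionSketch {

    // Both values follow the text: the header key "errorChannel" and the
    // global bean name "errorChannel".
    static final String ERROR_CHANNEL_HEADER = "errorChannel";
    static final String GLOBAL_ERROR_CHANNEL = "errorChannel";

    // Returns the name of the channel that should receive the error message.
    static String resolve(Map<String, Object> requestHeaders) {
        Object fromHeader = requestHeaders.get(ERROR_CHANNEL_HEADER);
        if (fromHeader != null) {
            // The request message named its own error channel.
            return fromHeader.toString();
        }
        // Fall back to the global error channel.
        return GLOBAL_ERROR_CHANNEL;
    }
}
```

A request that carries an errorChannel header with the (made-up) value "myErrors" resolves to "myErrors"; a request without the header resolves to the global "errorChannel".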
A default "errorChannel" bean is created behind the scenes by the Framework. However, you can just as easily define your own if you want to control the settings.
<int:channel id="errorChannel"> <int:queue capacity="500"/> </int:channel>
Note | |
---|---|
The default "errorChannel" is a |
The most important thing to understand here is that the messaging-based error handling will only apply to Exceptions that are thrown by a Spring Integration task that is executing within a TaskExecutor.
This does not apply to Exceptions thrown by a handler that is operating within the same thread as the sender (e.g. through a DirectChannel as described above).
Note | |
---|---|
When Exceptions occur in a scheduled poller task’s execution, those exceptions will be wrapped in |
To enable global error handling, simply register a handler on that channel.
For example, you can configure Spring Integration’s ErrorMessageExceptionTypeRouter as the handler of an endpoint that is subscribed to the errorChannel.
That router can then spread the error messages across multiple channels based on Exception type.
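The idea of spreading errors across channels by exception type can be illustrated with a small plain Java lookup. This sketch does not reproduce how ErrorMessageExceptionTypeRouter is implemented; the class ExceptionTypeRoutingSketch, its methods, and the channel names in the usage note are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class ExceptionTypeRoutingSketch {

    private final Map<Class<? extends Throwable>, String> mappings = new HashMap<>();
    private final String defaultChannel;

    ExceptionTypeRoutingSketch(String defaultChannel) {
        this.defaultChannel = defaultChannel;
    }

    // Registers a channel name for an exception type; returns this for chaining.
    ExceptionTypeRoutingSketch map(Class<? extends Throwable> type, String channel) {
        mappings.put(type, channel);
        return this;
    }

    // Walks up the class hierarchy of the error so the most specific mapping wins.
    String route(Throwable error) {
        for (Class<?> type = error.getClass(); type != null; type = type.getSuperclass()) {
            String channel = mappings.get(type);
            if (channel != null) {
                return channel;
            }
        }
        return defaultChannel;
    }
}
```

For example, with IllegalArgumentException mapped to "badInput" and RuntimeException mapped to "runtimeErrors", a NumberFormatException is routed to "badInput" because it is a subclass of IllegalArgumentException, while an unmapped checked exception falls through to the default channel.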
Certain global framework properties can be overridden by providing a properties file on the classpath.
The default properties can be found in /META-INF/spring.integration.default.properties in the spring-integration-core jar.
You can see them on GitHub here, but here are the current default values:
spring.integration.channels.autoCreate=true
spring.integration.channels.maxUnicastSubscribers=0x7fffffff
spring.integration.channels.maxBroadcastSubscribers=0x7fffffff
spring.integration.taskScheduler.poolSize=10
spring.integration.messagingTemplate.throwExceptionOnLateReply=false
spring.integration.messagingAnnotations.require.componentAnnotation=false
spring.integration.readOnly.headers=
spring.integration.endpoints.noAutoStartup=
spring.integration.postProcessDynamicBeans=false
When true, | |
This property provides the default number of subscribers allowed on, say, a | |
This property provides the default number of subscribers allowed on, say, a | |
The number of threads available in the default | |
When | |
When | |
A comma-separated list of message header names which should not be populated into | |
A comma-separated list of | |
A boolean flag to indicate that |
These properties can be overridden by adding a file /META-INF/spring.integration.properties to the classpath.
It is not necessary to provide all the properties, just those that you want to override.
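The "override only what you need" behavior can be pictured with java.util.Properties, where one Properties object serves as the fallback for another. This sketch shows the layering idea only and is not how the framework loads its files; the class IntegrationPropertiesSketch and the override value of 20 in the usage note are assumptions made for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class IntegrationPropertiesSketch {

    // Loads the defaults first, then layers the overrides on top of them.
    static Properties merge(String defaultsText, String overridesText) {
        try {
            Properties defaults = new Properties();
            defaults.load(new StringReader(defaultsText));
            // getProperty() on "merged" falls back to "defaults" for missing keys.
            Properties merged = new Properties(defaults);
            merged.load(new StringReader(overridesText));
            return merged;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If the override text contains only spring.integration.taskScheduler.poolSize=20, the merged view reports 20 for the pool size while every other key, such as spring.integration.channels.autoCreate, keeps its default value.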
Note | |
---|---|
In versions prior to 4.3, these property names had a typographical error ( |
In addition to the XML namespace support for configuring Message Endpoints, it is also possible to use annotations.
First, Spring Integration provides the class-level @MessageEndpoint as a stereotype annotation, meaning that it is itself annotated with Spring’s @Component annotation and is therefore recognized automatically as a bean definition when using Spring component-scanning.
Even more important are the various method-level annotations that indicate the annotated method is capable of handling a message. The following example demonstrates both:
@MessageEndpoint public class FooService { @ServiceActivator public void processMessage(Message message) { ... } }
Exactly what it means for the method to "handle" the Message depends on the particular annotation. Annotations available in Spring Integration include:
The behavior of each is described in its own chapter or section within this reference.
Note | |
---|---|
If you are using XML configuration in combination with annotations, the |
In most cases, the annotated handler method should not require the Message type as its parameter.
Instead, the method parameter type can match the message’s payload type.
public class FooService { @ServiceActivator public void bar(Foo foo) { ... } }
When the method parameter should be mapped from a value in the MessageHeaders, another option is to use the parameter-level @Header annotation.
In general, methods annotated with the Spring Integration annotations can either accept the Message itself, the message payload, or a header value (with @Header) as the parameter.
In fact, the method can accept a combination, such as:
public class FooService { @ServiceActivator public void bar(String payload, @Header("x") int valueX, @Header("y") int valueY) { ... } }
There is also a @Headers annotation that provides all of the Message headers as a Map:
public class FooService { @ServiceActivator public void bar(String payload, @Headers Map<String, Object> headerMap) { ... } }
Note | |
---|---|
The value of the annotation can also be a SpEL expression (e.g., |
For several of these annotations, when a Message-handling method returns a non-null value, the endpoint will attempt to send a reply. This is consistent across both configuration options (namespace and annotations) in that such an endpoint’s output channel will be used if available, and the REPLY_CHANNEL message header value will be used as a fallback.
Tip | |
---|---|
The combination of output channels on endpoints and the reply channel message header enables a pipeline approach where multiple components have an output channel, and the final component simply allows the reply message to be forwarded to the reply channel as specified in the original request message. In other words, the final component depends on the information provided by the original sender and can dynamically support any number of clients as a result. This is an example of Return Address. |
In addition to the examples shown here, these annotations also support inputChannel and outputChannel properties.
@Service public class FooService { @ServiceActivator(inputChannel="input", outputChannel="output") public void bar(String payload, @Headers Map<String, Object> headerMap) { ... } }
The processing of these annotations creates the same beans (AbstractEndpoints and MessageHandlers, or MessageSources for the inbound channel adapter - see below) as with similar XML components.
The bean names are generated with this pattern: [componentName].[methodName].[decapitalizedAnnotationClassShortName] (e.g., for the sample above: fooService.bar.serviceActivator) for the AbstractEndpoint and the same name with an additional .handler (.source) suffix for the MessageHandler (MessageSource) bean.
The MessageHandlers (MessageSources) are also eligible to be tracked by Section 9.3, “Message History”.
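The naming pattern can be spelled out as a few lines of plain Java. This is a sketch of the pattern as stated in the text, not the framework's own code; the class EndpointBeanNameSketch and its method names are hypothetical.

```java
class EndpointBeanNameSketch {

    // Lower-cases the first character, e.g. "ServiceActivator" -> "serviceActivator".
    static String decapitalize(String name) {
        if (name.isEmpty()) {
            return name;
        }
        return Character.toLowerCase(name.charAt(0)) + name.substring(1);
    }

    // [componentName].[methodName].[decapitalizedAnnotationClassShortName]
    static String endpointName(String componentName, String methodName, String annotationShortName) {
        return componentName + "." + methodName + "." + decapitalize(annotationShortName);
    }

    // Name of the MessageHandler bean that belongs to the endpoint.
    static String handlerName(String endpointName) {
        return endpointName + ".handler";
    }

    // Name of the MessageSource bean (inbound channel adapter case).
    static String sourceName(String endpointName) {
        return endpointName + ".source";
    }
}
```

For the FooService example, the component name fooService, the method bar, and the annotation ServiceActivator yield fooService.bar.serviceActivator for the endpoint and fooService.bar.serviceActivator.handler for its handler.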
Starting with version 4.0, all Messaging Annotations provide SmartLifecycle options - autoStartup and phase - to allow endpoint lifecycle control on application context initialization.
They default to true and 0 respectively.
To change the state of an endpoint (e.g., start()/stop()), obtain a reference to the endpoint bean using the BeanFactory (or autowiring) and invoke the method(s), or send a command message to the Control Bus (Section 9.6, “Control Bus”).
For these purposes you should use the beanName mentioned above.
@Poller
Before Spring Integration 4.0, the above Messaging Annotations required that the inputChannel was a reference to a SubscribableChannel.
For PollableChannels, it was necessary to use an <int:bridge/> configured with an <int:poller/> to make the composite endpoint a PollingConsumer.
Starting with version 4.0, the @Poller annotation has been introduced to allow the configuration of poller attributes directly on the above Messaging Annotations:
public class AnnotationService { @Transformer(inputChannel = "input", outputChannel = "output", poller = @Poller(maxMessagesPerPoll = "${poller.maxMessagesPerPoll}", fixedDelay = "${poller.fixedDelay}")) public String handle(String payload) { ... } }
This annotation provides only simple PollerMetadata options.
The @Poller's attributes maxMessagesPerPoll, fixedDelay, fixedRate and cron can be configured with property placeholders.
If it is necessary to provide more polling options (e.g. transaction, advice-chain, error-handler), the PollerMetadata should be configured as a generic bean with its bean name used for @Poller's value attribute.
In this case, no other attributes are allowed (they would be specified on the PollerMetadata bean).
Note, if inputChannel is a PollableChannel and no @Poller is configured, the default PollerMetadata will be used, if it is present in the application context.
To declare the default poller using @Configuration, use:
@Bean(name = PollerMetadata.DEFAULT_POLLER) public PollerMetadata defaultPoller() { PollerMetadata pollerMetadata = new PollerMetadata(); pollerMetadata.setTrigger(new PeriodicTrigger(10)); return pollerMetadata; }
With this endpoint using the default poller:
public class AnnotationService { @Transformer(inputChannel = "aPollableChannel", outputChannel = "output") public String handle(String payload) { ... } }
To use a named poller, use:
@Bean public PollerMetadata myPoller() { PollerMetadata pollerMetadata = new PollerMetadata(); pollerMetadata.setTrigger(new PeriodicTrigger(1000)); return pollerMetadata; }
With this endpoint using the named poller:
public class AnnotationService { @Transformer(inputChannel = "aPollableChannel", outputChannel = "output", poller = @Poller("myPoller")) public String handle(String payload) { ... } }
@InboundChannelAdapter
Starting with version 4.0, the @InboundChannelAdapter method annotation is available.
This produces a SourcePollingChannelAdapter integration component based on a MethodInvokingMessageSource for the annotated method.
This annotation is an analogue of the <int:inbound-channel-adapter> XML component and has the same restrictions: the method cannot have parameters, and the return type must not be void.
It has two attributes: value, the required MessageChannel bean name, and poller, an optional @Poller annotation, as described above.
If there is a need to provide some MessageHeaders, use a Message<?> return type and build the Message<?> within the method using a MessageBuilder to configure its MessageHeaders.
@InboundChannelAdapter("counterChannel") public Integer count() { return this.counter.incrementAndGet(); } @InboundChannelAdapter(value = "fooChannel", poller = @Poller(fixedRate = "5000")) public String foo() { return "foo"; }
Starting with version 4.3, the channel alias for the value annotation attribute has been introduced for better source code readability.
Also, the target MessageChannel bean is resolved in the SourcePollingChannelAdapter by the provided name (the outputChannelName option) on the first receive() call, not during the initialization phase.
This allows late binding, when the target MessageChannel bean from the consumer perspective is created and registered a bit later than the @InboundChannelAdapter parsing phase.
The first example requires that the default poller has been declared elsewhere in the application context.
@MessagingGateway
See Section 8.4.6, “@MessagingGateway Annotation”.
@IntegrationComponentScan
The standard Spring Framework @ComponentScan annotation doesn’t scan interfaces for stereotype @Component annotations.
To overcome this limitation and allow the configuration of @MessagingGateway (see Section 8.4.6, “@MessagingGateway Annotation”), the @IntegrationComponentScan mechanism has been introduced.
This annotation must be placed along with a @Configuration annotation, and customized for the scanning options, such as basePackages and basePackageClasses.
In this case all discovered interfaces annotated with @MessagingGateway will be parsed and registered as GatewayProxyFactoryBeans.
All other class-based components are parsed by the standard @ComponentScan.
In future, more scanning logic may be added to the @IntegrationComponentScan.
==== Messaging Meta-Annotations
Starting with version 4.0, all Messaging Annotations can be configured as meta-annotations and all user-defined Messaging Annotations can define the same attributes to override their default values. In addition, meta-annotations can be configured hierarchically:
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @ServiceActivator(inputChannel = "annInput", outputChannel = "annOutput") public @interface MyServiceActivator { String[] adviceChain() default { "annAdvice" }; } @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @MyServiceActivator public @interface MyServiceActivator1 { String inputChannel(); String outputChannel(); } ... @MyServiceActivator1(inputChannel = "inputChannel", outputChannel = "outputChannel") public Object service(Object payload) { ... }
This allows users to set defaults for various attributes and enables isolation of framework Java dependencies to user annotations, avoiding their use in user classes. If the framework finds a method with a user annotation that has a framework meta-annotation, it is treated as if the method was annotated directly with the framework annotation.
Starting with version 4.0, Messaging Annotations can be configured on @Bean method definitions in @Configuration classes, to produce Message Endpoints based on the beans, not methods.
It is useful when @Bean definitions are "out of the box" MessageHandlers (AggregatingMessageHandler, DefaultMessageSplitter etc.), Transformers (JsonToObjectTransformer, ClaimCheckOutTransformer etc.), MessageSources (FileReadingMessageSource, RedisStoreMessageSource etc.):
@Configuration @EnableIntegration public class MyFlowConfiguration { @Bean @InboundChannelAdapter(value = "inputChannel", poller = @Poller(fixedDelay = "1000")) public MessageSource<String> consoleSource() { return CharacterStreamReadingMessageSource.stdin(); } @Bean @Transformer(inputChannel = "inputChannel", outputChannel = "httpChannel") public ObjectToMapTransformer toMapTransformer() { return new ObjectToMapTransformer(); } @Bean @ServiceActivator(inputChannel = "httpChannel") public MessageHandler httpHandler() { HttpRequestExecutingMessageHandler handler = new HttpRequestExecutingMessageHandler("http://foo/service"); handler.setExpectedResponseType(String.class); handler.setOutputChannelName("outputChannel"); return handler; } @Bean @ServiceActivator(inputChannel = "outputChannel") public LoggingHandler loggingHandler() { return new LoggingHandler("info"); } }
The meta-annotation rules work on @Bean methods as well (@MyServiceActivator above can be applied to a @Bean definition).
Note | |
---|---|
When using these annotations on consumer |
Note | |
---|---|
The bean names are generated with this algorithm:
* The |
Important | |
---|---|
When using these annotations on |
Note | |
---|---|
With Java & Annotation configuration we can use any |
@Bean @ServiceActivator(inputChannel = "skippedChannel") @Profile("foo") public MessageHandler skipped() { return System.out::println; }
Together with the existing Spring Container logic, the Messaging Endpoint bean, based on the @ServiceActivator annotation, won’t be registered either.
==== Creating a Bridge with Annotations
Starting with version 4.0, the Messaging Annotation and Java configuration provides @BridgeFrom and @BridgeTo @Bean method annotations to mark MessageChannel beans in @Configuration classes.
This is just for completeness, providing a convenient mechanism to declare a BridgeHandler and its Message Endpoint configuration:
@Bean public PollableChannel bridgeFromInput() { return new QueueChannel(); } @Bean @BridgeFrom(value = "bridgeFromInput", poller = @Poller(fixedDelay = "1000")) public MessageChannel bridgeFromOutput() { return new DirectChannel(); } @Bean public QueueChannel bridgeToOutput() { return new QueueChannel(); } @Bean @BridgeTo("bridgeToOutput") public MessageChannel bridgeToInput() { return new DirectChannel(); }
These annotations can be used as meta-annotations as well.
==== Advising Annotated Endpoints
See Section 8.9.7, “Advising Endpoints Using Annotations”.
=== Message Mapping rules and conventions
Spring Integration implements a flexible facility to map Messages to Methods and their arguments without providing extra configuration by relying on some default rules as well as defining certain conventions.
Single un-annotated parameter (object or primitive) which is not a Map/Properties with non-void return type;
public String foo(Object o);
Details:
Input parameter is Message Payload. If the parameter type is not compatible with the Message Payload, an attempt will be made to convert it using the Conversion Service provided by Spring 3.0. The return value will be incorporated as the Payload of the returned Message.
Single un-annotated parameter (object or primitive) which is not a Map/Properties with Message return type;
public Message foo(Object o);
Details:
Input parameter is Message Payload. If parameter type is not compatible with Message Payload an attempt will be made to convert it using Conversion Service provided by Spring 3.0. The return value is a newly constructed Message that will be sent to the next destination.
Single parameter which is a Message or its subclass with arbitrary object/primitive return type;
public int foo(Message msg);
Details:
Input parameter is Message itself. The return value will become a payload of the Message that will be sent to the next destination.
Single parameter which is a Message or its subclass with Message or its subclass as a return type;
public Message foo(Message msg);
Details:
Input parameter is Message itself. The return value is a newly constructed Message that will be sent to the next destination.
Single parameter which is of type Map or Properties with Message as a return type;
public Message foo(Map m);
Details:
This one is a bit interesting. Although at first it might seem like an easy mapping straight to Message Headers, the preference is always given to a Message Payload. This means that if Message Payload is of type Map, this input argument will represent Message Payload. However if Message Payload is not of type Map, then no conversion via Conversion Service will be attempted and the input argument will be mapped to Message Headers.
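The payload-first preference for a single Map parameter can be illustrated with a small plain-Java sketch. It is only a model of the rule described above, under the assumption that the decision depends solely on whether the payload is a Map; the class MapArgumentRule and its method are hypothetical and are not framework API.

```java
import java.util.Collections;
import java.util.Map;

// Illustrative model of the rule, not the framework's actual argument resolver.
final class MapArgumentRule {

    // Returns what a single un-annotated Map parameter would receive.
    static Map<?, ?> resolveMapArgument(Object payload, Map<String, Object> headers) {
        if (payload instanceof Map) {
            return (Map<?, ?>) payload; // preference is always given to the payload
        }
        return headers; // no conversion is attempted; the headers are passed instead
    }

    public static void main(String[] args) {
        Map<String, Object> headers = Collections.singletonMap("id", "42");
        Map<String, Object> mapPayload = Collections.singletonMap("city", "Oslo");
        System.out.println(resolveMapArgument(mapPayload, headers));
        System.out.println(resolveMapArgument("plain text", headers));
    }
}
```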
Two parameters where one of them is arbitrary non-Map/Properties type object/primitive and another is Map/Properties type object (regardless of the return)
public Message foo(Map h, <T> t);
Details:
This combination contains two input parameters where one of them is of type Map. Naturally the non-Map parameters (regardless of the order) will be mapped to a Message Payload and the Map/Properties (regardless of the order) will be mapped to Message Headers giving you a nice POJO way of interacting with Message structure.
No parameters (regardless of the return)
public String foo();
Details:
This Message Handler method will be invoked based on the Message sent to the input channel this handler is hooked up to. However, no Message data will be mapped, so the Message acts as an event/trigger to invoke the handler. The output will be mapped according to the rules above.
No parameters, void return
public void foo();
Details:
Same as above, but no output
Annotation based mappings
Annotation based mapping is the safest and least ambiguous approach to map Messages to Methods. There will be many pointers to annotation based mapping throughout this manual, however here are a couple of examples:
public String foo(@Payload String s, @Header("foo") String b)
Very simple and explicit way of mapping Messages to method. As you’ll see later on, without an annotation this signature would result in an ambiguous condition. However by explicitly mapping the first argument to a Message Payload and the second argument to a value of the foo Message Header, we have avoided any ambiguity.
public String foo(@Payload String s, @RequestParam("foo") String b)
Looks almost identical to the previous example, however @RequestParam or any other non-Spring Integration mapping annotation is irrelevant and therefore will be ignored, leaving the second parameter unmapped. Although the second parameter could easily be mapped to a Payload, there can only be one Payload. Therefore this method mapping is ambiguous.
public String foo(String s, @Header("foo") String b)
The same as above. The only difference is that the first argument will be mapped to the Message Payload implicitly.
public String foo(@Headers Map m, @Header("foo") Map f, @Header("bar") String bar)
Yet another signature that would definitely be treated as ambiguous without annotations because it has more than 2 arguments. Furthermore, two of them are Maps. However, with annotation-based mapping, the ambiguity is easily avoided. In this example the first argument is mapped to all the Message Headers, while the second and third argument map to the values of Message Headers foo and bar. The payload is not being mapped to any argument.
Multiple parameters:
Multiple parameters could create a lot of ambiguity with regards to determining the appropriate mappings. The general advice is to annotate your method parameters with @Payload and/or @Header/@Headers. Below are some examples of ambiguous conditions which result in an Exception being raised.
public String foo(String s, int i)
public String foo(String s, Map m, String b)
public String foo(Map m, Map f)
Tip: Basically any method signature with more than one method argument which is not (Map, <T>), and whose parameters are not annotated, will result in an ambiguous condition, thus triggering an Exception.
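The rule of thumb in the tip can be expressed as a reflection check. This is a simplified sketch under stated assumptions: it looks only at un-annotated signatures, ignores return types and conversion, and is not the framework's resolution logic. The class AmbiguityCheck and the Samples interface are hypothetical names introduced for illustration.

```java
import java.lang.reflect.Method;
import java.util.Map;

// Illustrative only: a simplified check, not the framework's actual resolution logic.
final class AmbiguityCheck {

    // Hypothetical sample signatures modelled on the examples above.
    interface Samples {
        String stringAndInt(String s, int i);
        String stringMapString(String s, Map<String, Object> m, String b);
        String twoMaps(Map<String, Object> m, Map<String, Object> f);
        String mapAndPayload(Map<String, Object> h, String t);
        String payloadOnly(String s);
    }

    // Applies the rule of thumb to a method whose parameters carry no annotations.
    static boolean isAmbiguousWithoutAnnotations(Method method) {
        Class<?>[] types = method.getParameterTypes();
        if (types.length <= 1) {
            return false; // zero or one argument maps unambiguously
        }
        if (types.length == 2) {
            boolean firstIsMap = Map.class.isAssignableFrom(types[0]);
            boolean secondIsMap = Map.class.isAssignableFrom(types[1]);
            return firstIsMap == secondIsMap; // exactly one Map is the (Map, <T>) case
        }
        return true; // three or more un-annotated arguments
    }

    // Looks up a sample by name so callers avoid checked reflection exceptions.
    static boolean isAmbiguous(String sampleName) {
        for (Method method : Samples.class.getDeclaredMethods()) {
            if (method.getName().equals(sampleName)) {
                return isAmbiguousWithoutAnnotations(method);
            }
        }
        throw new IllegalArgumentException("Unknown sample: " + sampleName);
    }

    public static void main(String[] args) {
        for (Method method : Samples.class.getDeclaredMethods()) {
            System.out.println(method.getName() + " ambiguous: " + isAmbiguousWithoutAnnotations(method));
        }
    }
}
```

Since java.util.Properties implements Map, a Properties parameter is treated the same way as a Map in this check.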
Multiple methods:
Message Handlers with multiple methods are mapped based on the same rules that are described above, however some scenarios might still look confusing.
Multiple methods (same or different name) with legal (mappable) signatures:
public class Foo { public String foo(String str, Map m); public String foo(Map m); }
As you can see, the Message could be mapped to either method. The first method would be invoked where Message Payload could be mapped to str and Message Headers could be mapped to m. The second method could easily also be a candidate where only Message Headers are mapped to m. To make matters worse, both methods have the same name, which at first might look very ambiguous considering the following configuration:
<int:service-activator input-channel="input" output-channel="output" method="foo"> <bean class="org.bar.Foo"/> </int:service-activator>
At this point it would be important to understand Spring Integration mapping Conventions where at the very core, mappings are based on Payload first and everything else next. In other words the method whose argument could be mapped to a Payload will take precedence over all other methods.
On the other hand let’s look at slightly different example:
public class Foo { public String foo(String str, Map m); public String foo(String str); }
If you look at it you can probably see a truly ambiguous condition. In this example, both methods have signatures that could be mapped to a Message Payload, and they also have the same name. Such handler methods will trigger an Exception. However, if the method names were different, you could influence the mapping with a method attribute (see below):
public class Foo { public String foo(String str, Map m); public String bar(String str); }
<int:service-activator input-channel="input" output-channel="output" method="bar"> <bean class="org.bar.Foo"/> </int:service-activator>
Now there is no ambiguity since the configuration explicitly maps to the bar method which has no name conflicts.
The definitive source of information about Spring Integration is the Spring Integration Home at https://spring.io. That site serves as a hub of information and is the best place to find up-to-date announcements about the project as well as links to articles, blogs, and new sample applications.
=== Changes between 4.1 and 4.2
Please be sure to also see the Migration Guide for important changes that might affect your applications. Migration guides for all versions back to 2.1 can be found on the Wiki.
==== Major Management/JMX Rework
A new MetricsFactory strategy interface has been introduced.
This, together with other changes in the JMX and management infrastructure, provides much more control over management configuration and runtime performance.
However, this has some important implications for (some) user environments.
For complete details, see Section 9.1, “Metrics and Management” and the section called “JMX Improvements”.
The MongoDbMetadataStore is now available. For more information, see Section 22.3.2, “MongoDB Metadata Store”.
==== SecuredChannel Annotation
The @SecuredChannel annotation has been introduced, replacing the deprecated ChannelSecurityInterceptorFactoryBean.
For more information, see Section 31.3.2, “TCP Failover Client Connection Factory”.
==== SecurityContext Propagation
The SecurityContextPropagationChannelInterceptor has been introduced to propagate the SecurityContext from one message flow’s Thread to another.
For more information, see Section 31.3.2, “TCP Failover Client Connection Factory”.
The FileSplitter, which splits text files into lines, was added in 4.1.2.
It now has full support in the int-file: namespace; see Section 14.5, “File Splitter” for more information.
Zookeeper support has been added to the framework to assist when running on a clustered/multi-host environment.
See Section 31.3.2, “TCP Failover Client Connection Factory” for more information.
A new thread <int:barrier/> component is available, allowing a thread to be suspended until some asynchronous event occurs.
See Section 6.8, “Thread Barrier” for more information.
STOMP support has been added to the framework as inbound and outbound channel adapters pair. See Chapter 28, STOMP Support for more information.
==== Codec
A new Codec abstraction has been introduced, to encode/decode objects to/from byte[].
An implementation that uses Kryo is provided.
Codec-based transformers and message converters are also provided.
See Section 7.4, “Codec” for more information.
==== Message PreparedStatement Setter
A new MessagePreparedStatementSetter functional interface callback is available for the JdbcMessageHandler (<int-jdbc:outbound-gateway> and <int-jdbc:outbound-channel-adapter>) as an alternative to the SqlParameterSourceFactory to populate parameters on the PreparedStatement with the requestMessage context.
See Section 18.2, “Outbound Channel Adapter” for more information.
As an alternative to the existing selector attribute, the <wire-tap/> now supports the selector-expression attribute.
See Chapter 14, File Support for more information about these changes.
===== Appending New Lines
The <int-file:outbound-channel-adapter> and <int-file:outbound-gateway> now support an append-new-line attribute.
If set to true, a new line is appended to the file after a message is written.
The default attribute value is false.
===== Ignoring Hidden Files
The ignore-hidden attribute has been introduced for the <int-file:inbound-channel-adapter> to control whether hidden files are picked up from the source directory.
It is true by default.
===== Writing InputStream Payloads
The FileWritingMessageHandler now also accepts InputStream as a valid message payload type.
===== HeadDirectoryScanner
The HeadDirectoryScanner can now be used with other FileListFilters.
===== Last Modified Filter
The LastModifiedFileListFilter has been added.
===== WatchService Directory Scanner
The WatchServiceDirectoryScanner is now available.
===== Persistent File List Filter Changes
The AbstractPersistentFileListFilter has a new property flushOnUpdate which, when set to true, will flush() the metadata store if it implements Flushable (e.g. the PropertiesPersistingMetadataStore).
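The flushOnUpdate behaviour amounts to an instanceof check against java.io.Flushable after each update. The following is a minimal sketch of that idea only; the Store interface, CountingFlushableStore and update method are hypothetical stand-ins, not the filter's or the metadata store's real types.

```java
import java.io.Flushable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

final class FlushOnUpdateSketch {

    // Hypothetical minimal store contract, standing in for a metadata store.
    interface Store {
        void put(String key, String value);
    }

    // A store that also implements Flushable and counts how often it is flushed.
    static final class CountingFlushableStore implements Store, Flushable {
        final Map<String, String> data = new HashMap<>();
        int flushCount;

        @Override
        public void put(String key, String value) {
            data.put(key, value);
        }

        @Override
        public void flush() {
            flushCount++;
        }
    }

    // Writes the entry and flushes only when requested and supported by the store.
    static void update(Store store, String key, String value, boolean flushOnUpdate) {
        store.put(key, value);
        if (flushOnUpdate && store instanceof Flushable) {
            try {
                ((Flushable) store).flush();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    public static void main(String[] args) {
        CountingFlushableStore store = new CountingFlushableStore();
        update(store, "file.txt", "1", false);
        update(store, "file.txt", "2", true);
        System.out.println("flushes: " + store.flushCount);
    }
}
```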
The ScatterGatherHandler class has been moved from the org.springframework.integration.handler package to org.springframework.integration.scattergather.
==== TCP Changes
The TCP Serializers no longer flush() the OutputStream; this is now done by the TcpNxxConnection classes.
If you are using the serializers directly within user code, you may have to flush() the OutputStream.
===== Server Socket Exceptions
TcpConnectionServerExceptionEvents are now published whenever an unexpected exception occurs on a TCP server socket (also added to 4.1.3, 4.0.7).
See Section 31.3.2, “TCP Failover Client Connection Factory” for more information.
If a TCP server socket factory is configured to listen on a random port, the actual port chosen by the OS can now be obtained using getPort().
getServerSocketAddress() is also available.
See Section 31.3, “TCP Connection Factories” for more information.
===== TCP Gateway Remote Timeout
The TcpOutboundGateway now supports remote-timeout-expression as an alternative to the existing remote-timeout attribute.
This allows setting the timeout based on each message.
Also, the remote-timeout no longer defaults to the same value as reply-timeout, which has a completely different meaning.
See Table 31.6, “TCP Outbound Gateway Attributes” for more information.
===== TCP SSLSession Available for Header Mapping
TcpConnections now support getSslSession() to enable users to extract information from the session to add to message headers.
See IP Message Headers for more information.
New events are now published whenever a correlation exception occurs - for example sending a message to a non-existent socket.
The TcpConnectionEventListeningMessageProducer is deprecated; use the generic event adapter instead.
See Section 31.3.2, “TCP Failover Client Connection Factory” for more information.
Previously, the @Poller on an inbound channel adapter defaulted the maxMessagesPerPoll attribute to -1 (infinity).
This was inconsistent with the XML configuration of <inbound-channel-adapter/>s, which defaults to 1.
The annotation now defaults this attribute to 1.
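The practical difference between the two defaults can be seen in a small model of one poll cycle. This sketch assumes that a negative maxMessagesPerPoll means "keep receiving until the source has nothing left", which is how the text above describes -1; the class PollCycleSketch is hypothetical and uses a plain queue in place of a MessageSource.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Illustrative model of a single poll cycle, not the framework's poller implementation.
final class PollCycleSketch {

    // Receives messages until the limit is reached or the source is empty.
    // A negative limit is treated as unlimited (the old annotation default of -1).
    static List<String> pollOnce(Queue<String> source, int maxMessagesPerPoll) {
        List<String> received = new ArrayList<>();
        while (maxMessagesPerPoll < 0 || received.size() < maxMessagesPerPoll) {
            String next = source.poll();
            if (next == null) {
                break; // nothing left to receive in this cycle
            }
            received.add(next);
        }
        return received;
    }

    public static void main(String[] args) {
        Queue<String> source = new ArrayDeque<>(Arrays.asList("a", "b", "c"));
        System.out.println(pollOnce(source, 1));  // new default: one message per cycle
        System.out.println(pollOnce(source, -1)); // old default: drain what is available
    }
}
```

With the new default, an adapter that relied on draining the source in one cycle needs maxMessagesPerPoll set explicitly.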
o.s.integration.util.FunctionIterator now requires a o.s.integration.util.Function instead of a reactor.function.Function.
This was done to remove an unnecessary hard dependency on Reactor.
Any uses of this iterator will need to change the import.
Of course, Reactor is still supported for functionality such as the Promise gateway; the dependency was removed for those users who don’t need it.
===== Reply Listener Lazy Initialization
It is now possible to configure the reply listener in JMS outbound gateways to be initialized on-demand and stopped after an idle period, instead of being controlled by the gateway’s lifecycle.
See Section 20.5, “Outbound Gateway” for more information.
===== Conversion Errors in Message-Driven Endpoints
The error-channel is now used for conversion errors, which previously caused a transaction rollback and message redelivery.
See Section 20.2, “Message-Driven Channel Adapter” and Section 20.4, “Inbound Gateway” for more information.
===== Default Acknowledge Mode
When using an implicitly defined DefaultMessageListenerContainer, the default acknowledge is now transacted.
transacted is recommended when using this container, to avoid message loss.
This default now applies to the message-driven inbound adapter and the inbound gateway; it was already the default for jms-backed channels.
See Section 20.2, “Message-Driven Channel Adapter” and Section 20.4, “Inbound Gateway” for more information.
===== Shared Subscriptions
Namespace support for shared subscriptions (JMS 2.0) has been added to message-driven endpoints and the <int-jms:publish-subscribe-channel>.
Previously, you had to wire up listener containers as <bean/>s to use shared connections.
See Chapter 20, JMS Support for more information.
==== Conditional Pollers
Much more flexibility is now provided for dynamic polling.
See Section 4.2.3, “Conditional Pollers for Message Sources” for more information.
===== Publisher Confirms
The <int-amqp:outbound-gateway> now supports confirm-correlation-expression and confirm-(n)ack-channel attributes, with a similar purpose as for the <int-amqp:outbound-channel-adapter>.
===== Correlation Data
For both the outbound channel adapter and gateway, if the correlation data is a Message<?>, it will be the basis of the message on the ack/nack channel, with the additional header(s) added.
Previously, any correlation data (including Message<?>) was returned as the payload of the ack/nack message.
===== The Inbound Gateway properties
The <int-amqp:inbound-gateway> now exposes the amqp-template attribute to allow more control over an external bean for the reply RabbitTemplate, or even to provide your own AmqpTemplate implementation.
In addition, default-reply-to is exposed; it is used if the request message doesn’t have a replyTo property.
See Chapter 11, AMQP Support for more information.
==== XPath Splitter Improvements
The XPathMessageSplitter (<int-xml:xpath-splitter>) now allows the configuration of output-properties for the internal javax.xml.transform.Transformer and supports an Iterator mode (defaults to true) for the xpath evaluation org.w3c.dom.NodeList result.
See Section 31.3.2, “TCP Failover Client Connection Factory” for more information.
===== CORS
The HTTP Inbound Endpoints (<int-http:inbound-channel-adapter> and <int-http:inbound-gateway>) now allow the configuration of Cross-Origin Resource Sharing (CORS).
See Section 17.4.4, “Cross-Origin Resource Sharing (CORS) Support” for more information.
===== Inbound Gateway Timeout
The HTTP inbound gateway can be configured as to what status code to return when a request times out.
The default is now 500 Internal Server Error instead of 200 OK.
See Section 17.4.5, “Response StatusCode” for more information.
===== Form Data
Documentation is now provided for proxying multipart/form-data requests.
See Chapter 17, HTTP Support for more information.
===== Gateway Methods can Return CompletableFuture<?>
When using Java 8, gateway methods can now return CompletableFuture<?>.
See the section called “CompletableFuture” for more information.
===== MessagingGateway Annotation
The request and reply timeout properties are now String instead of Long to allow configuration with property placeholders or SpEL. See Section 8.4.6, “@MessagingGateway Annotation”.
===== Aggregator Performance
This release includes some performance improvements for aggregating components (aggregator, resequencer, etc.), by more efficiently removing messages from groups when they are released.
New methods (removeMessagesFromGroup) have been added to the message store.
Set the removeBatchSize property (default 100) to adjust the number of messages deleted in each operation.
Currently, JDBC, Redis and MongoDB message stores support this property.
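The effect of removeBatchSize can be pictured as splitting the released messages into fixed-size chunks, one chunk per store operation. The sketch below only demonstrates that partitioning under this assumption; the class BatchRemovalSketch is hypothetical and does not call any message store.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative partitioning only; a real store would issue one delete per batch.
final class BatchRemovalSketch {

    // Splits the messages into consecutive batches of at most removeBatchSize elements.
    static <T> List<List<T>> batches(List<T> messages, int removeBatchSize) {
        if (removeBatchSize <= 0) {
            throw new IllegalArgumentException("removeBatchSize must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int from = 0; from < messages.size(); from += removeBatchSize) {
            int to = Math.min(from + removeBatchSize, messages.size());
            result.add(new ArrayList<>(messages.subList(from, to)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> messages = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            messages.add(i);
        }
        // With the default batch size of 100, 250 messages need three operations.
        System.out.println(batches(messages, 100).size());
    }
}
```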
===== Output Message Group Processor
When using a ref or inner bean for the aggregator, it is now possible to bind a MessageGroupProcessor directly.
In addition, a SimpleMessageGroupProcessor is provided that simply returns the collection of messages in the group.
When an output processor produces a collection of Message<?>, the aggregator releases those messages individually.
Configuring the SimpleMessageGroupProcessor makes the aggregator a message barrier, where messages are held up until they all arrive, and are then released individually. See Section 6.4, “Aggregator” for more information.
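The barrier behaviour described above can be modelled without the framework: messages are held per group until the group is complete, and the whole group is then handed back so each message can be sent on individually. This is a rough sketch that assumes completion is decided by a fixed group size; the class BarrierSketch and its correlation key parameter are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative model of an aggregator acting as a barrier; not framework code.
final class BarrierSketch {

    private final int expectedSize;
    private final Map<String, List<String>> groups = new HashMap<>();

    BarrierSketch(int expectedSize) {
        this.expectedSize = expectedSize;
    }

    // Holds the message until its group is complete, then releases every message of the group.
    List<String> accept(String correlationKey, String message) {
        List<String> group = groups.computeIfAbsent(correlationKey, key -> new ArrayList<>());
        group.add(message);
        if (group.size() < expectedSize) {
            return Collections.emptyList(); // still waiting for the rest of the group
        }
        groups.remove(correlationKey);
        return group; // released unchanged, one entry per original message
    }

    public static void main(String[] args) {
        BarrierSketch barrier = new BarrierSketch(2);
        System.out.println(barrier.accept("order-1", "a"));
        System.out.println(barrier.accept("order-1", "b"));
    }
}
```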
==== (S)FTP Changes
===== Inbound channel adapters
You can now specify a remote-directory-expression on the inbound channel adapters, to determine the directory at runtime.
See Chapter 15, FTP/FTPS Adapters and Chapter 27, SFTP Adapters for more information.
===== Gateway Partial Results
When using FTP/SFTP outbound gateways to operate on multiple files (mget, mput), it is possible for an exception to occur after part of the request is completed.
If such a condition occurs, a PartialSuccessException is thrown containing the partial results.
See Section 15.7, “FTP Outbound Gateway” and Section 27.10, “SFTP Outbound Gateway” for more information.
===== Delegating Session Factory
A delegating session factory is now available, enabling the selection of a particular session factory based on some thread context value.
See Section 15.3, “Delegating Session Factory” and Section 27.4, “Delegating Session Factory” for more information.
===== Default Sftp Session Factory
Previously, the DefaultSftpSessionFactory unconditionally allowed connections to unknown hosts.
This is now configurable (default false).
The factory now requires a configured knownHosts file unless the allowUnknownKeys property is true (default false).
See Section 27.2.1, “Configuration Properties” for more information.
===== Message Session Callback
The MessageSessionCallback<F, T> has been introduced to perform any custom Session operation(s) with the requestMessage context in the <int-(s)ftp:outbound-gateway/>.
See Section 15.10, “MessageSessionCallback” and Section 27.12, “MessageSessionCallback” for more information.
==== Websocket Changes
WebSocketHandlerDecoratorFactory support has been added to the ServerWebSocketContainer to allow chained customization for the internal WebSocketHandler.
See Section 31.3.2, “TCP Failover Client Connection Factory” for more information.
==== Application Event Adapters changes
The ApplicationEvent adapters can now operate with the payload as the event directly, which allows omitting custom ApplicationEvent extensions.
The publish-payload boolean attribute has been introduced on the <int-event:outbound-channel-adapter> for this purpose.
See Chapter 12, Spring ApplicationEvent Support for more information.
=== Changes between 4.0 and 4.1
Please be sure to also see the Migration Guide for important changes that might affect your applications. Migration guides for all versions back to 2.1 can be found on the Wiki.
==== New Components
A Reactor Promise return type is now supported for Messaging Gateway methods.
See Section 8.4.9, “Asynchronous Gateway”.
The WebSocket module is now available.
It is fully based on the Spring WebSocket and Spring Messaging modules and provides an <inbound-channel-adapter> and an <outbound-channel-adapter>.
See Section 31.3.2, “TCP Failover Client Connection Factory” for more information.
===== Scatter-Gather EIP pattern
The Scatter-Gather EIP pattern is now implemented. See Section 6.7, “Scatter-Gather” for more information.
The Routing Slip EIP pattern implementation is now provided. See the section called “Routing Slip” for more information.
===== Idempotent Receiver Pattern
The Idempotent Receiver EIP implementation is now provided via the <idempotent-receiver> component in XML, or the IdempotentReceiverInterceptor and IdempotentReceiver annotation when using Java Configuration.
See Section 8.9.10, “Idempotent Receiver Enterprise Integration Pattern” and their JavaDocs for more information.
The Boon JsonObjectMapper is now provided for the JSON transformers. See Section 7.1, “Transformer” for more information.
The <redis-queue-inbound-gateway> and <redis-queue-outbound-gateway> components are now provided.
See Section 24.10, “Redis Queue Inbound Gateway” and Section 24.9, “Redis Queue Outbound Gateway”.
The PollSkipAdvice is now provided for use within the <advice-chain> of the <poller> to determine if the current poll should be suppressed (skipped) based on a condition implemented with a PollSkipStrategy.
See Section 4.2, “Poller” for more information.
===== AMQP Inbound Endpoints, Channel
Elements that utilize a message listener container (inbound endpoints, channel) now support the missing-queues-fatal attribute.
See Chapter 11, AMQP Support for more information.
The AMQP outbound endpoints support a new property lazy-connect
(default true).
When true, the connection to the broker is not established until the first message arrives (assuming there are no inbound endpoints, which always attempt to establish the connection during startup).
When set to false, an attempt to establish the connection is made during application startup.
See Chapter 11, AMQP Support for more information.
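As a minimal sketch, an outbound channel adapter that connects to the broker during startup might be declared as follows. The channel, template, exchange, and routing key names are placeholders chosen for illustration, and the namespace declarations are omitted.

```xml
<!-- Hypothetical bean and channel names; lazy-connect is the only point of interest -->
<int-amqp:outbound-channel-adapter id="amqpOut"
    channel="toRabbit"
    amqp-template="amqpTemplate"
    exchange-name="example.exchange"
    routing-key="example.key"
    lazy-connect="false"/>
```

Leaving the attribute out keeps the default (true), so the connection is deferred until the first message arrives.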
The SimpleMessageStore
no longer makes a copy of the group when calling getMessageGroup()
.
See Caution with SimpleMessageStore for more information.
===== Web Service Outbound Gateway: encode-uri
The <ws:outbound-gateway/>
now provides an encode-uri
attribute to allow disabling the encoding of the URI object before sending the request.
===== Http Inbound Channel Adapter and StatusCode
The <http:inbound-channel-adapter>
can now be configured with a status-code-expression
to override the default 200 OK
status.
See Section 17.4, “HTTP Namespace Support” for more information.
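For illustration only, the following sketch returns 202 Accepted instead of the default status. The id, channel name, and path are assumptions, and the int-http prefix stands for whichever prefix the HTTP namespace is bound to in your configuration.

```xml
<!-- Hypothetical endpoint: replies with 202 Accepted rather than 200 OK -->
<int-http:inbound-channel-adapter id="httpIn"
    channel="requests"
    path="/orders"
    supported-methods="POST"
    status-code-expression="T(org.springframework.http.HttpStatus).ACCEPTED"/>
```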
The MQTT channel adapters can now be configured to connect to multiple servers, for example, to support High Availability (HA). See Chapter 23, MQTT Support for more information.
The MQTT message-driven channel adapter now supports specifying the QoS setting for each subscription. See Section 23.2, “Inbound (message-driven) Channel Adapter” for more information.
The MQTT outbound channel adapter now supports asynchronous sends, avoiding blocking until delivery is confirmed. See Section 23.3, “Outbound Channel Adapter” for more information.
It is now possible to programmatically subscribe to and unsubscribe from topics at runtime. See Section 23.2, “Inbound (message-driven) Channel Adapter” for more information.
===== FTP/SFTP Adapter Changes
The FTP and SFTP outbound channel adapters now support appending to remote files, as well as taking specific actions when a remote file already exists.
The remote file templates now also support this as well as rmdir()
and exists()
.
In addition, the remote file templates provide access to the underlying client object enabling access to low-level APIs.
See Chapter 15, FTP/FTPS Adapters and Chapter 27, SFTP Adapters for more information.
Splitter
components now support an Iterator
as the result object for producing output messages.
See Section 6.3, “Splitter” for more information.
Aggregator
s now support a new attribute expire-groups-upon-timeout
.
See Section 6.4.4, “Configuring an Aggregator” for more information.
===== Content Enricher Improvements
A null-result-expression
attribute has been added, which is evaluated and returned if <enricher>
returns null
.
It can be added in <header>
and <property>
.
See Section 7.2, “Content Enricher” for more information.
An error-channel
attribute has been added, which is used to handle an error flow if an Exception
occurs downstream of the request-channel
.
This enables you to return an alternative object to use for enrichment.
See Section 7.2, “Content Enricher” for more information.
The <header-enricher/>
's <header-channels-to-string/>
element can now override the header channel registry’s default time for retaining channel mappings.
See the section called “Header Channel Registry” for more information.
Improvements have been made to the orderly shutdown algorithm. See Section 9.7, “Orderly Shutdown” for more information.
===== Management for RecipientListRouter
The RecipientListRouter
now provides several management operations to configure recipients at runtime.
As a result, the <recipient-list-router>
can now be configured without any <recipient>
from the start.
See the section called “RecipientListRouterManagement” for more information.
===== AbstractHeaderMapper: NON_STANDARD_HEADERS token
The AbstractHeaderMapper
implementations now provide the additional NON_STANDARD_HEADERS
token to map any user-defined headers, which aren’t mapped by default.
See Section 11.12, “AMQP Message Headers” for more information.
===== AMQP Channels: template-channel-transacted
The new template-channel-transacted
attribute has been introduced for AMQP MessageChannel
s.
See Section 11.11, “AMQP Backed Message Channels” for more information.
The default syslog message converter now has an option to retain the original message in the payload, while still setting the headers. See Section 30.2, “Syslog <inbound-channel-adapter>” for more information.
In addition to the Promise
return type mentioned above, gateway methods may now return a ListenableFuture
, introduced in Spring Framework 4.0.
You can also disable the async processing in the gateway, allowing a downstream flow to directly return a Future
.
See Section 8.4.9, “Asynchronous Gateway”.
Aggregator
s and Resequencer
s now support <expire-advice-chain/>
and <expire-transactional/>
sub-elements to advise the forceComplete
operation.
See Section 6.4.4, “Configuring an Aggregator” for more information.
===== Outbound Channel Adapter and Scripts
The <int:outbound-channel-adapter/>
now supports the <script/>
sub-element.
The underlying script must have a void
return type or return null
.
See Section 8.8, “Groovy support” and Section 8.7, “Scripting support”.
When a message group in a resequencer is timed out (using group-timeout
or a MessageGroupStoreReaper
), late arriving messages will now be discarded immediately by default.
See Section 6.5, “Resequencer”.
===== Optional POJO method parameter
Spring Integration now consistently handles the Java 8 Optional
type.
See Section 8.5.2, “Configuring Service Activator”.
===== QueueChannel: backed Queue type
The QueueChannel
backed Queue type
has been changed from BlockingQueue
to the more generic Queue
.
It allows the use of any external Queue
implementation, for example Reactor’s PersistentQueue
.
See the section called “QueueChannel Configuration”.
===== ChannelInterceptor Changes
The ChannelInterceptor
now supports additional afterSendCompletion()
and afterReceiveCompletion()
methods.
See Section 4.1.3, “Channel Interceptors”.
Since version 4.1.1 there is a change of behavior if you explicitly set the javamail property mail.[protocol].peek
to false
(where [protocol]
is imap
or imaps
).
See Important: IMAP PEEK.
=== Changes between 3.0 and 4.0
Please be sure to also see the Migration Guide for important changes that might affect your applications. Migration guides for all versions back to 2.1 can be found on the Wiki.
The MQTT channel adapters (previously available in the Spring Integration Extensions repository) are now available as part of the normal Spring Integration distribution. See Chapter 23, MQTT Support.
The @EnableIntegration
annotation has been added, to permit declaration of standard Spring Integration beans when using @Configuration
classes.
See Section 31.3.2, “TCP Failover Client Connection Factory” for more information.
===== @IntegrationComponentScan
The @IntegrationComponentScan
annotation has been added, to permit classpath scanning for Spring Integration specific components.
See Section 31.3.2, “TCP Failover Client Connection Factory” for more information.
Message history can now be enabled with the @EnableMessageHistory
annotation in a @Configuration
class; in addition the message history settings can be modified by a JMX MBean.
In addition auto-created MessageHandler
s for annotated endpoints (e.g.
@ServiceActivator
, @Splitter
etc.) now are also trackable by MessageHistory
.
For more information, see Section 9.3, “Message History”.
Messaging gateway interfaces can now be configured with the @MessagingGateway
annotation.
It is an analogue of the <int:gateway/>
XML element.
For more information, see Section 8.4.6, “@MessagingGateway Annotation”.
===== Spring Boot @EnableAutoConfiguration
As well as the @EnableIntegration
annotation mentioned above, a hook has been introduced to allow the Spring Integration infrastructure beans to be configured using Spring Boot’s @EnableAutoConfiguration
.
For more information, see http://docs.spring.io/spring-boot/docs/current/reference/html/using-boot-auto-configuration.html[Spring Boot - AutoConfigure].
===== @GlobalChannelInterceptor
As well as the @EnableIntegration
annotation mentioned above, the @GlobalChannelInterceptor
annotation has been introduced.
For more information, see Section 31.3.2, “TCP Failover Client Connection Factory”.
The @IntegrationConverter
annotation has been introduced, as an analogue of the <int:converter/>
component.
For more information, see Section 31.3.2, “TCP Failover Client Connection Factory”.
The @EnablePublisher
annotation has been added, to allow the specification of a default-publisher-channel
for @Publisher
annotations.
See Section 31.3.2, “TCP Failover Client Connection Factory” for more information.
===== Redis Channel Message Stores
A new Redis MessageGroupStore
, that is optimized for use when backing a QueueChannel
for persistence, is now provided.
For more information, see Section 24.4.1, “Redis Channel Message Stores”.
A new Redis ChannelPriorityMessageStore
is now provided.
This can be used to retrieve messages by priority.
For more information, see Section 24.4.1, “Redis Channel Message Stores”.
===== MongoDB Channel Message Store
MongoDB support now provides the MongoDbChannelMessageStore
- a channel specific MessageStore
implementation.
With priorityEnabled = true
, it can be used in <int:priority-queue>
s to achieve priority order polling of persisted messages.
For more information see Section 22.3.1, “MongoDB Channel Message Store”.
===== @EnableIntegrationMBeanExport
The IntegrationMBeanExporter
can now be enabled with the @EnableIntegrationMBeanExport
annotation in a @Configuration
class.
For more information, see Section 9.2.7, “MBean Exporter”.
===== ChannelSecurityInterceptorFactoryBean
Configuration of Spring Security for message channels using @Configuration
classes is now supported by using a ChannelSecurityInterceptorFactoryBean
.
For more information, see Section 31.3.2, “TCP Failover Client Connection Factory”.
The Redis support now provides the <outbound-gateway>
component to perform generic Redis commands using the RedisConnection#execute
method.
For more information, see Section 24.8, “Redis Outbound Command Gateway”.
===== RedisLockRegistry and GemfireLockRegistry
The RedisLockRegistry
and GemfireLockRegistry
are now available supporting global locks visible to multiple application instances/servers.
These can be used with aggregating message handlers across multiple application instances such that group release will occur on only one instance.
For more information, see Section 24.11, “Redis Lock Registry”, Section 16.6, “Gemfire Lock Registry” and Section 6.4, “Aggregator”.
Annotation-based messaging configuration can now have a poller
attribute.
This means that annotated methods (@ServiceActivator
, @Aggregator
etc.) can now use an inputChannel
that is a reference to a PollableChannel
.
For more information, see Section 31.3.2, “TCP Failover Client Connection Factory”.
===== @InboundChannelAdapter and SmartLifecycle for Annotated Endpoints
The @InboundChannelAdapter
method annotation is now available.
It is an analogue of the <int:inbound-channel-adapter>
XML component.
In addition, all Messaging Annotations now provide SmartLifecycle
options.
For more information, see Section 31.3.2, “TCP Failover Client Connection Factory”.
===== Twitter Search Outbound Gateway
A new Twitter endpoint <int-twitter:search-outbound-gateway/>
has been added.
Unlike the search inbound adapter which polls using the same search query each time, the outbound gateway allows on-demand customized queries.
For more information, see Section 31.3.2, “TCP Failover Client Connection Factory”.
The GemfireMetadataStore
is provided, allowing it to be used, for example, in an AbstractPersistentAcceptOnceFileListFilter
implementation in a multiple application instance/server environment.
For more information, see Section 9.5, “Metadata Store”, Section 14.2, “Reading Files”, Section 15.4, “FTP Inbound Channel Adapter” and Section 27.7, “SFTP Inbound Channel Adapter”.
===== @BridgeFrom and @BridgeTo Annotations
Annotation and Java configuration now provide the @BridgeFrom
and @BridgeTo
@Bean
method annotations to mark MessageChannel
beans in @Configuration
classes.
For more information, see Section 31.3.2, “TCP Failover Client Connection Factory”.
===== Meta Messaging Annotations
Messaging Annotations (@ServiceActivator
, @Router
, @MessagingGateway
etc.) can now be configured as meta-annotations for user-defined Messaging Annotations.
In addition the user-defined annotations can have the same attributes (inputChannel
, @Poller
, autoStartup
etc.).
For more information, see Section 31.3.2, “TCP Failover Client Connection Factory”.
===== Requires Spring Framework 4.0
Core messaging abstractions (Message
, MessageChannel
etc) have moved to the Spring Framework spring-messaging
module.
Users who reference these classes directly in their code will need to make changes as described in the first section of the Migration Guide.
===== Header Type for XPath Header Enricher
The header-type
attribute has been introduced for the header
sub-element of the <int-xml:xpath-header-enricher>
.
This attribute provides the target type for the header value to which the result of the XPath expression evaluation will be converted.
For more information see Section 31.3.2, “TCP Failover Client Connection Factory”.
===== Object To Json Transformer: Node Result
The result-type
attribute has been introduced for the <int:object-to-json-transformer>
.
This attribute provides the target type for the result of object mapping to JSON.
It supports STRING
(default) and NODE
.
For more information see the section called “JSON Transformers”.
The DefaultJmsHeaderMapper
now maps an incoming JMSPriority
header to the Spring Integration priority
header.
Previously priority
was only considered for outbound messages.
For more information see Section 20.6, “Mapping Message Headers to/from JMS Message”.
===== JMS Outbound Channel Adapter
The JMS outbound channel adapter now supports the session-transacted
attribute (default false).
Previously, you had to inject a customized JmsTemplate
to use transactions.
See Section 20.3, “Outbound Channel Adapter”.
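A transacted outbound adapter could then be sketched as below. The connection factory bean, channel, and destination names are assumptions for the example, not values taken from this guide.

```xml
<!-- Hypothetical names; the send takes part in a local JMS session transaction -->
<int-jms:outbound-channel-adapter id="jmsOut"
    channel="toJms"
    connection-factory="connectionFactory"
    destination-name="example.queue"
    session-transacted="true"/>
```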
===== JMS Inbound Channel Adapter
The JMS inbound channel adapter now supports the session-transacted
attribute (default false).
Previously, you had to inject a customized JmsTemplate
to use transactions (the adapter allowed transacted in the acknowledgeMode which was incorrect, and didn’t work; this value is no longer allowed).
See Section 20.1, “Inbound Channel Adapter”.
You can now specify a MessageConverter
to be used when converting (if necessary) payloads to one of the accepted datatype
s in a Datatype channel.
For more information see the section called “Datatype Channel Configuration”.
===== Simpler Retry Advice Configuration
Simplified namespace support has been added to configure a RequestHandlerRetryAdvice
.
For more information see the section called “Configuring the Retry Advice”.
===== Correlation Endpoint: Time-based Release Strategy
The mutually exclusive group-timeout
and group-timeout-expression
attributes have been added to the <int:aggregator>
and <int:resequencer>
.
These attributes allow forced completion of a partial MessageGroup
, if the ReleaseStrategy
does not release a group and no further messages arrive within the time specified.
For more information see Section 6.4.4, “Configuring an Aggregator”.
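As a rough sketch of the time-based release, the aggregator below forces completion of a group when no further message has arrived for five seconds. The channel names are placeholders. The send-partial-result-on-expiry attribute is included on the assumption that the partial group should be sent to the output channel rather than discarded.

```xml
<!-- Hypothetical channels; group-timeout is in milliseconds -->
<int:aggregator input-channel="parts"
    output-channel="aggregated"
    group-timeout="5000"
    send-partial-result-on-expiry="true"/>
```

Use group-timeout-expression instead when the timeout has to be computed per group. The two attributes cannot be combined.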
The RedisMetadataStore
now implements ConcurrentMetadataStore
, allowing it to be used, for example, in an AbstractPersistentAcceptOnceFileListFilter
implementation in a multiple application instance/server environment.
For more information, see Section 24.5, “Redis Metadata Store”, Section 14.2, “Reading Files”, Section 15.4, “FTP Inbound Channel Adapter” and Section 27.7, “SFTP Inbound Channel Adapter”.
===== JdbcChannelMessageStore and PriorityChannel
The JdbcChannelMessageStore
now implements PriorityCapableChannelMessageStore
, allowing it to be used as a message-store
reference for priority-queue
s.
For more information, see Section 18.4.2, “Backing Message Channels”.
===== AMQP Endpoints Delivery Mode
Spring AMQP, by default, creates persistent messages on the broker.
This behavior can be overridden by setting the amqp_deliveryMode
header and/or customizing the mappers.
A convenient default-delivery-mode
attribute has now been added to the adapters to provide easier configuration of this important setting.
For more information, see Section 11.5, “Outbound Channel Adapter” and Section 11.6, “Outbound Gateway”.
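For example, an adapter that publishes non-persistent messages unless a header says otherwise might look like this sketch. The bean, channel, and routing key names are hypothetical.

```xml
<!-- Hypothetical names; an amqp_deliveryMode header on a message would still take precedence -->
<int-amqp:outbound-channel-adapter id="transientOut"
    channel="toRabbit"
    amqp-template="amqpTemplate"
    routing-key="example.key"
    default-delivery-mode="NON_PERSISTENT"/>
```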
The DefaultFtpSessionFactory
now exposes the connectTimeout
, defaultTimeout
and dataTimeout
properties, avoiding the need to subclass the factory just to set these common properties.
The postProcess*
methods are still available for more advanced configuration.
See Section 15.2, “FTP Session Factory” for more information.
===== Twitter: StatusUpdatingMessageHandler
The StatusUpdatingMessageHandler
(<int-twitter:outbound-channel-adapter>
) now supports the tweet-data-expression
attribute to build an org.springframework.social.twitter.api.TweetData
object for updating the timeline status allowing, for example, attaching an image.
See Section 31.3.2, “TCP Failover Client Connection Factory” for more information.
===== JPA Retrieving Gateway: id-expression
The id-expression
attribute has been introduced for <int-jpa:retrieving-outbound-gateway>
to perform EntityManager.find(Class entityClass, Object primaryKey)
.
See Section 19.6.3, “Retrieving Outbound Gateway” for more information.
===== TCP Deserialization Events
When one of the standard deserializers encounters a problem decoding the input stream to a message, it will now emit a TcpDeserializationExceptionEvent
, allowing applications to examine the data at the point the exception occurred.
See Section 31.3.2, “TCP Failover Client Connection Factory” for more information.
===== Messaging Annotations on @Bean Definitions
Messaging Annotations (@ServiceActivator
, @Router
, @InboundChannelAdapter
etc.) can now be configured on @Bean
definitions in @Configuration
classes.
For more information, see Section 31.3.2, “TCP Failover Client Connection Factory”.
=== Changes between 2.2 and 3.0
The HTTP module now provides powerful Request Mapping support for Inbound Endpoints.
Class UriPathHandlerMapping
was replaced by IntegrationRequestMappingHandlerMapping
, which is registered under the bean name integrationRequestMappingHandlerMapping
in the application context.
Upon parsing of the HTTP Inbound Endpoint, either a new IntegrationRequestMappingHandlerMapping
bean is registered or an existing bean is reused.
To achieve flexible Request Mapping configuration, Spring Integration provides the <request-mapping/>
sub-element for the <http:inbound-channel-adapter/>
and the <http:inbound-gateway/>
.
Both HTTP Inbound Endpoints are now fully based on the Request Mapping infrastructure that was introduced with Spring MVC 3.1.
For example, multiple paths are supported on a single inbound endpoint.
For more information see Section 17.4, “HTTP Namespace Support”.
===== Spring Expression Language (SpEL) Configuration
A new IntegrationEvaluationContextFactoryBean
is provided to allow configuration of custom PropertyAccessor
s and functions for use in SpEL expressions throughout the framework.
For more information see Section 31.3.2, “TCP Failover Client Connection Factory”.
To customize the SpEL EvaluationContext
with static Method
functions, the new <spel-function/>
component is introduced.
Two built-in functions are also provided (#jsonPath
and #xpath
).
For more information see Section 31.3.2, “TCP Failover Client Connection Factory”.
===== SpEL PropertyAccessors Support
To customize the SpEL EvaluationContext
with PropertyAccessor
implementations the new <spel-property-accessors/>
component is introduced.
For more information see Section 31.3.2, “TCP Failover Client Connection Factory”.
A new Redis-based MetadataStore implementation has been added.
The RedisMetadataStore
can be used to maintain state of a MetadataStore
across application restarts.
This new MetadataStore
implementation can be used with adapters such as:
New queue-based components have been added.
The <int-redis:queue-inbound-channel-adapter/>
and the <int-redis:queue-outbound-channel-adapter/>
components are provided to perform right pop and left push operations on a Redis List, respectively.
For more information see Chapter 24, Redis Support.
It is now possible to instruct the framework to store reply and error channels in a registry for later resolution.
This is useful for cases where the replyChannel
or errorChannel
might be lost; for example when serializing a message.
See Section 7.2.2, “Header Enricher” for more information.
===== MongoDB support: New ConfigurableMongoDbMessageStore
In addition to the existing MongoDbMessageStore
, a new ConfigurableMongoDbMessageStore
has been introduced.
This provides a more robust and flexible implementation of MessageStore
for MongoDB.
It is not backward compatible with the existing store, but it is recommended for new applications.
Existing applications can use it, but messages in the old store will not be available.
See Chapter 22, MongoDb Support for more information.
Building on the 2.2 SyslogToMapTransformer
, Spring Integration 3.0 now introduces UDP
and TCP
inbound channel adapters especially tailored for receiving SYSLOG messages.
For more information, see Chapter 30, Syslog Support.
File 'tail’ing inbound channel adapters are now provided to generate messages when lines are added to the end of text files; see Section 14.2.6, “'Tail’ing Files”.
<int-jmx:tree-polling-channel-adapter/>
is provided; this adapter queries the JMX MBean tree and sends a message with a payload that is the graph of objects that matches the query.
By default the MBeans are mapped to primitives and simple Objects like Map, List and arrays - permitting simple transformation, for example, to JSON.
IntegrationMBeanExporter
now allows the configuration of a custom ObjectNamingStrategy
using the naming-strategy
attribute.
For more information, see Section 9.2, “JMX Support”.
===== TCP/IP Connection Events and Connection Management
TcpConnection
s now emit ApplicationEvent
s (specifically TcpConnectionEvent
s) when connections are opened, closed, or an exception occurs.
This allows applications to be informed of changes to TCP connections using the normal Spring ApplicationListener
mechanism.
AbstractTcpConnection
has been renamed TcpConnectionSupport
; custom connections that are subclasses of this class can use its methods to publish events.
Similarly, AbstractTcpConnectionInterceptor
has been renamed to TcpConnectionInterceptorSupport
.
In addition, a new <int-ip:tcp-connection-event-inbound-channel-adapter/>
is provided; by default, this adapter sends all TcpConnectionEvent
s to a Channel
.
Further, the TCP Connection Factories now provide a new method getOpenConnectionIds()
, which returns a list of identifiers for all open connections; this allows applications, for example, to broadcast to all open connections.
Finally, the connection factories also provide a new method closeConnection(String connectionId)
which allows applications to explicitly close a connection using its ID.
For more information see Section 31.3.2, “TCP Failover Client Connection Factory”.
===== Inbound Channel Adapter Script Support
The <int:inbound-channel-adapter/>
now supports <expression/>
and <script/>
sub-elements to create a MessageSource
; see Section 4.3.3, “Channel Adapter Expressions and Scripts”.
===== Content Enricher: Headers Enrichment Support
The Content Enricher now provides configuration for <header/>
sub-elements, to enrich the outbound Message with headers based on the reply Message from the underlying message flow.
For more information see Section 7.2.3, “Payload Enricher”.
Previously, message ids were generated using the JDK UUID.randomUUID()
method.
With this release, the default mechanism has been changed to use a more efficient algorithm which is significantly faster.
In addition, the ability to change the strategy used to generate message ids has been added.
For more information see the section called “Message ID Generation”.
GatewayMethodMetadata
is now a public class, which makes it possible to flexibly configure the GatewayProxyFactoryBean
programmatically from Java code.
For more information see Section 8.4, “Messaging Gateways”.
<http:outbound-gateway/>
and <http:outbound-channel-adapter/>
now provide an encode-uri
attribute to allow disabling the encoding of the URI object before sending the request.
<http:inbound-gateway/>
and <http:inbound-channel-adapter/>
now have a merge-with-default-converters
attribute to include the list of default HttpMessageConverter
s after the custom message converters.
DefaultHttpHeaderMapper
.
Now, in addition to correcting that issue, DefaultHttpHeaderMapper
provides date parsing from formatted strings for any HTTP headers that accept date-time values.
<http:inbound-gateway/>
and <http:inbound-channel-adapter/>
now support additional useful variables: #matrixVariables, #requestAttributes, #requestHeaders and #cookies.
These variables are available in both payload and header expressions.
uri-variables-expression
attribute to specify an Expression
to evaluate a Map
for all URI variable placeholders within URL template.
This allows selection of a different map of expressions based on the outgoing message.
For more information see Chapter 17, HTTP Support.
ObjectToJsonTransformer
and JsonToObjectTransformer
now emit/consume headers containing type information.
For more information, see JSON Transformers in Section 7.1, “Transformer”.
===== Chain Elements id Attribute
Previously, the id attribute for elements within a <chain>
was ignored and, in some cases, disallowed.
Now, the id attribute is allowed for all elements within a <chain>
.
The bean name of each chain element is a combination of the surrounding chain’s id and the id of the element itself.
For example: fooChain$child.fooTransformer.handler.
For more information see Section 6.6, “Message Handler Chain”.
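To make the naming rule concrete, here is a small sketch of a chain whose child element carries an id. The channel names and the SpEL expression are illustrative assumptions.

```xml
<!-- Hypothetical chain; the transformer's handler bean would be named
     fooChain$child.fooTransformer.handler -->
<int:chain id="fooChain" input-channel="input" output-channel="output">
    <int:transformer id="fooTransformer" expression="payload.toUpperCase()"/>
</int:chain>
```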
===== Aggregator empty-group-min-timeout property
The AbstractCorrelatingMessageHandler
provides a new property empty-group-min-timeout
to allow empty group expiry to run on a longer schedule than expiring partial groups.
Empty groups will not be removed from the MessageStore
until they have not been modified for at least this number of milliseconds.
For more information see Section 6.4.4, “Configuring an Aggregator”.
===== Persistent File List Filters (file, (S)FTP)
New FileListFilter
s that use a persistent MetadataStore
are now available.
These can be used to prevent duplicate files after a system restart.
See Section 14.2, “Reading Files”, Section 15.4, “FTP Inbound Channel Adapter”, and Section 27.7, “SFTP Inbound Channel Adapter” for more information.
===== Scripting Support: Variables Changes
A new variables
attribute has been introduced for scripting components.
In addition, variable bindings are now allowed for inline scripts.
See Section 8.8, “Groovy support” and Section 8.7, “Scripting support” for more information.
===== Direct Channel Load Balancing configuration
Previously, when configuring LoadBalancingStrategy
on the channel’s dispatcher sub-element, the only available option was to use a pre-defined enumeration of values which did not allow one to set a custom implementation of the LoadBalancingStrategy
.
You can now use load-balancer-ref
to provide a reference to a custom implementation of the LoadBalancingStrategy
.
For more information see the section called “DirectChannel”.
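A minimal sketch of the new attribute follows. The class example.CustomLoadBalancingStrategy is a hypothetical implementation of LoadBalancingStrategy that you would supply yourself.

```xml
<int:channel id="lbChannel">
    <!-- References a user-provided LoadBalancingStrategy bean -->
    <int:dispatcher load-balancer-ref="customLoadBalancer"/>
</int:channel>

<!-- Hypothetical class name, shown only to complete the example -->
<bean id="customLoadBalancer" class="example.CustomLoadBalancingStrategy"/>
```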
===== PublishSubscribeChannel Behavior
Previously, sending to a <publish-subscribe-channel/> that had no subscribers would return a false
result.
If used in conjunction with a MessagingTemplate
, this would result in an exception being thrown.
Now, the PublishSubscribeChannel
has a property minSubscribers
(default 0).
If the message is sent to at least the minimum number of subscribers, the send is deemed to be successful (even if zero).
If an application is expecting to get an exception under these conditions, set the minimum subscribers to at least 1.
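In XML, this might be expressed as in the sketch below. It assumes the namespace exposes the property as a min-subscribers attribute, and the channel id is a placeholder.

```xml
<!-- Hypothetical channel; a send with no subscribers is no longer treated as successful -->
<int:publish-subscribe-channel id="events" min-subscribers="1"/>
```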
===== FTP, SFTP and FTPS Changes
The FTP, SFTP and FTPS endpoints no longer cache sessions by default.
The deprecated cached-sessions
attribute has been removed from all endpoints.
Previously, the embedded caching mechanism controlled by this attribute’s value didn’t provide a way to limit the size of the cache, which could grow indefinitely.
The CachingSessionFactory
was introduced in release 2.1 and it became the preferred (and is now the only) way to cache sessions.
The CachingSessionFactory
now provides a new method resetCache()
.
This immediately closes idle sessions and causes in-use sessions to be closed as and when they are returned to the cache.
The DefaultSftpSessionFactory
(in conjunction with a CachingSessionFactory
) now supports multiplexing channels over a single SSH connection (SFTP Only).
FTP, SFTP and FTPS Inbound Adapters
Previously, there was no way to override the default filter used to process files retrieved from a remote server.
The filter
attribute determines which files are retrieved but the FileReadingMessageSource
uses an AcceptOnceFileListFilter
.
This means that if a new copy of a file is retrieved, with the same name as a previously copied file, no message was sent from the adapter.
With this release, a new attribute local-filter
allows you to override the default filter, for example with an AcceptAllFileListFilter
, or some other custom filter.
For users that wish the behavior of the AcceptOnceFileListFilter
to be maintained across JVM executions, a custom filter that retains state, perhaps on the file system, can now be configured.
Inbound Channel Adapters now support the preserve-timestamp
attribute, which sets the local file modified timestamp to the timestamp from the server (default false).
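The two inbound adapter attributes described above can be combined as in the following sketch. The session factory bean, channel, directories, and polling rate are assumptions made for the example.

```xml
<!-- Hypothetical adapter: every retrieved copy produces a message,
     and local files keep the server's modified timestamp -->
<int-ftp:inbound-channel-adapter id="ftpIn"
    channel="files"
    session-factory="ftpSessionFactory"
    remote-directory="/remote/dir"
    local-directory="file:/tmp/local"
    local-filter="acceptAllFilter"
    preserve-timestamp="true">
    <int:poller fixed-rate="5000"/>
</int-ftp:inbound-channel-adapter>

<bean id="acceptAllFilter"
    class="org.springframework.integration.file.filters.AcceptAllFileListFilter"/>
```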
FTP, SFTP and FTPS Gateways
local-filename-generator-expression
attribute is now supported, enabling the naming of local files during retrieval.
By default, the same name as the remote file is used.
local-directory-expression
attribute is now supported, enabling the naming of local directories during retrieval based on the remote directory.
Remote File Template
A new higher-level abstraction (RemoteFileTemplate
) is provided over the Session
implementations used by the FTP and SFTP modules.
While it is used internally by endpoints, this abstraction can also be used programmatically and, like all Spring *Template
implementations, reliably closes the underlying session while allowing low level access to the session when needed.
For more information, see Chapter 15, FTP/FTPS Adapters and Chapter 27, SFTP Adapters.
===== requires-reply Attribute for Outbound Gateways
All Outbound Gateways (e.g.
<jdbc:outbound-gateway/>
or <jms:outbound-gateway/>
) are designed for request-reply scenarios.
A response is expected from the external service and will be published to the reply-channel
, or the replyChannel
message header.
However, there are some cases where the external system might not always return a result, e.g.
a <jdbc:outbound-gateway/>
, when a SELECT ends with an empty ResultSet
or, say, a Web Service is One-Way.
An option is therefore needed to configure whether or not a reply is required.
For this purpose, the requires-reply attribute has been introduced for Outbound Gateway components.
In most cases, the default value for requires-reply is true
and, if there is no result, a ReplyRequiredException
will be thrown.
Changing the value to false
means that, if an external service doesn’t return anything, the message-flow will end at that point, similar to an Outbound Channel Adapter.
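As an illustration, a JDBC gateway whose SELECT may legitimately return nothing could be configured as sketched here. The channel names, data source bean, and query are hypothetical.

```xml
<!-- Hypothetical lookup: an empty ResultSet ends the flow instead of raising an exception -->
<int-jdbc:outbound-gateway request-channel="lookups"
    reply-channel="results"
    data-source="dataSource"
    query="select * from orders where id = :payload"
    requires-reply="false"/>
```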
Note: The WebService outbound gateway has an additional attribute
Note, the requiresReply
property was previously present in the AbstractReplyProducingMessageHandler
but set to false
, and there wasn’t any way to configure it on Outbound Gateways using the XML namespace.
Important | |
---|---|
Previously, a gateway receiving no reply would silently end the flow (with a DEBUG log message); with this change an exception will now be thrown by default by most gateways.
To revert to the previous behavior, set |
===== AMQP Outbound Gateway Header Mapping
Previously, the <int-amqp:outbound-gateway/> mapped headers before invoking the message converter, and the converter could overwrite headers such as content-type
.
The outbound adapter maps the headers after the conversion, which means headers like content-type
from the outbound Message
(if present) are used.
Starting with this release, the gateway now maps the headers after the message conversion, consistent with the adapter.
If your application relies on the previous behavior (where the converter’s headers overrode the mapped headers), you either need to filter those headers (before the message reaches the gateway) or set them appropriately.
The headers affected by the SimpleMessageConverter
are content-type
and content-encoding
.
Custom message converters may set other headers.
===== Stored Procedure Components Improvements
For more complex database-specific types, not supported by the standard CallableStatement.getObject
method, 2 new additional attributes were introduced to the <sql-parameter-definition/>
element with OUT-direction:
type-name
return-type
The row-mapper
attribute of the Stored Procedure Inbound Channel Adapter <returning-resultset/>
sub-element now supports a reference to a RowMapper
bean definition.
Previously, it contained just a class name (which is still supported).
For more information see Section 18.5, “Stored Procedures”.
===== Web Service Outbound URI Configuration
Web Service Outbound Gateway uri attribute now supports <uri-variable/>
substitution for all URI-schemes supported by Spring Web Services.
For more information, see the chapter on Web Services support.
The Redis Inbound Channel Adapter can now use a null
value for serializer
property, with the raw data being the message payload.
The Redis Outbound Channel Adapter now has the topic-expression
property to determine the Redis topic against the Message at runtime.
The Redis Inbound Channel Adapter, in addition to the existing topics
attribute, now has the topic-patterns
attribute.
For more information, see Chapter 24, Redis Support.
Previously, when a <filter/> had a <request-handler-advice-chain/>, the entire discard action (including any downstream flow on the discard-channel) was performed within the scope of the advice chain.
The filter element now has an attribute discard-within-advice
(default true
), to allow the discard action to be performed after the advice chain completes.
See Section 8.9.6, “Advising Filters”.
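The following fragment is a sketch of a filter whose discard flow runs after the advice chain completes. The channel names and the `orderSelector` bean are assumptions made for illustration, and the retry advice is just one example of an advice that could be applied.

```xml
<!-- Illustrative only: channel and bean names are assumptions.
     discard-within-advice="false" keeps the discard flow outside the advice scope. -->
<int:filter input-channel="orders" output-channel="validOrders"
    ref="orderSelector" discard-channel="rejectedOrders"
    discard-within-advice="false">
    <int:request-handler-advice-chain>
        <bean class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice"/>
    </int:request-handler-advice-chain>
</int:filter>
```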
===== Advising Endpoints using Annotations
Request Handler Advice Chains can now be configured using annotations. See Section 8.9.7, “Advising Endpoints Using Annotations”.
===== ObjectToStringTransformer Improvements
This transformer now correctly transforms byte[]
and char[]
payloads to String
.
For more information see Section 7.1, “Transformer”.
Payloads to persist or merge can now be of type https://docs.oracle.com/javase/7/docs/api/java/lang/Iterable.html[java.lang.Iterable]
.
In that case, each object returned by the Iterable
is treated as an entity and persisted or merged using the underlying EntityManager
.
NULL values returned by the iterator are ignored.
The JPA adapters now have additional attributes to optionally flush and clear entities from the associated persistence context after performing persistence operations.
Previously, retrieving gateways had no mechanism to specify the first record to be retrieved, which is a common use case.
The retrieving gateways now support this through the first-result and first-result-expression attributes on the gateway definition.
For more information, see Section 19.6.3, “Retrieving Outbound Gateway”.
The JPA retrieving gateway and inbound adapter now have an attribute to specify the maximum number of results in a result set as an expression.
In addition, the max-results
attribute has been introduced to replace max-number-of-results
, which has been deprecated.
max-results
and max-results-expression
are used to provide the maximum number of results, or an expression to compute the maximum number of results, respectively, in the result set.
For more information see Chapter 19, JPA Support.
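As a rough sketch of how these attributes combine for paging, the gateway below skips the first 10 rows and returns at most 20. The channel names, the `entityManager` bean, and the `Student` entity are assumptions for illustration only.

```xml
<!-- Hypothetical channels, bean, and entity. first-result is the offset of the
     first row to return; max-results caps the size of the result set. -->
<int-jpa:retrieving-outbound-gateway request-channel="studentRequests"
    reply-channel="studentReplies"
    entity-manager="entityManager"
    jpa-query="select s from Student s order by s.lastName"
    first-result="10"
    max-results="20"/>
```

When the values need to come from the request message, the first-result-expression and max-results-expression attributes can be used in place of the fixed values.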
===== Delayer: delay expression
Previously, the <delayer>
provided a delay-header-name
attribute to determine the delay value at runtime.
In complex cases it was necessary to precede the <delayer>
with a <header-enricher>
.
Spring Integration 3.0 introduced the expression
attribute and expression
sub-element for dynamic delay determination.
The delay-header-name
attribute is now deprecated because the header evaluation can be specified in the expression
.
In addition, the ignore-expression-failures
attribute was introduced to control the behavior when an expression evaluation fails.
For more information see Section 8.6, “Delayer”.
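A minimal sketch of a delayer that reads its delay from a message header through an expression might look like the following. The channel names and the `delay` header name are assumptions, not names defined by the framework.

```xml
<!-- Illustrative names. The expression replaces the deprecated delay-header-name;
     default-delay applies when the expression yields no usable value. -->
<int:delayer id="delayer" input-channel="input" output-channel="output"
    default-delay="3000"
    expression="headers['delay']"
    ignore-expression-failures="false"/>
```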
===== JDBC Message Store Improvements
Spring Integration 3.0 adds a new set of DDL scripts for MySQL version 5.6.4 and higher. MySQL now supports fractional seconds, which improves FIFO ordering when polling from a MySQL-based Message Store. For more information, please see Section 18.4.1, “The Generic JDBC Message Store”.
===== IMAP Idle Connection Exceptions
Previously, if an IMAP idle connection failed, it was logged but there was no mechanism to inform an application.
Such exceptions now generate ApplicationEvent
s.
Applications can obtain these events using an <int-event:inbound-channel-adapter>
or any ApplicationListener
configured to receive an ImapIdleExceptionEvent
or one of its super classes.
The TCP connection factories now enable the configuration of a flexible mechanism to transfer selected headers (as well as the payload) over TCP.
A new TcpMessageMapper
enables the selection of the headers, and an appropriate (de)serializer needs to be configured to write the resulting Map
to the TCP stream.
A MapJsonSerializer
is provided as a convenient mechanism to transfer headers and payload over TCP.
For more information see Section 31.3.2, “TCP Failover Client Connection Factory”.
===== JMS Message Driven Channel Adapter
Previously, when configuring a <message-driven-channel-adapter/>
, if you wished to use a specific TaskExecutor
, it was necessary to declare a container bean and provide it to the adapter using the container
attribute.
The task-executor
attribute is now provided, allowing the executor to be set directly on the adapter.
This is in addition to several other container attributes that were already available.
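For illustration, an adapter that sets its executor directly, without a separate container bean, could be sketched as follows. The `inQueue` destination bean, the channel name, and the executor settings are assumptions, and the `task` and `int-jms` namespaces are assumed to be declared.

```xml
<!-- Hypothetical bean and channel names -->
<task:executor id="jmsExecutor" pool-size="5"/>

<int-jms:message-driven-channel-adapter id="jmsIn"
    destination="inQueue"
    channel="exampleChannel"
    task-executor="jmsExecutor"/>
```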
The RMI Inbound Gateway now supports an error-channel
attribute.
See Section 26.3, “Inbound RMI”.
You can now specify the transformer factory class name using the transformer-factory-class
attribute.
See the chapter on XML support.
=== Changes between 2.1 and 2.2
===== RedisStore Inbound and Outbound Channel Adapters
Spring Integration now has RedisStore Inbound and Outbound Channel Adapters allowing you to write and read Message payloads to/from Redis collection(s). For more information please see Section 24.7, “RedisStore Outbound Channel Adapter” and Section 24.6, “RedisStore Inbound Channel Adapter”.
===== MongoDB Inbound and Outbound Channel Adapters
Spring Integration now has MongoDB Inbound and Outbound Channel Adapters allowing you to write and read Message payloads to/from a MongoDB document store. For more information please see Section 22.5, “MongoDB Outbound Channel Adapter” and Section 22.4, “MongoDB Inbound Channel Adapter”.
Spring Integration now includes components for the Java Persistence API (JPA) for retrieving and persisting JPA entity objects. The JPA Adapter includes the following components:
For more information please see Chapter 19, JPA Support.
===== Spring 3.1 Used by Default
Spring Integration now uses Spring 3.1.
===== Adding Behavior to Endpoints
The ability to add an <advice-chain/> to a poller has been available for some time. However, the behavior added by this affects the entire integration flow. It did not address the ability to add, say, retry, to an individual endpoint. The 2.2 release introduces the <request-handler-advice-chain/> to many endpoints.
In addition, 3 standard Advice classes have been provided for this purpose:
For more information, see Section 8.9, “Adding Behavior to Endpoints”.
===== Transaction Synchronization and Pseudo Transactions
Pollers can now participate in Spring’s Transaction Synchronization feature. This allows for synchronizing such operations as renaming files by an inbound channel adapter depending on whether the transaction commits, or rolls back.
In addition, these features can be enabled when there is not a real transaction present, by means of a PseudoTransactionManager
.
For more information, see the documentation on transaction synchronization.
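The following is a sketch, under assumed names, of a file adapter that renames files after "commit" or "rollback" without a real transaction. The directories, channel names, and bean ids are hypothetical; the `int` and `int-file` namespaces are assumed to be declared.

```xml
<!-- Hypothetical directories and channels. The PseudoTransactionManager lets the
     synchronization callbacks fire even though no real transaction exists. -->
<int-file:inbound-channel-adapter id="inputDirPoller"
    channel="filesIn" directory="/tmp/inbound">
    <int:poller fixed-delay="5000">
        <int:transactional transaction-manager="transactionManager"
            synchronization-factory="syncFactory"/>
    </int:poller>
</int-file:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit
        expression="payload.renameTo(new java.io.File('/tmp/success/' + payload.name))"
        channel="committedChannel"/>
    <int:after-rollback
        expression="payload.renameTo(new java.io.File('/tmp/failed/' + payload.name))"
        channel="rolledBackChannel"/>
</int:transaction-synchronization-factory>

<bean id="transactionManager"
    class="org.springframework.integration.transaction.PseudoTransactionManager"/>
```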
===== File Adapter - Improved File Overwrite/Append Handling
When using the File Outbound Channel Adapter or the File Outbound Gateway, a new mode property was added. Prior to Spring Integration 2.2, target files were replaced when they existed. Now you can specify the following options:
For more information please see Section 14.3.3, “Dealing with Existing Destination Files”.
===== Reply-Timeout added to more Outbound Gateways
The XML Namespace support adds the reply-timeout attribute to the following Outbound Gateways:
Spring Integration now uses Spring AMQP 1.1. This enables several features to be used within a Spring Integration application, including…
===== JDBC Support - Stored Procedures Components
SpEL Support
When using the Stored Procedure components of the Spring Integration JDBC Adapter, you can now provide Stored Procedure Names or Stored Function Names using Spring Expression Language (SpEL).
This allows you to specify the Stored Procedures to be invoked at runtime. For example, you can provide Stored Procedure names that you would like to execute via Message Headers. For more information please see Section 18.5, “Stored Procedures”.
JMX Support
The Stored Procedure components now provide basic JMX support, exposing some of their properties as MBeans:
===== JDBC Support - Outbound Gateway
When using the JDBC Outbound Gateway, the update query is no longer mandatory. You can now provide solely a select query using the request message as a source of parameters.
===== JDBC Support - Channel-specific Message Store Implementation
A new Message Channel-specific Message Store Implementation has been added, providing a more scalable solution using database-specific SQL queries. For more information please see: Section 18.4.2, “Backing Message Channels”.
A method stopActiveComponents()
has been added to the IntegrationMBeanExporter.
This allows a Spring Integration application to be shut down in an orderly manner, disallowing new inbound messages to certain adapters and waiting for some time to allow in-flight messages to complete.
===== JMS Outbound Gateway Improvements
The JMS Outbound Gateway can now be configured to use a `MessageListener` container to receive replies. This can improve performance of the gateway.
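As a minimal sketch, the reply container is enabled by adding a reply listener sub-element to the gateway. The channel names and the `requestQueue` destination bean are assumptions for illustration.

```xml
<!-- Hypothetical channel and destination names. The nested element switches the
     gateway to a listener container for receiving replies. -->
<int-jms:outbound-gateway request-channel="requests"
    request-destination="requestQueue"
    reply-channel="replies">
    <int-jms:reply-listener/>
</int-jms:outbound-gateway>
```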
===== object-to-json-transformer
The ObjectToJsonTransformer
now sets the content-type header to application/json by default.
For more information see Section 7.1, “Transformer”.
Java serialization over HTTP is no longer enabled by default.
Previously, when setting an expected-response-type
to a Serializable
object, the Accept
header was not properly set up.
The SerializingHttpMessageConverter
has now been updated to set the Accept header to application/x-java-serialized-object
.
However, because this could cause incompatibility with existing applications, it was decided to no longer automatically add this converter to the HTTP endpoints.
If you wish to use Java serialization, you will need to add the SerializingHttpMessageConverter
to the appropriate endpoints, using the message-converters
attribute, when using XML configuration, or using the setMessageConverters()
method.
Alternatively, you may wish to consider using JSON instead which is enabled by simply having Jackson
on the classpath.
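If Java serialization is still needed, the converter can be registered explicitly. The fragment below is a sketch only: the URL, the channel names, and the `com.example.Order` class are hypothetical, and the `util` and `int-http` namespaces are assumed to be declared.

```xml
<!-- Hypothetical URL, channels, and response type (assumed to be Serializable) -->
<util:list id="converters">
    <bean class="org.springframework.integration.http.converter.SerializingHttpMessageConverter"/>
</util:list>

<int-http:outbound-gateway url="http://localhost:8080/example"
    request-channel="requests"
    reply-channel="replies"
    http-method="POST"
    expected-response-type="com.example.Order"
    message-converters="converters"/>
```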
=== Changes between 2.0 and 2.1
===== JSR-223 Scripting Support
In Spring Integration 2.0, support for Groovy was added. With Spring Integration 2.1 we expanded support for additional languages substantially by implementing support for JSR-223 (Scripting for the Java™ Platform). Now you have the ability to use any scripting language that supports JSR-223 including:
For further details please see Section 8.7, “Scripting support”.
Spring Integration provides support for GemFire by providing inbound adapters for entry and continuous query events, an outbound adapter to write entries to the cache, and MessageStore
and MessageGroupStore
implementations.
Spring Integration leverages the Spring Gemfire project, providing a thin wrapper over its components.
For further details please see Chapter 16, GemFire Support.
Spring Integration 2.1 adds several Channel Adapters for receiving and sending messages using the http://www.amqp.org/[Advanced Message Queuing Protocol] (AMQP). Furthermore, Spring Integration also provides a point-to-point Message Channel, as well as a publish/subscribe Message Channel that are backed by AMQP Exchanges and Queues.
For further details please see Chapter 11, AMQP Support.
As of version 2.1 Spring Integration provides support for MongoDB by providing a MongoDB-based MessageStore.
For further details please see Chapter 22, MongoDb Support.
As of version 2.1 Spring Integration supports Redis, an advanced key-value store, by providing a Redis-based MessageStore as well as Publish-Subscribe Messaging adapters.
For further details please see Chapter 24, Redis Support.
===== Support for Spring’s Resource abstraction
As of version 2.1, we’ve introduced a new Resource Inbound Channel Adapter that builds upon Spring’s Resource abstraction to support greater flexibility across a variety of actual types of underlying resources, such as a file, a URL, or a class path resource. Therefore, it’s similar to but more generic than the File Inbound Channel Adapter.
For further details please see Section 25.2, “Resource Inbound Channel Adapter”.
===== Stored Procedure Components
With Spring Integration 2.1, the JDBC
Module also provides Stored Procedure support by adding several new components, including inbound/outbound channel adapters and an Outbound Gateway.
The Stored Procedure support leverages Spring’s http://static.springsource.org/spring/docs/3.0.x/javadoc-api/org/springframework/jdbc/core/simple/SimpleJdbcCall.html[SimpleJdbcCall] class and consequently supports stored procedures for:
The Stored Procedure components also support Sql Functions for the following databases:
For further details please see Section 18.5, “Stored Procedures”.
===== XPath and XML Validating Filter
Spring Integration 2.1 provides a new XPath-based Message Filter, that is part of the XML
module.
The XPath Filter allows you to filter messages using provided XPath Expressions.
Furthermore, documentation was added for the XML Validating Filter.
For more details, please see the chapter on XML support.
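A minimal sketch of an XPath filter, assuming hypothetical channel names and an XML payload with an `order` root element, might look like this (the `int-xml` namespace is assumed to be declared):

```xml
<!-- Hypothetical channels and document structure. Messages whose payload makes the
     XPath expression evaluate to true pass; others go to the discard channel. -->
<int-xml:xpath-filter input-channel="xmlInput"
    output-channel="accepted"
    discard-channel="rejected">
    <int-xml:xpath-expression expression="/order/@status = 'OPEN'"/>
</int-xml:xpath-filter>
```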
Since Spring Integration 2.1, the Payload Enricher is provided.
A Payload Enricher defines an endpoint that typically passes a http://static.springsource.org/spring-integration/api/org/springframework/integration/Message.html[Message] to the exposed request channel and then expects a reply message.
The reply message then becomes the root object for evaluation of expressions to enrich the target payload.
For further details please see Section 7.2.3, “Payload Enricher”.
===== FTP and SFTP Outbound Gateways
Spring Integration 2.1 provides two new Outbound Gateways in order to interact with remote File Transfer Protocol (FTP) or Secure File Transfer Protocol (SFTP) servers. These two gateways allow you to directly execute a limited set of remote commands.
For instance, you can use these Outbound Gateways to list, retrieve and delete remote files and have the Spring Integration message flow continue with the remote server’s response.
For further details please see Section 15.7, “FTP Outbound Gateway” and Section 27.10, “SFTP Outbound Gateway”.
As of version 2.1, we have exposed more flexibility with regard to session management for remote file adapters (e.g., FTP, SFTP, etc.).
Specifically, the cache-sessions
attribute, which is available via the XML namespace support, is now deprecated.
As a replacement, we added the sessionCacheSize
and sessionWaitTimeout
attributes on the CachingSessionFactory
.
For further details please see Section 15.8, “FTP Session Caching” and Section 27.5, “SFTP Session Caching”.
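As a sketch of the replacement for cache-sessions, a session factory can be wrapped in a CachingSessionFactory. The host, credentials, cache size, and timeout below are placeholder assumptions.

```xml
<!-- Placeholder connection details. The second constructor argument is the
     session cache size; sessionWaitTimeout is in milliseconds. -->
<bean id="ftpSessionFactory"
    class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
    <property name="host" value="localhost"/>
    <property name="username" value="user"/>
    <property name="password" value="secret"/>
</bean>

<bean id="cachingSessionFactory"
    class="org.springframework.integration.file.remote.session.CachingSessionFactory">
    <constructor-arg ref="ftpSessionFactory"/>
    <constructor-arg value="10"/>
    <property name="sessionWaitTimeout" value="1000"/>
</bean>
```

Adapters then reference `cachingSessionFactory` in place of the underlying factory.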
===== Standardizing Router Configuration
Router parameters have been standardized across all router implementations with Spring Integration 2.1 providing a more consistent user experience.
With Spring Integration 2.1 the ignore-channel-name-resolution-failures
attribute has been removed in favor of consolidating its behavior with the resolution-required
attribute.
Also, the resolution-required
attribute now defaults to true
.
Starting with Spring Integration 2.1, routers no longer silently drop messages if no default output channel is defined.
This means that, by default, routers now require at least one resolved channel (if no default-output-channel
was set) and by default will throw a MessageDeliveryException
if no channel was determined (or an attempt to send was not successful).
If, however, you do desire to drop messages silently, simply set default-output-channel="nullChannel"
.
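For example, a header value router that discards messages for which no channel is determined could be sketched as follows. The input channel and header name are assumptions for illustration.

```xml
<!-- Hypothetical channel and header names. Messages that cannot be routed
     fall through to nullChannel, which discards them. -->
<int:header-value-router input-channel="routingChannel"
    header-name="orderType"
    default-output-channel="nullChannel"/>
```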
Important | |
---|---|
With the standardization of Router parameters and the consolidation of the parameters described above, there is the possibility of breaking older Spring Integration based applications. |
For further details please see Section 6.1, “Routers”.
===== XML Schemas updated to 2.1
Spring Integration 2.1 ships with an updated XML Schema (version 2.1), providing many improvements, e.g. the Router standardizations discussed above.
From now on, users must always declare the latest XML schema (currently version 2.1). Alternatively, they can use the version-less schema. Generally, the best option is to use version-less namespaces, as these will automatically use the latest available version of Spring Integration.
Declaring a version-less Spring Integration namespace:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int="http://www.springframework.org/schema/integration"
    xsi:schemaLocation="http://www.springframework.org/schema/integration
        https://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd">
    ...
</beans>
Declaring a Spring Integration namespace using an explicit version:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int="http://www.springframework.org/schema/integration"
    xsi:schemaLocation="http://www.springframework.org/schema/integration
        https://www.springframework.org/schema/integration/spring-integration-2.2.xsd
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd">
    ...
</beans>
The old 1.0 and 2.0 schemas are still there, but if an Application Context still references one of those deprecated schemas, the validator will fail on initialization.
==== Source Control Management and Build Infrastructure
===== Source Code now hosted on GitHub
Since version 2.0, the Spring Integration project uses Git for version control. In order to increase community visibility even further, the project was moved from SpringSource hosted Git repositories to GitHub. The Spring Integration Git repository is located at: spring-integration.
For the project we also improved the process of providing code contributions and we ensure that every commit is peer-reviewed. In fact, core committers now follow the same process as contributors. For more details please see:
===== Improved Source Code Visibility with Sonar
In an effort to provide better source code visibility and consequently to monitor the quality of Spring Integration’s source code, an instance of Sonar was set up and metrics are gathered nightly and made available at:
For the 2.1 release of Spring Integration we also expanded the Spring Integration Samples project and added many new samples, e.g. samples covering AMQP support, the new payload enricher, a sample illustrating techniques for testing Spring Integration flow fragments, as well as an example for executing Stored Procedures against Oracle. For details please visit:
=== Changes between 1.0 and 2.0
For a detailed guide to upgrading an existing application that uses a Spring Integration version older than 2.0, please see:
Spring Integration 1.0 to 2.0 Migration Guide
Spring Integration 2.0 is built on top of Spring 3.0.5 and makes many of its features available to our users.
===== Support for the Spring Expression Language (SpEL)
You can now use SpEL expressions within the transformer, router, filter, splitter, aggregator, service-activator, header-enricher, and many more elements of the Spring Integration core namespace as well as various adapters. There are many samples provided throughout this manual.
===== ConversionService and Converter
You can now benefit from Conversion Service support provided with Spring while configuring many Spring Integration components such as Datatype Channel. See Section 4.1.2, “Message Channel Implementations” as well as Section 8.5.1, “Introduction”. The SpEL support mentioned in the previous point also relies upon the ConversionService. Therefore, you can register Converters once, and take advantage of them anywhere you are using SpEL expressions.
===== TaskScheduler and Trigger
Spring 3.0 defines two new strategies related to scheduling: TaskScheduler and Trigger. Spring Integration (which uses a lot of scheduling) now builds upon these. In fact, Spring Integration 1.0 had originally defined some of the components (e.g. CronTrigger) that have now been migrated into Spring 3.0’s core API. Now, you can benefit from reusing the same components within the entire Application Context (not just Spring Integration configuration). Configuration of Spring Integration Pollers has been greatly simplified as well by providing attributes for directly configuring rates, delays, cron expressions, and trigger references. See Section 4.3, “Channel Adapter” for sample configurations.
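To give a feel for the simplified poller attributes, here is a sketch with two pollers, one using a fixed rate and one using a cron expression. The `feedSource` and `reportSource` beans, their `read` methods, and the channel names are assumptions.

```xml
<!-- Hypothetical beans and channels. The first adapter polls every 5 seconds;
     the second polls once a day at 02:00 using a six-field Spring cron expression. -->
<int:inbound-channel-adapter ref="feedSource" method="read" channel="feedChannel">
    <int:poller fixed-rate="5000"/>
</int:inbound-channel-adapter>

<int:inbound-channel-adapter ref="reportSource" method="read" channel="reportChannel">
    <int:poller cron="0 0 2 * * *"/>
</int:inbound-channel-adapter>
```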
===== RestTemplate and HttpMessageConverter
Our outbound HTTP adapters now delegate to Spring’s RestTemplate for executing the HTTP request and handling its response. This also means that you can reuse any custom HttpMessageConverter implementations. See Section 17.3, “Http Outbound Components” for more details.
==== Enterprise Integration Pattern Additions
Also in 2.0 we have added support for even more of the patterns described in Hohpe and Woolf’s Enterprise Integration Patterns book.
We now provide support for the Message History pattern allowing you to keep track of all traversed components, including the name of each channel and endpoint as well as the timestamp of that traversal. See Section 9.3, “Message History” for more details.
We now provide support for the Message Store pattern. The Message Store provides a strategy for persisting messages on behalf of any process whose scope extends beyond a single transaction, such as the Aggregator and Resequencer. Many sections of this document provide samples on how to use a Message Store as it affects several areas of Spring Integration. See Section 9.4, “Message Store”, Section 7.3, “Claim Check”, Section 4.1, “Message Channels”, Section 6.4, “Aggregator”, Chapter 18, JDBC Support, and Section 6.5, “Resequencer” for more details.
We have added an implementation of the Claim Check pattern. The idea behind the Claim Check pattern is that you can exchange a Message payload for a "claim ticket" and vice-versa. This allows you to reduce bandwidth and/or avoid potential security issues when sending Messages across channels. See Section 7.3, “Claim Check” for more details.
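The exchange described above can be sketched with a check-in and a check-out transformer sharing one message store. The channel names are assumptions, and the in-memory SimpleMessageStore is used here only to keep the example short.

```xml
<!-- Hypothetical channel names. Check-in stores the message and emits a claim
     ticket as the payload; check-out swaps the ticket back for the stored message. -->
<bean id="messageStore"
    class="org.springframework.integration.store.SimpleMessageStore"/>

<int:claim-check-in input-channel="checkinChannel"
    message-store="messageStore"
    output-channel="pipeline"/>

<int:claim-check-out input-channel="checkoutChannel"
    message-store="messageStore"
    output-channel="restored"/>
```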
We have provided implementations of the Control Bus pattern which allows you to use messaging to manage and monitor endpoints and channels. The implementations include both a SpEL-based approach and one that executes Groovy scripts. See Section 9.6, “Control Bus” and Section 8.8.2, “Control Bus” for more details.
==== New Channel Adapters and Gateways
We have added several new Channel Adapters and Messaging Gateways in Spring Integration 2.0.
We have added Channel Adapters for receiving and sending messages over the TCP and UDP internet protocols. See Chapter 31, TCP and UDP Support for more details. Also, you can check out the following blog: TCP/UDP support
The Twitter adapters provide support for sending and receiving Twitter Status updates as well as Direct Messages. You can also perform Twitter Searches with an inbound Channel Adapter. See the chapter on Twitter support for more details.
The new XMPP adapters support both Chat Messages and Presence events. See the chapter on XMPP support for more details.
Inbound and outbound File transfer support over FTP/FTPS is now available. See Chapter 15, FTP/FTPS Adapters for more details.
Inbound and outbound File transfer support over SFTP is now available. See Chapter 27, SFTP Adapters for more details.
We have also added Channel Adapters for receiving news feeds (ATOM/RSS). See Chapter 13, Feed Adapter for more details.
With Spring Integration 2.0 we’ve added Groovy support allowing you to use Groovy scripting language to provide integration and/or business logic. See Section 8.8, “Groovy support” for more details.
These symmetrical transformers convert payload objects to and from a Map. See Section 7.1, “Transformer” for more details.
These symmetrical transformers convert payload objects to and from JSON. See Section 7.1, “Transformer” for more details.
===== Serialization Transformers
These symmetrical transformers convert payload objects to and from byte arrays. They also support the Serializer and Deserializer strategy interfaces that have been added as of Spring 3.0.5. See Section 7.1, “Transformer” for more details.
The core API went through some significant refactoring to make it simpler and more usable. Although we anticipate that the impact to the end user should be minimal, please read through this document to find what was changed. Especially, visit Section 6.1.5, “Dynamic Routers” , Section 8.4, “Messaging Gateways”, Section 17.3, “Http Outbound Components”, Section 5.1, “Message”, and Section 6.4, “Aggregator” for more details. If you are depending directly on some of the core components (Message, MessageHeaders, MessageChannel, MessageBuilder, etc.), you will notice that you need to update any import statements. We restructured some packaging to provide the flexibility we needed for extending the domain model while avoiding any cyclical dependencies (it is a policy of the framework to avoid such "tangles").
==== New Source Control Management and Build Infrastructure
With Spring Integration 2.0 we have switched our build environment to use Git for source control. To access our repository simply follow this URL: https://git.springsource.org/spring-integration. We have also switched our build system to Gradle.
==== New Spring Integration Samples
With Spring Integration 2.0 we have decoupled the samples from our main release distribution. For more information, please read the blog post New Spring Integration Samples. We have also created many new samples, including samples for every new Adapter.
==== Spring Tool Suite Visual Editor for Spring Integration
There is an amazing new visual editor for Spring Integration included within the latest version of SpringSource Tool Suite. If you are not already using STS, please download it here: