31. TCP and UDP Support

Spring Integration provides Channel Adapters for receiving and sending messages over internet protocols. Both UDP (User Datagram Protocol) and TCP (Transmission Control Protocol) adapters are provided. Each adapter provides for one-way communication over the underlying protocol. In addition, simple inbound and outbound tcp gateways are provided. These are used when two-way communication is needed.

31.1 Introduction

Two flavors each of UDP inbound and outbound channel adapters are provided UnicastSendingMessageHandler sends a datagram packet to a single destination. UnicastReceivingChannelAdapter receives incoming datagram packets. MulticastSendingMessageHandler sends (broadcasts) datagram packets to a multicast address. MulticastReceivingChannelAdapter receives incoming datagram packets by joining to a multicast address.

TCP inbound and outbound channel adapters are provided TcpSendingMessageHandler sends messages over TCP. TcpReceivingChannelAdapter receives messages over TCP.

An inbound TCP gateway is provided; this allows for simple request/response processing. While the gateway can support any number of connections, each connection can only process serially. The thread that reads from the socket waits for, and sends, the response before reading again. If the connection factory is configured for single use connections, the connection is closed after the socket times out.

An outbound TCP gateway is provided; this allows for simple request/response processing. If the associated connection factory is configured for single use connections, a new connection is immediately created for each new request. Otherwise, if the connection is in use, the calling thread blocks on the connection until either a response is received or a timeout or I/O error occurs.

The TCP and UDP inbound channel adapters, and the TCP inbound gateway, support the "error-channel" attribute. This provides the same basic functionality as described in Section 8.4.1, “Enter the GatewayProxyFactoryBean”.

31.2 UDP Adapters

<int-ip:udp-outbound-channel-adapter id="udpOut"
    host="somehost"
    port="11111"
    multicast="false"
    channel="exampleChannel"/>

A simple UDP outbound channel adapter.

[Tip]Tip

When setting multicast to true, provide the multicast address in the host attribute.

UDP is an efficient, but unreliable protocol. Two attributes are added to improve reliability. When check-length is set to true, the adapter precedes the message data with a length field (4 bytes in network byte order). This enables the receiving side to verify the length of the packet received. If a receiving system uses a buffer that is too short the contain the packet, the packet can be truncated. The length header provides a mechanism to detect this.

Starting with version 4.3, the port can be set to 0, in which case the Operating System chooses the port; the chosen port can be discovered by invoking getPort() after the adapter is started and isListening() returns true.

<int-ip:udp-outbound-channel-adapter id="udpOut"
    host="somehost"
    port="11111"
    multicast="false"
    check-length="true"
    channel="exampleChannel"/>

An outbound channel adapter that adds length checking to the datagram packets.

[Tip]Tip

The recipient of the packet must also be configured to expect a length to precede the actual data. For a Spring Integration UDP inbound channel adapter, set its check-length attribute.

The second reliability improvement allows an application-level acknowledgment protocol to be used. The receiver must send an acknowledgment to the sender within a specified time.

<int-ip:udp-outbound-channel-adapter id="udpOut"
    host="somehost"
    port="11111"
    multicast="false"
    check-length="true"
    acknowledge="true"
    ack-host="thishost"
    ack-port="22222"
    ack-timeout="10000"
    channel="exampleChannel"/>

An outbound channel adapter that adds length checking to the datagram packets and waits for an acknowledgment.

[Tip]Tip

Setting acknowledge to true implies the recipient of the packet can interpret the header added to the packet containing acknowledgment data (host and port). Most likely, the recipient will be a Spring Integration inbound channel adapter.

[Tip]Tip

When multicast is true, an additional attribute min-acks-for-success specifies how many acknowledgments must be received within the ack-timeout.

For even more reliable networking, TCP can be used.

Starting with version 4.3, the ackPort can be set to 0, in which case the Operating System chooses the port.

Also starting with version 4.3, the destination-expression and socket-expression options are available for the <int-ip:udp-outbound-channel-adapter> (UnicastSendingMessageHandler).

The destination-expression can be used as a runtime alternative to the hardcoded host/port pair to determine the destination address for the outgoing datagram packet against requestMessage as a root object for evaluation context. The expression must evaluate to URI, or String in the URI style (see RFC-2396) or SocketAddress. The new IpHeaders.PACKET_ADDRESS header can be used for this expression as well. In the Framework this header is populated by the DatagramPacketMessageMapper, when we receive datagrams in the UnicastReceivingChannelAdapter and convert them to messages. The header value is exactly the result of DatagramPacket.getSocketAddress() of incoming datagram.

With the socket-expression help the Outbound Channel Adapter can use e.g. Inbound Channel Adapter socket to send datagrams through same port which they were received. It’s useful in a scenario when our application works as a UDP server and clients operate behind the NAT. This expression must evaluate to the DatagramSocket. The requestMessage is used as a root object for evaluation context. The socket-expression parameter cannot be used with parameters like multicast and acknowledge.

<int-ip:udp-inbound-channel-adapter id="inbound" port="0" channel="in" />

<int:channel id="in" />

<int:transformer expression="new String(payload).toUpperCase()"
                       input-channel="in" output-channel="out"/>

<int:channel id="out" />

<int-ip:udp-outbound-channel-adapter id="outbound"
                        socket-expression="@inbound.socket"
                        destination-expression="headers['ip_packetAddress']"
                        channel="out" />
<int-ip:udp-inbound-channel-adapter id="udpReceiver"
    channel="udpOutChannel"
    port="11111"
    receive-buffer-size="500"
    multicast="false"
    check-length="true"/>

A basic unicast inbound udp channel adapter.

<int-ip:udp-inbound-channel-adapter id="udpReceiver"
    channel="udpOutChannel"
    port="11111"
    receive-buffer-size="500"
    multicast="true"
    multicast-address="225.6.7.8"
    check-length="true"/>

A basic multicast inbound udp channel adapter.

By default, reverse DNS lookups are done on inbound packets to convert IP addresses to hostnames for use in message headers. In environments where DNS is not configured, this can cause delays. This default behavior can be overridden by setting the lookup-host attribute to "false".

31.3 TCP Connection Factories

For TCP, the configuration of the underlying connection is provided using a Connection Factory. Two types of connection factory are provided; a client connection factory and a server connection factory. Client connection factories are used to establish outgoing connections; Server connection factories listen for incoming connections.

A client connection factory is used by an outbound channel adapter but a reference to a client connection factory can also be provided to an inbound channel adapter and that adapter will receive any incoming messages received on connections created by the outbound adapter.

A server connection factory is used by an inbound channel adapter or gateway (in fact the connection factory will not function without one). A reference to a server connection factory can also be provided to an outbound adapter; that adapter can then be used to send replies to incoming messages to the same connection.

[Tip]Tip

Reply messages will only be routed to the connection if the reply contains the header ip_connectionId that was inserted into the original message by the connection factory.

[Tip]Tip

This is the extent of message correlation performed when sharing connection factories between inbound and outbound adapters. Such sharing allows for asynchronous two-way communication over TCP. By default, only payload information is transferred using TCP; therefore any message correlation must be performed by downstream components such as aggregators or other endpoints. Support for transferring selected headers was introduced in version 3.0. For more information refer to Section 31.3.2, “TCP Failover Client Connection Factory”.

A maximum of one adapter of each type may be given a reference to a connection factory.

Connection factories using java.net.Socket and java.nio.channel.SocketChannel are provided.

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"/>

A simple server connection factory that uses java.net.Socket connections.

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    using-nio="true"/>

A simple server connection factory that uses java.nio.channel.SocketChannel connections.

[Note]Note

Starting with Spring Integration version 4.2, if the server is configured to listen on a random port (0), the actual port chosen by the OS can be obtained using getPort(). Also, getServerSocketAddress() is available to get the complete SocketAddress. See the javadocs for the TcpServerConnectionFactory interface for more information.

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="1234"
    single-use="true"
    so-timeout="10000"/>

A client connection factory that uses java.net.Socket connections and creates a new connection for each message.

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="1234"
    single-use="true"
    so-timeout="10000"
    using-nio=true/>

A client connection factory that uses java.nio.channel.Socket connections and creates a new connection for each message.

TCP is a streaming protocol; this means that some structure has to be provided to data transported over TCP, so the receiver can demarcate the data into discrete messages. Connection factories are configured to use (de)serializers to convert between the message payload and the bits that are sent over TCP. This is accomplished by providing a deserializer and serializer for inbound and outbound messages respectively. A number of standard (de)serializers are provided.

The ByteArrayCrlfSerializer*, converts a byte array to a stream of bytes followed by carriage return and linefeed characters (\r\n). This is the default (de)serializer and can be used with telnet as a client, for example.

The ByteArraySingleTerminatorSerializer*, converts a byte array to a stream of bytes followed by a single termination character (default 0x00).

The ByteArrayLfSerializer*, converts a byte array to a stream of bytes followed by a single linefeed character (0x0a).

The ByteArrayStxEtxSerializer*, converts a byte array to a stream of bytes preceded by an STX (0x02) and followed by an ETX (0x03).

The ByteArrayLengthHeaderSerializer, converts a byte array to a stream of bytes preceded by a binary length in network byte order (big endian). This a very efficient deserializer because it does not have to parse every byte looking for a termination character sequence. It can also be used for payloads containing binary data; the above serializers only support text in the payload. The default size of the length header is 4 bytes (Integer), allowing for messages up to (2^31 - 1) bytes. However, the length header can be a single byte (unsigned) for messages up to 255 bytes, or an unsigned short (2 bytes) for messages up to (2^16 - 1) bytes. If you need any other format for the header, you can subclass this class and provide implementations for the readHeader and writeHeader methods. The absolute maximum data size supported is (2^31 - 1) bytes.

The ByteArrayRawSerializer*, converts a byte array to a stream of bytes and adds no additional message demarcation data; with this (de)serializer, the end of a message is indicated by the client closing the socket in an orderly fashion. When using this serializer, message reception will hang until the client closes the socket, or a timeout occurs; a timeout will NOT result in a message. When this serializer is being used, and the client is a Spring Integration application, the client must use a connection factory that is configured with single-use=true - this causes the adapter to close the socket after sending the message; the serializer will not, itself, close the connection. This serializer should only be used with connection factories used by channel adapters (not gateways), and the connection factories should be used by either an inbound or outbound adapter, and not both.

[Note]Note

Before version 4.2.2, when using NIO, this serializer treated a timeout (during read) as an end of file and the data read so far was emitted as a message. This is unreliable and should not be used to delimit messages; it now treats such conditions as an exception. In the unlikely event you are using it this way, the previous behavior can be restored by setting the treatTimeoutAsEndOfMessage constructor argument to true.

Each of these is a subclass of AbstractByteArraySerializer which implements both org.springframework.core.serializer.Serializer and org.springframework.core.serializer.Deserializer. For backwards compatibility, connections using any subclass of AbstractByteArraySerializer for serialization will also accept a String which will be converted to a byte array first. Each of these (de)serializers converts an input stream containing the corresponding format to a byte array payload.

To avoid memory exhaustion due to a badly behaved client (one that does not adhere to the protocol of the configured serializer), these serializers impose a maximum message size. If the size is exceeded by an incoming message, an exception will be thrown. The default maximum message size is 2048 bytes, and can be increased by setting the maxMessageSize property. If you are using the default (de)serializer and wish to increase the maximum message size, you must declare it as an explicit bean with the property set and configure the connection factory to use that bean.

The classes marked with * above use an intermediate buffer and copy the decoded data to a final buffer of the correct size. Starting with version 4.3, these can be configured with a poolSize property to allow these raw buffers to be reused instead of being allocated and discarded for each message, which is the default behavior. Setting the property to a negative value will create a pool that has no bounds. If the pool is bounded, you can also set the poolWaitTimeout property (milliseconds) after which an exception is thrown if no buffer becomes available; it defaults to infinity. Such an exception will cause the socket to be closed.

If you wish to use the same mechanism in custom deserializers, subclass AbstractPooledBufferByteArraySerializer instead of its super class AbstractByteArraySerializer, and implement doDeserialize() instead of deserialize(). The buffer will be returned to the pool automatically. AbstractPooledBufferByteArraySerializer also provides a convenient utility method copyToSizedArray().

The MapJsonSerializer uses a Jackson ObjectMapper to convert between a Map and JSON. This can be used in conjunction with a MessageConvertingTcpMessageMapper and a MapMessageConverter to transfer selected headers and the payload in a JSON format.

[Note]Note

The Jackson ObjectMapper cannot demarcate messages in the stream. Therefore, the MapJsonSerializer needs to delegate to another (de)serializer to handle message demarcation. By default, a ByteArrayLfSerializer is used, resulting in messages with the format <json><LF> on the wire, but you can configure it to use others instead.

The final standard serializer is org.springframework.core.serializer.DefaultSerializer which can be used to convert Serializable objects using java serialization.org.springframework.core.serializer.DefaultDeserializer is provided for inbound deserialization of streams containing Serializable objects.

To implement a custom (de)serializer pair, implement the org.springframework.core.serializer.Deserializer and org.springframework.core.serializer.Serializer interfaces.

If you do not wish to use the default (de)serializer (ByteArrayCrLfSerializer), you must supply serializer and deserializer attributes on the connection factory (example below).

<bean id="javaSerializer"
      class="org.springframework.core.serializer.DefaultSerializer" />
<bean id="javaDeserializer"
      class="org.springframework.core.serializer.DefaultDeserializer" />

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    deserializer="javaDeserializer"
    serializer="javaSerializer"/>

A server connection factory that uses java.net.Socket connections and uses Java serialization on the wire.

For full details of the attributes available on connection factories, see the reference at the end of this section.

By default, reverse DNS lookups are done on inbound packets to convert IP addresses to hostnames for use in message headers. In environments where DNS is not configured, this can cause connection delays. This default behavior can be overridden by setting the lookup-host attribute to "false".

[Note]Note

It is possible to modify the creation of and/or attributes of sockets - see Section 31.3.2, “TCP Failover Client Connection Factory”. As is noted there, such modifications are possible whether or not SSL is being used.

31.3.1 TCP Caching Client Connection Factory

As noted above, TCP sockets can be single-use (one request/response) or shared. Shared sockets do not perform well with outbound gateways, in high-volume environments, because the socket can only process one request/response at a time.

To improve performance, users could use collaborating channel adapters instead of gateways, but that requires application-level message correlation. See Section 31.3.2, “TCP Failover Client Connection Factory” for more information.

Spring Integration 2.2 introduced a caching client connection factory, where a pool of shared sockets is used, allowing a gateway to process multiple concurrent requests with a pool of shared connections.

31.3.2 TCP Failover Client Connection Factory

It is now possible to configure a TCP connection factory that supports failover to one or more other servers. When sending a message, the factory will iterate over all its configured factories until either the message can be sent, or no connection can be found. Initially, the first factory in the configured list is used; if a connection subsequently fails the next factory will become the current factory.

<bean id="failCF" class="o.s.i.ip.tcp.connection.FailoverClientConnectionFactory">
    <constructor-arg>
        <list>
            <ref bean="clientFactory1"/>
            <ref bean="clientFactory2"/>
        </list>
    </constructor-arg>
</bean>
[Note]Note

When using the failover connection factory, the singleUse property must be consistent between the factory itself and the list of factories it is configured to use.

The connection factory has two properties when used with a shared connection (singleUse=false):

  • refreshSharedInterval
  • closeOnRefresh

These are 0 and false to retain the same behavior that existed before the properties were added.

Consider the following scenario based on the above configuration: Let’s say clientFactory1 cannot establish a connection but clientFactory2 can. Each time the failCF getConnection() method is called, we will again attempt to connect using clientFactory1; if successful, the "old" connection will remain open and may be reused in future if the first factory fails once more.

Set refreshSharedInterval to only attempt to reconnect with the first factory after that time has expired; set it to Long.MAX_VALUE if you only want to fail back to the first factory when the current connection fails.

Set closeOnRefresh to close the "old" connection after a refresh actually creates a new connection.

[Important]Important

These properties do not apply if any of the delegate factories is a CachingClientConnectionFactory because the connection caching is handled there; in that case the list of connection factories will always be consulted to get a connection.

[Note]Note

When using the failover connection factory, the singleUse property must be consistent between the factory itself and the list of factories it is configured to use.

=== TCP Connection Interceptors

Connection factories can be configured with a reference to a TcpConnectionInterceptorFactoryChain. Interceptors can be used to add behavior to connections, such as negotiation, security, and other setup. No interceptors are currently provided by the framework but, for an example, see the InterceptedSharedConnectionTests in the source repository.

The HelloWorldInterceptor used in the test case works as follows:

When configured with a client connection factory, when the first message is sent over a connection that is intercepted, the interceptor sends Hello over the connection, and expects to receive world!. When that occurs, the negotiation is complete and the original message is sent; further messages that use the same connection are sent without any additional negotiation.

When configured with a server connection factory, the interceptor requires the first message to be Hello and, if it is, returns world!. Otherwise it throws an exception causing the connection to be closed.

All TcpConnection methods are intercepted. Interceptor instances are created for each connection by an interceptor factory. If an interceptor is stateful, the factory should create a new instance for each connection; if there is no state, the same interceptor can wrap each connection. Interceptor factories are added to the configuration of an interceptor factory chain, which is provided to a connection factory using the interceptor-factory attribute. Interceptors must extend TcpConnectionInterceptorSupport; factories must implement the TcpConnectionInterceptorFactory interface. TcpConnectionInterceptorSupport is provided with passthrough methods; by extending this class, you only need to implement those methods you wish to intercept.

<bean id="helloWorldInterceptorFactory"
    class="o.s.i.ip.tcp.connection.TcpConnectionInterceptorFactoryChain">
    <property name="interceptors">
        <array>
            <bean class="o.s.i.ip.tcp.connection.HelloWorldInterceptorFactory"/>
        </array>
    </property>
</bean>

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="12345"
    using-nio="true"
    single-use="true"
    interceptor-factory-chain="helloWorldInterceptorFactory"/>

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="12345"
    single-use="true"
    so-timeout="100000"
    using-nio="true"
    interceptor-factory-chain="helloWorldInterceptorFactory"/>

Configuring a connection interceptor factory chain.

=== TCP Connection Events

Beginning with version 3.0, changes to TcpConnection s are reported by TcpConnectionEvent s. TcpConnectionEvent is a subclass of ApplicationEvent and thus can be received by any ApplicationListener defined in the ApplicationContext.

[Note]Note

The following is deprecated as of version 4.2; use the generic Event Inbound Channel Adapter instead. See Section 12.1, “Receiving Spring Application Events”.

For convenience, a <int-ip:tcp-connection-event-inbound-channel-adapter/> is provided. This adapter will receive all TcpConnectionEvent s (by default), and send them to its channel. The adapter accepts an event-type attribute, which is a list of class names for events that should be sent. This can be used if an application subclasses TcpConnectionEvent for some reason, and wishes to only receive those events. Omitting this attribute will mean that all TcpConnectionEvent s will be sent. You can also use this to limit which TcpConnectionEvent s you are interested in ( TcpConnectionOpenEvent, TcpConnectionCloseEvent, or TcpConnectionExceptionEvent).

TcpConnectionEvents have the following properties:

  • connectionId - the connection identifier which can be used in a message header to send data to the connection
  • connectionFactoryName - the bean name of the connection factory the connection belongs to
  • throwable - the Throwable (for TcpConnectionExceptionEvent events only)
  • source - the TcpConnection; this can be used, for example, to determine the remote IP Address with getHostAddress() (cast required)

In addition, since version 4.0 the standard deserializers discussed in Section 31.3, “TCP Connection Factories” now emit TcpDeserializationExceptionEvent s when problems are encountered decoding the data stream. These events contain the exception, the buffer that was in the process of being built, and an offset into the buffer (if available) at the point the exception occurred. Applications can use a normal ApplicationListener, or see Section 12.1, “Receiving Spring Application Events”, to capture these events, allowing analysis of the problem.

Starting with versions 4.0.7, 4.1.3, TcpConnectionServerExceptionEvent s are published whenever an unexpected exception occurs on a server socket (such as a BindException when the server socket is in use). These events have a reference to the connection factory and the cause.

Starting with version 4.2, TcpConnectionFailedCorrelationEvent s are published whenever an endpoint (inbound gateway or collaborating outbound channel adapter) receives a message that cannot be routed to a connection because the ip_connectionId header is invalid. Outbound gateways also publish this event when a late reply is received (the sender thread has timed out). The event contains the connection id as well as an exception in the cause property that contains the failed message.

Starting with version 4.3, a TcpConnectionServerListeningEvent is emitted when a server connection factory is started. This is useful when the factory is configured to listen on port 0, meaning that the operating system chooses the port. It can also be used instead of polling isListening(), if you need to wait before starting some other process that will connect to the socket.

[Important]Important

To avoid delaying the listening thread from accepting connections, the event is published on a separate thread.

Starting with version 4.3.2, a TcpConnectionFailedEvent is emitted whenever a client connection can’t be created. The source of the event is the connection factory which can be used to determine the host and port to which the connection could not be established.

=== TCP Adapters

TCP inbound and outbound channel adapters that utilize the above connection factories are provided. These adapters have attributes connection-factory and channel. The channel attribute specifies the channel on which messages arrive at an outbound adapter and on which messages are placed by an inbound adapter. The connection-factory attribute indicates which connection factory is to be used to manage connections for the adapter. While both inbound and outbound adapters can share a connection factory, server connection factories are always owned by an inbound adapter; client connection factories are always owned by an outbound adapter. One, and only one, adapter of each type may get a reference to a connection factory.

<bean id="javaSerializer"
      class="org.springframework.core.serializer.DefaultSerializer"/>
<bean id="javaDeserializer"
      class="org.springframework.core.serializer.DefaultDeserializer"/>

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    deserializer="javaDeserializer"
    serializer="javaSerializer"
    using-nio="true"
    single-use="true"/>

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="#{server.port}"
    single-use="true"
    so-timeout="10000"
    deserializer="javaDeserializer"
    serializer="javaSerializer"/>

<int:channel id="input" />

<int:channel id="replies">
    <int:queue/>
</int:channel>

<int-ip:tcp-outbound-channel-adapter id="outboundClient"
    channel="input"
    connection-factory="client"/>

<int-ip:tcp-inbound-channel-adapter id="inboundClient"
    channel="replies"
    connection-factory="client"/>

<int-ip:tcp-inbound-channel-adapter id="inboundServer"
    channel="loop"
    connection-factory="server"/>

<int-ip:tcp-outbound-channel-adapter id="outboundServer"
    channel="loop"
    connection-factory="server"/>

<int:channel id="loop"/>

In this configuration, messages arriving in channel input are serialized over connections created by client received at the server and placed on channel loop. Since loop is the input channel for outboundServer the message is simply looped back over the same connection and received by inboundClient and deposited in channel replies. Java serialization is used on the wire.

Normally, inbound adapters use a type="server" connection factory, which listens for incoming connection requests. In some cases, it is desirable to establish the connection in reverse, whereby the inbound adapter connects to an external server and then waits for inbound messages on that connection.

This topology is supported by using client-mode="true" on the inbound adapter. In this case, the connection factory must be of type client and must have single-use set to false.

Two additional attributes are used to support this mechanism: retry-interval specifies (in milliseconds) how often the framework will attempt to reconnect after a connection failure. scheduler is used to supply a TaskScheduler used to schedule the connection attempts, and to test that the connection is still active.

For an outbound adapter, the connection is normally established when the first message is sent. client-mode="true" on an outbound adapter will cause the connection to be established when the adapter is started. Adapters are automatically started by default. Again, the connection factory must be of type client and have single-use set to false and retry-interval and scheduler are also supported. If a connection fails, it will be re-established either by the scheduler or when the next message is sent.

For both inbound and outbound, if the adapter is started, you may force the adapter to establish a connection by sending a <control-bus /> command: @adapter_id.retryConnection() and examine the current state with @adapter_id.isClientModeConnected().

=== TCP Gateways

The inbound TCP gateway TcpInboundGateway and outbound TCP gateway TcpOutboundGateway use a server and client connection factory respectively. Each connection can process a single request/response at a time.

The inbound gateway, after constructing a message with the incoming payload and sending it to the requestChannel, waits for a response and sends the payload from the response message by writing it to the connection.

[Note]Note

For the inbound gateway, care must be taken to retain, or populate, the ip_connectionId header because it is used to correlate the message to a connection. Messages that originate at the gateway will automatically have the header set. If the reply is constructed as a new message, you will need to set the header. The header value can be captured from the incoming message.

As with inbound adapters, inbound gateways normally use a type="server" connection factory, which listens for incoming connection requests. In some cases, it is desirable to establish the connection in reverse, whereby the inbound gateway connects to an external server and then waits for, and replies to, inbound messages on that connection.

This topology is supported by using client-mode="true" on the inbound gateway. In this case, the connection factory must be of type client and must have single-use set to false.

Two additional attributes are used to support this mechanism: retry-interval specifies (in milliseconds) how often the framework will attempt to reconnect after a connection failure. scheduler is used to supply a TaskScheduler used to schedule the connection attempts, and to test that the connection is still active.

If the gateway is started, you may force the gateway to establish a connection by sending a <control-bus /> command: @adapter_id.retryConnection() and examine the current state with @adapter_id.isClientModeConnected().

The outbound gateway, after sending a message over the connection, waits for a response and constructs a response message and puts in on the reply channel. Communications over the connections are single-threaded. Users should be aware that only one message can be handled at a time and, if another thread attempts to send a message before the current response has been received, it will block until any previous requests are complete (or time out). If, however, the client connection factory is configured for single-use connections each new request gets its own connection and is processed immediately.

<int-ip:tcp-inbound-gateway id="inGateway"
    request-channel="tcpChannel"
    reply-channel="replyChannel"
    connection-factory="cfServer"
    reply-timeout="10000"/>

A simple inbound TCP gateway; if a connection factory configured with the default (de)serializer is used, messages will be \r\n delimited data and the gateway can be used by a simple client such as telnet.

<int-ip:tcp-outbound-gateway id="outGateway"
    request-channel="tcpChannel"
    reply-channel="replyChannel"
    connection-factory="cfClient"
    request-timeout="10000"
    remote-timeout="10000"/> <!-- or e.g.
remote-timeout-expression="headers['timeout']" -->

A simple outbound TCP gateway.

client-mode is not currently available with the outbound gateway.

=== TCP Message Correlation

==== Overview

One goal of the IP Endpoints is to provide communication with systems other than another Spring Integration application. For this reason, only message payloads are sent and received, by default. Since 3.0, headers can be transferred, using JSON, Java serialization, or with custom Serializer s and Deserializer s; see Section 31.3.2, “TCP Failover Client Connection Factory” for more information. No message correlation is provided by the framework, except when using the gateways, or collaborating channel adapters on the server side. In the paragraphs below we discuss the various correlation techniques available to applications. In most cases, this requires specific application-level correlation of messages, even when message payloads contain some natural correlation data (such as an order number).

==== Gateways

The gateways will automatically correlate messages. However, an outbound gateway should only be used for relatively low-volume use. When the connection factory is configured for a single shared connection to be used for all message pairs (single-use="false"), only one message can be processed at a time. A new message will have to wait until the reply to the previous message has been received. When a connection factory is configured for each new message to use a new connection (single-use="true"), the above restriction does not apply. While this may give higher throughput than a shared connection environment, it comes with the overhead of opening and closing a new connection for each message pair.

Therefore, for high-volume messages, consider using a collaborating pair of channel adapters. However, you will need to provide collaboration logic.

Another solution, introduced in Spring Integration 2.2, is to use a CachingClientConnectionFactory, which allows the use of a pool of shared connections.

==== Collaborating Outbound and Inbound Channel Adapters

To achieve high-volume throughput (avoiding the pitfalls of using gateways as mentioned above) you may consider configuring a pair of collaborating outbound and inbound channel adapters. Collaborating adapters can also be used (server-side or client-side) for totally asynchronous communication (rather than with request/reply semantics). On the server side, message correlation is automatically handled by the adapters because the inbound adapter adds a header allowing the outbound adapter to determine which connection to use to send the reply message.

[Note]Note

On the server side, care must be taken to populate the ip_connectionId header because it is used to correlate the message to a connection. Messages that originate at the inbound adapter will automatically have the header set. If you wish to construct other messages to send, you will need to set the header. The header value can be captured from an incoming message.

On the client side, the application will have to provide its own correlation logic, if needed. This can be done in a number of ways.

If the message payload has some natural correlation data, such as a transaction id or an order number, AND there is no need to retain any information (such as a reply channel header) from the original outbound message, the correlation is simple and would done at the application level in any case.

If the message payload has some natural correlation data, such as a transaction id or an order number, but there is a need to retain some information (such as a reply channel header) from the original outbound message, you may need to retain a copy of the original outbound message (perhaps by using a publish-subscribe channel) and use an aggregator to recombine the necessary data.

For either of the previous two paragraphs, if the payload has no natural correlation data, you may need to provide a transformer upstream of the outbound channel adapter to enhance the payload with such data. Such a transformer may transform the original payload to a new object containing both the original payload and some subset of the message headers. Of course, live objects (such as reply channels) from the headers can not be included in the transformed payload.

If such a strategy is chosen you will need to ensure the connection factory has an appropriate serializer/deserializer pair to handle such a payload, such as the DefaultSerializer/Deserializer which use java serialization, or a custom serializer and deserializer. The ByteArray*Serializer options mentioned in Section 31.3, “TCP Connection Factories”, including the default ByteArrayCrLfSerializer, do not support such payloads, unless the transformed payload is a String or byte[],

[Note]Note

Before the 2.2 release, when a client connection factory was used by collaborating channel adapters, the so-timeout attribute defaulted to the default reply timeout (10 seconds). This meant that if no data were received by the inbound adapter for this period of time, the socket was closed.

This default behavior was not appropriate in a truly asynchronous environment, so it now defaults to an infinite timeout. You can reinstate the previous default behavior by setting the so-timeout attribute on the client connection factory to 10000 milliseconds.

==== Transferring Headers

TCP is a streaming protocol; Serializers and Deserializers are used to demarcate messages within the stream. Prior to 3.0, only message payloads (String or byte[]) could be transferred over TCP. Beginning with 3.0, you can now transfer selected headers as well as the payload. It is important to understand, though, that "live" objects, such as the replyChannel header cannot be serialized.

Sending header information over TCP requires some additional configuration.

The first step is to provide the ConnectionFactory with a MessageConvertingTcpMessageMapper using the mapper attribute. This mapper delegates to any MessageConverter implementation to convert the message to/from some object that can be (de)serialized by the configured serializer and deserializer.

A MapMessageConverter is provided, which allows the specification of a list of headers that will be added to a Map object, along with the payload. The generated Map has two entries: payload and headers. The headers entry is itself a Map containing the selected headers.

The second step is to provide a (de)serializer that can convert between a Map and some wire format. This can be a custom (de)Serializer, which would typically be needed if the peer system is not a Spring Integration application.

A MapJsonSerializer is provided that will convert a Map to/from JSON. This uses a Spring Integration JsonObjectMapper to perform this function. You can provide a custom JsonObjectMapper if needed. By default, the serializer inserts a linefeed`0x0a` character between objects. See the JavaDocs for more information.

[Note]Note

At the time of writing, the JsonObjectMapper uses whichever version of Jackson is on the classpath.

You can also use standard Java serialization of the Map, using the DefaultSerializer and DefaultDeserializer.

The following example shows the configuration of a connection factory that transfers the correlationId, sequenceNumber, and sequenceSize headers using JSON.

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="12345"
    mapper="mapper"
    serializer="jsonSerializer"
    deserializer="jsonSerializer"/>

<bean id="mapper"
      class="o.sf.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
    <constructor-arg name="messageConverter">
        <bean class="o.sf.integration.support.converter.MapMessageConverter">
            <property name="headerNames">
                <list>
                    <value>correlationId</value>
                    <value>sequenceNumber</value>
                    <value>sequenceSize</value>
                </list>
            </property>
        </bean>
    </constructor-arg>
</bean>

<bean id="jsonSerializer" class="o.sf.integration.ip.tcp.serializer.MapJsonSerializer" />

A message sent with the above configuration, with payload foo would appear on the wire like so:

{"headers":{"correlationId":"bar","sequenceSize":5,"sequenceNumber":1},"payload":"foo"}

=== A Note About NIO

Using NIO (see using-nio in Section 31.3.2, “TCP Failover Client Connection Factory”) avoids dedicating a thread to read from each socket. For a small number of sockets, you will likely find that not using NIO, together with an async handoff (e.g. to a QueueChannel), will perform as well as, or better than, using NIO.

Consider using NIO when handling a large number of connections. However, the use of NIO has some other ramifications. A pool of threads (in the task executor) is shared across all the sockets; each incoming message is assembled and sent to the configured channel as a separate unit of work on a thread selected from that pool. Two sequential messages arriving on the same socket might be processed by different threads. This means that the order in which the messages are sent to the channel is indeterminate; the strict ordering of the messages arriving on the socket is not maintained.

For some applications, this is not an issue; for others it is. If strict ordering is required, consider setting using-nio to false and using async handoff.

Alternatively, you may choose to insert a resequencer downstream of the inbound endpoint to return the messages to their proper sequence. Set apply-sequence to true on the connection factory, and messages arriving on a TCP connection will have sequenceNumber and correlationId headers set. The resequencer uses these headers to return the messages to their proper sequence.

Pool Size

The pool size attribute is no longer used; previously, it specified the size of the default thread pool when a task-executor was not specified. It was also used to set the connection backlog on server sockets. The first function is no longer needed (see below); the second function is replaced by the backlog attribute.

Previously, when using a fixed thread pool task executor (which was the default), with NIO, it was possible to get a deadlock and processing would stop. The problem occurred when a buffer was full, a thread reading from the socket was trying to add more data to the buffer, and there were no threads available to make space in the buffer. This only occurred with a very small pool size, but it could be possible under extreme conditions. Since 2.2, two changes have eliminated this problem. First, the default task executor is a cached thread pool executor. Second, deadlock detection logic has been added such that if thread starvation occurs, instead of deadlocking, an exception is thrown, thus releasing the deadlocked resources.

[Important]Important

Now that the default task executor is unbounded, it is possible that an out of memory condition might occur with high rates of incoming messages, if message processing takes extended time. If your application exhibits this type of behavior, you are advised to use a pooled task executor with an appropriate pool size, but see the next section.

==== Thread Pool Task Executor with CALLER_RUNS Policy

There are some important considerations when using a fixed thread pool with the CallerRunsPolicy (CALLER_RUNS when using the <task/> namespace) and the queue capacity is small.

The following does not apply if you are not using a fixed thread pool.

With NIO connections there are 3 distinct task types; the IO Selector processing is performed on one dedicated thread - detecting events, accepting new connections, and dispatching the IO read operations to other threads, using the task executor. When an IO reader thread (to which the read operation is dispatched) reads data, it hands off to another thread to assemble the incoming message; large messages may take several reads to complete. These "assembler" threads can block waiting for data. When a new read event occurs, the reader determines if this socket already has an assembler and runs a new one if not. When the assembly process is complete, the assembler thread is returned to the pool.

This can cause a deadlock when the pool is exhausted and the CALLER_RUNS rejection policy is in use, and the task queue is full. When the pool is empty and there is no room in the queue, the IO selector thread receives an OP_READ event and dispatches the read using the executor; the queue is full, so the selector thread itself starts the read process; now, it detects that there is not an assembler for this socket and, before it does the read, fires off an assembler; again, the queue is full, and the selector thread becomes the assembler. The assembler is now blocked awaiting the data to be read, which will never happen. The connection factory is now deadlocked because the selector thread can’t handle new events.

We must avoid the selector (or reader) threads performing the assembly task to avoid this deadlock. It is desirable to use seperate pools for the IO and assembly operations.

The framework provides a CompositeExecutor, which allows the configuration of two distinct executors; one for performing IO operations, and one for message assembly. In this environment, an IO thread can never become an assembler thread, and the deadlock cannot occur.

In addition, the task executors should be configured to use a AbortPolicy (ABORT when using <task>). When an IO cannot be completed, it is deferred for a short time and retried continually until it can be completed and an assembler allocated. By default, the delay is 100ms but it can be changed using the readDelay property on the connection factory (read-delay when configuring with the XML namespace).

Example configuration of the composite executor is shown below.

@Bean
private CompositeExecutor compositeExecutor() {
    ThreadPoolTaskExecutor ioExec = new ThreadPoolTaskExecutor();
    ioExec.setCorePoolSize(4);
    ioExec.setMaxPoolSize(10);
    ioExec.setQueueCapacity(0);
    ioExec.setThreadNamePrefix("io-");
    ioExec.setRejectedExecutionHandler(new AbortPolicy());
    ioExec.initialize();
    ThreadPoolTaskExecutor assemblerExec = new ThreadPoolTaskExecutor();
    assemblerExec.setCorePoolSize(4);
    assemblerExec.setMaxPoolSize(10);
    assemblerExec.setQueueCapacity(0);
    assemblerExec.setThreadNamePrefix("assembler-");
    assemblerExec.setRejectedExecutionHandler(new AbortPolicy());
    assemblerExec.initialize();
    return new CompositeExecutor(ioExec, assemblerExec);
}
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
    <constructor-arg ref="io"/>
    <constructor-arg ref="assembler"/>
</bean>

<task:executor id="io" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<task:executor id="assembler" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
    <constructor-arg>
        <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="threadNamePrefix" value="io-" />
            <property name="corePoolSize" value="4" />
            <property name="maxPoolSize" value="8" />
            <property name="queueCapacity" value="0" />
            <property name="rejectedExecutionHandler">
                <bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
            </property>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="threadNamePrefix" value="assembler-" />
            <property name="corePoolSize" value="4" />
            <property name="maxPoolSize" value="10" />
            <property name="queueCapacity" value="0" />
            <property name="rejectedExecutionHandler">
                <bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
            </property>
        </bean>
    </constructor-arg>
</bean>

=== SSL/TLS Support

==== Overview

Secure Sockets Layer/Transport Layer Security is supported. When using NIO, the JDK 5+ SSLEngine feature is used to handle handshaking after the connection is established. When not using NIO, standard SSLSocketFactory and SSLServerSocketFactory objects are used to create connections. A number of strategy interfaces are provided to allow significant customization; default implementations of these interfaces provide for the simplest way to get started with secure communications.

==== Getting Started

Regardless of whether NIO is being used, you need to configure the ssl-context-support attribute on the connection factory. This attribute references a <bean/> definition that describes the location and passwords for the required key stores.

SSL/TLS peers require two keystores each; a keystore containing private/public key pairs identifying the peer; a truststore, containing the public keys for peers that are trusted. See the documentation for the keytool utility provided with the JDK. The essential steps are

  1. Create a new key pair and store in a keystore.
  2. Export the public key.
  3. Import the public key into the peer’s truststore.

Repeat for the other peer.

[Note]Note

It is common in test cases to use the same key stores on both peers, but this should be avoided for production.

After establishing the key stores, the next step is to indicate their locations to the TcpSSLContextSupport bean, and provide a reference to that bean to the connection factory.

<bean id="sslContextSupport"
    class="o.sf.integration.ip.tcp.connection.support.DefaultTcpSSLContextSupport">
    <constructor-arg value="client.ks"/>
    <constructor-arg value="client.truststore.ks"/>
    <constructor-arg value="secret"/>
    <constructor-arg value="secret"/>
</bean>

<ip:tcp-connection-factory id="clientFactory"
    type="client"
    host="localhost"
    port="1234"
    ssl-context-support="sslContextSupport" />

The DefaulTcpSSLContextSupport class also has an optional protocol property, which can be SSL or TLS (default).

The keystore file names (first two constructor arguments) use the Spring Resource abstraction; by default the files will be located on the classpath, but this can be overridden by using the file: prefix, to find the files on the filesystem instead.

Starting with version 4.3.6, when using NIO, you can specify an ssl-handshake-timeout (seconds) on the connection factory. This timeout (default 30) is used during SSL handshake when waiting for data; if the timeout is exceeded, the process is aborted and the socket closed.

=== Advanced Techniques

==== Strategy Interfaces

In many cases, the configuration described above is all that is needed to enable secure communication over TCP/IP. However, a number of strategy interfaces are provided to allow customization and modification of socket factories and sockets.

  • TcpSSLContextSupport
  • TcpSocketFactorySupport
  • TcpSocketSupport
  • TcpNioConnectionSupport
public interface TcpSSLContextSupport {

    SSLContext getSSLContext() throws Exception;

}

Implementations of this interface are responsible for creating an SSLContext. The implementation provided by the framework is the DefaultTcpSSLContextSupport described above. If you require different behavior, implement this interface and provide the connection factory with a reference to a bean of your class' implementation.

public interface TcpSocketFactorySupport {

    ServerSocketFactory getServerSocketFactory();

    SocketFactory getSocketFactory();

}

Implementations of this interface are responsible for obtaining references to ServerSocketFactory and SocketFactory. Two implementations are provided; the first is DefaultTcpNetSocketFactorySupport for non-SSL sockets (when no ssl-context-support attribute is defined); this simply uses the JDK’s default factories. The second implementation is DefaultTcpNetSSLSocketFactorySupport; this is used, by default, when an ssl-context-support attribute is defined; it uses the SSLContext created by that bean to create the socket factories.

[Note]Note

This interface only applies if using-nio is "false"; socket factories are not used by NIO.

public interface TcpSocketSupport {

    void postProcessServerSocket(ServerSocket serverSocket);

    void postProcessSocket(Socket socket);

}

Implementations of this interface can modify sockets after they are created, and after all configured attributes have been applied, but before the sockets are used. This applies whether or not NIO is being used. For example, you could use an implementation of this interface to modify the supported cipher suites on an SSL socket, or you could add a listener that gets notified after SSL handshaking is complete. The sole implementation provided by the framework is the DefaultTcpSocketSupport which does not modify the sockets in any way

To supply your own implementation of TcpSocketFactorySupport or TcpSocketSupport, provide the connection factory with references to beans of your custom type using the socket-factory-support and socket-support attributes, respectively.

public interface TcpNioConnectionSupport {

    TcpNioConnection createNewConnection(SocketChannel socketChannel,
            boolean server, boolean lookupHost,
            ApplicationEventPublisher applicationEventPublisher,
            String connectionFactoryName) throws Exception;

}

This interface is invoked to create TcpNioConnection objects (or subclasses). Two implementations are provided DefaultTcpNioSSLConnectionSupport and DefaultTcpNioConnectionSupport which are used depending on whether SSL is in use or not. A common use case would be to subclass DefaultTcpNioSSLConnectionSupport and override postProcessSSLEngine; see the example below.

==== Example: Enabling SSL Client Authentication

To enable client certificate authentication when using SSL, the technique depends on whether NIO is in use or not. When NIO is not being used, provide a custom TcpSocketSupport implementation to post-process the server socket:

serverFactory.setTcpSocketSupport(new DefaultTcpSocketSupport() {

    @Override
    public void postProcessServerSocket(ServerSocket serverSocket) {
        ((SSLServerSocket) serverSocket).setNeedClientAuth(true);
    }

});

(When using XML configuration, provide a reference to your bean using the socket-support attribute).

When using NIO, provide a custom TcpNioSslConnectionSupport implementation to post-process the SSLEngine.

@Bean
public DefaultTcpNioSSLConnectionSupport tcpNioConnectionSupport() {
    return new DefaultTcpNioSSLConnectionSupport(serverSslContextSupport) {

            @Override
            protected void postProcessSSLEngine(SSLEngine sslEngine) {
                sslEngine.setNeedClientAuth(true);
            }

    }
}

@Bean
public TcpNioServerConnectionFactory server() {
    ...
    serverFactory.setTcpNioConnectionSupport(tcpNioConnectionSupport());
    ...
}

(When using XML configuration, since version 4.3.7, provide a reference to your bean using the nio-connection-support attribute).

=== IP Configuration Attributes

Attribute NameClient?Server?Allowed ValuesAttribute Description

type

Y

Y

client, server

Determines whether the connection factory is a client or server.

host

Y

N

 

The host name or ip address of the destination.

port

Y

Y

 

The port.

serializer

Y

Y

 

An implementation of Serializer used to serialize the payload. Defaults to ByteArrayCrLfSerializer

deserializer

Y

Y

 

An implementation of Deserializer used to deserialize the payload. Defaults to ByteArrayCrLfSerializer

using-nio

Y

Y

true, false

Whether or not connection uses NIO. Refer to the java.nio package for more information. See Section 31.3.2, “TCP Failover Client Connection Factory”. Default false.

using-direct-buffers

Y

N

true, false

When using NIO, whether or not the connection uses direct buffers. Refer to java.nio.ByteBuffer documentation for more information. Must be false if using-nio is false.

apply-sequence

Y

Y

true, false

When using NIO, it may be necessary to resequence messages. When this attribute is set to true, correlationId and sequenceNumber headers will be added to received messages. See Section 31.3.2, “TCP Failover Client Connection Factory”. Default false.

so-timeout

Y

Y

 

Defaults to 0 (infinity), except for server connection factories with single-use="true". In that case, it defaults to the default reply timeout (10 seconds).

so-send-buffer-size

Y

Y

 

See java.net.Socket. setSendBufferSize().

so-receive-buffer- size

Y

Y

 

See java.net.Socket. setReceiveBufferSize().

so-keep-alive

Y

Y

true, false

See java.net.Socket. setKeepAlive().

so-linger

Y

Y

 

Sets linger to true with supplied value. See java.net.Socket. setSoLinger().

so-tcp-no-delay

Y

Y

true, false

See java.net.Socket. setTcpNoDelay().

so-traffic-class

Y

Y

 

See java.net.Socket. setTrafficClass().

local-address

N

Y

 

On a multi-homed system, specifies an IP address for the interface to which the socket will be bound.

task-executor

Y

Y

 

Specifies a specific Executor to be used for socket handling. If not supplied, an internal cached thread executor will be used. Needed on some platforms that require the use of specific task executors such as a WorkManagerTaskExecutor.

single-use

Y

Y

true, false

Specifies whether a connection can be used for multiple messages. If true, a new connection will be used for each message.

pool-size

N

N

 

This attribute is no longer used. For backward compatibility, it sets the backlog but users should use backlog to specify the connection backlog in server factories

backlog

N

Y

 

Sets the connection backlog for server factories.

lookup-host

Y

Y

true, false

Specifies whether reverse lookups are done on IP addresses to convert to host names for use in message headers. If false, the IP address is used instead. Defaults to true.

interceptor-factory-chain

Y

Y

 

See Section 31.3.2, “TCP Failover Client Connection Factory”

ssl-context-support

Y

Y

 

See Section 31.3.2, “TCP Failover Client Connection Factory”

socket-factory-support

Y

Y

 

See Section 31.3.2, “TCP Failover Client Connection Factory”

socket-support

Y

Y

 

See Section 31.3.2, “TCP Failover Client Connection Factory”

nio-connection-support

Y

Y

 

See Section 31.3.2, “TCP Failover Client Connection Factory”

read-delay

Y

Y

long > 0

The delay (in milliseconds) before retrying a read after the previous attempt failed due to insufficient threads. Default 100. Only applies if using-nio is true.

Table 31.1. UDP Inbound Channel Adapter Attributes

Attribute NameAllowed ValuesAttribute Description

port

 

The port on which the adapter listens.

multicast

true, false

Whether or not the udp adapter uses multicast.

multicast-address

 

When multicast is true, the multicast address to which the adapter joins.

pool-size

 

Specifies the concurrency. Specifies how many packets can be handled concurrently. It only applies if task-executor is not configured. Defaults to 5.

task-executor

 

Specifies a specific Executor to be used for socket handling. If not supplied, an internal pooled executor will be used. Needed on some platforms that require the use of specific task executors such as a WorkManagerTaskExecutor. See pool-size for thread requirements.

receive-buffer-size

 

The size of the buffer used to receive DatagramPackets. Usually set to the MTU size. If a smaller buffer is used than the size of the sent packet, truncation can occur. This can be detected by means of the check-length attribute..

check-length

true, false

Whether or not a udp adapter expects a data length field in the packet received. Used to detect packet truncation.

so-timeout

 

See java.net.DatagramSocket setSoTimeout() methods for more information.

so-send-buffer-size

 

Used for udp acknowledgment packets. See java.net.DatagramSocket setSendBufferSize() methods for more information.

so-receive-buffer- size

 

See java.net.DatagramSocket setReceiveBufferSize() for more information.

local-address

 

On a multi-homed system, specifies an IP address for the interface to which the socket will be bound.

error-channel

 

If an Exception is thrown by a downstream component, the MessagingException message containing the exception and failed message is sent to this channel.

lookup-host

true, false

Specifies whether reverse lookups are done on IP addresses to convert to host names for use in message headers. If false, the IP address is used instead. Defaults to true.


Table 31.2. UDP Outbound Channel Adapter Attributes

Attribute NameAllowed ValuesAttribute Description

host

 

The host name or ip address of the destination. For multicast udp adapters, the multicast address.

port

 

The port on the destination.

multicast

true, false

Whether or not the udp adapter uses multicast.

acknowledge

true, false

Whether or not a udp adapter requires an acknowledgment from the destination. when enabled, requires setting the following 4 attributes.

ack-host

 

When acknowledge is true, indicates the host or ip address to which the acknowledgment should be sent. Usually the current host, but may be different, for example when Network Address Transaction (NAT) is being used.

ack-port

 

When acknowledge is true, indicates the port to which the acknowledgment should be sent. The adapter listens on this port for acknowledgments.

ack-timeout

 

When acknowledge is true, indicates the time in milliseconds that the adapter will wait for an acknowledgment. If an acknowledgment is not received in time, the adapter will throw an exception.

min-acks-for- success

 

Defaults to 1. For multicast adapters, you can set this to a larger value, requiring acknowledgments from multiple destinations.

check-length

true, false

Whether or not a udp adapter includes a data length field in the packet sent to the destination.

time-to-live

 

For multicast adapters, specifies the time to live attribute for the MulticastSocket; controls the scope of the multicasts. Refer to the Java API documentation for more information.

so-timeout

 

See java.net.DatagramSocket setSoTimeout() methods for more information.

so-send-buffer-size

 

See java.net.DatagramSocket setSendBufferSize() methods for more information.

so-receive-buffer- size

 

Used for udp acknowledgment packets. See java.net.DatagramSocket setReceiveBufferSize() methods for more information.

local-address

 

On a multi-homed system, for the UDP adapter, specifies an IP address for the interface to which the socket will be bound for reply messages. For a multicast adapter it is also used to determine which interface the multicast packets will be sent over.

task-executor

 

Specifies a specific Executor to be used for acknowledgment handling. If not supplied, an internal single threaded executor will be used. Needed on some platforms that require the use of specific task executors such as a WorkManagerTaskExecutor. One thread will be dedicated to handling acknowledgments (if the acknowledge option is true).

destination-expression

SpEL expression

A SpEL expression to be evaluated to determine which SocketAddress to use as a destination address for the outgoing UDP packets.

socket-expression

SpEL expression

A SpEL expression to be evaluated to determine which datagram socket use for sending outgoing UDP packets.


Table 31.3. TCP Inbound Channel Adapter Attributes

Attribute NameAllowed ValuesAttribute Description

channel

 

The channel to which inbound messages will be sent.

connection-factory

 

If the connection factory has a type server, the factory is owned by this adapter. If it has a type client, it is owned by an outbound channel adapter and this adapter will receive any incoming messages on the connection created by the outbound adapter.

error-channel

 

If an Exception is thrown by a downstream component, the MessagingException message containing the exception and failed message is sent to this channel.

client-mode

true, false

When true, the inbound adapter will act as a client, with respect to establishing the connection and then receive incoming messages on that connection. Default = false. Also see retry-interval and scheduler. The connection factory must be of type client and have single-use set to false.

retry-interval

 

When in client-mode, specifies the number of milliseconds to wait between connection attempts, or after a connection failure. Default 60,000 (60 seconds).

scheduler

true, false

Specifies a TaskScheduler to use for managing the client-mode connection. Defaults to a ThreadPoolTaskScheduler with a pool size of `.


Table 31.4. TCP Outbound Channel Adapter Attributes

Attribute NameAllowed ValuesAttribute Description

channel

 

The channel on which outbound messages arrive.

connection-factory

 

If the connection factory has a type client, the factory is owned by this adapter. If it has a type server, it is owned by an inbound channel adapter and this adapter will attempt to correlate messages to the connection on which an original inbound message was received.

client-mode

true, false

When true, the outbound adapter will attempt to establish the connection as soon as it is started. When false, the connection is established when the first message is sent. Default = false. Also see retry-interval and scheduler. The connection factory must be of type client and have single-use set to false.

retry-interval

 

When in client-mode, specifies the number of milliseconds to wait between connection attempts, or after a connection failure. Default 60,000 (60 seconds).

scheduler

true, false

Specifies a TaskScheduler to use for managing the client-mode connection. Defaults to a ThreadPoolTaskScheduler with a pool size of `.


Table 31.5. TCP Inbound Gateway Attributes

Attribute NameAllowed ValuesAttribute Description

connection-factory

 

The connection factory must be of type server.

request-channel

 

The channel to which incoming messages will be sent.

reply-channel

 

The channel on which reply messages may arrive. Usually replies will arrive on a temporary reply channel added to the inbound message header

reply-timeout

 

The time in milliseconds for which the gateway will wait for a reply. Default 1000 (1 second).

error-channel

 

If an Exception is thrown by a downstream component, the MessagingException message containing the exception and failed message is sent to this channel; any reply from that flow will then be returned as a response by the gateway.

client-mode

true, false

When true, the inbound gateway will act as a client, with respect to establishing the connection and then receive (and reply to) incoming messages on that connection. Default = false. Also see retry-interval and scheduler. The connection factory must be of type client and have single-use set to false.

retry-interval

 

When in client-mode, specifies the number of milliseconds to wait between connection attempts, or after a connection failure. Default 60,000 (60 seconds).

scheduler

true, false

Specifies a TaskScheduler to use for managing the client-mode connection. Defaults to a ThreadPoolTaskScheduler with a pool size of `.


Table 31.6. TCP Outbound Gateway Attributes

Attribute NameAllowed ValuesAttribute Description

connection-factory

 

The connection factory must be of type client.

request-channel

 

The channel on which outgoing messages will arrive.

reply-channel

 

Optional. The channel to which reply messages may be sent.

remote-timeout

 

The time in milliseconds for which the gateway will wait for a reply from the remote system. Mutually exclusive with remote-timeout-expression. Default: 10000 (10 seconds). Note: in versions prior to 4.2 this value defaulted to reply-timeout (if set).

remote-timeout-expression

 

A SpEL expression, evaluated against the message to determine the time in milliseconds for which the gateway will wait for a reply from the remote system. Mutually exclusive with remote-timeout.

request-timeout

 

If a single-use connection factory is not being used, The time in milliseconds for which the gateway will wait to get access to the shared connection.

reply-timeout

 

The time in milliseconds for which the gateway will wait when sending the reply to the reply-channel. Only applies if the reply-channel might block, such as a bounded QueueChannel that is currently full.


IP Message Headers.  === IP Message Headers The following MessageHeader s are used by this module:

Header NameIpHeaders ConstantDescription

ip_hostname

HOSTNAME

The host name from which a TCP message or UDP packet was received. If lookupHost is false, this will contain the ip address.

ip_address

IP_ADDRESS

The ip address from which a TCP message or UDP packet was received.

ip_port

PORT

The remote port for a UDP packet.

ip_localInetAddress

IP_LOCAL_ADDRESS

The local InetAddress to which the socket is connected (since version 4.2.5).

ip_ackTo

ACKADDRESS

The remote ip address to which UDP application-level acks will be sent. The framework includes acknowledgment information in the data packet.

ip_ackId

ACK_ID

A correlation id for UDP application-level acks. The framework includes acknowledgment information in the data packet.

ip_tcp_remotePort

REMOTE_PORT

The remote port for a TCP connection.

ip_connectionId

CONNECTION_ID

A unique identifier for a TCP connection; set by the framework for inbound messages; when sending to a server-side inbound channel adapter, or replying to an inbound gateway, this header is required so the endpoint can determine which connection to send the message to.

ip_actualConnectionId

ACTUAL_ CONNECTION_ID

For information only - when using a cached or failover client connection factory, contains the actual underlying connection id.

contentType

MessageHeaders. CONTENT_TYPE

An optional content type for inbound messages; see below. Note that, unlike the other header constants, this constant is in the class MessageHeaders not IpHeaders.

For inbound messages, ip_hostname, ip_address, ip_tcp_remotePort and ip_connectionId are mapped by the default TcpHeaderMapper. Set the mapper’s addContentTypeHeader property to true and the mapper will set the contentType header (application/octet-stream;charset="UTF-8") by default. You can change the default by setting the contentType property. Users can add additional headers by subclassing TcpHeaderMapper and overriding the method supplyCustomHeaders. For example, when using SSL, properties of the SSLSession can be added by obtaining the session object from the TcpConnection object which is provided as an argument to the supplyCustomHeaders method.

For outbound messages, String payloads are converted to byte[] using the default (UTF-8) charset. Set the charset property to change the default.

When customizing the mapper properties, or subclassing, declare the mapper as a bean and provide an instance to the connection factory using the mapper property

=== Annotation-Based Configuration

The following example from the samples repository is used to illustrate some of the configuration options when using annotations instead of XML.

@EnableIntegration 1
@IntegrationComponentScan 2
@Configuration
public static class Config {

    @Value(${some.port})
    private int port;

    @MessagingGateway(defaultRequestChannel="toTcp") 3
    public interface Gateway {

        String viaTcp(String in);

    }

    @Bean
    @ServiceActivator(inputChannel="toTcp") 4
    public MessageHandler tcpOutGate(AbstractClientConnectionFactory connectionFactory) {
        TcpOutboundGateway gate = new TcpOutboundGateway();
        gate.setConnectionFactory(connectionFactory);
        gate.setOutputChannelName("resultToString");
        return gate;
    }

    @Bean 5
    public TcpInboundGateway tcpInGate(AbstractServerConnectionFactory connectionFactory)  {
        TcpInboundGateway inGate = new TcpInboundGateway();
        inGate.setConnectionFactory(connectionFactory);
        inGate.setRequestChannel(fromTcp());
        return inGate;
    }

    @Bean
    public MessageChannel fromTcp() {
        return new DirectChannel();
    }

    @MessageEndpoint
    public static class Echo { 6

        @Transformer(inputChannel="fromTcp", outputChannel="toEcho")
        public String convert(byte[] bytes) {
            return new String(bytes);
        }

        @ServiceActivator(inputChannel="toEcho")
        public String upCase(String in) {
            return in.toUpperCase();
        }

        @Transformer(inputChannel="resultToString")
        public String convertResult(byte[] bytes) {
            return new String(bytes);
        }

    }

    @Bean
    public AbstractClientConnectionFactory clientCF() { 7
        return new TcpNetClientConnectionFactory("localhost", this.port);
    }

    @Bean
    public AbstractServerConnectionFactory serverCF() { 8
        return new TcpNetServerConnectionFactory(this.port);
    }

}

1

Standard Spring Integration annotation enabling the infrastructure for an integration application.

2

Searches for @MessagingGateway interfaces.

3

The entry point to the client-side of the flow. The calling application can @Autowired this Gateway bean and invoke its method.

4

Outbound endpoints consist of a MessageHandler and a consumer that wraps it. In this scenario, the @ServiceActivator configures the endpoint according to the channel type.

5

Inbound endpoints (in the TCP/UDP module) are all message-driven so just need to be declared as simple @Bean s.

6

This class provides a number of POJO methods for use in this sample flow (a @Transformer and @ServiceActivator on the server side, and a @Transformer on the client side).

7

The client-side connection factory.

8

The server-side connection factory.

== Twitter Support

Spring Integration provides support for interacting with Twitter. With the Twitter adapters you can both receive and send Twitter messages. You can also perform a Twitter search based on a schedule and publish the search results within Messages. Since version 4.0, a search outbound gateway is provided to perform dynamic searches.

=== Introduction

Twitter is a social networking and micro-blogging service that enables its users to send and read messages known as tweets. Tweets are text-based posts of up to 140 characters displayed on the author’s profile page and delivered to the author’s subscribers who are known as followers.

[Important]Important

Versions of Spring Integration prior to 2.1 were dependent upon the Twitter4J API, but with the release of Spring Social 1.0 GA, Spring Integration, as of version 2.1, now builds directly upon Spring Social’s Twitter support, instead of Twitter4J. All Twitter endpoints require the configuration of a TwitterTemplate because even search operations require an authenticated template.

Spring Integration provides a convenient namespace configuration to define Twitter artifacts. You can enable it by adding the following within your XML header.

xmlns:int-twitter="http://www.springframework.org/schema/integration/twitter"
xsi:schemaLocation="http://www.springframework.org/schema/integration/twitter
https://www.springframework.org/schema/integration/twitter/spring-integration-twitter.xsd"

=== Twitter OAuth Configuration

For authenticated operations, Twitter uses OAuth - an authentication protocol that allows users to approve an application to act on their behalf without sharing their password. More information can be found at https://oauth.net or in this article https://hueniverse.com/oauth from Hueniverse. Please also see OAuth FAQ for more information about OAuth and Twitter.

In order to use OAuth authentication/authorization with Twitter you must create a new Application on the Twitter Developers site. Follow the directions below to create a new application and obtain consumer keys and an access token:

  • Go to https://dev.twitter.com
  • Click on the Register an app link and fill out all required fields on the form provided; set Application Type to Client and depending on the nature of your application select Default Access Type as Read & Write or Read-only and Submit the form. If everything is successful you’ll be presented with the Consumer Key and Consumer Secret. Copy both values in a safe place.
  • On the same page you should see a My Access Token button on the side bar (right). Click on it and you’ll be presented with two more values: Access Token and Access Token Secret. Copy these values in a safe place as well.

=== Twitter Template

As mentioned above, Spring Integration relies upon Spring Social, and that library provides an implementation of the template pattern, o.s.social.twitter.api.impl.TwitterTemplate to interact with Twitter. For anonymous operations (e.g., search), you don’t have to define an instance of TwitterTemplate explicitly, since a default instance will be created and injected into the endpoint. However, for authenticated operations (update status, send direct message, etc.), you must configure a TwitterTemplate as a bean and inject it explicitly into the endpoint, because the authentication configuration is required. Below is a sample configuration of TwitterTemplate:

<bean id="twitterTemplate" class="o.s.social.twitter.api.impl.TwitterTemplate">
	<constructor-arg value="4XzBPacJQxyBzzzH"/>
	<constructor-arg value="AbRxUAvyCtqQtvxFK8w5ZMtMj20KFhB6o"/>
	<constructor-arg value="21691649-4YZY5iJEOfz2A9qCFd9SjBRGb3HLmIm4HNE"/>
	<constructor-arg value="AbRxUAvyNCtqQtxFK8w5ZMtMj20KFhB6o"/>
</bean>
[Note]Note

The values above are not real.

 As you can see from the configuration above, all we need to do is to provide OAuth `attributes` as constructor arguments.
The values would be those you obtained in the previous step.
The order of constructor arguments is: 1) `consumerKey`, 2) `consumerSecret`, 3) `accessToken`, and 4) `accessTokenSecret`.

A more practical way to manage OAuth connection attributes would be via Spring’s property placeholder support by simply creating a property file (e.g., oauth.properties):

twitter.oauth.consumerKey=4XzBPacJQxyBzzzH
twitter.oauth.consumerSecret=AbRxUAvyCtqQtvxFK8w5ZMtMj20KFhB6o
twitter.oauth.accessToken=21691649-4YZY5iJEOfz2A9qCFd9SjBRGb3HLmIm4HNE
twitter.oauth.accessTokenSecret=AbRxUAvyNCtqQtxFK8w5ZMtMj20KFhB6o

Then, you can configure a property-placeholder to point to the above property file:

<context:property-placeholder location="classpath:oauth.properties"/>

<bean id="twitterTemplate" class="o.s.social.twitter.api.impl.TwitterTemplate">
    <constructor-arg value="${twitter.oauth.consumerKey}"/>
    <constructor-arg value="${twitter.oauth.consumerSecret}"/>
    <constructor-arg value="${twitter.oauth.accessToken}"/>
    <constructor-arg value="${twitter.oauth.accessTokenSecret}"/>
</bean>

=== Twitter Inbound Adapters

Twitter inbound adapters allow you to receive Twitter Messages. There are several types of twitter messages, or tweets

Spring Integration version 2.0 and above provides support for receiving tweets as Timeline Updates, Direct Messages, Mention Messages as well as Search Results.

[Important]Important

Every Inbound Twitter Channel Adapter is a Polling Consumer which means you have to provide a poller configuration. Twitter defines a concept of Rate Limiting. You can read more about it here: Rate Limiting. In a nutshell, Rate Limiting is a mechanism that Twitter uses to manage how often an application can poll for updates. You should consider this when setting your poller intervals so that the adapter polls in compliance with the Twitter policies.

With Spring Integration prior to version 3.0, a hard-coded limit within the adapters was used to ensure the polling interval could not be less than 15 seconds. This is no longer the case and the poller configuration is applied directly.

Another issue that we need to worry about is handling duplicate Tweets. The same adapter (e.g., Search or Timeline Update) while polling on Twitter may receive the same values more than once. For example if you keep searching on Twitter with the same search criteria you’ll end up with the same set of tweets unless some other new tweet that matches your search criteria was posted in between your searches. In that situation you’ll get all the tweets you had before plus the new one. But what you really want is only the new tweet(s). Spring Integration provides an elegant mechanism for handling these situations. The latest Tweet id will be stored in an instance of the org.springframework.integration.metadata.MetadataStore strategy (e.g. last retrieved tweet in this case). For more information see Section 9.5, “Metadata Store”.

[Note]Note

The key used to persist the latest twitter id is the value of the (required) id attribute of the Twitter Inbound Channel Adapter component plus the profileId of the Twitter user.

Prior to version 4.0, the page size was hard-coded to 20. This is now configurable using the page-size attribute (defaults to 20).

==== Inbound Message Channel Adapter

This adapter allows you to receive updates from everyone you follow. It’s essentially the "Timeline Update" adapter.

<int-twitter:inbound-channel-adapter
  		twitter-template="twitterTemplate"
  		channel="inChannel">
    <int:poller fixed-rate="5000" max-messages-per-poll="3"/>
</int-twitter:inbound-channel-adapter>

==== Direct Inbound Message Channel Adapter

This adapter allows you to receive Direct Messages that were sent to you from other Twitter users.

<int-twitter:dm-inbound-channel-adapter
  		twitter-template="twiterTemplate"
  		channel="inboundDmChannel">
    <int-poller fixed-rate="5000" max-messages-per-poll="3"/>
</int-twitter:dm-inbound-channel-adapter>

==== Mentions Inbound Message Channel Adapter

This adapter allows you to receive Twitter Messages that Mention you via @user syntax.

<int-twitter:mentions-inbound-channel-adapter
  		twitter-template="twiterTemplate"
		channel="inboundMentionsChannel">
    <int:poller fixed-rate="5000" max-messages-per-poll="3"/>
</int-twitter:mentions-inbound-channel-adapter>

==== Search Inbound Message Channel Adapter

This adapter allows you to perform searches. As you can see it is not necessary to define twitter-template since a search can be performed anonymously, however you must define a search query.

<int-twitter:search-inbound-channel-adapter
  		query="#springintegration"
		channel="inboundMentionsChannel">
     <int:poller fixed-rate="5000" max-messages-per-poll="3"/>
</int-twitter:search-inbound-channel-adapter>

Refer to https://dev.twitter.com/docs/using-search to learn more about Twitter queries.

As you can see the configuration of all of these adapters is very similar to other inbound adapters with one exception. Some may need to be injected with the twitter-template. Once received each Twitter Message would be encapsulated in a Spring Integration Message and sent to the channel specified by the channel attribute. Currently the Payload type of any Message is org.springframework.integration.twitter.core.Tweet which is very similar to the object with the same name in Spring Social. As we migrate to Spring Social we’ll be depending on their API and some of the artifacts that are currently in use will be obsolete, however we’ve already made sure that the impact of such migration is minimal by aligning our API with the current state (at the time of writing) of Spring Social.

To get the text from the org.springframework.social.twitter.api.Tweet simply invoke the getText() method.

=== Twitter Outbound Adapter

Twitter outbound channel adapters allow you to send Twitter Messages, or tweets.

Spring Integration version 2.0 and above supports sending Status Update Messages and Direct Messages. Twitter outbound channel adapters will take the Message payload and send it as a Twitter message. Currently the only supported payload type is`String`, so consider adding a transformer if the payload of the incoming message is not a String.

==== Twitter Outbound Update Channel Adapter

This adapter allows you to send regular status updates by simply sending a Message to the channel identified by the channel attribute.

<int-twitter:outbound-channel-adapter
  		twitter-template="twitterTemplate"
  		channel="twitterChannel"/>
The only extra configuration that is required for this adapter is the `twitter-template` reference.

Starting with version 4.0 the <int-twitter:outbound-channel-adapter> supports a tweet-data-expression to populate the TweetData argument (Spring Social Twitter) using the message as the root object of the expression evaluation context. The result can be a String, which will be used for the TweetData message; a Tweet object, the text of which will be used for the TweetData message; or an entire TweetData object. For convenience, the TweetData can be built from the expression directly without needing a fully qualified class name:

<int-twitter:outbound-channel-adapter
    twitter-template="twitterTemplate"
    channel="twitterChannel"
    tweet-data-expression="new TweetData(payload).withMedia(headers.media).displayCoordinates(true)/>

This allows, for example, attaching an image to the tweet.

==== Twitter Outbound Direct Message Channel Adapter

This adapter allows you to send Direct Twitter Messages (i.e., @user) by simply sending a Message to the channel identified by the channel attribute.

<int-twitter:dm-outbound-channel-adapter
  		twitter-template="twitterTemplate"
  		channel="twitterChannel"/>
The only extra configuration that is required for this adapter is the `twitter-template` reference.

When it comes to Twitter Direct Messages, you must specify who you are sending the message to - the target userid. The Twitter Outbound Direct Message Channel Adapter will look for a target userid in the Message headers under the name twitter_dmTargetUserId which is also identified by the following constant: TwitterHeaders.DM_TARGET_USER_ID. So when creating a Message all you need to do is add a value for that header.

Message message = MessageBuilder.withPayload("hello")
        .setHeader(TwitterHeaders.DM_TARGET_USER_ID, "z_oleg").build();

The above approach works well if you are creating the Message programmatically. However it’s more common to provide the header value within a messaging flow. The value can be provided by an upstream <header-enricher>.

<int:header-enricher input-channel="in" output-channel="out">
    <int:header name="twitter_dmTargetUserId" value="z_oleg"/>
</int:header-enricher>

It’s quite common that the value must be determined dynamically. For those cases you can take advantage of SpEL support within the <header-enricher>.

<int:header-enricher input-channel="in" output-channel="out">
    <int:header name="twitter_dmTargetUserId"
        expression="@twitterIdService.lookup(headers.username)"/>
</int:header-enricher>
[Important]Important

Twitter does not allow you to post duplicate Messages. This is a common problem during testing when the same code works the first time but does not work the second time. So, make sure to change the content of the Message each time. Another thing that works well for testing is to append a timestamp to the end of each message.

=== Twitter Search Outbound Gateway

In Spring Integration, an outbound gateway is used for two-way request/response communication with an external service. The Twitter Search Outbound Gateway allows you to issue dynamic twitter searches. The reply message payload is a collection of Tweet objects. If the search returns no results, the payload is an empty collection. You can limit the number of tweets and you can page through a larger set of tweets by making multiple calls. To facilitate this, search reply messages contain a header twitter_searchMetadata with its value being a SearchMetadata object. For more information on the Tweet, SearchParameters and SearchMetadata classes, refer to the Spring Social Twitter documentation.

Configuring the Outbound Gateway

<int-twitter:search-outbound-gateway id="twitter"
	request-channel="in"  1
	twitter-template="twitterTemplate"  2
	search-args-expression="payload"  3
	reply-channel="out"  4
	reply-timeout="123"  5
	order="1"  6
	auto-startup="false"  7
	phase="100" /> 8

1

The channel used to send search requests to this gateway.

2

A reference to a TwitterTemplate with authentication configuration.

3

A SpEL expression that evaluates to argument(s) for the search. Default: "payload" - in which case the payload can be a String (e.g "#springintegration") and the gateway limits the query to 20 tweets, or the payload can be a SearchParameters object. The expression can also be specified as a SpEL List. The first element (String) is the query, the remaining elements (Numbers) are pageSize, sinceId, maxId respectively - refer to the Spring Social Twitter documentation for more information about these parameters. When specifying a SearchParameters object directly in the SpEL expression, you do not have to fully qualify the class name. Some examples: new SearchParameters(payload).count(5).sinceId(headers.sinceId) {payload, 30} {payload, headers.pageSize, headers.sinceId, headers.maxId}

4

The channel to which to send the reply; if omitted, the replyChannel header is used.

5

The timeout when sending the reply message to the reply channel; only applies if the reply channel can block, for example a bounded queue channel that is full.

6

When subscribed to a publish/subscribe channel, the order in which this endpoint will be invoked.

7

SmartLifecycle method.

8

SmartLifecycle method.

== WebSockets Support

=== Introduction

Starting with version 4.1 Spring Integration has introduced WebSocket support. It is based on architecture, infrastructure and API from the Spring Framework’s web-socket module. Therefore, many of Spring WebSocket’s components (e.g. SubProtocolHandler or WebSocketClient) and configuration options (e.g. @EnableWebSocketMessageBroker) can be reused within Spring Integration. For more information, please, refer to the Spring Framework WebSocket Support chapter in the Spring Framework reference manual.

[Note]Note

Since the Spring Framework WebSocket infrastructure is based on the Spring Messaging foundation and provides a basic Messaging framework based on the same MessageChannel s, MessageHandler s that Spring Integration uses, and some POJO-method annotation mappings, Spring Integration can be directly involved in a WebSocket flow, even without WebSocket adapters. For this purpose you can simply configure a Spring Integration @MessagingGateway with appropriate annotations:

@MessagingGateway
@Controller
public interface WebSocketGateway {

    @MessageMapping("/greeting")
    @SendToUser("/queue/answer")
    @Gateway(requestChannel = "greetingChannel")
    String greeting(String payload);

}

=== Overview

Since the WebSocket protocol is streaming by definition and we can send and receive messages to/from a WebSocket at the same time, we can simply deal with an appropriate WebSocketSession, regardless of being on the client or server side. To encapsulate the connection management and WebSocketSession registry, the IntegrationWebSocketContainer is provided with ClientWebSocketContainer and ServerWebSocketContainer implementations. Thanks to the WebSocket API and its implementation in the Spring Framework, with many extensions, the same classes are used on the server side as well as the client side (from a Java perspective, of course). Hence most connection and WebSocketSession registry options are the same on both sides. That allows us to reuse many configuration items and infrastructure hooks to build WebSocket applications on the server side as well as on the client side:

//Client side
@Bean
public WebSocketClient webSocketClient() {
    return new SockJsClient(Collections.singletonList(new WebSocketTransport(new JettyWebSocketClient())));
}

@Bean
public IntegrationWebSocketContainer clientWebSocketContainer() {
    return new ClientWebSocketContainer(webSocketClient(), "ws://my.server.com/endpoint");
}

//Server side
@Bean
public IntegrationWebSocketContainer serverWebSocketContainer() {
    return new ServerWebSocketContainer("/endpoint").withSockJs();
}

The IntegrationWebSocketContainer is designed to achieve bidirectional messaging and can be shared between Inbound and Outbound Channel Adapters (see below), can be referenced only from one of them (when using one-way - sending or receiving - WebSocket messaging). It can be used without any Channel Adapter, but in this case, IntegrationWebSocketContainer only plays a role as the WebSocketSession registry.

[Note]Note

The ServerWebSocketContainer implements WebSocketConfigurer to register an internal IntegrationWebSocketContainer.IntegrationWebSocketHandler as an Endpoint under the provided paths and other server WebSocket options (such as HandshakeHandler or SockJS fallback) within the ServletWebSocketHandlerRegistry for the target vendor WebSocket Container. This registration is achieved with an infrastructural WebSocketIntegrationConfigurationInitializer component, which does the same as the @EnableWebSocket annotation. This means that using just @EnableIntegration (or any Spring Integration Namespace in the application context) you can omit the @EnableWebSocket declaration, because all WebSocket Endpoints are detected by the Spring Integration infrastructure.

=== WebSocket Inbound Channel Adapter

The WebSocketInboundChannelAdapter implements the receiving part of WebSocketSession interaction. It must be supplied with a IntegrationWebSocketContainer, and the adapter registers itself as a WebSocketListener to handle incoming messages and WebSocketSession events.

[Note]Note

Only one WebSocketListener can be registered in the IntegrationWebSocketContainer.

For WebSocket _sub-protocol_s, the WebSocketInboundChannelAdapter can be configured with SubProtocolHandlerRegistry as the second constructor argument. The adapter delegates to the SubProtocolHandlerRegistry to determine the appropriate SubProtocolHandler for the accepted WebSocketSession and to convert WebSocketMessage to a Message according to the sub-protocol implementation.

[Note]Note

By default, the WebSocketInboundChannelAdapter relies just only on the raw PassThruSubProtocolHandler implementation, which simply converts the WebSocketMessage to a Message.

The WebSocketInboundChannelAdapter accepts and sends to the underlying integration flow only Message s with SimpMessageType.MESSAGE or an empty simpMessageType header. All other Message types are handled through the ApplicationEvent s emitted from a SubProtocolHandler implementation (e.g. StompSubProtocolHandler).

On the server side WebSocketInboundChannelAdapter can be configured with the useBroker = true option, if the @EnableWebSocketMessageBroker configuration is present. In this case all non-MESSAGE Message types are delegated to the provided AbstractBrokerMessageHandler. In addition, if the Broker Relay is configured with destination prefixes, those Messages, which match to the Broker destinations, are routed to the AbstractBrokerMessageHandler, instead of to the outputChannel of the WebSocketInboundChannelAdapter.

If useBroker = false and received message is of SimpMessageType.CONNECT type, the WebSocketInboundChannelAdapter sends SimpMessageType.CONNECT_ACK message to the WebSocketSession immediately without sending it to the channel.

[Note]Note

Spring’s WebSocket Support allows the configuration of only one Broker Relay, hence we don’t require an AbstractBrokerMessageHandler reference, it is detected in the Application Context.

For more configuration options see Section 31.3.2, “TCP Failover Client Connection Factory”.

=== WebSocket Outbound Channel Adapter

The WebSocketOutboundChannelAdapter accepts Spring Integration messages from its MessageChannel, determines the WebSocketSession id from the MessageHeaders, retrieves the WebSocketSession from the provided IntegrationWebSocketContainer and delegates the conversion and sending WebSocketMessage work to the appropriate SubProtocolHandler from the provided SubProtocolHandlerRegistry.

On the client side, the WebSocketSession id message header isn’t required, because ClientWebSocketContainer deals only with a single connection and its WebSocketSession respectively.

To use the STOMP sub-protocol, this adapter should be configured with a StompSubProtocolHandler. Then you can send any STOMP message type to this adapter, using StompHeaderAccessor.create(StompCommand...) and a MessageBuilder, or just using a HeaderEnricher (see Section 7.2.2, “Header Enricher”).

For more configuration options see below.

=== WebSockets Namespace Support

Spring Integration WebSocket namespace includes several components described below. To include it in your configuration, simply provide the following namespace declaration in your application context configuration file:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-websocket="http://www.springframework.org/schema/integration/websocket"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/websocket
    https://www.springframework.org/schema/integration/websocket/spring-integration-websocket.xsd">
    ...
</beans>

<int-websocket:client-container>

<int-websocket:client-container
                	id=""  1
                	client=""  2
                	uri=""  3
                	uri-variables=""  4
                	origin=""  5
                	send-time-limit=""  6
                	send-buffer-size-limit=""  7
                	auto-startup=""  8
                	phase="">  9
                <int-websocket:http-headers>
                	<entry key="" value=""/>
                </int-websocket:http-headers>  10
</int-websocket:client-container>

1

The component bean name.

2

The WebSocketClient bean reference.

3

The uri or uriTemplate to the target WebSocket service. If it is used as a uriTemplate with URI variable placeholders, the uri-variables attribute is required.

4

Comma-separated values for the URI variable placeholders within the uri attribute value. The values are replaced into the placeholders according to the order in the uri. See UriComponents.expand(Object... uriVariableValues).

5

The Origin Handshake HTTP header value.

6

The WebSocket session send timeout limit. Defaults to 10000.

7

The WebSocket session send message size limit. Defaults to 524288.

8

Boolean value indicating whether this endpoint should start automatically. Defaults to false, assuming that this container will be started from the Section 31.3.2, “TCP Failover Client Connection Factory”.

9

The lifecycle phase within which this endpoint should start and stop. The lower the value the earlier this endpoint will start and the later it will stop. The default is Integer.MAX_VALUE. Values can be negative. See SmartLifeCycle.

10

A Map of HttpHeaders to be used with the Handshake request.

<int-websocket:server-container>

<int-websocket:server-container
					id=""  1
					path=""  2
					handshake-handler=""  3
					handshake-interceptors=""  4
					decorator-factories=""  5
					send-time-limit=""  6
					send-buffer-size-limit=""  7
					allowed-origins="">  8
				  <int-websocket:sockjs
						client-library-url=""   9
						stream-bytes-limit=""   10
						session-cookie-needed=""   11
						heartbeat-time=""   12
						disconnect-delay=""   13
						message-cache-size=""   14
						websocket-enabled=""   15
						scheduler=""   (16)
						message-codec=""   (17)
						transport-handlers=""  (18)
						suppress-cors="true"="" />  (19)
</int-websocket:server-container>

1

The component bean name.

2

A path (or comma-separated paths) that maps a particular request to a WebSocketHandler. Exact path mapping URIs (such as "/myPath") are supported as well as ant-style path patterns (such as /myPath/**).

3

The HandshakeHandler bean reference. Default to DefaultHandshakeHandler.

4

List of HandshakeInterceptor bean references.

5

Configure one or more factories (WebSocketHandlerDecoratorFactory) to decorate the handler used to process WebSocket messages. This may be useful for some advanced use cases, for example to allow Spring Security to forcibly close the WebSocket session when the corresponding HTTP session expires. See Spring Session Project for more information.

6

See the same option on the <int-websocket:client-container>.

7

See the same option on the <int-websocket:client-container>.

8

Configure allowed Origin header values. Multiple origins may be specified as a comma-separated list. This check is mostly designed for browser clients. There is noting preventing other types of client to modify the Origin header value. When SockJS is enabled and allowed origins are restricted, transport types that do not use Origin headers for cross origin requests (jsonp-polling, iframe-xhr-polling, iframe-eventsource and iframe-htmlfile) are disabled. As a consequence, IE6/IE7 are not supported and IE8/IE9 will only be supported without cookies. By default, all origins are allowed.

9

Transports with no native cross-domain communication (e.g. "eventsource", "htmlfile") must get a simple page from the "foreign" domain in an invisible iframe so that code in the iframe can run from a domain local to the SockJS server. Since the iframe needs to load the SockJS javascript client library, this property allows specifying where to load it from. By default this is set to point to https://d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js. However it can also be set to point to a URL served by the application. Note that it’s possible to specify a relative URL in which case the URL must be relative to the iframe URL. For example assuming a SockJS endpoint mapped to "/sockjs", and resulting iframe URL "/sockjs/iframe.html", then the The relative URL must start with "../../" to traverse up to the location above the SockJS mapping. In case of a prefix-based Servlet mapping one more traversal may be needed.

10

Minimum number of bytes that can be send over a single HTTP streaming request before it will be closed. Defaults to 128K (i.e. 128*1024 bytes).

11

The "cookie_needed" value in the response from the SockJs "/info" endpoint. This property indicates whether the use of a JSESSIONID cookie is required for the application to function correctly, e.g. for load balancing or in Java Servlet containers for the use of an HTTP session.

12

The amount of time in milliseconds when the server has not sent any messages and after which the server should send a heartbeat frame to the client in order to keep the connection from breaking. The default value is 25,000 (25 seconds).

13

The amount of time in milliseconds before a client is considered disconnected after not having a receiving connection, i.e. an active connection over which the server can send data to the client. The default value is 5000.

14

The number of server-to-client messages that a session can cache while waiting for the next HTTP polling request from the client. The default size is 100.

15

Some load balancers don’t support websockets. Set this option to false to disable the WebSocket transport on the server side. The default value is true.

(16)

The TaskScheduler bean reference; a new ThreadPoolTaskScheduler instance will be created if no value is provided. This scheduler instance will be used for scheduling heart-beat messages.

(17)

The SockJsMessageCodec bean reference to use for encoding and decoding SockJS messages. By default Jackson2SockJsMessageCodec is used requiring the Jackson library to be present on the classpath.

(18)

List of TransportHandler bean references.

(19)

The option to disable automatic addition of CORS headers for SockJS requests. The default value is false.

<int-websocket:outbound-channel-adapter>

<int-websocket:outbound-channel-adapter
                          id=""  1
                          channel=""  2
                          container=""  3
                          default-protocol-handler=""  4
                          protocol-handlers=""  5
                          message-converters=""  6
                          merge-with-default-converters=""  7
                          auto-startup=""  8
                          phase=""/>  9

1

The component bean name. If the channel attribute isn’t provided, a DirectChannel is created and registered with the application context with this id attribute as the bean name. In this case, the endpoint is registered with the bean name id + '.adapter'. And the MessageHandler is registered with the bean alias id + '.handler'.

2

Identifies the channel attached to this adapter.

3

The reference to the IntegrationWebSocketContainer bean, which encapsulates the low-level connection and WebSocketSession handling operations. Required.

4

Optional reference to a SubProtocolHandler instance. It is used when the client did not request a sub-protocol or it is a single protocol-handler. If this reference or protocol-handlers list aren’t provided the PassThruSubProtocolHandler is used by default.

5

List of SubProtocolHandler bean references for this Channel Adapter. If only a single bean reference is provided and a default-protocol-handler isn’t provided, that single SubProtocolHandler will be used as the default-protocol-handler. If this attribute or default-protocol-handler aren’t provided, the PassThruSubProtocolHandler is used by default.

6

List of MessageConverter bean references for this Channel Adapter.

7

Flag to indicate if the default converters should be registered after any custom converters. This flag is used only if message-converters are provided, otherwise all default converters will be registered. Defaults to false. The default converters are (in the order): StringMessageConverter, ByteArrayMessageConverter and MappingJackson2MessageConverter if the Jackson library is present on the classpath.

8

Boolean value indicating whether this endpoint should start automatically. Default to true.

9

The lifecycle phase within which this endpoint should start and stop. The lower the value the earlier this endpoint will start and the later it will stop. The default is Integer.MIN_VALUE. Values can be negative. See SmartLifeCycle.

<int-websocket:inbound-channel-adapter>

<int-websocket:inbound-channel-adapter
                            id=""  1
                            channel=""  2
                            error-channel=""  3
                            container=""  4
                            default-protocol-handler=""  5
                            protocol-handlers=""  6
                            message-converters=""  7
                            merge-with-default-converters=""  8
                            send-timeout=""  9
                            payload-type=""  10
                            use-broker=""  11
                            auto-startup=""  12
                            phase=""/>  13

1

The component bean name. If the channel attribute isn’t provided, a DirectChannel is created and registered with the application context with this id attribute as the bean name. In this case, the endpoint is registered with the bean name id + '.adapter'.

2

Identifies the channel attached to this adapter.

3

The MessageChannel bean reference to which the ErrorMessages should be sent.

4

See the same option on the <int-websocket:outbound-channel-adapter>.

5

See the same option on the <int-websocket:outbound-channel-adapter>.

6

See the same option on the <int-websocket:outbound-channel-adapter>.

7

See the same option on the <int-websocket:outbound-channel-adapter>.

8

See the same option on the <int-websocket:outbound-channel-adapter>.

9

Maximum amount of time in milliseconds to wait when sending a message to the channel if the channel may block. For example, a QueueChannel can block until space is available if its maximum capacity has been reached.

10

Fully qualified name of the java type for the target payload to convert from the incoming WebSocketMessage. Default to String.

11

Flag to indicate if this adapter will send non-MESSAGE WebSocketMessage s and messages with broker destinations to the AbstractBrokerMessageHandler from the application context. The Broker Relay configuration is required when this attribute is true. This attribute is used only on the server side. On the client side, it is ignored. Defaults to false.

12

See the same option on the <int-websocket:outbound-channel-adapter>.

13

See the same option on the <int-websocket:outbound-channel-adapter>.

=== ClientStompEncoder

Starting with version 4.3.13, the ClientStompEncoder is provided as an extension of standard StompEncoder for using on client side of the WebSocket Channel Adapters. An instance of the ClientStompEncoder must be injected into the StompSubProtocolHandler for proper client side message preparation. One of the problem of the default StompSubProtocolHandler that it was designed for the server side, so it updates the SEND stompCommand header into MESSAGE as it must be by the STOMP protocol from server side. If client doesn’t send its messages in the proper SEND web socket frame, some STOMP brokers won’t accept them. The purpose of the ClientStompEncoder, in this case, is to override stompCommand header to the SEND value before encoding the message to the byte[].

== Web Services Support

=== Outbound Web Service Gateways

To invoke a Web Service upon sending a message to a channel, there are two options - both of which build upon the Spring Web Services project: SimpleWebServiceOutboundGateway and MarshallingWebServiceOutboundGateway. The former will accept either a String or javax.xml.transform.Source as the message payload. The latter provides support for any implementation of the Marshaller and Unmarshaller interfaces. Both require a Spring Web Services DestinationProvider for determining the URI of the Web Service to be called.

 simpleGateway = new SimpleWebServiceOutboundGateway(destinationProvider);

 marshallingGateway = new MarshallingWebServiceOutboundGateway(destinationProvider, marshaller);
[Note]Note

When using the namespace support described below, you will only need to set a URI. Internally, the parser will configure a fixed URI DestinationProvider implementation. If you do need dynamic resolution of the URI at runtime, however, then the DestinationProvider can provide such behavior as looking up the URI from a registry. See the Spring Web Services DestinationProvider JavaDoc for more information about this strategy.

For more detail on the inner workings, see the Spring Web Services reference guide’s chapter covering client access as well as the chapter covering Object/XML mapping.

=== Inbound Web Service Gateways

To send a message to a channel upon receiving a Web Service invocation, there are two options again: SimpleWebServiceInboundGateway and MarshallingWebServiceInboundGateway. The former will extract a javax.xml.transform.Source from the WebServiceMessage and set it as the message payload. The latter provides support for implementation of the Marshaller and Unmarshaller interfaces. If the incoming web service message is a SOAP message the SOAP Action header will be added to the headers of the`Message` that is forwarded onto the request channel.

 simpleGateway = new SimpleWebServiceInboundGateway();
 simpleGateway.setRequestChannel(forwardOntoThisChannel);
 simpleGateway.setReplyChannel(listenForResponseHere); //Optional

 marshallingGateway = new MarshallingWebServiceInboundGateway(marshaller);
 //set request and optionally reply channel

Both gateways implement the Spring Web Services MessageEndpoint interface, so they can be configured with a MessageDispatcherServlet as per standard Spring Web Services configuration.

For more detail on how to use these components, see the Spring Web Services reference guide’s chapter covering creating a Web Service. The chapter covering Object/XML mapping is also applicable again.

To include the SimpleWebServiceInboundGateway and MarshallingWebServiceInboundGateway configurations to the Spring WS infrastructure you should add the EndpointMapping definition between MessageDispatcherServlet and the target MessageEndpoint implementations like you do that with normal Spring WS application. For this purpose (from Spring Integration perspective), the Spring WS provides these convenient EndpointMapping implementations:

  • o.s.ws.server.endpoint.mapping.UriEndpointMapping
  • o.s.ws.server.endpoint.mapping.PayloadRootQNameEndpointMapping
  • o.s.ws.soap.server.endpoint.mapping.SoapActionEndpointMapping
  • o.s.ws.server.endpoint.mapping.XPathPayloadEndpointMapping

The beans for these classes must be specified in the application context referencing to the SimpleWebServiceInboundGateway and/or MarshallingWebServiceInboundGateway bean definitions according to the WS mapping algorithm.

Please, refer to the Endpoint mappings for the more information.

=== Web Service Namespace Support

To configure an outbound Web Service Gateway, use the "outbound-gateway" element from the "ws" namespace:

<int-ws:outbound-gateway id="simpleGateway"
                     request-channel="inputChannel"
                     uri="https://example.org"/>
[Note]Note

Notice that this example does not provide a reply-channel. If the Web Service were to return a non-empty response, the Message containing that response would be sent to the reply channel provided in the request Message’s REPLY_CHANNEL header, and if that were not available a channel resolution Exception would be thrown. If you want to send the reply to another channel instead, then provide a reply-channel attribute on the outbound-gateway element.

[Tip]Tip

When invoking a Web Service that returns an empty response after using a String payload for the request Message, no reply Message will be sent by default. Therefore you don’t need to set a reply-channel or have a REPLY_CHANNEL header in the request Message. If for any reason you actually do want to receive the empty response as a Message, then provide the ignore-empty-responses attribute with a value of false (this only applies for Strings, because using a Source or Document object simply leads to a NULL response and will therefore never generate a reply Message).

To set up an inbound Web Service Gateway, use the "inbound-gateway":

<int-ws:inbound-gateway id="simpleGateway"
                    request-channel="inputChannel"/>

To use Spring OXM Marshallers and/or Unmarshallers, provide bean references. For outbound:

<int-ws:outbound-gateway id="marshallingGateway"
                     request-channel="requestChannel"
                     uri="https://example.org"
                     marshaller="someMarshaller"
                     unmarshaller="someUnmarshaller"/>

And for inbound:

<int-ws:inbound-gateway id="marshallingGateway"
                    request-channel="requestChannel"
                    marshaller="someMarshaller"
                    unmarshaller="someUnmarshaller"/>
[Note]Note

Most Marshaller implementations also implement the Unmarshaller interface. When using such a Marshaller, only the "marshaller" attribute is necessary. Even when using a Marshaller, you may also provide a reference for the "request-callback" on the outbound gateways.

For either outbound gateway type, a "destination-provider" attribute can be specified instead of the "uri" (exactly one of them is required). You can then reference any Spring Web Services DestinationProvider implementation (e.g. to lookup the URI at runtime from a registry).

For either outbound gateway type, the "message-factory" attribute can also be configured with a reference to any Spring Web Services WebServiceMessageFactory implementation.

For the simple inbound gateway type, the "extract-payload" attribute can be set to false to forward the entire WebServiceMessage instead of just its payload as a Message to the request channel. This might be useful, for example, when a custom Transformer works against the WebServiceMessage directly.

=== Outbound URI Configuration

For all URI-schemes supported by Spring Web Services (URIs and Transports) <uri-variable/> substitution is provided:

<ws:outbound-gateway id="gateway" request-channel="input"
        uri="https://springsource.org/{foo}-{bar}">
    <ws:uri-variable name="foo" expression="payload.substring(1,7)"/>
    <ws:uri-variable name="bar" expression="headers.x"/>
</ws:outbound-gateway>

<ws:outbound-gateway request-channel="inputJms"
        uri="jms:{destination}?deliveryMode={deliveryMode}&amp;priority={priority}"
        message-sender="jmsMessageSender">
    <ws:uri-variable name="destination" expression="headers.jmsQueue"/>
    <ws:uri-variable name="deliveryMode" expression="headers.deliveryMode"/>
    <ws:uri-variable name="priority" expression="headers.jms_priority"/>
</ws:outbound-gateway>

If a DestinationProvider is supplied, variable substitution is not supported and a configuration error will result if variables are provided.

Controlling URI Encoding

By default, the URL string is encoded (see UriComponentsBuilder) to the URI object before sending the request. In some scenarios with a non-standard URI it is undesirable to perform the encoding. Since version 4.1 the <ws:outbound-gateway/> provides an encode-uri attribute. To disable encoding the URL, this attribute should be set to false (by default it is true). If you wish to partially encode some of the URL, this can be achieved using an expression within a <uri-variable/>:

<ws:outbound-gateway url="http://somehost/%2f/fooApps?bar={param}" encode-uri="false">
          <http:uri-variable name="param"
            expression="T(org.apache.commons.httpclient.util.URIUtil)
                                             .encodeWithinQuery('Hello World!')"/>
</ws:outbound-gateway>

Note, encode-uri is ignored, if DestinationProvider is supplied.

=== WS Message Headers

The Spring Integration WebService Gateways will map the SOAP Action header automatically. It will be copied by default to and from Spring Integration MessageHeaders using the DefaultSoapHeaderMapper.

Of course, you can pass in your own implementation of SOAP specific header mappers, as the gateways have respective properties to support that.

Any user-defined SOAP headers will NOT be copied to or from a SOAP Message, unless explicitly specified by the requestHeaderNames and/or replyHeaderNames properties of the DefaultSoapHeaderMapper.

When using the XML namespace for configuration, these properties can be set using the mapped-request-headers and mapped-reply-headers, or a custom mapper can be provided using the header-mapper attribute.

[Tip]Tip

When mapping user-defined headers, the values can also contain simple wildcard patterns (e.g. "foo*" or "*foo") to be matched. For example, if you need to copy all user-defined headers simply use the wildcard character *.

Starting with version 4.1, the AbstractHeaderMapper (a DefaultSoapHeaderMapper superclass) allows the NON_STANDARD_HEADERS token to be configured for the requestHeaderNames and/or replyHeaderNames properties (in addition to existing STANDARD_REQUEST_HEADERS and STANDARD_REPLY_HEADERS) to map all user-defined headers. Note, it is recommended to use the combination like this STANDARD_REPLY_HEADERS, NON_STANDARD_HEADERS instead of a *, to avoid mapping of request headers to the reply.

Starting with version 4.3, patterns in the header mappings can be negated by preceding the pattern with !. Negated patterns get priority, so a list such as STANDARD_REQUEST_HEADERS,foo,ba*,!bar,!baz,qux,!foo will NOT map foo (nor bar nor baz); the standard headers plus bad, qux will be mapped.

[Important]Important

If you have a user defined header that begins with ! that you do wish to map, you need to escape it with \ thus: STANDARD_REQUEST_HEADERS,\!myBangHeader and it WILL be mapped.

Inbound SOAP headers (request headers for the inbound gateway, reply-headers for the outbound gateway) are mapped as SoapHeaderElement objects. The contents can be explored by accessing the Source:

<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
    <soapenv:Header>
        <auth>
            <username>user</username>
            <password>pass</password>
        </auth>
        <bar>BAR</bar>
        <baz>BAZ</baz>
        <qux>qux</qux>
    </soapenv:Header>
    <soapenv:Body>
        ...
    </soapenv:Body>
</soapenv:Envelope>

If mapped-request-headers is "auth, ba*", the auth, bar and baz headers are mapped but qux is not.

...
SoapHeaderElement header = (SoapHeaderElement) headers.get("auth");
DOMSource source = (DOMSource) header.getSource();
NodeList nodeList = source.getNode().getChildNodes();
assertEquals("username", nodeList.item(0).getNodeName());
assertEquals("user", nodeList.item(0).getFirstChild().getNodeValue());
...

== XML Support - Dealing with XML Payloads

=== Introduction

Spring Integration’s XML support extends the core of Spring Integration with the following components:

These components are designed to make working with XML messages in Spring Integration simple. The provided messaging components are designed to work with XML represented in a range of formats including instances of java.lang.String, org.w3c.dom.Document and javax.xml.transform.Source. It should be noted however that where a DOM representation is required, for example in order to evaluate an XPath expression, the String payload will be converted into the required type and then converted back again to String. Components that require an instance of DocumentBuilder will create a namespace-aware instance if one is not provided. In cases where you require greater control over document creation, you can provide an appropriately configured instance of DocumentBuilder.

=== Namespace Support

All components within the Spring Integration XML module provide namespace support. In order to enable namespace support, you need to import the respective schema for the Spring Integration XML Module. A typical setup is shown below:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-xml="http://www.springframework.org/schema/integration/xml"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/xml
    https://www.springframework.org/schema/integration/xml/spring-integration-xml.xsd">
</beans>

==== XPath Expressions

Many of the components within the Spring Integration XML module work with XPath Expressions. Each of those components will either reference an XPath Expression that has been defined as top-level element or via a nested <xpath-expression/> element.

All forms of XPath expressions result in the creation of an XPathExpression using the Spring org.springframework.xml.xpath.XPathExpressionFactory. When creating XPath expressions, the best XPath implementation that is available on the classpath is being used, either JAXP 1.3+ or Jaxen, whereby JAXP is preferred.

[Note]Note

Spring Integration under the covers uses the XPath functionality as provided by the Spring Web Services project (https://www.spring.io/spring-ws). Specifically, Spring Web Services' XML module (spring-xml-x.x.x.jar) is being used. Therefore, for a deeper understanding, please refer to the respective documentation as well at: https://docs.spring.io/spring-ws/docs/current/reference/html/common.html#xpath

Here is an overview of all available configuration parameters of the xpath-expression element:

<int-xml:xpath-expression expression="" 1
          id=""              2
          namespace-map=""   3
          ns-prefix=""       4
          ns-uri="">         5
    <map></map>              6
</int-xml:xpath-expression>

1

Defines an XPath xpression. Required.

2

The Identifier of the underlying bean definition. Will be an instance of `org.springframework.xml.xpath.XPathExpression`Optional.

3

Reference to a map containing namespaces. The key of the map defines the namespace prefix and the value of the map sets the namespace URI. It is not valid to specify both this attribute and the map sub element, or setting the ns-prefix and ns-uri attribute. Optional.

4

Allows you to set the namspace prefix directly as and attribute on the XPath expression element. If you set ns-prefix, you must also set the ns-uri attribute. Optional.

5

Allows you to set the namspace URI directly as an attribute on the XPath expression element. If you set ns-uri, you must also set the ns-prefix attribute. Optional.

6

Defines a map containing namespaces. Only one map child element is allowed. The key of the map defines the namespace prefix and the value of the map sets the namespace URI.

It is not valid to specify both this sub-element and the map attribute, or setting the ns-prefix and ns-uri attributes. Optional.

===== Providing Namespaces (Optional) to XPath Expressions

For the XPath Expression Element, namespace information can be optionally provided as configuration parameters. As such, namespaces can be defined using one of the following 3 choices:

  • Reference a map using the namespace-map attribute
  • Provide a map of namespaces using the map sub-element
  • Specifying the ns-prefix and the ns-uri attribute

All three options are mutually exclusive. Only one option can be set.

Below, please find several different usage examples on how to use XPath expressions using the XML namespace support including the various option for setting the XML namespaces as discussed above.

<int-xml:xpath-filter id="filterReferencingXPathExpression"
                      xpath-expression-ref="refToXpathExpression"/>

<int-xml:xpath-expression id="refToXpathExpression" expression="/name"/>

<int-xml:xpath-filter id="filterWithoutNamespace">
    <int-xml:xpath-expression expression="/name"/>
</int-xml:xpath-filter>

<int-xml:xpath-filter id="filterWithOneNamespace">
    <int-xml:xpath-expression expression="/ns1:name"
                              ns-prefix="ns1" ns-uri="www.example.org"/>
</int-xml:xpath-filter>

<int-xml:xpath-filter id="filterWithTwoNamespaces">
    <int-xml:xpath-expression expression="/ns1:name/ns2:type">
        <map>
            <entry key="ns1" value="www.example.org/one"/>
            <entry key="ns2" value="www.example.org/two"/>
        </map>
    </int-xml:xpath-expression>
</int-xml:xpath-filter>

<int-xml:xpath-filter id="filterWithNamespaceMapReference">
    <int-xml:xpath-expression expression="/ns1:name/ns2:type"
                              namespace-map="defaultNamespaces"/>
</int-xml:xpath-filter>

<util:map id="defaultNamespaces">
    <util:entry key="ns1" value="www.example.org/one"/>
    <util:entry key="ns2" value="www.example.org/two"/>
</util:map>

===== Using XPath Expressions with Default Namespaces

When working with default namespaces, you may run into situations that behave differently than originally expected. Let’s assume we have the following XML document:

<?xml version="1.0" encoding="UTF-8"?>
<order>
    <orderItem>
        <isbn>0321200683</isbn>
        <quantity>2</quantity>
    </orderItem>
    <orderItem>
        <isbn>1590596439</isbn>
        <quantity>1</quantity>
    </orderItem>
</order>

This document is not declaring any namespace. Therefore, applying the following XPath Expression will work as expected:

<int-xml:xpath-expression expression="/order/orderItem" />

You might expect that the same expression will also work for the following XML file. It looks exactly the same as the previous example but in addition it also declares a default namespace:

http://www.example.org/orders

<?xml version="1.0" encoding="UTF-8"?>
<order xmlns="http://www.example.org/orders">
	<orderItem>
		<isbn>0321200683</isbn>
		<quantity>2</quantity>
	</orderItem>
	<orderItem>
		<isbn>1590596439</isbn>
		<quantity>1</quantity>
	</orderItem>
</order>

However, the XPath Expression used previously will fail in this case.

In order to solve this issue, you must provide a namespace prefix and a namespace URI using either the ns-prefix and ns-uri attribute or by providing a namespace-map attribute instead. The namespace URI must match the namespace declared in your XML document, which in this example is http://www.example.org/orders.

The namespace prefix, however, can be arbitrarily chosen. In fact, just providing an empty String will actually work (Null is not allowed). In the case of a namespace prefix consisting of an empty String, your Xpath Expression will use a colon (":") to indicate the default namespace. If you leave the colon off, the XPath expression will not match. The following XPath Expression will match against the XML document above:

<int-xml:xpath-expression expression="/:order/:orderItem"
    ns-prefix="" ns-uri="https://www.example.org/prodcuts"/>

Of course you can also provide any other arbitrarily chosen namespace prefix. The following XPath expression using the myorder namespace prefix will match also:

<int-xml:xpath-expression expression="/myorder:order/myorder:orderItem"
    ns-prefix="myorder" ns-uri="https://www.example.org/prodcuts"/>

It is important to remember that the namespace URI is the really important piece of information to declare, not the prefix itself. The Jaxen FAQ summarizes the point very well:

In XPath 1.0, all unprefixed names are unqualified. There is no requirement that the prefixes used in the XPath expression are the same as the prefixes used in the document being queried. Only the namespace URIs need to match, not the prefixes.

=== Transforming XML Payloads

==== Configuring Transformers as Beans

This section will explain the workings of the following transformers and how to configure them as beans:

All of the provided XML transformers extend AbstractTransformer or AbstractPayloadTransformer and therefore implement Transformer. When configuring XML transformers as beans in Spring Integration, you would normally configure the Transformer in conjunction with a MessageTransformingHandler. This allows the transformer to be used as an Endpoint. Finally, the namespace support will be discussed, which allows for the simple configuration of the transformers as elements in XML.

===== UnmarshallingTransformer

An UnmarshallingTransformer allows an XML Source to be unmarshalled using implementations of the Spring OXM Unmarshaller. Spring’s Object/XML Mapping support provides several implementations supporting marshalling and unmarshalling using JAXB, Castor and JiBX amongst others. The unmarshaller requires an instance of Source. If the message payload is not an instance of Source, conversion will be attempted. Currently String, File and org.w3c.dom.Document payloads are supported. Custom conversion to a Source is also supported by injecting an implementation of a SourceFactory.

[Note]Note

If a SourceFactory is not set explicitly, the property on the UnmarshallingTransformer will by default be set to a DomSourceFactory.

<bean id="unmarshallingTransformer" class="o.s.i.xml.transformer.UnmarshallingTransformer">
    <constructor-arg>
        <bean class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
            <property name="contextPath" value="org.example" />
        </bean>
    </constructor-arg>
</bean>

===== MarshallingTransformer

The MarshallingTransformer allows an object graph to be converted into XML using a Spring OXM Marshaller. By default the MarshallingTransformer will return a DomResult. However, the type of result can be controlled by configuring an alternative ResultFactory such as StringResultFactory. In many cases it will be more convenient to transform the payload into an alternative XML format. To achieve this, configure a ResultTransformer. Two implementations are provided, one which converts to String and another which converts to Document.

<bean id="marshallingTransformer" class="o.s.i.xml.transformer.MarshallingTransformer">
    <constructor-arg>
        <bean class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
            <property name="contextPath" value="org.example"/>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="o.s.i.xml.transformer.ResultToDocumentTransformer"/>
    </constructor-arg>
</bean>

By default, the MarshallingTransformer will pass the payload Object to the Marshaller, but if its boolean extractPayload property is set to false, the entire Message instance will be passed to the Marshaller instead. That may be useful for certain custom implementations of the Marshaller interface, but typically the payload is the appropriate source Object for marshalling when delegating to any of the various out-of-the-box Marshaller implementations.

===== XsltPayloadTransformer

XsltPayloadTransformer transforms XML payloads using Extensible Stylesheet Language Transformations (XSLT). The transformer’s constructor requires an instance of either Resource or Templates to be passed in. Passing in a Templates instance allows for greater configuration of the TransformerFactory used to create the template instance.

As with the UnmarshallingTransformer, the XsltPayloadTransformer will do the actual XSLT transformation using instances of Source. Therefore, if the message payload is not an instance of Source, conversion will be attempted. String and Document payloads are supported directly.

Custom conversion to a Source is also supported by injecting an implementation of a SourceFactory.

[Note]Note

If a SourceFactory is not set explicitly, the property on the XsltPayloadTransformer will by default be set to a DomSourceFactory.

By default, the XsltPayloadTransformer will create a message with a Result payload, similar to the XmlPayloadMarshallingTransformer. This can be customised by providing a ResultFactory and/or a ResultTransformer.

<bean id="xsltPayloadTransformer" class="o.s.i.xml.transformer.XsltPayloadTransformer">
  <constructor-arg value="classpath:org/example/xsl/transform.xsl"/>
  <constructor-arg>
    <bean class="o.s.i.xml.transformer.ResultToDocumentTransformer"/>
  </constructor-arg>
</bean>

Starting with Spring Integration 3.0, you can now specify the transformer factory class name using a constructor argument. This is configured using the transformer-factory-class attribute when using the namespace.

===== ResultTransformers

Both the MarshallingTransformer and the XsltPayloadTransformer allow you to specify a ResultTransformer. Thus, if the Marshalling or XSLT transformation returns a Result, than you have the option to also use a ResultTransformer to transform the Result into another format. Spring Integration provides 2 concrete`ResultTransformer` implementations:

Using ResultTransformers with the MarshallingTransformer

By default, the MarshallingTransformer will always return a Result. By specifying a ResultTransformer, you can customize the type of payload returned.

Using ResultTransformers with the XsltPayloadTransformer

The behavior is slighly more complex for the XsltPayloadTransformer. By default, if the input payload is an instance of String or Document the resultTransformer property is ignored.

However, if the input payload is a Source or any other type, then the resultTransformer property is applied. Additionally, you can set the property alwaysUseResultFactory to true, which will also cause the specified resultTransformer to being used.

For more information and examples, please see Section 31.3.2, “TCP Failover Client Connection Factory”

==== Namespace Support for XML Transformers

Namespace support for all XML transformers is provided in the Spring Integration XML namespace, a template for which can be seen below. The namespace support for transformers creates an instance of either`EventDrivenConsumer` or PollingConsumer according to the type of the provided input channel. The namespace support is designed to reduce the amount of XML configuration by allowing the creation of an endpoint and transformer using one element.

UnmarshallingTransformer

The namespace support for the UnmarshallingTransformer is shown below. Since the namespace is now creating an endpoint instance rather than a transformer, a poller can also be nested within the element to control the polling of the input channel.

<int-xml:unmarshalling-transformer id="defaultUnmarshaller"
    input-channel="input" output-channel="output"
    unmarshaller="unmarshaller"/>

<int-xml:unmarshalling-transformer id="unmarshallerWithPoller"
    input-channel="input" output-channel="output"
    unmarshaller="unmarshaller">
    <int:poller fixed-rate="2000"/>
<int-xml:unmarshalling-transformer/>

MarshallingTransformer

The namespace support for the marshalling transformer requires an input-channel, output-channel and a reference to a marshaller. The optional result-type attribute can be used to control the type of result created. Valid values are StringResult or DomResult (the default).

<int-xml:marshalling-transformer
     input-channel="marshallingTransformerStringResultFactory"
     output-channel="output"
     marshaller="marshaller"
     result-type="StringResult" />

<int-xml:marshalling-transformer
    input-channel="marshallingTransformerWithResultTransformer"
    output-channel="output"
    marshaller="marshaller"
    result-transformer="resultTransformer" />

<bean id="resultTransformer" class="o.s.i.xml.transformer.ResultToStringTransformer"/>

Where the provided result types are not sufficient, a reference to a custom implementation of ResultFactory can be provided as an alternative to setting the result-type attribute, using the result-factory attribute. The attributes result-type and result-factory are mutually exclusive.

[Note]Note

Internally, the result types StringResult and DomResult are represented by the ResultFactory s StringResultFactory and DomResultFactory respectively.

XsltPayloadTransformer

Namespace support for the XsltPayloadTransformer allows you to either pass in a Resource, in order to create the Templates instance, or alternatively, you can pass in a precreated Templates instance as a reference. In common with the marshalling transformer, the type of the result output can be controlled by specifying either the result-factory or result-type attribute. A result-transformer attribute can also be used to reference an implementation of ResultTransformer where conversion of the result is required before sending.

[Important]Important

If you specify the result-factory or the result-type attribute, then the alwaysUseResultFactory property on the underlying XsltPayloadTransformer will be set to true by the XsltPayloadTransformerParser.

<int-xml:xslt-transformer id="xsltTransformerWithResource"
    input-channel="withResourceIn" output-channel="output"
    xsl-resource="org/springframework/integration/xml/config/test.xsl"/>

<int-xml:xslt-transformer id="xsltTransformerWithTemplatesAndResultTransformer"
    input-channel="withTemplatesAndResultTransformerIn" output-channel="output"
    xsl-templates="templates"
    result-transformer="resultTransformer"/>

Often you may need to have access to Message data, such as the Message Headers, in order to assist with transformation. For example, you may need to get access to certain Message Headers and pass them on as parameters to a transformer (e.g., transformer.setParameter(..)). Spring Integration provides two convenient ways to accomplish this, as illustrated in following example:

<int-xml:xslt-transformer id="paramHeadersCombo"
    input-channel="paramHeadersComboChannel" output-channel="output"
    xsl-resource="classpath:transformer.xslt"
    xslt-param-headers="testP*, *foo, bar, baz">

    <int-xml:xslt-param name="helloParameter" value="hello"/>
    <int-xml:xslt-param name="firstName" expression="headers.fname"/>
</int-xml:xslt-transformer>

If message header names match 1:1 to parameter names, you can simply use xslt-param-headers attribute. There you can also use wildcards for simple pattern matching, which supports the following simple pattern styles: "xxx*", "xxx", "*xxx" and "xxx*yyy".

You can also configure individual Xslt parameters via the <xslt-param/> sub element. There you can use either the expression or value attribute. The expression attribute should be any valid SpEL expression with Message being the root object of the expression evaluation context. The value attribute, just like any value in Spring beans, allows you to specify simple scalar values. You can also use property placeholders (e.g., ${some.value}). So as you can see, with the expression and value attribute, Xslt parameters could now be mapped to any accessible part of the Message as well as any literal value.

Starting with Spring Integration 3.0, you can now specify the transformer factory class name using the transformer-factory-class attribute.

==== Namespace Configuration and ResultTransformers

The usage of ResultTransformers was previously introduced in Section 31.3.2, “TCP Failover Client Connection Factory”. The following example illustrates several special use-cases using XML namespace configuration. First, we define the ResultTransformer:

<beans:bean id="resultToDoc" class="o.s.i.xml.transformer.ResultToDocumentTransformer"/>

This ResultTransformer will accept either a StringResult or a DOMResult as input and converts the input into a Document.

Now, let’s declare the transformer:

<int-xml:xslt-transformer input-channel="in" output-channel="fahrenheitChannel"
    xsl-resource="classpath:noop.xslt" result-transformer="resultToDoc"/>

If the incoming message’s payload is of type Source, then as first step the Result is determined using the ResultFactory. As we did not specify a ResultFactory, the default DomResultFactory is used, meaning that the transformation will yield a DomResult.

However, as we specified a ResultTransformer, it will be used and the resulting Message payload will be of type`Document`.

[Important]Important

If the incoming message’s payload is of type String, the payload after the Xslt transformation will be a String. Similarly, if the incoming message’s payload is of type Document, the payload after the Xslt transformation will be a`Document`. The specified ResultTransformer will be ignored with String or Document payloads.

If the message payload is neither a Source, String or Document, as a fallback option, it is attempted to create a`Source` using the default SourceFactory. As we did not specify a SourceFactory explicitly using the source-factory attribute, the default DomSourceFactory is used. If successful, the XSLT transformation is executed as if the payload was of type Source, which we described in the previous paragraphs.

[Note]Note

The DomSourceFactory supports the creation of a DOMSource from a either Document, File or String payloads.

The next transformer declaration adds a result-type attribute using StringResult as its value. First, the result-type is internally represented by the StringResultFactory. Thus, you could have also added a reference to a StringResultFactory, using the result-factory attribute, which would haven been the same.

<int-xml:xslt-transformer input-channel="in" output-channel="fahrenheitChannel"
		xsl-resource="classpath:noop.xslt" result-transformer="resultToDoc"
		result-type="StringResult"/>

Because we are using a ResultFactory, the alwaysUseResultFactory property of the XsltPayloadTransformer class will be implicitly set to true. Consequently, the referenced ResultToDocumentTransformer will be used.

Therefore, if you transform a payload of type String, the resulting payload will be of type Document.

XsltPayloadTransformer and <xsl:output method="text"/>

<xsl:output method="text"/> tells the XSLT template to only produce text content from the input source. In this particular case there is no reason to have a DomResult. Therefore, the XsltPayloadTransformer defaults to StringResult if the output property called method of the underlying javax.xml.transform.Transformer returns "text". This coercion is performed independent from the inbound payload type. Keep in mind that this [quote] smart behavior is only available, if the result-type or result-factory attributes aren’t provided for the respective <int-xml:xslt-transformer> component.

=== Transforming XML Messages Using XPath

When it comes to message transformation XPath is a great way to transform Messages that have XML payloads by defining XPath transformers via <xpath-transformer/> element.

Simple XPath transformation

Let’s look at the following transformer configuration:

<int-xml:xpath-transformer input-channel="inputChannel" output-channel="outputChannel"
      xpath-expression="/person/@name" />

...and Message

Message<?> message =
  MessageBuilder.withPayload("<person name='John Doe' age='42' married='true'/>").build();

After sending this message to the inputChannel the XPath transformer configured above will transform this XML Message to a simple Message with payload of John Doe all based on the simple XPath Expression specified in the xpath-expression attribute.

XPath also has the capability to perform simple conversion of extracted elements to a desired type. Valid return types are defined in javax.xml.xpath.XPathConstants and follows the conversion rules specified by the javax.xml.xpath.XPath interface.

The following constants are defined by the XPathConstants class: BOOLEAN, DOM_OBJECT_MODEL, NODE, NODESET, NUMBER, STRING

You can configure the desired type by simply using the evaluation-type attribute of the <xpath-transformer/> element.

<int-xml:xpath-transformer input-channel="numberInput" xpath-expression="/person/@age"
                           evaluation-type="NUMBER_RESULT" output-channel="output"/>

<int-xml:xpath-transformer input-channel="booleanInput"
                           xpath-expression="/person/@married = 'true'"
                           evaluation-type="BOOLEAN_RESULT" output-channel="output"/>

Node Mappers

If you need to provide custom mapping for the node extracted by the XPath expression simply provide a reference to the implementation of the org.springframework.xml.xpath.NodeMapper - an interface used by XPathOperations implementations for mapping Node objects on a per-node basis. To provide a reference to a NodeMapper simply use node-mapper attribute:

<int-xml:xpath-transformer input-channel="nodeMapperInput" xpath-expression="/person/@age"
                           node-mapper="testNodeMapper" output-channel="output"/>

...and Sample NodeMapper implementation:

class TestNodeMapper implements NodeMapper {
  public Object mapNode(Node node, int nodeNum) throws DOMException {
    return node.getTextContent() + "-mapped";
  }
}

XML Payload Converter

You can also use an implementation of the org.springframework.integration.xml.XmlPayloadConverter to provide more granular transformation:

<int-xml:xpath-transformer input-channel="customConverterInput"
                           output-channel="output" xpath-expression="/test/@type"
                           converter="testXmlPayloadConverter" />

...and Sample XmlPayloadConverter implementation:

class TestXmlPayloadConverter implements XmlPayloadConverter {
  public Source convertToSource(Object object) {
    throw new UnsupportedOperationException();
  }
  //
  public Node convertToNode(Object object) {
    try {
      return DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(
          new InputSource(new StringReader("<test type='custom'/>")));
    }
    catch (Exception e) {
      throw new IllegalStateException(e);
    }
  }
  //
  public Document convertToDocument(Object object) {
    throw new UnsupportedOperationException();
  }
}

The DefaultXmlPayloadConverter is used if this reference is not provided, and it should be sufficient in most cases since it can convert from Node, Document, Source, File, String, InputStream and byte[] typed payloads. If you need to extend beyond the capabilities of that default implementation, then an upstream Transformer is probably a better option than providing a reference to a custom implementation of this strategy here.

=== Splitting XML Messages

XPathMessageSplitter supports messages with either String or Document payloads. The splitter uses the provided XPath expression to split the payload into a number of nodes. By default this will result in each Node instance becoming the payload of a new message. Where it is preferred that each message be a Document the createDocuments flag can be set. Where a String payload is passed in the payload will be converted then split before being converted back to a number of String messages. The XPath splitter implements MessageHandler and should therefore be configured in conjunction with an appropriate endpoint (see the namespace support below for a simpler configuration alternative).

<bean id="splittingEndpoint"
      class="org.springframework.integration.endpoint.EventDrivenConsumer">
    <constructor-arg ref="orderChannel" />
    <constructor-arg>
        <bean class="org.springframework.integration.xml.splitter.XPathMessageSplitter">
            <constructor-arg value="/order/items" />
            <property name="documentBuilder" ref="customisedDocumentBuilder" />
            <property name="outputChannel" ref="orderItemsChannel" />
        </bean>
    </constructor-arg>
</bean>

XPath splitter namespace support allows the creation of a Message Endpoint with an input channel and output channel.

<!-- Split the order into items creating a new message for each item node -->
<int-xml:xpath-splitter id="orderItemSplitter"
                       input-channel="orderChannel"
                       output-channel="orderItemsChannel">
    <int-xml:xpath-expression expression="/order/items"/>
</int-xml:xpath-splitter>

<!-- Split the order into items creating a new document for each item-->
<int-xml:xpath-splitter id="orderItemDocumentSplitter"
                       input-channel="orderChannel"
                       output-channel="orderItemsChannel"
                       create-documents="true">
    <int-xml:xpath-expression expression="/order/items"/>
    <int:poller fixed-rate="2000"/>
</int-xml:xpath-splitter>

Starting with version 4.2, the XPathMessageSplitter exposes outputProperties (such as OutputKeys.OMIT_XML_DECLARATION) property for the javax.xml.transform.Transformer instances when a request payload isn’t of org.w3c.dom.Node type:

<util:properties id="outputProperties">
	<beans:prop key="#{T (javax.xml.transform.OutputKeys).OMIT_XML_DECLARATION}">yes</beans:prop>
</util:properties>

<xpath-splitter input-channel="input"
             output-properties="outputProperties">
    <xpath-expression expression="/orders/order"/>
</xpath-splitter>

Starting with version 4.2, the XPathMessageSplitter exposes an iterator option as a boolean flag (defaults to true). This allows the "streaming" of split nodes in the downstream flow. With the iterator mode, each node is transformed while iterating. When false, all entries are transformed first, before the split nodes start being sent to the output channel (transform, send, transform, send Vs. transform, transform, send, send). See Section 6.3, “Splitter” for more information.

=== Routing XML Messages Using XPath

Similar to SpEL-based routers, Spring Integration provides support for routing messages based on XPath expressions, allowing you to create a Message Endpoint with an input channel but no output channel. Instead, one or more output channels are determined dynamically.

<int-xml:xpath-router id="orderTypeRouter" input-channel="orderChannel">
    <int-xml:xpath-expression expression="/order/type"/>
</int-xml:xpath-router>
[Note]Note

For an overview of attributes that are common among Routers, please see chapter: Section 6.1.2, “Common Router Parameters”

Internally XPath expressions will be evaluated as NODESET type and converted to a List<String> representing channel names. Typically such a list will contain a single channel name. However, based on the results of an XPath Expression, the XPath router can also take on the characteristics of a Recipient List Router if the XPath Expression returns more then one value. In that case, the List<String> will contain more then one channel name and consequently Messages will be sent to all channels in the list.

Thus, assuming that the XML file passed to the router configured below contains many responder sub-elements representing channel names, the message will be sent to all of those channels.

<!-- route the order to all responders-->
<int-xml:xpath-router id="responderRouter" input-channel="orderChannel">
    <int-xml:xpath-expression expression="/request/responders"/>
</int-xml:xpath-router>

If the returned values do not represent the channel names directly, additional mapping parameters can be specified, in order to map those returned values to actual channel names. For example if the /request/responders expression results in two values responderA and responderB but you don’t want to couple the responder names to channel names, you may provide additional mapping configuration such as the following:

<!-- route the order to all responders-->
<int-xml:xpath-router id="responderRouter" input-channel="orderChannel">
    <int-xml:xpath-expression expression="/request/responders"/>
    <int-xml:mapping value="responderA" channel="channelA"/>
    <int-xml:mapping value="responderB" channel="channelB"/>
</int-xml:xpath-router>

As already mentioned, the default evaluation type for XPath expressions is NODESET, which is converted to a List<String> of channel names, therefore handling single channel scenarios as well as multiple ones.

Nonetheless, certain XPath expressions may evaluate as String type from the very beginning. Take for example the following XPath Expression:

name(./node())

This expression will return the name of the root node. It will resulting in an exception, if the default evaluation type NODESET is being used.

For these scenarious, you may use the evaluate-as-string attribute, which will allow you to manage the evaluation type. It is FALSE by default, however if set to TRUE, the String evaluation type will be used.

[Note]Note

To provide some background information: XPath 1.0 specifies 4 data types:

  • Node-sets
  • Strings
  • Number
  • Boolean

When the XPath Router evaluates expressions using the optional evaluate-as-string attribute, the return value is determined per the string() function as defined in the XPath specification. This means that if the expression selects multiple nodes, it will return the string value of the first node.

For further information, please see:

For example if we want to route based on the name of the root node, we can use the following configuration:

<int-xml:xpath-router id="xpathRouterAsString"
        input-channel="xpathStringChannel"
        evaluate-as-string="true">
    <int-xml:xpath-expression expression="name(./node())"/>
</int-xml:xpath-router>

==== XML Payload Converter

For XPath Routers, you can also specify the Converter to use when converting payloads prior to XPath evaluation. As such, the XPath Router supports custom implementations of the XmlPayloadConverter strategy, and when configuring an xpath-router element in XML, a reference to such an implementation may be provided via the converter attribute.

If this reference is not explicitly provided, the DefaultXmlPayloadConverter is used. It should be sufficient in most cases, since it can convert from Node, Document, Source, File, and String typed payloads. If you need to extend beyond the capabilities of that default implementation, then an upstream Transformer is generally a better option in most cases, rather than providing a reference to a custom implementation of this strategy here.

=== XPath Header Enricher

The XPath Header Enricher defines a Header Enricher Message Transformer that evaluates XPath expressions against the message payload and inserts the result of the evaluation into a messsage header.

Please see below for an overview of all available configuration parameters:

<int-xml:xpath-header-enricher default-overwrite="true"    1
                               id=""                       2
                               input-channel=""            3
                               output-channel=""           4
                               should-skip-nulls="true">   5
    <int:poller></int:poller>                              6
    <int-xml:header name=""                                7
                    evaluation-type="STRING_RESULT"        8
                    header-type="int"                      9
                    overwrite="true"                       10
                    xpath-expression=""                    11
                    xpath-expression-ref=""/>              12
</int-xml:xpath-header-enricher>

1

Specify the default boolean value for whether to overwrite existing header values. This will only take effect for sub-elements that do not provide their own overwrite attribute. If the default- overwrite attribute is not provided, then the specified header values will NOT overwrite any existing ones with the same header names. Optional.

2

Id for the underlying bean definition. Optional.

3

The receiving Message channel of this endpoint. Optional.

4

Channel to which enriched messages shall be send to. Optional.

5

Specify whether null values, such as might be returned from an expression evaluation, should be skipped. The default value is true. Set this to false if a null value should trigger removal of the corresponding header instead.Optional.

6

Optional.

7

The name of the header to be enriched. Mandatory.

8

The result type expected from the XPath evaluation. This will be the type of the header value, if there is no header-type attribute provided. The following values are allowed: BOOLEAN_RESULT, STRING_RESULT, NUMBER_RESULT, NODE_RESULT and NODE_LIST_RESULT. Defaults internally to XPathEvaluationType.STRING_RESULT if not set. Optional.

9

The fully qualified class name for the header value type. The result of XPath evaluation will be converted to this type using the ConversionService. This allows, for example, a NUMBER_RESULT (a double) to be converted to an Integer. The type can be declared as a primitive (e.g. int) but the result will always be the equivalent wrapper class (e.g. Integer). The same integration ConversionService discussed in Section 8.1.6, “Payload Type Conversion” is used for the conversion, so conversion to custom types is supported, by adding a custom converter to the service.Optional.

10

Boolean value to indicate whether this header value should overwrite an existing header value for the same name if already present on the input Message.

11

The XPath Expression as a String. Either this attribute or xpath-expression-ref must be provided, but not both.

12

The XPath Expression reference. Either this attribute or xpath-expression must be provided, but not both.

=== Using the XPath Filter

This component defines an XPath-based Message Filter. Under the covers this components uses a MessageFilter that wraps an instance of AbstractXPathMessageSelector.

[Note]Note

Please also refer to the chapter on Message Filters for further details.

In order to use the XPath Filter you must as a minimum provide an XPath Expression either by declaring the xpath-expression sub-element or by referencing an XPath Expression using the xpath-expression-ref attribute.

If the provided XPath expression will evaluate to a boolean value, no further configuration parameters are necessary. However, if the XPath expression will evaluate to a String, the match-value attribute should be specified against which the evaluation result will be matched.

There are three options for the match-type:

  • exact - correspond to equals on java.lang.String. The underlying implementation uses a StringValueTestXPathMessageSelector
  • case-insensitive - correspond to equals-ignore-case on java.lang.String. The underlying implementation uses a StringValueTestXPathMessageSelector
  • regex - matches operations one java.lang.String. The underlying implementation uses a RegexTestXPathMessageSelector

When providing a match-type value of regex, the value provided with the match-value attribute must be a valid Regular Expression.

<int-xml:xpath-filter discard-channel=""                      1
                      id=""                                   2
                      input-channel=""                        3
                      match-type="exact"                      4
                      match-value=""                          5
                      output-channel=""                       6
                      throw-exception-on-rejection="false"    7
                      xpath-expression-ref="">                8
    <int-xml:xpath-expression ... />                          9
    <int:poller ... />                                        10
</int-xml:xpath-filter>

1

Message Channel where you want rejected messages to be sent. Optional.

2

Id for the underlying bean definition. Optional.

3

The receiving Message channel of this endpoint. Optional.

4

Type of match to apply between the XPath evaluation result and the match-value. Default is exact. Optional.

5

String value to be matched against the XPath evaluation result. If this attribute is not provided, then the XPath evaluation MUST produce a boolean result directly. Optional.

6

The channel to which Messages that matched the filter criterias shall be dispatched to. Optional.

7

By default, this property is set to false and rejected Messages (Messages that did not match the filter criteria) will be silently dropped. However, if set to true message rejection will result in an error condition and the exception will be propagated upstream to the caller. Optional.

8

Reference to an XPath expression instance to evaluate.

9

This sub-element sets the XPath expression to be evaluated. If this is not defined you MUST define the xpath-expression-ref attribute. Also, only one xpath-expression element can be set.

10

Optional.

=== #xpath SpEL Function

Spring Integration, since version 3.0, provides the #xpath built-in SpEL function, which invokes the static method XPathUtils.evaluate(...). This method delegates to an org.springframework.xml.xpath.XPathExpression. The following shows some usage examples:

<transformer expression="#xpath(payload, '/name')"/>

<filter expression="#xpath(payload, headers.xpath, 'boolean')"/>

<splitter expression="#xpath(payload, '//book', 'document_list')"/>

<router expression="#xpath(payload, '/person/@age', 'number')">
    <mapping channel="output1" value="16"/>
    <mapping channel="output2" value="45"/>
</router>

#xpath also supports a third optional parameter for converting the result of the xpath evaluation. It can be one of the String constants 'string', 'boolean', 'number', 'node', 'node_list' and 'document_list' or an org.springframework.xml.xpath.NodeMapper instance. By default the #xpath SpEL function returns a String representation of the xpath evaluation.

[Note]Note

To enable the #xpath SpEL function, simply add the spring-integration-xml.jar to the CLASSPATH; there is no need to declare any component(s) from the Spring Integration Xml Namespace.

For more information see Section 31.3.2, “TCP Failover Client Connection Factory”.

=== XML Validating Filter

The XML Validating Filter allows you to validate incoming messages against provided schema instances. The following schema types are supported:

Messages that fail validation can either be silently dropped or they can be forwarded to a definable discard-channel. Furthermore you can configure this filter to throw an Exception in case validation fails.

Please see below for an overview of all available configuration parameters:

<int-xml:validating-filter discard-channel=""                    1
                           id=""                                 2
                           input-channel=""                      3
                           output-channel=""                     4
                           schema-location=""                    5
                           schema-type="xml-schema"              6
                           throw-exception-on-rejection="false"  7
                           xml-validator="">                     8
    <int:poller .../>                                            9
</int-xml:validating-filter>

1

Message Channel where you want rejected messages to be sent. Optional.

2

Id for the underlying bean definition. Optional.

3

The receiving Message channel of this endpoint. Optional.

4

Message Channel where you want accepted messages to be sent. Optional.

5

Sets the location of the schema to validate the Message’s payload against. Internally uses the org.springframework.core.io.Resource interface. You can set this attribute or the xml-validator attribute but not both. Optional.

6

Sets the schema type. Can be either xml-schema or relax-ng. Optional. If not set it defaults to xml-schema which internally translates to org.springframework.xml.validation.XmlValidatorFactory#SCHEMA_W3C_XML

7

If true a MessageRejectedException is thrown in case validation fails for the provided Message’s payload.Optional. Defaults to false if not set.

8

Reference to a custom sorg.springframework.xml.validation.XmlValidator strategy. You can set this attribute or the schema-location attribute but not both. Optional.

9

Optional.

== XMPP Support

Spring Integration provides Channel Adapters for XMPP.

=== Introduction

XMPP describes a way for multiple agents to communicate with each other in a distributed system. The canonical use case is to send and receive chat messages, though XMPP can be, and is, used for far more applications. XMPP is used to describe a network of actors. Within that network, actors may address each other directly, as well as broadcast status changes (e.g. "presence").

XMPP provides the messaging fabric that underlies some of the biggest Instant Messaging networks in the world, including Google Talk (GTalk) - which is also available from within GMail - and Facebook Chat. There are many good open-source XMPP servers available. Two popular implementations are Openfire and ejabberd

Spring integration provides support for XMPP via XMPP adapters which support sending and receiving both XMPP chat messages and presence changes from other entries in your roster. As with other adapters, the XMPP adapters come with support for a convenient namespace-based configuration. To configure the XMPP namespace, include the following elements in the headers of your XML configuration file:

xmlns:int-xmpp="http://www.springframework.org/schema/integration/xmpp"
xsi:schemaLocation="http://www.springframework.org/schema/integration/xmpp
	https://www.springframework.org/schema/integration/xmpp/spring-integration-xmpp.xsd"

=== XMPP Connection

Before using inbound or outbound XMPP adapters to participate in the XMPP network, an actor must establish its XMPP connection. This connection object could be shared by all XMPP adapters connected to a particular account. Typically this requires - at a minimum -user, password, and host. To create a basic XMPP connection, you can utilize the convenience of the namespace.

<int-xmpp:xmpp-connection
    id="myConnection"
    user="user"
    password="password"
    host="host"
    port="port"
    resource="theNameOfTheResource"
    subscription-mode="accept_all"/>
[Note]Note

For added convenience you can rely on the default naming convention and omit the id attribute. The default name xmppConnection will be used for this connection bean.

If the XMPP Connection goes stale, reconnection attempts will be made with an automatic login as long as the previous connection state was logged (authenticated). We also register a ConnectionListener which will log connection events if the DEBUG logging level is enabled.

The subscription-mode initiates the Roster listener to deal with incoming subscriptions from other users. This functionality isn’t always available for the target XMPP servers. For example GCM/FCM fully disables it. To switch off the Roster listener for subscriptions you should configure it with an empty string when using XML configuration: subscription-mode="", or with XmppConnectionFactoryBean.setSubscriptionMode(null) when using Java Configuration. Doing so will disable Roster at the login phase as well. See Roster.setRosterLoadedAtLogin(Boolean) for more information.

=== XMPP Messages

==== Inbound Message Channel Adapter

The Spring Integration adapters support receiving chat messages from other users in the system. To do this, the Inbound Message Channel Adapter "logs in" as a user on your behalf and receives the messages sent to that user. Those messages are then forwarded to your Spring Integration client. Configuration support for the XMPP Inbound Message Channel Adapter is provided via the inbound-channel-adapter element.

<int-xmpp:inbound-channel-adapter id="xmppInboundAdapter"
	channel="xmppInbound"
	xmpp-connection="testConnection"
	payload-expression="getExtension('google:mobile:data').json"
	stanza-filter="stanzaFilter"
	auto-startup="true"/>

As you can see amongst the usual attributes this adapter also requires a reference to an XMPP Connection.

It is also important to mention that the XMPP inbound adapter is an event driven adapter and a Lifecycle implementation. When started it will register a PacketListener that will listen for incoming XMPP Chat Messages. It forwards any received messages to the underlying adapter which will convert them to Spring Integration Messages and send them to the specified channel. It will unregister the PacketListener when it is stopped.

Starting with version 4.3 the ChatMessageListeningEndpoint (and its <int-xmpp:inbound-channel-adapter>) supports a org.jivesoftware.smack.filter.StanzaFilter injection to be registered on the provided XMPPConnection together with an internal StanzaListener implementation. See their JavaDocs for more information.

Also with the version 4.3 the payload-expression has been introduced for the ChatMessageListeningEndpoint. The incoming org.jivesoftware.smack.packet.Message represents a root object of evaluation context. This option is useful in case of Section 31.3.2, “TCP Failover Client Connection Factory”. For example, for the GCM protocol we can extract the body using expression:

payload-expression="getExtension('google:mobile:data').json"

for the XHTML protocol:

payload-expression="getExtension(T(org.jivesoftware.smackx.xhtmlim.packet.XHTMLExtension).NAMESPACE).bodies[0]"

To simplify the access to the Extension in the XMPP Message, the extension variable is added into the EvaluationContext. Note, it is done only when one and only one Extension is present in the Message. The samples above with the namespace manipulations can be simplified to something like:

 ----
 payload-expression="#extension.json"
 payload-expression="#extension.bodies[0]"
 ----
[Note]Note

The extract-payload option has been deprecated in favor of the new payload-expression one.

==== Outbound Message Channel Adapter

You may also send chat messages to other users on XMPP using the Outbound Message Channel Adapter. Configuration support for the XMPP Outbound Message Channel Adapter is provided via the outbound-channel-adapter element.

<int-xmpp:outbound-channel-adapter id="outboundEventAdapter"
						channel="outboundEventChannel"
						xmpp-connection="testConnection"/>

The adapter expects as its input - at a minimum - a payload of type java.lang.String, and a header value for XmppHeaders.CHAT_TO that specifies to which user the Message should be sent. To create a message you might use the following Java code:

Message<String> xmppOutboundMsg = MessageBuilder.withPayload("Hello, XMPP!" )
						.setHeader(XmppHeaders.CHAT_TO, "userhandle")
						.build();

Another mechanism of setting the header is by using the XMPP header-enricher support. Here is an example.

<int-xmpp:header-enricher input-channel="input" output-channel="output">
	<int-xmpp:chat-to value="[email protected]"/>
</int-xmpp:header-enricher>

Starting with version 4.3 the packet extension support has been added to the ChatMessageSendingMessageHandler (<int-xmpp:outbound-channel-adapter>). Alongside with the regular String and org.jivesoftware.smack.packet.Message payload, now you can send a message with a payload as a org.jivesoftware.smack.packet.ExtensionElement which is populated to the org.jivesoftware.smack.packet.Message.addExtension() instead of setBody(). For the convenience an extension-provider option has been added for the ChatMessageSendingMessageHandler to allow to inject org.jivesoftware.smack.provider.ExtensionElementProvider, which builds an ExtensionElement against the payload at runtime. For this case the payload must be String in JSON or XML format depending of the XEP protocol.

=== XMPP Presence

XMPP also supports broadcasting state. You can use this capability to let people who have you on their roster see your state changes. This happens all the time with your IM clients; you change your away status, and then set an away message, and everybody who has you on their roster sees your icon or username change to reflect this new state, and additionally might see your new "away" message. If you would like to receive notification, or notify others, of state changes, you can use Spring Integration’s "presence" adapters.

==== Inbound Presence Message Channel Adapter

Spring Integration provides an Inbound Presence Message Channel Adapter which supports receiving Presence events from other users in the system who happen to be on your Roster. To do this, the adapter "logs in" as a user on your behalf, registers a RosterListener and forwards received Presence update events as Messages to the channel identified by the channel attribute. The payload of the Message will be a org.jivesoftware.smack.packet.Presence object (see https://www.igniterealtime.org/builds/smack/docs/latest/javadoc/org/jivesoftware/smack/packet/Presence.html).

Configuration support for the XMPP Inbound Presence Message Channel Adapter is provided via the presence-inbound-channel-adapter element.

<int-xmpp:presence-inbound-channel-adapter channel="outChannel"
		xmpp-connection="testConnection" auto-startup="false"/>

As you can see amongst the usual attributes this adapter also requires a reference to an XMPP Connection. It is also important to mention that this adapter is an event driven adapter and a Lifecycle implementation. It will register a RosterListener when started and will unregister that RosterListener when stopped.

==== Outbound Presence Message Channel Adapter

Spring Integration also supports sending Presence events to be seen by other users in the network who happen to have you on their Roster. When you send a Message to the Outbound Presence Message Channel Adapter it extracts the payload, which is expected to be of type org.jivesoftware.smack.packet.Presence and sends it to the XMPP Connection, thus advertising your presence events to the rest of the network.

Configuration support for the XMPP Outbound Presence Message Channel Adapter is provided via the presence-outbound-channel-adapter element.

<int-xmpp:presence-outbound-channel-adapter id="eventOutboundPresenceChannel"
	xmpp-connection="testConnection"/>

It can also be a Polling Consumer (if it receives Messages from a Pollable Channel) in which case you would need to register a Poller.

<int-xmpp:presence-outbound-channel-adapter id="pollingOutboundPresenceAdapter"
		xmpp-connection="testConnection"
		channel="pollingChannel">
	<int:poller fixed-rate="1000" max-messages-per-poll="1"/>
</int-xmpp:presence-outbound-channel-adapter>

Like its inbound counterpart, it requires a reference to an XMPP Connection.

[Note]Note

If you are relying on the default naming convention for an XMPP Connection bean (described earlier), and you have only one XMPP Connection bean configured in your Application Context, you may omit the xmpp-connection attribute. In that case, the bean with the name xmppConnection will be located and injected into the adapter.

=== Advanced Configuration

Since Spring Integration XMPP support is based on the Smack 4.0 API (https://www.igniterealtime.org/projects/smack/), it is important to know a few details related to more complex configuration of the XMPP Connection object.

As stated earlier the xmpp-connection namespace support is designed to simplify basic connection configuration and only supports a few common configuration attributes. However, the org.jivesoftware.smack.ConnectionConfiguration object defines about 20 attributes, and there is no real value of adding namespace support for all of them. So, for more complex connection configurations, simply configure an instance of our XmppConnectionFactoryBean as a regular bean, and inject a org.jivesoftware.smack.ConnectionConfiguration as a constructor argument to that FactoryBean. Every property you need, can be specified directly on that ConnectionConfiguration instance (a bean definition with the p namespace would work well). This way SSL, or any other attributes, could be set directly. Here’s an example:

<bean id="xmppConnection" class="o.s.i.xmpp.XmppConnectionFactoryBean">
    <constructor-arg>
        <bean class="org.jivesoftware.smack.ConnectionConfiguration">
            <constructor-arg value="myServiceName"/>
            <property name="socketFactory" ref="..."/>
        </bean>
    </constructor-arg>
</bean>

<int:channel id="outboundEventChannel"/>

<int-xmpp:outbound-channel-adapter id="outboundEventAdapter"
    channel="outboundEventChannel"
    xmpp-connection="xmppConnection"/>

Another important aspect of the Smack API is static initializers. For more complex cases (e.g., registering a SASL Mechanism), you may need to execute certain static initializers. One of those static initializers is SASLAuthentication, which allows you to register supported SASL mechanisms. For that level of complexity, we would recommend Spring JavaConfig-style of the XMPP Connection configuration. Then, you can configure the entire component through Java code and execute all other necessary Java code including static initializers at the appropriate time.

@Configuration
public class CustomConnectionConfiguration {
  @Bean
  public XMPPConnection xmppConnection() {
	SASLAuthentication.supportSASLMechanism("EXTERNAL", 0); // static initializer

	ConnectionConfiguration config = new ConnectionConfiguration("localhost", 5223);
	config.setTrustorePath("path_to_truststore.jks");
	config.setSecurityEnabled(true);
	config.setSocketFactory(SSLSocketFactory.getDefault());
	return new XMPPConnection(config);
  }
}

For more information on the JavaConfig style of Application Context configuration, refer to the following section in the Spring Reference Manual.

=== XMPP Message Headers

The Spring Integration XMPP Adapters will map standard XMPP properties automatically. These properties will be copied by default to and from Spring Integration MessageHeaders using the DefaultXmppHeaderMapper.

Any user-defined headers will NOT be copied to or from an XMPP Message, unless explicitly specified by the requestHeaderNames and/or replyHeaderNames properties of the DefaultXmppHeaderMapper.

[Tip]Tip

When mapping user-defined headers, the values can also contain simple wildcard patterns (e.g. "foo*" or "*foo") to be matched.

Starting with version 4.1, the AbstractHeaderMapper (a DefaultXmppHeaderMapper superclass) allows the NON_STANDARD_HEADERS token to be configured for the requestHeaderNames property (in addition to existing STANDARD_REQUEST_HEADERS) to map all user-defined headers.

Class org.springframework.xmpp.XmppHeaders identifies the default headers that will be used by the DefaultXmppHeaderMapper:

  • xmpp_from
  • xmpp_subject
  • xmpp_thread
  • xmpp_to
  • xmpp_type

Starting with version 4.3, patterns in the header mappings can be negated by preceding the pattern with !. Negated patterns get priority, so a list such as STANDARD_REQUEST_HEADERS,foo,ba*,!bar,!baz,qux,!foo will NOT map foo (nor bar nor baz); the standard headers plus bad, qux will be mapped.

[Important]Important

If you have a user defined header that begins with ! that you do wish to map, you need to escape it with \ thus: STANDARD_REQUEST_HEADERS,\!myBangHeader and it WILL be mapped.

=== XMPP Extensions

The XMPP protocol stands for eXstensible Messaging and Presence Protocol. The "extensible" part is important. XMPP is based around XML, a data format that supports a concept known as namespacing.

Through namespacing, you can add bits to XMPP that are not defined in the original specifications. This is important because the XMPP specification deliberately describes only a set of core things like:

  • How a client connects to a server
  • Encryption (SSL/TLS)
  • Authentication
  • How servers can communicate with each other to relay messages
  • and a few other basic building blocks.

Once you have implemented this, you have an XMPP client and can send any kind of data you like. But that’s not the end.

For example, perhaps you decide that you want to include formatting in a message (bold, italic, etc.) which is not defined in the core XMPP specification. Well, you can make up a way to do that, but unless everyone else does it the same way as you, no other software will be able interpret it (they will just ignore namespaces they don’t understand).

So the XMPP Standards Foundation (XSF) publishes a series of extra documents, known as XMPP Enhancement Proposals (XEPs). In general each XEP describes a particular activity (from message formatting, to file transfers, multi-user chats and many more), and they provide a standard format for everyone to use for that activity.

The Smack API provides many XEP implementations with its extensions and experimental projects. And starting with Spring Integration version 4.3 any XEP can be use with the existing XMPP channel adapters.

To be able to process XEPs or any other custom XMPP extensions, the Smack’s ProviderManager pre-configuration must be provided. It can be done via direct usage from the static Java code:

ProviderManager.addIQProvider("element", "namespace", new MyIQProvider());
ProviderManager.addExtensionProvider("element", "namespace", new MyExtProvider());

or via .providers configuration file in the specific instance and JVM argument:

-Dsmack.provider.file=file:///c:/my/provider/mycustom.providers

where mycustom.providers might be like this:

<?xml version="1.0"?>
<smackProviders>
<iqProvider>
    <elementName>query</elementName>
    <namespace>jabber:iq:time</namespace>
    <className>org.jivesoftware.smack.packet.Time</className>
</iqProvider>

<iqProvider>
    <elementName>query</elementName>
    <namespace>https://jabber.org/protocol/disco#items</namespace>
    <className>org.jivesoftware.smackx.provider.DiscoverItemsProvider</className>
</iqProvider>

<extensionProvider>
    <elementName>subscription</elementName>
    <namespace>https://jabber.org/protocol/pubsub</namespace>
    <className>org.jivesoftware.smackx.pubsub.provider.SubscriptionProvider</className>
</extensionProvider>
</smackProviders>

For example the most popular XMPP messaging extension is Google Cloud Messaging (GCM). The Smack provides the particular org.jivesoftware.smackx.gcm.provider.GcmExtensionProvider for that and registers that by default with the smack-experimental jar in the classpath using experimental.providers resource:

<!-- GCM JSON payload -->
<extensionProvider>
    <elementName>gcm</elementName>
    <namespace>google:mobile:data</namespace>
    <className>org.jivesoftware.smackx.gcm.provider.GcmExtensionProvider</className>
</extensionProvider>

Also the GcmPacketExtension is present for the target messaging protocol to parse incoming packets and build outgoing:

GcmPacketExtension gcmExtension = (GcmPacketExtension) xmppMessage.getExtension(GcmPacketExtension.NAMESPACE);
String message = gcmExtension.getJson());
GcmPacketExtension packetExtension = new GcmPacketExtension(gcmJson);
Message smackMessage = new Message();
smackMessage.addExtension(packetExtension);

See Section 31.3.2, “TCP Failover Client Connection Factory” and Section 31.3.2, “TCP Failover Client Connection Factory” above for more information.

== Zookeeper Support

=== Introduction

Zookeeper support was added to the framework in version 4.2, comprised of:

  • MetadataStore
  • LockRegistry
  • Leadership Event Handling

=== Zookeeper Metadata Store

The ZookeeperMetadataStore can be used where any MetadataStore is needed, such as peristent file list filters, etc. See Section 9.5, “Metadata Store” for more information.

<bean id="client" class="org.springframework.integration.zookeeper.config.CuratorFrameworkFactoryBean">
    <constructor-arg value="${connect.string}" />
</bean>

<bean id="meta" class="org.springframework.integration.zookeeper.metadata.ZookeeperMetadataStore">
    <constructor-arg ref="client" />
</bean>
@Bean
public MetadataStore zkStore(CuratorFramework client) {
    return new ZookeeperMetadataStore(client);
}

=== Zookeeper Lock Registry

The ZookeeperLockRegistry can be used where any LockRegistry is needed, such as when using an Aggregator in a clustered environment, with a shared MessageStore.

A LocRegistry is used to "look up" a lock based on a key (the aggregator uses the correlationId). By default, locks in the ZookeeperLockRegistry are maintained in zookeeper under the path /SpringIntegration-LockRegistry/. You can customize the path by providing an implementation of ZookeeperLockRegistry.KeyToPathStrategy.

public interface KeyToPathStrategy {

    String pathFor(String key);

    boolean bounded();

}

If the strategy returns true from isBounded, unused locks do not need to be harvested. For unbounded strategies (such as the default) you will need to invoke expireUnusedOlderThan(long age) from time to time, to remove old unused locks from memory.

=== Zookeeper Leadership Event Handling

To configure an application for leader election using Zookeeper in XML:

<int-zk:leader-listener client="client" path="/siNamespace" role="cluster" />

client is a reference to a CuratorFramework bean; a CuratorFrameworkFactoryBean is available. When a leader is elected, an OnGrantedEvent will be published for the role cluster; any endpoints in that role will be started. When leadership is revoked, an OnRevokedEvent will be published for the role cluster; any endpoints in that role will be stopped. See Section 8.2, “Endpoint Roles” for more information.

In Java configuration you can create an instance of the leader initiator like this:

@Bean
public LeaderInitiatorFactoryBean leaderInitiator(CuratorFramework client) {
    return new LeaderInitiatorFactoryBean()
                .setClient(client)
                .setPath("/siTest/")
                .setRole("cluster");
}

= Appendices

Advanced Topics and Additional Resources

== Spring Expression Language (SpEL)

=== Introduction

Many Spring Integration components can be configured using expressions. These expressions are written in the Spring Expression Language.

In most cases, the #root object is the Message which, of course, has two properties - headers and payload - allowing such expressions as payload, payload.foo, headers['my.header'] etc.

In some cases, additional variables are provided, for example the <int-http:inbound-gateway/> provides #requestParams (parameters from the HTTP request) and #pathVariables (values from path placeholders in the URI).

For all SpEL expressions, a BeanResolver is available, enabling references to any bean in the application context. For example @myBean.foo(payload). In addition, two PropertyAccessors are available; a MapAccessor enables accessing values in a Map using a key, and a ReflectivePropertyAccessor which allows access to fields and or JavaBean compliant properties (using getters and setters). This is how the Message headers and payload properties are accessible.

=== SpEL Evaluation Context Customization

Starting with Spring Integration 3.0, it is possible to add additional PropertyAccessor s to the SpEL evaluation contexts used by the framework. The framework provides the JsonPropertyAccessor which can be used (read-only) to access fields from a JsonNode, or JSON in a String. Or you can create your own PropertyAccessor if you have specific needs.

In addition, custom functions can be added. Custom functions are static methods declared on a class. Functions and property accessors are available in any SpEL expression used throughout the framework.

The following configuration shows how to directly configure the IntegrationEvaluationContextFactoryBean with custom property accessors and functions. However, for convenience, namespace support is provided for both, as described in the following sections, and the framework will automatically configure the factory bean on your behalf.

<bean id="integrationEvaluationContext"
			class="org.springframework.integration.config.IntegrationEvaluationContextFactoryBean">
	<property name="propertyAccessors">
		<util:map>
			<entry key="foo">
				<bean class="foo.MyCustomPropertyAccessor"/>
			</entry>
		</util:map>
	</property>
	<property name="functions">
		<map>
			<entry key="barcalc" value="#{T(foo.MyFunctions).getMethod('calc', T(foo.MyBar))}"/>
		</map>
	</property>
</bean>

This factory bean definition will override the default integrationEvaluationContext bean definition, adding the custom accessor to the list (which also includes the standard accessors mentioned above), and one custom function.

Note that custom functions are static methods. In the above example, the custom function is a static method calc on class MyFunctions and takes a single parameter of type MyBar.

Say you have a Message with a payload that has a type MyFoo on which you need to perform some action to create a MyBar object from it, and you then want to invoke a custom function calc on that object.

The standard property accessors wouldn’t know how to get a MyBar from a MyFoo so you could write and configure a custom property accessor to do so. So, your final expression might be "#barcalc(payload.myBar)".

The factory bean has another property typeLocator which allows you to customize the TypeLocator used during SpEL evaluation. This might be necessary when running in some environments that use a non-standard ClassLoader. In the following example, SpEL expressions will always use the bean factory’s class loader:

<bean id="integrationEvaluationContext"
		class="org.springframework.integration.config.IntegrationEvaluationContextFactoryBean">
	<property name="typeLocator">
		<bean class="org.springframework.expression.spel.support.StandardTypeLocator">
			<constructor-arg value="#{beanFactory.beanClassLoader}"/>
		</bean>
	</property>
</bean>

=== SpEL Functions

Namespace support is provided for easy addition of SpEL custom functions. You can specify <spel-function/> components to provide custom SpEL functions to the EvaluationContext used throughout the framework. Instead of configuring the factory bean above, simply add one or more of these components and the framework will automatically add them to the default integrationEvaluationContext factory bean.

For example, assuming we have a useful static method to evaluate XPath:

<int:spel-function id="xpath"
	class="com.foo.test.XPathUtils" method="evaluate(java.lang.String, java.lang.Object)"/>

<int:transformer input-channel="in" output-channel="out"
		 expression="#xpath('//foo/@bar', payload)" />

With this sample:

  • The default IntegrationEvaluationContextFactoryBean bean with id integrationEvaluationContext is registered with the application context.
  • The <spel-function/> is parsed and added to the functions Map of integrationEvaluationContext as map entry with id as the key and the static Method as the value.
  • The integrationEvaluationContext factory bean creates a new StandardEvaluationContext instance, and it is configured with the default PropertyAccessor s, BeanResolver and the custom functions.
  • That EvaluationContext instance is injected into the ExpressionEvaluatingTransformer bean.

To provide a SpEL Function via Java Configuration you should declare a SpelFunctionFactoryBean bean for each function. The sample above can be configured as follows:

@Bean
public SpelFunctionFactoryBean xpath() {
    return new SpelFunctionFactoryBean(XPathUtils.class, "evaluate");
}
[Note]Note

SpEL functions declared in a parent context are also made available in any child context(s). Each context has its own instance of the integrationEvaluationContext factory bean because each needs a different BeanResolver, but the function declarations are inherited and can be overridden if needed by declaring a SpEL function with the same name.

Built-in SpEL Functions

Spring Integration provides some standard functions, which are registered with the application context automatically on start up:

#jsonPath - to evaluate a jsonPath on some provided object. This function invokes JsonPathUtils.evaluate(...). This static method delegates to the Jayway JsonPath library. The following shows some usage examples:

<transformer expression="#jsonPath(payload, '$.store.book[0].author')"/>

<filter expression="#jsonPath(payload,'$..book[2].isbn') matches '\d-\d{3}-\d{5}-\d'"/>

<splitter expression="#jsonPath(payload, '$.store.book')"/>

<router expression="#jsonPath(payload, headers.jsonPath)">
	<mapping channel="output1" value="reference"/>
	<mapping channel="output2" value="fiction"/>
</router>

#jsonPath also supports the third optional parameter - an array of com.jayway.jsonpath.Filter, which could be provided by a reference to a bean or bean method, for example.

[Note]Note

Using this function requires the Jayway JsonPath library (json-path.jar) to be on the classpath; otherwise the #jsonPath SpEL function won’t be registered.

For more information regarding JSON see JSON Transformers in Section 7.1, “Transformer”.

#xpath - to evaluate an xpath on some provided object. For more information regarding xml and xpath see Section 31.3.2, “TCP Failover Client Connection Factory”.

=== PropertyAccessors

Namespace support is provided for the easy addition of SpEL custom PropertyAccessor implementations. You can specify the <spel-property-accessors/> component to provide a list of custom PropertyAccessor s to the EvaluationContext used throughout the framework. Instead of configuring the factory bean above, simply add one or more of these components, and the framework will automatically add the accessors to the default integrationEvaluationContext factory bean:

<int:spel-property-accessors>
	<bean id="jsonPA" class="org.springframework.integration.json.JsonPropertyAccessor"/>
	<ref bean="fooPropertyAccessor"/>
</int:spel-property-accessors>

With this sample, two custom PropertyAccessor s will be injected to the EvaluationContext in the order that they are declared.

To provide PropertyAccessor s via Java Configuration you should declare SpelPropertyAccessorRegistrar bean with the spelPropertyAccessorRegistrar (the IntegrationContextUtils.SPEL_PROPERTY_ACCESSOR_REGISTRAR_BEAN_NAME constant) name. The sample above can be configured like:

@Bean
public SpelPropertyAccessorRegistrar spelPropertyAccessorRegistrar() {
    return new SpelPropertyAccessorRegistrar(new JsonPropertyAccessor())
                    .add(fooPropertyAccessor());
}
[Note]Note

Custom PropertyAccessor s declared in a parent context are also made available in any child context(s). They are placed at the end of result list (but before the default org.springframework.context.expression.MapAccessor and o.s.expression.spel.support.ReflectivePropertyAccessor). If a PropertyAccessor with the same bean id is declared in a child context(s), it will override the parent accessor. Beans declared within a <spel-property-accessors/> must have an id attribute. The final order of usage is: the accessors in the current context, in the order in which they are declared, followed by any from parent contexts, in order, followed by the MapAccessor and finally the ReflectivePropertyAccessor.

== Message Publishing

The AOP Message Publishing feature allows you to construct and send a message as a by-product of a method invocation. For example, imagine you have a component and every time the state of this component changes you would like to be notified via a Message. The easiest way to send such notifications would be to send a message to a dedicated channel, but how would you connect the method invocation that changes the state of the object to a message sending process, and how should the notification Message be structured? The AOP Message Publishing feature handles these responsibilities with a configuration-driven approach.

=== Message Publishing Configuration

Spring Integration provides two approaches: XML and Annotation-driven.

==== Annotation-driven approach via @Publisher annotation

The annotation-driven approach allows you to annotate any method with the @Publisher annotation, specifying a channel attribute. The Message will be constructed from the return value of the method invocation and sent to a channel specified by the channel attribute. To further manage message structure, you can also use a combination of both @Payload and @Header annotations.

Internally this message publishing feature of Spring Integration uses both Spring AOP by defining PublisherAnnotationAdvisor and Spring 3.0’s Expression Language (SpEL) support, giving you considerable flexibility and control over the structure of the Message it will publish.

The PublisherAnnotationAdvisor defines and binds the following variables:

  • #return - will bind to a return value allowing you to reference it or its attributes (e.g., #return.foo where foo is an attribute of the object bound to #return)
  • #exception - will bind to an exception if one is thrown by the method invocation.
  • #args - will bind to method arguments, so individual arguments could be extracted by name (e.g., #args.fname as in the above method)

Let’s look at a couple of examples:

@Publisher
public String defaultPayload(String fname, String lname) {
  return fname + " " + lname;
}

In the above example the Message will be constructed with the following structure:

  • Message payload - will be the return type and value of the method. This is the default.
  • A newly constructed message will be sent to a default publisher channel configured with an annotation post processor (see the end of this section).
@Publisher(channel="testChannel")
public String defaultPayload(String fname, @Header("last") String lname) {
  return fname + " " + lname;
}

In this example everything is the same as above, except that we are not using a default publishing channel. Instead we are specifying the publishing channel via the channel attribute of the @Publisher annotation. We are also adding a @Header annotation which results in the Message header named last having the same value as the lname method parameter. That header will be added to the newly constructed Message.

@Publisher(channel="testChannel")
@Payload
public String defaultPayloadButExplicitAnnotation(String fname, @Header String lname) {
  return fname + " " + lname;
}

The above example is almost identical to the previous one. The only difference here is that we are using a @Payload annotation on the method, thus explicitly specifying that the return value of the method should be used as the payload of the Message.

@Publisher(channel="testChannel")
@Payload("#return + #args.lname")
public String setName(String fname, String lname, @Header("x") int num) {
  return fname + " " + lname;
}

Here we are expanding on the previous configuration by using the Spring Expression Language in the @Payload annotation to further instruct the framework how the message should be constructed. In this particular case the message will be a concatenation of the return value of the method invocation and the lname input argument. The Message header named x will have its value determined by the num input argument. That header will be added to the newly constructed Message.

@Publisher(channel="testChannel")
public String argumentAsPayload(@Payload String fname, @Header String lname) {
  return fname + " " + lname;
}

In the above example you see another usage of the @Payload annotation. Here we are annotating a method argument which will become the payload of the newly constructed message.

As with most other annotation-driven features in Spring, you will need to register a post-processor (PublisherAnnotationBeanPostProcessor).

<bean class="org.springframework.integration.aop.PublisherAnnotationBeanPostProcessor"/>

You can instead use namespace support for a more concise configuration:

<int:annotation-config default-publisher-channel="defaultChannel"/>

Similar to other Spring annotations (@Component, @Scheduled, etc.), @Publisher can also be used as a meta-annotation. That means you can define your own annotations that will be treated in the same way as the @Publisher itself.

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Publisher(channel="auditChannel")
public @interface Audit {
}

Here we defined the @Audit annotation which itself is annotated with @Publisher. Also note that you can define a channel attribute on the meta-annotation thus encapsulating the behavior of where messages will be sent inside of this annotation. Now you can annotate any method:

@Audit
public String test() {
    return "foo";
}

In the above example every invocation of the test() method will result in a Message with a payload created from its return value. Each Message will be sent to the channel named auditChannel. One of the benefits of this technique is that you can avoid the duplication of the same channel name across multiple annotations. You also can provide a level of indirection between your own, potentially domain-specific annotations and those provided by the framework.

You can also annotate the class which would mean that the properties of this annotation will be applied on every public method of that class.

@Audit
static class BankingOperationsImpl implements BankingOperations {

  public String debit(String amount) {
     . . .

  }

  public String credit(String amount) {
     . . .
  }

}

==== XML-based approach via the <publishing-interceptor> element

The XML-based approach allows you to configure the same AOP-based Message Publishing functionality with simple namespace-based configuration of a MessagePublishingInterceptor. It certainly has some benefits over the annotation-driven approach since it allows you to use AOP pointcut expressions, thus possibly intercepting multiple methods at once or intercepting and publishing methods to which you don’t have the source code.

To configure Message Publishing via XML, you only need to do the following two things:

  • Provide configuration for MessagePublishingInterceptor via the <publishing-interceptor> XML element.
  • Provide AOP configuration to apply the MessagePublishingInterceptor to managed objects.
<aop:config>
  <aop:advisor advice-ref="interceptor" pointcut="bean(testBean)" />
</aop:config>
<publishing-interceptor id="interceptor" default-channel="defaultChannel">
  <method pattern="echo" payload="'Echoing: ' + #return" channel="echoChannel">
    <header name="foo" value="bar"/>
  </method>
  <method pattern="repl*" payload="'Echoing: ' + #return" channel="echoChannel">
    <header name="foo" expression="'bar'.toUpperCase()"/>
  </method>
  <method pattern="echoDef*" payload="#return"/>
</publishing-interceptor>

As you can see the <publishing-interceptor> configuration looks rather similar to the Annotation-based approach, and it also utilizes the power of the Spring 3.0 Expression Language.

In the above example the execution of the echo method of a testBean will render a Message with the following structure:

  • The Message payload will be of type String with the content "Echoing: [value]" where value is the value returned by an executed method.
  • The Message will have a header with the name "foo" and value "bar".
  • The Message will be sent to echoChannel.

The second method is very similar to the first. Here every method that begins with repl will render a Message with the following structure:

  • The Message payload will be the same as in the above sample
  • The Message will have a header named "foo" whose value is the result of the SpEL expression 'bar'.toUpperCase() .
  • The Message will be sent to echoChannel.

The second method, mapping the execution of any method that begins with echoDef of testBean, will produce a Message with the following structure.

  • The Message payload will be the value returned by an executed method.
  • Since the channel attribute is not provided explicitly, the Message will be sent to the defaultChannel defined by the publisher.

For simple mapping rules you can rely on the publisher defaults. For example:

<publishing-interceptor id="anotherInterceptor"/>

This will map the return value of every method that matches the pointcut expression to a payload and will be sent to a default-channel. If the defaultChannel_is not specified (as above) the messages will be sent to the global _nullChannel.

Async Publishing

One important thing to understand is that publishing occurs in the same thread as your component’s execution. So by default in is synchronous. This means that the entire message flow would have to wait until the publisher’s flow completes.  However, quite often you want the complete opposite and that is to use this Message publishing feature to initiate asynchronous sub-flows. For example, you might host a service (HTTP, WS etc.) which receives a remote request.You may want to send this request internally into a process that might take a while. However you may also want to reply to the user right away. So, instead of sending inbound requests for processing via the output channel (the conventional way), you can simply use output-channel or a replyChannel header to send a simple acknowledgment-like reply back to the caller while using the Message publisher feature to initiate a complex flow.

EXAMPLE: Here is the simple service that receives a complex payload, which needs to be sent further for processing, but it also needs to reply to the caller with a simple acknowledgment.

public String echo(Object complexPayload) {
     return "ACK"; 
}

So instead of hooking up the complex flow to the output channel we use the Message publishing feature instead. We configure it to create a new Message using the input argument of the service method (above) and send that to the localProcessChannel. And to make sure this sub-flow is asynchronous all we need to do is send it to any type of asynchronous channel (ExecutorChannel in this example).

<int:service-activator  input-channel="inputChannel" output-channel="outputChannel" ref="sampleservice"/>

<bean id="sampleservice" class="test.SampleService"/>

<aop:config>
  <aop:advisor advice-ref="interceptor" pointcut="bean(sampleservice)" />
</aop:config>

<int:publishing-interceptor id="interceptor" >
  <int:method pattern="echo" payload="#args[0]" channel="localProcessChannel">
    <int:header name="sample_header" expression="'some sample value'"/>
  </int:method>
</int:publishing-interceptor>

<int:channel id="localProcessChannel">
  <int:dispatcher task-executor="executor"/>
</int:channel>

<task:executor id="executor" pool-size="5"/>

Another way of handling this type of scenario is with a wire-tap.

==== Producing and publishing messages based on a scheduled trigger

In the above sections we looked at the Message publishing feature of Spring Integration which constructs and publishes messages as by-products of Method invocations. However in those cases, you are still responsible for invoking the method. In Spring Integration 2.0 we’ve added another related useful feature: support for scheduled Message producers/publishers via the new "expression" attribute on the inbound-channel-adapter element. Scheduling could be based on several triggers, any one of which may be configured on the poller sub-element. Currently we support cron, fixed-rate, fixed-delay as well as any custom trigger implemented by you and referenced by the trigger attribute value.

As mentioned above, support for scheduled producers/publishers is provided via the <inbound-channel-adapter> xml element. Let’s look at couple of examples:

<int:inbound-channel-adapter id="fixedDelayProducer"
       expression="'fixedDelayTest'"
       channel="fixedDelayChannel">
    <int:poller fixed-delay="1000"/>
</int:inbound-channel-adapter>

In the above example an inbound Channel Adapter will be created which will construct a Message with its payload being the result of the expression  defined in the expression attribute. Such messages will be created and sent every time the delay specified by the fixed-delay attribute occurs.

<int:inbound-channel-adapter id="fixedRateProducer"
       expression="'fixedRateTest'"
       channel="fixedRateChannel">
    <int:poller fixed-rate="1000"/>
</int:inbound-channel-adapter>

This example is very similar to the previous one, except that we are using the fixed-rate attribute which will allow us to send messages at a fixed rate (measuring from the start time of each task).

<int:inbound-channel-adapter id="cronProducer"
       expression="'cronTest'"
       channel="cronChannel">
    <int:poller cron="7 6 5 4 3 ?"/>
</int:inbound-channel-adapter>

This example demonstrates how you can apply a Cron trigger with a value specified in the cron attribute.

<int:inbound-channel-adapter id="headerExpressionsProducer"
       expression="'headerExpressionsTest'"
       channel="headerExpressionsChannel"
       auto-startup="false">
    <int:poller fixed-delay="5000"/>
    <int:header name="foo" expression="6 * 7"/>
    <int:header name="bar" value="x"/>
</int:inbound-channel-adapter>

Here you can see that in a way very similar to the Message publishing feature we are enriching a newly constructed Message with extra Message headers which can take scalar values or the results of evaluating Spring expressions.

If you need to implement your own custom trigger you can use the trigger attribute to provide a reference to any spring configured bean which implements the org.springframework.scheduling.Trigger interface.

<int:inbound-channel-adapter id="triggerRefProducer"
       expression="'triggerRefTest'" channel="triggerRefChannel">
    <int:poller trigger="customTrigger"/>
</int:inbound-channel-adapter>

<beans:bean id="customTrigger" class="o.s.scheduling.support.PeriodicTrigger">
    <beans:constructor-arg value="9999"/>
</beans:bean>

== Transaction Support

=== Understanding Transactions in Message flows

Spring Integration exposes several hooks to address transactional needs of you message flows. But to better understand these hooks and how you can benefit from them we must first revisit the 6 mechanisms that could be used to initiate Message flows and see how transactional needs of these flows could be addressed within each of these mechanisms.

Here are the 6 mechanisms to initiate a Message flow and their short summary (details for each are provided throughout this manual):

  • Gateway Proxy - Your basic Messaging Gateway
  • MessageChannel - Direct interactions with MessageChannel methods (e.g., channel.send(message))
  • Message Publisher - the way to initiate message flow as the by-product of method invocations on Spring beans
  • Inbound Channel Adapters/Gateways - the way to initiate message flow based on connecting third-party system with Spring Integration messaging system(e.g., [JmsMessage] → Jms Inbound Adapter[SI Message] → SI Channel)
  • Scheduler - the way to initiate message flow based on scheduling events distributed by a pre-configured Scheduler
  • Poller - similar to the Scheduler and is the way to initiate message flow based on scheduling or interval-based events distributed by a pre-configured Poller

These 6 could be split in 2 general categories:

  • Message flows initiated by a USER process - Example scenarios in this category would be invoking a Gateway method or explicitly sending a Message to a MessageChannel. In other words, these message flows depend on a third party process (e.g., some code that we wrote) to be initiated.
  • Message flows initiated by a DAEMON process - Example scenarios in this category would be a Poller polling a Message queue to initiate a new Message flow with the polled Message or a Scheduler scheduling the process by creating a new Message and initiating a message flow at a predefined time.

Clearly the Gateway Proxy, MessageChannel.send(..) and MessagePublisher all belong to the 1st category and Inbound Adapters/Gateways, Scheduler and Poller belong to the 2nd.

So, how do we address transactional needs in various scenarios within each category and is there a need for Spring Integration to provide something explicitly with regard to transactions for a particular scenario? Or, can Spring’s Transaction Support be leveraged instead?.

The first and most obvious goal is NOT to re-invent something that has already been invented unless you can provide a better solution. In our case Spring itself provides first class support for transaction management. So our goal here is not to provide something new but rather delegate/use Spring to benefit from the existing support for transactions. In other words as a framework we must expose hooks to the Transaction management functionality provided by Spring. But since Spring Integration configuration is based on Spring Configuration it is not always necessary to expose these hooks as they are already exposed via Spring natively. Remember every Spring Integration component is a Spring Bean after all.

With this goal in mind let’s look at the two scenarios. 

If you think about it, Message flows that are initiated by the USER process (Category 1) and obviously configured in a Spring Application Context, are subject to transactional configuration of such processes and therefore don’t need to be explicitly configured by Spring Integration to support transactions. The transaction could and should be initiated through standard Transaction support provided by Spring. The Spring Integration message flow will honor the transactional semantics of the components naturally because it is Spring configured. For example, a Gateway or ServiceActivator method could be annotated with @Transactional or TransactionInterceptor could be defined in an XML configuration with a point-cut expression pointing to specific methods that should be transactional. The bottom line is that you have full control over transaction configuration and boundaries in these scenarios.

However, things are a bit different when it comes to Message flows initiated by the DAEMON process (Category 2). Although configured by the developer these flows do not directly involve a human or some other process to be initiated. These are trigger-based flows that are initiated by a trigger process (DAEMON process) based on the configuration of such process. For example, we could have a Scheduler initiating a message flow every Friday night of every week. We can also configure a trigger that initiates a Message flow every second, etc. So, we obviously need a way to let these trigger-based processes know of our intention to make the resulting Message flows transactional so that a Transaction context could be created whenever a new Message flow is initiated. In other words we need to expose some Transaction configuration, but ONLY enough to delegate to Transaction support already provided by Spring (as we do in other scenarios).

Spring Integration provides transactional support for Pollers. Pollers are a special type of component because we can call receive() within that poller task against a resource that is itself transactional thus including receive() call in the the boundaries of the Transaction allowing it to be rolled back in case of a task failure. If we were to add the same support for channels, the added transactions would affect all downstream components starting with that send() call. That is providing a rather wide scope for transaction demarcation without any strong reason especially when Spring already provides several ways to address the transactional needs of any component downstream. However the receive() method being included in a transaction boundary is the "strong reason" for pollers.

==== Poller Transaction Support

Any time you configure a Poller you can provide transactional configuration via the transactional sub-element and its attributes:

<int:poller max-messages-per-poll="1" fixed-rate="1000">
    <transactional transaction-manager="txManager" 
                   isolation="DEFAULT"
                   propagation="REQUIRED" 
                   read-only="true" 
                   timeout="1000"/>
</poller>

As you can see this configuration looks very similar to native Spring transaction configuration. You must still provide a reference to a Transaction manager and specify transaction attributes or rely on defaults (e.g., if the transaction-manager attribute is not specified, it will default to the bean with the name transactionManager). Internally the process would be wrapped in Spring’s native Transaction where TransactionInterceptor is responsible for handling transactions. For more information on how to configure a Transaction Manager, the types of Transaction Managers (e.g., JTA, Datasource etc.) and other details related to transaction configuration please refer to Spring’s Reference manual (Chapter 10 - Transaction Management).

With the above configuration all Message flows initiated by this poller will be transactional. For more information and details on a Poller’s transactional configuration please refer to section - 21.1.1. Polling and Transactions.

Along with transactions, several more cross cutting concerns might need to be addressed when running a Poller. To help with that, the Poller element accepts an <advice-chain> _ sub-element which allows you to define a custom chain of Advice instances to be applied on the Poller. (see section 4.4 for more details) In Spring Integration 2.0, the Poller went through the a refactoring effort and is now using a proxy mechanism to address transactional concerns as well as other cross cutting concerns. One of the significant changes evolving from this effort is that we made _<transactional> and <advice-chain> elements mutually exclusive. The rationale behind this is that if you need more than one advice, and one of them is Transaction advice, then you can simply include it in the <advice-chain> with the same convenience as before but with much more control since you now have an option to position any advice in the desired order.

<int:poller max-messages-per-poll="1" fixed-rate="10000">
  <advice-chain>
    <ref bean="txAdvice"/>
    <ref bean="someAotherAdviceBean" />
    <beans:bean class="foo.bar.SampleAdvice"/>
  </advice-chain>
</poller>

<tx:advice id="txAdvice" transaction-manager="txManager">
  <tx:attributes>
    <tx:method name="get*" read-only="true"/>
    <tx:method name="*"/>
  </tx:attributes>
</tx:advice>

As you can see from the example above, we have provided a very basic XML-based configuration of Spring Transaction advice  - "txAdvice" and included it within the <advice-chain> defined by the Poller. If you only need to address transactional concerns of the Poller, then you can still use the <transactional> element as a convenience.

=== Transaction Boundaries

Another important factor is the boundaries of Transactions within a Message flow. When a transaction is started, the transaction context is bound to the current thread. So regardless of how many endpoints and channels you have in your Message flow your transaction context will be preserved as long as you are ensuring that the flow continues on the same thread. As soon as you break it by introducing a Pollable Channel or Executor Channel or initiate a new thread manually in some service, the Transactional boundary will be broken as well. Essentially the Transaction will END right there, and if a successful handoff has transpired between the threads, the flow would be considered a success and a COMMIT signal would be sent even though the flow will continue and might still result in an Exception somewhere downstream. If such a flow were synchronous, that Exception could be thrown back to the initiator of the Message flow who is also the initiator of the transactional context and the transaction would result in a ROLLBACK. The middle ground is to use transactional channels at any point where a thread boundary is being broken. For example, you can use a Queue-backed Channel that delegates to a transactional MessageStore strategy, or you could use a JMS-backed channel.

=== Transaction Synchronization

In some environments, it is advantageous to synchronize operations with a transaction that encompasses the entire flow. For example, consider a <file:inbound-channel-adapter/> at the start of a flow, that performs a number of database updates. If the transaction commits, we might want to move the file to a success directory, while we might want to move it to a failures directory if the transaction rolls back.

Spring Integration 2.2 introduces the capability of synchronizing these operations with a transaction. In addition, you can configure a PseudoTransactionManager if you don’t have a real transaction, but still want to perform different actions on success, or failure. For more information, see Section 31.3.2, “TCP Failover Client Connection Factory”.

Key strategy interfaces for this feature are

public interface TransactionSynchronizationFactory {

    TransactionSynchronization create(Object key);
}

public interface TransactionSynchronizationProcessor {

    void processBeforeCommit(IntegrationResourceHolder holder);

    void processAfterCommit(IntegrationResourceHolder holder);

    void processAfterRollback(IntegrationResourceHolder holder);

}

The factory is responsible for creating a TransactionSynchronization object. You can implement your own, or use the one provided by the framework: DefaultTransactionSynchronizationFactory. This implementation returns a TransactionSynchronization that delegates to a default implementation of TransactionSynchronizationProcessor, the ExpressionEvaluatingTransactionSynchronizationProcessor. This processor supports three SpEL expressions, beforeCommitExpression, afterCommitExpression, and afterRollbackExpression.

These actions should be self-explanatory to those familiar with transactions. In each case, the #root variable is the original Message; in some cases, other SpEL variables are made available, depending on the MessageSource being polled by the poller. For example, the MongoDbMessageSource provides the #mongoTemplate variable which references the message source’s MongoTemplate; the RedisStoreMessageSource provides the #store variable which references the RedisStore created by the poll.

To enable the feature for a particular poller, you provide a reference to the TransactionSynchronizationFactory on the poller’s <transactional/> element using the synchronization-factory attribute.

To simplify configuration of these components, namespace support for the default factory has been provided. Configuration is best described using an example:

<int-file:inbound-channel-adapter id="inputDirPoller"
    channel="someChannel"
    directory="/foo/bar"
    filter="filter"
    comparator="testComparator">
    <int:poller fixed-rate="5000">
        <int:transactional transaction-manager="transactionManager" synchronization-factory="syncFactory" />
    </int:poller>
</int-file:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="payload.renameTo('/success/' + payload.name)" channel="committedChannel" />
    <int:after-rollback expression="payload.renameTo('/failed/' + payload.name)" channel="rolledBackChannel" />
</int:transaction-synchronization-factory>

The result of the SpEL evaluation is sent as the payload to either the committedChannel or rolledBackChannel (in this case, this would be Boolean.TRUE or Boolean.FALSE - the result of the java.io.File.renameTo() method call).

If you wish to send the entire payload for further Spring Integration processing, simply use the expression payload.

[Important]Important

It is important to understand that this is simply synchronizing the actions with a transaction, it does not make a resource that is not inherently transactional actually transactional. Instead, the transaction (be it JDBC or otherwise) is started before the poll, and committed/rolled back when the flow completes, followed by the synchronized action.

It is also important to understand that if you provide a custom TransactionSynchronizationFactory, it is responsible for creating a resource synchronization that will cause the bound resource to be unbound automatically, when the transaction completes. The default TransactionSynchronizationFactory does this by returning a subclass of ResourceHolderSynchronization, with the default shouldUnbindAtCompletion() returning true.

In addition to the after-commit and after-rollback expressions, before-commit is also supported. In that case, if the evaluation (or downstream processing) throws an exception, the transaction will be rolled back instead of being committed.

=== Pseudo Transactions

Referring to the above section, you may be thinking it would be useful to take these success or failure actions when a flow completes, even if there is no real transactional resources (such as JDBC) downstream of the poller. For example, consider a <file:inbound-channel-adapter/> followed by an <ftp:outbout-channel-adapter/>. Neither of these components is transactional but we might want to move the input file to different directories, based on the success or failure of the ftp transfer.

To provide this functionality, the framework provides a PseudoTransactionManager, enabling the above configuration even when there is no real transactional resource involved. If the flow completes normally, the beforeCommit and afterCommit synchronizations will be called, on failure the afterRollback will be called. Of course, because it is not a real transaction there will be no actual commit or rollback. The pseudo transaction is simply a vehicle used to enable the synchronization features.

To use a PseudoTransactionManager, simply define it as a <bean/>, in the same way you would configure a real transaction manager:

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager" />

== Security in Spring Integration

=== Introduction

Security is one of the important functions in any modern enterprise (or cloud) application, moreover it is critical for distributed systems, such as those built using Enterprise Integration Patterns. Messaging independence and loosely-coupling allow target systems to communicate with each other with any type of data in the message’s payload. We can either trust all those messages or secure our service against "infecting" messages.

Spring Integration together with Spring Security provide a simple and comprehensive way to secure message channels, as well as other part of the integration solution.

=== Securing channels

Spring Integration provides the interceptor ChannelSecurityInterceptor, which extends AbstractSecurityInterceptor and intercepts send and receive calls on the channel. Access decisions are then made with reference to a ChannelSecurityMetadataSource which provides the metadata describing the send and receive access policies for certain channels. The interceptor requires that a valid SecurityContext has been established by authenticating with Spring Security. See the Spring Security reference documentation for details.

Namespace support is provided to allow easy configuration of security constraints. This consists of the secured channels tag which allows definition of one or more channel name patterns in conjunction with a definition of the security configuration for send and receive. The pattern is a java.util.regexp.Pattern.

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:int="http://www.springframework.org/schema/integration"
   xmlns:int-security="http://www.springframework.org/schema/integration/security"
  xmlns:beans="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:security="http://www.springframework.org/schema/security"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
      https://www.springframework.org/schema/beans/spring-beans.xsd
      http://www.springframework.org/schema/security
      https://www.springframework.org/schema/security/spring-security.xsd
      http://www.springframework.org/schema/integration
      https://www.springframework.org/schema/integration/spring-integration.xsd
      http://www.springframework.org/schema/integration/security
      https://www.springframework.org/schema/integration/security/spring-integration-security.xsd">

<int-security:secured-channels>
    <int-security:access-policy pattern="admin.*" send-access="ROLE_ADMIN"/>
    <int-security:access-policy pattern="user.*" receive-access="ROLE_USER"/>
</int-security:secured-channels>

By default the secured-channels namespace element expects a bean named authenticationManager which implements AuthenticationManager and a bean named accessDecisionManager which implements AccessDecisionManager. Where this is not the case references to the appropriate beans can be configured as attributes of the secured-channels element as below.

<int-security:secured-channels access-decision-manager="customAccessDecisionManager"
                              authentication-manager="customAuthenticationManager">
    <int-security:access-policy pattern="admin.*" send-access="ROLE_ADMIN"/>
    <int-security:access-policy pattern="user.*" receive-access="ROLE_USER"/>
</int-security:secured-channels>

Starting with version 4.2, the @SecuredChannel annotation is available for Java & Annotation configuration in @Configuration classes.

With the @SecuredChannel annotation, the Java configuration variant of the XML configuration above is:

@Configuration
@EnableIntegration
public class ContextConfiguration {

    @Bean
    @SecuredChannel(interceptor = "channelSecurityInterceptor", sendAccess = "ROLE_ADMIN")
    public SubscribableChannel adminChannel() {
    	return new DirectChannel();
    }

    @Bean
    @SecuredChannel(interceptor = "channelSecurityInterceptor", receiveAccess = "ROLE_USER")
    public SubscribableChannel userChannel() {
    	return new DirectChannel();
    }

    @Bean
    public ChannelSecurityInterceptor channelSecurityInterceptor(AuthenticationManager authenticationManager,
    		AccessDecisionManager accessDecisionManager) {
    	ChannelSecurityInterceptor channelSecurityInterceptor = new ChannelSecurityInterceptor();
    	channelSecurityInterceptor.setAuthenticationManager(authenticationManager);
    	channelSecurityInterceptor.setAccessDecisionManager(accessDecisionManager);
    	return channelSecurityInterceptor;
    }

}

=== SecurityContext Propagation

To be sure that our interaction with the application is secure, according to its security system rules, we should supply some security context with an authentication (principal) object. The Spring Security project provides a flexible, canonical mechanism to authenticate our application clients over HTTP, WebSocket or SOAP protocols (as can be done for any other integration protocol with a simple Spring Security extension) and it provides a SecurityContext for further authorization checks on the application objects, such as message channels. By default, the SecurityContext is tied with the current Thread 's execution state using the (ThreadLocalSecurityContextHolderStrategy). It is accessed by an AOP interceptor on secured methods to check if that principal of the invocation has sufficent permissions to call that method, for example. This works well with the current thread, but often, processing logic can be performed on another thread or even on several threads, or on to some external system(s).

Standard thread-bound behavior is easy to configure if our application is built on the Spring Integration components and its message channels. In this case, the secured objects may be any service activator or transformer, secured with a MethodSecurityInterceptor in their <request-handler-advice-chain> (see Section 8.9, “Adding Behavior to Endpoints”) or even MessageChannel (see Section 31.3.2, “TCP Failover Client Connection Factory” above). When using DirectChannel communication, the SecurityContext is available automatically, because the downstream flow runs on the current thread. But in case of the QueueChannel, ExecutorChannel and PublishSubscribeChannel with an Executor, messages are transferred from one thread to another (or several) by the nature of those channels. In order to support such scenarios, we can either transfer an Authentication object within the message headers and extract and authenticate it on the other side before secured object access. Or, we can propagate the SecurityContext to the thread receiving the transferred message.

Starting with version 4.2 SecurityContext propagation has been introduced. It is implemented as a SecurityContextPropagationChannelInterceptor, which can simply be added to any MessageChannel or configured as a @GlobalChannelInterceptor. The logic of this interceptor is based on the SecurityContext extraction from the current thread from the preSend() method, and its populating to another thread from the postReceive() (beforeHandle()) method. Actually, this interceptor is an extension of the more generic ThreadStatePropagationChannelInterceptor, which wraps the message-to-send together with the state-to-propagate in an internal Message<?> extension - MessageWithThreadState<S>, - on one side and extracts the original message back and state-to-propagate on another. The ThreadStatePropagationChannelInterceptor can be extended for any context propagation use-case and SecurityContextPropagationChannelInterceptor is a good sample on the matter.

[Important]Important

Since the logic of the ThreadStatePropagationChannelInterceptor is based on message modification (it returns an internal MessageWithThreadState object to send), you should be careful when combining this interceptor with any other which is intended to modify messages too, e.g. through the MessageBuilder.withPayload(...)...build() - the state-to-propagate may be lost. In most cases to overcome the issue, it’s sufficient to order interceptors for the channel and ensure the ThreadStatePropagationChannelInterceptor is the last one in the stack.

Propagation and population of SecurityContext is just one half of the work. Since the message isn’t an owner of the threads in the message flow and we should be sure that we are secure against any incoming messages, we have to clean up the SecurityContext from ThreadLocal. The SecurityContextPropagationChannelInterceptor provides afterMessageHandled() interceptor’s method implementation to do the clean up operation to free the Thread in the end of invocation from that propagated principal. This means that, when the thread that processes the handed-off message, completes the processing of the message (successfully or otherwise), the context is cleared so that it can’t be inadvertently be used when processing another message.

== Spring Integration Samples

=== Introduction

As of Spring Integration 2.0, the samples are no longer included with the Spring Integration distribution. Instead we have switched to a much simpler collaborative model that should promote better community participation and, ideally, more contributions. Samples now have a dedicated Git repository and a dedicated JIRA Issue Tracking system. Sample development will also have its own lifecycle which is not dependent on the lifecycle of the framework releases, although the repository will still be tagged with each major release for compatibility reasons.

The great benefit to the community is that we can now add more samples and make them available to you right away without waiting for the next release. Having its own JIRA that is not tied to the the actual framework is also a great benefit. You now have a dedicated place to suggest samples as well as report issues with existing samples. Or, _ you may want to submit a sample to us_ as an attachment through the JIRA or, better, through the collaborative model that Git promotes. If we believe your sample adds value, we would be more then glad to add it to the samples repository, properly crediting you as the author.

=== Where to get Samples

The Spring Integration Samples project is hosted on GitHub. You can find the repository at:

https://github.com/SpringSource/spring-integration-samples

In order to check out or clone (Git parlance) the samples, please make sure you have a Git client installed on your system. There are several GUI-based products available for many platforms, e.g. EGit for the Eclipse IDE. A simple Google search will help you find them. Of course you can also just use the command line interface for <https://git-scm.com/,Git>.

[Note]Note

If you need more information on how to install and/or use Git, please visit: https://git-scm.com/.

In order to checkout (clone in Git terms) the Spring Integration samples repository using the Git command line tool, issue the following commands:

$ git clone https://github.com/SpringSource/spring-integration-samples.git

That is all you need to do in order to clone the entire samples repository into a directory named spring-integration-samples within the working directory where you issued that git command. Since the samples repository is a live repository, you might want to perform periodic pulls (updates) to get new samples, as well as updates to the existing samples. In order to do so issue the following git pull command:

$ git pull

=== Submitting Samples or Sample Requests

How can I contribute my own Samples?

Github is for social coding: if you want to submit your own code examples to the Spring Integration Samples project, we encourage contributions through pull requests from forks of this repository. If you want to contribute code this way, please reference, if possible, ahttps://jira.springframework.org/browse/INTSAMPLES[JIRA Ticket] that provides some details regarding the provided sample.

[Important]Sign the contributor license agreement

Very important: before we can accept your Spring Integration sample, we will need you to sign the SpringSource contributor license agreement (CLA). Signing the contributor’s agreement does not grant anyone commit rights to the main repository, but it does mean that we can accept your contributions, and you will get an author credit if we do. In order to read and sign the CLA, please go to:

https://support.springsource.com/spring_committer_signup

From the Project drop down, please select Spring Integration. The Project Lead is Gary Russell.

Code Contribution Process

For the actual code contribution process, please read the the Contributor Guidelines for Spring Integration, they apply for this project as well:

https://github.com/spring-projects/spring-integration/blob/master/CONTRIBUTING.md

This process ensures that every commit gets peer-reviewed. As a matter of fact, the core committers follow the exact same rules. We are gratefully looking forward to your Spring Integration Samples!

Sample Requests

As mentioned earlier, the Spring Integration Samples project has a dedicated JIRA Issue tracking system. To submit new sample requests, please visit our JIRA Issue Tracking system:

https://jira.springframework.org/browse/INTSAMPLES.

=== Samples Structure

Starting with Spring Integration 2.0, the structure of the samples changed as well. With plans for more samples we realized that some samples have different goals than others. While they all share the common goal of showing you how to apply and work with the Spring Integration framework, they also differ in areas where some samples are meant to concentrate on a technical use case while others focus on a business use case, and some samples are all about showcasing various techniques that could be applied to address certain scenarios (both technical and business). The new categorization of samples will allow us to better organize them based on the problem each sample addresses while giving you a simpler way of finding the right sample for your needs.

Currently there are 4 categories. Within the samples repository each category has its own directory which is named after the category name:

BASIC (samples/basic)

This is a good place to get started. The samples here are technically motivated and demonstrate the bare minimum with regard to configuration and code. These should help you to get started quickly by introducing you to the basic concepts, API and configuration of Spring Integration as well as Enterprise Integration Patterns (EIP). For example, if you are looking for an answer on how to implement and wire a Service Activator to a Message Channel or how to use a Messaging Gateway as a facade to your message exchange, or how to get started with using MAIL or TCP/UDP modules etc., this would be the right place to find a good sample. The bottom line is this is a good place to get started.

INTERMEDIATE (samples/intermediate)

This category targets developers who are already familiar with the Spring Integration framework (past getting started), but need some more guidance while resolving the more advanced technical problems one might deal with after switching to a Messaging architecture. For example, if you are looking for an answer on how to handle errors in various message exchange scenarios or how to properly configure the Aggregator for the situations where some messages might not ever arrive for aggregation, or any other issue that goes beyond a basic implementation and configuration of a particular component and addresses what else types of problems, this would be the right place to find these type of samples.

ADVANCED (samples/advanced)

This category targets developers who are very familiar with the Spring Integration framework but are looking to extend it to address a specific custom need by using Spring Integration’s public API. For example, if you are looking for samples showing you how to implement a custom Channel or Consumer (event-based or polling-based), or you are trying to figure out what is the most appropriate way to implement a custom Bean parser on top of the Spring Integration Bean parser hierarchy when implementing your own namespace and schema for a custom component, this would be the right place to look. Here you can also find samples that will help you with Adapter development. Spring Integration comes with an extensive library of adapters to allow you to connect remote systems with the Spring Integration messaging framework. However you might have a need to integrate with a system for which the core framework does not provide an adapter. So, you may decide to implement your own (and potentially contribute it). This category would include samples showing you how.

APPLICATIONS (samples/applications)

This category targets developers and architects who have a good understanding of Message-driven architecture and EIP, and an above average understanding of Spring and Spring Integration who are looking for samples that address a particular business problem. In other words the emphasis of samples in this category is business use cases and how they can be solved with a Message-Driven Architecture and Spring Integration in particular. For example, if you are interested to see how a Loan Broker or Travel Agent process could be implemented and automated via Spring Integration, this would be the right place to find these types of samples.

[Important]Important

Remember: Spring Integration is a community driven framework, therefore community participation is IMPORTANT. That includes Samples; so, if you can’t find what you are looking for, let us know!

=== Samples

Currently Spring Integration comes with quite a few samples and you can only expect more. To help you better navigate through them, each sample comes with its own readme.txt file which covers several details about the sample (e.g., what EIP patterns it addresses, what problem it is trying to solve, how to run sample etc.). However, certain samples require a more detailed and sometimes graphical explanation. In this section you’ll find details on samples that we believe require special attention.

==== Loan Broker

In this section, we will review the Loan Broker sample application that is included in the Spring Integration samples. This sample is inspired by one of the samples featured in Gregor Hohpe and Bobby Woolf’s book, Enterprise Integration Patterns.

The diagram below represents the entire process

Figure 31.1. Loan Broker Sample

loan broker eip

Now lets look at this process in more detail

At the core of an EIP architecture are the very simple yet powerful concepts of Pipes and Filters, and of course: Messages. Endpoints (Filters) are connected with one another via Channels (Pipes). The producing endpoint sends Message to the Channel, and the Message is retrieved by the Consuming endpoint. This architecture is meant to define various mechanisms that describe HOW information is exchanged between the endpoints, without any awareness of WHAT those endpoints are or what information they are exchanging. Thus, it provides for a very loosely coupled and flexible collaboration model while also decoupling Integration concerns from Business concerns. EIP extends this architecture by further defining:

  • The types of pipes (Point-to-Point Channel, Publish-Subscribe Channel, Channel Adapter, etc.)
  • The core filters and patterns around how filters collaborate with pipes (Message Router, Splitters and Aggregators, various Message Transformation patterns, etc.)

The details and variations of this use case are very nicely described in Chapter 9 of the EIP Book, but here is the brief summary; A Consumer while shopping for the best Loan Quote(s) subscribes to the services of a Loan Broker, which handles details such as:

  • Consumer pre-screening (e.g., obtain and review the consumer’s Credit history)
  • Determine the most appropriate Banks (e.g., based on consumer’s credit history/score)
  • Send a Loan quote request to each selected Bank
  • Collect responses from each Bank
  • Filter responses and determine the best quote(s), based on consumer’s requirements.
  • Pass the Loan quote(s) back to the consumer.

Obviously the real process of obtaining a loan quote is a bit more complex, but since our goal here is to demonstrate how Enterprise Integration Patterns are realized and implemented within SI, the use case has been simplified to concentrate only on the Integration aspects of the process. It is not an attempt to give you an advice in consumer finances.

As you can see, by hiring a Loan Broker, the consumer is isolated from the details of the Loan Broker’s operations, and each Loan Broker’s operations may defer from one another to maintain competitive advantage, so whatever we assemble/implement must be flexible so any changes could be introduced quickly and painlessly. Speaking of change, the Loan Broker sample does not actually talk to any imaginary Banks or Credit bureaus. Those services are stubbed out. Our goal here is to assemble, orchestrate and test the integration aspect of the process as a whole. Only then can we start thinking about wiring such process to the real services. At that time the assembled process and its configuration will not change regardless of the number of Banks a particular Loan Broker is dealing with, or the type of communication media (or protocols) used (JMS, WS, TCP, etc.) to communicate with these Banks.

DESIGN

As you analyze the 6 requirements above you’ll quickly see that they all fall into the category of Integration concerns. For example, in the consumer pre-screening step we need to gather additional information about the consumer and the consumer’s desires and enrich the loan request with additional meta information. We then have to filter such information to select the most appropriate list of Banks, and so on. Enrich, filter, select – these are all integration concerns for which EIP defines a solution in the form of patterns. SI provides an implementation of these patterns.

Figure 31.2. Messaging Gateway

gateway

The Messaging Gateway pattern provides a simple mechanism to access messaging systems, including our Loan Broker. In SI you define the Gateway as a Plain Old Java Interface (no need to provide an implementation), configure it via the XML <gateway> element or via annotation and use it as any other Spring bean. SI will take care of delegating and mapping method invocations to the Messaging infrastructure by generating a Message (payload is mapped to an input parameter of the method) and sending it to the designated channel.

<int:gateway id="loanBrokerGateway"
  default-request-channel="loanBrokerPreProcessingChannel"
  service-interface="org.springframework.integration.samples.loanbroker.LoanBrokerGateway">
  <int:method name="getBestLoanQuote">
    <int:header name="RESPONSE_TYPE" value="BEST"/>
  </int:method>
</int:gateway>

Our current Gateway provides two methods that could be invoked. One that will return the best single quote and another one that will return all quotes. Somehow downstream we need to know what type of reply the caller is looking for. The best way to achieve this in Messaging architecture is to enrich the content of the message with some meta-data describing your intentions. Content Enricher is one of the patterns that addresses this and although Spring Integration does provide a separate configuration element to enrich Message Headers with arbitrary data (we’ll see it later), as a convenience, since_Gateway_ element is responsible to construct the initial Message it provides embedded capability to enrich the newly created Message with arbitrary Message Headers. In our example we are adding header RESPONSE_TYPE with value BEST' whenever the getBestQuote() method is invoked. For other method we are not adding any header. Now we can check downstream for an existence of this header and based on its presence and its value we can determine what type of reply the caller is looking for.

Based on the use case we also know there are some pre-screening steps that needs to be performed such as getting and evaluating the consumer’s credit score, simply because some premiere Banks will only typically accept quote requests from consumers that meet a minimum credit score requirement. So it would be nice if the Message would be enriched with such information before it is forwarded to the Banks. It would also be nice if when several processes needs to be completed to provide such meta-information, those processes could be grouped in a single unit. In our use case we need to determine credit score and based on the credit score and some rule select a list of Message Channels (Bank Channels) we will sent quote request to.

Composed Message Processor

The Composed Message Processor pattern describes rules around building endpoints that maintain control over message flow which consists of multiple message processors. In Spring Integration Composed Message Processor pattern is implemented via <chain> element.

Figure 31.3. Chain

chain

As you can see from the above configuration we have a chain with inner header-enricher element which will further enrich the content of the Message with the header CREDIT_SCORE and value that will be determined by the call to a credit service (simple POJO spring bean identified by creditBureau name) and then it will delegate to the Message Router

Figure 31.4. Message Router

bank router

There are several implementations of the Message Routing pattern available in Spring Integration. Here we are using a router that will determine a list of channels based on evaluating an expression (Spring Expression Language) which will look at the credit score that was determined is the previous step and will select the list of channels from the Map bean with id banks whose values are premier or secondary based o the value of credit score. Once the list of Channels is selected, the Message will be routed to those Channels.

Now, one last thing the Loan Broker needs to to is to receive the loan quotes form the banks, aggregate them by consumer (we don’t want to show quotes from one consumer to another), assemble the response based on the consumer’s selection criteria (single best quote or all quotes) and reply back to the consumer.

Figure 31.5. Message Aggregator

quotes aggregator

An Aggregator pattern describes an endpoint which groups related Messages into a single Message. Criteria and rules can be provided to determine an aggregation and correlation strategy. SI provides several implementations of the Aggregator pattern as well as a convenient name-space based configuration.

<int:aggregator id="quotesAggregator"
      input-channel="quotesAggregationChannel"
      method="aggregateQuotes">
  <beans:bean class="org.springframework.integration.samples.loanbroker.LoanQuoteAggregator"/>
</int:aggregator>

Our Loan Broker defines a quotesAggregator bean via the <aggregator> element which provides a default aggregation and correlation strategy. The default correlation strategy correlates messages based on the correlationId header (see Correlation Identifier pattern). What’s interesting is that we never provided the value for this header. It was set earlier by the router automatically, when it generated a separate Message for each Bank channel.

Once the Messages are correlated they are released to the actual Aggregator implementation. Although default Aggregator is provided by SI, its strategy (gather the list of payloads from all Messages and construct a new Message with this List as payload) does not satisfy our requirement. The reason is that our consumer might require a single best quote or all quotes. To communicate the consumer’s intention, earlier in the process we set the RESPONSE_TYPE header. Now we have to evaluate this header and return either all the quotes (the default aggregation strategy would work) or the best quote (the default aggregation strategy will not work because we have to determine which loan quote is the best).

Obviously selecting the best quote could be based on complex criteria and would influence the complexity of the aggregator implementation and configuration, but for now we are making it simple. If consumer wants the best quote we will select a quote with the lowest interest rate. To accomplish that the LoanQuoteAggregator.java will sort all the quotes and return the first one. The LoanQuote.java implements Comparable which compares quotes based on the rate attribute. Once the response Message is created it is sent to the default-reply-channel of the Messaging Gateway (thus the consumer) which started the process. Our consumer got the Loan Quote!

Conclusion

As you can see a rather complex process was assembled based on POJO (read existing, legacy), light weight, embeddable messaging framework (Spring Integration) with a loosely coupled programming model intended to simplify integration of heterogeneous systems without requiring a heavy-weight ESB-like engine or proprietary development and deployment environment, because as a developer you should not be porting your Swing or console-based application to an ESB-like server or implementing proprietary interfaces just because you have an integration concern.

This and other samples in this section are built on top of Enterprise Integration Patterns and can be considered "building blocks" for YOUR solution; they are not intended to be complete solutions. Integration concerns exist in all types of application (whether server based or not). It should not require change in design, testing and deployment strategy if such applications need to be integrated.

==== The Cafe Sample

In this section, we will review a Cafe sample application that is included in the Spring Integration samples. This sample is inspired by another sample featured in Gregor Hohpe’s https://www.enterpriseintegrationpatterns.com/ramblings.html[Ramblings].

The domain is that of a Cafe, and the basic flow is depicted in the following diagram:

Figure 31.6. Cafe Sample

cafe eip

The Order object may contain multiple OrderItems. Once the order is placed, a Splitter will break the composite order message into a single message per drink. Each of these is then processed by a Router that determines whether the drink is hot or cold (checking the OrderItem object’s isIced property). The Barista prepares each drink, but hot and cold drink preparation are handled by two distinct methods: prepareHotDrink and prepareColdDrink. The prepared drinks are then sent to the Waiter where they are aggregated into a Delivery object.

Here is the XML configuration:

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:int="http://www.springframework.org/schema/integration"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:beans="http://www.springframework.org/schema/beans"
 xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
 xsi:schemaLocation="http://www.springframework.org/schema/beans
  https://www.springframework.org/schema/beans/spring-beans.xsd
  http://www.springframework.org/schema/integration
  https://www.springframework.org/schema/integration/spring-integration.xsd
  http://www.springframework.org/schema/integration/stream
  https://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

    <int:gateway id="cafe" service-interface="o.s.i.samples.cafe.Cafe"/>

    <int:channel  id="orders"/>
    <int:splitter input-channel="orders" ref="orderSplitter"
                  method="split" output-channel="drinks"/>

    <int:channel id="drinks"/>
    <int:router  input-channel="drinks"
                 ref="drinkRouter" method="resolveOrderItemChannel"/>

    <int:channel id="coldDrinks"><int:queue capacity="10"/></int:channel>
    <int:service-activator input-channel="coldDrinks" ref="barista"
                           method="prepareColdDrink" output-channel="preparedDrinks"/>

    <int:channel id="hotDrinks"><int:queue capacity="10"/></int:channel>
    <int:service-activator input-channel="hotDrinks" ref="barista"
                           method="prepareHotDrink" output-channel="preparedDrinks"/>

    <int:channel id="preparedDrinks"/>
    <int:aggregator input-channel="preparedDrinks" ref="waiter"
                    method="prepareDelivery" output-channel="deliveries"/>

    <int-stream:stdout-channel-adapter id="deliveries"/>

    <beans:bean id="orderSplitter"
                class="org.springframework.integration.samples.cafe.xml.OrderSplitter"/>

    <beans:bean id="drinkRouter"
                class="org.springframework.integration.samples.cafe.xml.DrinkRouter"/>

    <beans:bean id="barista" class="o.s.i.samples.cafe.xml.Barista"/>
    <beans:bean id="waiter"  class="o.s.i.samples.cafe.xml.Waiter"/>

    <int:poller id="poller" default="true" fixed-rate="1000"/>

</beans:beans>

As you can see, each Message Endpoint is connected to input and/or output channels. Each endpoint will manage its own Lifecycle (by default endpoints start automatically upon initialization - to prevent that add the "auto-startup" attribute with a value of "false"). Most importantly, notice that the objects are simple POJOs with strongly typed method arguments. For example, here is the Splitter:

public class OrderSplitter {
    public List<OrderItem> split(Order order) {
        return order.getItems();
    }
}

In the case of the Router, the return value does not have to be a MessageChannel instance (although it can be). As you see in this example, a String-value representing the channel name is returned instead.

public class DrinkRouter {
    public String resolveOrderItemChannel(OrderItem orderItem) {
        return (orderItem.isIced()) ? "coldDrinks" : "hotDrinks";
    }
}

Now turning back to the XML, you see that there are two <service-activator> elements. Each of these is delegating to the same Barista instance but different methods: prepareHotDrink or prepareColdDrink corresponding to the two channels where order items have been routed.

public class Barista {

    private long hotDrinkDelay = 5000;
    private long coldDrinkDelay = 1000;

    private AtomicInteger hotDrinkCounter = new AtomicInteger();
    private AtomicInteger coldDrinkCounter = new AtomicInteger();

    public void setHotDrinkDelay(long hotDrinkDelay) {
        this.hotDrinkDelay = hotDrinkDelay;
    }

    public void setColdDrinkDelay(long coldDrinkDelay) {
        this.coldDrinkDelay = coldDrinkDelay;
    }

    public Drink prepareHotDrink(OrderItem orderItem) {
        try {
            Thread.sleep(this.hotDrinkDelay);
            System.out.println(Thread.currentThread().getName()
                    + " prepared hot drink #" + hotDrinkCounter.incrementAndGet()
                    + " for order #" + orderItem.getOrder().getNumber()
                    + ": " + orderItem);
            return new Drink(orderItem.getOrder().getNumber(), orderItem.getDrinkType(),
                    orderItem.isIced(), orderItem.getShots());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public Drink prepareColdDrink(OrderItem orderItem) {
        try {
            Thread.sleep(this.coldDrinkDelay);
            System.out.println(Thread.currentThread().getName()
                    + " prepared cold drink #" + coldDrinkCounter.incrementAndGet()
                    + " for order #" + orderItem.getOrder().getNumber() + ": "
                    + orderItem);
            return new Drink(orderItem.getOrder().getNumber(), orderItem.getDrinkType(),
                    orderItem.isIced(), orderItem.getShots());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}

As you can see from the code excerpt above, the barista methods have different delays (the hot drinks take 5 times as long to prepare). This simulates work being completed at different rates. When the`CafeDemo` main method runs, it will loop 100 times sending a single hot drink and a single cold drink each time. It actually sends the messages by invoking the placeOrder method on the Cafe interface. Above, you will see that the <gateway> element is specified in the configuration file. This triggers the creation of a proxy that implements the given service-interface and connects it to a channel. The channel name is provided on the @Gateway annotation of the Cafe interface.

public interface Cafe {

    @Gateway(requestChannel="orders")
    void placeOrder(Order order);

}

Finally, have a look at the main() method of the CafeDemo itself.

public static void main(String[] args) {
    AbstractApplicationContext context = null;
    if (args.length > 0) {
        context = new FileSystemXmlApplicationContext(args);
    }
    else {
        context = new ClassPathXmlApplicationContext("cafeDemo.xml", CafeDemo.class);
    }
    Cafe cafe = context.getBean("cafe", Cafe.class);
    for (int i = 1; i <= 100; i++) {
        Order order = new Order(i);
        order.addItem(DrinkType.LATTE, 2, false);
        order.addItem(DrinkType.MOCHA, 3, true);
        cafe.placeOrder(order);
    }
}
[Tip]Tip

To run this sample as well as 8 others, refer to the README.txt within the "samples" directory of the main distribution as described at the beginning of this chapter.

When you run cafeDemo, you will see that the cold drinks are initially prepared more quickly than the hot drinks. Because there is an aggregator, the cold drinks are effectively limited by the rate of the hot drink preparation. This is to be expected based on their respective delays of 1000 and 5000 milliseconds. However, by configuring a poller with a concurrent task executor, you can dramatically change the results. For example, you could use a thread pool executor with 5 workers for the hot drink barista while keeping the cold drink barista as it is:

<int:service-activator input-channel="hotDrinks"
                     ref="barista"
                     method="prepareHotDrink"
                     output-channel="preparedDrinks"/>

  <int:service-activator input-channel="hotDrinks"
                     ref="barista"
                     method="prepareHotDrink"
                     output-channel="preparedDrinks">
      <int:poller task-executor="pool" fixed-rate="1000"/>
  </int:service-activator>

  <task:executor id="pool" pool-size="5"/>

Also, notice that the worker thread name is displayed with each invocation. You will see that the hot drinks are prepared by the task-executor threads. If you provide a much shorter poller interval (such as 100 milliseconds), then you will notice that occasionally it throttles the input by forcing the task-scheduler (the caller) to invoke the operation.

[Note]Note

In addition to experimenting with the poller’s concurrency settings, you can also add the transactional sub-element and then refer to any PlatformTransactionManager instance within the context.

==== The XML Messaging Sample

The xml messaging sample in basic/xml illustrates how to use some of the provided components which deal with xml payloads. The sample uses the idea of processing an order for books represented as xml.

NOTE:This sample shows that the namespace prefix can be whatever you want; while we usually use, int-xml for integration XML components, the sample uses si-xml.

First the order is split into a number of messages, each one representing a single order item using the XPath splitter component.

<si-xml:xpath-splitter id="orderItemSplitter" input-channel="ordersChannel"
              output-channel="stockCheckerChannel" create-documents="true">
      <si-xml:xpath-expression expression="/orderNs:order/orderNs:orderItem"
                                namespace-map="orderNamespaceMap" />
  </si-xml:xpath-splitter>

A service activator is then used to pass the message into a stock checker POJO. The order item document is enriched with information from the stock checker about order item stock level. This enriched order item message is then used to route the message. In the case where the order item is in stock the message is routed to the warehouse.

<si-xml:xpath-router id="instockRouter" input-channel="orderRoutingChannel" resolution-required="true">
    <si-xml:xpath-expression expression="/orderNs:orderItem/@in-stock" namespace-map="orderNamespaceMap" />
    <si-xml:mapping value="true" channel="warehouseDispatchChannel"/>
    <si-xml:mapping value="false" channel="outOfStockChannel"/>
</si-xml:xpath-router>

Where the order item is not in stock the message is transformed using xslt into a format suitable for sending to the supplier.

<si-xml:xslt-transformer input-channel="outOfStockChannel"
  output-channel="resupplyOrderChannel"
  xsl-resource="classpath:org/springframework/integration/samples/xml/bigBooksSupplierTransformer.xsl"/>

== Configuration

=== Introduction

Spring Integration offers a number of configuration options. Which option you choose depends upon your particular needs and at what level you prefer to work. As with the Spring framework in general, it is also possible to mix and match the various techniques according to the particular problem at hand. For example, you may choose the XSD-based namespace for the majority of configuration combined with a handful of objects that are configured with annotations. As much as possible, the two provide consistent naming. XML elements defined by the XSD schema will match the names of annotations, and the attributes of those XML elements will match the names of annotation properties. Direct usage of the API is of course always an option, but we expect that most users will choose one of the higher-level options, or a combination of the namespace-based and annotation-driven configuration.

=== Namespace Support

Spring Integration components can be configured with XML elements that map directly to the terminology and concepts of enterprise integration. In many cases, the element names match those of the Enterprise Integration Patterns.

To enable Spring Integration’s core namespace support within your Spring configuration files, add the following namespace reference and schema mapping in your top-level beans element:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           https://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration
           https://www.springframework.org/schema/integration/spring-integration.xsd">

You can choose any name after "xmlns:"; int is used here for clarity, but you might prefer a shorter abbreviation. Of course if you are using an XML-editor or IDE support, then the availability of auto-completion may convince you to keep the longer name for clarity. Alternatively, you can create configuration files that use the Spring Integration schema as the primary namespace:

<beans:beans xmlns="http://www.springframework.org/schema/integration"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:beans="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           https://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration
           https://www.springframework.org/schema/integration/spring-integration.xsd">

When using this alternative, no prefix is necessary for the Spring Integration elements. On the other hand, if you want to define a generic Spring "bean" within the same configuration file, then a prefix would be required for the bean element (<beans:bean .../>). Since it is generally a good idea to modularize the configuration files themselves based on responsibility and/or architectural layer, you may find it appropriate to use the latter approach in the integration-focused configuration files, since generic beans are seldom necessary within those same files. For purposes of this documentation, we will assume the "integration" namespace is primary.

Many other namespaces are provided within the Spring Integration distribution. In fact, each adapter type (JMS, File, etc.) that provides namespace support defines its elements within a separate schema. In order to use these elements, simply add the necessary namespaces with an "xmlns" entry and the corresponding "schemaLocation" mapping. For example, the following root element shows several of these namespace declarations:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-file="http://www.springframework.org/schema/integration/file"
  xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
  xmlns:int-mail="http://www.springframework.org/schema/integration/mail"
  xmlns:int-rmi="http://www.springframework.org/schema/integration/rmi"
  xmlns:int-ws="http://www.springframework.org/schema/integration/ws"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/file
    https://www.springframework.org/schema/integration/file/spring-integration-file.xsd
    http://www.springframework.org/schema/integration/jms
    https://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
    http://www.springframework.org/schema/integration/mail
    https://www.springframework.org/schema/integration/mail/spring-integration-mail.xsd
    http://www.springframework.org/schema/integration/rmi
    https://www.springframework.org/schema/integration/rmi/spring-integration-rmi.xsd
    http://www.springframework.org/schema/integration/ws
    https://www.springframework.org/schema/integration/ws/spring-integration-ws.xsd">
 ...
</beans>

The reference manual provides specific examples of the various elements in their corresponding chapters. Here, the main thing to recognize is the consistency of the naming for each namespace URI and schema location.

=== Configuring the Task Scheduler

In Spring Integration, the ApplicationContext plays the central role of a Message Bus, and there are only a couple configuration options to consider. First, you may want to control the central TaskScheduler instance. You can do so by providing a single bean with the name "taskScheduler". This is also defined as a constant:

IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME

By default Spring Integration relies on an instance of ThreadPoolTaskScheduler as described in the Task Execution and Scheduling section of the Spring Framework reference manual. That default TaskScheduler will startup automatically with a pool of 10 threads, but see Section 31.3.2, “TCP Failover Client Connection Factory”. If you provide your own TaskScheduler instance instead, you can set the autoStartup property to false, and/or you can provide your own pool size value.

When Polling Consumers provide an explicit task-executor reference in their configuration, the invocation of the handler methods will happen within that executor’s thread pool and not the main scheduler pool. However, when no task-executor is provided for an endpoint’s poller, it will be invoked by one of the main scheduler’s threads.

[Caution]Caution

Do not run long-running tasks on poller threads; use a task executor instead. If you have a lot of polling endpoints, you can cause thread starvation, unless you increase the pool size. Also, polling consumers have a default receiveTimeout of 1 second; since the poller thread blocks for this time, it is recommended that a task executor be used when many such endpoints exist, again to avoid starvation. Alternatively, reduce the receiveTimeout.

[Note]Note

An endpoint is a Polling Consumer if its input channel is one of the queue-based (i.e. pollable) channels. Event Driven Consumers are those having input channels that have dispatchers instead of queues (i.e. they are subscribable). Such endpoints have no poller configuration since their handlers will be invoked directly.

[Important]Important

When running in a JEE container, you may need to use Spring’s TimerManagerTaskScheduler as described here, instead of the default taskScheduler. To do that, simply define a bean with the appropriate JNDI name for your environment, for example:

<bean id="taskScheduler" class="o.s.scheduling.commonj.TimerManagerTaskScheduler">
    <property name="timerManagerName" value="tm/MyTimerManager" />
    <property name="resourceRef" value="true" />
</bean>

The next section will describe what happens if Exceptions occur within the asynchronous invocations.

=== Error Handling

As described in the overview at the very beginning of this manual, one of the main motivations behind a Message-oriented framework like Spring Integration is to promote loose-coupling between components. The Message Channel plays an important role in that producers and consumers do not have to know about each other. However, the advantages also have some drawbacks. Some things become more complicated in a very loosely coupled environment, and one example is error handling.

When sending a Message to a channel, the component that ultimately handles that Message may or may not be operating within the same thread as the sender. If using a simple default DirectChannel (with the <channel> element that has no <queue> sub-element and no task-executor attribute), the Message-handling will occur in the same thread as the Message-sending. In that case, if an Exception is thrown, it can be caught by the sender (or it may propagate past the sender if it is an uncaught RuntimeException). So far, everything is fine. This is the same behavior as an Exception-throwing operation in a normal call stack. However, when adding the asynchronous aspect, things become much more complicated. For instance, if the channel element does provide a queue sub-element, then the component that handles the Message will be operating in a different thread than the sender. The sender may have dropped the Message into the channel and moved on to other things. There is no way for the Exception to be thrown directly back to that sender using standard Exception throwing techniques. Instead, to handle errors for asynchronous processes requires an asynchronous error-handling mechanism as well.

Spring Integration supports error handling for its components by publishing errors to a Message Channel. Specifically, the Exception will become the payload of a Spring Integration Message. That Message will then be sent to a Message Channel that is resolved in a way that is similar to the replyChannel resolution. First, if the request Message being handled at the time the Exception occurred contains an errorChannel header (the header name is defined in the constant: MessageHeaders.ERROR_CHANNEL), the ErrorMessage will be sent to that channel. Otherwise, the error handler will send to a "global" channel whose bean name is "errorChannel" (this is also defined as a constant: IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME).

A default "errorChannel" bean is created behind the scenes by the Framework. However, you can just as easily define your own if you want to control the settings.

<int:channel id="errorChannel">
    <int:queue capacity="500"/>
</int:channel>
[Note]Note

The default "errorChannel" is a PublishSubscribeChannel.

The most important thing to understand here is that the messaging-based error handling will only apply to Exceptions that are thrown by a Spring Integration task that is executing within a TaskExecutor. This does not apply to Exceptions thrown by a handler that is operating within the same thread as the sender (e.g. through a DirectChannel as described above).

[Note]Note

When Exceptions occur in a scheduled poller task’s execution, those exceptions will be wrapped in ErrorMessages and sent to the errorChannel as well.

To enable global error handling, simply register a handler on that channel. For example, you can configure Spring Integration’s ErrorMessageExceptionTypeRouter as the handler of an endpoint that is subscribed to the errorChannel. That router can then spread the error messages across multiple channels based on Exception type.

=== Global Properties

Certain global framework properties can be overridden by providing a properties file on the classpath.

The default properties can be found in /META-INF/spring.integration.default.properties in the spring-integration-core jar. You can see them on GitHub here, but here are the current default values:

spring.integration.channels.autoCreate=true 1
spring.integration.channels.maxUnicastSubscribers=0x7fffffff 2
spring.integration.channels.maxBroadcastSubscribers=0x7fffffff 3
spring.integration.taskScheduler.poolSize=10 4
spring.integration.messagingTemplate.throwExceptionOnLateReply=false 5
spring.integration.messagingAnnotations.require.componentAnnotation=false 6
spring.integration.readOnly.headers= 7
spring.integration.endpoints.noAutoStartup= 8
spring.integration.postProcessDynamicBeans=false 9

1

When true, input-channel s will be automatically declared as DirectChannel s when not explicitly found in the application context.

2

This property provides the default number of subscribers allowed on, say, a DirectChannel. It can be used to avoid inadvertently subscribing multiple endpoints to the same channel. This can be overridden on individual channels with the max-subscribers attribute.

3

This property provides the default number of subscribers allowed on, say, a PublishSubscribeChannel. It can be used to avoid inadvertently subscribing more than the expected number of endpoints to the same channel. This can be overridden on individual channels with the max-subscribers attribute.

4

The number of threads available in the default taskScheduler bean; see Section 31.3.2, “TCP Failover Client Connection Factory”.

5

When true, messages that arrive at a gateway reply channel will throw an exception, when the gateway is not expecting a reply - because the sending thread has timed out, or already received a reply.

6

When true, Messaging Annotation Support (Section 31.3.2, “TCP Failover Client Connection Factory”) requires a declaration of the @MessageEndpoint (or any other @Component) annotation on the class level.

7

A comma-separated list of message header names which should not be populated into Message s during a header copying operation. The list is used by the DefaultMessageBuilderFactory bean and propagated to the IntegrationMessageHeaderAccessor instances (see the section called “MessageHeaderAccessor API”), used to build messages via MessageBuilder (see Section 5.1.4, “The MessageBuilder Helper Class”). By default only MessageHeaders.ID and MessageHeaders.TIMESTAMP are not copied during message building. Since version 4.3.2

8

A comma-separated list of AbstractEndpoint bean names patterns (xxx*, *xxx, *xxx* or xxx*yyy) which should not be started automatically during application startup. These endpoints can be started later manually by their bean name via Control Bus (see Section 9.6, “Control Bus”), by their role using the SmartLifecycleRoleController (see Section 8.2, “Endpoint Roles”) or via simple Lifecycle bean injection. The effect of this global property can be explicitly overridden by specifying auto-startup XML or autoStartup annotation attribute, or via call to the AbstractEndpoint.setAutoStartup() in bean definition. Since version 4.3.12

9

A boolean flag to indicate that BeanPostProcessor s should post-process beans registered at runtime, e.g. message channels created via IntegrationFlowContext can be supplied with global channel interceptors. Since version 4.3.15

These properties can be overridden by adding a file /META-INF/spring.integration.properties to the classpath. It is not necessary to provide all the properties, just those that you want to override.

[Note]Note

In versions prior to 4.3, these property names had a typographical error (...integraton...); they have now been corrected (...integration...).

=== Annotation Support

In addition to the XML namespace support for configuring Message Endpoints, it is also possible to use annotations. First, Spring Integration provides the class-level @MessageEndpoint as a stereotype annotation, meaning that it is itself annotated with Spring’s @Component annotation and is therefore recognized automatically as a bean definition when using Spring component-scanning.

Even more important are the various method-level annotations that indicate the annotated method is capable of handling a message. The following example demonstrates both:

@MessageEndpoint
public class FooService {

    @ServiceActivator
    public void processMessage(Message message) {
        ...
    }
}

Exactly what it means for the method to "handle" the Message depends on the particular annotation. Annotations available in Spring Integration include:

  • @Aggregator
  • @Filter
  • @Router
  • @ServiceActivator
  • @Splitter
  • @Transformer
  • @InboundChannelAdapter
  • @BridgeFrom
  • @BridgeTo
  • @MessagingGateway
  • @IntegrationComponentScan

The behavior of each is described in its own chapter or section within this reference.

[Note]Note

If you are using XML configuration in combination with annotations, the @MessageEndpoint annotation is not required. If you want to configure a POJO reference from the "ref" attribute of a <service-activator/> element, it is sufficient to provide the method-level annotations. In that case, the annotation prevents ambiguity even when no "method" attribute exists on the <service-activator/> element.

In most cases, the annotated handler method should not require the Message type as its parameter. Instead, the method parameter type can match the message’s payload type.

public class FooService {

    @ServiceActivator
    public void bar(Foo foo) {
        ...
    }

}

When the method parameter should be mapped from a value in the MessageHeaders, another option is to use the parameter-level @Header annotation. In general, methods annotated with the Spring Integration annotations can either accept the Message itself, the message payload, or a header value (with @Header) as the parameter. In fact, the method can accept a combination, such as:

public class FooService {

    @ServiceActivator
    public void bar(String payload, @Header("x") int valueX, @Header("y") int valueY) {
        ...
    }

}

There is also a @Headers annotation that provides all of the Message headers as a Map:

public class FooService {

    @ServiceActivator
    public void bar(String payload, @Headers Map<String, Object> headerMap) {
        ...
    }

}
[Note]Note

The value of the annotation can also be a SpEL expression (e.g., someHeader.toUpperCase()) which is useful when you wish to manipulate the header value before injecting it. It also provides an optional required property which specifies whether the attribute value must be available within the headers. The default value for required is true.

For several of these annotations, when a Message-handling method returns a non-null value, the endpoint will attempt to send a reply. This is consistent across both configuration options (namespace and annotations) in that such an endpoint’s output channel will be used if available, and the REPLY_CHANNEL message header value will be used as a fallback.

[Tip]Tip

The combination of output channels on endpoints and the reply channel message header enables a pipeline approach where multiple components have an output channel, and the final component simply allows the reply message to be forwarded to the reply channel as specified in the original request message. In other words, the final component depends on the information provided by the original sender and can dynamically support any number of clients as a result. This is an example of Return Address.

In addition to the examples shown here, these annotations also support inputChannel and outputChannel properties.

@Service
public class FooService {

    @ServiceActivator(inputChannel="input", outputChannel="output")
    public void bar(String payload, @Headers Map<String, Object> headerMap) {
        ...
    }

}

The processing of these annotations creates the same beans (AbstractEndpoint s and MessageHandler s (or MessageSource s for the inbound channel adapter - see below) as with similar xml components. The bean names are generated with this pattern: [componentName].[methodName].[decapitalizedAnnotationClassShortName] (e.g for the sample above - fooService.bar.serviceActivator) for the AbstractEndpoint and the same name with an additional .handler (.source) suffix for the MessageHandler (MessageSource) bean. The MessageHandler s (MessageSource s) are also eligible to be tracked by Section 9.3, “Message History”.

Starting with version 4.0, all Messaging Annotations provide SmartLifecycle options - autoStartup and phase to allow endpoint lifecycle control on application context initialization. They default to true and 0 respectively. To change the state of an endpoint (e.g` start()/stop()) obtain a reference to the endpoint bean using the `BeanFactory (or autowiring) and invoke the method(s), or send a command message to the Control Bus (Section 9.6, “Control Bus”). For these purposes you should use the beanName mentioned above.

@Poller

Before Spring Integration 4.0, the above Messaging Annotations required that the inputChannel was a reference to a SubscribableChannel. For PollableChannel s there was need to use a <int:bridge/>, to configure a <int:poller/> to make the composite endpoint - a PollingConsumer. Starting with version 4.0, the @Poller annotation has been introduced to allow the configuration of poller attributes directly on the above Messaging Annotations:

public class AnnotationService {

	@Transformer(inputChannel = "input", outputChannel = "output",
		poller = @Poller(maxMessagesPerPoll = "${poller.maxMessagesPerPoll}", fixedDelay = "${poller.fixedDelay}"))
	public String handle(String payload) {
		...
	}
}

This annotation provides only simple PollerMetadata options. The @Poller's attributes maxMessagesPerPoll, fixedDelay, fixedRate and cron can be configured with _property-placeholder_s. If it is necessary to provide more polling options (e.g. transaction, advice-chain, error-handler), the`PollerMetadata` should be configured as a generic bean with its bean name used for @Poller's value attribute. In this case, no other attributes are allowed (they would be specified on the PollerMetadata bean). Note, if inputChannel is PollableChannel and no @Poller is configured, the default PollerMetadata will be used, if it is present in the application context. To declare the default poller using @Configuration, use:

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
	PollerMetadata pollerMetadata = new PollerMetadata();
	pollerMetadata.setTrigger(new PeriodicTrigger(10));
	return pollerMetadata;
}

With this endpoint using the default poller:

public class AnnotationService {

	@Transformer(inputChannel = "aPollableChannel", outputChannel = "output")
	public String handle(String payload) {
		...
	}
}

To use a named poller, use:

@Bean
public PollerMetadata myPoller() {
	PollerMetadata pollerMetadata = new PollerMetadata();
	pollerMetadata.setTrigger(new PeriodicTrigger(1000));
	return pollerMetadata;
}

With this endpoint using the default poller:

public class AnnotationService {

	@Transformer(inputChannel = "aPollableChannel", outputChannel = "output"
						poller = @Poller("myPoller"))
	public String handle(String payload) {
		...
	}
}

@InboundChannelAdapter

Starting with version 4.0, the @InboundChannelAdapter method annotation is available. This produces a SourcePollingChannelAdapter integration component based on a MethodInvokingMessageSource for the annotated method. This annotation is an analogue of <int:inbound-channel-adapter> XML component and has the same restrictions: the method cannot have parameters, and the return type must not be void. It has two attributes: value - the required MessageChannel bean name and poller - an optional @Poller annotation, as described above. If there is need to provide some MessageHeaders, use a Message<?> return type and build the Message<?> within the method using a MessageBuilder to configure its MessageHeaders.

@InboundChannelAdapter("counterChannel")
public Integer count() {
	return this.counter.incrementAndGet();
}

@InboundChannelAdapter(value = "fooChannel", poller = @Poller(fixed-rate = "5000"))
public String foo() {
	return "foo";
}

Starting with version 4.3 the channel alias for the value annotation attribute has been introduced for better source code readability. Also the target MessageChannel bean is resolved in the SourcePollingChannelAdapter by the provided name (outputChannelName options) on the first receive() call, not during initialization phase. It allows the late binding logic, when the target MessageChannel bean from the consumer perspective is created and registered a bit later than the @InboundChannelAdapter parsing phase.

The first example requires that the default poller has been declared elsewhere in the application context.

@MessagingGateway

See Section 8.4.6, “@MessagingGateway Annotation”.

@IntegrationComponentScan

The standard Spring Framework @ComponentScan annotation doesn’t scan interfaces for stereotype @Component annotations. To overcome this limitation and allow the configuration of @MessagingGateway (see Section 8.4.6, “@MessagingGateway Annotation”), the @IntegrationComponentScan mechanism has been introduced. This annotation must be placed along with a @Configuration annotation, and customized for the scanning options, such as basePackages and basePackageClasses. In this case all discovered interfaces annotated with @MessagingGateway will be parsed and registered as a GatewayProxyFactoryBean s. All other class-based components are parsed by the standard @ComponentScan. In future, more scanning logic may be added to the @IntegrationComponentScan.

==== Messaging Meta-Annotations

Starting with version 4.0, all Messaging Annotations can be configured as meta-annotations and all user-defined Messaging Annotations can define the same attributes to override their default values. In addition, meta-annotations can be configured hierarchically:

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@ServiceActivator(inputChannel = "annInput", outputChannel = "annOutput")
public @interface MyServiceActivator {

	String[] adviceChain = { "annAdvice" };
}

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@MyServiceActivator
public @interface MyServiceActivator1 {

	String inputChannel();

	String outputChannel();
}
...

@MyServiceActivator1(inputChannel = "inputChannel", outputChannel = "outputChannel")
public Object service(Object payload) {
   ...
}

This allows users to set defaults for various attributes and enables isolation of framework Java dependencies to user annotations, avoiding their use in user classes. If the framework finds a method with a user annotation that has a framework meta-annotation, it is treated as if the method was annotated directly with the framework annotation.

==== Annotations on @Beans

Starting with version 4.0, Messaging Annotations can be configured on @Bean method definitions in @Configuration classes, to produce Message Endpoints based on the beans, not methods. It is useful when @Bean definitions are "out of the box" MessageHandler s (AggregatingMessageHandler, DefaultMessageSplitter etc.), Transformer s (JsonToObjectTransformer, ClaimCheckOutTransformer etc.), MessageSource s (FileReadingMessageSource, RedisStoreMessageSource etc.):

@Configuration
@EnableIntegration
public class MyFlowConfiguration {

	@Bean
	@InboundChannelAdapter(value = "inputChannel", poller = @Poller(fixedDelay = "1000"))
	public MessageSource<String> consoleSource() {
		return CharacterStreamReadingMessageSource.stdin();
	}

	@Bean
	@Transformer(inputChannel = "inputChannel", outputChannel = "httpChannel")
	public ObjectToMapTransformer toMapTransformer() {
		return new ObjectToMapTransformer();
	}

	@Bean
	@ServiceActivator(inputChannel = "httpChannel")
	public MessageHandler httpHandler() {
		HttpRequestExecutingMessageHandler handler = new HttpRequestExecutingMessageHandler("http://foo/service");
		handler.setExpectedResponseType(String.class);
		handler.setOutputChannelName("outputChannel");
		return handler;
	}

	@Bean
	@ServiceActivator(inputChannel = "outputChannel")
	public LoggingHandler loggingHandler() {
		return new LoggingHandler("info");
	}

}

The meta-annotation rules work on @Bean methods as well (@MyServiceActivator above can be applied to a @Bean definition).

[Note]Note

When using these annotations on consumer @Bean definitions, if the bean definition returns an appropriate MessageHandler (depending on the annotation type), attributes such as outputChannel, requiresReply etc, must be set on the MessageHandler @Bean definition itself. The only annotation attributes used are adviceChain, autoStartup, inputChannel, phase, poller, all other attributes are for the handler.

[Note]Note

The bean names are generated with this algorithm: * The MessageHandler (MessageSource) @Bean gets its own standard name from the method name or name attribute on the @Bean. This works like there is no Messaging Annotation on the @Bean method. * The AbstractEndpoint bean name is generated with the pattern: [configurationComponentName].[methodName].[decapitalizedAnnotationClassShortName]. For example the endpoint (SourcePollingChannelAdapter) for the consoleSource() definition above gets a bean name like: myFlowConfiguration.consoleSource.inboundChannelAdapter.

[Important]Important

When using these annotations on @Bean definitions, the inputChannel must reference a declared bean; channels are not automatically declared in this case.

[Note]Note

With Java & Annotation configuration we can use any @Conditional (e.g. @Profile) definition on the @Bean method level, meaning to skip the bean registration by some condition reason:

@Bean
@ServiceActivator(inputChannel = "skippedChannel")
@Profile("foo")
public MessageHandler skipped() {
	return System.out::println;
}

Together with the existing Spring Container logic, the Messaging Endpoint bean, based on the @ServiceActivator annotation, won’t be registered as well.

==== Creating a Bridge with Annotations

Starting with version 4.0, the Messaging Annotation and Java configuration provides @BridgeFrom and @BridgeTo @Bean method annotations to mark MessageChannel beans in @Configuration classes. This is just for completeness, providing a convenient mechanism to declare a`BridgeHandler` and its Message Endpoint configuration:

@Bean
public PollableChannel bridgeFromInput() {
	return new QueueChannel();
}

@Bean
@BridgeFrom(value = "bridgeFromInput", poller = @Poller(fixedDelay = "1000"))
public MessageChannel bridgeFromOutput() {
	return new DirectChannel();
}
@Bean
public QueueChannel bridgeToOutput() {
	return new QueueChannel();
}

@Bean
@BridgeTo("bridgeToOutput")
public MessageChannel bridgeToInput() {
	return new DirectChannel();
}

These annotations can be used as meta-annotations as well.

==== Advising Annotated Endpoints

See Section 8.9.7, “Advising Endpoints Using Annotations”.

=== Message Mapping rules and conventions

Spring Integration implements a flexible facility to map Messages to Methods and their arguments without providing extra configuration by relying on some default rules as well as defining certain conventions.

==== Simple Scenarios

Single un-annotated parameter (object or primitive) which is not a Map/Properties with non-void return type;

public String foo(Object o);

Details:

Input parameter is Message Payload. If parameter type is not compatible with Message Payload an attempt will be made to convert it using Conversion Service provided by Spring 3.0. The return value will be incorporated as a Payload of the returned Message

Single un-annotated parameter (object or primitive) which is not a Map/Properties with Message return type;

public Message  foo(Object o);

Details:

Input parameter is Message Payload. If parameter type is not compatible with Message Payload an attempt will be made to convert it using Conversion Service provided by Spring 3.0. The return value is a newly constructed Message that will be sent to the next destination.

_Single parameter which is a Message or its subclass with arbitrary object/primitive return type; _

public int foo(Message  msg);

Details:

Input parameter is Message itself. The return value will become a payload of the Message that will be sent to the next destination.

Single parameter which is a Message or its subclass with Message or its subclass as a return type;

public Message foo(Message msg);

Details:

Input parameter is Message itself. The return value is a newly constructed Message that will be sent to the next destination.

Single parameter which is of type Map or Properties with Message as a return type;

public Message foo(Map m);

Details:

This one is a bit interesting. Although at first it might seem like an easy mapping straight to Message Headers, the preference is always given to a Message Payload. This means that if Message Payload is of type Map, this input argument will represent Message Payload. However if Message Payload is not of type Map, then no conversion via Conversion Service will be attempted and the input argument will be mapped to Message Headers.

Two parameters where one of them is arbitrary non-Map/Properties type object/primitive and another is Map/Properties type object (regardless of the return)

public Message foo(Map h, <T> t);

Details:

This combination contains two input parameters where one of them is of type Map. Naturally the non-Map parameters (regardless of the order) will be mapped to a Message Payload and the Map/Properties (regardless of the order) will be mapped to  Message Headers giving you a nice POJO way of interacting with Message structure.

No parameters (regardless of the return)

public String foo();

Details:

This Message Handler method will be invoked based on the Message sent to the input channel this handler is hooked up to, however no Message data will be mapped, thus making Message act as event/trigger to invoke such handlerThe output will be mapped according to the rules above

No parameters, void return

public void foo();

Details:

Same as above, but no output 

Annotation based mappings

Annotation based mapping is the safest and least ambiguous approach to map Messages to Methods. There wil be many pointers to annotation based mapping throughout this manual, however here are couple of examples:

public String foo(@Payload String s, @Header("foo") String b) 

Very simple and explicit way of mapping Messages to method. As you’ll see later on, without an annotation this signature would result in an ambiguous condition. However by explicitly mapping the first argument to a Message Payload and the second argument to a value of the foo Message Header, we have avoided any ambiguity.

public String foo(@Payload String s, @RequestParam("foo") String b) 

Looks almost identical to the previous example, however @RequestMapping or any other non-Spring Integration mapping annotation is irrelevant and therefore will be ignored leaving the second parameter unmapped. Although the second parameter could easily be mapped to a Payload, there can only be one Payload. Therefore this method mapping is ambiguous.

public String foo(String s, @Header("foo") String b) 

The same as above. The only difference is that the first argument will be mapped to the Message Payload implicitly.

public String foo(@Headers Map m, @Header("foo") Map f, @Header("bar") String bar)

Yet another signature that would definitely be treated as ambiguous without annotations because it has more than 2 arguments. Furthermore, two of them are Maps. However, with annotation-based mapping, the ambiguity is easily avoided. In this example the first argument is mapped to all the Message Headers, while the second and third argument map to the values of Message Headers foo and bar. The payload is not being mapped to any argument.

==== Complex Scenarios

Multiple parameters:

Multiple parameters could create a lot of ambiguity with regards to determining the appropriate mappings. The general advice is to annotate your method parameters with @Payload and/or @Header/@Headers Below are some of the examples of ambiguous conditions which result in an Exception being raised.

public String foo(String s, int i)
  • the two parameters are equal in weight, therefore there is no way to determine which one is a payload.
public String foo(String s, Map m, String b)
  • almost the same as above. Although the Map could be easily mapped to Message Headers, there is no way to determine what to do with the two Strings.
public String foo(Map m, Map f)
  • although one might argue that one Map could be mapped to Message Payload and another one to Message Headers, it would be unreasonable to rely on the order (e.g., first is Payload, second Headers)
[Tip]Tip

Basically any method signature with more than one method argument which is not (Map, <T>), and those parameters are not annotated, will result in an ambiguous condition thus triggering an Exception.

Multiple methods:

Message Handlers with multiple methods are mapped based on the same rules that are described above, however some scenarios might still look confusing.

Multiple methods (same or different name) with legal (mappable) signatures:

public class Foo {
  public String foo(String str, Map m);

  public String foo(Map m);
}

As you can see, the Message could be mapped to either method. The first method would be invoked where Message Payload could be mapped to str  and Message Headers could be mapped to m. The second method could easily also be a candidate where only Message Headers are mapped to m. To make meters worse both methods have the same name which at first might look very ambiguous considering the following configuration:

<int:service-activator input-channel="input" output-channel="output" method="foo">
  <bean class="org.bar.Foo"/>
</int:service-activator>

At this point it would be important to understand Spring Integration mapping Conventions where at the very core, mappings are based on Payload first and everything else next. In other words the method whose argument could be mapped to a Payload will take precedence over all other methods.

On the other hand let’s look at slightly different example:

public class Foo {
  public String foo(String str, Map m);

  public String foo(String str);
}

If you look at it you can probably see a truly ambiguous condition. In this example since both methods have signatures that could be mapped to a Message Payload. They also have the same name. Such handler methods will trigger an Exception. However if the method names were different you could influence the mapping with a method attribute (see below):

public class Foo {
  public String foo(String str, Map m);

  public String bar(String str);
}
<int:service-activator input-channel="input" output-channel="output" method="bar">
  <bean class="org.bar.Foo"/>
</int:service-activator>

Now there is no ambiguity since the configuration explicitly maps to the bar method which has no name conflicts.

== Additional Resources

=== Spring Integration Home

The definitive source of information about Spring Integration is the Spring Integration Home at https://spring.io. That site serves as a hub of information and is the best place to find up-to-date announcements about the project as well as links to articles, blogs, and new sample applications.

== Change History

=== Changes between 4.1 and 4.2

Please be sure to also see the Migration Guide for important changes that might affect your applications. Migration guides for all versions back to 2.1 can be found on the Wiki.

=== New Components

==== Major Management/JMX Rework

A new MetricsFactory strategy interface has been introduced. This, together with other changes in the JMX and management infrastructure provides much more control over management configuration and runtime performance.

However, this has some important implications for (some) user environments.

For complete details, see Section 9.1, “Metrics and Management” and the section called “JMX Improvements”.

==== MongoDB Metadata Store

The MongoDbMetadataStore is now available. For more information, see Section 22.3.2, “MongoDB Metadata Store”.

==== SecuredChannel Annotation

The @SecuredChannel annotation has been introduced, replacing the deprecated ChannelSecurityInterceptorFactoryBean. For more information, see Section 31.3.2, “TCP Failover Client Connection Factory”.

==== SecurityContext Propagation

The SecurityContextPropagationChannelInterceptor has been introduced for the SecurityContext propagation from one message flow’s Thread to another. For more information, see Section 31.3.2, “TCP Failover Client Connection Factory”.

==== FileSplitter

The FileSplitter, which splits text files into lines, was added in 4.1.2. It now has full support in the int-file: namespace; see Section 14.5, “File Splitter” for more information.

==== Zookeeper Support

Zookeeper support has been added to the framework to assist when running on a clustered/multi-host environment.

  • ZookeeperMetadataStore
  • ZookeeperLockRegistry
  • Zookeeper Leadership

See Section 31.3.2, “TCP Failover Client Connection Factory” for more information.

==== Thread Barrier

A new thread <int:barrier/> component is available allowing a thread to be suspended until some asynchronous event occurs.

See Section 6.8, “Thread Barrier” for more information.

==== STOMP Support

STOMP support has been added to the framework as inbound and outbound channel adapters pair. See Chapter 28, STOMP Support for more information.

==== Codec A new Codec abstraction has been introduced, to encode/decode objects to/from byte[]. An implementation that uses Kryo is provided. Codec-based transformers and message converters are also provided.

See Section 7.4, “Codec” for more information.

==== Message PreparedStatement Setter

A new MessagePreparedStatementSetter functional interface callback is available for the JdbcMessageHandler (<int-jdbc:outbound-gateway> and <int-jdbc:outbound-channel-adapter>) as an alternative to the SqlParameterSourceFactory to populate parameters on the PreparedStatement with the requestMessage context.

See Section 18.2, “Outbound Channel Adapter” for more information.

=== General Changes

==== Wire Tap

As an alternative to the existing selector attribute, the <wire-tap/> now supports the selector-expression attribute.

==== File Changes

See Chapter 14, File Support for more information about these changes.

===== Appending New Lines

The <int-file:outbound-channel-adapter> and <int-file:outbound-gateway> now support an append-new-line attribute. If set to true, a new line is appended to the file after a message is written. The default attribute value is false.

===== Ignoring Hidden Files

The ignore-hidden attribute has been introduced for the <int-file:inbound-channel-adapter> to pick up or not the hidden files from the source directory. It is true by default.

===== Writing InputStream Payloads

The FileWritingMessageHandler now also accepts InputStream as a valid message payload type.

===== HeadDirectoryScanner

The HeadDirectoryScanner can now be used with other FileListFilter s.

===== Last Modified Filter

The LastModifiedFileListFilter has been added.

===== WatchService Directory Scanner

The WatchServiceDirectoryScanner is now available.

===== Persistent File List Filter Changes

The AbstractPersistentFileListFilter has a new property flushOnUpdate which, when set to true, will flush() the metadata store if it implements Flushable (e.g. the PropertiesPersistingMetadataStore).

==== Class Package Change

The ScatterGatherHandler class has been moved from the org.springframework.integration.handler to the org.springframework.integration.scattergather.

==== TCP Changes

===== TCP Serializers

The TCP Serializers no longer flush() the OutputStream; this is now done by the TcpNxxConnection classes. If you are using the serializers directly within user code, you may have to flush() the OutputStream.

===== Server Socket Exceptions

TcpConnectionServerExceptionEvent s are now published whenever an unexpected exception occurs on a TCP server socket (also added to 4.1.3, 4.0.7). See Section 31.3.2, “TCP Failover Client Connection Factory” for more information.

===== TCP Server Port

If a TCP server socket factory is configured to listen on a random port, the actual port chosen by the OS can now be obtained using getPort(). getServerSocketAddress() is also available.

See Section 31.3, “TCP Connection Factories” for more information.

===== TCP Gateway Remote Timeout

The TcpOutboundGateway now supports remote-timeout-expression as an alternative to the existing remote-timeout attribute. This allows setting the timeout based on each message.

Also, the remote-timeout no longer defaults to the same value as reply-timeout which has a completely different meaning.

See Table 31.6, “TCP Outbound Gateway Attributes” for more information.

===== TCP SSLSession Available for Header Mapping

TcpConnection s now support getSslSession() to enable users to extract information from the session to add to message headers.

See IP Message Headers for more information.

===== TCP Events

New events are now published whenever a correlation exception occurs - for example sending a message to a non-existent socket.

The TcpConnectionEventListeningMessageProducer is deprecated; use the generic event adapter instead.

See Section 31.3.2, “TCP Failover Client Connection Factory” for more information.

==== @InboundChannelAdapter

Previously, the @Poller on an inbound channel adapter defaulted the maxMessagesPerPoll attribute to -1 (infinity). This was inconsistent with the XML configuration of <inbound-channel-adapter/> s, which defaults to 1. The annotation now defaults this attribute to 1.

==== API Changes

o.s.integration.util.FunctionIterator now requires a o.s.integration.util.Function instead of a reactor.function.Function. This was done to remove an unnecessary hard dependency on Reactor. Any uses of this iterator will need to change the import.

Of course, Reactor is still supported for functionality such as the Promise gateway; the dependency was removed for those users who don’t need it.

==== JMS Changes

===== Reply Listener Lazy Initialization

It is now possible to configure the reply listener in JMS outbound gateways to be initialized on-demand and stopped after an idle period, instead of being controlled by the gateway’s lifecycle.

See Section 20.5, “Outbound Gateway” for more information.

===== Conversion Errors in Message-Driven Endpoints

The error-channel now is used for the conversion errors, which have caused a transaction rollback and message redelivery previously.

See Section 20.2, “Message-Driven Channel Adapter” and Section 20.4, “Inbound Gateway” for more information.

===== Default Acknowledge Mode

When using an implicitly defined DefaultMessageListenerContainer, the default acknowledge is now transacted. transacted is recommended when using this container, to avoid message loss. This default now applies to the message-driven inbound adapter and the inbound gateway, it was already the default for jms-backed channels.

See Section 20.2, “Message-Driven Channel Adapter” and Section 20.4, “Inbound Gateway” for more information.

===== Shared Subscriptions

Namespace support for shared subscriptions (JMS 2.0) has been added to message-driven endpoints and the <int-jms:publish-subscribe-channel>. Previously, you had to wire up listener containers as <bean/> s to use shared connections.

See Chapter 20, JMS Support for more information.

==== Conditional Pollers Much more flexibility is now provided for dynamic polling.

See Section 4.2.3, “Conditional Pollers for Message Sources” for more information.

==== AMQP Changes

===== Publisher Confirms

The <int-amqp:outbound-gateway> now supports confirm-correlation-expression and confirm-(n)ack-channel attributes with similar purpose as for <int-amqp:outbound-channel-adapter>.

===== Correlation Data

For both the outbound channel adapter and gateway, if the correlation data is a Message<?>, it will be the basis of the message on the ack/nack channel, with the additional header(s) added. Previously, any correlation data (including Message<?>) was returned as the payload of the ack/nack message.

===== The Inbound Gateway properties

The <int-amqp:inbound-gateway> now exposes the amqp-template attribute to allow more control over an external bean for the reply RabbitTemplate or even provide your own AmqpTemplate implementation. In addition the default-reply-to is exposed to be used if request message doesn’t have replyTo property.

See Chapter 11, AMQP Support for more information.

==== XPath Splitter Improvements

The XPathMessageSplitter (<int-xml:xpath-splitter>) now allows the configuration of output-properties for the internal javax.xml.transform.Transformer and supports an Iterator mode (defaults to true) for the xpath evaluation org.w3c.dom.NodeList result.

See Section 31.3.2, “TCP Failover Client Connection Factory” for more information.

==== HTTP Changes

===== CORS

The HTTP Inbound Endpoints (<int-http:inbound-channel-adapter> and <int-http:inbound-gateway>) now allow the configuration of Cross-Origin Resource Sharing (CORS).

See Section 17.4.4, “Cross-Origin Resource Sharing (CORS) Support” for more information.

===== Inbound Gateway Timeout

The HTTP inbound gateway can be configured as to what status code to return when a request times out. The default is now 500 Internal Server Error instead of 200 OK.

See Section 17.4.5, “Response StatusCode” for more information.

===== Form Data

Documentation is provided for when proxying multipart/form-data requests. See Chapter 17, HTTP Support for more information.

==== Gateway Changes

===== Gateway Methods can Return CompletableFuture<?>

When using Java 8, gateway methods can now return CompletableFuture<?>. See the section called “CompletableFuture” for more information.

===== MessagingGateway Annotation

The request and reply timeout properties are now String instead of Long to allow configuration with property placeholders or SpEL. See Section 8.4.6, “@MessagingGateway Annotation”.

==== Aggregator Changes

===== Aggregator Performance

This release includes some performance improvements for aggregating components (aggregator, resequencer, etc), by more efficiently removing messages from groups when they are released. New methods (removeMessagesFromGroup) have been added to the message store. Set the removeBatchSize property (default 100) to adjust the number of messages deleted in each operation. Currently, JDBC, Redis and MongoDB message stores support this property.

===== Output Message Group Processor

When using a ref or inner bean for the aggregator, it is now possible to bind a MessageGroupProcessor directly. In addition, a SimpleMessageGroupProcessor is provided that simply returns the collection of messages in the group. When an output processor produces a collection of Message<?>, the aggregator releases those messages individually. Configuring the SimpleMessageGroupProcessor makes the aggregator a message barrier, were messages are held up until they all arrive, and are then released individually. See Section 6.4, “Aggregator” for more information.

==== (S)FTP Changes

===== Inbound channel adapters

You can now specify a remote-directory-expression on the inbound channel adapters, to determine the directory at runtime. See Chapter 15, FTP/FTPS Adapters and Chapter 27, SFTP Adapters for more information.

===== Gateway Partial Results

When use FTP/SFTP outbound gateways to operate on multiple files (mget, mput), it is possible for an exception to occur after part of the request is completed. If such a condition occurs, a PartialSuccessException is thrown containing the partial results. See Section 15.7, “FTP Outbound Gateway” and Section 27.10, “SFTP Outbound Gateway” for more information.

===== Delegating Session Factory

A delegating session factory is now available, enabling the selection of a particular session factory based on some thread context value.

See Section 15.3, “Delegating Session Factory” and Section 27.4, “Delegating Session Factory” for more information.

===== Default Sftp Session Factory

Previously, the DefaultSftpSessionFactory unconditionally allowed connections to unknown hosts. This is now configurable (default false).

The factory now requires a configured knownHosts file unless the allowUnknownKeys property is true (default false).

See Section 27.2.1, “Configuration Properties” for more information.

===== Message Session Callback

The MessageSessionCallback<F, T> has been introduced to perform any custom Session operation(s) with the requestMessage context in the <int-(s)ftp:outbound-gateway/>.

See Section 15.10, “MessageSessionCallback” and Section 27.12, “MessageSessionCallback” for more information.

==== Websocket Changes

WebSocketHandlerDecoratorFactory support has been added to the ServerWebSocketContainer to allow chained customization for the internal WebSocketHandler. See Section 31.3.2, “TCP Failover Client Connection Factory” for more information.

==== Application Event Adapters changes

The ApplicationEvent adapters can now operate with payload as event directly allow omitting custom ApplicationEvent extensions. The publish-payload boolean attribute has been introduced on the <int-event:outbound-channel-adapter> for this purpose. See Chapter 12, Spring ApplicationEvent Support for more information.

=== Changes between 4.0 and 4.1

Please be sure to also see the Migration Guide for important changes that might affect your applications. Migration guides for all versions back to 2.1 can be found on the Wiki.

==== New Components

===== Promise<?> Gateway

A Reactor Promise return type is now supported for Messaging Gateway methods. See Section 8.4.9, “Asynchronous Gateway”.

===== WebSocket support

The WebSocket module is now available. It is fully based on the Spring WebSocket and Spring Messaging modules and provides an <inbound-channel-adapter> and an <outbound-channel-adapter>. See Section 31.3.2, “TCP Failover Client Connection Factory” for more information.

===== Scatter-Gather EIP pattern

The Scatter-Gather EIP pattern is now implemented. See Section 6.7, “Scatter-Gather” for more information.

===== Routing Slip Pattern

The Routing Slip EIP pattern implementation is now provided. See the section called “Routing Slip” for more information.

===== Idempotent Receiver Pattern

The Idempotent Receiver EIP implementation is now provided via the <idempotent-receiver> component in XML, or the IdempotentReceiverInterceptor and IdempotentReceiver annotation when using Java Configuration. See Section 8.9.10, “Idempotent Receiver Enterprise Integration Pattern” and their JavaDocs for more information.

===== BoonJsonObjectMapper

The Boon`JsonObjectMapper` is now provided for the JSON transformers. See Section 7.1, “Transformer” for more information.

===== Redis Queue Gateways

The <redis-queue-inbound-gateway> and <redis-queue-outbound-gateway> components are now provided. See Section 24.10, “Redis Queue Inbound Gateway” and Section 24.9, “Redis Queue Outbound Gateway”.

===== PollSkipAdvice

The PollSkipAdvice is now provided to be used within <advice-chain> of the <poller> to determine if the current poll should be suppressed (skipped) by some condition implemented with PollSkipStrategy. See Section 4.2, “Poller” for more information.

==== General Changes

===== AMQP Inbound Endpoints, Channel

Elements that utilize a message listener container (inbound endpoints, channel) now support the missing-queues-fatal attribute. See Chapter 11, AMQP Support for more information.

===== AMQP Outbound Endpoints

The AMQP outbound endpoints support a new property lazy-connect (default true). When true, the connection to the broker is not established until the first message arrives (assuming there are no inbound endpoints, which always attempt to establish the connection during startup). When set the false an attempt to establish the connection is made during application startup. See Chapter 11, AMQP Support for more information.

===== SimpleMessageStore

The SimpleMessageStore no longer makes a copy of the group when calling getMessageGroup(). See Caution with SimpleMessageStore for more information.

===== Web Service Outbound Gateway: encode-uri

The <ws:outbound-gateway/> now provides an encode-uri attribute to allow disabling the encoding of the URI object before sending the request.

===== Http Inbound Channel Adapter and StatusCode

The <http:inbound-channel-adapter> can now be configured with a status-code-expression to override the default 200 OK status. See Section 17.4, “HTTP Namespace Support” for more information.

===== MQTT Adapter Changes

The MQTT channel adapters can now be configured to connect to multiple servers, for example, to support High Availability (HA). See Chapter 23, MQTT Support for more information.

The MQTT message-driven channel adapter now supports specifying the QoS setting for each subscription. See Section 23.2, “Inbound (message-driven) Channel Adapter” for more information.

The MQTT outbound channel adapter now supports asynchronous sends, avoiding blocking until delivery is confirmed. See Section 23.3, “Outbound Channel Adapter” for more information.

It is now possible to programmatically subscribe to and unsubscribe from topics at runtime. See Section 23.2, “Inbound (message-driven) Channel Adapter” for more information.

===== FTP/SFTP Adapter Changes

The FTP and SFTP outbound channel adapters now support appending to remote files, as well as taking specific actions when a remote file already exists. The remote file templates now also support this as well as rmdir() and exists(). In addition, the remote file templates provide access to the underlying client object enabling access to low-level APIs.

See Chapter 15, FTP/FTPS Adapters and Chapter 27, SFTP Adapters for more information.

===== Splitter and Iterator

Splitter components now support an Iterator as the result object for producing output messages. See Section 6.3, “Splitter” for more information.

===== Aggregator

Aggregator s now support a new attribute expire-groups-upon-timeout. See Section 6.4.4, “Configuring an Aggregator” for more information.

===== Content Enricher Improvements

An null-result-expression attribute has been added, which is evaluated and returned if <enricher> returns null. It can be added in <header> and <property>. See Section 7.2, “Content Enricher” for more information.

An error-channel attribute has been added, which is used to handle an error flow if Exception occurs downstream of the request-channel. This enable you to return an alternative object to use for enrichment. See Section 7.2, “Content Enricher” for more information.

===== Header Channel Registry

The <header-enricher/>'s <header-channels-to-string/> element can now override the header channel registry’s default time for retaining channel mappings. See the section called “Header Channel Registry” for more information.

===== Orderly Shutdown

Improvements have been made to the orderly shutdown algorithm. See Section 9.7, “Orderly Shutdown” for more information.

===== Management for RecipientListRouter

The RecipientListRouter provides now several management operations to configure recipients at runtime. With that the <recipient-list-router> can now be configured without any <recipient> from the start. See the section called “RecipientListRouterManagement” for more information.

===== AbstractHeaderMapper: NON_STANDARD_HEADERS token

The AbstractHeaderMapper implementations now provides the additional NON_STANDARD_HEADERS token to map any user-defined headers, which aren’t mapped by default. See Section 11.12, “AMQP Message Headers” for more information.

===== AMQP Channels: template-channel-transacted

The new template-channel-transacted attribute has been introduced for AMQP MessageChannel s. See Section 11.11, “AMQP Backed Message Channels” for more information.

===== Syslog Adapter

The default syslog message converter now has an option to retain the original message in the payload, while still setting the headers. See Section 30.2, “Syslog <inbound-channel-adapter>” for more information.

===== Async Gateway

In addition to the Promise return type mentioned above, gateway methods may now return a ListenableFuture, introduced in Spring Framework 4.0. You can also disable the async processing in the gateway, allowing a downstream flow to directly return a Future. See Section 8.4.9, “Asynchronous Gateway”.

===== Aggregator Advice Chain

Aggregator s and Resequencer s now support an <expire-advice-chain/> and <expire-transactional/> sub-elements to advise the forceComplete operation. See Section 6.4.4, “Configuring an Aggregator” for more information.

===== Outbound Channel Adapter and Scripts

The <int:outbound-channel-adapter/> now supports the <script/> sub-element. The underlying script must have a void return type or return null. See Section 8.8, “Groovy support” and Section 8.7, “Scripting support”.

===== Resequencer Changes

When a message group in a resequencer is timed out (using group-timeout or a MessageGroupStoreReaper), late arriving messages will now be discarded immediately by default. See Section 6.5, “Resequencer”.

===== Optional POJO method parameter

Now Spring Integration consistently handles the Java 8’s Optional type. See Section 8.5.2, “Configuring Service Activator”.

===== QueueChannel: backed Queue type

The QueueChannel backed Queue type has been changed from BlockingQueue to the more generic Queue. It allows the use of any external Queue implementation, for example Reactor’s PersistentQueue. See the section called “QueueChannel Configuration”.

===== ChannelInterceptor Changes

The ChannelInterceptor now supports additional afterSendCompletion() and afterReceiveCompletion() methods. See Section 4.1.3, “Channel Interceptors”.

===== IMAP PEEK

Since version 4.1.1 there is a change of behavior if you explicitly set the javamail property mail.[protocol].peek to false (where [protocol] is imap or imaps). See Important: IMAP PEEK.

=== Changes between 3.0 and 4.0

Please be sure to also see the Migration Guide for important changes that might affect your applications. Migration guides for all versions back to 2.1 can be found on the Wiki.

==== New Components

===== MQTT Channel Adapters

The MQTT channel adapters (previously available in the Spring Integration Extensions repository) are now available as part of the normal Spring Integration distribution. See Chapter 23, MQTT Support

===== @EnableIntegration

The @EnableIntegration annotation has been added, to permit declaration of standard Spring Integration beans when using @Configuration classes. See Section 31.3.2, “TCP Failover Client Connection Factory” for more information.

===== @IntegrationComponentScan

The @IntegrationComponentScan annotation has been added, to permit classpath scanning for Spring Integration specific components. See Section 31.3.2, “TCP Failover Client Connection Factory” for more information.

===== @EnableMessageHistory

Message history can now be enabled with the @EnableMessageHistory annotation in a @Configuration class; in addition the message history settings can be modified by a JMX MBean. In addition auto-created MessageHandler s for annotated endpoints (e.g. @ServiceActivator, @Splitter etc.) now are also trackable by MessageHistory. For more information, see Section 9.3, “Message History”.

===== @MessagingGateway

Messaging gateway interfaces can now be configured with the @MessagingGateway annotation. It is an analogue of the <int:gateway/> xml element. For more information, see Section 8.4.6, “@MessagingGateway Annotation”.

===== Spring Boot @EnableAutoConfiguration

As well as the @EnableIntegration annotation mentioned above, a a hook has been introduced to allow the Spring Integration infrastructure beans to be configured using Spring Boot’s @EnableAutoConfiguration. For more information seehttp://docs.spring.io/spring-boot/docs/current/reference/html/using-boot-auto-configuration.html[Spring Boot - AutoConfigure].

===== @GlobalChannelInterceptor

As well as the @EnableIntegration annotation mentioned above, the @GlobalChannelInterceptor annotation has bean introduced. For more information, see Section 31.3.2, “TCP Failover Client Connection Factory”.

===== @IntegrationConverter

The @IntegrationConverter annotation has bean introduced, as an analogue of <int:converter/> component. For more information, see Section 31.3.2, “TCP Failover Client Connection Factory”.

===== @EnablePublisher

The @EnablePublisher annotation has been added, to allow the specification of a default-publisher-channel for @Publisher annotations. See Section 31.3.2, “TCP Failover Client Connection Factory” for more information.

===== Redis Channel Message Stores

A new Redis MessageGroupStore, that is optimized for use when backing a QueueChannel for persistence, is now provided. For more information, see Section 24.4.1, “Redis Channel Message Stores”.

A new Redis ChannelPriorityMessageStore is now provided. This can be used to retrieve messages by priority. For more information, see Section 24.4.1, “Redis Channel Message Stores”.

===== MongodDB Channel Message Store

MongoDB support now provides the MongoDbChannelMessageStore - a channel specific MessageStore implementation. With priorityEnabled = true, it can be used in <int:priority-queue> s to achieve priority order polling of persisted messages. For more information see Section 22.3.1, “MongoDB Channel Message Store”.

===== @EnableIntegrationMBeanExport

The IntegrationMBeanExporter can now be enabled with the @EnableIntegrationMBeanExport annotation in a @Configuration class. For more information, see Section 9.2.7, “MBean Exporter”.

===== ChannelSecurityInterceptorFactoryBean

Configuration of Spring Security for message channels using @Configuration classes is now supported by using a ChannelSecurityInterceptorFactoryBean. For more information, see Section 31.3.2, “TCP Failover Client Connection Factory”.

===== Redis Command Gateway

The Redis support now provides the <outbound-gateway> component to perform generic Redis commands using the RedisConnection#execute method. For more information, see Section 24.8, “Redis Outbound Command Gateway”.

===== RedisLockRegistry and GemfireLockRegistry

The RedisLockRegistry and GemfireLockRegistry are now available supporting global locks visible to multiple application instances/servers. These can be used with aggregating message handlers across multiple application instances such that group release will occur on only one instance. For more information, see Section 24.11, “Redis Lock Registry”, Section 16.6, “Gemfire Lock Registry” and Section 6.4, “Aggregator”.

===== @Poller

Annotation-based messaging configuration can now have a poller attribute. This means that methods annotated with (@ServiceActivator, @Aggregator etc.) can now use an inputChannel that is a reference to a PollableChannel. For more information, see Section 31.3.2, “TCP Failover Client Connection Factory”.

===== @InboundChannelAdapter and SmartLifecycle for Annotated Endpoints

The @InboundChannelAdapter method annotation is now available. It is an analogue of the <int:inbound-channel-adapter> XML component. In addition, all Messaging Annotations now provide SmartLifecycle options. For more information, see Section 31.3.2, “TCP Failover Client Connection Factory”.

===== Twitter Search Outbound Gateway

A new twitter endpoint <int-twitter-search-outbound-gateway/> has been added. Unlike the search inbound adapter which polls using the same search query each time, the outbound gateway allows on-demand customized queries. For more information, see Section 31.3.2, “TCP Failover Client Connection Factory”.

===== Gemfire Metadata Store

The GemfireMetadataStore is provided, allowing it to be used, for example, in a AbstractPersistentAcceptOnceFileListFilter implementation in a multiple application instance/server environment. For more information, see Section 9.5, “Metadata Store”, Section 14.2, “Reading Files”, Section 15.4, “FTP Inbound Channel Adapter” and Section 27.7, “SFTP Inbound Channel Adapter”.

===== @BridgeFrom and @BridgeTo Annotations

Annotation and Java configuration has introduced @BridgeFrom and @BridgeTo @Bean method annotations to mark MessageChannel beans in @Configuration classes. For more information, see Section 31.3.2, “TCP Failover Client Connection Factory”.

===== Meta Messaging Annotations

Messaging Annotations (@ServiceActivator, @Router, @MessagingGateway etc.) can now be configured as meta-annotations for user-defined Messaging Annotations. In addition the user-defined annotations can have the same attributes (inputChannel, @Poller, autoStartup etc.). For more information, see Section 31.3.2, “TCP Failover Client Connection Factory”.

==== General Changes

===== Requires Spring Framework 4.0

Core messaging abstractions (Message, MessageChannel etc) have moved to the Spring Framework spring-messaging module. Users who reference these classes directly in their code will need to make changes as described in the first section of the Migration Guide.

===== Header Type for XPath Header Enricher

The header-type attribute has been introduced for the header sub-element of the <int-xml:xpath-header-enricher>. This attribute provides the target type for the header value to which the result of the XPath expression evaluation will be converted. For more information see Section 31.3.2, “TCP Failover Client Connection Factory”.

===== Object To Json Transformer: Node Result

The result-type attribute has been introduced for the <int:object-to-json-transformer>. This attribute provides the target type for the result of object mapping to JSON. It supports STRING (default) and NODE. For more information see the section called “JSON Transformers”.

===== JMS Header Mapping

The DefaultJmsHeaderMapper now maps an incoming JMSPriority header to the Spring Integration priority header. Previously priority was only considered for outbound messages. For more information see Section 20.6, “Mapping Message Headers to/from JMS Message”.

===== JMS Outbound Channel Adapter

The JMS outbound channel adapter now supports the session-transacted attribute (default false). Previously, you had to inject a customized JmsTemplate to use transactions. See Section 20.3, “Outbound Channel Adapter”.

===== JMS Inbound Channel Adapter

The JMS inbound channel adapter now supports the session-transacted attribute (default false). Previously, you had to inject a customized JmsTemplate to use transactions (the adapter allowed transacted in the acknowledgeMode which was incorrect, and didn’t work; this value is no longer allowed). SeeSection 20.1, “Inbound Channel Adapter”.

===== Datatype Channels

You can now specify a MessageConverter to be used when converting (if necessary) payloads to one of the accepted datatype s in a Datatype channel. For more information see the section called “Datatype Channel Configuration”.

===== Simpler Retry Advice Configuration

Simplified namespace support has been added to configure a RequestHandlerRetryAdvice. For more information see the section called “Configuring the Retry Advice”.

===== Correlation Endpoint: Time-based Release Strategy

The mutually exclusive group-timeout and group-timeout-expression attributes have been added to the <int:aggregator> and <int:resequencer>. These attributes allow forced completion of a partial MessageGroup, if the ReleaseStrategy does not release a group and no further messages arrive within the time specified. For more information see Section 6.4.4, “Configuring an Aggregator”.

===== Redis Metadata Store

The RedisMetadataStore now implements ConcurrentMetadataStore, allowing it to be used, for example, in a AbstractPersistentAcceptOnceFileListFilter implementation in a multiple application instance/server environment. For more information, see Section 24.5, “Redis Metadata Store”, Section 14.2, “Reading Files”, Section 15.4, “FTP Inbound Channel Adapter” and Section 27.7, “SFTP Inbound Channel Adapter”.

===== JdbcChannelMessageStore and PriorityChannel

The JdbcChannelMessageStore now implements PriorityCapableChannelMessageStore, allowing it to be used as a message-store reference for priority-queue s. For more information, see Section 18.4.2, “Backing Message Channels”.

===== AMQP Endpoints Delivery Mode

Spring AMQP, by default, creates persistent messages on the broker. This behavior can be overridden by setting the amqp_deliveryMode header and/or customizing the mappers. A convenient default-delivery-mode attribute has now been added to the adapters to provide easier configuration of this important setting. For more information, see Section 11.5, “Outbound Channel Adapter” and Section 11.6, “Outbound Gateway”.

===== FTP Timeouts

The DefaultFtpSessionFactory now exposes the connectTimeout, defaultTimeout and dataTimeout properties, avoiding the need to subclass the factory just to set these common properties. The postProcess* methods are still available for more advanced configuration. See Section 15.2, “FTP Session Factory” for more information.

===== Twitter: StatusUpdatingMessageHandler

The StatusUpdatingMessageHandler (<int-twitter:outbound-channel-adapter>) now supports the tweet-data-expression attribute to build a org.springframework.social.twitter.api.TweetData object for updating the timeline status allowing, for example, attaching an image. See Section 31.3.2, “TCP Failover Client Connection Factory” for more information.

===== JPA Retrieving Gateway: id-expression

The id-expression attribute has been introduced for <int-jpa:retrieving-outbound-gateway> to perform EntityManager.find(Class entityClass, Object primaryKey). See Section 19.6.3, “Retrieving Outbound Gateway” for more information.

===== TCP Deserialization Events

When one of the standard deserializers encounters a problem decoding the input stream to a message, it will now emit a TcpDeserializationExceptionEvent, allowing applications to examine the data at the point the exception occurred. See Section 31.3.2, “TCP Failover Client Connection Factory” for more information.

===== Messaging Annotations on @Bean Definitions

Messaging Annotations (@ServiceActivator, @Router, @InboundChannelAdapter etc.) can now be configured on @Bean definitions in @Configuration classes. For more information, see Section 31.3.2, “TCP Failover Client Connection Factory”.

=== Changes Between 2.2 and 3.0

==== New Components

===== HTTP Request Mapping

The HTTP module now provides powerful Request Mapping support for Inbound Endpoints. Class UriPathHandlerMapping was replaced by IntegrationRequestMappingHandlerMapping, which is registered under the bean name integrationRequestMappingHandlerMapping in the application context. Upon parsing of the HTTP Inbound Endpoint, a new IntegrationRequestMappingHandlerMapping bean is either registered or an existing bean is being reused. To achieve flexible Request Mapping configuration, Spring Integration provides the <request-mapping/> sub-element for the <http:inbound-channel-adapter/> and the <http:inbound-gateway/>. Both HTTP Inbound Endpoints are now fully based on the Request Mapping infrastructure that was introduced with Spring MVC 3.1. For example, multiple paths are supported on a single inbound endpoint. For more information see Section 17.4, “HTTP Namespace Support”.

===== Spring Expression Language (SpEL) Configuration

A new IntegrationEvaluationContextFactoryBean is provided to allow configuration of custom PropertyAccessor s and functions for use in SpEL expressions throughout the framework. For more information see Section 31.3.2, “TCP Failover Client Connection Factory”.

===== SpEL Functions Support

To customize the SpEL EvaluationContext with static Method functions, the new <spel-function/> component is introduced. Two built-in functions are also provided (#jsonPath and #xpath). For more information see Section 31.3.2, “TCP Failover Client Connection Factory”.

===== SpEL PropertyAccessors Support

To customize the SpEL EvaluationContext with PropertyAccessor implementations the new <spel-property-accessors/> component is introduced. For more information see Section 31.3.2, “TCP Failover Client Connection Factory”.

===== Redis: New Components

A new Redis-based MetadataStore implementation has been added. The RedisMetadataStore can be used to maintain state of a MetadataStore across application restarts. This new MetadataStore implementation can be used with adapters such as:

  • Twitter Inbound Adapters
  • Feed Inbound Channel Adapter

New queue-based components have been added. The <int-redis:queue-inbound-channel-adapter/> and the <int-redis:queue-outbound-channel-adapter/> components are provided to perform right pop and left push operations on a Redis List, respectively.

For more information see Chapter 24, Redis Support.

===== Header Channel Registry

It is now possible to instruct the framework to store reply and error channels in a registry for later resolution. This is useful for cases where the replyChannel or errorChannel might be lost; for example when serializing a message. See Section 7.2.2, “Header Enricher” for more information.

===== MongoDB support: New ConfigurableMongoDbMessageStore

In addition to the existing eMongoDbMessageStore, a new ConfigurableMongoDbMessageStore has been introduced. This provides a more robust and flexible implementation of MessageStore for MongoDB. It does not have backward compatibility, with the existing store, but it is recommended to use it for new applications. Existing applications can use it, but messages in the old store will not be available. See Chapter 22, MongoDb Support for more information.

===== Syslog Support

Building on the 2.2 SyslogToMapTransformer Spring Integration 3.0 now introduces UDP and TCP inbound channel adapters especially tailored for receiving SYSLOG messages. For more information, seeChapter 30, Syslog Support.

===== Tail Support

File 'tail’ing inbound channel adapters are now provided to generate messages when lines are added to the end of text files; see Section 14.2.6, “'Tail’ing Files”.

===== JMX Support

  • A new <int-jmx:tree-polling-channel-adapter/> is provided; this adapter queries the JMX MBean tree and sends a message with a payload that is the graph of objects that matches the query. By default the MBeans are mapped to primitives and simple Objects like Map, List and arrays - permitting simple transformation, for example, to JSON.
  • The IntegrationMBeanExporter now allows the configuration of a custom ObjectNamingStrategy using the naming-strategy attribute.

For more information, see Section 9.2, “JMX Support”.

===== TCP/IP Connection Events and Connection Management

TcpConnection s now emit ApplicationEvent s (specifically TcpConnectionEvent s) when connections are opened, closed, or an exception occurs. This allows applications to be informed of changes to TCP connections using the normal Spring ApplicationListener mechanism.

AbstractTcpConnection has been renamed TcpConnectionSupport; custom connections that are subclasses of this class, can use its methods to publish events. Similarly, AbstractTcpConnectionInterceptor has been renamed to TcpConnectionInterceptorSupport.

In addition, a new <int-ip:tcp-connection-event-inbound-channel-adapter/> is provided; by default, this adapter sends all TcpConnectionEvent s to a Channel.

Further, the TCP Connection Factories, now provide a new method getOpenConnectionIds(), which returns a list of identifiers for all open connections; this allows applications, for example, to broadcast to all open connections.

Finally, the connection factories also provide a new method closeConnection(String connectionId) which allows applications to explicitly close a connection using its ID.

For more information see Section 31.3.2, “TCP Failover Client Connection Factory”.

===== Inbound Channel Adapter Script Support

The <int:inbound-channel-adapter/> now supports <expression/> and <script/> sub-elements to create a MessageSource; see Section 4.3.3, “Channel Adapter Expressions and Scripts”.

===== Content Enricher: Headers Enrichment Support

The Content Enricher now provides configuration for <header/> sub-elements, to enrich the outbound Message with headers based on the reply Message from the underlying message flow. For more information see Section 7.2.3, “Payload Enricher”.

==== General Changes

===== Message ID Generation

Previously, message ids were generated using the JDK UUID.randomUUID() method. With this release, the default mechanism has been changed to use a more efficient algorithm which is significantly faster. In addition, the ability to change the strategy used to generate message ids has been added. For more information see the section called “Message ID Generation”.

===== <gateway> Changes

  • It is now possible to set common headers across all gateway methods, and more options are provided for adding, to the message, information about which method was invoked.
  • It is now possible to entirely customize the way that gateway method calls are mapped to messages.
  • The GatewayMethodMetadata is now public class and it makes possible flexibly to configure the GatewayProxyFactoryBean programmatically from Java code.

For more information see Section 8.4, “Messaging Gateways”.

===== HTTP Endpoint Changes

  • Outbound Endpoint encode-uri - <http:outbound-gateway/> and <http:outbound-channel-adapter/> now provide an encode-uri attribute to allow disabling the encoding of the URI object before sending the request.
  • Inbound Endpoint merge-with-default-converters - <http:inbound-gateway/> and <http:inbound-channel-adapter/> now have a merge-with-default-converters attribute to include the list of default HttpMessageConverter s after the custom message converters.
  • If-(Un)Modified-Since HTTP Headers - previously, If-Modified-Since and If-Unmodified-Since HTTP headers were incorrectly processed within from/to HTTP headers mapping in the DefaultHttpHeaderMapper. Now, in addition correcting that issue, DefaultHttpHeaderMapper provides date parsing from formatted strings for any HTTP headers that accept date-time values.
  • Inbound Endpoint Expression Variables - In addition to the existing #requestParams and #pathVariables, the <http:inbound-gateway/> and <http:inbound-channel-adapter/> now support additional useful variables: #matrixVariables, #requestAttributes, #requestHeaders and #cookies. These variables are available in both payload and header expressions.
  • Outbound Endpoint uri-variables-expression - HTTP Outbound Endpoints now support the uri-variables-expression attribute to specify an Expression to evaluate a Map for all URI variable placeholders within URL template. This allows selection of a different map of expressions based on the outgoing message.

For more information see Chapter 17, HTTP Support.

===== Jackson Support (JSON)

  • A new abstraction for JSON conversion has been introduced. Implementations for Jackson 1.x and Jackson 2 are currently provided, with the version being determined by presence on the classpath. Previously, only Jackson 1.x was supported.
  • The ObjectToJsonTransformer and JsonToObjectTransformer now emit/consume headers containing type information.

For more information, see JSON Transformers in Section 7.1, “Transformer”.

===== Chain Elements id Attribute

Previously, the id attribute for elements within a <chain> was ignored and, in some cases, disallowed. Now, the id attribute is allowed for all elements within a <chain>. The bean names of chain elements is a combination of the surrounding chain’s id and the id of the element itself. For example: fooChain$child.fooTransformer.handler. For more information see Section 6.6, “Message Handler Chain”.

===== Aggregator empty-group-min-timeout property

The AbstractCorrelatingMessageHandler provides a new property empty-group-min-timeout to allow empty group expiry to run on a longer schedule than expiring partial groups. Empty groups will not be removed from the MessageStore until they have not been modified for at least this number of milliseconds. For more information see Section 6.4.4, “Configuring an Aggregator”.

===== Persistent File List Filters (file, (S)FTP)

New FileListFilter s that use a persistent MetadataStore are now available. These can be used to prevent duplicate files after a system restart. SeeSection 14.2, “Reading Files”, Section 15.4, “FTP Inbound Channel Adapter”, and Section 27.7, “SFTP Inbound Channel Adapter” for more information.

===== Scripting Support: Variables Changes

A new variables attribute has been introduced for scripting components. In addition, variable bindings are now allowed for inline scripts. See Section 8.8, “Groovy support” and Section 8.7, “Scripting support” for more information.

===== Direct Channel Load Balancing configuration

Previously, when configuring LoadBalancingStrategy on the channel’s dispatcher sub-element, the only available option was to use a pre-defined enumeration of values which did not allow one to set a custom implementation of the LoadBalancingStrategy. You can now use load-balancer-ref to provide a reference to a custom implementation of the LoadBalancingStrategy. For more information see the section called “DirectChannel”.

===== PublishSubscribeChannel Behavior

Previously, sending to a <publish-subscribe-channel/> that had no subscribers would return a false result. If used in conjunction with a MessagingTemplate, this would result in an exception being thrown. Now, the PublishSubscribeChannel has a property minSubscribers (default 0). If the message is sent to at least the minimum number of subscribers, the send is deemed to be successful (even if zero). If an application is expecting to get an exception under these conditions, set the minimum subscribers to at least 1.

===== FTP, SFTP and FTPS Changes

The FTP, SFTP and FTPS endpoints no longer cache sessions by default

The deprecated cached-sessions attribute has been removed from all endpoints. Previously, the embedded caching mechanism controlled by this attribute’s value didn’t provide a way to limit the size of the cache, which could grow indefinitely. The CachingConnectionFactory was introduced in release 2.1 and it became the preferred (and is now the only) way to cache sessions.

The CachingConnectionFactory now provides a new method resetCache(). This immediately closes idle sessions and causes in-use sessions to be closed as and when they are returned to the cache.

The DefaultSftpSessionFactory (in conjunction with a CachingSessionFactory) now supports multiplexing channels over a single SSH connection (SFTP Only).

FTP, SFTP and FTPS Inbound Adapters

Previously, there was no way to override the default filter used to process files retrieved from a remote server. The filter attribute determines which files are retrieved but the FileReadingMessageSource uses an AcceptOnceFileListFilter. This means that if a new copy of a file is retrieved, with the same name as a previously copied file, no message was sent from the adapter.

With this release, a new attribute local-filter allows you to override the default filter, for example with an AcceptAllFileListFilter, or some other custom filter.

For users that wish the behavior of the AcceptOnceFileListFilter to be maintained across JVM executions, a custom filter that retains state, perhaps on the file system, can now be configured.

Inbound Channel Adapters now support the preserve-timestamp attribute, which sets the local file modified timestamp to the timestamp from the server (default false).

FTP, SFTP and FTPS Gateways

  • The gateways now support the mv command, enabling the renaming of remote files.
  • The gateways now support recursive ls and mget commands, enabling the retrieval of a remote file tree.
  • The gateways now support put and mput commands, enabling sending file(s) to the remote server.
  • The local-filename-generator-expression attribute is now supported, enabling the naming of local files during retrieval. By default, the same name as the remote file is used.
  • The local-directory-expression attribute is now supported, enabling the naming of local directories during retrieval based on the remote directory.

Remote File Template

A new higher-level abstraction (RemoteFileTemplate) is provided over the Session implementations used by the FTP and SFTP modules. While it is used internally by endpoints, this abstraction can also be used programmatically and, like all Spring *Template implementations, reliably closes the underlying session while allowing low level access to the session when needed.

For more information, see Chapter 15, FTP/FTPS Adapters and Chapter 27, SFTP Adapters.

===== requires-reply Attribute for Outbound Gateways

All Outbound Gateways (e.g. <jdbc:outbound-gateway/> or <jms:outbound-gateway/>) are designed for request-reply scenarios. A response is expected from the external service and will be published to the reply-channel, or the replyChannel message header. However, there are some cases where the external system might not always return a result, e.g. a <jdbc:outbound-gateway/>, when a SELECT ends with an empty ResultSet or, say, a Web Service is One-Way. An option is therefore needed to configure whether or not a reply is required. For this purpose, the requires-reply attribute has been introduced for Outbound Gateway components. In most cases, the default value for requires-reply is true and, if there is not any result, a ReplyRequiredException will be thrown. Changing the value to false means that, if an external service doesn’t return anything, the message-flow will end at that point, similar to an Outbound Channel Adapter.

[Note]Note

The WebService outbound gateway has an additional attribute ignore-empty-responses; this is used to treat an empty String response as if no response was received. It is true by default but can be set to false to allow the application to receive an empty String in the reply message payload. When the attribute is true an empty string is treated as no response for the purposes of the requires-reply attribute. requires-reply is false by default for the WebService outbound gateway.

Note, the requiresReply property was previously present in the AbstractReplyProducingMessageHandler but set to false, and there wasn’t any way to configure it on Outbound Gateways using the XML namespace.

[Important]Important

Previously, a gateway receiving no reply would silently end the flow (with a DEBUG log message); with this change an exception will now be thrown by default by most gateways. To revert to the previous behavior, set requires-reply to false.

===== AMQP Outbound Gateway Header Mapping

Previously, the <int-amqp:outbound-gateway/> mapped headers before invoking the message converter, and the converter could overwrite headers such as content-type. The outbound adapter maps the headers after the conversion, which means headers like content-type from the outbound Message (if present) are used.

Starting with this release, the gateway now maps the headers after the message conversion, consistent with the adapter. If your application relies on the previous behavior (where the converter’s headers overrode the mapped headers), you either need to filter those headers (before the message reaches the gateway) or set them appropriately. The headers affected by the SimpleMessageConverter are content-type and content-encoding. Custom message converters may set other headers.

===== Stored Procedure Components Improvements

For more complex database-specific types, not supported by the standard CallableStatement.getObject method, 2 new additional attributes were introduced to the <sql-parameter-definition/> element with OUT-direction:

type-name

return-type

The row-mapper attribute of the Stored Procedure Inbound Channel Adapter <returning-resultset/> sub-element now supports a reference to a RowMapper bean definition. Previously, it contained just a class name (which is still supported).

For more information see Section 18.5, “Stored Procedures”.

===== Web Service Outbound URI Configuration

Web Service Outbound Gateway uri attribute now supports <uri-variable/> substitution for all URI-schemes supported by Spring Web Services. For more information see Section 31.3.2, “TCP Failover Client Connection Factory”.

===== Redis Adapter Changes

The Redis Inbound Channel Adapter can now use a null value for serializer property, with the raw data being the message payload.

The Redis Outbound Channel Adapter now has the topic-expression property to determine the Redis topic against the Message at runtime.

The Redis Inbound Channel Adapter, in addition to the existing topics attribute, now has the topic-patterns attribute.

For more information, see Chapter 24, Redis Support.

===== Advising Filters

Previously, when a <filter/> had a <request-handler-advice-chain/>, the discard action was all performed within the scope of the advice chain (including any downstream flow on the discard-channel). The filter element now has an attribute discard-within-advice (default true), to allow the discard action to be performed after the advice chain completes. See Section 8.9.6, “Advising Filters”.

===== Advising Endpoints using Annotations

Request Handler Advice Chains can now be configured using annotations. See Section 8.9.7, “Advising Endpoints Using Annotations”.

===== ObjectToStringTransformer Improvements

This transformer now correctly transforms byte[] and char[] payloads to String. For more information see Section 7.1, “Transformer”.

===== JPA Support Changes

Payloads to persist or merge can now be of type https://docs.oracle.com/javase/7/docs/api/java/lang/Iterable.html[java.lang.Iterable].

In that case, each object returned by the Iterable is treated as an entity and persisted or merged using the underlying EntityManager. NULL values returned by the iterator are ignored.

The JPA adapters now have additional attributes to optionally flush and clear entities from the associated persistence context after performing persistence operations.

Retrieving gateways had no mechanism to specify the first record to be retrieved which is a common use case. The retrieving gateways now support specifying this parameter using a first-result and first-result-expression attributes to the gateway definition. Section 19.6.3, “Retrieving Outbound Gateway”.

The JPA retrieving gateway and inbound adapter now have an attribute to specify the maximum number of results in a result set as an expression. In addition, the max-results attribute has been introduced to replace max-number-of-results, which has been deprecated. max-results and max-results-expression are used to provide the maximum number of results, or an expression to compute the maximum number of results, respectively, in the result set.

For more information see Chapter 19, JPA Support.

===== Delayer: delay expression

Previously, the <delayer> provided a delay-header-name attribute to determine the delay value at runtime. In complex cases it was necessary to precede the <delayer> with a <header-enricher>. Spring Integration 3.0 introduced the expression attribute and expression sub-element for dynamic delay determination. The delay-header-name attribute is now deprecated because the header evaluation can be specified in the expression. In addition, the ignore-expression-failures was introduced to control the behavior when an expression evaluation fails. For more information see Section 8.6, “Delayer”.

===== JDBC Message Store Improvements

Spring Integration 3.0 adds a new set of DDL scripts for MySQL version 5.6.4 and higher. Now MySQL supports fractional seconds and is thus improving the FIFO ordering when polling from a MySQL-based Message Store. For more information, please see Section 18.4.1, “The Generic JDBC Message Store”.

===== IMAP Idle Connection Exceptions

Previously, if an IMAP idle connection failed, it was logged but there was no mechanism to inform an application. Such exceptions now generate ApplicationEvent s. Applications can obtain these events using an <int-event:inbound-channel-adapter> or any ApplicationListener configured to receive an ImapIdleExceptionEvent or one of its super classes.

===== Message Headers and TCP

The TCP connection factories now enable the configuration of a flexible mechanism to transfer selected headers (as well as the payload) over TCP. A new TcpMessageMapper enables the selection of the headers, and an appropriate (de)serializer needs to be configured to write the resulting Map to the TCP stream. A MapJsonSerializer is provided as a convenient mechanism to transfer headers and payload over TCP. For more information see Section 31.3.2, “TCP Failover Client Connection Factory”.

===== JMS Message Driven Channel Adapter

Previously, when configuring a <message-driven-channel-adapter/>, if you wished to use a specific TaskExecutor, it was necessary to declare a container bean and provide it to the adapter using the container attribute. The task-executor is now provided, allowing it to be set directly on the adapter. This is in addition to several other container attributes that were already available.

===== RMI Inbound Gateway

The RMI Inbound Gateway now supports an error-channel attribute. See Section 26.3, “Inbound RMI”.

===== XsltPayloadTransformer

You can now specify the transformer factory class name using the transformer-factory-class attribute. See Section 31.3.2, “TCP Failover Client Connection Factory”

=== Changes between 2.1 and 2.2

==== New Components

===== RedisStore Inbound and Outbound Channel Adapters

Spring Integration now has RedisStore Inbound and Outbound Channel Adapters allowing you to write and read Message payloads to/from Redis collection(s). For more information please see Section 24.7, “RedisStore Outbound Channel Adapter” and Section 24.6, “RedisStore Inbound Channel Adapter”.

===== MongoDB Inbound and Outbound Channel Adapters

Spring Integration now has MongoDB Inbound and Outbound Channel Adapters allowing you to write and read Message payloads to/from a MongoDB document store. For more information please see Section 22.5, “MongoDB Outbound Channel Adapter” and Section 22.4, “MongoDB Inbound Channel Adapter”.

===== JPA Endpoints

Spring Integration now includes components for the Java Persistence API (JPA) for retrieving and persisting JPA entity objects. The JPA Adapter includes the following components:

For more information please see Chapter 19, JPA Support

==== General Changes

===== Spring 3.1 Used by Default

Spring Integration now uses Spring 3.1.

===== Adding Behavior to Endpoints

The ability to add an <advice-chain/> to a poller has been available for some time. However, the behavior added by this affects the entire integration flow. It did not address the ability to add, say, retry, to an individual endpoint. The 2.2. release introduces the <request-handler-advice-chain/> to many endpoints.

In addition, 3 standard Advice classes have been provided for this purpose:

  • MessageHandlerRetryAdvice
  • MessageHandlerCircuitBreakerAdvice
  • ExpressionEvaluatingMessageHandlerAdvice

For more information, see Section 8.9, “Adding Behavior to Endpoints”.

===== Transaction Synchronization and Pseudo Transactions

Pollers can now participate in Spring’s Transaction Synchronization feature. This allows for synchronizing such operations as renaming files by an inbound channel adapter depending on whether the transaction commits, or rolls back.

In addition, these features can be enabled when there is not a real transaction present, by means of a PseudoTransactionManager.

For more information see Section 31.3.2, “TCP Failover Client Connection Factory”.

===== File Adapter - Improved File Overwrite/Append Handling

When using the File Oubound Channel Adapter or the File Outbound Gateway, a new mode property was added. Prior to Spring Integration 2.2, target files were replaced when they existed. Now you can specify the following options:

  • REPLACE (Default)
  • APPEND
  • FAIL
  • IGNORE

For more information please see Section 14.3.3, “Dealing with Existing Destination Files”.

===== Reply-Timeout added to more Outbound Gateways

The XML Namespace support adds the reply-timeout attribute to the following Outbound Gateways:

  • Amqp Outbound Gateway
  • File Outbound Gateway
  • Ftp Outbound Gateway
  • Sftp Outbound Gateway
  • Ws Outbound Gateway

===== Spring-AMQP 1.1

Spring Integration now uses Spring AMQP 1.1. This enables several features to be used within a Spring Integration application, including…​

  • A fixed reply queue for the outbound gateway
  • HA (mirrored) queues
  • Publisher Confirms
  • Returned Messages
  • Support for Dead Letter Exchanges/Dead Letter Queues

===== JDBC Support - Stored Procedures Components

SpEL Support

When using the Stored Procedure components of the Spring Integration JDBC Adapter, you can now provide Stored Procedure Names or Stored Function Names using Spring Expression Language (SpEL).

This allows you to specify the Stored Procedures to be invoked at runtime. For example, you can provide Stored Procedure names that you would like to execute via Message Headers. For more information please see Section 18.5, “Stored Procedures”.

JMX Support

The Stored Procedure components now provide basic JMX support, exposing some of their properties as MBeans:

  • Stored Procedure Name
  • Stored Procedure Name Expression
  • JdbcCallOperations Cache Statistics

===== JDBC Support - Outbound Gateway

When using the JDBC Outbound Gateway, the update query is no longer mandatory. You can now provide solely a select query using the request message as a source of parameters.

===== JDBC Support - Channel-specific Message Store Implementation

A new Message Channel-specific Message Store Implementation has been added, providing a more scalable solution using database-specific SQL queries. For more information please see: Section 18.4.2, “Backing Message Channels”.

===== Orderly Shutdown

A method stopActiveComponents() has been added to the IntegrationMBeanExporter. This allows a Spring Integration application to be shut down in an orderly manner, disallowing new inbound messages to certain adapters and waiting for some time to allow in-flight messages to complete.

===== JMS Oubound Gateway Improvements

The JMS Outbound Gateway can now be configured to use a`MessageListener` container to receive replies. This can improve performance of the gateway.

===== object-to-json-transformer

The ObjectToJsonTransformer now sets the content-type header to application/json by default. For more information see Section 7.1, “Transformer”.

===== HTTP Support

Java serialization over HTTP is no longer enabled by default. Previously, when setting a expected-response-type to a Serializable object, the Accept header was not properly set up. The SerializingHttpMessageConverter has now been updated to set the Accept header to application/x-java-serialized-object. However, because this could cause incompatibility with existing applications, it was decided to no longer automatically add this converter to the HTTP endpoints.

If you wish to use Java serialization, you will need to add the SerializingHttpMessageConverter to the appropriate endpoints, using the message-converters attribute, when using XML configuration, or using the setMessageConverters() method.

Alternatively, you may wish to consider using JSON instead which is enabled by simply having Jackson on the classpath.

=== Changes between 2.0 and 2.1

==== New Components

===== JSR-223 Scripting Support

In Spring Integration 2.0, support for Groovy was added. With Spring Integration 2.1 we expanded support for additional languages substantially by implementing support for JSR-223 (Scripting for the Java™ Platform). Now you have the ability to use any scripting language that supports JSR-223 including:

  • Javascript
  • Ruby/JRuby
  • Python/Jython
  • Groovy

For further details please see Section 8.7, “Scripting support”.

===== GemFire Support

Spring Integration provides support for GemFire by providing inbound adapters for entry and continuous query events, an outbound adapter to write entries to the cache, and MessageStore and MessageGroupStore implementations. Spring integration leverages the Spring Gemfire project, providing a thin wrapper over its components.

For further details please see Chapter 16, GemFire Support.

===== AMQP Support

Spring Integration 2.1 adds several Channel Adapters for receiving and sending messages using thehttp://www.amqp.org/[Advanced Message Queuing Protocol] (AMQP). Furthermore, Spring Integration also provides a point-to-point Message Channel, as well as a publish/subscribe Message Channel that are backed by AMQP Exchanges and Queues.

For further details please see Chapter 11, AMQP Support.

===== MongoDB Support

As of version 2.1 Spring Integration provides support for MongoDB by providing a MongoDB-based MessageStore.

For further details please see Chapter 22, MongoDb Support.

===== Redis Support

As of version 2.1 Spring Integration supports Redis, an advanced key-value store, by providing a Redis-based MessageStore as well as Publish-Subscribe Messaging adapters.

For further details please see Chapter 24, Redis Support.

===== Support for Spring’s Resource abstraction

As of version 2.1, we’ve introduced a new Resource Inbound Channel Adapter that builds upon Spring’s Resource abstraction to support greater flexibility across a variety of actual types of underlying resources, such as a file, a URL, or a class path resource. Therefore, it’s similar to but more generic than the File Inbound Channel Adapter.

For further details please see Section 25.2, “Resource Inbound Channel Adapter”.

===== Stored Procedure Components

With Spring Integration 2.1, the JDBC Module also provides Stored Procedure support by adding several new components, including inbound/outbound channel adapters and an Outbound Gateway. The Stored Procedure support leverages Spring’shttp://static.springsource.org/spring/docs/3.0.x/javadoc-api/org/springframework/jdbc/core/simple/SimpleJdbcCall.html[SimpleJdbcCall] class and consequently supports stored procedures for:

  • Apache Derby
  • DB2
  • MySQL
  • Microsoft SQL Server
  • Oracle
  • PostgreSQL
  • Sybase

The Stored Procedure components also support Sql Functions for the following databases:

  • MySQL
  • Microsoft SQL Server
  • Oracle
  • PostgreSQL

For further details please see Section 18.5, “Stored Procedures”.

===== XPath and XML Validating Filter

Spring Integration 2.1 provides a new XPath-based Message Filter, that is part of the XML module. The XPath Filter allows you to filter messages using provided XPath Expressions. Furthermore, documentation was added for the XML Validating Filter.

For more details please see Section 31.3.2, “TCP Failover Client Connection Factory” and Section 31.3.2, “TCP Failover Client Connection Factory”.

===== Payload Enricher

Since Spring Integration 2.1, the Payload Enricher is provided. A Payload Enricher defines an endpoint that typically passes ahttp://static.springsource.org/spring-integration/api/org/springframework/integration/Message.html[Message] to the exposed request channel and then expects a reply message. The reply message then becomes the root object for evaluation of expressions to enrich the target payload.

For further details please see Section 7.2.3, “Payload Enricher”.

===== FTP and SFTP Outbound Gateways

Spring Integration 2.1 provides two new Outbound Gateways in order to interact with remote File Transfer Protocol (FTP) or Secure File Transfer Protocol (SFT) servers. These two gateways allow you to directly execute a limited set of remote commands.

For instance, you can use these Outbound Gateways to list, retrieve and delete remote files and have the Spring Integration message flow continue with the remote server’s response.

For further details please see Section 15.7, “FTP Outbound Gateway” and Section 27.10, “SFTP Outbound Gateway”.

===== FTP Session Caching

As of version 2.1, we have exposed more flexibility with regards to session management for remote file adapters (e.g., FTP, SFTP etc).

Specifically, the cache-sessions attribute, which is available via the XML namespace support, is now deprecated. Alternatively, we added the sessionCacheSize and sessionWaitTimeout attributes on the CachingSessionFactory.

For further details please see Section 15.8, “FTP Session Caching” and Section 27.5, “SFTP Session Caching”.

==== Framework Refactoring

===== Standardizing Router Configuration

Router parameters have been standardized across all router implementations with Spring Integration 2.1 providing a more consistent user experience.

With Spring Integration 2.1 the ignore-channel-name-resolution-failures attribute has been removed in favor of consolidating its behavior with the resolution-required attribute. Also, the resolution-required attribute now defaults to true.

Starting with Spring Integration 2.1, routers will no longer silently drop any messages, if no default output channel was defined. This means, that by default routers now require at least one resolved channel (if no default-output-channel was set) and by default will throw a MessageDeliveryException if no channel was determined (or an attempt to send was not successful).

If, however, you do desire to drop messages silently, simply set default-output-channel="nullChannel".

[Important]Important

With the standardization of Router parameters and the consolidation of the parameters described above, there is the possibility of breaking older Spring Integration based applications.

For further details please see Section 6.1, “Routers”

===== XML Schemas updated to 2.1

Spring Integration 2.1 ships with an updated XML Schema (version 2.1), providing many improvements, e.g. the Router standardizations discussed above.

From now on, users must always declare the latest XML schema (currently version 2.1). Alternatively, they can use the version-less schema. Generally, the best option is to use version-less namespaces, as these will automatically use the latest available version of Spring Integration.

Declaring a version-less Spring Integration namespace:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:int="http://www.springframework.org/schema/integration"
   xsi:schemaLocation="http://www.springframework.org/schema/integration
           https://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/beans
           https://www.springframework.org/schema/beans/spring-beans.xsd">
...
</beans>

Declaring a Spring Integration namespace using an explicit version:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:int="http://www.springframework.org/schema/integration"
   xsi:schemaLocation="http://www.springframework.org/schema/integration
           https://www.springframework.org/schema/integration/spring-integration-2.2.xsd
           http://www.springframework.org/schema/beans
           https://www.springframework.org/schema/beans/spring-beans.xsd">
...
</beans>

The old 1.0 and 2.0 schemas are still there, but if an Application Context still references one of those deprecated schemas, the validator will fail on initialization.

==== Source Control Management and Build Infrastructure

===== Source Code now hosted on Github

Since version 2.0, the Spring Integration project uses Git for version control. In order to increase community visibility even further, the project was moved from SpringSource hosted Git repositories to Github. The Spring Integration Git repository is located at: spring-integration.

For the project we also improved the process of providing code contributions and we ensure that every commit is peer-reviewed. In fact, core committers now follow the same process as contributors. For more details please see:

Contributing.

===== Improved Source Code Visibility with Sonar

In an effort to provide better source code visibility and consequently to monitor the quality of Spring Integration’s source code, an instance of Sonar was setup and metrics are gathered nightly and made available at:

sonar.spring.io.

==== New Samples

For the 2.1 release of Spring Integration we also expanded the Spring Integration Samples project and added many new samples, e.g. samples covering AMQP support, the new payload enricher, a sample illustrating techniques for testing Spring Integration flow fragments, as well as an example for executing Stored Procedures against Oracle. For details please visit:

spring-integration-samples.

=== Changes between 1.0 and 2.0

For a detailed migration guide in regards to upgrading an existing application that uses Spring Integration older than version 2.0, please see:

Spring Integration 1.0 to 2.0 Migration Guide

==== Spring 3 support

Spring Integration 2.0 is built on top of Spring 3.0.5 and makes many of its features available to our users.

===== Support for the Spring Expression Language (SpEL)

You can now use SpEL expressions within the transformer, router, filter, splitter, aggregator, service-activator, header-enricher, and many more elements of the Spring Integration core namespace as well as various adapters. There are many samples provided throughout this manual.

===== ConversionService and Converter

You can now benefit from Conversion Service support provided with Spring while configuring many Spring Integration components such as Datatype Channel. See Section 4.1.2, “Message Channel Implementations” as well Section 8.5.1, “Introduction”. Also, the SpEL support mentioned in the previous point also relies upon the ConversionService. Therefore, you can register Converters once, and take advantage of them anywhere you are using SpEL expressions.

===== TaskScheduler and Trigger

Spring 3.0 defines two new strategies related to scheduling: TaskScheduler and Trigger Spring Integration (which uses a lot of scheduling) now builds upon these. In fact, Spring Integration 1.0 had originally defined some of the components (e.g. CronTrigger) that have now been migrated into Spring 3.0’s core API. Now, you can benefit from reusing the same components within the entire Application Context (not just Spring Integration configuration). Configuration of Spring Integration Pollers has been greatly simplified as well by providing attributes for directly configuring rates, delays, cron expressions, and trigger references. See Section 4.3, “Channel Adapter” for sample configurations.

===== RestTemplate and HttpMessageConverter

Our outbound HTTP adapters now delegate to Spring’s RestTemplate for executing the HTTP request and handling its response. This also means that you can reuse any custom HttpMessageConverter implementations. See Section 17.3, “Http Outbound Components” for more details.

==== Enterprise Integration Pattern Additions

Also in 2.0 we have added support for even more of the patterns described in Hohpe and Woolf’s Enterprise Integration Patterns book.

===== Message History

We now provide support for the Message History pattern allowing you to keep track of all traversed components, including the name of each channel and endpoint as well as the timestamp of that traversal. See Section 9.3, “Message History” for more details.

===== Message Store

We now provide support for the Message Store pattern. The Message Store provides a strategy for persisting messages on behalf of any process whose scope extends beyond a single transaction, such as the Aggregator and Resequencer. Many sections of this document provide samples on how to use a Message Store as it affects several areas of Spring Integration. See Section 9.4, “Message Store”, Section 7.3, “Claim Check”, Section 4.1, “Message Channels”, Section 6.4, “Aggregator”, Chapter 18, JDBC Support, and Section 6.5, “Resequencer” for more details

===== Claim Check

We have added an implementation of the Claim Check pattern. The idea behind the Claim Check pattern is that you can exchange a Message payload for a "claim ticket" and vice-versa. This allows you to reduce bandwidth and/or avoid potential security issues when sending Messages across channels. See Section 7.3, “Claim Check” for more details.

===== Control Bus

We have provided implementations of the Control Bus pattern which allows you to use messaging to manage and monitor endpoints and channels. The implementations include both a SpEL-based approach and one that executes Groovy scripts. See Section 9.6, “Control Bus” and Section 8.8.2, “Control Bus” for more details.

==== New Channel Adapters and Gateways

We have added several new Channel Adapters and Messaging Gateways in Spring Integration 2.0.

===== TCP/UDP Adapters

We have added Channel Adapters for receiving and sending messages over the TCP and UDP internet protocols. See Chapter 31, TCP and UDP Support for more details. Also, you can checkout the following blog: TCP/UDP support

===== Twitter Adapters

Twitter adapters provides support for sending and receiving Twitter Status updates as well as Direct Messages. You can also perform Twitter Searches with an inbound Channel Adapter. See Section 31.3.2, “TCP Failover Client Connection Factory” for more details.

===== XMPP Adapters

The new XMPP adapters support both Chat Messages and Presence events. See Section 31.3.2, “TCP Failover Client Connection Factory” for more details.

===== FTP/FTPS Adapters

Inbound and outbound File transfer support over FTP/FTPS is now available. See Chapter 15, FTP/FTPS Adapters for more details.

===== SFTP Adapters

Inbound and outbound File transfer support over SFTP is now available. See Chapter 27, SFTP Adapters for more details.

===== Feed Adapters

We have also added Channel Adapters for receiving news feeds (ATOM/RSS). See Chapter 13, Feed Adapter for more details.

==== Other Additions

===== Groovy Support

With Spring Integration 2.0 we’ve added Groovy support allowing you to use Groovy scripting language to provide integration and/or business logic. See Section 8.8, “Groovy support” for more details.

===== Map Transformers

These symmetrical transformers convert payload objects to and from a Map. See Section 7.1, “Transformer” for more details.

===== JSON Transformers

These symmetrical transformers convert payload objects to and from JSON. See Section 7.1, “Transformer” for more details.

===== Serialization Transformers

These symmetrical transformers convert payload objects to and from byte arrays. They also support the Serializer and Deserializer strategy interfaces that have been added as of Spring 3.0.5. See Section 7.1, “Transformer” for more details.

==== Framework Refactoring

The core API went through some significant refactoring to make it simpler and more usable. Although we anticipate that the impact to the end user should be minimal, please read through this document to find what was changed. Especially, visit Section 6.1.5, “Dynamic Routers” , Section 8.4, “Messaging Gateways”, Section 17.3, “Http Outbound Components”, Section 5.1, “Message”, and Section 6.4, “Aggregator” for more details. If you are depending directly on some of the core components (Message, MessageHeaders, MessageChannel, MessageBuilder, etc.), you will notice that you need to update any import statements. We restructured some packaging to provide the flexibility we needed for extending the domain model while avoiding any cyclical dependencies (it is a policy of the framework to avoid such "tangles").

==== New Source Control Management and Build Infrastructure

With Spring Integration 2.0 we have switched our build environment to use Git for source control. To access our repository simply follow this URL: https://git.springsource.org/spring-integration. We have also switched our build system to Gradle.

==== New Spring Integration Samples

With Spring Integration 2.0 we have decoupled the samples from our main release distribution. Please read this blog to get more info New Spring Integration Samples We have also created many new samples, including samples for every new Adapter.

==== Spring Tool Suite Visual Editor for Spring Integration

There is an amazing new visual editor for Spring Integration included within the latest version of SpringSource Tool Suite. If you are not already using STS, please download it here:

Spring Tool Suite