32. TCP and UDP Support

Spring Integration provides Channel Adapters for receiving and sending messages over internet protocols. Both UDP (User Datagram Protocol) and TCP (Transmission Control Protocol) adapters are provided. Each adapter provides for one-way communication over the underlying protocol. In addition, simple inbound and outbound tcp gateways are provided. These are used when two-way communication is needed.

32.1 Introduction

Two flavors each of UDP inbound and outbound channel adapters are provided UnicastSendingMessageHandler sends a datagram packet to a single destination. UnicastReceivingChannelAdapter receives incoming datagram packets. MulticastSendingMessageHandler sends (broadcasts) datagram packets to a multicast address. MulticastReceivingChannelAdapter receives incoming datagram packets by joining to a multicast address.

TCP inbound and outbound channel adapters are provided TcpSendingMessageHandler sends messages over TCP. TcpReceivingChannelAdapter receives messages over TCP.

An inbound TCP gateway is provided; this allows for simple request/response processing. While the gateway can support any number of connections, each connection can only process serially. The thread that reads from the socket waits for, and sends, the response before reading again. If the connection factory is configured for single use connections, the connection is closed after the socket times out.

An outbound TCP gateway is provided; this allows for simple request/response processing. If the associated connection factory is configured for single use connections, a new connection is immediately created for each new request. Otherwise, if the connection is in use, the calling thread blocks on the connection until either a response is received or a timeout or I/O error occurs.

The TCP and UDP inbound channel adapters, and the TCP inbound gateway, support the "error-channel" attribute. This provides the same basic functionality as described in Section 8.4.1, “Enter the GatewayProxyFactoryBean”.

32.2 UDP Adapters

32.2.1 Outbound (XML Configuration)

<int-ip:udp-outbound-channel-adapter id="udpOut"
    host="somehost"
    port="11111"
    multicast="false"
    channel="exampleChannel"/>

A simple UDP outbound channel adapter.

[Tip]Tip

When setting multicast to true, provide the multicast address in the host attribute.

UDP is an efficient, but unreliable protocol. Two attributes are added to improve reliability. When check-length is set to true, the adapter precedes the message data with a length field (4 bytes in network byte order). This enables the receiving side to verify the length of the packet received. If a receiving system uses a buffer that is too short the contain the packet, the packet can be truncated. The length header provides a mechanism to detect this.

Starting with version 4.3, the port can be set to 0, in which case the Operating System chooses the port; the chosen port can be discovered by invoking getPort() after the adapter is started and isListening() returns true.

<int-ip:udp-outbound-channel-adapter id="udpOut"
    host="somehost"
    port="11111"
    multicast="false"
    check-length="true"
    channel="exampleChannel"/>

An outbound channel adapter that adds length checking to the datagram packets.

[Tip]Tip

The recipient of the packet must also be configured to expect a length to precede the actual data. For a Spring Integration UDP inbound channel adapter, set its check-length attribute.

The second reliability improvement allows an application-level acknowledgment protocol to be used. The receiver must send an acknowledgment to the sender within a specified time.

<int-ip:udp-outbound-channel-adapter id="udpOut"
    host="somehost"
    port="11111"
    multicast="false"
    check-length="true"
    acknowledge="true"
    ack-host="thishost"
    ack-port="22222"
    ack-timeout="10000"
    channel="exampleChannel"/>

An outbound channel adapter that adds length checking to the datagram packets and waits for an acknowledgment.

[Tip]Tip

Setting acknowledge to true implies the recipient of the packet can interpret the header added to the packet containing acknowledgment data (host and port). Most likely, the recipient will be a Spring Integration inbound channel adapter.

[Tip]Tip

When multicast is true, an additional attribute min-acks-for-success specifies how many acknowledgments must be received within the ack-timeout.

For even more reliable networking, TCP can be used.

Starting with version 4.3, the ackPort can be set to 0, in which case the Operating System chooses the port.

32.2.2 Outbound (Java Configuration)

@Bean
@ServiceActivator(inputChannel = "udpOut")
public UnicastSendingMessageHandler handler() {
	return new UnicastSendingMessageHandler("localhost", 11111);
}

(or MulticastSendingChannelAdapter for multicast).

32.2.3 Outbound (Java DSL Configuration)

@Bean
public IntegrationFlow udpOutFlow() {
	return IntegrationFlows.from("udpOut")
			.handle(Udp.outboundAdapter("localhost", 1234))
			.get();
}

32.2.4 Inbound (XML Configuration)

<int-ip:udp-inbound-channel-adapter id="udpReceiver"
    channel="udpOutChannel"
    port="11111"
    receive-buffer-size="500"
    multicast="false"
    check-length="true"/>

A basic unicast inbound udp channel adapter.

<int-ip:udp-inbound-channel-adapter id="udpReceiver"
    channel="udpOutChannel"
    port="11111"
    receive-buffer-size="500"
    multicast="true"
    multicast-address="225.6.7.8"
    check-length="true"/>

A basic multicast inbound udp channel adapter.

By default, reverse DNS lookups are done on inbound packets to convert IP addresses to hostnames for use in message headers. In environments where DNS is not configured, this can cause delays. This default behavior can be overridden by setting the lookup-host attribute to "false".

32.2.5 Inbound (Java Configuration)

@Bean
public UnicastReceivingChannelAdapter udpIn() {
	UnicastReceivingChannelAdapter adapter = new UnicastReceivingChannelAdapter(11111);
	adapter.setOutputChannelName("udpChannel");
	return adapter;
}

32.2.6 Inbound (Java DSL Configuration)

@Bean
public IntegrationFlow udpIn() {
	return IntegrationFlows.from(Udp.inboundAdapter(11111))
			.channel("udpChannel")
			.get();
}

32.2.7 Server Listening Events

Starting with version 5.0.2, a UdpServerListeningEvent is emitted when an inbound adapter is started and has begun listening. This is useful when the adapter is configured to listen on port 0, meaning that the operating system chooses the port. It can also be used instead of polling isListening(), if you need to wait before starting some other process that will connect to the socket.

32.2.8 Advanced Outbound Configuration

The destination-expression and socket-expression options are available for the <int-ip:udp-outbound-channel-adapter> (UnicastSendingMessageHandler).

The destination-expression can be used as a runtime alternative to the hardcoded host/port pair to determine the destination address for the outgoing datagram packet against a requestMessage as the root object for the evaluation context. The expression must evaluate to an URI, or String in the URI style (see RFC-2396) or SocketAddress. The inbound IpHeaders.PACKET_ADDRESS header can be used for this expression as well. In the Framework, this header is populated by the DatagramPacketMessageMapper, when we receive datagrams in the UnicastReceivingChannelAdapter and convert them to messages. The header value is exactly the result of DatagramPacket.getSocketAddress() of the incoming datagram.

With the socket-expression, the Outbound Channel Adapter can use e.g. an Inbound Channel Adapter socket to send datagrams through same port which they were received. It’s useful in a scenario when our application works as a UDP server and clients operate behind NAT. This expression must evaluate to a DatagramSocket. The requestMessage is used as the root object for the evaluation context. The socket-expression parameter cannot be used with parameters multicast and acknowledge.

<int-ip:udp-inbound-channel-adapter id="inbound" port="0" channel="in" />

<int:channel id="in" />

<int:transformer expression="new String(payload).toUpperCase()"
                       input-channel="in" output-channel="out"/>

<int:channel id="out" />

<int-ip:udp-outbound-channel-adapter id="outbound"
                        socket-expression="@inbound.socket"
                        destination-expression="headers['ip_packetAddress']"
                        channel="out" />

The equivalent configuration using Java DSL Configuration:

@Bean
public IntegrationFlow udpEchoUpcaseServer() {
	return IntegrationFlows.from(Udp.inboundAdapter(11111).id("udpIn"))
			.<byte[], String>transform(p -> new String(p).toUpperCase())
			.handle(Udp.outboundAdapter("headers['ip_packetAddress']")
					.socketExpression("@udpIn.socket"))
			.get();
}

32.3 TCP Connection Factories

For TCP, the configuration of the underlying connection is provided using a Connection Factory. Two types of connection factory are provided; a client connection factory and a server connection factory. Client connection factories are used to establish outgoing connections; Server connection factories listen for incoming connections.

A client connection factory is used by an outbound channel adapter but a reference to a client connection factory can also be provided to an inbound channel adapter and that adapter will receive any incoming messages received on connections created by the outbound adapter.

A server connection factory is used by an inbound channel adapter or gateway (in fact the connection factory will not function without one). A reference to a server connection factory can also be provided to an outbound adapter; that adapter can then be used to send replies to incoming messages to the same connection.

[Tip]Tip

Reply messages will only be routed to the connection if the reply contains the header ip_connectionId that was inserted into the original message by the connection factory.

[Tip]Tip

This is the extent of message correlation performed when sharing connection factories between inbound and outbound adapters. Such sharing allows for asynchronous two-way communication over TCP. By default, only payload information is transferred using TCP; therefore any message correlation must be performed by downstream components such as aggregators or other endpoints. Support for transferring selected headers was introduced in version 3.0. For more information refer to Section 32.8, “TCP Message Correlation”.

A maximum of one adapter of each type may be given a reference to a connection factory.

Connection factories using java.net.Socket and java.nio.channel.SocketChannel are provided.

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"/>

A simple server connection factory that uses java.net.Socket connections.

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    using-nio="true"/>

A simple server connection factory that uses java.nio.channel.SocketChannel connections.

[Note]Note

Starting with Spring Integration version 4.2, if the server is configured to listen on a random port (0), the actual port chosen by the OS can be obtained using getPort(). Also, getServerSocketAddress() is available to get the complete SocketAddress. See the javadocs for the TcpServerConnectionFactory interface for more information.

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="1234"
    single-use="true"
    so-timeout="10000"/>

A client connection factory that uses java.net.Socket connections and creates a new connection for each message.

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="1234"
    single-use="true"
    so-timeout="10000"
    using-nio=true/>

A client connection factory that uses java.nio.channel.Socket connections and creates a new connection for each message.

TCP is a streaming protocol; this means that some structure has to be provided to data transported over TCP, so the receiver can demarcate the data into discrete messages. Connection factories are configured to use (de)serializers to convert between the message payload and the bits that are sent over TCP. This is accomplished by providing a deserializer and serializer for inbound and outbound messages respectively. A number of standard (de)serializers are provided.

The ByteArrayCrlfSerializer*, converts a byte array to a stream of bytes followed by carriage return and linefeed characters (\r\n). This is the default (de)serializer and can be used with telnet as a client, for example.

The ByteArraySingleTerminatorSerializer*, converts a byte array to a stream of bytes followed by a single termination character (default 0x00).

The ByteArrayLfSerializer*, converts a byte array to a stream of bytes followed by a single linefeed character (0x0a).

The ByteArrayStxEtxSerializer*, converts a byte array to a stream of bytes preceded by an STX (0x02) and followed by an ETX (0x03).

The ByteArrayLengthHeaderSerializer, converts a byte array to a stream of bytes preceded by a binary length in network byte order (big endian). This a very efficient deserializer because it does not have to parse every byte looking for a termination character sequence. It can also be used for payloads containing binary data; the above serializers only support text in the payload. The default size of the length header is 4 bytes (Integer), allowing for messages up to (2^31 - 1) bytes. However, the length header can be a single byte (unsigned) for messages up to 255 bytes, or an unsigned short (2 bytes) for messages up to (2^16 - 1) bytes. If you need any other format for the header, you can subclass this class and provide implementations for the readHeader and writeHeader methods. The absolute maximum data size supported is (2^31 - 1) bytes.

The ByteArrayRawSerializer*, converts a byte array to a stream of bytes and adds no additional message demarcation data; with this (de)serializer, the end of a message is indicated by the client closing the socket in an orderly fashion. When using this serializer, message reception will hang until the client closes the socket, or a timeout occurs; a timeout will NOT result in a message. When this serializer is being used, and the client is a Spring Integration application, the client must use a connection factory that is configured with single-use=true - this causes the adapter to close the socket after sending the message; the serializer will not, itself, close the connection. This serializer should only be used with connection factories used by channel adapters (not gateways), and the connection factories should be used by either an inbound or outbound adapter, and not both. Also see ByteArrayElasticRawDeserializer below.

[Note]Note

Before version 4.2.2, when using NIO, this serializer treated a timeout (during read) as an end of file and the data read so far was emitted as a message. This is unreliable and should not be used to delimit messages; it now treats such conditions as an exception. In the unlikely event you are using it this way, the previous behavior can be restored by setting the treatTimeoutAsEndOfMessage constructor argument to true.

Each of these is a subclass of AbstractByteArraySerializer which implements both org.springframework.core.serializer.Serializer and org.springframework.core.serializer.Deserializer. For backwards compatibility, connections using any subclass of AbstractByteArraySerializer for serialization will also accept a String which will be converted to a byte array first. Each of these (de)serializers converts an input stream containing the corresponding format to a byte array payload.

To avoid memory exhaustion due to a badly behaved client (one that does not adhere to the protocol of the configured serializer), these serializers impose a maximum message size. If the size is exceeded by an incoming message, an exception will be thrown. The default maximum message size is 2048 bytes, and can be increased by setting the maxMessageSize property. If you are using the default (de)serializer and wish to increase the maximum message size, you must declare it as an explicit bean with the property set and configure the connection factory to use that bean.

The classes marked with * above use an intermediate buffer and copy the decoded data to a final buffer of the correct size. Starting with version 4.3, these can be configured with a poolSize property to allow these raw buffers to be reused instead of being allocated and discarded for each message, which is the default behavior. Setting the property to a negative value will create a pool that has no bounds. If the pool is bounded, you can also set the poolWaitTimeout property (milliseconds) after which an exception is thrown if no buffer becomes available; it defaults to infinity. Such an exception will cause the socket to be closed.

If you wish to use the same mechanism in custom deserializers, subclass AbstractPooledBufferByteArraySerializer instead of its super class AbstractByteArraySerializer, and implement doDeserialize() instead of deserialize(). The buffer will be returned to the pool automatically. AbstractPooledBufferByteArraySerializer also provides a convenient utility method copyToSizedArray().

Version 5.0 added the ByteArrayElasticRawDeserializer. This is similar to the deserializer side of ByteArrayRawSerializer above, except it is not necessary to set a maxMessageSize. Internally, it uses a ByteArrayOutputStream which allows the buffer to grow as needed. The client must close the socket in an orderly manner to signal end of message.

The MapJsonSerializer uses a Jackson ObjectMapper to convert between a Map and JSON. This can be used in conjunction with a MessageConvertingTcpMessageMapper and a MapMessageConverter to transfer selected headers and the payload in a JSON format.

[Note]Note

The Jackson ObjectMapper cannot demarcate messages in the stream. Therefore, the MapJsonSerializer needs to delegate to another (de)serializer to handle message demarcation. By default, a ByteArrayLfSerializer is used, resulting in messages with the format <json><LF> on the wire, but you can configure it to use others instead.

The final standard serializer is org.springframework.core.serializer.DefaultSerializer which can be used to convert Serializable objects using java serialization.org.springframework.core.serializer.DefaultDeserializer is provided for inbound deserialization of streams containing Serializable objects.

To implement a custom (de)serializer pair, implement the org.springframework.core.serializer.Deserializer and org.springframework.core.serializer.Serializer interfaces.

If you do not wish to use the default (de)serializer (ByteArrayCrLfSerializer), you must supply serializer and deserializer attributes on the connection factory (example below).

<bean id="javaSerializer"
      class="org.springframework.core.serializer.DefaultSerializer" />
<bean id="javaDeserializer"
      class="org.springframework.core.serializer.DefaultDeserializer" />

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    deserializer="javaDeserializer"
    serializer="javaSerializer"/>

A server connection factory that uses java.net.Socket connections and uses Java serialization on the wire.

For full details of the attributes available on connection factories, see the reference at the end of this section.

By default, reverse DNS lookups are done on inbound packets to convert IP addresses to hostnames for use in message headers. In environments where DNS is not configured, this can cause connection delays. This default behavior can be overridden by setting the lookup-host attribute to "false".

[Note]Note

It is possible to modify the creation of and/or attributes of sockets - see Section 32.10, “SSL/TLS Support”. As is noted there, such modifications are possible whether or not SSL is being used.

32.3.1 TCP Caching Client Connection Factory

As noted above, TCP sockets can be single-use (one request/response) or shared. Shared sockets do not perform well with outbound gateways, in high-volume environments, because the socket can only process one request/response at a time.

To improve performance, users could use collaborating channel adapters instead of gateways, but that requires application-level message correlation. See Section 32.8, “TCP Message Correlation” for more information.

Spring Integration 2.2 introduced a caching client connection factory, where a pool of shared sockets is used, allowing a gateway to process multiple concurrent requests with a pool of shared connections.

32.3.2 TCP Failover Client Connection Factory

It is now possible to configure a TCP connection factory that supports failover to one or more other servers. When sending a message, the factory will iterate over all its configured factories until either the message can be sent, or no connection can be found. Initially, the first factory in the configured list is used; if a connection subsequently fails the next factory will become the current factory.

<bean id="failCF" class="o.s.i.ip.tcp.connection.FailoverClientConnectionFactory">
    <constructor-arg>
        <list>
            <ref bean="clientFactory1"/>
            <ref bean="clientFactory2"/>
        </list>
    </constructor-arg>
</bean>
[Note]Note

When using the failover connection factory, the singleUse property must be consistent between the factory itself and the list of factories it is configured to use.

32.3.3 TCP Thread Affinity Connection Factory

Spring Integration version 5.0 introduced this connection factory. It binds a connection to the calling thread and the same connection is reused each time that thread sends a message. This continues until the connection is closed (by the server or network) or until the thread calls the releaseConnection() method. The connections themselves are provided by another client factory implementation; which must be configured to provide non-shared (single-use) connections so that each thread gets a connection.

Example configuration:

@Bean
public TcpNetClientConnectionFactory cf() {
    TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory("localhost",
            Integer.parseInt(System.getProperty(PORT)));
    cf.setSingleUse(true);
    return cf;
}

@Bean
public ThreadAffinityClientConnectionFactory tacf() {
    return new ThreadAffinityClientConnectionFactory(cf());
}

@Bean
@ServiceActivator(inputChannel = "out")
public TcpOutboundGateway outGate() {
    TcpOutboundGateway outGate = new TcpOutboundGateway();
    outGate.setConnectionFactory(tacf());
    outGate.setReplyChannelName("toString");
    return outGate;
}

32.4 TCP Connection Interceptors

Connection factories can be configured with a reference to a TcpConnectionInterceptorFactoryChain. Interceptors can be used to add behavior to connections, such as negotiation, security, and other setup. No interceptors are currently provided by the framework but, for an example, see the InterceptedSharedConnectionTests in the source repository.

The HelloWorldInterceptor used in the test case works as follows:

When configured with a client connection factory, when the first message is sent over a connection that is intercepted, the interceptor sends Hello over the connection, and expects to receive world!. When that occurs, the negotiation is complete and the original message is sent; further messages that use the same connection are sent without any additional negotiation.

When configured with a server connection factory, the interceptor requires the first message to be Hello and, if it is, returns world!. Otherwise it throws an exception causing the connection to be closed.

All TcpConnection methods are intercepted. Interceptor instances are created for each connection by an interceptor factory. If an interceptor is stateful, the factory should create a new instance for each connection; if there is no state, the same interceptor can wrap each connection. Interceptor factories are added to the configuration of an interceptor factory chain, which is provided to a connection factory using the interceptor-factory attribute. Interceptors must extend TcpConnectionInterceptorSupport; factories must implement the TcpConnectionInterceptorFactory interface. TcpConnectionInterceptorSupport is provided with passthrough methods; by extending this class, you only need to implement those methods you wish to intercept.

<bean id="helloWorldInterceptorFactory"
    class="o.s.i.ip.tcp.connection.TcpConnectionInterceptorFactoryChain">
    <property name="interceptors">
        <array>
            <bean class="o.s.i.ip.tcp.connection.HelloWorldInterceptorFactory"/>
        </array>
    </property>
</bean>

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="12345"
    using-nio="true"
    single-use="true"
    interceptor-factory-chain="helloWorldInterceptorFactory"/>

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="12345"
    single-use="true"
    so-timeout="100000"
    using-nio="true"
    interceptor-factory-chain="helloWorldInterceptorFactory"/>

Configuring a connection interceptor factory chain.

32.5 TCP Connection Events

Beginning with version 3.0, changes to TcpConnection s are reported by TcpConnectionEvent s. TcpConnectionEvent is a subclass of ApplicationEvent and thus can be received by any ApplicationListener defined in the ApplicationContext, for example Event Inbound Channel Adapter.

TcpConnectionEvents have the following properties:

  • connectionId - the connection identifier which can be used in a message header to send data to the connection
  • connectionFactoryName - the bean name of the connection factory the connection belongs to
  • throwable - the Throwable (for TcpConnectionExceptionEvent events only)
  • source - the TcpConnection; this can be used, for example, to determine the remote IP Address with getHostAddress() (cast required)

In addition, since version 4.0 the standard deserializers discussed in Section 32.3, “TCP Connection Factories” now emit TcpDeserializationExceptionEvent s when problems are encountered decoding the data stream. These events contain the exception, the buffer that was in the process of being built, and an offset into the buffer (if available) at the point the exception occurred. Applications can use a normal ApplicationListener, or see Section 13.1, “Receiving Spring Application Events”, to capture these events, allowing analysis of the problem.

Starting with versions 4.0.7, 4.1.3, TcpConnectionServerExceptionEvent s are published whenever an unexpected exception occurs on a server socket (such as a BindException when the server socket is in use). These events have a reference to the connection factory and the cause.

Starting with version 4.2, TcpConnectionFailedCorrelationEvent s are published whenever an endpoint (inbound gateway or collaborating outbound channel adapter) receives a message that cannot be routed to a connection because the ip_connectionId header is invalid. Outbound gateways also publish this event when a late reply is received (the sender thread has timed out). The event contains the connection id as well as an exception in the cause property that contains the failed message.

Starting with version 4.3, a TcpConnectionServerListeningEvent is emitted when a server connection factory is started. This is useful when the factory is configured to listen on port 0, meaning that the operating system chooses the port. It can also be used instead of polling isListening(), if you need to wait before starting some other process that will connect to the socket.

[Important]Important

To avoid delaying the listening thread from accepting connections, the event is published on a separate thread.

Starting with version 4.3.2, a TcpConnectionFailedEvent is emitted whenever a client connection can’t be created. The source of the event is the connection factory which can be used to determine the host and port to which the connection could not be established.

32.6 TCP Adapters

TCP inbound and outbound channel adapters that utilize the above connection factories are provided. These adapters have attributes connection-factory and channel. The channel attribute specifies the channel on which messages arrive at an outbound adapter and on which messages are placed by an inbound adapter. The connection-factory attribute indicates which connection factory is to be used to manage connections for the adapter. While both inbound and outbound adapters can share a connection factory, server connection factories are always owned by an inbound adapter; client connection factories are always owned by an outbound adapter. One, and only one, adapter of each type may get a reference to a connection factory.

<bean id="javaSerializer"
      class="org.springframework.core.serializer.DefaultSerializer"/>
<bean id="javaDeserializer"
      class="org.springframework.core.serializer.DefaultDeserializer"/>

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    deserializer="javaDeserializer"
    serializer="javaSerializer"
    using-nio="true"
    single-use="true"/>

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="#{server.port}"
    single-use="true"
    so-timeout="10000"
    deserializer="javaDeserializer"
    serializer="javaSerializer"/>

<int:channel id="input" />

<int:channel id="replies">
    <int:queue/>
</int:channel>

<int-ip:tcp-outbound-channel-adapter id="outboundClient"
    channel="input"
    connection-factory="client"/>

<int-ip:tcp-inbound-channel-adapter id="inboundClient"
    channel="replies"
    connection-factory="client"/>

<int-ip:tcp-inbound-channel-adapter id="inboundServer"
    channel="loop"
    connection-factory="server"/>

<int-ip:tcp-outbound-channel-adapter id="outboundServer"
    channel="loop"
    connection-factory="server"/>

<int:channel id="loop"/>

In this configuration, messages arriving in channel input are serialized over connections created by client received at the server and placed on channel loop. Since loop is the input channel for outboundServer the message is simply looped back over the same connection and received by inboundClient and deposited in channel replies. Java serialization is used on the wire.

Normally, inbound adapters use a type="server" connection factory, which listens for incoming connection requests. In some cases, it is desirable to establish the connection in reverse, whereby the inbound adapter connects to an external server and then waits for inbound messages on that connection.

This topology is supported by using client-mode="true" on the inbound adapter. In this case, the connection factory must be of type client and must have single-use set to false.

Two additional attributes are used to support this mechanism: retry-interval specifies (in milliseconds) how often the framework will attempt to reconnect after a connection failure. scheduler is used to supply a TaskScheduler used to schedule the connection attempts, and to test that the connection is still active.

For an outbound adapter, the connection is normally established when the first message is sent. client-mode="true" on an outbound adapter will cause the connection to be established when the adapter is started. Adapters are automatically started by default. Again, the connection factory must be of type client and have single-use set to false and retry-interval and scheduler are also supported. If a connection fails, it will be re-established either by the scheduler or when the next message is sent.

For both inbound and outbound, if the adapter is started, you may force the adapter to establish a connection by sending a <control-bus /> command: @adapter_id.retryConnection() and examine the current state with @adapter_id.isConnected().

32.7 TCP Gateways

The inbound TCP gateway TcpInboundGateway and outbound TCP gateway TcpOutboundGateway use a server and client connection factory respectively. Each connection can process a single request/response at a time.

The inbound gateway, after constructing a message with the incoming payload and sending it to the requestChannel, waits for a response and sends the payload from the response message by writing it to the connection.

[Note]Note

For the inbound gateway, care must be taken to retain, or populate, the ip_connectionId header because it is used to correlate the message to a connection. Messages that originate at the gateway will automatically have the header set. If the reply is constructed as a new message, you will need to set the header. The header value can be captured from the incoming message.

As with inbound adapters, inbound gateways normally use a type="server" connection factory, which listens for incoming connection requests. In some cases, it is desirable to establish the connection in reverse, whereby the inbound gateway connects to an external server and then waits for, and replies to, inbound messages on that connection.

This topology is supported by using client-mode="true" on the inbound gateway. In this case, the connection factory must be of type client and must have single-use set to false.

Two additional attributes are used to support this mechanism: retry-interval specifies (in milliseconds) how often the framework will attempt to reconnect after a connection failure. scheduler is used to supply a TaskScheduler used to schedule the connection attempts, and to test that the connection is still active.

If the gateway is started, you may force the gateway to establish a connection by sending a <control-bus /> command: @adapter_id.retryConnection() and examine the current state with @adapter_id.isConnected().

The outbound gateway, after sending a message over the connection, waits for a response and constructs a response message and puts in on the reply channel. Communications over the connections are single-threaded. Users should be aware that only one message can be handled at a time and, if another thread attempts to send a message before the current response has been received, it will block until any previous requests are complete (or time out). If, however, the client connection factory is configured for single-use connections each new request gets its own connection and is processed immediately.

<int-ip:tcp-inbound-gateway id="inGateway"
    request-channel="tcpChannel"
    reply-channel="replyChannel"
    connection-factory="cfServer"
    reply-timeout="10000"/>

A simple inbound TCP gateway; if a connection factory configured with the default (de)serializer is used, messages will be \r\n delimited data and the gateway can be used by a simple client such as telnet.

<int-ip:tcp-outbound-gateway id="outGateway"
    request-channel="tcpChannel"
    reply-channel="replyChannel"
    connection-factory="cfClient"
    request-timeout="10000"
    remote-timeout="10000"/> <!-- or e.g.
remote-timeout-expression="headers['timeout']" -->

A simple outbound TCP gateway.

32.8 TCP Message Correlation

32.8.1 Overview

One goal of the IP Endpoints is to provide communication with systems other than another Spring Integration application. For this reason, only message payloads are sent and received, by default. Since 3.0, headers can be transferred, using JSON, Java serialization, or with custom Serializer s and Deserializer s; see Section 32.8.4, “Transferring Headers” for more information. No message correlation is provided by the framework, except when using the gateways, or collaborating channel adapters on the server side. In the paragraphs below we discuss the various correlation techniques available to applications. In most cases, this requires specific application-level correlation of messages, even when message payloads contain some natural correlation data (such as an order number).

32.8.2 Gateways

The gateways will automatically correlate messages. However, an outbound gateway should only be used for relatively low-volume use. When the connection factory is configured for a single shared connection to be used for all message pairs (single-use="false"), only one message can be processed at a time. A new message will have to wait until the reply to the previous message has been received. When a connection factory is configured for each new message to use a new connection (single-use="true"), the above restriction does not apply. While this may give higher throughput than a shared connection environment, it comes with the overhead of opening and closing a new connection for each message pair.

Therefore, for high-volume messages, consider using a collaborating pair of channel adapters. However, you will need to provide collaboration logic.

Another solution, introduced in Spring Integration 2.2, is to use a CachingClientConnectionFactory, which allows the use of a pool of shared connections.

32.8.3 Collaborating Outbound and Inbound Channel Adapters

To achieve high-volume throughput (avoiding the pitfalls of using gateways as mentioned above) you may consider configuring a pair of collaborating outbound and inbound channel adapters. Collaborating adapters can also be used (server-side or client-side) for totally asynchronous communication (rather than with request/reply semantics). On the server side, message correlation is automatically handled by the adapters because the inbound adapter adds a header allowing the outbound adapter to determine which connection to use to send the reply message.

[Note]Note

On the server side, care must be taken to populate the ip_connectionId header because it is used to correlate the message to a connection. Messages that originate at the inbound adapter will automatically have the header set. If you wish to construct other messages to send, you will need to set the header. The header value can be captured from an incoming message.

On the client side, the application will have to provide its own correlation logic, if needed. This can be done in a number of ways.

If the message payload has some natural correlation data, such as a transaction id or an order number, AND there is no need to retain any information (such as a reply channel header) from the original outbound message, the correlation is simple and would done at the application level in any case.

If the message payload has some natural correlation data, such as a transaction id or an order number, but there is a need to retain some information (such as a reply channel header) from the original outbound message, you may need to retain a copy of the original outbound message (perhaps by using a publish-subscribe channel) and use an aggregator to recombine the necessary data.

For either of the previous two paragraphs, if the payload has no natural correlation data, you may need to provide a transformer upstream of the outbound channel adapter to enhance the payload with such data. Such a transformer may transform the original payload to a new object containing both the original payload and some subset of the message headers. Of course, live objects (such as reply channels) from the headers can not be included in the transformed payload.

If such a strategy is chosen you will need to ensure the connection factory has an appropriate serializer/deserializer pair to handle such a payload, such as the DefaultSerializer/Deserializer which use java serialization, or a custom serializer and deserializer. The ByteArray*Serializer options mentioned in Section 32.3, “TCP Connection Factories”, including the default ByteArrayCrLfSerializer, do not support such payloads, unless the transformed payload is a String or byte[],

[Note]Note

Before the 2.2 release, when a client connection factory was used by collaborating channel adapters, the so-timeout attribute defaulted to the default reply timeout (10 seconds). This meant that if no data were received by the inbound adapter for this period of time, the socket was closed.

This default behavior was not appropriate in a truly asynchronous environment, so it now defaults to an infinite timeout. You can reinstate the previous default behavior by setting the so-timeout attribute on the client connection factory to 10000 milliseconds.

32.8.4 Transferring Headers

TCP is a streaming protocol; Serializers and Deserializers are used to demarcate messages within the stream. Prior to 3.0, only message payloads (String or byte[]) could be transferred over TCP. Beginning with 3.0, you can now transfer selected headers as well as the payload. It is important to understand, though, that "live" objects, such as the replyChannel header cannot be serialized.

Sending header information over TCP requires some additional configuration.

The first step is to provide the ConnectionFactory with a MessageConvertingTcpMessageMapper using the mapper attribute. This mapper delegates to any MessageConverter implementation to convert the message to/from some object that can be (de)serialized by the configured serializer and deserializer.

A MapMessageConverter is provided, which allows the specification of a list of headers that will be added to a Map object, along with the payload. The generated Map has two entries: payload and headers. The headers entry is itself a Map containing the selected headers.

The second step is to provide a (de)serializer that can convert between a Map and some wire format. This can be a custom (de)Serializer, which would typically be needed if the peer system is not a Spring Integration application.

A MapJsonSerializer is provided that will convert a Map to/from JSON. This uses a Spring Integration JsonObjectMapper to perform this function. You can provide a custom JsonObjectMapper if needed. By default, the serializer inserts a linefeed 0x0a character between objects. See the JavaDocs for more information.

[Note]Note

At the time of writing, the JsonObjectMapper uses whichever version of Jackson is on the classpath.

You can also use standard Java serialization of the Map, using the DefaultSerializer and DefaultDeserializer.

The following example shows the configuration of a connection factory that transfers the correlationId, sequenceNumber, and sequenceSize headers using JSON.

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="12345"
    mapper="mapper"
    serializer="jsonSerializer"
    deserializer="jsonSerializer"/>

<bean id="mapper"
      class="o.sf.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
    <constructor-arg name="messageConverter">
        <bean class="o.sf.integration.support.converter.MapMessageConverter">
            <property name="headerNames">
                <list>
                    <value>correlationId</value>
                    <value>sequenceNumber</value>
                    <value>sequenceSize</value>
                </list>
            </property>
        </bean>
    </constructor-arg>
</bean>

<bean id="jsonSerializer" class="o.sf.integration.ip.tcp.serializer.MapJsonSerializer" />

A message sent with the above configuration, with payload foo would appear on the wire like so:

{"headers":{"correlationId":"bar","sequenceSize":5,"sequenceNumber":1},"payload":"foo"}

32.9 A Note About NIO

Using NIO (see using-nio in Section 32.12, “IP Configuration Attributes”) avoids dedicating a thread to read from each socket. For a small number of sockets, you will likely find that not using NIO, together with an async handoff (e.g. to a QueueChannel), will perform as well as, or better than, using NIO.

Consider using NIO when handling a large number of connections. However, the use of NIO has some other ramifications. A pool of threads (in the task executor) is shared across all the sockets; each incoming message is assembled and sent to the configured channel as a separate unit of work on a thread selected from that pool. Two sequential messages arriving on the same socket might be processed by different threads. This means that the order in which the messages are sent to the channel is indeterminate; the strict ordering of the messages arriving on the socket is not maintained.

For some applications, this is not an issue; for others it is. If strict ordering is required, consider setting using-nio to false and using async handoff.

Alternatively, you may choose to insert a resequencer downstream of the inbound endpoint to return the messages to their proper sequence. Set apply-sequence to true on the connection factory, and messages arriving on a TCP connection will have sequenceNumber and correlationId headers set. The resequencer uses these headers to return the messages to their proper sequence.

Pool Size

The pool size attribute is no longer used; previously, it specified the size of the default thread pool when a task-executor was not specified. It was also used to set the connection backlog on server sockets. The first function is no longer needed (see below); the second function is replaced by the backlog attribute.

Previously, when using a fixed thread pool task executor (which was the default), with NIO, it was possible to get a deadlock and processing would stop. The problem occurred when a buffer was full, a thread reading from the socket was trying to add more data to the buffer, and there were no threads available to make space in the buffer. This only occurred with a very small pool size, but it could be possible under extreme conditions. Since 2.2, two changes have eliminated this problem. First, the default task executor is a cached thread pool executor. Second, deadlock detection logic has been added such that if thread starvation occurs, instead of deadlocking, an exception is thrown, thus releasing the deadlocked resources.

[Important]Important

Now that the default task executor is unbounded, it is possible that an out of memory condition might occur with high rates of incoming messages, if message processing takes extended time. If your application exhibits this type of behavior, you are advised to use a pooled task executor with an appropriate pool size, but see the next section.

32.9.1 Thread Pool Task Executor with CALLER_RUNS Policy

There are some important considerations when using a fixed thread pool with the CallerRunsPolicy (CALLER_RUNS when using the <task/> namespace) and the queue capacity is small.

The following does not apply if you are not using a fixed thread pool.

With NIO connections there are 3 distinct task types; the IO Selector processing is performed on one dedicated thread - detecting events, accepting new connections, and dispatching the IO read operations to other threads, using the task executor. When an IO reader thread (to which the read operation is dispatched) reads data, it hands off to another thread to assemble the incoming message; large messages may take several reads to complete. These "assembler" threads can block waiting for data. When a new read event occurs, the reader determines if this socket already has an assembler and runs a new one if not. When the assembly process is complete, the assembler thread is returned to the pool.

This can cause a deadlock when the pool is exhausted and the CALLER_RUNS rejection policy is in use, and the task queue is full. When the pool is empty and there is no room in the queue, the IO selector thread receives an OP_READ event and dispatches the read using the executor; the queue is full, so the selector thread itself starts the read process; now, it detects that there is not an assembler for this socket and, before it does the read, fires off an assembler; again, the queue is full, and the selector thread becomes the assembler. The assembler is now blocked awaiting the data to be read, which will never happen. The connection factory is now deadlocked because the selector thread can’t handle new events.

We must avoid the selector (or reader) threads performing the assembly task to avoid this deadlock. It is desirable to use seperate pools for the IO and assembly operations.

The framework provides a CompositeExecutor, which allows the configuration of two distinct executors; one for performing IO operations, and one for message assembly. In this environment, an IO thread can never become an assembler thread, and the deadlock cannot occur.

In addition, the task executors should be configured to use a AbortPolicy (ABORT when using <task>). When an IO cannot be completed, it is deferred for a short time and retried continually until it can be completed and an assembler allocated. By default, the delay is 100ms but it can be changed using the readDelay property on the connection factory (read-delay when configuring with the XML namespace).

Example configuration of the composite executor is shown below.

@Bean
private CompositeExecutor compositeExecutor() {
    ThreadPoolTaskExecutor ioExec = new ThreadPoolTaskExecutor();
    ioExec.setCorePoolSize(4);
    ioExec.setMaxPoolSize(10);
    ioExec.setQueueCapacity(0);
    ioExec.setThreadNamePrefix("io-");
    ioExec.setRejectedExecutionHandler(new AbortPolicy());
    ioExec.initialize();
    ThreadPoolTaskExecutor assemblerExec = new ThreadPoolTaskExecutor();
    assemblerExec.setCorePoolSize(4);
    assemblerExec.setMaxPoolSize(10);
    assemblerExec.setQueueCapacity(0);
    assemblerExec.setThreadNamePrefix("assembler-");
    assemblerExec.setRejectedExecutionHandler(new AbortPolicy());
    assemblerExec.initialize();
    return new CompositeExecutor(ioExec, assemblerExec);
}
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
    <constructor-arg ref="io"/>
    <constructor-arg ref="assembler"/>
</bean>

<task:executor id="io" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<task:executor id="assembler" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
    <constructor-arg>
        <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="threadNamePrefix" value="io-" />
            <property name="corePoolSize" value="4" />
            <property name="maxPoolSize" value="8" />
            <property name="queueCapacity" value="0" />
            <property name="rejectedExecutionHandler">
                <bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
            </property>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="threadNamePrefix" value="assembler-" />
            <property name="corePoolSize" value="4" />
            <property name="maxPoolSize" value="10" />
            <property name="queueCapacity" value="0" />
            <property name="rejectedExecutionHandler">
                <bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
            </property>
        </bean>
    </constructor-arg>
</bean>

32.10 SSL/TLS Support

32.10.1 Overview

Secure Sockets Layer/Transport Layer Security is supported. When using NIO, the JDK 5+ SSLEngine feature is used to handle handshaking after the connection is established. When not using NIO, standard SSLSocketFactory and SSLServerSocketFactory objects are used to create connections. A number of strategy interfaces are provided to allow significant customization; default implementations of these interfaces provide for the simplest way to get started with secure communications.

32.10.2 Getting Started

Regardless of whether NIO is being used, you need to configure the ssl-context-support attribute on the connection factory. This attribute references a <bean/> definition that describes the location and passwords for the required key stores.

SSL/TLS peers require two keystores each; a keystore containing private/public key pairs identifying the peer; a truststore, containing the public keys for peers that are trusted. See the documentation for the keytool utility provided with the JDK. The essential steps are

  1. Create a new key pair and store in a keystore.
  2. Export the public key.
  3. Import the public key into the peer’s truststore.

Repeat for the other peer.

[Note]Note

It is common in test cases to use the same key stores on both peers, but this should be avoided for production.

After establishing the key stores, the next step is to indicate their locations to the TcpSSLContextSupport bean, and provide a reference to that bean to the connection factory.

<bean id="sslContextSupport"
    class="o.sf.integration.ip.tcp.connection.support.DefaultTcpSSLContextSupport">
    <constructor-arg value="client.ks"/>
    <constructor-arg value="client.truststore.ks"/>
    <constructor-arg value="secret"/>
    <constructor-arg value="secret"/>
</bean>

<ip:tcp-connection-factory id="clientFactory"
    type="client"
    host="localhost"
    port="1234"
    ssl-context-support="sslContextSupport" />

The DefaulTcpSSLContextSupport class also has an optional protocol property, which can be SSL or TLS (default).

The keystore file names (first two constructor arguments) use the Spring Resource abstraction; by default the files will be located on the classpath, but this can be overridden by using the file: prefix, to find the files on the filesystem instead.

Starting with version 4.3.6, when using NIO, you can specify an ssl-handshake-timeout (seconds) on the connection factory. This timeout (default 30) is used during SSL handshake when waiting for data; if the timeout is exceeded, the process is aborted and the socket closed.

32.11 Advanced Techniques

32.11.1 Strategy Interfaces

In many cases, the configuration described above is all that is needed to enable secure communication over TCP/IP. However, a number of strategy interfaces are provided to allow customization and modification of socket factories and sockets.

  • TcpSSLContextSupport
  • TcpSocketFactorySupport
  • TcpSocketSupport
  • TcpNetConnectionSupport
  • TcpNioConnectionSupport

TcpSSLContextSupport. 

public interface TcpSSLContextSupport {

    SSLContext getSSLContext() throws Exception;

}

Implementations of this interface are responsible for creating an SSLContext. The implementation provided by the framework is the DefaultTcpSSLContextSupport described above. If you require different behavior, implement this interface and provide the connection factory with a reference to a bean of your class' implementation.

TcpSocketFactorySupport. 

public interface TcpSocketFactorySupport {

    ServerSocketFactory getServerSocketFactory();

    SocketFactory getSocketFactory();

}

Implementations of this interface are responsible for obtaining references to ServerSocketFactory and SocketFactory. Two implementations are provided; the first is DefaultTcpNetSocketFactorySupport for non-SSL sockets (when no ssl-context-support attribute is defined); this simply uses the JDK’s default factories. The second implementation is DefaultTcpNetSSLSocketFactorySupport; this is used, by default, when an ssl-context-support attribute is defined; it uses the SSLContext created by that bean to create the socket factories.

[Note]Note

This interface only applies if using-nio is "false"; socket factories are not used by NIO.

TcpSocketSupport. 

public interface TcpSocketSupport {

    void postProcessServerSocket(ServerSocket serverSocket);

    void postProcessSocket(Socket socket);

}

Implementations of this interface can modify sockets after they are created, and after all configured attributes have been applied, but before the sockets are used. This applies whether or not NIO is being used. For example, you could use an implementation of this interface to modify the supported cipher suites on an SSL socket, or you could add a listener that gets notified after SSL handshaking is complete. The sole implementation provided by the framework is the DefaultTcpSocketSupport which does not modify the sockets in any way

To supply your own implementation of TcpSocketFactorySupport or TcpSocketSupport, provide the connection factory with references to beans of your custom type using the socket-factory-support and socket-support attributes, respectively.

TcpNetConnectionSupport. 

public interface TcpNetConnectionSupport {

	TcpNetConnection createNewConnection(Socket socket,
			boolean server, boolean lookupHost,
			ApplicationEventPublisher applicationEventPublisher,
			String connectionFactoryName) throws Exception;

}

This interface is invoked to create TcpNetConnection objects (or objects from subclasses). The framework provides a single implementation DefatulTcpNetConnectionSupport which creates simple TcpNetConnection objects by default. It has two properties pushbackCapable and pushbackBufferSize; when push back is enabled, the implementation returns a subclass that wraps the connection’s InputStream in a PushbackInputStream. Aligned with the PushbackInputStream default, the buffer size defaults to 1. This enables deserializers to "unread" (push back) bytes into the stream. The following is a trivial example of how it might be used in a delegating deserializer which "peeks" at the first byte to determine which deserializer to invoke:

public class CompositeDeserializer implements Deserializer<byte[]> {

    private final ByteArrayStxEtxSerializer stxEtx = new ByteArrayStxEtxSerializer();

    private final ByteArrayCrLfSerializer crlf = new ByteArrayCrLfSerializer();

    @Override
    public byte[] deserialize(InputStream inputStream) throws IOException {
        PushbackInputStream pbis = (PushbackInputStream) inputStream;
        int first = pbis.read();
        if (first < 0) {
            throw new SoftEndOfStreamException();
        }
        pbis.unread(first);
        if (first == ByteArrayStxEtxSerializer.STX) {
            this.receivedStxEtx = true;
            return this.stxEtx.deserialize(pbis);
        }
        else {
            this.receivedCrLf = true;
            return this.crlf.deserialize(pbis);
        }
    }

}

TcpNioConnectionSupport. 

public interface TcpNioConnectionSupport {

    TcpNioConnection createNewConnection(SocketChannel socketChannel,
            boolean server, boolean lookupHost,
            ApplicationEventPublisher applicationEventPublisher,
            String connectionFactoryName) throws Exception;

}

This interface is invoked to create TcpNioConnection objects (or objects from subclasses). Two implementations are provided DefaultTcpNioSSLConnectionSupport and DefaultTcpNioConnectionSupport which are used depending on whether SSL is in use or not. A common use case would be to subclass DefaultTcpNioSSLConnectionSupport and override postProcessSSLEngine; see the example below. As with the DefatulTcpNetConnectionSupport, these implementations also support push back.

32.11.2 Example: Enabling SSL Client Authentication

To enable client certificate authentication when using SSL, the technique depends on whether NIO is in use or not. When NIO is not being used, provide a custom TcpSocketSupport implementation to post-process the server socket:

serverFactory.setTcpSocketSupport(new DefaultTcpSocketSupport() {

    @Override
    public void postProcessServerSocket(ServerSocket serverSocket) {
        ((SSLServerSocket) serverSocket).setNeedClientAuth(true);
    }

});

(When using XML configuration, provide a reference to your bean using the socket-support attribute).

When using NIO, provide a custom TcpNioSslConnectionSupport implementation to post-process the SSLEngine.

@Bean
public DefaultTcpNioSSLConnectionSupport tcpNioConnectionSupport() {
    return new DefaultTcpNioSSLConnectionSupport(serverSslContextSupport) {

            @Override
            protected void postProcessSSLEngine(SSLEngine sslEngine) {
                sslEngine.setNeedClientAuth(true);
            }

    }
}

@Bean
public TcpNioServerConnectionFactory server() {
    ...
    serverFactory.setTcpNioConnectionSupport(tcpNioConnectionSupport());
    ...
}

(When using XML configuration, since version 4.3.7, provide a reference to your bean using the nio-connection-support attribute).

32.12 IP Configuration Attributes

Table 32.1. Connection Factory Attributes

Attribute NameClient?Server?Allowed ValuesAttribute Description

type

Y

Y

client, server

Determines whether the connection factory is a client or server.

host

Y

N

 

The host name or ip address of the destination.

port

Y

Y

 

The port.

serializer

Y

Y

 

An implementation of Serializer used to serialize the payload. Defaults to ByteArrayCrLfSerializer

deserializer

Y

Y

 

An implementation of Deserializer used to deserialize the payload. Defaults to ByteArrayCrLfSerializer

using-nio

Y

Y

true, false

Whether or not connection uses NIO. Refer to the java.nio package for more information. See Section 32.9, “A Note About NIO”. Default false.

using-direct-buffers

Y

N

true, false

When using NIO, whether or not the connection uses direct buffers. Refer to java.nio.ByteBuffer documentation for more information. Must be false if using-nio is false.

apply-sequence

Y

Y

true, false

When using NIO, it may be necessary to resequence messages. When this attribute is set to true, correlationId and sequenceNumber headers will be added to received messages. See Section 32.9, “A Note About NIO”. Default false.

so-timeout

Y

Y

 

Defaults to 0 (infinity), except for server connection factories with single-use="true". In that case, it defaults to the default reply timeout (10 seconds).

so-send-buffer-size

Y

Y

 

See java.net.Socket. setSendBufferSize().

so-receive-buffer- size

Y

Y

 

See java.net.Socket. setReceiveBufferSize().

so-keep-alive

Y

Y

true, false

See java.net.Socket. setKeepAlive().

so-linger

Y

Y

 

Sets linger to true with supplied value. See java.net.Socket. setSoLinger().

so-tcp-no-delay

Y

Y

true, false

See java.net.Socket. setTcpNoDelay().

so-traffic-class

Y

Y

 

See java.net.Socket. setTrafficClass().

local-address

N

Y

 

On a multi-homed system, specifies an IP address for the interface to which the socket will be bound.

task-executor

Y

Y

 

Specifies a specific Executor to be used for socket handling. If not supplied, an internal cached thread executor will be used. Needed on some platforms that require the use of specific task executors such as a WorkManagerTaskExecutor.

single-use

Y

Y

true, false

Specifies whether a connection can be used for multiple messages. If true, a new connection will be used for each message.

pool-size

N

N

 

This attribute is no longer used. For backward compatibility, it sets the backlog but users should use backlog to specify the connection backlog in server factories

backlog

N

Y

 

Sets the connection backlog for server factories.

lookup-host

Y

Y

true, false

Specifies whether reverse lookups are done on IP addresses to convert to host names for use in message headers. If false, the IP address is used instead. Defaults to true.

interceptor-factory-chain

Y

Y

 

See Section 32.4, “TCP Connection Interceptors”

ssl-context-support

Y

Y

 

See Section 32.10, “SSL/TLS Support”

socket-factory-support

Y

Y

 

See Section 32.10, “SSL/TLS Support”

socket-support

Y

Y

 

See Section 32.10, “SSL/TLS Support”

nio-connection-support

Y

Y

 

See Section 32.11, “Advanced Techniques”

read-delay

Y

Y

long > 0

The delay (in milliseconds) before retrying a read after the previous attempt failed due to insufficient threads. Default 100. Only applies if using-nio is true.


Table 32.2. UDP Inbound Channel Adapter Attributes

Attribute NameAllowed ValuesAttribute Description

port

 

The port on which the adapter listens.

multicast

true, false

Whether or not the udp adapter uses multicast.

multicast-address

 

When multicast is true, the multicast address to which the adapter joins.

pool-size

 

Specifies the concurrency. Specifies how many packets can be handled concurrently. It only applies if task-executor is not configured. Defaults to 5.

task-executor

 

Specifies a specific Executor to be used for socket handling. If not supplied, an internal pooled executor will be used. Needed on some platforms that require the use of specific task executors such as a WorkManagerTaskExecutor. See pool-size for thread requirements.

receive-buffer-size

 

The size of the buffer used to receive DatagramPackets. Usually set to the MTU size. If a smaller buffer is used than the size of the sent packet, truncation can occur. This can be detected by means of the check-length attribute..

check-length

true, false

Whether or not a udp adapter expects a data length field in the packet received. Used to detect packet truncation.

so-timeout

 

See java.net.DatagramSocket setSoTimeout() methods for more information.

so-send-buffer-size

 

Used for udp acknowledgment packets. See java.net.DatagramSocket setSendBufferSize() methods for more information.

so-receive-buffer- size

 

See java.net.DatagramSocket setReceiveBufferSize() for more information.

local-address

 

On a multi-homed system, specifies an IP address for the interface to which the socket will be bound.

error-channel

 

If an Exception is thrown by a downstream component, the MessagingException message containing the exception and failed message is sent to this channel.

lookup-host

true, false

Specifies whether reverse lookups are done on IP addresses to convert to host names for use in message headers. If false, the IP address is used instead. Defaults to true.


Table 32.3. UDP Outbound Channel Adapter Attributes

Attribute NameAllowed ValuesAttribute Description

host

 

The host name or ip address of the destination. For multicast udp adapters, the multicast address.

port

 

The port on the destination.

multicast

true, false

Whether or not the udp adapter uses multicast.

acknowledge

true, false

Whether or not a udp adapter requires an acknowledgment from the destination. when enabled, requires setting the following 4 attributes.

ack-host

 

When acknowledge is true, indicates the host or ip address to which the acknowledgment should be sent. Usually the current host, but may be different, for example when Network Address Transaction (NAT) is being used.

ack-port

 

When acknowledge is true, indicates the port to which the acknowledgment should be sent. The adapter listens on this port for acknowledgments.

ack-timeout

 

When acknowledge is true, indicates the time in milliseconds that the adapter will wait for an acknowledgment. If an acknowledgment is not received in time, the adapter will throw an exception.

min-acks-for- success

 

Defaults to 1. For multicast adapters, you can set this to a larger value, requiring acknowledgments from multiple destinations.

check-length

true, false

Whether or not a udp adapter includes a data length field in the packet sent to the destination.

time-to-live

 

For multicast adapters, specifies the time to live attribute for the MulticastSocket; controls the scope of the multicasts. Refer to the Java API documentation for more information.

so-timeout

 

See java.net.DatagramSocket setSoTimeout() methods for more information.

so-send-buffer-size

 

See java.net.DatagramSocket setSendBufferSize() methods for more information.

so-receive-buffer- size

 

Used for udp acknowledgment packets. See java.net.DatagramSocket setReceiveBufferSize() methods for more information.

local-address

 

On a multi-homed system, for the UDP adapter, specifies an IP address for the interface to which the socket will be bound for reply messages. For a multicast adapter it is also used to determine which interface the multicast packets will be sent over.

task-executor

 

Specifies a specific Executor to be used for acknowledgment handling. If not supplied, an internal single threaded executor will be used. Needed on some platforms that require the use of specific task executors such as a WorkManagerTaskExecutor. One thread will be dedicated to handling acknowledgments (if the acknowledge option is true).

destination-expression

SpEL expression

A SpEL expression to be evaluated to determine which SocketAddress to use as a destination address for the outgoing UDP packets.

socket-expression

SpEL expression

A SpEL expression to be evaluated to determine which datagram socket use for sending outgoing UDP packets.


Table 32.4. TCP Inbound Channel Adapter Attributes

Attribute NameAllowed ValuesAttribute Description

channel

 

The channel to which inbound messages will be sent.

connection-factory

 

If the connection factory has a type server, the factory is owned by this adapter. If it has a type client, it is owned by an outbound channel adapter and this adapter will receive any incoming messages on the connection created by the outbound adapter.

error-channel

 

If an Exception is thrown by a downstream component, the MessagingException message containing the exception and failed message is sent to this channel.

client-mode

true, false

When true, the inbound adapter will act as a client, with respect to establishing the connection and then receive incoming messages on that connection. Default = false. Also see retry-interval and scheduler. The connection factory must be of type client and have single-use set to false.

retry-interval

 

When in client-mode, specifies the number of milliseconds to wait between connection attempts, or after a connection failure. Default 60,000 (60 seconds).

scheduler

true, false

Specifies a TaskScheduler to use for managing the client-mode connection. Defaults to a ThreadPoolTaskScheduler with a pool size of `.


Table 32.5. TCP Outbound Channel Adapter Attributes

Attribute NameAllowed ValuesAttribute Description

channel

 

The channel on which outbound messages arrive.

connection-factory

 

If the connection factory has a type client, the factory is owned by this adapter. If it has a type server, it is owned by an inbound channel adapter and this adapter will attempt to correlate messages to the connection on which an original inbound message was received.

client-mode

true, false

When true, the outbound adapter will attempt to establish the connection as soon as it is started. When false, the connection is established when the first message is sent. Default = false. Also see retry-interval and scheduler. The connection factory must be of type client and have single-use set to false.

retry-interval

 

When in client-mode, specifies the number of milliseconds to wait between connection attempts, or after a connection failure. Default 60,000 (60 seconds).

scheduler

true, false

Specifies a TaskScheduler to use for managing the client-mode connection. Defaults to a ThreadPoolTaskScheduler with a pool size of `.


Table 32.6. TCP Inbound Gateway Attributes

Attribute NameAllowed ValuesAttribute Description

connection-factory

 

The connection factory must be of type server.

request-channel

 

The channel to which incoming messages will be sent.

reply-channel

 

The channel on which reply messages may arrive. Usually replies will arrive on a temporary reply channel added to the inbound message header

reply-timeout

 

The time in milliseconds for which the gateway will wait for a reply. Default 1000 (1 second).

error-channel

 

If an Exception is thrown by a downstream component, the MessagingException message containing the exception and failed message is sent to this channel; any reply from that flow will then be returned as a response by the gateway.

client-mode

true, false

When true, the inbound gateway will act as a client, with respect to establishing the connection and then receive (and reply to) incoming messages on that connection. Default = false. Also see retry-interval and scheduler. The connection factory must be of type client and have single-use set to false.

retry-interval

 

When in client-mode, specifies the number of milliseconds to wait between connection attempts, or after a connection failure. Default 60,000 (60 seconds).

scheduler

true, false

Specifies a TaskScheduler to use for managing the client-mode connection. Defaults to a ThreadPoolTaskScheduler with a pool size of `.


Table 32.7. TCP Outbound Gateway Attributes

Attribute NameAllowed ValuesAttribute Description

connection-factory

 

The connection factory must be of type client.

request-channel

 

The channel on which outgoing messages will arrive.

reply-channel

 

Optional. The channel to which reply messages may be sent.

remote-timeout

 

The time in milliseconds for which the gateway will wait for a reply from the remote system. Mutually exclusive with remote-timeout-expression. Default: 10000 (10 seconds). Note: in versions prior to 4.2 this value defaulted to reply-timeout (if set).

remote-timeout-expression

 

A SpEL expression, evaluated against the message to determine the time in milliseconds for which the gateway will wait for a reply from the remote system. Mutually exclusive with remote-timeout.

request-timeout

 

If a single-use connection factory is not being used, The time in milliseconds for which the gateway will wait to get access to the shared connection.

reply-timeout

 

The time in milliseconds for which the gateway will wait when sending the reply to the reply-channel. Only applies if the reply-channel might block, such as a bounded QueueChannel that is currently full.


32.13 IP Message Headers

IP Message Headers.  The following MessageHeader s are used by this module:

Header NameIpHeaders ConstantDescription

ip_hostname

HOSTNAME

The host name from which a TCP message or UDP packet was received. If lookupHost is false, this will contain the ip address.

ip_address

IP_ADDRESS

The ip address from which a TCP message or UDP packet was received.

ip_port

PORT

The remote port for a UDP packet.

ip_localInetAddress

IP_LOCAL_ADDRESS

The local InetAddress to which the socket is connected (since version 4.2.5).

ip_ackTo

ACKADDRESS

The remote ip address to which UDP application-level acks will be sent. The framework includes acknowledgment information in the data packet.

ip_ackId

ACK_ID

A correlation id for UDP application-level acks. The framework includes acknowledgment information in the data packet.

ip_tcp_remotePort

REMOTE_PORT

The remote port for a TCP connection.

ip_connectionId

CONNECTION_ID

A unique identifier for a TCP connection; set by the framework for inbound messages; when sending to a server-side inbound channel adapter, or replying to an inbound gateway, this header is required so the endpoint can determine which connection to send the message to.

ip_actualConnectionId

ACTUAL_ CONNECTION_ID

For information only - when using a cached or failover client connection factory, contains the actual underlying connection id.

contentType

MessageHeaders. CONTENT_TYPE

An optional content type for inbound messages; see below. Note that, unlike the other header constants, this constant is in the class MessageHeaders not IpHeaders.

For inbound messages, ip_hostname, ip_address, ip_tcp_remotePort and ip_connectionId are mapped by the default TcpHeaderMapper. Set the mapper’s addContentTypeHeader property to true and the mapper will set the contentType header (application/octet-stream;charset="UTF-8") by default. You can change the default by setting the contentType property. Users can add additional headers by subclassing TcpHeaderMapper and overriding the method supplyCustomHeaders. For example, when using SSL, properties of the SSLSession can be added by obtaining the session object from the TcpConnection object which is provided as an argument to the supplyCustomHeaders method.

For outbound messages, String payloads are converted to byte[] using the default (UTF-8) charset. Set the charset property to change the default.

When customizing the mapper properties, or subclassing, declare the mapper as a bean and provide an instance to the connection factory using the mapper property

32.14 Annotation-Based Configuration

The following example from the samples repository is used to illustrate some of the configuration options when using annotations instead of XML.

@EnableIntegration 1
@IntegrationComponentScan 2
@Configuration
public static class Config {

    @Value(${some.port})
    private int port;

    @MessagingGateway(defaultRequestChannel="toTcp") 3
    public interface Gateway {

        String viaTcp(String in);

    }

    @Bean
    @ServiceActivator(inputChannel="toTcp") 4
    public MessageHandler tcpOutGate(AbstractClientConnectionFactory connectionFactory) {
        TcpOutboundGateway gate = new TcpOutboundGateway();
        gate.setConnectionFactory(connectionFactory);
        gate.setOutputChannelName("resultToString");
        return gate;
    }

    @Bean 5
    public TcpInboundGateway tcpInGate(AbstractServerConnectionFactory connectionFactory)  {
        TcpInboundGateway inGate = new TcpInboundGateway();
        inGate.setConnectionFactory(connectionFactory);
        inGate.setRequestChannel(fromTcp());
        return inGate;
    }

    @Bean
    public MessageChannel fromTcp() {
        return new DirectChannel();
    }

    @MessageEndpoint
    public static class Echo { 6

        @Transformer(inputChannel="fromTcp", outputChannel="toEcho")
        public String convert(byte[] bytes) {
            return new String(bytes);
        }

        @ServiceActivator(inputChannel="toEcho")
        public String upCase(String in) {
            return in.toUpperCase();
        }

        @Transformer(inputChannel="resultToString")
        public String convertResult(byte[] bytes) {
            return new String(bytes);
        }

    }

    @Bean
    public AbstractClientConnectionFactory clientCF() { 7
        return new TcpNetClientConnectionFactory("localhost", this.port);
    }

    @Bean
    public AbstractServerConnectionFactory serverCF() { 8
        return new TcpNetServerConnectionFactory(this.port);
    }

}

1

Standard Spring Integration annotation enabling the infrastructure for an integration application.

2

Searches for @MessagingGateway interfaces.

3

The entry point to the client-side of the flow. The calling application can @Autowired this Gateway bean and invoke its method.

4

Outbound endpoints consist of a MessageHandler and a consumer that wraps it. In this scenario, the @ServiceActivator configures the endpoint according to the channel type.

5

Inbound endpoints (in the TCP/UDP module) are all message-driven so just need to be declared as simple @Bean s.

6

This class provides a number of POJO methods for use in this sample flow (a @Transformer and @ServiceActivator on the server side, and a @Transformer on the client side).

7

The client-side connection factory.

8

The server-side connection factory.