TCP Connection Factories
Overview
For TCP, the configuration of the underlying connection is provided by using a connection factory. Two types of connection factory are provided: a client connection factory and a server connection factory. Client connection factories establish outgoing connections. Server connection factories listen for incoming connections.
An outbound channel adapter uses a client connection factory, but you can also provide a reference to a client connection factory to an inbound channel adapter. That adapter receives any incoming messages that are received on connections created by the outbound adapter.
An inbound channel adapter or gateway uses a server connection factory. (In fact, the connection factory cannot function without one). You can also provide a reference to a server connection factory to an outbound adapter. You can then use that adapter to send replies to incoming messages on the same connection.
Reply messages are routed to the connection only if the reply contains the ip_connectionId header that was inserted into the original message by the connection factory.
|
This is the extent of message correlation performed when sharing connection factories between inbound and outbound adapters. Such sharing allows for asynchronous two-way communication over TCP. By default, only payload information is transferred using TCP. Therefore, any message correlation must be performed by downstream components such as aggregators or other endpoints. Support for transferring selected headers was introduced in version 3.0. For more information, see TCP Message Correlation. |
You may give a reference to a connection factory to a maximum of one adapter of each type.
Spring Integration provides connection factories that use java.net.Socket
and java.nio.channel.SocketChannel
.
The following example shows a simple server connection factory that uses java.net.Socket
connections:
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"/>
The following example shows a simple server connection factory that uses java.nio.channel.SocketChannel
connections:
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"
using-nio="true"/>
Starting with Spring Integration version 4.2, if the server is configured to listen on a random port (by setting the port to 0 ), you can get the actual port chosen by the OS by using getPort() .
Also, getServerSocketAddress() lets you get the complete SocketAddress .
See the Javadoc for the TcpServerConnectionFactory interface for more information.
|
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="1234"
single-use="true"
so-timeout="10000"/>
The following example shows a client connection factory that uses java.net.Socket
connections and creates a new connection for each message:
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="1234"
single-use="true"
so-timeout="10000"
using-nio=true/>
Starting with version 5.2, the client connection factories support the property connectTimeout
, specified in seconds, which defaults to 60.
Message Demarcation (Serializers and Deserializers)
TCP is a streaming protocol. This means that some structure has to be provided to data transported over TCP so that the receiver can demarcate the data into discrete messages. Connection factories are configured to use serializers and deserializers to convert between the message payload and the bits that are sent over TCP. This is accomplished by providing a deserializer and a serializer for inbound and outbound messages, respectively. Spring Integration provides a number of standard serializers and deserializers.
ByteArrayCrlfSerializer
* converts a byte array to a stream of bytes followed by carriage return and linefeed characters (\r\n
).
This is the default serializer (and deserializer) and can be used (for example) with telnet as a client.
The ByteArraySingleTerminatorSerializer
* converts a byte array to a stream of bytes followed by a single termination character (the default is 0x00
).
The ByteArrayLfSerializer
* converts a byte array to a stream of bytes followed by a single linefeed character (0x0a
).
The ByteArrayStxEtxSerializer
* converts a byte array to a stream of bytes preceded by an STX (0x02
) and followed by an ETX (0x03
).
The ByteArrayLengthHeaderSerializer
converts a byte array to a stream of bytes preceded by a binary length in network byte order (big endian).
This an efficient deserializer because it does not have to parse every byte to look for a termination character sequence.
It can also be used for payloads that contain binary data.
The preceding serializers support only text in the payload.
The default size of the length header is four bytes (an Integer), allowing for messages up to (2^31 - 1) bytes.
However, the length
header can be a single byte (unsigned) for messages up to 255 bytes, or an unsigned short (2 bytes) for messages up to (2^16 - 1) bytes.
If you need any other format for the header, you can subclass ByteArrayLengthHeaderSerializer
and provide implementations for the readHeader
and writeHeader
methods.
The absolute maximum data size is (2^31 - 1) bytes.
Starting with version 5.2, the header value can include the length of the header in addition to the payload.
Set the inclusive
property to enable that mechanism (it must be set to the same for producers and consumers).
The ByteArrayRawSerializer
*, converts a byte array to a stream of bytes and adds no additional message demarcation data.
With this serializer (and deserializer), the end of a message is indicated by the client closing the socket in an orderly fashion.
When using this serializer, message reception hangs until the client closes the socket or a timeout occurs.
A timeout does not result in a message.
When this serializer is being used and the client is a Spring Integration application, the client must use a connection factory that is configured with single-use="true"
.
Doing so causes the adapter to close the socket after sending the message.
The serializer does not, by itself, close the connection.
You should use this serializer only with the connection factories used by channel adapters (not gateways), and the connection factories should be used by either an inbound or outbound adapter but not both.
See also ByteArrayElasticRawDeserializer
, later in this section.
However, since version 5.2, the outbound gateway has a new property closeStreamAfterSend
; this allows the use of raw serializers/deserializers because the EOF is signaled to the server, while leaving the connection open to receive the reply.
Before version 4.2.2, when using non-blocking I/O (NIO), this serializer treated a timeout (during read) as an end of file, and the data read so far was emitted as a message.
This is unreliable and should not be used to delimit messages.
It now treats such conditions as an exception.
In the unlikely event that you use it this way, you can restore the previous behavior by setting the treatTimeoutAsEndOfMessage constructor argument to true .
|
Each of these is a subclass of AbstractByteArraySerializer
, which implements both org.springframework.core.serializer.Serializer
and org.springframework.core.serializer.Deserializer
.
For backwards compatibility, connections that use any subclass of AbstractByteArraySerializer
for serialization also accept a String
that is first converted to a byte array.
Each of these serializers and deserializers converts an input stream that contains the corresponding format to a byte array payload.
To avoid memory exhaustion due to a badly behaved client (one that does not adhere to the protocol of the configured serializer), these serializers impose a maximum message size.
If an incoming message exceeds this size, an exception is thrown.
The default maximum message size is 2048 bytes.
You can increase it by setting the maxMessageSize
property.
If you use the default serializer or deserializer and wish to increase the maximum message size, you must declare the maximum message size as an explicit bean with the maxMessageSize
property set and configure the connection factory to use that bean.
The classes marked with * earlier in this section use an intermediate buffer and copy the decoded data to a final buffer of the correct size.
Starting with version 4.3, you can configure these buffers by setting a poolSize
property to let these raw buffers be reused instead of being allocated and discarded for each message, which is the default behavior.
Setting the property to a negative value creates a pool that has no bounds.
If the pool is bounded, you can also set the poolWaitTimeout
property (in milliseconds), after which an exception is thrown if no buffer becomes available.
It defaults to infinity.
Such an exception causes the socket to be closed.
If you wish to use the same mechanism in custom deserializers, you can extend AbstractPooledBufferByteArraySerializer
(instead of its super class, AbstractByteArraySerializer
) and implement doDeserialize()
instead of deserialize()
.
The buffer is automatically returned to the pool.
AbstractPooledBufferByteArraySerializer
also provides a convenient utility method: copyToSizedArray()
.
Version 5.0 added the ByteArrayElasticRawDeserializer
.
This is similar to the deserializer side of ByteArrayRawSerializer
above, except that it is not necessary to set a maxMessageSize
.
Internally, it uses a ByteArrayOutputStream
that lets the buffer grow as needed.
The client must close the socket in an orderly manner to signal end of message.
This deserializer should only be used when the peer is trusted; it is susceptible to a DoS attach due to out of memory conditions. |
The MapJsonSerializer
uses a Jackson ObjectMapper
to convert between a Map
and JSON.
You can use this serializer in conjunction with a MessageConvertingTcpMessageMapper
and a MapMessageConverter
to transfer selected headers and the payload in JSON.
The Jackson ObjectMapper cannot demarcate messages in the stream.
Therefore, the MapJsonSerializer needs to delegate to another serializer or deserializer to handle message demarcation.
By default, a ByteArrayLfSerializer is used, resulting in messages with a format of <json><LF> on the wire, but you can configure it to use others instead.
(The next example shows how to do so.)
|
The final standard serializer is org.springframework.core.serializer.DefaultSerializer
, which you can use to convert serializable objects with Java serialization.
org.springframework.core.serializer.DefaultDeserializer
is provided for inbound deserialization of streams that contain serializable objects.
If you do not wish to use the default serializer and deserializer (ByteArrayCrLfSerializer
), you must set the serializer
and deserializer
attributes on the connection factory.
The following example shows how to do so:
<bean id="javaSerializer"
class="org.springframework.core.serializer.DefaultSerializer" />
<bean id="javaDeserializer"
class="org.springframework.core.serializer.DefaultDeserializer" />
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"
deserializer="javaDeserializer"
serializer="javaSerializer"/>
A server connection factory that uses java.net.Socket
connections and uses Java serialization on the wire.
For full details of the attributes available on connection factories, see the reference at the end of this section.
By default, reverse DNS lookups are not performed on inbound packets: in environments where DNS is not configured (e.g. Docker containers), this can cause connection delays.
To convert IP addresses to host names for use in message headers, the default behavior can be overridden by setting the lookup-host
attribute to true
.
You can also modify the attributes of sockets and socket factories. See SSL/TLS Support for more information. As noted there, such modifications are possible if SSL is being used, or not. |
Custom Serializers and Deserializers
If your data is not in a format supported by one of the standard deserializers, you can implement your own; you can also implement a custom serializer.
To implement a custom serializer and deserializer pair, implement the org.springframework.core.serializer.Deserializer
and org.springframework.core.serializer.Serializer
interfaces.
When the deserializer detects a closed input stream between messages, it must throw a SoftEndOfStreamException
; this is a signal to the framework to indicate that the close was "normal".
If the stream is closed while decoding a message, some other exception should be thrown instead.
Starting with version 5.2, SoftEndOfStreamException
is now a RuntimeException
instead of extending IOException
.
TCP Caching Client Connection Factory
As noted earlier, TCP sockets can be 'single-use' (one request or response) or shared. Shared sockets do not perform well with outbound gateways in high-volume environments, because the socket can only process one request or response at a time.
To improve performance, you can use collaborating channel adapters instead of gateways, but that requires application-level message correlation. See TCP Message Correlation for more information.
Spring Integration 2.2 introduced a caching client connection factory, which uses a pool of shared sockets, letting a gateway process multiple concurrent requests with a pool of shared connections.
TCP Failover Client Connection Factory
You can configure a TCP connection factory that supports failover to one or more other servers. When sending a message, the factory iterates over all its configured factories until either the message can be sent or no connection can be found. Initially, the first factory in the configured list is used. If a connection subsequently fails, the next factory becomes the current factory. The following example shows how to configure a failover client connection factory:
<bean id="failCF" class="o.s.i.ip.tcp.connection.FailoverClientConnectionFactory">
<constructor-arg>
<list>
<ref bean="clientFactory1"/>
<ref bean="clientFactory2"/>
</list>
</constructor-arg>
</bean>
When using the failover connection factory, the singleUse property must be consistent between the factory itself and the list of factories it is configured to use.
|
The connection factory has two properties related to failing back, when used with a shared connection (singleUse=false
):
-
refreshSharedInterval
-
closeOnRefresh
Consider the following scenario based on the above configuration:
Let’s say clientFactory1
cannot establish a connection but clientFactory2
can.
When the failCF
getConnection()
method is called after the refreshSharedInterval
has passed, we will again attempt to connect using clientFactory1
; if successful, the connection to clientFactory2
will be closed.
If closeOnRefresh
is false
, the "old" connection will remain open and may be reused in future if the first factory fails once more.
Set refreshSharedInterval
to only attempt to reconnect with the first factory after that time has expired; setting it to Long.MAX_VALUE
(default) if you only want to fail back to the first factory when the current connection fails.
Set closeOnRefresh
to close the "old" connection after a refresh actually creates a new connection.
These properties do not apply if any of the delegate factories is a CachingClientConnectionFactory because the connection caching is handled there; in that case the list of connection factories will always be consulted to get a connection.
|
Starting with version 5.3, these default to Long.MAX_VALUE
and true
so the factory only attempts to fail back when the current connection fails.
To revert to the default behavior of previous versions, set them to 0
and false
.
Also see Testing Connections.
TCP Thread Affinity Connection Factory
Spring Integration version 5.0 introduced this connection factory.
It binds a connection to the calling thread, and the same connection is reused each time that thread sends a message.
This continues until the connection is closed (by the server or the network) or until the thread calls the releaseConnection()
method.
The connections themselves are provided by another client factory implementation, which must be configured to provide non-shared (single-use) connections so that each thread gets a connection.
The following example shows how to configure a TCP thread affinity connection factory:
@Bean
public TcpNetClientConnectionFactory cf() {
TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory("localhost",
Integer.parseInt(System.getProperty(PORT)));
cf.setSingleUse(true);
return cf;
}
@Bean
public ThreadAffinityClientConnectionFactory tacf() {
return new ThreadAffinityClientConnectionFactory(cf());
}
@Bean
@ServiceActivator(inputChannel = "out")
public TcpOutboundGateway outGate() {
TcpOutboundGateway outGate = new TcpOutboundGateway();
outGate.setConnectionFactory(tacf());
outGate.setReplyChannelName("toString");
return outGate;
}