Spring Integration provides channel adapters for receiving and sending messages over internet protocols. Both UDP (User Datagram Protocol) and TCP (Transmission Control Protocol) adapters are provided. Each adapter provides for one-way communication over the underlying protocol. In addition, Spring Integration provides simple inbound and outbound TCP gateways. These are used when two-way communication is needed.
You need to include this dependency into your project:
Maven.
<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-ip</artifactId> <version>5.1.2.RELEASE</version> </dependency>
Gradle.
compile "org.springframework.integration:spring-integration-ip:5.1.2.RELEASE"
Two flavors each of UDP inbound and outbound channel adapters are provided:
UnicastSendingMessageHandler
sends a datagram packet to a single destination.
UnicastReceivingChannelAdapter
receives incoming datagram packets.
MulticastSendingMessageHandler
sends (broadcasts) datagram packets to a multicast address.
MulticastReceivingChannelAdapter
receives incoming datagram packets by joining to a multicast address.
TCP inbound and outbound channel adapters are provided:
TcpSendingMessageHandler
sends messages over TCP.
TcpReceivingChannelAdapter
receives messages over TCP.
An inbound TCP gateway is provided. It allows for simple request-response processing. While the gateway can support any number of connections, each connection can only be processed serially. The thread that reads from the socket waits for, and sends, the response before reading again. If the connection factory is configured for single use connections, the connection is closed after the socket times out.
An outbound TCP gateway is provided. It allows for simple request-response processing. If the associated connection factory is configured for single-use connections, a new connection is immediately created for each new request. Otherwise, if the connection is in use, the calling thread blocks on the connection until either a response is received or a timeout or I/O error occurs.
The TCP and UDP inbound channel adapters and the TCP inbound gateway support the error-channel
attribute.
This provides the same basic functionality as described in Section 10.4.1, “Enter the GatewayProxyFactoryBean
”.
This section describes how to configure and use the UDP adapters.
The following example configures a UDP outbound channel adapter:
<int-ip:udp-outbound-channel-adapter id="udpOut" host="somehost" port="11111" multicast="false" channel="exampleChannel"/>
Tip | |
---|---|
When setting |
UDP is an efficient but unreliable protocol.
Spring Integration adds two attributes to improve reliability: check-length
and acknowledge
.
When check-length
is set to true
, the adapter precedes the message data with a length field (four bytes in network byte order).
This enables the receiving side to verify the length of the packet received.
If a receiving system uses a buffer that is too short to contain the packet, the packet can be truncated.
The length
header provides a mechanism to detect this.
Starting with version 4.3, you can set the port
to 0
, in which case the operating system chooses the port.
The chosen port can be discovered by invoking getPort()
after the adapter is started and isListening()
returns true
.
The preceding example shows an outbound channel adapter that adds length checking to the datagram packets:
<int-ip:udp-outbound-channel-adapter id="udpOut" host="somehost" port="11111" multicast="false" check-length="true" channel="exampleChannel"/>
Tip | |
---|---|
The recipient of the packet must also be configured to expect a length to precede the actual data.
For a Spring Integration UDP inbound channel adapter, set its |
The second reliability improvement allows an application-level acknowledgment protocol to be used. The receiver must send an acknowledgment to the sender within a specified time.
The following example shows an outbound channel adapter that adds length checking to the datagram packets and waits for an acknowledgment:
<int-ip:udp-outbound-channel-adapter id="udpOut" host="somehost" port="11111" multicast="false" check-length="true" acknowledge="true" ack-host="thishost" ack-port="22222" ack-timeout="10000" channel="exampleChannel"/>
Tip | |
---|---|
Setting |
Tip | |
---|---|
When multicast is true, an additional attribute ( |
Starting with version 4.3, you can set the ackPort
to 0
, in which case the operating system chooses the port.
The following example shows how to configure an outbound UDP adapter with Java:
@Bean @ServiceActivator(inputChannel = "udpOut") public UnicastSendingMessageHandler handler() { return new UnicastSendingMessageHandler("localhost", 11111); }
(or MulticastSendingChannelAdapter
for multicast).
The following example shows how to configure an outbound UDP adapter with the Java DSL:
@Bean public IntegrationFlow udpOutFlow() { return IntegrationFlows.from("udpOut") .handle(Udp.outboundAdapter("localhost", 1234)) .get(); }
The following example shows how to configure a basic unicast inbound udp channel adapter.
<int-ip:udp-inbound-channel-adapter id="udpReceiver" channel="udpOutChannel" port="11111" receive-buffer-size="500" multicast="false" check-length="true"/>
The following example shows how to configure a basic multicast inbound udp channel adapter:
<int-ip:udp-inbound-channel-adapter id="udpReceiver" channel="udpOutChannel" port="11111" receive-buffer-size="500" multicast="true" multicast-address="225.6.7.8" check-length="true"/>
By default, reverse DNS lookups are done on inbound packets to convert IP addresses to hostnames for use in message headers.
In environments where DNS is not configured, this can cause delays.
You can override this default behavior by setting the lookup-host
attribute to false
.
The following example shows how to configure an inbound UDP adapter with Java:
@Bean public UnicastReceivingChannelAdapter udpIn() { UnicastReceivingChannelAdapter adapter = new UnicastReceivingChannelAdapter(11111); adapter.setOutputChannelName("udpChannel"); return adapter; }
The following example shows how to configure an inbound UDP adapter with the Java DSL:
@Bean public IntegrationFlow udpIn() { return IntegrationFlows.from(Udp.inboundAdapter(11111)) .channel("udpChannel") .get(); }
Starting with version 5.0.2, a UdpServerListeningEvent
is emitted when an inbound adapter is started and has begun listening.
This is useful when the adapter is configured to listen on port 0, meaning that the operating system chooses the port.
It can also be used instead of polling isListening()
, if you need to wait before starting some other process that will
connect to the socket.
The <int-ip:udp-outbound-channel-adapter>
(UnicastSendingMessageHandler
) has destination-expression
and socket-expression
options.
You can use the destination-expression
as a runtime alternative to the hardcoded host
-port
pair to determine the destination address for the outgoing datagram packet against a requestMessage
(with the root object for the evaluation context).
The expression must evaluate to an URI
, a String
in the URI style (see RFC-2396), or a SocketAddress
.
You can also use the inbound IpHeaders.PACKET_ADDRESS
header for this expression.
In the framework, the DatagramPacketMessageMapper
populates this header when we receive datagrams in the UnicastReceivingChannelAdapter
and convert them to messages.
The header value is exactly the result of DatagramPacket.getSocketAddress()
of the incoming datagram.
With the socket-expression
, the outbound channel adapter can use (for example) an inbound channel adapter socket to send datagrams through the same port which they were received.
It is useful in a scenario where our application works as a UDP server and clients operate behind network address translation (NAT).
This expression must evaluate to a DatagramSocket
.
The requestMessage
is used as the root object for the evaluation context.
You cannot use the socket-expression
parameter with the multicast
and acknowledge
parameters.
The following example shows how to configure a UDP inbound channel adapter with a transformer that converts to upper case and uses a socket:
<int-ip:udp-inbound-channel-adapter id="inbound" port="0" channel="in" /> <int:channel id="in" /> <int:transformer expression="new String(payload).toUpperCase()" input-channel="in" output-channel="out"/> <int:channel id="out" /> <int-ip:udp-outbound-channel-adapter id="outbound" socket-expression="@inbound.socket" destination-expression="headers['ip_packetAddress']" channel="out" />
The following example shows the equivalent configuration with the Java DSL:
@Bean public IntegrationFlow udpEchoUpcaseServer() { return IntegrationFlows.from(Udp.inboundAdapter(11111).id("udpIn")) .<byte[], String>transform(p -> new String(p).toUpperCase()) .handle(Udp.outboundAdapter("headers['ip_packetAddress']") .socketExpression("@udpIn.socket")) .get(); }
For TCP, the configuration of the underlying connection is provided by using a connection factory. Two types of connection factory are provided: a client connection factory and a server connection factory. Client connection factories establish outgoing connections. Server connection factories listen for incoming connections.
An outbound channel adapter uses a client connection factory, but you can also provide a reference to a client connection factory to an inbound channel adapter. That adapter receives any incoming messages that are received on connections created by the outbound adapter.
An inbound channel adapter or gateway uses a server connection factory. (In fact, the connection factory cannot function without one). You can also provide a reference to a server connection factory to an outbound adapter. You can then use that adapter to send replies to incoming messages on the same connection.
Tip | |
---|---|
Reply messages are routed to the connection only if the reply contains the |
Tip | |
---|---|
This is the extent of message correlation performed when sharing connection factories between inbound and outbound adapters. Such sharing allows for asynchronous two-way communication over TCP. By default, only payload information is transferred using TCP. Therefore, any message correlation must be performed by downstream components such as aggregators or other endpoints. Support for transferring selected headers was introduced in version 3.0. For more information, see Section 34.8, “TCP Message Correlation”. |
You may give a reference to a connection factory to a maximum of one adapter of each type.
Spring Integration provides connection factories that use java.net.Socket
and java.nio.channel.SocketChannel
.
The following example shows a simple server connection factory that uses java.net.Socket
connections:
<int-ip:tcp-connection-factory id="server" type="server" port="1234"/>
The following example shows a simple server connection factory that uses java.nio.channel.SocketChannel
connections:
<int-ip:tcp-connection-factory id="server" type="server" port="1234" using-nio="true"/>
Note | |
---|---|
Starting with Spring Integration version 4.2, if the server is configured to listen on a random port (by setting the port to |
<int-ip:tcp-connection-factory id="client" type="client" host="localhost" port="1234" single-use="true" so-timeout="10000"/>
The following example shows a client connection factory that uses java.net.Socket
connections and creates a new connection for each message:
<int-ip:tcp-connection-factory id="client" type="client" host="localhost" port="1234" single-use="true" so-timeout="10000" using-nio=true/>
TCP is a streaming protocol. This means that some structure has to be provided to data transported over TCP so that the receiver can demarcate the data into discrete messages. Connection factories are configured to use serializers and deserializers to convert between the message payload and the bits that are sent over TCP. This is accomplished by providing a deserializer and a serializer for inbound and outbound messages, respectively. Spring Integration provides a number of standard serializers and deserializers.
ByteArrayCrlfSerializer
* converts a byte array to a stream of bytes followed by carriage return and linefeed characters (\r\n
).
This is the default serializer (and deserializer) and can be used (for example) with telnet as a client.
The ByteArraySingleTerminatorSerializer
* converts a byte array to a stream of bytes followed by a single termination character (the default is 0x00
).
The ByteArrayLfSerializer
* converts a byte array to a stream of bytes followed by a single linefeed character (0x0a
).
The ByteArrayStxEtxSerializer
* converts a byte array to a stream of bytes preceded by an STX (0x02
) and followed by an ETX (0x03
).
The ByteArrayLengthHeaderSerializer
converts a byte array to a stream of bytes preceded by a binary length in network byte order (big endian).
This an efficient deserializer because it does not have to parse every byte to look for a termination character sequence.
It can also be used for payloads that contain binary data.
The preceding serializers support only text in the payload.
The default size of the length header is four bytes (an Integer), allowing for messages up to (2^31 - 1) bytes.
However, the length
header can be a single byte (unsigned) for messages up to 255 bytes, or an unsigned short (2 bytes) for messages up to (2^16 - 1) bytes.
If you need any other format for the header, you can subclass ByteArrayLengthHeaderSerializer
and provide implementations for the readHeader
and writeHeader
methods.
The absolute maximum data size is (2^31 - 1) bytes.
The ByteArrayRawSerializer
*, converts a byte array to a stream of bytes and adds no additional message demarcation data.
With this serializer (and deserializer), the end of a message is indicated by the client closing the socket in an orderly fashion.
When using this serializer, message reception hangs until the client closes the socket or a timeout occurs.
A timeout does not result in a message.
When this serializer is being used and the client is a Spring Integration application, the client must use a connection factory that is configured with single-use="true"
.
Doing so causes the adapter to close the socket after sending the message.
The serializer does not, by itself, close the connection.
You should use this serializer only with the connection factories used by channel adapters (not gateways), and the connection factories should be used by either an inbound or outbound adapter but not both.
See also ByteArrayElasticRawDeserializer
, later in this section.
Note | |
---|---|
Before version 4.2.2, when using non-blocking I/O (NIO), this serializer treated a timeout (during read) as an end of file, and the data read so far was emitted as a message.
This is unreliable and should not be used to delimit messages.
It now treats such conditions as an exception.
In the unlikely event that you use it this way, you can restore the previous behavior by setting the |
Each of these is a subclass of AbstractByteArraySerializer
, which implements both org.springframework.core.serializer.Serializer
and org.springframework.core.serializer.Deserializer
.
For backwards compatibility, connections that use any subclass of AbstractByteArraySerializer
for serialization also accept a String
that is first converted to a byte array.
Each of these serializers and deserializers converts an input stream that contains the corresponding format to a byte array payload.
To avoid memory exhaustion due to a badly behaved client (one that does not adhere to the protocol of the configured serializer), these serializers impose a maximum message size.
If an incoming message exceeds this size, an exception is thrown.
The default maximum message size is 2048 bytes.
You can increase it by setting the maxMessageSize
property.
If you use the default serializer or deserializer and wish to increase the maximum message size, you must declare the maximum message size as an explicit bean with the maxMessageSize
property set and configure the connection factory to use that bean.
The classes marked with * earlier in this section use an intermediate buffer and copy the decoded data to a final buffer of the correct
size.
Starting with version 4.3, you can configure these buffers by setting a poolSize
property to let these raw buffers be reused instead of being allocated and discarded for each message, which is the default behavior.
Setting the property to a negative value creates a pool that has no bounds.
If the pool is bounded, you can also set the poolWaitTimeout
property (in milliseconds), after which an exception is thrown if no buffer becomes available.
It defaults to infinity.
Such an exception causes the socket to be closed.
If you wish to use the same mechanism in custom deserializers, you can extend AbstractPooledBufferByteArraySerializer
(instead of its super class, AbstractByteArraySerializer
) and implement doDeserialize()
instead of deserialize()
.
The buffer is automatically returned to the pool.
AbstractPooledBufferByteArraySerializer
also provides a convenient utility method: copyToSizedArray()
.
Version 5.0 added the ByteArrayElasticRawDeserializer
.
This is similar to the deserializer side of ByteArrayRawSerializer
above, except that it is not necessary to set a maxMessageSize
.
Internally, it uses a ByteArrayOutputStream
that lets the buffer grow as needed.
The client must close the socket in an orderly manner to signal end of message.
The MapJsonSerializer
uses a Jackson ObjectMapper
to convert between a Map
and JSON.
You can use this serializer in conjunction with a MessageConvertingTcpMessageMapper
and a MapMessageConverter
to transfer selected headers and the payload in JSON.
Note | |
---|---|
The Jackson |
The final standard serializer is org.springframework.core.serializer.DefaultSerializer
, which you can use to convert serializable objects with Java serialization.
org.springframework.core.serializer.DefaultDeserializer
is provided for inbound deserialization of streams that contain serializable objects.
To implement a custom serializer and deserializer pair, implement the org.springframework.core.serializer.Deserializer
and org.springframework.core.serializer.Serializer
interfaces.
If you do not wish to use the default serializer and deserializer (ByteArrayCrLfSerializer
), you must set the serializer
and deserializer
attributes on the connection factory.
The following example shows how to do so:
<bean id="javaSerializer" class="org.springframework.core.serializer.DefaultSerializer" /> <bean id="javaDeserializer" class="org.springframework.core.serializer.DefaultDeserializer" /> <int-ip:tcp-connection-factory id="server" type="server" port="1234" deserializer="javaDeserializer" serializer="javaSerializer"/>
A server connection factory that uses java.net.Socket
connections and uses Java serialization on the wire.
For full details of the attributes available on connection factories, see the reference at the end of this section.
By default, reverse DNS lookups are done on inbound packets to convert IP addresses to hostnames for use in message headers.
In environments where DNS is not configured, this can cause connection delays.
You can override this default behavior by setting the lookup-host
attribute to false
.
Note | |
---|---|
You can also modify the attributes of sockets and socket factories. See Section 34.10, “SSL/TLS Support”. As noted there, such modifications are possible whether or not SSL is being used. |
As noted earlier, TCP sockets can be single-use (one request or response) or shared. Shared sockets do not perform well with outbound gateways in high-volume environments, because the socket can only process one request or response at a time.
To improve performance, you can use collaborating channel adapters instead of gateways, but that requires application-level message correlation. See Section 34.8, “TCP Message Correlation” for more information.
Spring Integration 2.2 introduced a caching client connection factory, which uses a pool of shared sockets, letting a gateway process multiple concurrent requests with a pool of shared connections.
You can configure a TCP connection factory that supports failover to one or more other servers. When sending a message, the factory iterates over all its configured factories until either the message can be sent or no connection can be found. Initially, the first factory in the configured list is used. If a connection subsequently fails, the next factory becomes the current factory. The following example shows how to configure a failover client connection factory:
<bean id="failCF" class="o.s.i.ip.tcp.connection.FailoverClientConnectionFactory"> <constructor-arg> <list> <ref bean="clientFactory1"/> <ref bean="clientFactory2"/> </list> </constructor-arg> </bean>
Note | |
---|---|
When using the failover connection factory, the |
Spring Integration version 5.0 introduced this connection factory.
It binds a connection to the calling thread, and the same connection is reused each time that thread sends a message.
This continues until the connection is closed (by the server or the network) or until the thread calls the releaseConnection()
method.
The connections themselves are provided by another client factory implementation, which must be configured to provide non-shared (single-use) connections so that each thread gets a connection.
The following example shows how to configure a TCP thread affinity connection factory:
@Bean public TcpNetClientConnectionFactory cf() { TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory("localhost", Integer.parseInt(System.getProperty(PORT))); cf.setSingleUse(true); return cf; } @Bean public ThreadAffinityClientConnectionFactory tacf() { return new ThreadAffinityClientConnectionFactory(cf()); } @Bean @ServiceActivator(inputChannel = "out") public TcpOutboundGateway outGate() { TcpOutboundGateway outGate = new TcpOutboundGateway(); outGate.setConnectionFactory(tacf()); outGate.setReplyChannelName("toString"); return outGate; }
You can configure connection factories with a reference to a TcpConnectionInterceptorFactoryChain
.
You can use interceptors to add behavior to connections, such as negotiation, security, and other options.
No interceptors are currently provided by the framework, but see InterceptedSharedConnectionTests
in the source repository for an example.
The HelloWorldInterceptor
used in the test case works as follows:
The interceptor is first configured with a client connection factory. When the first message is sent over an intercepted connection, the interceptor sends Hello over the connection and expects to receive world!. When that occurs, the negotiation is complete and the original message is sent. Further messages that use the same connection are sent without any additional negotiation.
When configured with a server connection factory, the interceptor requires the first message to be Hello and, if it is, returns world!. Otherwise it throws an exception that causes the connection to be closed.
All TcpConnection
methods are intercepted.
Interceptor instances are created for each connection by an interceptor factory.
If an interceptor is stateful, the factory should create a new instance for each connection.
If there is no state, the same interceptor can wrap each connection.
Interceptor factories are added to the configuration of an interceptor factory chain, which you can provide to a connection factory by setting the interceptor-factory
attribute.
Interceptors must extend TcpConnectionInterceptorSupport
.
Factories must implement the TcpConnectionInterceptorFactory
interface.
TcpConnectionInterceptorSupport
has passthrough methods.
By extending this class, you only need to implement those methods you wish to intercept.
The following example shows how to configure a connection interceptor factory chain:
<bean id="helloWorldInterceptorFactory" class="o.s.i.ip.tcp.connection.TcpConnectionInterceptorFactoryChain"> <property name="interceptors"> <array> <bean class="o.s.i.ip.tcp.connection.HelloWorldInterceptorFactory"/> </array> </property> </bean> <int-ip:tcp-connection-factory id="server" type="server" port="12345" using-nio="true" single-use="true" interceptor-factory-chain="helloWorldInterceptorFactory"/> <int-ip:tcp-connection-factory id="client" type="client" host="localhost" port="12345" single-use="true" so-timeout="100000" using-nio="true" interceptor-factory-chain="helloWorldInterceptorFactory"/>
Beginning with version 3.0, changes to TcpConnection
instances are reported by TcpConnectionEvent
instances.
TcpConnectionEvent
is a subclass of ApplicationEvent
and can thus be received by any ApplicationListener
defined in the ApplicationContext
— for example an event inbound channel adapter.
TcpConnectionEvents
have the following properties:
connectionId
: The connection identifier, which you can use in a message header to send data to the connection.
connectionFactoryName
: The bean name of the connection factory to which the connection belongs.
throwable
: The Throwable
(for TcpConnectionExceptionEvent
events only).
source
: The TcpConnection
. You can use this, for example, to determine the remote IP Address with getHostAddress()
(cast required).
In addition, since version 4.0, the standard deserializers discussed in Section 34.3, “TCP Connection Factories” now emit TcpDeserializationExceptionEvent
instances when they encounter problems while decoding the data stream.
These events contain the exception, the buffer that was in the process of being built, and an offset into the buffer (if available) at the point where the exception occurred.
Applications can use a normal ApplicationListener
or an ApplicationEventListeningMessageProducer
(see Section 15.1, “Receiving Spring Application Events”) to capture these events, allowing analysis of the problem.
Starting with versions 4.0.7 and 4.1.3, TcpConnectionServerExceptionEvent
instances are published whenever an unexpected exception occurs on a server socket (such as a BindException
when the server socket is in use).
These events have a reference to the connection factory and the cause.
Starting with version 4.2, TcpConnectionFailedCorrelationEvent
instances are published whenever an endpoint (inbound gateway or
collaborating outbound channel adapter) receives a message that cannot be routed to a connection because the
ip_connectionId
header is invalid.
Outbound gateways also publish this event when a late reply is received (the sender thread has timed out).
The event contains the connection ID as well as an exception in the cause
property, which contains the failed message.
Starting with version 4.3, a TcpConnectionServerListeningEvent
is emitted when a server connection factory is started.
This is useful when the factory is configured to listen on port 0, meaning that the operating system chooses the port.
It can also be used instead of polling isListening()
, if you need to wait before starting some other process that connects to the socket.
Important | |
---|---|
To avoid delaying the listening thread from accepting connections, the event is published on a separate thread. |
Starting with version 4.3.2, a TcpConnectionFailedEvent
is emitted whenever a client connection cannot be created.
The source of the event is the connection factory, which you can use to determine the host and port to which the connection could not be established.
TCP inbound and outbound channel adapters that use connection factories mentioned earlier are provided.
These adapters have two relevant attributes: connection-factory
and channel
.
The connection-factory
attribute indicates which connection factory is to be used to manage connections for the adapter.
The channel
attribute specifies the channel on which messages arrive at an outbound adapter and on which messages are placed by an inbound adapter.
While both inbound and outbound adapters can share a connection factory, server connection factories are always "owned
" by an inbound adapter.
Client connection factories are always "owned
" by an outbound adapter.
Only one adapter of each type may get a reference to a connection factory.
The following example shows how to define client and server TCP connection factories:
<bean id="javaSerializer" class="org.springframework.core.serializer.DefaultSerializer"/> <bean id="javaDeserializer" class="org.springframework.core.serializer.DefaultDeserializer"/> <int-ip:tcp-connection-factory id="server" type="server" port="1234" deserializer="javaDeserializer" serializer="javaSerializer" using-nio="true" single-use="true"/> <int-ip:tcp-connection-factory id="client" type="client" host="localhost" port="#{server.port}" single-use="true" so-timeout="10000" deserializer="javaDeserializer" serializer="javaSerializer"/> <int:channel id="input" /> <int:channel id="replies"> <int:queue/> </int:channel> <int-ip:tcp-outbound-channel-adapter id="outboundClient" channel="input" connection-factory="client"/> <int-ip:tcp-inbound-channel-adapter id="inboundClient" channel="replies" connection-factory="client"/> <int-ip:tcp-inbound-channel-adapter id="inboundServer" channel="loop" connection-factory="server"/> <int-ip:tcp-outbound-channel-adapter id="outboundServer" channel="loop" connection-factory="server"/> <int:channel id="loop"/>
In the preceding configuration, messages arriving in the input
channel are serialized over connections created by client
connection factory, received at the server, and placed on the loop
channel.
Since loop
is the input channel for outboundServer
, the message is looped back over the same connection, received by inboundClient
, and deposited in the replies
channel.
Java serialization is used on the wire.
Normally, inbound adapters use a type="server"
connection factory, which listens for incoming connection requests.
In some cases, you may want to establish the connection in reverse, such that the inbound adapter connects to an external server and then waits for inbound messages on that connection.
This topology is supported by setting client-mode="true"
on the inbound adapter.
In this case, the connection factory must be of type client
and must have single-use
set to false
.
Two additional attributes support this mechanism. retry-interval
specifies (in milliseconds) how often the framework attempts to reconnect after a connection failure.
scheduler
supplies a TaskScheduler
to schedule the connection attempts and to test that the connection is still active.
If you don’t provide a scheduler, the framework’s default taskScheduler bean is used.
For an outbound adapter, the connection is normally established when the first message is sent.
client-mode="true"
on an outbound adapter causes the connection to be established when the adapter is started.
By default, adapters are automatically started.
Again, the connection factory must be of type client
and have single-use="false"
.
retry-interval
and scheduler
are also supported.
If a connection fails, it is re-established either by the scheduler or when the next message is sent.
For both inbound and outbound, if the adapter is started, you can force the adapter to establish a connection by sending a <control-bus />
command: @adapter_id.retryConnection()
.
Then you can examine the current state with @adapter_id.isClientModeConnected()
.
The inbound TCP gateway TcpInboundGateway
and outbound TCP gateway TcpOutboundGateway
use a server and client connection factory, respectively.
Each connection can process a single request or response at a time.
The inbound gateway, after constructing a message with the incoming payload and sending it to the requestChannel
, waits for a response and sends the payload from the response message by writing it to the connection.
Note | |
---|---|
For the inbound gateway, you must retain or populate, the |
As with inbound adapters, inbound gateways normally use a type="server"
connection factory, which listens for incoming connection requests.
In some cases, you may want to establish the connection in reverse, such that the inbound gateway connects to an external server and then waits for and replies to inbound messages on that connection.
This topology is supported by using client-mode="true"
on the inbound gateway.
In this case, the connection factory must be of type client
and must have single-use
set to false
.
Two additional attributes support this mechanism.
retry-interval
specifies (in milliseconds) how often the framework tries to reconnect after a connection failure.
scheduler
supplies a TaskScheduler
to schedule the connection attempts and to test that the connection is still active.
If the gateway is started, you may force the gateway to establish a connection by sending a <control-bus /> command: @adapter_id.retryConnection()
and examine the current state with @adapter_id.isClientModeConnected()
.
The outbound gateway, after sending a message over the connection, waits for a response, constructs a response message, and puts it on the reply channel. Communications over the connections are single-threaded. Only one message can be handled at a time. If another thread attempts to send a message before the current response has been received, it blocks until any previous requests are complete (or time out). If, however, the client connection factory is configured for single-use connections, each new request gets its own connection and is processed immediately. The following example configures an inbound TCP gateway:
<int-ip:tcp-inbound-gateway id="inGateway" request-channel="tcpChannel" reply-channel="replyChannel" connection-factory="cfServer" reply-timeout="10000"/>
If a connection factory configured with the default serializer or deserializer is used, messages is \r\n
delimited data and the gateway can be used by a simple client such as telnet.
The following example shows an outbound TCP gateway:
<int-ip:tcp-outbound-gateway id="outGateway" request-channel="tcpChannel" reply-channel="replyChannel" connection-factory="cfClient" request-timeout="10000" remote-timeout="10000"/> <!-- or e.g. remote-timeout-expression="headers['timeout']" -->
client-mode
is not currently available with the outbound gateway.
One goal of the IP endpoints is to provide communication with systems other than Spring Integration applications. For this reason, only message payloads are sent and received by default. Since 3.0, you can transfer headers by using JSON, Java serialization, or custom serializers and deserializers. See Section 34.8.3, “Transferring Headers” for more information. No message correlation is provided by the framework (except when using the gateways) or collaborating channel adapters on the server side. Later in this document, we discuss the various correlation techniques available to applications. In most cases, this requires specific application-level correlation of messages, even when message payloads contain some natural correlation data (such as an order number).
Gateways automatically correlate messages. However, you should use an outbound gateway for relatively low-volume applications. When you configure the connection factory to use a single shared connection for all message pairs (single-use="false"), only one message can be processed at a time. A new message has to wait until the reply to the previous message has been received. When a connection factory is configured for each new message to use a new connection (single-use="true"), this restriction does not apply. While this setting can give higher throughput than a shared connection environment, it comes with the overhead of opening and closing a new connection for each message pair.
Therefore, for high-volume messages, consider using a collaborating pair of channel adapters. However, to do so, you need to provide collaboration logic.
Another solution, introduced in Spring Integration 2.2, is to use a CachingClientConnectionFactory
, which allows the use of a pool of shared connections.
To achieve high-volume throughput (avoiding the pitfalls of using gateways, as mentioned earlier) you can configure a pair of collaborating outbound and inbound channel adapters. You can also use collaborating adapters (server-side or client-side) for totally asynchronous communication (rather than with request-reply semantics). On the server side, message correlation is automatically handled by the adapters, because the inbound adapter adds a header that allows the outbound adapter to determine which connection to use when sending the reply message.
Note | |
---|---|
On the server side, you must populate the |
On the client side, the application must provide its own correlation logic, if needed. You can do so in a number of ways.
If the message payload has some natural correlation data (such as a transaction ID or an order number) and you have no need to retain any information (such as a reply channel header) from the original outbound message, the correlation is simple and would be done at the application level in any case.
If the message payload has some natural correlation data (such as a transaction ID or an order number), but you need to retain some information (such as a reply channel header) from the original outbound message, you can retain a copy of the original outbound message (perhaps by using a publish-subscribe channel) and use an aggregator to recombine the necessary data.
For either of the previous two scenarios, if the payload has no natural correlation data, you can provide a transformer upstream of the outbound channel adapter to enhance the payload with such data. Such a transformer may transform the original payload to a new object that contains both the original payload and some subset of the message headers. Of course, live objects (such as reply channels) from the headers cannot be included in the transformed payload.
If you choose such a strategy, you need to ensure the connection factory has an appropriate serializer-deserializer pair to handle such a payload (such as DefaultSerializer
and DefaultDeserializer
, which use java serialization, or a custom serializer and deserializer).
The ByteArray*Serializer
options mentioned in Section 34.3, “TCP Connection Factories”, including the default ByteArrayCrLfSerializer
, do not support such payloads unless the transformed payload is a String
or byte[]
.
Note | |
---|---|
Before the 2.2 release, when collaborating channel adapters used a client connection factory, the This default behavior was not appropriate in a truly asynchronous environment, so it now defaults to an infinite timeout.
You can reinstate the previous default behavior by setting the |
TCP is a streaming protocol.
Serializers
and Deserializers
demarcate messages within the stream.
Prior to 3.0, only message payloads (String
or byte[]
) could be transferred over TCP.
Beginning with 3.0, you can transfer selected headers as well as the payload.
However, "live
" objects, such as the replyChannel
header, cannot be serialized.
Sending header information over TCP requires some additional configuration.
The first step is to provide the ConnectionFactory
with a MessageConvertingTcpMessageMapper
that uses the mapper
attribute.
This mapper delegates to any MessageConverter
implementation to convert the message to and from some object that can be serialized and deserialized by the configured serializer
and deserializer
.
Spring Integration provides a MapMessageConverter
, which allows the specification of a list of headers that are added to a Map
object, along with the payload.
The generated Map has two entries: payload
and headers
.
The headers
entry is itself a Map
and contains the selected headers.
The second step is to provide a serializer and a deserializer that can convert between a Map
and some wire format.
This can be a custom Serializer
or Deserializer
, which you typically need if the peer system is not a Spring Integration application.
Spring Integration provides a MapJsonSerializer
to convert a Map
to and from JSON.
It uses a Spring Integration JsonObjectMapper
.
You can provide a custom JsonObjectMapper
if needed.
By default, the serializer inserts a linefeed (0x0a
) character between objects.
See the Javadoc for more information.
Note | |
---|---|
The |
You can also use standard Java serialization of the Map
, by using the DefaultSerializer
and DefaultDeserializer
.
The following example shows the configuration of a connection factory that transfers the correlationId
, sequenceNumber
, and sequenceSize
headers by using JSON:
<int-ip:tcp-connection-factory id="client" type="client" host="localhost" port="12345" mapper="mapper" serializer="jsonSerializer" deserializer="jsonSerializer"/> <bean id="mapper" class="o.sf.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper"> <constructor-arg name="messageConverter"> <bean class="o.sf.integration.support.converter.MapMessageConverter"> <property name="headerNames"> <list> <value>correlationId</value> <value>sequenceNumber</value> <value>sequenceSize</value> </list> </property> </bean> </constructor-arg> </bean> <bean id="jsonSerializer" class="o.sf.integration.ip.tcp.serializer.MapJsonSerializer" />
A message sent with the preceding configuration, with a payload of something would appear on the wire as follows:
{"headers":{"correlationId":"things","sequenceSize":5,"sequenceNumber":1},"payload":"something"}
Using NIO (see using-nio
in Section 34.12, “IP Configuration Attributes”) avoids dedicating a thread to read from each socket.
For a small number of sockets, you are likely to find that not using NIO, together with an asynchronous handoff (such as to a QueueChannel
), performs as well as or better than using NIO.
You should consider using NIO when handling a large number of connections. However, the use of NIO has some other ramifications. A pool of threads (in the task executor) is shared across all the sockets. Each incoming message is assembled and sent to the configured channel as a separate unit of work on a thread selected from that pool. Two sequential messages arriving on the same socket might be processed by different threads. This means that the order in which the messages are sent to the channel is indeterminate. Strict ordering of the messages arriving on the socket is not maintained.
For some applications, this is not an issue.
For others, it is a problem.
If you require strict ordering, consider setting using-nio
to false
and using an asynchronous handoff.
Alternatively, you can insert a resequencer downstream of the inbound endpoint to return the messages to their proper sequence.
If you set apply-sequence
to true
on the connection factory, messages arriving on a TCP connection have sequenceNumber
and correlationId
headers set.
The resequencer uses these headers to return the messages to their proper sequence.
The pool size attribute is no longer used.
Previously, it specified the size of the default thread pool when a task-executor was not specified.
It was also used to set the connection backlog on server sockets.
The first function is no longer needed (see the next paragraph).
The second function is replaced by the backlog
attribute.
Previously, when using a fixed thread pool task executor (which was the default) with NIO, it was possible to get a deadlock and processing would stop. The problem occurred when a buffer was full, a thread reading from the socket was trying to add more data to the buffer, and no threads were available to make space in the buffer. This only occurred with a very small pool size, but it could be possible under extreme conditions. Since 2.2, two changes have eliminated this problem. First, the default task executor is a cached thread pool executor. Second, deadlock detection logic has been added such that, if thread starvation occurs, instead of deadlocking, an exception is thrown, thus releasing the deadlocked resources.
Important | |
---|---|
Now that the default task executor is unbounded, it is possible that an out-of-memory condition might occur with high rates of incoming messages, if message processing takes extended time. If your application exhibits this type of behavior, you should use a pooled task executor with an appropriate pool size, but see the next section. |
You should keep in mind some important considerations when you use a fixed thread pool with the CallerRunsPolicy
(CALLER_RUNS
when using the <task/>
namespace) and the queue capacity is small.
The following does not apply if you do not use a fixed thread pool.
With NIO connections, there are three distinct task types. The I/O selector processing is performed on one dedicated thread (detecting events, accepting new connections, and dispatching the I/O read operations to other threads by using the task executor).
When an I/O reader thread (to which the read operation is dispatched) reads data, it hands off to another thread to assemble the incoming message.
Large messages can take several reads to complete.
These "assembler
" threads can block while waiting for data.
When a new read event occurs, the reader determines if this socket already has an assembler and, if not, runs a new one.
When the assembly process is complete, the assembler thread is returned to the pool.
This can cause a deadlock when the pool is exhausted, the CALLER_RUNS
rejection policy is in use, and the task queue is full.
When the pool is empty and there is no room in the queue, the IO selector thread receives an OP_READ
event and dispatches the read by using the executor.
The queue is full, so the selector thread itself starts the read process.
Now it detects that there is no assembler for this socket and, before it does the read, fires off an assembler.
Again, the queue is full, and the selector thread becomes the assembler.
The assembler is now blocked, waiting for the data to be read, which never happens.
The connection factory is now deadlocked because the selector thread cannot handle new events.
To avoid this deadlock, we must avoid the selector (or reader) threads performing the assembly task. We want to use separate pools for the IO and assembly operations.
The framework provides a CompositeExecutor
, which allows the configuration of two distinct executors: one for performing IO operations and one for message assembly.
In this environment, an IO thread can never become an assembler thread, and the deadlock cannot occur.
In addition, the task executors should be configured to use an AbortPolicy
(ABORT
when using <task>
).
When an I/O task cannot be completed, it is deferred for a short time and continually retried until it can be completed and have an assembler allocated.
By default, the delay is 100ms, but you can change it by setting the readDelay
property on the connection factory (read-delay
when configuring with the XML namespace).
The following three examples shows how to configure the composite executor:
@Bean private CompositeExecutor compositeExecutor() { ThreadPoolTaskExecutor ioExec = new ThreadPoolTaskExecutor(); ioExec.setCorePoolSize(4); ioExec.setMaxPoolSize(10); ioExec.setQueueCapacity(0); ioExec.setThreadNamePrefix("io-"); ioExec.setRejectedExecutionHandler(new AbortPolicy()); ioExec.initialize(); ThreadPoolTaskExecutor assemblerExec = new ThreadPoolTaskExecutor(); assemblerExec.setCorePoolSize(4); assemblerExec.setMaxPoolSize(10); assemblerExec.setQueueCapacity(0); assemblerExec.setThreadNamePrefix("assembler-"); assemblerExec.setRejectedExecutionHandler(new AbortPolicy()); assemblerExec.initialize(); return new CompositeExecutor(ioExec, assemblerExec); }
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor"> <constructor-arg ref="io"/> <constructor-arg ref="assembler"/> </bean> <task:executor id="io" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" /> <task:executor id="assembler" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor"> <constructor-arg> <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="threadNamePrefix" value="io-" /> <property name="corePoolSize" value="4" /> <property name="maxPoolSize" value="8" /> <property name="queueCapacity" value="0" /> <property name="rejectedExecutionHandler"> <bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" /> </property> </bean> </constructor-arg> <constructor-arg> <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="threadNamePrefix" value="assembler-" /> <property name="corePoolSize" value="4" /> <property name="maxPoolSize" value="10" /> <property name="queueCapacity" value="0" /> <property name="rejectedExecutionHandler"> <bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" /> </property> </bean> </constructor-arg> </bean>
Secure Sockets Layer/Transport Layer Security is supported.
When using NIO, the JDK 5+ SSLEngine
feature is used to handle handshaking after the connection is established.
When not using NIO, standard SSLSocketFactory
and SSLServerSocketFactory
objects are used to create connections.
A number of strategy interfaces are provided to allow significant customization.
The default implementations of these interfaces provide for the simplest way to get started with secure communications.
Regardless of whether you use NIO, you need to configure the ssl-context-support
attribute on the connection factory.
This attribute references a <bean/> definition that describes the location and passwords for the required key stores.
SSL/TLS peers require two keystores each:
A truststore that contains the public keys for peers that are trusted.
See the documentation for the keytool
utility provided with the JDK.
The essential steps are
Note | |
---|---|
It is common in test cases to use the same key stores on both peers, but this should be avoided for production. |
After establishing the key stores, the next step is to indicate their locations to the TcpSSLContextSupport
bean and provide a reference to that bean to the connection factory.
The following example configures an SSL connection:
<bean id="sslContextSupport" class="o.sf.integration.ip.tcp.connection.support.DefaultTcpSSLContextSupport"> <constructor-arg value="client.ks"/> <constructor-arg value="client.truststore.ks"/> <constructor-arg value="secret"/> <constructor-arg value="secret"/> </bean> <ip:tcp-connection-factory id="clientFactory" type="client" host="localhost" port="1234" ssl-context-support="sslContextSupport" />
The DefaulTcpSSLContextSupport
class also has an optional protocol
property, which can be SSL
or TLS
(the default).
The keystore file names (the first two constructor arguments) use the Spring Resource
abstraction.
By default, the files are located on the classpath, but you can override this by using the file:
prefix (to find the files on the filesystem instead).
Starting with version 4.3.6, when you use NIO, you can specify an ssl-handshake-timeout
(in seconds) on the connection factory.
This timeout (the default is 30 seconds) is used during SSL handshake when waiting for data.
If the timeout is exceeded, the process is aborted and the socket is closed.
Starting with version 5.0.8, you can configure whether or not to enable host verification. Starting with version 5.1, it is enabled by default; the mechanism to disable it depends on whether or not you are using NIO.
Host verification is used to ensure the server you are connected to matches information in the certificate, even if the certificate is trusted.
When using NIO, configure the DefaultTcpNioSSLConnectionSupport
, for example.
@Bean public DefaultTcpNioSSLConnectionSupport connectionSupport() { DefaultTcpSSLContextSupport sslContextSupport = new DefaultTcpSSLContextSupport("test.ks", "test.truststore.ks", "secret", "secret"); sslContextSupport.setProtocol("SSL"); DefaultTcpNioSSLConnectionSupport tcpNioConnectionSupport = new DefaultTcpNioSSLConnectionSupport(sslContextSupport, false); return tcpNioConnectionSupport; }
The second constructor argument disables host verification.
The connectionSupport
bean is then injected into the NIO connection factory.
When not using NIO, the configuration is in the TcpSocketSupport
:
connectionFactory.setTcpSocketSupport(new DefaultTcpSocketSupport(false));
Again, the constructor argument disables host verification.
This section covers advanced techniques that you may find to be helpful in certain situations.
In many cases, the configuration described earlier is all that is needed to enable secure communication over TCP/IP. However, Spring Integration provides a number of strategy interfaces to allow customization and modification of socket factories and sockets:
TcpSSLContextSupport
TcpSocketFactorySupport
TcpSocketSupport
TcpNetConnectionSupport
TcpNioConnectionSupport
The following listing shows the TcpSSLContextSupport
strategy interface:
public interface TcpSSLContextSupport { SSLContext getSSLContext() throws Exception; }
Implementations of the TcpSSLContextSupport
interface are responsible for creating an SSLContext
object.
The implementation provided by the framework is the DefaultTcpSSLContextSupport
, described earlier.
If you require different behavior, implement this interface and provide the connection factory with a reference to a bean of your class' implementation.
The following listing shows the definition of the TcpSocketFactorySupport
strategy interface:
public interface TcpSocketFactorySupport { ServerSocketFactory getServerSocketFactory(); SocketFactory getSocketFactory(); }
Implementations of this interface are responsible for obtaining references to ServerSocketFactory
and SocketFactory
.
Two implementations are provided.
The first is DefaultTcpNetSocketFactorySupport
for non-SSL sockets (when no ssl-context-support
attribute is defined).
This uses the JDK’s default factories.
The second implementation is DefaultTcpNetSSLSocketFactorySupport
.
By default, this is used when an ssl-context-support
attribute is defined.
It uses the SSLContext
created by that bean to create the socket factories.
Note | |
---|---|
This interface applies only if |
The following listing shows the definition of the TcpSocketSupport
strategy interface:
public interface TcpSocketSupport { void postProcessServerSocket(ServerSocket serverSocket); void postProcessSocket(Socket socket); }
Implementations of this interface can modify sockets after they are created and after all configured attributes have been applied but before the sockets are used.
This applies whether you use NIO or not.
For example, you could use an implementation of this interface to modify the supported cipher suites on an SSL socket, or you could add a listener that gets notified after SSL handshaking is complete.
The sole implementation provided by the framework is the DefaultTcpSocketSupport
, which does not modify the sockets in any way.
To supply your own implementation of TcpSocketFactorySupport
or TcpSocketSupport
, provide the connection factory with references to beans of your custom type by setting the socket-factory-support
and socket-support
attributes, respectively.
The following listing shows the definition of the TcpNetConnectionSupport
strategy interface:
public interface TcpNetConnectionSupport { TcpNetConnection createNewConnection(Socket socket, boolean server, boolean lookupHost, ApplicationEventPublisher applicationEventPublisher, String connectionFactoryName) throws Exception; }
This interface is invoked to create objects of type TcpNetConnection
(or its subclasses).
The framework provides a single implementation (DefatulTcpNetConnectionSupport
), which, by default, creates simple TcpNetConnection
objects.
It has two properties: pushbackCapable
and pushbackBufferSize
.
When push back is enabled, the implementation returns a subclass that wraps the connection’s InputStream
in a PushbackInputStream
.
Aligned with the PushbackInputStream
default, the buffer size defaults to 1.
This lets deserializers "unread
" (push back) bytes into the stream.
The following trivial example shows how it might be used in a delegating deserializer that "peeks
" at the first byte to determine which deserializer to invoke:
public class CompositeDeserializer implements Deserializer<byte[]> { private final ByteArrayStxEtxSerializer stxEtx = new ByteArrayStxEtxSerializer(); private final ByteArrayCrLfSerializer crlf = new ByteArrayCrLfSerializer(); @Override public byte[] deserialize(InputStream inputStream) throws IOException { PushbackInputStream pbis = (PushbackInputStream) inputStream; int first = pbis.read(); if (first < 0) { throw new SoftEndOfStreamException(); } pbis.unread(first); if (first == ByteArrayStxEtxSerializer.STX) { this.receivedStxEtx = true; return this.stxEtx.deserialize(pbis); } else { this.receivedCrLf = true; return this.crlf.deserialize(pbis); } } }
The following listing shows the definition of the TcpNioConnectionSupport
strategy interface:
public interface TcpNioConnectionSupport { TcpNioConnection createNewConnection(SocketChannel socketChannel, boolean server, boolean lookupHost, ApplicationEventPublisher applicationEventPublisher, String connectionFactoryName) throws Exception; }
This interface is invoked to create TcpNioConnection
objects (or objects from subclasses).
Spring Integration provides two implementations: DefaultTcpNioSSLConnectionSupport
and DefaultTcpNioConnectionSupport
.
Which one is used depends on whether SSL is in use.
A common use case is to subclass DefaultTcpNioSSLConnectionSupport
and override postProcessSSLEngine
.
See the SSL client authentication example.
As with the DefaultTcpNetConnectionSupport
, these implementations also support push back.
To enable client certificate authentication when you use SSL, the technique depends on whether you use NIO.
When you do not NIO , provide a custom TcpSocketSupport
implementation to post-process the server socket:
serverFactory.setTcpSocketSupport(new DefaultTcpSocketSupport() { @Override public void postProcessServerSocket(ServerSocket serverSocket) { ((SSLServerSocket) serverSocket).setNeedClientAuth(true); } });
(When you use XML configuration, provide a reference to your bean by setting the socket-support
attribute).
When you use NIO, provide a custom TcpNioSslConnectionSupport
implementation to post-process the SSLEngine
, as the following example shows:
@Bean public DefaultTcpNioSSLConnectionSupport tcpNioConnectionSupport() { return new DefaultTcpNioSSLConnectionSupport(serverSslContextSupport) { @Override protected void postProcessSSLEngine(SSLEngine sslEngine) { sslEngine.setNeedClientAuth(true); } } } @Bean public TcpNioServerConnectionFactory server() { ... serverFactory.setTcpNioConnectionSupport(tcpNioConnectionSupport()); ... }
(When you use XML configuration, since version 4.3.7, provide a reference to your bean by setting the nio-connection-support
attribute).
The following table describes attributes that you can set to configure IP connections:
Table 34.1. Connection Factory Attributes
Attribute Name | Client? | Server? | Allowed Values | Attribute Description |
---|---|---|---|---|
| Y | Y | client, server | Determines whether the connection factory is a client or a server. |
| Y | N | The host name or IP address of the destination. | |
| Y | Y | The port. | |
| Y | Y | An implementation of | |
| Y | Y | An implementation of | |
| Y | Y |
| Whether or not connection uses NIO.
Refer to the |
| Y | N |
| When using NIO, whether or not the connection uses direct buffers.
Refer to the |
| Y | Y |
| When you use NIO, it may be necessary to resequence messages.
When this attribute is set to |
| Y | Y | Defaults to | |
| Y | Y | See | |
| Y | Y | See | |
| Y | Y |
| See |
| Y | Y | Sets | |
| Y | Y |
| See |
| Y | Y | See | |
| N | Y | On a multi-homed system, specifies an IP address for the interface to which the socket is bound. | |
| Y | Y | Specifies a specific executor to be used for socket handling.
If not supplied, an internal cached thread executor is used.
Needed on some platforms that require the use of specific task executors, such as a | |
| Y | Y |
| Specifies whether a connection can be used for multiple messages.
If |
| N | N | This attribute is no longer used.
For backward compatibility, it sets the backlog, but you should use | |
| N | Y | Sets the connection backlog for server factories. | |
| Y | Y |
| Specifies whether reverse lookups are done on IP addresses to convert to host names for use in message headers.
If false, the IP address is used instead.
Default: |
| Y | Y | ||
| Y | Y | See | |
| Y | Y | See | |
| Y | Y | ||
| Y | Y | ||
| Y | Y | long > 0 | The delay (in milliseconds) before retrying a read after the previous attempt failed due to insufficient threads.
Default: 100.
Only applies if |
The following table describes attributes that you can set to configure UDP inbound channel adapters:
Table 34.2. UDP Inbound Channel Adapter Attributes
Attribute Name | Allowed Values | Attribute Description |
---|---|---|
| The port on which the adapter listens. | |
|
| Whether or not the UDP adapter uses multicast. |
| When multicast is true, the multicast address to which the adapter joins. | |
| Specifies how many packets can be handled concurrently. It only applies if task-executor is not configured. Default: 5. | |
task-executor | Specifies a specific executor to be used for socket handling.
If not supplied, an internal pooled executor is used.
Needed on some platforms that require the use of specific task executors such as a | |
| The size of the buffer used to receive | |
|
| Whether or not a UDP adapter expects a data length field in the packet received. Used to detect packet truncation. |
| See the | |
| Used for UDP acknowledgment packets.
See the setSendBufferSize() methods in | |
| See | |
| On a multi-homed system, specifies an IP address for the interface to which the socket is bound. | |
| If a downstream component throws an exception, the | |
|
| Specifies whether reverse lookups are done on IP addresses to convert to host names for use in message headers.
If |
The following table describes attributes that you can set to configure UDP outbound channel adapters:
Table 34.3. UDP Outbound Channel Adapter Attributes
Attribute Name | Allowed Values | Attribute Description |
---|---|---|
| The host name or ip address of the destination. For multicast udp adapters, the multicast address. | |
| The port on the destination. | |
|
| Whether or not the udp adapter uses multicast. |
|
| Whether or not a UDP adapter requires an acknowledgment from the destination.
When enabled, it requires setting the following four attributes: |
| When | |
| When | |
| When | |
| Defaults to 1. For multicast adapters, you can set this to a larger value, which requires acknowledgments from multiple destinations. | |
|
| Whether or not a UDP adapter includes a data length field in the packet sent to the destination. |
| For multicast adapters, specifies the time-to-live attribute for the | |
| See | |
| See the | |
| Used for UDP acknowledgment packets.
See the | |
local-address | On a multi-homed system, for the UDP adapter, specifies an IP address for the interface to which the socket is bound for reply messages. For a multicast adapter, it also determines which interface the multicast packets are sent over. | |
| Specifies a specific executor to be used for acknowledgment handling.
If not supplied, an internal single threaded executor is used.
Needed on some platforms that require the use of specific task executors, such as a | |
| SpEL expression | A SpEL expression to be evaluated to determine which |
| SpEL expression | A SpEL expression to be evaluated to determine which datagram socket use for sending outgoing UDP packets. |
The following table describes attributes that you can set to configure TCP inbound channel adapters:
Table 34.4. TCP Inbound Channel Adapter Attributes
Attribute Name | Allowed Values | Attribute Description |
---|---|---|
| The channel to which inbound messages is sent. | |
| If the connection factory has a type of | |
| If an exception is thrown by a downstream component, the | |
|
| When |
| When in | |
|
| Specifies a |
The following table describes attributes that you can set to configure TCP outbound channel adapters:
Table 34.5. TCP Outbound Channel Adapter Attributes
Attribute Name | Allowed Values | Attribute Description |
---|---|---|
| The channel on which outbound messages arrive. | |
| If the connection factory has a type of | |
|
| When |
| When in | |
|
| Specifies a |
The following table describes attributes that you can set to configure TCP inbound gateways:
Table 34.6. TCP Inbound Gateway Attributes
Attribute Name | Allowed Values | Attribute Description |
---|---|---|
| The connection factory must be of type server. | |
| The channel to which incoming messages are sent. | |
| The channel on which reply messages may arrive. Usually, replies arrive on a temporary reply channel added to the inbound message header. | |
| The time in milliseconds for which the gateway waits for a reply. Default: 1000 (1 second). | |
| If an exception is thrown by a downstream component, the | |
|
| When |
| When in | |
|
| Specifies a |
The following table describes attributes that you can set to configure TCP outbound gateways:
Table 34.7. TCP Outbound Gateway Attributes
Attribute Name | Allowed Values | Attribute Description |
---|---|---|
| The connection factory must be of type | |
| The channel on which outgoing messages arrive. | |
| Optional. The channel to which reply messages are sent. | |
| The time in milliseconds for which the gateway waits for a reply from the remote system.
Mutually exclusive with | |
| A SpEL expression that is evaluated against the message to determine the time in milliseconds for which the gateway waits for a reply from the remote system.
Mutually exclusive with | |
| If a single-use connection factory is not being used, the time in milliseconds for which the gateway waits to get access to the shared connection. | |
| The time in milliseconds for which the gateway waits when sending the reply to the reply-channel. Only applies if the reply-channel might block (such as a bounded QueueChannel that is currently full). |
IP Message Headers.
This module uses the following MessageHeader
instances:
Header Name | IpHeaders Constant | Description |
---|---|---|
|
| The host name from which a TCP message or UDP packet was received.
If |
|
| The IP address from which a TCP message or UDP packet was received. |
|
| The remote port for a UDP packet. |
ip_localInetAddress |
| The local |
|
| The remote IP address to which UDP application-level acknowledgments are sent. The framework includes acknowledgment information in the data packet. |
|
| A correlation ID for UDP application-level acknowledgments. The framework includes acknowledgment information in the data packet. |
|
| The remote port for a TCP connection. |
|
| A unique identifier for a TCP connection. Set by the framework for inbound messages. When sending to a server-side inbound channel adapter or replying to an inbound gateway, this header is required so that the endpoint can determine the connection to which to send the message. |
|
| For information only. When using a cached or failover client connection factory, it contains the actual underlying connection ID. |
|
| An optional content type for inbound messages
Described after this table.
Note that, unlike the other header constants, this constant is in the |
For inbound messages, ip_hostname
, ip_address
, ip_tcp_remotePort
, and ip_connectionId
are mapped by the default
TcpHeaderMapper
.
If you set the mapper’s addContentTypeHeader
property to true
, the mapper sets the contentType
header (application/octet-stream;charset="UTF-8"
, by default).
You can change the default by setting the contentType
property.
You can add additional headers by subclassing TcpHeaderMapper
and overriding the supplyCustomHeaders
method.
For example, when you use SSL, you can add properties of the SSLSession
by obtaining the session object from the
TcpConnection
object, which is provided as an argument to the supplyCustomHeaders
method.
For outbound messages, String
payloads are converted to byte[]
with the default (UTF-8
) charset.
Set the charset
property to change the default.
When customizing the mapper properties or subclassing, declare the mapper as a bean and provide an instance to the connection factory by using the mapper
property
The following example from the samples repository shows some of the configuration options available when you use annotations instead of XML:
@EnableIntegration @IntegrationComponentScan @Configuration public static class Config { @Value(${some.port}) private int port; @MessagingGateway(defaultRequestChannel="toTcp") public interface Gateway { String viaTcp(String in); } @Bean @ServiceActivator(inputChannel="toTcp") public MessageHandler tcpOutGate(AbstractClientConnectionFactory connectionFactory) { TcpOutboundGateway gate = new TcpOutboundGateway(); gate.setConnectionFactory(connectionFactory); gate.setOutputChannelName("resultToString"); return gate; } @Bean public TcpInboundGateway tcpInGate(AbstractServerConnectionFactory connectionFactory) { TcpInboundGateway inGate = new TcpInboundGateway(); inGate.setConnectionFactory(connectionFactory); inGate.setRequestChannel(fromTcp()); return inGate; } @Bean public MessageChannel fromTcp() { return new DirectChannel(); } @MessageEndpoint public static class Echo { @Transformer(inputChannel="fromTcp", outputChannel="toEcho") public String convert(byte[] bytes) { return new String(bytes); } @ServiceActivator(inputChannel="toEcho") public String upCase(String in) { return in.toUpperCase(); } @Transformer(inputChannel="resultToString") public String convertResult(byte[] bytes) { return new String(bytes); } } @Bean public AbstractClientConnectionFactory clientCF() { return new TcpNetClientConnectionFactory("localhost", this.port); } @Bean public AbstractServerConnectionFactory serverCF() { return new TcpNetServerConnectionFactory(this.port); } }
Standard Spring Integration annotation enabling the infrastructure for an integration application. | |
Searches for | |
The entry point to the client-side of the flow. The calling application can use | |
Outbound endpoints consist of a | |
Inbound endpoints (in the TCP/UDP module) are all message-driven and so only need to be declared as simple | |
This class provides a number of POJO methods for use in this sample flow (a | |
The client-side connection factory. | |
The server-side connection factory. |