TCP Message Correlation
One goal of the IP endpoints is to provide communication with systems other than Spring Integration applications. For this reason, only message payloads are sent and received by default. Since 3.0, you can transfer headers by using JSON, Java serialization, or custom serializers and deserializers. See Transferring Headers for more information. No message correlation is provided by the framework, except when using the gateways or collaborating channel adapters on the server side. Later in this document, we discuss the various correlation techniques available to applications. In most cases, this requires specific application-level correlation of messages, even when message payloads contain some natural correlation data (such as an order number).
Gateways
Gateways automatically correlate messages. However, you should use an outbound gateway for relatively low-volume applications. When you configure the connection factory to use a single shared connection for all message pairs ('single-use="false"'), only one message can be processed at a time. A new message has to wait until the reply to the previous message has been received. When a connection factory is configured for each new message to use a new connection ('single-use="true"'), this restriction does not apply. While this setting can give higher throughput than a shared connection environment, it comes with the overhead of opening and closing a new connection for each message pair.
Therefore, for high-volume messages, consider using a collaborating pair of channel adapters. However, to do so, you need to provide collaboration logic.
Another solution, introduced in Spring Integration 2.2, is to use a CachingClientConnectionFactory, which allows the use of a pool of shared connections.
Collaborating Outbound and Inbound Channel Adapters
To achieve high-volume throughput (avoiding the pitfalls of using gateways, as mentioned earlier) you can configure a pair of collaborating outbound and inbound channel adapters. You can also use collaborating adapters (server-side or client-side) for totally asynchronous communication (rather than with request-reply semantics). On the server side, message correlation is automatically handled by the adapters, because the inbound adapter adds a header that allows the outbound adapter to determine which connection to use when sending the reply message.
On the server side, you must populate the ip_connectionId header, because it is used to correlate the message to a connection.
Messages that originate at the inbound adapter automatically have the header set.
If you wish to construct other messages to send, you need to set the header.
You can get the header value from an incoming message.
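The header handling described in the note above can be sketched in plain Java. This is a minimal illustration, not framework code: the `Msg` record and the `replyTo` helper are hypothetical stand-ins for a real message type, and only the header name `ip_connectionId` comes from the text.

```java
import java.util.HashMap;
import java.util.Map;

final class ConnectionIdSketch {

    // Header name taken from the note above.
    static final String CONNECTION_ID = "ip_connectionId";

    // Hypothetical stand-in for a framework message: a payload plus headers.
    record Msg(Object payload, Map<String, Object> headers) { }

    // Builds a new message that targets the connection the incoming message arrived on.
    static Msg replyTo(Msg incoming, Object replyPayload) {
        Object connectionId = incoming.headers().get(CONNECTION_ID);
        if (connectionId == null) {
            throw new IllegalArgumentException(
                    "incoming message has no " + CONNECTION_ID + " header");
        }
        Map<String, Object> headers = new HashMap<>();
        headers.put(CONNECTION_ID, connectionId);
        return new Msg(replyPayload, Map.copyOf(headers));
    }
}
```

In a Spring Integration application, the same idea would typically be expressed by copying the header from the incoming message when building the new one; the sketch only shows which value has to travel with the message.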
On the client side, the application must provide its own correlation logic, if needed. You can do so in a number of ways.
If the message payload has some natural correlation data (such as a transaction ID or an order number) and you have no need to retain any information (such as a reply channel header) from the original outbound message, the correlation is simple and would be done at the application level in any case.
If the message payload has some natural correlation data (such as a transaction ID or an order number), but you need to retain some information (such as a reply channel header) from the original outbound message, you can retain a copy of the original outbound message (perhaps by using a publish-subscribe channel) and use an aggregator to recombine the necessary data.
For either of the previous two scenarios, if the payload has no natural correlation data, you can provide a transformer upstream of the outbound channel adapter to enhance the payload with such data. Such a transformer may transform the original payload to a new object that contains both the original payload and some subset of the message headers. Of course, live objects (such as reply channels) from the headers cannot be included in the transformed payload.
If you choose such a strategy, you need to ensure the connection factory has an appropriate serializer-deserializer pair to handle such a payload (such as DefaultSerializer and DefaultDeserializer, which use Java serialization, or a custom serializer and deserializer).
The ByteArray*Serializer options mentioned in TCP Connection Factories, including the default ByteArrayCrLfSerializer, do not support such payloads unless the transformed payload is a String or byte[].
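The payload-enrichment strategy above can be sketched as follows. This is an illustrative example under stated assumptions: `Envelope`, `correlationKey`, and the helper methods are hypothetical names, and the round trip uses plain `java.io` object streams to stand in for a Java-serialization-based serializer and deserializer pair. Typing the retained headers as `Serializable` keeps live objects (such as reply channels) out of the wrapper.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

final class EnvelopeSketch {

    // Hypothetical wrapper: the original payload plus the headers worth retaining.
    static final class Envelope implements Serializable {
        private static final long serialVersionUID = 1L;
        final String correlationKey;
        final Serializable payload;
        final HashMap<String, Serializable> headers;

        Envelope(String correlationKey, Serializable payload,
                 Map<String, Serializable> headers) {
            this.correlationKey = correlationKey;
            this.payload = payload;
            this.headers = new HashMap<>(headers);
        }
    }

    // Writes the envelope with standard Java serialization.
    static byte[] serialize(Envelope envelope) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(envelope);
        }
        return bytes.toByteArray();
    }

    // Reads an envelope back from its serialized form.
    static Envelope deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (Envelope) in.readObject();
        }
    }

    // Convenience for callers that prefer unchecked exceptions.
    static Envelope roundTrip(Envelope envelope) {
        try {
            return deserialize(serialize(envelope));
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Java serialization round trip failed", e);
        }
    }
}
```

When the peer echoes the envelope (or at least the correlation key) in its reply, the client can match the reply to the retained request data at the application level.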
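Before the 2.2 release, when collaborating channel adapters used a client connection factory, the so-timeout attribute defaulted to the default reply timeout (10 seconds). This meant that, if no data was received by the inbound adapter for this period of time, the socket was closed. This default behavior was not appropriate in a truly asynchronous environment, so it now defaults to an infinite timeout.
You can reinstate the previous default behavior by setting the so-timeout attribute on the client connection factory to 10000 milliseconds.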
Starting with version 5.4, multiple outbound channel adapters and one TcpInboundChannelAdapter can share the same connection factory.
This allows an application to support both request/reply and arbitrary server → client messaging.
See TCP Gateways for more information.
Transferring Headers
TCP is a streaming protocol.
Serializers and Deserializers demarcate messages within the stream.
Prior to 3.0, only message payloads (String or byte[]) could be transferred over TCP.
Beginning with 3.0, you can transfer selected headers as well as the payload.
However, “live” objects, such as the replyChannel header, cannot be serialized.
Sending header information over TCP requires some additional configuration.
The first step is to provide the ConnectionFactory with a MessageConvertingTcpMessageMapper by using the mapper attribute.
This mapper delegates to any MessageConverter implementation to convert the message to and from some object that can be serialized and deserialized by the configured serializer and deserializer.
Spring Integration provides a MapMessageConverter, which allows the specification of a list of headers that are added to a Map object, along with the payload.
The generated Map has two entries: payload and headers.
The headers entry is itself a Map and contains the selected headers.
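The shape of the generated Map can be illustrated with a small plain-Java sketch. It is not the MapMessageConverter implementation; `MapConversionSketch` and `toMap` are hypothetical names, and the behavior of skipping headers that are absent from the message is an assumption made for the example.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class MapConversionSketch {

    // Copies the payload and only the listed headers into a two-entry Map.
    static Map<String, Object> toMap(Object payload,
                                     Map<String, Object> allHeaders,
                                     List<String> headerNames) {
        Map<String, Object> selected = new LinkedHashMap<>();
        for (String name : headerNames) {
            // Assumption for this sketch: headers missing from the message are skipped.
            if (allHeaders.containsKey(name)) {
                selected.put(name, allHeaders.get(name));
            }
        }
        Map<String, Object> result = new LinkedHashMap<>();
        result.put("headers", selected);
        result.put("payload", payload);
        return result;
    }
}
```

Because only the named headers are copied, a live object such as a reply channel never reaches the serializer unless it is listed explicitly.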
The second step is to provide a serializer and a deserializer that can convert between a Map and some wire format.
This can be a custom Serializer or Deserializer, which you typically need if the peer system is not a Spring Integration application.
Spring Integration provides a MapJsonSerializer to convert a Map to and from JSON.
It uses a Spring Integration JsonObjectMapper.
You can provide a custom JsonObjectMapper if needed.
By default, the serializer inserts a linefeed (0x0a) character between objects.
See the Javadoc for more information.
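Linefeed delimiting is what lets a reader find message boundaries in the TCP stream. The following plain-Java sketch shows the idea only; it is not the MapJsonSerializer implementation, the class and method names are hypothetical, and it assumes compact JSON documents that contain no raw 0x0a byte (line breaks inside JSON strings are escaped).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class LinefeedFramingSketch {

    // Writes one JSON document followed by the 0x0a delimiter.
    static void writeDocument(OutputStream out, String json) throws IOException {
        out.write(json.getBytes(StandardCharsets.UTF_8));
        out.write(0x0a);
    }

    // Reads bytes up to the next 0x0a; returns null when the stream is exhausted.
    static String readDocument(InputStream in) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) != -1) {
            if (b == 0x0a) {
                return buffer.toString(StandardCharsets.UTF_8);
            }
            buffer.write(b);
        }
        return buffer.size() == 0 ? null : buffer.toString(StandardCharsets.UTF_8);
    }

    // Sends all documents over an in-memory "wire" and reads them back one by one.
    static List<String> roundTrip(List<String> documents) {
        try {
            ByteArrayOutputStream wire = new ByteArrayOutputStream();
            for (String document : documents) {
                writeDocument(wire, document);
            }
            InputStream in = new ByteArrayInputStream(wire.toByteArray());
            List<String> received = new ArrayList<>();
            String document;
            while ((document = readDocument(in)) != null) {
                received.add(document);
            }
            return received;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A peer that is not a Spring Integration application would need to apply the same delimiting convention on its side of the connection.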
The JsonObjectMapper uses whichever version of Jackson is on the classpath.
You can also use standard Java serialization of the Map, by using the DefaultSerializer and DefaultDeserializer.
The following example shows the configuration of a connection factory that transfers the correlationId, sequenceNumber, and sequenceSize headers by using JSON:
<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="12345"
    mapper="mapper"
    serializer="jsonSerializer"
    deserializer="jsonSerializer"/>

<bean id="mapper"
      class="org.springframework.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
    <constructor-arg name="messageConverter">
        <bean class="org.springframework.integration.support.converter.MapMessageConverter">
            <property name="headerNames">
                <list>
                    <value>correlationId</value>
                    <value>sequenceNumber</value>
                    <value>sequenceSize</value>
                </list>
            </property>
        </bean>
    </constructor-arg>
</bean>

<bean id="jsonSerializer" class="org.springframework.integration.ip.tcp.serializer.MapJsonSerializer" />
A message sent with the preceding configuration, with a payload of 'something', would appear on the wire as follows:
{"headers":{"correlationId":"things","sequenceSize":5,"sequenceNumber":1},"payload":"something"}