TCP Message Correlation

One goal of the IP endpoints is to provide communication with systems other than Spring Integration applications. For this reason, only message payloads are sent and received by default. Since 3.0, you can transfer headers by using JSON, Java serialization, or custom serializers and deserializers. See Transferring Headers for more information. No message correlation is provided by the framework (except when using the gateways) or collaborating channel adapters on the server side. Later in this document, we discuss the various correlation techniques available to applications. In most cases, this requires specific application-level correlation of messages, even when message payloads contain some natural correlation data (such as an order number).

Gateways

Gateways automatically correlate messages. However, you should use an outbound gateway for relatively low-volume applications. When you configure the connection factory to use a single shared connection for all message pairs ('single-use="false"'), only one message can be processed at a time. A new message has to wait until the reply to the previous message has been received. When a connection factory is configured for each new message to use a new connection ('single-use="true"'), this restriction does not apply. While this setting can give higher throughput than a shared connection environment, it comes with the overhead of opening and closing a new connection for each message pair.

Therefore, for high-volume messages, consider using a collaborating pair of channel adapters. However, to do so, you need to provide collaboration logic.

Another solution, introduced in Spring Integration 2.2, is to use a CachingClientConnectionFactory, which allows the use of a pool of shared connections.

Collaborating Outbound and Inbound Channel Adapters

To achieve high-volume throughput (avoiding the pitfalls of using gateways, as mentioned earlier) you can configure a pair of collaborating outbound and inbound channel adapters. You can also use collaborating adapters (server-side or client-side) for totally asynchronous communication (rather than with request-reply semantics). On the server side, message correlation is automatically handled by the adapters, because the inbound adapter adds a header that allows the outbound adapter to determine which connection to use when sending the reply message.

On the server side, you must populate the ip_connectionId header, because it is used to correlate the message to a connection. Messages that originate at the inbound adapter automatically have the header set. If you wish to construct other messages to send, you need to set the header. You can get the header value from an incoming message.

On the client side, the application must provide its own correlation logic, if needed. You can do so in a number of ways.

If the message payload has some natural correlation data (such as a transaction ID or an order number) and you have no need to retain any information (such as a reply channel header) from the original outbound message, the correlation is simple and would be done at the application level in any case.

If the message payload has some natural correlation data (such as a transaction ID or an order number), but you need to retain some information (such as a reply channel header) from the original outbound message, you can retain a copy of the original outbound message (perhaps by using a publish-subscribe channel) and use an aggregator to recombine the necessary data.

For either of the previous two scenarios, if the payload has no natural correlation data, you can provide a transformer upstream of the outbound channel adapter to enhance the payload with such data. Such a transformer may transform the original payload to a new object that contains both the original payload and some subset of the message headers. Of course, live objects (such as reply channels) from the headers cannot be included in the transformed payload.

If you choose such a strategy, you need to ensure the connection factory has an appropriate serializer-deserializer pair to handle such a payload (such as DefaultSerializer and DefaultDeserializer, which use java serialization, or a custom serializer and deserializer). The ByteArray*Serializer options mentioned in TCP Connection Factories, including the default ByteArrayCrLfSerializer, do not support such payloads unless the transformed payload is a String or byte[].

Before the 2.2 release, when collaborating channel adapters used a client connection factory, the so-timeout attribute defaulted to the default reply timeout (10 seconds). This meant that, if no data were received by the inbound adapter for this period of time, the socket was closed.

This default behavior was not appropriate in a truly asynchronous environment, so it now defaults to an infinite timeout. You can reinstate the previous default behavior by setting the so-timeout attribute on the client connection factory to 10000 milliseconds.

Starting with version 5.4, multiple outbound channel adapters and one TcpInboundChannelAdapter can share the same connection factory. This allows an application to support both request/reply and arbitrary server → client messaging. See TCP Gateways for more information.

Transferring Headers

TCP is a streaming protocol. Serializers and Deserializers demarcate messages within the stream. Prior to 3.0, only message payloads (String or byte[]) could be transferred over TCP. Beginning with 3.0, you can transfer selected headers as well as the payload. However, “live” objects, such as the replyChannel header, cannot be serialized.

Sending header information over TCP requires some additional configuration.

The first step is to provide the ConnectionFactory with a MessageConvertingTcpMessageMapper that uses the mapper attribute. This mapper delegates to any MessageConverter implementation to convert the message to and from some object that can be serialized and deserialized by the configured serializer and deserializer.

Spring Integration provides a MapMessageConverter, which allows the specification of a list of headers that are added to a Map object, along with the payload. The generated Map has two entries: payload and headers. The headers entry is itself a Map and contains the selected headers.

The second step is to provide a serializer and a deserializer that can convert between a Map and some wire format. This can be a custom Serializer or Deserializer, which you typically need if the peer system is not a Spring Integration application.

Spring Integration provides a MapJsonSerializer to convert a Map to and from JSON. It uses a Spring Integration JsonObjectMapper. You can provide a custom JsonObjectMapper if needed. By default, the serializer inserts a linefeed (0x0a) character between objects. See the Javadoc for more information.

The JsonObjectMapper uses whichever version of Jackson is on the classpath.

You can also use standard Java serialization of the Map, by using the DefaultSerializer and DefaultDeserializer.

The following example shows the configuration of a connection factory that transfers the correlationId, sequenceNumber, and sequenceSize headers by using JSON:

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="12345"
    mapper="mapper"
    serializer="jsonSerializer"
    deserializer="jsonSerializer"/>

<bean id="mapper"
      class="o.sf.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
    <constructor-arg name="messageConverter">
        <bean class="o.sf.integration.support.converter.MapMessageConverter">
            <property name="headerNames">
                <list>
                    <value>correlationId</value>
                    <value>sequenceNumber</value>
                    <value>sequenceSize</value>
                </list>
            </property>
        </bean>
    </constructor-arg>
</bean>

<bean id="jsonSerializer" class="o.sf.integration.ip.tcp.serializer.MapJsonSerializer" />

A message sent with the preceding configuration, with a payload of 'something' would appear on the wire as follows:

{"headers":{"correlationId":"things","sequenceSize":5,"sequenceNumber":1},"payload":"something"}