Message

The Spring Integration Message is a generic container for data. Any object can be provided as the payload, and each Message instance includes headers containing user-extensible properties as key-value pairs.

The Message Interface

The following listing shows the definition of the Message interface:

public interface Message<T> {

    T getPayload();

    MessageHeaders getHeaders();

}

The Message interface is a core part of the API. By encapsulating the data in a generic wrapper, the messaging system can pass it around without any knowledge of the data’s type. As an application evolves to support new types or when the types themselves are modified or extended, the messaging system is not affected. On the other hand, when some component in the messaging system does require access to information about the Message, such metadata can typically be stored to and retrieved from the metadata in the message headers.

Message Headers

Just as Spring Integration lets any Object be used as the payload of a Message, it also supports any Object types as header values. In fact, the MessageHeaders class implements the java.util.Map_ interface, as the following class definition shows:

public final class MessageHeaders implements Map<String, Object>, Serializable {
  ...
}
Even though the MessageHeaders class implements Map, it is effectively a read-only implementation. Any attempt to put a value in the Map results in an UnsupportedOperationException. The same applies for remove and clear. Since messages may be passed to multiple consumers, the structure of the Map cannot be modified. Likewise, the message’s payload Object can not be set after the initial creation. However, the mutability of the header values themselves (or the payload Object) is intentionally left as a decision for the framework user.

As an implementation of Map, the headers can be retrieved by calling get(..) with the name of the header. Alternatively, you can provide the expected Class as an additional parameter. Even better, when retrieving one of the pre-defined values, convenient getters are available. The following example shows each of these three options:

Object someValue = message.getHeaders().get("someKey");

CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);

Long timestamp = message.getHeaders().getTimestamp();

The following table describes the pre-defined message headers:

Table 1. Pre-defined Message Headers
Header Name Header Type Usage
 MessageHeaders.ID
 java.util.UUID

An identifier for this message instance. Changes each time a message is mutated.

 MessageHeaders.
TIMESTAMP
 java.lang.Long

The time the message was created. Changes each time a message is mutated.

 MessageHeaders.
REPLY_CHANNEL
 java.lang.Object
(String or
MessageChannel)

A channel to which a reply (if any) is sent when no explicit output channel is configured and there is no ROUTING_SLIP or the ROUTING_SLIP is exhausted. If the value is a String, it must represent a bean name or have been generated by a ChannelRegistry.

 MessageHeaders.
ERROR_CHANNEL
 java.lang.Object
(String or
MessageChannel)

A channel to which errors are sent. If the value is a String, it must represent a bean name or have been generated by a ChannelRegistry.

Many inbound and outbound adapter implementations also provide or expect certain headers, and you can configure additional user-defined headers. Constants for these headers can be found in those modules where such headers exist — for example. AmqpHeaders, JmsHeaders, and so on.

MessageHeaderAccessor API

Starting with Spring Framework 4.0 and Spring Integration 4.0, the core messaging abstraction has been moved to the spring-messaging module, and the MessageHeaderAccessor API has been introduced to provide additional abstraction over messaging implementations. All (core) Spring Integration-specific message headers constants are now declared in the IntegrationMessageHeaderAccessor class. The following table describes the pre-defined message headers:

Table 2. Pre-defined Message Headers
Header Name Header Type Usage
 IntegrationMessageHeaderAccessor.
CORRELATION_ID
 java.lang.Object

Used to correlate two or more messages.

 IntegrationMessageHeaderAccessor.
SEQUENCE_NUMBER
 java.lang.Integer

Usually a sequence number with a group of messages with a SEQUENCE_SIZE but can also be used in a <resequencer/> to resequence an unbounded group of messages.

 IntegrationMessageHeaderAccessor.
SEQUENCE_SIZE
 java.lang.Integer

The number of messages within a group of correlated messages.

 IntegrationMessageHeaderAccessor.
EXPIRATION_DATE
 java.lang.Long

Indicates when a message is expired. Not used by the framework directly but can be set with a header enricher and used in a <filter/> that is configured with an UnexpiredMessageSelector.

 IntegrationMessageHeaderAccessor.
PRIORITY
 java.lang.Integer

Message priority — for example, within a PriorityChannel.

 IntegrationMessageHeaderAccessor.
DUPLICATE_MESSAGE
 java.lang.Boolean

True if a message was detected as a duplicate by an idempotent receiver interceptor. See Idempotent Receiver Enterprise Integration Pattern.

 IntegrationMessageHeaderAccessor.
CLOSEABLE_RESOURCE
 java.io.Closeable

This header is present if the message is associated with a Closeable that should be closed when message processing is complete. An example is the Session associated with a streamed file transfer using FTP, SFTP, and so on.

 IntegrationMessageHeaderAccessor.
DELIVERY_ATTEMPT
 java.lang.
AtomicInteger

If a message-driven channel adapter supports the configuration of a RetryTemplate, this header contains the current delivery attempt.

 IntegrationMessageHeaderAccessor.
ACKNOWLEDGMENT_CALLBACK
 o.s.i.support.
Acknowledgment
Callback

If an inbound endpoint supports it, a call back to accept, reject, or requeue a message. See Deferred Acknowledgment Pollable Message Source and MQTT Manual Acks.

Convenient typed getters for some of these headers are provided on the IntegrationMessageHeaderAccessor class, as the following example shows:

IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...

The following table describes headers that also appear in the IntegrationMessageHeaderAccessor but are generally not used by user code (that is, they are generally used by internal parts of Spring Integration — their inclusion here is for completeness):

Table 3. Pre-defined Message Headers
Header Name Header Type Usage
 IntegrationMessageHeaderAccessor.
SEQUENCE_DETAILS
 java.util.
List<List<Object>>

A stack of correlation data used when nested correlation is needed (for example, splitter→…​→splitter→…​→aggregator→…​→aggregator).

 IntegrationMessageHeaderAccessor.
ROUTING_SLIP
 java.util.
Map<List<Object>, Integer>

See Routing Slip.

Message ID Generation

When a message transitions through an application, each time it is mutated (for example, by a transformer) a new message ID is assigned. The message ID is a UUID. Beginning with Spring Integration 3.0, the default strategy used for IS generation is more efficient than the previous java.util.UUID.randomUUID() implementation. It uses simple random numbers based on a secure random seed instead of creating a secure random number each time.

A different UUID generation strategy can be selected by declaring a bean that implements org.springframework.util.IdGenerator in the application context.

Only one UUID generation strategy can be used in a classloader. This means that, if two or more application contexts run in the same classloader, they share the same strategy. If one of the contexts changes the strategy, it is used by all contexts. If two or more contexts in the same classloader declare a bean of type org.springframework.util.IdGenerator, they must all be an instance of the same class. Otherwise, the context attempting to replace a custom strategy fails to initialize. If the strategy is the same, but parameterized, the strategy in the first context to be initialized is used.

In addition to the default strategy, two additional IdGenerators are provided. org.springframework.util.JdkIdGenerator uses the previous UUID.randomUUID() mechanism. You can use o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator when a UUID is not really needed and a simple incrementing value is sufficient.

Read-only Headers

The MessageHeaders.ID and MessageHeaders.TIMESTAMP are read-only headers and cannot be overridden.

Since version 4.3.2, the MessageBuilder provides the readOnlyHeaders(String…​ readOnlyHeaders) API to customize a list of headers that should not be copied from an upstream Message. Only the MessageHeaders.ID and MessageHeaders.TIMESTAMP are read only by default. The global spring.integration.readOnly.headers property (see Global Properties) is provided to customize DefaultMessageBuilderFactory for framework components. This can be useful when you would like do not populate some out-of-the-box headers, such as contentType by the ObjectToJsonTransformer (see JSON Transformers).

When you try to build a new message using MessageBuilder, this kind of header is ignored and a particular INFO message is emitted to logs.

Starting with version 5.0, Messaging Gateway, Header Enricher, Content Enricher and Header Filter do not let you configure the MessageHeaders.ID and MessageHeaders.TIMESTAMP header names when DefaultMessageBuilderFactory is used, and they throw BeanInitializationException.

Header Propagation

When messages are processed (and modified) by message-producing endpoints (such as a service activator), in general, inbound headers are propagated to the outbound message. One exception to this is a transformer, when a complete message is returned to the framework. In that case, the user code is responsible for the entire outbound message. When a transformer just returns the payload, the inbound headers are propagated. Also, a header only propagated if it does not already exist in the outbound message, letting you change header values as needed.

Starting with version 4.3.10, you can configure message handlers (that modify messages and produce output) to suppress the propagation of specific headers. To configure the header(s) you do not want to be copied, call the setNotPropagatedHeaders() or addNotPropagatedHeaders() methods on the MessageProducingMessageHandler abstract class.

You can also globally suppress propagation of specific message headers by setting the readOnlyHeaders property in META-INF/spring.integration.properties to a comma-delimited list of headers.

Starting with version 5.0, the setNotPropagatedHeaders() implementation on the AbstractMessageProducingHandler applies simple patterns (xxx*, *xxx, *xxx*, or xxx*yyy) to allow filtering headers with a common suffix or prefix. See PatternMatchUtils Javadoc for more information. When one of the patterns is * (asterisk), no headers are propagated. All other patterns are ignored. In that case, the service activator behaves the same way as a transformer and any required headers must be supplied in the Message returned from the service method. The notPropagatedHeaders() option is available in the ConsumerEndpointSpec for the Java DSL It is also available for XML configuration of the <service-activator> component as a not-propagated-headers attribute.

Header propagation suppression does not apply to those endpoints that do not modify the message, such as bridges and routers.

Message Implementations

The base implementation of the Message interface is GenericMessage<T>, and it provides two constructors, shown in the following listing:

new GenericMessage<T>(T payload);

new GenericMessage<T>(T payload, Map<String, Object> headers)

When a Message is created, a random unique ID is generated. The constructor that accepts a Map of headers copies the provided headers to the newly created Message.

There is also a convenient implementation of Message designed to communicate error conditions. This implementation takes a Throwable object as its payload, as the following example shows:

ErrorMessage message = new ErrorMessage(someThrowable);

Throwable t = message.getPayload();

Note that this implementation takes advantage of the fact that the GenericMessage base class is parameterized. Therefore, as shown in both examples, no casting is necessary when retrieving the Message payload Object.

The mentioned Message class implementations are immutable. In some cases, when mutability is not a concern and the logic of application is well-designed to avoid concurrent modifications, a MutableMessage can be used.

The MessageBuilder Helper Class

You may notice that the Message interface defines retrieval methods for its payload and headers but provides no setters. The reason for this is that a Message cannot be modified after its initial creation. Therefore, when a Message instance is sent to multiple consumers (for example, through a publish-subscribe Channel), if one of those consumers needs to send a reply with a different payload type, it must create a new Message. As a result, the other consumers are not affected by those changes. Keep in mind that multiple consumers may access the same payload instance or header value, and whether such an instance is itself immutable is a decision left to you. In other words, the contract for Message instances is similar to that of an unmodifiable Collection, and the MessageHeaders map further exemplifies that. Even though the MessageHeaders class implements java.util.Map, any attempt to invoke a put operation (or 'remove' or 'clear') on a MessageHeaders instance results in an UnsupportedOperationException.

Rather than requiring the creation and population of a Map to pass into the GenericMessage constructor, Spring Integration does provide a far more convenient way to construct Messages: MessageBuilder. The MessageBuilder provides two factory methods for creating Message instances from either an existing Message or with a payload Object. When building from an existing Message, the headers and payload of that Message are copied to the new Message, as the following example shows:

Message<String> message1 = MessageBuilder.withPayload("test")
        .setHeader("foo", "bar")
        .build();

Message<String> message2 = MessageBuilder.fromMessage(message1).build();

assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));

If you need to create a Message with a new payload but still want to copy the headers from an existing Message, you can use one of the 'copy' methods, as the following example shows:

Message<String> message3 = MessageBuilder.withPayload("test3")
        .copyHeaders(message1.getHeaders())
        .build();

Message<String> message4 = MessageBuilder.withPayload("test4")
        .setHeader("foo", 123)
        .copyHeadersIfAbsent(message1.getHeaders())
        .build();

assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));

Note that the copyHeadersIfAbsent method does not overwrite existing values. Also, in the preceding example, you can see how to set any user-defined header with setHeader. Finally, there are set methods available for the predefined headers as well as a non-destructive method for setting any header (MessageHeaders also defines constants for the pre-defined header names).

You can also use MessageBuilder to set the priority of messages, as the following example shows:

Message<Integer> importantMessage = MessageBuilder.withPayload(99)
        .setPriority(5)
        .build();

assertEquals(5, importantMessage.getHeaders().getPriority());

Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
        .setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
        .build();

assertEquals(2, lessImportantMessage.getHeaders().getPriority());

The priority header is considered only when using a PriorityChannel (as described in the next chapter). It is defined as a java.lang.Integer.

The MutableMessageBuilder is provided to deal with MutableMessage instances. The logic of this class is to create a MutableMessage or leave it as is and mutate its content via builder methods. This way there is a slight performance gain in the running application, when immutability is not a concern of message exchanges.

Starting with version 6.4, a BaseMessageBuilder class is extracted from the MessageBuilder to simplify an extension for the default message building logic. For example, together with a custom MessageBuilderFactory, a custom BaseMessageBuilder implementation could be used globally in the application context to provide custom Message instances. In particular, the GenericMessage.toString() method can be overridden to hide sensitive information from payload and headers when such a message is logged.

The MessageBuilderFactory abstraction

The MessageBuilderFactory bean with IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME is registered globally into an application context and used everywhere in the framework to create Message instances. By default, it is an instance of DefaultMessageBuilderFactory. Out of the box, the framework also provides a MutableMessageBuilderFactory to create MutableMessage instances in the framework components instead. To customize Message instances creation, a MessageBuilderFactory bean with IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME has to be provided in the target application context to override a default one. For example, a custom MessageBuilderFactory could be registered for an implementation of the BaseMessageBuilder where we would like to provide a GenericMessage extension with overridden toString() to to hide sensitive information from payload and headers when such a message is logged.

Some quick implementation of these classes to demonstrate a personal identifiable information mitigation can be like this:

class PiiMessageBuilderFactory implements MessageBuilderFactory {

	@Override
	public <T> PiiMessageBuilder<T> fromMessage(Message<T> message) {
	    return new PiiMessageBuilder<>(message.getPayload(), message);
	}

	@Override
	public <T> PiiMessageBuilder<T> withPayload(T payload) {
	    return new PiiMessageBuilder<>(payload, null);
	}

}

class PiiMessageBuilder<P> extends BaseMessageBuilder<P, PiiMessageBuilder<P>> {

    public PiiMessageBuilder(P payload, @Nullable Message<P> originalMessage) {
        super(payload, originalMessage);
    }

    @Override
    public Message<P> build() {
        return new PiiMessage<>(getPayload(), getHeaders());
    }

}

class PiiMessage<P> extends GenericMessage<P> {

    @Serial
    private static final long serialVersionUID = -354503673433669578L;

    public PiiMessage(P payload, Map<String, Object> headers) {
        super(payload, headers);
    }

    @Override
    public String toString() {
        return "PiiMessage [payload=" + getPayload() + ", headers=" + maskHeaders(getHeaders()) + ']';
    }

    private static Map<String, Object> maskHeaders(Map<String, Object> headers) {
        return headers.entrySet()
                .stream()
                .map((entry) -> entry.getKey().equals("password") ? Map.entry(entry.getKey(), "******") : entry)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

}

The this PiiMessageBuilderFactory could be registered as a bean, and whenever the framework logs the message (e.g. in case of errorChannel), the password header will be masked.