The Spring Integration Message
is a generic container for data. Any object can
be provided as the payload, and each Message
also includes headers containing
user-extensible properties as key-value pairs.
Here is the definition of the Message
interface:
    public interface Message<T> {
        T getPayload();
        MessageHeaders getHeaders();
    }
The Message is an important part of the API. By encapsulating the data in a generic wrapper, the messaging system can pass it around without any knowledge of the data's type. As an application evolves to support new types, or when the types themselves are modified or extended, the messaging system is not affected by such changes. On the other hand, when some component in the messaging system does require access to information about the Message, such metadata can typically be stored in and retrieved from the Message headers.
Just as Spring Integration allows any Object to be used as the payload of a Message, it also supports any Object type as a header value. In fact, the MessageHeaders class implements the java.util.Map interface:
    public class MessageHeaders implements Map<String, Object>, Serializable { ... }
Note: Even though MessageHeaders implements Map, it is effectively a read-only implementation. Any attempt to put a value in the Map results in an UnsupportedOperationException. The same applies to remove and clear. Since Messages may be passed to multiple consumers, the structure of the Map cannot be modified. Likewise, the Message's payload Object cannot be set after the initial creation. However, the mutability of the header values themselves (or the payload Object) is intentionally left as a decision for the framework user.
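The read-only contract described in the note can be illustrated with plain JDK types. The following is a minimal sketch, assuming `Collections.unmodifiableMap` as a stand-in for MessageHeaders; the class `ReadOnlyHeadersSketch` and its methods are hypothetical and are not part of the framework.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in: models the read-only contract of MessageHeaders with JDK types only.
class ReadOnlyHeadersSketch {

    // Wraps a defensive copy of the given headers in an unmodifiable view.
    static Map<String, Object> readOnly(Map<String, Object> source) {
        return Collections.unmodifiableMap(new HashMap<>(source));
    }

    // Returns true if the given mutation is rejected with UnsupportedOperationException.
    static boolean rejects(Runnable mutation) {
        try {
            mutation.run();
            return false;
        } catch (UnsupportedOperationException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> source = new HashMap<>();
        source.put("someKey", new StringBuilder("mutable value"));
        Map<String, Object> headers = readOnly(source);

        // Structural changes to the map are rejected.
        System.out.println(rejects(() -> headers.put("other", 1)));
        System.out.println(rejects(() -> headers.remove("someKey")));
        System.out.println(rejects(headers::clear));

        // The header value itself is still mutable; that choice is left to the user.
        ((StringBuilder) headers.get("someKey")).append("!");
        System.out.println(headers.get("someKey"));
    }
}
```

The sketch separates the two concerns the note mentions: the structure of the map is fixed, while the objects stored in it are only as immutable as their own classes make them.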
As an implementation of Map, the headers can be retrieved by calling get(..) with the name of the header. Alternatively, you can provide the expected Class as an additional parameter. Even better, when retrieving one of the pre-defined values, convenient getters are available. Here is an example of each of these three options:
    Object someValue = message.getHeaders().get("someKey");
    CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);
    Long timestamp = message.getHeaders().getTimestamp();
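The second option, a lookup with an expected Class, can be pictured as a type check followed by a cast. The sketch below is an illustration with JDK types only: `TypedHeaderLookupSketch` is a hypothetical helper, and reporting a type mismatch with an IllegalArgumentException is an assumption of this sketch rather than a statement about the framework's behavior.

```java
import java.util.Map;

// Hypothetical helper: shows what a typed header lookup might do internally.
class TypedHeaderLookupSketch {

    // Returns the header value cast to the expected type, or null if the header is absent.
    // Assumption of this sketch: a value of the wrong type is reported as IllegalArgumentException.
    static <T> T get(Map<String, Object> headers, String key, Class<T> type) {
        Object value = headers.get(key);
        if (value == null) {
            return null;
        }
        if (!type.isInstance(value)) {
            throw new IllegalArgumentException("Header '" + key + "' has type "
                    + value.getClass().getName() + ", expected " + type.getName());
        }
        return type.cast(value);
    }
}
```

With such a helper the caller needs no cast, at the cost of a runtime check on every lookup.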
The following Message headers are pre-defined:
Table 4.1. Pre-defined Message Headers
Header Name | Header Type | Usage |
---|---|---|
MessageHeaders.ID | java.util.UUID | An identifier for this message instance. Changes each time a message is mutated. |
MessageHeaders.TIMESTAMP | java.lang.Long | The time the message was created. Changes each time a message is mutated. |
MessageHeaders.REPLY_CHANNEL | java.lang.Object (String or MessageChannel) | A channel to which a reply (if any) will be sent when no explicit output channel is configured and there is no ROUTING_SLIP or the ROUTING_SLIP is exhausted. If the value is a String it must represent a bean name, or have been generated by a ChannelRegistry. |
MessageHeaders.ERROR_CHANNEL | java.lang.Object (String or MessageChannel) | A channel to which errors will be sent. If the value is a String it must represent a bean name, or have been generated by a ChannelRegistry. |
Many inbound and outbound adapter implementations also provide or expect certain headers, and additional user-defined headers can be configured. Constants for these headers can be found in the modules where such headers exist, for example AmqpHeaders, JmsHeaders, etc.
Starting with Spring Framework 4.0 and Spring Integration 4.0, the core Messaging abstraction has been moved to the spring-messaging module, and the new MessageHeaderAccessor API has been introduced to provide additional abstraction over Messaging implementations. All (core) Spring Integration specific Message Headers constants are now declared in the IntegrationMessageHeaderAccessor class:
Table 4.2. Pre-defined Message Headers
Header Name | Header Type | Usage |
---|---|---|
IntegrationMessageHeaderAccessor.CORRELATION_ID | java.lang.Object | Used to correlate two or more messages. |
IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER | java.lang.Integer | Usually a sequence number within a group of messages that has a SEQUENCE_SIZE, but can also be used in a <resequencer/> to resequence an unbounded group of messages. |
IntegrationMessageHeaderAccessor.SEQUENCE_SIZE | java.lang.Integer | The number of messages within a group of correlated messages. |
IntegrationMessageHeaderAccessor.EXPIRATION_DATE | java.lang.Long | Indicates when a message is expired. Not used by the framework directly but can be set with a header enricher and used in a <filter/> configured with an UnexpiredMessageSelector. |
IntegrationMessageHeaderAccessor.PRIORITY | java.lang.Integer | Message priority; for example within a PriorityChannel |
IntegrationMessageHeaderAccessor.DUPLICATE_MESSAGE | java.lang.Boolean | True if a message was detected as a duplicate by an idempotent receiver interceptor. See Section 7.7.7, “Idempotent Receiver Enterprise Integration Pattern”. |
Convenient typed getters for some of these headers are provided on the IntegrationMessageHeaderAccessor class:
    IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
    int sequenceNumber = accessor.getSequenceNumber();
    Object correlationId = accessor.getCorrelationId();
    ...
The following headers also appear in the IntegrationMessageHeaderAccessor but are generally not used by user code; their inclusion here is for completeness:
Table 4.3. Pre-defined Message Headers
Header Name | Header Type | Usage |
---|---|---|
IntegrationMessageHeaderAccessor.SEQUENCE_DETAILS | java.util.List< List<Object>> | A stack of correlation data used when nested correlation is needed (e.g. splitter->...->splitter->...->aggregator->...->aggregator). |
IntegrationMessageHeaderAccessor.ROUTING_SLIP | java.util.Map< List<Object>, Integer> | See the section called “Routing Slip”. |
When a message transitions through an application, each time it is mutated (e.g. by a transformer) a new message id is assigned. The message id is a UUID. Beginning with Spring Integration 3.0, the default strategy used for id generation is more efficient than the previous java.util.UUID.randomUUID() implementation. It uses simple random numbers based on a secure random seed, instead of creating a secure random number each time.
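The idea of "simple random numbers based on a secure random seed" can be sketched with JDK classes alone. The class `SeededIdGeneratorSketch` below is hypothetical and only illustrates the technique; it makes no claim about how the framework's default generator is actually implemented.

```java
import java.security.SecureRandom;
import java.util.Random;
import java.util.UUID;

// Hypothetical illustration: secure randomness is used once for the seed,
// and a cheaper pseudo-random generator produces each id afterwards.
class SeededIdGeneratorSketch {

    private final Random random;

    SeededIdGeneratorSketch() {
        // The cost of SecureRandom is paid a single time, at construction.
        this.random = new Random(new SecureRandom().nextLong());
    }

    // Builds a UUID from two pseudo-random longs; java.util.Random is safe for concurrent use.
    // Note: unlike UUID.randomUUID(), this sketch does not set the UUID version or variant bits.
    UUID generateId() {
        return new UUID(this.random.nextLong(), this.random.nextLong());
    }

    public static void main(String[] args) {
        SeededIdGeneratorSketch generator = new SeededIdGeneratorSketch();
        System.out.println(generator.generateId());
        System.out.println(generator.generateId());
    }
}
```

The trade-off is that only the seed is unpredictable in the cryptographic sense, which is usually acceptable for message ids that merely need to be unique.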
A different UUID generation strategy can be selected by declaring a bean that implements org.springframework.util.IdGenerator in the application context.
Important: Only one UUID generation strategy can be used in a classloader. This means that if two or more application contexts are running in the same classloader, they will share the same strategy. If one of the contexts changes the strategy, it will be used by all contexts. If two or more contexts in the same classloader declare a bean of type org.springframework.util.IdGenerator, they must all be instances of the same class; otherwise the context attempting to replace a custom strategy will fail to initialize. If the strategy is the same, but parameterized, the strategy in the first context to initialize will be used.
In addition to the default strategy, two additional IdGenerators are provided: org.springframework.util.JdkIdGenerator uses the previous UUID.randomUUID() mechanism, and org.springframework.integration.support.IdGenerators.SimpleIncrementingIdGenerator can be used in cases where a UUID is not really needed and a simple incrementing value is sufficient.
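An incrementing strategy of this kind can be sketched as a counter wrapped in a UUID. The class `IncrementingIdGeneratorSketch` below is hypothetical, and the bit layout it uses (zero in the upper 64 bits, the counter in the lower 64 bits) is an assumption of the sketch; the provided SimpleIncrementingIdGenerator may differ in detail.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration of an incrementing id strategy.
class IncrementingIdGeneratorSketch {

    // AtomicLong keeps the sequence consistent when ids are requested from several threads.
    private final AtomicLong counter = new AtomicLong();

    // Each call returns a UUID whose lower 64 bits hold the next counter value (1, 2, 3, ...).
    UUID generateId() {
        return new UUID(0L, this.counter.incrementAndGet());
    }

    public static void main(String[] args) {
        IncrementingIdGeneratorSketch generator = new IncrementingIdGeneratorSketch();
        System.out.println(generator.generateId());
        System.out.println(generator.generateId());
    }
}
```

Ids produced this way are unique only within one generator instance, so the approach suits cases where ids never need to be compared across JVMs or restarts.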
The base implementation of the Message interface is GenericMessage<T>, and it provides two constructors:
    new GenericMessage<T>(T payload);
    new GenericMessage<T>(T payload, Map<String, Object> headers)
When a Message is created, a random unique id will be generated. The constructor that accepts a Map of headers will copy the provided headers to the newly created Message.
There is also a convenient implementation of Message designed to communicate error conditions. This implementation takes a Throwable object as its payload:
ErrorMessage message = new ErrorMessage(someThrowable);
Throwable t = message.getPayload();
Notice that this implementation takes advantage of the fact that the GenericMessage
base class is parameterized. Therefore, as shown in both examples, no casting is necessary when retrieving
the Message payload Object.
You may notice that the Message interface defines retrieval methods for its payload and headers but no setters.
The reason for this is that a Message cannot be modified after its initial creation. Therefore, when a Message
instance is sent to multiple consumers (e.g. through a Publish Subscribe Channel), if one of those consumers
needs to send a reply with a different payload type, it will need to create a new Message. As a result, the
other consumers are not affected by those changes. Keep in mind that multiple consumers may access the same payload instance or header value, and whether such an instance is itself immutable is a decision left to the developer. In other words, the contract for Messages is similar to that of an unmodifiable Collection, and the MessageHeaders map further exemplifies that: even though the MessageHeaders class implements java.util.Map, any attempt to invoke a put operation (or remove or clear) on the MessageHeaders results in an UnsupportedOperationException.
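The "no setters" contract can be illustrated with a small immutable type. The class `ImmutableMessageSketch` below is hypothetical and deliberately simplified; it is not the framework's GenericMessage, and its `withPayload` method is an invention of this sketch to show that a changed payload means a new instance.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified message type used only to illustrate immutability.
final class ImmutableMessageSketch<T> {

    private final T payload;
    private final Map<String, Object> headers;

    ImmutableMessageSketch(T payload, Map<String, Object> headers) {
        this.payload = payload;
        // Defensive copy behind a read-only view: later changes to the caller's map are not visible here.
        this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
    }

    T getPayload() {
        return this.payload;
    }

    Map<String, Object> getHeaders() {
        return this.headers;
    }

    // A different payload (possibly of a different type) yields a new message; this instance is untouched.
    <R> ImmutableMessageSketch<R> withPayload(R newPayload) {
        return new ImmutableMessageSketch<>(newPayload, this.headers);
    }
}
```

A consumer that replies with a different payload type builds its own instance in this way, so other consumers holding the original message see no change. The payload object itself is shared by reference, which is why its mutability remains the developer's decision.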
Rather than requiring the creation and population of a Map to pass into the GenericMessage constructor, Spring Integration provides a far more convenient way to construct Messages: MessageBuilder.
The MessageBuilder provides two factory methods for creating Messages from either an existing Message or with a
payload Object. When building from an existing Message, the headers and payload of that
Message will be copied to the new Message:
    Message<String> message1 = MessageBuilder.withPayload("test")
            .setHeader("foo", "bar")
            .build();

    Message<String> message2 = MessageBuilder.fromMessage(message1).build();

    assertEquals("test", message2.getPayload());
    assertEquals("bar", message2.getHeaders().get("foo"));
If you need to create a Message with a new payload but still want to copy the headers from an existing Message, you can use one of the 'copy' methods.
    Message<String> message3 = MessageBuilder.withPayload("test3")
            .copyHeaders(message1.getHeaders())
            .build();

    Message<String> message4 = MessageBuilder.withPayload("test4")
            .setHeader("foo", 123)
            .copyHeadersIfAbsent(message1.getHeaders())
            .build();

    assertEquals("bar", message3.getHeaders().get("foo"));
    assertEquals(123, message4.getHeaders().get("foo"));
Notice that copyHeadersIfAbsent does not overwrite existing values. Also, in the second example above, you can see how to set any user-defined header with setHeader.
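The difference between the two copy methods comes down to whether an existing entry wins. The sketch below models that with plain maps, using `Map.putAll` for the overwriting copy and `Map.putIfAbsent` for the non-overwriting one. The class `HeaderCopySketch` is hypothetical and ignores any special handling the builder may apply to framework-managed headers.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the two copy semantics, using JDK maps only.
class HeaderCopySketch {

    // Overwriting copy: entries from 'source' replace entries already present in 'existing'.
    static Map<String, Object> copy(Map<String, Object> existing, Map<String, Object> source) {
        Map<String, Object> result = new HashMap<>(existing);
        result.putAll(source);
        return result;
    }

    // Non-overwriting copy: entries from 'source' are added only where 'existing' has no value.
    static Map<String, Object> copyIfAbsent(Map<String, Object> existing, Map<String, Object> source) {
        Map<String, Object> result = new HashMap<>(existing);
        source.forEach(result::putIfAbsent);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> existing = new HashMap<>();
        existing.put("foo", 123);
        Map<String, Object> source = new HashMap<>();
        source.put("foo", "bar");

        System.out.println(copy(existing, source).get("foo"));         // prints "bar"
        System.out.println(copyIfAbsent(existing, source).get("foo")); // prints "123"
    }
}
```

Both methods return a new map and leave their arguments unchanged, which keeps the sketch consistent with the immutability of built Messages.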
Finally, there are set methods available for the predefined headers as well as a non-destructive method for
setting any header (MessageHeaders also defines constants for the pre-defined header names).
    Message<Integer> importantMessage = MessageBuilder.withPayload(99)
            .setPriority(5)
            .build();

    assertEquals(5, importantMessage.getHeaders().get(IntegrationMessageHeaderAccessor.PRIORITY));

    // fromMessage copies the existing priority, so setHeaderIfAbsent leaves it at 5
    Message<Integer> samePriorityMessage = MessageBuilder.fromMessage(importantMessage)
            .setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
            .build();

    assertEquals(5, samePriorityMessage.getHeaders().get(IntegrationMessageHeaderAccessor.PRIORITY));
The priority header is only considered when using a PriorityChannel (as described in the next chapter). It is defined as java.lang.Integer.