Message Transformation
Transformer
Message transformers play a very important role in enabling the loose-coupling of message producers and message consumers.
Rather than requiring every message-producing component to know what type is expected by the next consumer, you can add transformers between those components.
Generic transformers, such as one that converts a String
to an XML Document, are also highly reusable.
For some systems, it may be best to provide a canonical data model, but Spring Integration’s general philosophy is not to require any particular format. Rather, for maximum flexibility, Spring Integration aims to provide the simplest possible model for extension. As with the other endpoint types, the use of declarative configuration in XML or Java annotations enables simple POJOs to be adapted for the role of message transformers. The rest of this chapter describes these configuration options.
For the sake of maximizing flexibility, Spring does not require XML-based message payloads. Nevertheless, the framework does provide some convenient transformers for dealing with XML-based payloads if that is indeed the right choice for your application. For more information on those transformers, see XML Support - Dealing with XML Payloads.
Configuring a Transformer with XML
The <transformer>
element is used to create a message-transforming endpoint.
In addition to input-channel
and output-channel
attributes, it requires a ref
attribute.
The ref
may either point to an object that contains the @Transformer
annotation on a single method (see Configuring a Transformer with Annotations), or it may be combined with an explicit method name value provided in the method
attribute.
<int:transformer id="testTransformer" ref="testTransformerBean" input-channel="inChannel"
method="transform" output-channel="outChannel"/>
<beans:bean id="testTransformerBean" class="org.foo.TestTransformer" />
Using a ref
attribute is generally recommended if the custom transformer handler implementation can be reused in other <transformer>
definitions.
However, if the custom transformer handler implementation should be scoped to a single definition of the <transformer>, you can define an inner bean definition, as the following example shows:
<int:transformer id="testTransformer" input-channel="inChannel" method="transform"
output-channel="outChannel">
<beans:bean class="org.foo.TestTransformer"/>
</int:transformer>
Using both the ref attribute and an inner handler definition in the same <transformer> configuration is not allowed, as it creates an ambiguous condition and results in an exception being thrown.
If the ref attribute references a bean that extends AbstractMessageProducingHandler (such as transformers provided by the framework itself), the configuration is optimized by injecting the output channel into the handler directly.
In this case, each ref must be to a separate bean instance (or a prototype-scoped bean) or use the inner <bean/> configuration type.
If you inadvertently reference the same message handler from multiple beans, you get a configuration exception.
When using a POJO, the method that is used for transformation may expect either the Message
type or the payload type of inbound messages.
It may also accept message header values either individually or as a full map by using the @Header
and @Headers
parameter annotations, respectively.
The return value of the method can be any type.
If the return value is itself a Message, that is passed along to the transformer’s output channel.
As of Spring Integration 2.0, a message transformer’s transformation method can no longer return null.
Returning null
results in an exception, because a message transformer should always be expected to transform each source message into a valid target message.
In other words, a message transformer should not be used as a message filter, because there is a dedicated <filter>
option for that.
However, if you do need this type of behavior (where a component might return null
and that should not be considered an error), you could use a service activator.
Its requires-reply
value is false
by default, but that can be set to true
in order to have exceptions thrown for null
return values, as with the transformer.
Transformers and Spring Expression Language (SpEL)
Like routers, aggregators, and other components, as of Spring Integration 2.0, transformers can also benefit from SpEL support whenever transformation logic is relatively simple. The following example shows how to use a SpEL expression:
<int:transformer input-channel="inChannel"
output-channel="outChannel"
expression="payload.toUpperCase() + '- [' + T(System).currentTimeMillis() + ']'"/>
The preceding example transforms the payload without writing a custom transformer.
Our payload (assumed to be a String) is upper-cased, concatenated with the current timestamp, and has some formatting applied.
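For comparison, the expression above corresponds roughly to the plain-Java method below. This is only a sketch: the class and method names are hypothetical, and the timestamp is passed in as a parameter (an assumption made here so that the result is deterministic) instead of being read inside the method.

```java
// Hypothetical plain-Java counterpart of the SpEL expression shown above.
final class UpperCaseTimestampSketch {

    // Mirrors: payload.toUpperCase() + '- [' + T(System).currentTimeMillis() + ']'
    static String transform(String payload, long timestampMillis) {
        return payload.toUpperCase() + "- [" + timestampMillis + "]";
    }

    public static void main(String[] args) {
        // In a running flow the timestamp would come from System.currentTimeMillis().
        System.out.println(transform("hello", System.currentTimeMillis()));
    }
}
```

With the SpEL variant, none of this code has to be written, compiled, or registered as a bean.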
Common Transformers
Spring Integration provides a few transformer implementations.
Object-to-String Transformer
Because it is fairly common to use the toString() representation of an Object, Spring Integration provides an ObjectToStringTransformer whose output is a Message with a String payload.
That String
is the result of invoking the toString()
operation on the inbound Message’s payload.
The following example shows how to declare an instance of the object-to-string transformer:
<int:object-to-string-transformer input-channel="in" output-channel="out"/>
A potential use for this transformer would be sending some arbitrary object to the 'outbound-channel-adapter' in the file
namespace.
Whereas that channel adapter only supports String
, byte-array, or java.io.File
payloads by default, adding this transformer immediately before the adapter handles the necessary conversion.
That works fine as long as the result of the toString()
call is what you want to be written to the file.
Otherwise, you can provide a custom POJO-based transformer by using the generic 'transformer' element shown previously.
When debugging, this transformer is not typically necessary, since the logging-channel-adapter is capable of logging the message payload.
See Wire Tap for more detail.
The object-to-string transformer is very simple.
It invokes toString() on the inbound payload.
For more sophistication (such as selection of the charset dynamically, at runtime), you can use a SpEL expression-based transformer instead.
If you need to serialize an Object
to a byte array or deserialize a byte array back into an Object
, Spring Integration provides symmetrical serialization transformers.
These use standard Java serialization by default, but you can provide an implementation of Spring Serializer
or Deserializer
strategies by using the serializer
and deserializer
attributes, respectively.
The following example shows how to use Spring’s serializer and deserializer:
<int:payload-serializing-transformer input-channel="objectsIn" output-channel="bytesOut"/>
<int:payload-deserializing-transformer input-channel="bytesIn" output-channel="objectsOut"
allow-list="com.mycom.*,com.yourcom.*"/>
When deserializing data from untrusted sources, you should consider adding an allow-list of package and class patterns.
By default, all classes are deserialized.
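To illustrate why an allow-list matters, the following JDK-only sketch restricts standard Java deserialization with java.io.ObjectInputFilter (Java 9 or later). It is an analogy, not the mechanism Spring Integration uses internally: the class name and helper methods are hypothetical, and the JDK filter syntax (patterns separated by ';') differs from the comma-separated allow-list attribute shown above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputFilter;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// JDK-only illustration of allow-list deserialization (hypothetical helper class).
final class AllowListDeserializationSketch {

    // Serializes a Serializable object with standard Java serialization.
    static byte[] serialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Deserializes only classes matching the ';'-separated JDK filter patterns.
    // The trailing "!*" rejects every class that no earlier pattern allowed.
    static Object deserialize(byte[] data, String allowedPatterns) {
        ObjectInputFilter filter = ObjectInputFilter.Config.createFilter(allowedPatterns + ";!*");
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            in.setObjectInputFilter(filter);
            return in.readObject();
        } catch (IOException e) {
            // Includes the InvalidClassException raised when the filter rejects a class.
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Reports whether the data passes the allow-list without exposing the exception.
    static boolean isAccepted(byte[] data, String allowedPatterns) {
        try {
            deserialize(data, allowedPatterns);
            return true;
        } catch (UncheckedIOException e) {
            return false;
        }
    }
}
```

In this sketch, a serialized java.util.Date is accepted when the pattern is "java.util.*" and rejected when only "java.lang.*" is allowed, because the class never gets instantiated once the filter refuses it.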
Object
-to-Map
and Map
-to-Object
Transformers
Spring Integration also provides Object
-to-Map
and Map
-to-Object
transformers, which use JSON to serialize and de-serialize the object graphs.
The object hierarchy is introspected to the most primitive types (String
, int
, and so on).
The path to this type is described with SpEL, which becomes the key
in the transformed Map.
The primitive type becomes the value.
Consider the following example:
public class Parent{
private Child child;
private String name;
// setters and getters are omitted
}
public class Child{
private String name;
private List<String> nickNames;
// setters and getters are omitted
}
The two classes in the preceding example are transformed to the following Map:
{person.name=George, person.child.name=Jenna, person.child.nickNames[0]=Jen ...}
The JSON-based Map
lets you describe the object structure without sharing the actual types, which lets you restore and rebuild the object graph into a differently typed object graph, as long as you maintain the structure.
For example, the preceding structure could be restored back to the following object graph by using the Map
-to-Object
transformer:
public class Father {
private Kid child;
private String name;
// setters and getters are omitted
}
public class Kid {
private String name;
private List<String> nickNames;
// setters and getters are omitted
}
If you need to create a “structured” map, you can provide the flatten
attribute.
The default is 'true'.
If you set it to 'false', the structure is a Map
of Map
objects.
Consider the following example:
public class Parent {
private Child child;
private String name;
// setters and getters are omitted
}
public class Child {
private String name;
private List<String> nickNames;
// setters and getters are omitted
}
The two classes in the preceding example are transformed to the following Map:
{name=George, child={name=Jenna, nickNames=[Bimbo, ...]}}
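The relationship between the structured and the flattened form can be sketched in plain Java. The helper below is hypothetical and is not the transformer's implementation; it only shows how nested maps and lists might be walked to produce path-style keys relative to the root object.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: flattens a Map of Maps into path-style keys.
final class MapFlatteningSketch {

    static Map<String, Object> flatten(Map<String, ?> structured) {
        Map<String, Object> flat = new LinkedHashMap<>();
        for (Map.Entry<String, ?> entry : structured.entrySet()) {
            collect(entry.getKey(), entry.getValue(), flat);
        }
        return flat;
    }

    // Descends into nested maps and lists until a leaf value is reached,
    // then stores the leaf under the path accumulated so far.
    private static void collect(String path, Object value, Map<String, Object> flat) {
        if (value instanceof Map) {
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
                collect(path + "." + entry.getKey(), entry.getValue(), flat);
            }
        } else if (value instanceof List) {
            List<?> list = (List<?>) value;
            for (int i = 0; i < list.size(); i++) {
                collect(path + "[" + i + "]", list.get(i), flat);
            }
        } else {
            flat.put(path, value);
        }
    }
}
```

Applied to a structured map such as the one above, the sketch yields keys like name, child.name, and child.nickNames[0].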
To configure these transformers, Spring Integration provides namespace support for Object-to-Map, as the following example shows:
<int:object-to-map-transformer input-channel="directInput" output-channel="output"/>
You can also set the flatten
attribute to false, as follows:
<int:object-to-map-transformer input-channel="directInput" output-channel="output" flatten="false"/>
Spring Integration provides namespace support for Map-to-Object, as the following example shows:
<int:map-to-object-transformer input-channel="input"
output-channel="output"
type="org.something.Person"/>
Alternatively, you could use a ref
attribute and a prototype-scoped bean, as the following example shows:
<int:map-to-object-transformer input-channel="inputA"
output-channel="outputA"
ref="person"/>
<bean id="person" class="org.something.Person" scope="prototype"/>
The 'ref' and 'type' attributes are mutually exclusive.
Also, if you use the 'ref' attribute, you must point to a 'prototype' scoped bean.
Otherwise, a BeanCreationException is thrown.
Starting with version 5.0, you can supply the ObjectToMapTransformer
with a customized JsonObjectMapper
— for when you need special formats for dates or nulls for empty collections (and other uses).
See JSON Transformers for more information about JsonObjectMapper
implementations.
Stream Transformer
The StreamTransformer
transforms InputStream
payloads to a byte[] (or a String if a charset is provided).
The following example shows how to use the stream-transformer
element in XML:
<int:stream-transformer input-channel="directInput" output-channel="output"/> <!-- byte[] -->
<int:stream-transformer id="withCharset" charset="UTF-8"
input-channel="charsetChannel" output-channel="output"/> <!-- String -->
The following example shows how to use the StreamTransformer
class and the @Transformer
annotation to configure a stream transformer in Java:
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public StreamTransformer streamToBytes() {
    return new StreamTransformer(); // transforms to byte[]
}

@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public StreamTransformer streamToString() {
    return new StreamTransformer("UTF-8"); // transforms to String
}
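The two conversions can be pictured with a JDK-only sketch. The class and method names below are hypothetical, and the code is not the StreamTransformer source; it merely shows the difference between keeping the raw bytes and decoding them with a charset (readAllBytes requires Java 9 or later).

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;

// Hypothetical illustration of the two payload conversions described above.
final class StreamPayloadSketch {

    // Without a charset: the whole stream is read into a byte[].
    static byte[] toBytes(InputStream stream) {
        try {
            return stream.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // With a charset: the same bytes are decoded into a String.
    static String toText(InputStream stream, Charset charset) {
        return new String(toBytes(stream), charset);
    }
}
```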
JSON Transformers
Spring Integration provides Object-to-JSON and JSON-to-Object transformers. The following pair of examples show how to declare them in XML:
<int:object-to-json-transformer input-channel="objectMapperInput"/>
<int:json-to-object-transformer input-channel="objectMapperInput"
type="foo.MyDomainObject"/>
By default, the transformers in the preceding listing use a vanilla JsonObjectMapper.
It is based on an implementation from the classpath.
You can provide your own custom JsonObjectMapper
implementation with appropriate options or based on a required library (such as GSON), as the following example shows:
<int:json-to-object-transformer input-channel="objectMapperInput"
type="something.MyDomainObject" object-mapper="customObjectMapper"/>
Beginning with version 3.0, the |
You may wish to consider using a FactoryBean
or a factory method to create the JsonObjectMapper
with the required characteristics.
The following example shows how to use such a factory:
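import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.integration.support.json.Jackson2JsonObjectMapper;

public class ObjectMapperFactory {

    // Builds a Jackson-backed JsonObjectMapper that tolerates comments in the JSON input.
    public static Jackson2JsonObjectMapper getMapper() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
        return new Jackson2JsonObjectMapper(mapper);
    }
}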
The following example shows how to do the same thing in XML:
<bean id="customObjectMapper" class="something.ObjectMapperFactory"
factory-method="getMapper"/>
Beginning with version 2.2, the It you wish to set the |
Beginning with version 3.0, the ObjectToJsonTransformer
adds headers, reflecting the source type, to the message.
Similarly, the JsonToObjectTransformer
can use those type headers when converting the JSON to an object.
These headers are mapped in the AMQP adapters so that they are entirely compatible with the Spring-AMQP JsonMessageConverter.
This enables the following flows to work without any special configuration:
- …→amqp-outbound-adapter---→
- ---→amqp-inbound-adapter→json-to-object-transformer→…
  Where the outbound adapter is configured with a JsonMessageConverter and the inbound adapter uses the default SimpleMessageConverter.
- …→object-to-json-transformer→amqp-outbound-adapter---→
- ---→amqp-inbound-adapter→…
  Where the outbound adapter is configured with a SimpleMessageConverter and the inbound adapter uses the default JsonMessageConverter.
- …→object-to-json-transformer→amqp-outbound-adapter---→
- ---→amqp-inbound-adapter→json-to-object-transformer→
  Where both adapters are configured with a SimpleMessageConverter.
When using the headers to determine the type, you should not provide a class attribute, because it takes precedence over the headers.
In addition to JSON Transformers, Spring Integration provides a built-in #jsonPath
SpEL function for use in expressions.
For more information see Spring Expression Language (SpEL).
Since version 3.0, Spring Integration also provides a built-in #xpath
SpEL function for use in expressions.
For more information see #xpath SpEL Function.
Beginning with version 4.0, the ObjectToJsonTransformer
supports the resultType
property, to specify the node JSON representation.
The result node tree representation depends on the implementation of the provided JsonObjectMapper
.
By default, the ObjectToJsonTransformer
uses a Jackson2JsonObjectMapper
and delegates the conversion of the object to the node tree to the ObjectMapper#valueToTree
method.
The node JSON representation provides efficiency for using the JsonPropertyAccessor
when the downstream message flow uses SpEL expressions with access to the properties of the JSON data.
See Property Accessors for more information.
Beginning with version 5.1, the resultType
can be configured as BYTES
to produce a message with the byte[]
payload for convenience when working with downstream handlers which operate with this data type.
Starting with version 5.2, the JsonToObjectTransformer
can be configured with a ResolvableType
to support generics during deserialization with the target JSON processor.
Also, this component now consults request message headers first for the presence of the JsonHeaders.RESOLVABLE_TYPE
or JsonHeaders.TYPE_ID
and falls back to the configured type otherwise.
The ObjectToJsonTransformer
now also populates a JsonHeaders.RESOLVABLE_TYPE
header based on the request message payload for any possible downstream scenarios.
Starting with version 5.2.6, the JsonToObjectTransformer
can be supplied with a valueTypeExpression
to resolve a ResolvableType
for the payload to convert from JSON at runtime against the request message.
By default it consults JsonHeaders
in the request message.
If this expression returns null or ResolvableType building throws a ClassNotFoundException, the transformer falls back to the provided targetType.
This logic is present as an expression because JsonHeaders may not have real class values, but rather some type ids which have to be mapped to target classes according to some external registry.
Apache Avro Transformers
Version 5.2 added simple transformers to transform to/from Apache Avro.
They are unsophisticated in that there is no schema registry; the transformers simply use the schema embedded in the SpecificRecord
implementation generated from the Avro schema.
Messages sent to the SimpleToAvroTransformer
must have a payload that implements SpecificRecord
; the transformer can handle multiple types.
The SimpleFromAvroTransformer
must be configured with a SpecificRecord
class which is used as the default type to deserialize.
You can also specify a SpEL expression to determine the type to deserialize using the setTypeExpression
method.
The default SpEL expression is headers[avro_type]
(AvroHeaders.TYPE
) which, by default, is populated by the SimpleToAvroTransformer
with the fully qualified class name of the source class.
If the expression returns null
, the defaultType
is used.
The SimpleToAvroTransformer
also has a setTypeExpression
method.
This allows decoupling of the producer and consumer where the sender can set the header to some token representing the type and the consumer then maps that token to a type.
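The token-to-type mapping on the consumer side can be sketched as follows. Everything here is hypothetical: the class, the registry map, and the use of plain Class<?> values (a real setup would map to SpecificRecord classes and would typically be wired through the type expression). Falling back to the default type for an unknown token is a choice made in this sketch; the text above only states the fallback for a null result.

```java
import java.util.Map;

// Hypothetical consumer-side lookup that maps a type token from a header to a class.
final class TypeTokenResolutionSketch {

    // Returns the class registered for the token found in the header,
    // or defaultType when the header is missing or the token is not registered.
    static Class<?> resolveType(Map<String, Object> headers, String headerName,
            Map<String, Class<?>> registry, Class<?> defaultType) {
        Object token = headers.get(headerName);
        Class<?> mapped = (token == null) ? null : registry.get(token.toString());
        return (mapped != null) ? mapped : defaultType;
    }
}
```

Because the sender only places a token in the header, the producer and consumer do not need to share class names.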
Configuring a Transformer with Annotations
You can add the @Transformer
annotation to methods that expect either the Message
type or the message payload type.
The return value is handled in the exact same way as described earlier in the section describing the <transformer>
element.
The following example shows how to use the @Transformer
annotation to transform a String
into an Order
:
@Transformer
Order generateOrder(String productId) {
return new Order(productId);
}
Transformer methods can also accept the @Header
and @Headers
annotations, as documented in Annotation Support
.
The following example shows how to use the @Header
annotation:
@Transformer
Order generateOrder(String productId, @Header("customerName") String customer) {
return new Order(productId, customer);
}
See also Advising Endpoints Using Annotations.
Header Filter
Sometimes, your transformation use case might be as simple as removing a few headers. For such a use case, Spring Integration provides a header filter that lets you specify certain header names that should be removed from the output message (for example, removing headers for security reasons or a value that was needed only temporarily). Basically, the header filter is the opposite of the header enricher. The latter is discussed in Header Enricher. The following example defines a header filter:
<int:header-filter input-channel="inputChannel"
output-channel="outputChannel" header-names="lastName, state"/>
As you can see, configuration of a header filter is quite simple.
It is a typical endpoint with input and output channels and a header-names
attribute.
That attribute accepts the names of the headers (delimited by commas if there are multiple) that need to be removed.
So, in the preceding example, the headers named 'lastName' and 'state' are not present on the outbound message.
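The effect of the header-names attribute can be pictured with a small plain-Java sketch. The class and method are hypothetical and operate on an ordinary Map instead of real message headers; the sketch only shows the comma-delimited names being parsed and the matching entries being dropped from a copy.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical illustration of removing named headers from a header map.
final class HeaderFilterSketch {

    // Returns a copy of the headers without the comma-delimited names; the input is not modified.
    static Map<String, Object> removeHeaders(Map<String, Object> headers, String headerNames) {
        Set<String> toRemove = Arrays.stream(headerNames.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toSet());
        Map<String, Object> result = new LinkedHashMap<>(headers);
        result.keySet().removeAll(toRemove);
        return result;
    }
}
```

Calling removeHeaders with "lastName, state" on a map that also contains firstName leaves only firstName in the result.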
Codec-Based Transformers
See Codec.
Content Enricher
At times, you may have a requirement to enhance a request with more information than was provided by the target system. The data enricher pattern describes various scenarios as well as the component (Enricher) that lets you address such requirements.
The Spring Integration Core module includes two enrichers: the header enricher and the payload enricher.
It also includes three adapter-specific header enrichers.
See the adapter-specific sections of this reference manual to learn more about those adapters.
For more information regarding expressions support, see Spring Expression Language (SpEL).
Header Enricher
If you need to do nothing more than add headers to a message and the headers are not dynamically determined by the message content, referencing a custom implementation of a transformer may be overkill.
For that reason, Spring Integration provides support for the header enricher pattern.
It is exposed through the <header-enricher>
element.
The following example shows how to use it:
<int:header-enricher input-channel="in" output-channel="out">
<int:header name="foo" value="123"/>
<int:header name="bar" ref="someBean"/>
</int:header-enricher>
The header enricher also provides helpful sub-elements to set well known header names, as the following example shows:
<int:header-enricher input-channel="in" output-channel="out">
<int:error-channel ref="applicationErrorChannel"/>
<int:reply-channel ref="quoteReplyChannel"/>
<int:correlation-id value="123"/>
<int:priority value="HIGHEST"/>
<int:routing-slip value="channel1; routingSlipRoutingStrategy; request.headers[myRoutingSlipChannel]"/>
<int:header name="bar" ref="someBean"/>
</int:header-enricher>
The preceding configuration shows that, for well known headers (such as errorChannel
, correlationId
, priority
, replyChannel
, routing-slip
, and others), instead of using generic <header>
sub-elements where you would have to provide both header 'name' and 'value', you can use convenient sub-elements to set those values directly.
Starting with version 4.1, the header enricher provides a routing-slip
sub-element.
See Routing Slip for more information.
POJO Support
Often, a header value cannot be defined statically and has to be determined dynamically based on some content in the message.
That is why the header enricher lets you also specify a bean reference by using the ref
and method
attributes.
The specified method calculates the header value.
Consider the following configuration and a bean with a method that modifies a String
:
<int:header-enricher input-channel="in" output-channel="out">
<int:header name="something" method="computeValue" ref="myBean"/>
</int:header-enricher>
<bean id="myBean" class="thing1.thing2.MyBean"/>
public class MyBean {

    public String computeValue(String payload) {
        return payload.toUpperCase() + "_US";
    }
}
You can also configure your POJO as an inner bean, as the following example shows:
<int:header-enricher input-channel="inputChannel" output-channel="outputChannel">
<int:header name="some_header">
<bean class="org.MyEnricher"/>
</int:header>
</int:header-enricher>
You can similarly point to a Groovy script, as the following example shows:
<int:header-enricher input-channel="inputChannel" output-channel="outputChannel">
<int:header name="some_header">
<int-groovy:script location="org/SampleGroovyHeaderEnricher.groovy"/>
</int:header>
</int:header-enricher>
SpEL Support
In Spring Integration 2.0, we introduced the convenience of the Spring Expression Language (SpEL) to help configure many different components. The header enricher is one of them. Look again at the POJO example shown earlier. You can see that the computation logic to determine the header value is pretty simple. A natural question would be: "Is there an even simpler way to accomplish this?". That is where SpEL shows its true power. Consider the following example:
<int:header-enricher input-channel="in" output-channel="out">
<int:header name="foo" expression="payload.toUpperCase() + '_US'"/>
</int:header-enricher>
By using SpEL for such simple cases, you no longer have to provide a separate class and configure it in the application context.
All you need to do is configure the expression
attribute with a valid SpEL expression.
The 'payload' and 'headers' variables are bound to the SpEL evaluation context, giving you full access to the incoming message.
Configuring a Header Enricher with Java Configuration
The following two examples show how to use Java Configuration for header enrichers:
@Bean
@Transformer(inputChannel = "enrichHeadersChannel", outputChannel = "emailChannel")
public HeaderEnricher enrichHeaders() {
Map<String, ? extends HeaderValueMessageProcessor<?>> headersToAdd =
Collections.singletonMap("emailUrl",
new StaticHeaderValueMessageProcessor<>(this.imapUrl));
HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
return enricher;
}
@Bean
@Transformer(inputChannel="enrichHeadersChannel", outputChannel="emailChannel")
public HeaderEnricher enrichHeaders() {
Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
headersToAdd.put("emailUrl", new StaticHeaderValueMessageProcessor<String>(this.imapUrl));
Expression expression = new SpelExpressionParser().parseExpression("payload.from[0].toString()");
headersToAdd.put("from",
new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class));
HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
return enricher;
}
The first example adds a single literal header. The second example adds two headers, a literal header and one based on a SpEL expression.
Configuring a Header Enricher with the Java DSL
The following example shows Java DSL Configuration for a header enricher:
@Bean
public IntegrationFlow enrichHeadersInFlow() {
return f -> f
...
.enrichHeaders(h -> h.header("emailUrl", this.emailUrl)
.headerExpression("from", "payload.from[0].toString()"))
.handle(...);
}
Header Channel Registry
Starting with Spring Integration 3.0, a new sub-element <int:header-channels-to-string/>
is available.
It has no attributes.
This new sub-element converts existing replyChannel
and errorChannel
headers (when they are a MessageChannel
) to a String
and stores the channels in a registry for later resolution, when it is time to send a reply or handle an error.
This is useful for cases where the headers might be lost — for example, when serializing a message into a message store or when transporting the message over JMS.
If the header does not already exist or it is not a MessageChannel
, no changes are made.
Using this functionality requires the presence of a HeaderChannelRegistry
bean.
By default, the framework creates a DefaultHeaderChannelRegistry
with the default expiry (60 seconds).
Channels are removed from the registry after this time.
To change this behavior, define a bean with an id
of integrationHeaderChannelRegistry
and configure the required default delay by using a constructor argument (in milliseconds).
Since version 4.1, you can set a property called removeOnGet
to true
on the <bean/>
definition, and the mapping entry is removed immediately on first use.
This might be useful in a high-volume environment and when the channel is only used once, rather than waiting for the reaper to remove it.
The HeaderChannelRegistry
has a size()
method to determine the current size of the registry.
The runReaper()
method cancels the current scheduled task and runs the reaper immediately.
The task is then scheduled to run again based on the current delay.
These methods can be invoked directly by getting a reference to the registry, or you can send a message with, for example, the following content to a control bus:
"@integrationHeaderChannelRegistry.runReaper()"
This sub-element is a convenience, and is the equivalent of specifying the following configuration:
<int:reply-channel
expression="@integrationHeaderChannelRegistry.channelToChannelName(headers.replyChannel)"
overwrite="true" />
<int:error-channel
expression="@integrationHeaderChannelRegistry.channelToChannelName(headers.errorChannel)"
overwrite="true" />
Starting with version 4.1, you can override the registry’s configured reaper delay so that the channel mapping is retained for at least the specified time, regardless of the reaper delay. The following example shows how to do so:
<int:header-enricher input-channel="inputTtl" output-channel="next">
<int:header-channels-to-string time-to-live-expression="120000" />
</int:header-enricher>
<int:header-enricher input-channel="inputCustomTtl" output-channel="next">
<int:header-channels-to-string
time-to-live-expression="headers['channelTTL'] ?: 120000" />
</int:header-enricher>
In the first case, the time to live for every header channel mapping will be two minutes. In the second case, the time to live is specified in the message header and uses an Elvis operator to use two minutes if there is no header.
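The registry behavior described in this section (store a channel under a generated name, resolve it later, and reap expired entries) can be sketched in plain Java. This is a conceptual illustration only and not the DefaultHeaderChannelRegistry implementation: the class, its method names, and the explicit nowMillis parameters (used here in place of a scheduler and system clock) are all assumptions.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Conceptual sketch of a name-to-channel registry with per-entry expiry.
final class ExpiringChannelRegistrySketch<C> {

    private static final class Entry<C> {
        final C channel;
        final long expiresAtMillis;

        Entry(C channel, long expiresAtMillis) {
            this.channel = channel;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Entry<C>> entries = new ConcurrentHashMap<>();
    private final AtomicLong sequence = new AtomicLong();
    private final long defaultTtlMillis;

    ExpiringChannelRegistrySketch(long defaultTtlMillis) {
        this.defaultTtlMillis = defaultTtlMillis;
    }

    // Registers a channel with the default time to live and returns its generated name.
    String register(C channel, long nowMillis) {
        return register(channel, this.defaultTtlMillis, nowMillis);
    }

    // Registers a channel with an explicit time to live (compare time-to-live-expression).
    String register(C channel, long ttlMillis, long nowMillis) {
        String name = "channel-" + this.sequence.incrementAndGet();
        this.entries.put(name, new Entry<>(channel, nowMillis + ttlMillis));
        return name;
    }

    // Resolves a name back to its channel; null if unknown or already reaped.
    C resolve(String name) {
        Entry<C> entry = this.entries.get(name);
        return (entry == null) ? null : entry.channel;
    }

    // Removes every mapping whose time to live has elapsed.
    void runReaper(long nowMillis) {
        this.entries.values().removeIf(entry -> entry.expiresAtMillis <= nowMillis);
    }

    int size() {
        return this.entries.size();
    }
}
```

In this sketch, an entry registered with a 120000 ms time to live survives a reaper run at 60000 ms, while an entry registered with the 60000 ms default does not.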
Payload Enricher
In certain situations, the header enricher, as discussed earlier, may not be sufficient and payloads themselves may have to be enriched with additional information. For example, order messages that enter the Spring Integration messaging system have to look up the order’s customer based on the provided customer number and then enrich the original payload with that information.
Spring Integration 2.1 introduced the payload enricher.
The payload enricher defines an endpoint that passes a Message
to the exposed request channel and then expects a reply message.
The reply message then becomes the root object for evaluation of expressions to enrich the target payload.
The payload enricher provides full XML namespace support through the enricher
element.
In order to send request messages, the payload enricher has a request-channel
attribute that lets you dispatch messages to a request channel.
Basically, by defining the request channel, the payload enricher acts as a gateway, waiting for the message sent to the request channel to return. The enricher then augments the message’s payload with the data provided by the reply message.
When sending messages to the request channel, you also have the option to send only a subset of the original payload by using the request-payload-expression
attribute.
The enriching of payloads is configured through SpEL expressions, providing a maximum degree of flexibility.
Therefore, you can not only enrich payloads with direct values from the reply channel’s Message
, but you can use SpEL expressions to extract a subset from that message or to apply additional inline transformations, letting you further manipulate the data.
If you need only to enrich payloads with static values, you need not provide the request-channel
attribute.
Enrichers are a variant of transformers. In many cases, you could use a payload enricher or a generic transformer implementation to add additional data to your message payloads. You should familiarize yourself with all transformation-capable components that are provided by Spring Integration and carefully select the implementation that semantically fits your business case best.
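The request and reply interaction described above can be outlined with plain Java functions. This is a sketch under stated assumptions, not the framework's enricher: the Order class, the enrich method, and the use of Function and BiConsumer in place of channels and SpEL expressions are all hypothetical.

```java
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical outline of the payload enricher's request/reply interaction.
final class PayloadEnricherSketch {

    // Mutable payload used only for this illustration.
    static final class Order {
        final int customerNumber;
        String customerName;

        Order(int customerNumber) {
            this.customerNumber = customerNumber;
        }
    }

    // requestPayload plays the role of request-payload-expression,
    // requestFlow stands in for the flow behind the request channel,
    // propertySetter plays the role of a property expression evaluated against the reply.
    static <P, Q, R> P enrich(P payload, Function<P, Q> requestPayload,
            Function<Q, R> requestFlow, BiConsumer<P, R> propertySetter) {
        Q request = requestPayload.apply(payload);
        R reply = requestFlow.apply(request);
        propertySetter.accept(payload, reply);
        return payload;
    }

    public static void main(String[] args) {
        Function<Order, Integer> requestPayload = order -> order.customerNumber;
        Function<Integer, String> customerLookup = number -> "customer-" + number;
        BiConsumer<Order, String> setter = (order, name) -> order.customerName = name;
        Order enriched = enrich(new Order(7), requestPayload, customerLookup, setter);
        System.out.println(enriched.customerName);
    }
}
```

The original payload is the object that travels on; only the selected subset (here, the customer number) is sent to the lookup, and the reply is used solely to set properties on the target payload.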
Configuration
The following example shows all available configuration options for the payload enricher:
<int:enricher request-channel="" (1)
auto-startup="true" (2)
id="" (3)
order="" (4)
output-channel="" (5)
request-payload-expression="" (6)
reply-channel="" (7)
error-channel="" (8)
send-timeout="" (9)
should-clone-payload="false"> (10)
<int:poller></int:poller> (11)
<int:property name="" expression="" null-result-expression="'Could not determine the name'"/> (12)
<int:property name="" value="23" type="java.lang.Integer" null-result-expression="'0'"/>
<int:header name="" expression="" null-result-expression=""/> (13)
<int:header name="" value="" overwrite="" type="" null-result-expression=""/>
</int:enricher>
1 | Channel to which a message is sent to get the data to use for enrichment. Optional. |
2 | Lifecycle attribute signaling whether this component should be started during the application context startup. Defaults to true. Optional. |
3 | ID of the underlying bean definition, which is either an EventDrivenConsumer or a PollingConsumer .
Optional. |
4 | Specifies the order for invocation when this endpoint is connected as a subscriber to a channel. This is particularly relevant when that channel is using a “failover” dispatching strategy. It has no effect when this endpoint is itself a polling consumer for a channel with a queue. Optional. |
5 | Identifies the message channel where a message is sent after it has been processed by this endpoint. Optional. |
6 | By default, the original message’s payload is used as payload that is sent to the request-channel .
By specifying a SpEL expression as the value for the request-payload-expression attribute, you can use a subset of the original payload, a header value, or any other resolvable SpEL expression as the basis for the payload that is sent to the request-channel.
For the expression evaluation, the full message is available as the 'root object'.
For instance, the following SpEL expressions (among others) are possible: payload.something , headers.something , new java.util.Date() , 'thing1' + 'thing2' |
7 | Channel where a reply message is expected. Typically, the auto-generated temporary reply channel suffices. Optional. |
8 | The channel to which an ErrorMessage is sent if an Exception occurs downstream of the request-channel .
This enables you to return an alternative object to use for enrichment.
If it is not set, an Exception is thrown to the caller.
Optional. |
9 | Maximum amount of time in milliseconds to wait when sending a message to the channel, if the channel might block.
For example, a queue channel can block until space is available, if its maximum capacity has been reached.
Internally, the send timeout is set on the MessagingTemplate and ultimately applied when invoking the send operation on the MessageChannel .
By default, the send timeout is set to '-1', which can cause the send operation on the MessageChannel , depending on the implementation, to block indefinitely.
Optional. |
10 | Boolean value indicating whether any payload that implements Cloneable should be cloned prior to sending the message to the request channel for acquiring the enriching data.
The cloned version would be used as the target payload for the ultimate reply.
The default is false .
Optional. |
11 | Lets you configure a message poller if this endpoint is a polling consumer. Optional. |
12 | Each property sub-element provides the name of a property (through the mandatory name attribute).
That property should be settable on the target payload instance.
Exactly one of the value or expression attributes must be provided as well — the former for a literal value to set and the latter for a SpEL expression to be evaluated.
The root object of the evaluation context is the message that was returned from the flow initiated by this enricher, or the input message if there is no request channel. Beans in the application context can also be referenced (using the @<beanName>.<beanProperty> SpEL syntax).
Starting with version 4.0, when specifying a value attribute, you can also specify an optional type attribute.
When the destination is a typed setter method, the framework coerces the value appropriately (as long as a PropertyEditor exists to handle the conversion).
If, however, the target payload is a Map , the entry is populated with the value without conversion.
The type attribute lets you, for example, convert a String containing a number to an Integer value in the target payload.
Starting with version 4.1, you can also specify an optional null-result-expression attribute.
When the enricher returns null, it is evaluated, and the output of the evaluation is returned instead. |
13 | Each header sub-element provides the name of a message header (through the mandatory name attribute).
Exactly one of the value or expression attributes must also be provided — the former for a literal value to set and the latter for a SpEL expression to be evaluated.
The root object of the evaluation context is the message that was returned from the flow initiated by this enricher, or the input message if there is no request channel. Beans in the application context can also be referenced (using the '@<beanName>.<beanProperty>' SpEL syntax).
Note that, similarly to the <header-enricher> , the <enricher> element’s header element has type and overwrite attributes.
However, a key difference is that, with the <enricher> , the overwrite attribute is true by default, to be consistent with the <enricher> element’s <property> sub-element.
Starting with version 4.1, you can also specify an optional null-result-expression attribute.
When the enricher returns null, it is evaluated, and the output of the evaluation is returned instead. |
Examples
This section contains several examples of using a payload enricher in various situations.
The code samples shown here are part of the Spring Integration Samples project. See Spring Integration Samples. |
In the following example, a User object is passed as the payload of the Message:
<int:enricher id="findUserEnricher"
input-channel="findUserEnricherChannel"
request-channel="findUserServiceChannel">
<int:property name="email" expression="payload.email"/>
<int:property name="password" expression="payload.password"/>
</int:enricher>
The User has several properties, but only the username is set initially.
The enricher’s request-channel attribute is configured to pass the User to the findUserServiceChannel.
Through the implicitly set reply-channel, a User object is returned and, by using the property sub-element, properties from the reply are extracted and used to enrich the original payload.
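The request, reply, and property-copy steps described above can be pictured in plain Java. The following is only a sketch of the semantics, not Spring Integration's implementation: the User class, the EnricherSketch class, and the lookup lambda that stands in for findUserServiceChannel are all hypothetical names introduced for illustration.

```java
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Hypothetical domain object; only the username is known up front. */
final class User {
    String username;
    String email;
    String password;

    User(String username, String email, String password) {
        this.username = username;
        this.email = email;
        this.password = password;
    }
}

/** Plain-Java illustration of the enrichment steps (an assumption-laden sketch). */
final class EnricherSketch {

    /**
     * Passes the original payload to a lookup function (standing in for the
     * request channel), then lets the setters copy values from the reply onto
     * the original payload, which is returned as the enriched result.
     */
    static <P, R> P enrich(P original, Function<P, R> requestFlow, BiConsumer<P, R> propertySetters) {
        R reply = requestFlow.apply(original);
        propertySetters.accept(original, reply);
        return original;
    }

    public static void main(String[] args) {
        User partial = new User("jdoe", null, null);
        User enriched = enrich(
                partial,
                // Stand-in for the service behind findUserServiceChannel.
                (User u) -> new User(u.username, "jdoe@example.org", "secret"),
                (User target, User reply) -> {
                    target.email = reply.email;       // mirrors the "email" property sub-element
                    target.password = reply.password; // mirrors the "password" property sub-element
                });
        System.out.println(enriched.username + " -> " + enriched.email);
    }
}
```

Note that the enriched result is the original payload instance, with only the listed properties changed.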
How Do I Pass Only a Subset of Data to the Request Channel?
When using a request-payload-expression attribute, a single property of the payload instead of the full message can be passed on to the request channel.
In the following example, the username property is passed on to the request channel:
<int:enricher id="findUserByUsernameEnricher"
input-channel="findUserByUsernameEnricherChannel"
request-channel="findUserByUsernameServiceChannel"
request-payload-expression="payload.username">
<int:property name="email" expression="payload.email"/>
<int:property name="password" expression="payload.password"/>
</int:enricher>
Keep in mind that, although only the username is passed, the resulting message to the request channel contains the full set of MessageHeaders.
How Can I Enrich Payloads that Consist of Collection Data?
In the following example, instead of a User object, a Map is passed in:
<int:enricher id="findUserWithMapEnricher"
input-channel="findUserWithMapEnricherChannel"
request-channel="findUserByUsernameServiceChannel"
request-payload-expression="payload.username">
<int:property name="user" expression="payload"/>
</int:enricher>
The Map contains the username under the username map key.
Only the username is passed on to the request channel.
The reply contains a full User object, which is ultimately added to the Map under the user key.
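As a rough plain-Java picture of the Map case, the sketch below extracts the username entry, hands only that value to a lookup function, and stores the whole reply under the user key. MapEnricherSketch and the lookup lambda (standing in for findUserByUsernameServiceChannel) are hypothetical, and a String stands in for the returned User object.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Plain-Java illustration of enriching a Map payload (a sketch, not framework code). */
final class MapEnricherSketch {

    /**
     * Uses the "username" entry as the request payload, asks the lookup
     * function for a reply, and stores the whole reply under the "user" key.
     */
    static Map<String, Object> enrich(Map<String, Object> payload,
                                      Function<String, Object> findUserByUsername) {
        // Corresponds to request-payload-expression="payload.username".
        String username = (String) payload.get("username");
        Object reply = findUserByUsername.apply(username);
        // Corresponds to the property sub-element named "user" with expression "payload".
        payload.put("user", reply);
        return payload;
    }

    public static void main(String[] args) {
        Map<String, Object> payload = new HashMap<>();
        payload.put("username", "jdoe");
        // Hypothetical lookup; a real flow would return a User object here.
        enrich(payload, name -> "User[" + name + "]");
        System.out.println(payload.get("user"));
    }
}
```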
How Can I Enrich Payloads with Static Information without Using a Request Channel?
The following example does not use a request channel at all but solely enriches the message’s payload with static values:
<int:enricher id="userEnricher"
input-channel="input">
<int:property name="user.updateDate" expression="new java.util.Date()"/>
<int:property name="user.firstName" value="William"/>
<int:property name="user.lastName" value="Shakespeare"/>
<int:property name="user.age" value="42"/>
</int:enricher>
Note that the word 'static' is used loosely here. You can still use SpEL expressions for setting those values.
Claim Check
In earlier sections, we covered several content enricher components that can help you deal with situations where a message is missing a piece of data. We also discussed content filtering, which lets you remove data items from a message. However, there are times when we want to hide data temporarily. For example, in a distributed system, we may receive a message with a very large payload. Some intermediate message processing steps may not need access to this payload and some may only need to access certain headers, so carrying the large message payload through each processing step may cause performance degradation, may produce a security risk, and may make debugging more difficult.
The store in library (or claim check) pattern describes a mechanism that lets you store data in a well known place while maintaining only a pointer (a claim check) to where that data is located. You can pass that pointer around as the payload of a new message, thereby letting any component within the message flow get the actual data as soon as it needs it. This approach is very similar to the certified mail process, where you get a claim check in your mailbox and then have to go to the post office to claim your actual package. It is also the same idea as baggage claim after a flight or in a hotel.
Spring Integration provides two types of claim check transformers:
- Incoming Claim Check Transformer
- Outgoing Claim Check Transformer
Convenient namespace-based mechanisms are available to configure them.
Incoming Claim Check Transformer
An incoming claim check transformer transforms an incoming message by storing it in the message store identified by its message-store attribute.
The following example defines an incoming claim check transformer:
<int:claim-check-in id="checkin"
input-channel="checkinChannel"
message-store="testMessageStore"
output-channel="output"/>
In the preceding configuration, the message that is received on the input-channel is persisted to the message store identified with the message-store attribute and indexed with a generated ID.
That ID is the claim check for that message.
The claim check also becomes the payload of the new (transformed) message that is sent to the output-channel.
Now, assume that at some point you do need access to the actual message. You can access the message store manually and get the contents of the message, or you can use the same approach (creating a transformer) except that now you transform the Claim Check to the actual message by using an outgoing claim check transformer.
The following listing provides an overview of all available parameters of an incoming claim check transformer:
<int:claim-check-in auto-startup="true" (1)
id="" (2)
input-channel="" (3)
message-store="messageStore" (4)
order="" (5)
output-channel="" (6)
send-timeout=""> (7)
<int:poller></int:poller> (8)
</int:claim-check-in>
1 | Lifecycle attribute signaling whether this component should be started during application context startup.
It defaults to true .
This attribute is not available inside a Chain element.
Optional. |
2 | ID identifying the underlying bean definition (MessageTransformingHandler ).
This attribute is not available inside a Chain element.
Optional. |
3 | The receiving message channel of this endpoint.
This attribute is not available inside a Chain element.
Optional. |
4 | Reference to the MessageStore to be used by this claim check transformer.
If not specified, the default reference is to a bean named messageStore .
Optional. |
5 | Specifies the order for invocation when this endpoint is connected as a subscriber to a channel.
This is particularly relevant when that channel uses a failover dispatching strategy.
It has no effect when this endpoint is itself a polling consumer for a channel with a queue.
This attribute is not available inside a Chain element.
Optional. |
6 | Identifies the message channel where the message is sent after being processed by this endpoint.
This attribute is not available inside a Chain element.
Optional. |
7 | Specifies the maximum amount of time (in milliseconds) to wait when sending a reply message to the output channel.
Defaults to -1 — blocking indefinitely.
This attribute is not available inside a Chain element.
Optional. |
8 | Defines a poller.
This element is not available inside a Chain element.
Optional. |
Outgoing Claim Check Transformer
An outgoing claim check transformer lets you transform a message with a claim check payload into a message with the original content as its payload.
<int:claim-check-out id="checkout"
input-channel="checkoutChannel"
message-store="testMessageStore"
output-channel="output"/>
In the preceding configuration, the message received on the input-channel should have a claim check as its payload.
The outgoing claim check transformer transforms it into a message with the original payload by querying the message store for a message identified by the provided claim check.
It then sends the newly checked-out message to the output-channel.
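The check-in and check-out round trip can be illustrated with a small plain-Java sketch. It is a simplification under stated assumptions: ClaimCheckSketch is a hypothetical class, a ConcurrentHashMap stands in for the message store, and only a payload is stored, whereas the real transformers persist the whole message.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Plain-Java illustration of the claim check round trip (not framework code). */
final class ClaimCheckSketch {

    // In-memory stand-in for a message store, keyed by the generated claim check.
    private final Map<UUID, Object> store = new ConcurrentHashMap<>();

    /** Check-in: persist the payload and hand back only the generated ID. */
    UUID checkIn(Object payload) {
        UUID claimCheck = UUID.randomUUID();
        store.put(claimCheck, payload);
        return claimCheck;
    }

    /** Check-out: resolve the claim check back to the stored payload. */
    Object checkOut(UUID claimCheck) {
        Object payload = store.get(claimCheck);
        if (payload == null) {
            throw new IllegalArgumentException("No stored message for claim check " + claimCheck);
        }
        return payload;
    }

    public static void main(String[] args) {
        ClaimCheckSketch transformer = new ClaimCheckSketch();
        byte[] largePayload = new byte[1_000_000];
        // The small ticket travels through the flow instead of the large payload.
        UUID ticket = transformer.checkIn(largePayload);
        // A later step exchanges the ticket for the object that was checked in.
        Object restored = transformer.checkOut(ticket);
        System.out.println(restored == largePayload);
    }
}
```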
The following listing provides an overview of all available parameters of an outgoing claim check transformer:
<int:claim-check-out auto-startup="true" (1)
id="" (2)
input-channel="" (3)
message-store="messageStore" (4)
order="" (5)
output-channel="" (6)
remove-message="false" (7)
send-timeout=""> (8)
<int:poller></int:poller> (9)
</int:claim-check-out>
1 | Lifecycle attribute signaling whether this component should be started during application context startup.
It defaults to true .
This attribute is not available inside a Chain element.
Optional. |
2 | ID identifying the underlying bean definition (MessageTransformingHandler ).
This attribute is not available inside a Chain element.
Optional. |
3 | The receiving message channel of this endpoint.
This attribute is not available inside a Chain element.
Optional. |
4 | Reference to the MessageStore to be used by this claim check transformer.
If not specified, the default reference is to a bean named messageStore .
Optional. |
5 | Specifies the order for invocation when this endpoint is connected as a subscriber to a channel.
This is particularly relevant when that channel is using a failover dispatching strategy.
It has no effect when this endpoint is itself a polling consumer for a channel with a queue.
This attribute is not available inside a Chain element.
Optional. |
6 | Identifies the message channel where the message is sent after being processed by this endpoint.
This attribute is not available inside a Chain element.
Optional. |
7 | If set to true , the message is removed from the MessageStore by this transformer.
This setting is useful when a message can be “claimed” only once.
It defaults to false .
Optional. |
8 | Specifies the maximum amount of time (in milliseconds) to wait when sending a reply message to the output channel.
It defaults to -1 — blocking indefinitely.
This attribute is not available inside a Chain element.
Optional. |
9 | Defines a poller.
This element is not available inside a Chain element.
Optional. |
Claim Once
Sometimes, a particular message must be claimed only once.
As an analogy, consider the process of handling airplane luggage.
You check in your luggage on departure and claim it on arrival.
Once the luggage has been claimed, it cannot be claimed again without first checking it back in.
To accommodate such cases, we introduced a remove-message boolean attribute on the claim-check-out transformer.
This attribute is set to false by default.
However, if set to true, the claimed message is removed from the MessageStore so that it cannot be claimed again.
This feature has an impact in terms of storage space, especially in the case of the in-memory Map-based SimpleMessageStore, where failing to remove messages could ultimately lead to an OutOfMemoryError.
Therefore, if you do not expect multiple claims to be made, we recommend that you set the remove-message attribute’s value to true.
The following example shows how to use the remove-message attribute:
<int:claim-check-out id="checkout"
input-channel="checkoutChannel"
message-store="testMessageStore"
output-channel="output"
remove-message="true"/>
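The effect of remove-message on storage can be sketched in plain Java. ClaimOnceSketch and its methods are hypothetical names used only for illustration. Returning an empty Optional for an unknown claim check is a choice made for this sketch, not a statement about how the framework reports a missing message.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

/** Plain-Java illustration of claiming a stored message at most once. */
final class ClaimOnceSketch {

    // In-memory stand-in for a Map-based message store.
    private final Map<UUID, String> store = new HashMap<>();

    UUID checkIn(String payload) {
        UUID claimCheck = UUID.randomUUID();
        store.put(claimCheck, payload);
        return claimCheck;
    }

    /**
     * With removeMessage=true the entry is deleted as it is claimed, so a
     * second claim finds nothing and the store does not keep growing.
     */
    Optional<String> checkOut(UUID claimCheck, boolean removeMessage) {
        String payload = removeMessage ? store.remove(claimCheck) : store.get(claimCheck);
        return Optional.ofNullable(payload);
    }

    int size() {
        return store.size();
    }

    public static void main(String[] args) {
        ClaimOnceSketch transformer = new ClaimOnceSketch();
        UUID ticket = transformer.checkIn("luggage");
        transformer.checkOut(ticket, true);                 // first claim removes the entry
        System.out.println(transformer.checkOut(ticket, true).isPresent());
        System.out.println(transformer.size());
    }
}
```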
A Word on Message Store
Although we rarely care about the details of the claim checks (as long as they work), you should know that the current implementation of the actual claim check (the pointer) in Spring Integration uses a UUID to ensure uniqueness.
org.springframework.integration.store.MessageStore is a strategy interface for storing and retrieving messages.
Spring Integration provides two convenient implementations of it:
- SimpleMessageStore: An in-memory, Map-based implementation (the default, good for testing)
- JdbcMessageStore: An implementation that uses a relational database over JDBC
Codec
Version 4.2 of Spring Integration introduced the Codec abstraction.
Codecs encode and decode objects to and from byte[].
They offer an alternative to Java serialization.
One advantage is that, typically, objects need not implement Serializable.
We provide one implementation that uses Kryo for serialization, but you can provide your own implementation for use in any of the following components:
- EncodingPayloadTransformer
- DecodingTransformer
- CodecMessageConverter
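To make the idea of encoding to and from byte[] without Java serialization concrete, here is a minimal JDK-only sketch. SimpleCodec, Point, and PointCodec are hypothetical names invented for this illustration. The interface is a simplified stand-in and does not reproduce the signatures of the framework's Codec interface.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical value type; note that it does not implement java.io.Serializable. */
final class Point {
    final int x;
    final int y;

    Point(int x, int y) {
        this.x = x;
        this.y = y;
    }
}

/** Hypothetical, simplified codec contract used only in this sketch. */
interface SimpleCodec<T> {
    byte[] encode(T value);

    T decode(byte[] bytes);
}

/** Writes the two int fields directly, so no Serializable marker is needed. */
final class PointCodec implements SimpleCodec<Point> {

    @Override
    public byte[] encode(Point p) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(p.x);
            out.writeInt(p.y);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    @Override
    public Point decode(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            // Fields are read back in the same order in which they were written.
            return new Point(in.readInt(), in.readInt());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        PointCodec codec = new PointCodec();
        Point copy = codec.decode(codec.encode(new Point(3, -7)));
        System.out.println(copy.x + "," + copy.y);
    }
}
```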
EncodingPayloadTransformer
This transformer encodes the payload to a byte[] by using the codec.
It does not affect message headers.
See the Javadoc for more information.
DecodingTransformer
This transformer decodes a byte[] by using the codec.
It needs to be configured with the Class to which the object should be decoded (or an expression that resolves to a Class).
If the resulting object is a Message<?>, inbound headers are not retained.
See the Javadoc for more information.
CodecMessageConverter
Certain endpoints (such as TCP and Redis) have no concept of message headers.
They support the use of a MessageConverter, and the CodecMessageConverter can be used to convert a message to or from a byte[] for transmission.
See the Javadoc for more information.
Kryo
Currently, the Kryo-based codec is the only implementation of Codec, and it comes in two kinds:
- PojoCodec: Used in the transformers
- MessageCodec: Used in the CodecMessageConverter
The framework provides several custom serializers:
- FileSerializer
- MessageHeadersSerializer
- MutableMessageHeadersSerializer
The first can be used with the PojoCodec by initializing it with the FileKryoRegistrar.
The second and third are used with the MessageCodec, which is initialized with the MessageKryoRegistrar.
Customizing Kryo
By default, Kryo delegates unknown Java types to its FieldSerializer.
Kryo also registers default serializers for each primitive type, along with String, Collection, and Map.
FieldSerializer uses reflection to navigate the object graph.
A more efficient approach is to implement a custom serializer that is aware of the object’s structure and can directly serialize selected primitive fields.
The following example shows such a serializer:
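import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class AddressSerializer extends Serializer<Address> {
    // Writes the three String fields in a fixed order.
    @Override
    public void write(Kryo kryo, Output output, Address address) {
        output.writeString(address.getStreet());
        output.writeString(address.getCity());
        output.writeString(address.getCountry());
    }

    // Reads the fields back in the same order in which they were written.
    @Override
    public Address read(Kryo kryo, Input input, Class<Address> type) {
        return new Address(input.readString(), input.readString(), input.readString());
    }
}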
The Serializer class exposes Kryo, Input, and Output, which provide complete control over which fields are included and other internal settings, as described in the Kryo documentation.
When registering your custom serializer, you need a registration ID. The registration IDs are arbitrary. However, in our case, the IDs must be explicitly defined, because each Kryo instance across the distributed application must use the same IDs. Kryo recommends small positive integers and reserves a few ids (value < 10). Spring Integration currently defaults to using 40, 41, and 42 (for the file and message header serializers mentioned earlier). We recommend you start at 60, to allow for expansion in the framework. You can override these framework defaults by configuring the registrars mentioned earlier. |
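The reason the IDs must match on every node can be shown with a small JDK-only sketch. It does not use Kryo's API: RegistrySketch is a hypothetical stand-in for the role that registration IDs play, where the writer emits a compact ID instead of a class name and the reader must map that same ID back to the same type. The reserved-range check mirrors the "value < 10" remark above.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

/** Illustrative class-to-ID registry; not Kryo's registration API. */
final class RegistrySketch {

    private final Map<Class<?>, Integer> idsByType = new HashMap<>();
    private final Map<Integer, Class<?>> typesById = new HashMap<>();

    RegistrySketch register(Class<?> type, int id) {
        if (id < 10) {
            throw new IllegalArgumentException("IDs below 10 are treated as reserved in this sketch");
        }
        Class<?> existing = typesById.putIfAbsent(id, type);
        if (existing != null && existing != type) {
            throw new IllegalStateException("ID " + id + " is already used by " + existing.getName());
        }
        idsByType.put(type, id);
        return this;
    }

    /** The writer puts only the compact ID on the wire, not the class name. */
    int idFor(Class<?> type) {
        Integer id = idsByType.get(type);
        if (id == null) {
            throw new IllegalStateException("Type not registered: " + type.getName());
        }
        return id;
    }

    /** The reader can recover the type only if it registered the same ID. */
    Class<?> typeFor(int id) {
        Class<?> type = typesById.get(id);
        if (type == null) {
            throw new IllegalStateException("No type registered for ID " + id);
        }
        return type;
    }

    public static void main(String[] args) {
        // Both sides use ID 60, following the recommendation to start at 60.
        RegistrySketch producer = new RegistrySketch().register(BigDecimal.class, 60);
        RegistrySketch consumer = new RegistrySketch().register(BigDecimal.class, 60);
        int wireId = producer.idFor(BigDecimal.class);
        System.out.println(consumer.typeFor(wireId).getName());
    }
}
```

A consumer that registered the same class under a different ID would fail to resolve the ID written by the producer, which is why the IDs are defined explicitly rather than assigned automatically.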
Using a Custom Kryo Serializer
If you need custom serialization, see the Kryo documentation, because you need to use the native API to do the customization.
For an example, see the MessageCodec implementation.
Implementing KryoSerializable
If you have write access to the domain object source code, you can implement KryoSerializable, as described in the Kryo documentation.
In this case, the class provides the serialization methods itself and no further configuration is required.
However, benchmarks have shown this is not quite as efficient as registering a custom serializer explicitly.
The following example shows a domain class that implements KryoSerializable:
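import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoSerializable;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class Address implements KryoSerializable {
    ...
    // Writes this object's fields in a fixed order.
    @Override
    public void write(Kryo kryo, Output output) {
        output.writeString(this.street);
        output.writeString(this.city);
        output.writeString(this.country);
    }

    // Restores the fields in the same order in which they were written.
    @Override
    public void read(Kryo kryo, Input input) {
        this.street = input.readString();
        this.city = input.readString();
        this.country = input.readString();
    }
}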
You can also use this technique to wrap a serialization library other than Kryo.
Using the @DefaultSerializer Annotation
Kryo also provides a @DefaultSerializer annotation, as described in the Kryo documentation.
@DefaultSerializer(SomeClassSerializer.class)
public class SomeClass {
// ...
}
If you have write access to the domain object, this may be a simpler way to specify a custom serializer. Note that this does not register the class with an ID, which may make the technique unhelpful for certain situations.