Data transformation is one of the core features of any message-driven microservice architecture. In Spring Cloud Stream, such data is represented as a Spring Message, and a message may have to be transformed to a desired shape or size before it reaches its destination. This is required for two reasons:
1. To convert the contents of the incoming message to match the signature of the application-provided handler.
2. To convert the contents of the outgoing message to the wire format.
The wire format is typically byte[] (as is the case for the Kafka and Rabbit binders), but it is governed by the binder implementation.
In Spring Cloud Stream, message transformation is accomplished with an org.springframework.messaging.converter.MessageConverter.
![]() | Note |
---|---|
As a supplement to the details to follow you may also want to read the following blog |
To better understand the mechanics of content-type negotiation and why it is necessary, let’s look at a very simple use case that uses the following message handler as an example. For simplicity, let’s also assume that this is the only handler in the application (no internal pipeline).
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public String handle(Person person) {..}
The above handler expects a Person type as an argument and produces a String type as output. For the framework to succeed in passing the incoming Message as an argument to this handler, it has to somehow transform the payload of the Message from the wire format to the Person type.
In other words, the framework must locate and apply the appropriate MessageConverter. To accomplish that, the framework needs some instructions from the user. One of these instructions is already provided by the signature of the handler method itself (the Person type). In theory that should be enough, and in some cases it is, but for the majority of use cases the framework needs an additional piece of information to select the appropriate MessageConverter.
That missing piece is contentType.
Spring Cloud Stream provides three mechanisms to define contentType, listed here in order of precedence:
1. HEADER - the contentType can be communicated through the Message itself. By providing a contentType header, you declare the content type to use to locate and apply the appropriate MessageConverter.
2. BINDING - the contentType can be set per destination binding via the spring.cloud.stream.bindings.input.content-type property. NOTE: the input segment in the property name corresponds to the actual name of the destination, which is “input” in our case. This approach lets you declare, per binding, the content type to use to locate and apply the appropriate MessageConverter.
3. DEFAULT - if contentType is not present in the Message header or the binding, the default application/json content type is used to locate and apply the appropriate MessageConverter.
As mentioned, the order of the list above is also the order of precedence when more than one mechanism supplies a content type. A content type provided in a header takes precedence over any other content type. A content type set per binding comes next, which essentially allows you to override the default content type. The default itself is a sensible choice that was determined from community feedback.
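The precedence rules can be illustrated with a small, framework-free sketch. This is only a model of the behavior described above, not Spring Cloud Stream code: the class name, the resolve method, and the use of a plain Map in place of message headers are all assumptions made for illustration.

```java
import java.util.Map;

// Simplified, framework-free model of the precedence rules; not Spring Cloud Stream code.
final class ContentTypeResolution {

    // The default content type named in the text above.
    static final String DEFAULT_CONTENT_TYPE = "application/json";

    /**
     * Resolves the effective content type.
     *
     * @param headers            stand-in for the message headers (hypothetical plain map)
     * @param bindingContentType value of the per-binding content-type property, or null if unset
     */
    static String resolve(Map<String, ?> headers, String bindingContentType) {
        Object fromHeader = headers.get("contentType");
        if (fromHeader != null) {
            return fromHeader.toString();          // 1. HEADER wins over everything else
        }
        if (bindingContentType != null && !bindingContentType.isEmpty()) {
            return bindingContentType;             // 2. BINDING overrides the default
        }
        return DEFAULT_CONTENT_TYPE;               // 3. DEFAULT
    }

    public static void main(String[] args) {
        Map<String, Object> withHeader = Map.of("contentType", "text/plain");
        Map<String, Object> noHeader = Map.of();
        System.out.println(resolve(withHeader, "application/octet-stream")); // text/plain
        System.out.println(resolve(noHeader, "application/octet-stream"));   // application/octet-stream
        System.out.println(resolve(noHeader, null));                         // application/json
    }
}
```

In the real framework the binding value would come from the spring.cloud.stream.bindings.input.content-type property rather than from a method argument.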
Another reason for making application/json the default stems from the interoperability requirements of distributed microservices architectures, where producer and consumer not only run in different JVMs but can also run on different non-JVM platforms.
Once a non-void handler method returns, and unless the return value is already a Message, a new Message is constructed with the return value as the payload. It inherits the headers of the input Message, less the ones defined (filtered) by SpringIntegrationProperties.messageHandlerNotPropagatedHeaders.
By default, only one header is set there: contentType. This means that the new Message does not have a contentType header set, which ensures that the contentType can evolve. You can always opt to return a Message from the handler method, in which case you can inject any header you wish.
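The header inheritance described above can be pictured as a simple filter. The following is a minimal sketch under stated assumptions: headers are modeled as a plain Map, and the class and method names are hypothetical. Only the idea that contentType is the single non-propagated header by default is taken from the text.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Simplified model of header inheritance for the outgoing message; names are hypothetical.
final class HeaderPropagation {

    // Models the default content of messageHandlerNotPropagatedHeaders.
    static final Set<String> NOT_PROPAGATED = Set.of("contentType");

    /** Copies every input header except the ones that must not be propagated. */
    static Map<String, Object> outputHeaders(Map<String, ?> inputHeaders) {
        Map<String, Object> result = new LinkedHashMap<>();
        inputHeaders.forEach((name, value) -> {
            if (!NOT_PROPAGATED.contains(name)) {
                result.put(name, value);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> input = Map.of("contentType", "application/json", "traceId", "abc");
        // Only traceId survives; contentType is resolved again for the outgoing message.
        System.out.println(outputHeaders(input));
    }
}
```

Because the outgoing message starts without a contentType header, the content type of the output is free to differ from that of the input.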
If there is an internal pipeline, the Message is sent to the next handler and goes through the same conversion process. If there is no internal pipeline, or you have reached the end of it, the Message is sent back to the output destination.
As mentioned, for the framework to select the appropriate MessageConverter it requires the argument type and, optionally, content type information.
The logic for selecting the appropriate MessageConverter resides with the argument resolvers (HandlerMethodArgumentResolvers) and runs right before the invocation of the user-defined handler method (which is when the actual argument type is known to the framework).
If the argument type does NOT match the type of the current payload, the framework delegates to the stack of pre-configured MessageConverters to see whether any one of them can convert the payload. As you can see, the Object fromMessage(Message<?> message, Class<?> targetClass); operation of the MessageConverter takes targetClass as one of its arguments. The framework also ensures that the provided Message always contains a contentType header. If one was not there already, it injects either the one set per binding or the default one.
The combination of contentType and argument type is the mechanism by which the framework determines whether a message can be converted to a target type.
If no appropriate MessageConverter is found, an exception is thrown, which you can handle by adding a custom MessageConverter (more on this later).
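The selection process can be modeled without the framework. The sketch below is a simplified illustration, not the actual implementation: PayloadConverter is a hypothetical stand-in for MessageConverter.fromMessage, the application/x-int content type is invented for the example, and the "first converter that returns a non-null result wins" loop is an assumption about how an ordered stack is typically consulted.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Simplified, framework-free model of converter selection; every name here is hypothetical.
final class ConverterStackSketch {

    /** Stripped-down stand-in for MessageConverter.fromMessage. */
    interface PayloadConverter {
        /** Returns the converted payload, or null if this converter does not apply. */
        Object convert(Object payload, String contentType, Class<?> targetClass);
    }

    /** Handles text/plain content when the handler asks for a String. */
    static final PayloadConverter TEXT = (payload, contentType, targetClass) -> {
        if (!"text/plain".equals(contentType) || targetClass != String.class) {
            return null;
        }
        return payload instanceof byte[]
                ? new String((byte[]) payload, StandardCharsets.UTF_8)
                : payload.toString();
    };

    /** Handles an invented application/x-int content type when the handler asks for an Integer. */
    static final PayloadConverter INT = (payload, contentType, targetClass) -> {
        if (!"application/x-int".equals(contentType)
                || targetClass != Integer.class
                || !(payload instanceof byte[])) {
            return null;
        }
        return Integer.valueOf(new String((byte[]) payload, StandardCharsets.UTF_8).trim());
    };

    /** Walks the stack in order and returns the first successful conversion. */
    static Object convert(List<PayloadConverter> stack, Object payload,
                          String contentType, Class<?> targetClass) {
        if (targetClass.isInstance(payload)) {
            return payload; // payload already matches the argument type: nothing to convert
        }
        for (PayloadConverter converter : stack) {
            Object result = converter.convert(payload, contentType, targetClass);
            if (result != null) {
                return result;
            }
        }
        throw new IllegalStateException("No converter for contentType=" + contentType
                + " and target type " + targetClass.getName());
    }

    public static void main(String[] args) {
        byte[] wire = "42".getBytes(StandardCharsets.UTF_8);
        List<PayloadConverter> stack = List.of(TEXT, INT);
        System.out.println(convert(stack, wire, "text/plain", String.class));
        System.out.println(convert(stack, wire, "application/x-int", Integer.class));
    }
}
```

Both the content type and the target class take part in every decision, and a target class of Object always takes the pass-through branch because every payload is an instance of Object.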
But what if the payload type matches the target type declared by the handler method? In this case, there is nothing to convert, and the payload is passed unmodified. While this sounds straightforward and logical, keep in mind handler methods that take Message<?> or Object as an argument. By declaring the target type to be Object (and every payload in Java is an instance of Object), you essentially forfeit the conversion process.
In other words:
![]() | Note |
---|---|
Do NOT expect Message to be converted into some type based on the |
MessageConverters define two methods:
Object fromMessage(Message<?> message, Class<?> targetClass);
Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);
It is important to understand the contract of these methods and their usage specifically in the context of Spring Cloud Stream.
The fromMessage method converts an incoming Message to an argument type. The payload of the Message could be of any type, and it is up to the actual implementation of the MessageConverter to support multiple types. For example, a JSON converter may support byte[], String, and other payload types. This is important when the application contains an internal pipeline (that is, input → handler1 → handler2 → ... → output) and the output of the upstream handler results in a Message that may not be in the initial wire format.
However, the toMessage method has a stricter contract and must always convert the Message to the wire format: byte[].
So, for all intents and purposes (and especially when implementing your own converter), you can regard the two methods as having the following signatures:
Object fromMessage(Message<?> message, Class<?> targetClass);
Message<byte[]> toMessage(Object payload, @Nullable MessageHeaders headers);
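The asymmetry between the two methods can be sketched in plain Java. This is an illustrative model only: GreetingCodec, Greeting, fromPayload, and toWire are hypothetical names, and UTF-8 text is assumed as the wire encoding. A real converter would implement MessageConverter and work with Message objects.

```java
import java.nio.charset.StandardCharsets;

// Simplified, framework-free model of the two-method contract; all names are hypothetical.
final class GreetingCodec {

    /** Stand-in for an application payload type. */
    static final class Greeting {
        final String text;

        Greeting(String text) {
            this.text = text;
        }
    }

    /** "fromMessage" side: lenient, accepts the wire format or an in-pipeline type. */
    static Greeting fromPayload(Object payload) {
        if (payload instanceof Greeting) {
            return (Greeting) payload;               // produced by an upstream handler
        }
        if (payload instanceof byte[]) {
            return new Greeting(new String((byte[]) payload, StandardCharsets.UTF_8));
        }
        if (payload instanceof String) {
            return new Greeting((String) payload);   // also produced inside a pipeline
        }
        return null;                                 // signals "cannot convert"
    }

    /** "toMessage" side: strict, always produces the wire format (byte[]). */
    static byte[] toWire(Object payload) {
        String text = payload instanceof Greeting
                ? ((Greeting) payload).text
                : String.valueOf(payload);
        return text.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = toWire(new Greeting("hello"));
        System.out.println(fromPayload(wire).text);
    }
}
```

The inbound side tolerates several payload types because an internal pipeline may hand it something other than bytes, while the outbound side has exactly one acceptable result type.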
As mentioned earlier, the framework already provides a stack of MessageConverters to handle the most common use cases. Below is the ordered list of provided MessageConverters.
![]() | Note |
---|---|
It is important to understand the importance of the order since the mechanism by which the framework locates the appropriate |
1. ApplicationJsonMessageMarshallingConverter - a variation of the org.springframework.messaging.converter.MappingJackson2MessageConverter. Supports conversion of the payload of the Message from String or byte[].
2. TupleJsonMessageConverter - [DEPRECATED] Supports conversion of the payload of the Message from org.springframework.tuple.Tuple.
3. ByteArrayMessageConverter - Supports conversion of the payload of the Message from byte[] to byte[] for cases when contentType is set to application/octet-stream. It is essentially a pass-through and exists primarily for backward compatibility.
4. ObjectStringMessageConverter - Supports conversion of any type to a String when contentType is text/plain. Invokes Object’s toString() method or, if the payload is byte[], new String(byte[]).
5. JavaSerializationMessageConverter - [DEPRECATED] Supports conversion based on Java serialization when contentType is application/x-java-serialized-object.
6. KryoMessageConverter - [DEPRECATED] Supports conversion based on Kryo serialization when contentType is application/x-java-object.
7. JsonUnmarshallingConverter - Similar to the ApplicationJsonMessageMarshallingConverter. Supports conversion of any type when contentType is application/x-java-object. Expects the actual type information to be embedded in the contentType as an attribute (for example, application/x-java-object;type=foo.bar.Baz).
If no appropriate converter is found, the framework throws an exception. When that happens, check your code and configuration and ensure you did not miss anything (for example, providing a contentType via the binding or a header). Most likely, however, you are dealing with an uncommon case (a custom contentType, perhaps) that the current stack of provided MessageConverters does not know how to convert. In that case, you can add a custom MessageConverter.
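To make the type attribute of application/x-java-object;type=foo.bar.Baz more concrete, here is a minimal, framework-free sketch of reading such an attribute from a content type string. The class and method names are hypothetical, and the parsing is deliberately naive (it does not handle quoted values). In a Spring application, org.springframework.util.MimeType offers getParameter for the same purpose.

```java
import java.util.Optional;

// Naive content-type parameter lookup, for illustration only; names are hypothetical.
final class ContentTypeParameters {

    /** Returns the value of the named parameter, if the content type carries it. */
    static Optional<String> parameter(String contentType, String name) {
        String[] parts = contentType.split(";");
        // parts[0] is the type/subtype pair; parameters start at index 1.
        for (int i = 1; i < parts.length; i++) {
            String[] keyValue = parts[i].trim().split("=", 2);
            if (keyValue.length == 2 && keyValue[0].trim().equalsIgnoreCase(name)) {
                return Optional.of(keyValue[1].trim());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        String contentType = "application/x-java-object;type=foo.bar.Baz";
        // Prints the class name a converter would then have to load and instantiate.
        System.out.println(parameter(contentType, "type").orElse("<none>")); // foo.bar.Baz
    }
}
```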
Spring Cloud Stream exposes a mechanism to define and register additional MessageConverters. All you need to do is implement org.springframework.messaging.converter.MessageConverter, configure it as a @Bean, and annotate it with @StreamMessageConverter. It is then added to the existing stack of MessageConverters. The @StreamMessageConverter qualifier annotation avoids picking up other converters that may be present in the application context.
![]() | Note |
---|---|
It is important to understand that custom |
Here is an example of creating a message converter bean to support a new content type, application/bar:
@EnableBinding(Sink.class)
@SpringBootApplication
public static class SinkApplication {

    ...

    // Registers the custom converter with the stack of stream message converters.
    @Bean
    @StreamMessageConverter
    public MessageConverter customMessageConverter() {
        return new MyCustomMessageConverter();
    }
}
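import org.springframework.messaging.Message;
import org.springframework.messaging.converter.AbstractMessageConverter;
import org.springframework.util.MimeType;

public class MyCustomMessageConverter extends AbstractMessageConverter {

    public MyCustomMessageConverter() {
        // This converter applies only to the application/bar content type.
        super(new MimeType("application", "bar"));
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        // Only handler arguments of type Bar are supported.
        return (Bar.class.equals(clazz));
    }

    @Override
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
        Object payload = message.getPayload();
        // Pass a Bar through unchanged; otherwise build one from the raw bytes.
        return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
    }
}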
Spring Cloud Stream also provides support for Avro-based converters and schema evolution. See the specific section for details.