8. Content Type negotiation

8.1 Introduction

Data transformation is one of the core features of any message-driven microservice architecture. In Spring Cloud Stream, such data is represented as a Spring Message, and a message may have to be transformed to the desired shape or size before it reaches its destination. This is required for two reasons:

1. To convert the contents of the incoming message to match the signature of the application-provided handler.

2. To convert the contents of the outgoing message to the wire format.

The wire format is typically byte[] (as is the case for the Kafka and Rabbit binders), but it is governed by the binder implementation.

In Spring Cloud Stream, message transformation is accomplished with an org.springframework.messaging.converter.MessageConverter.

Note

As a supplement to the details to follow you may also want to read the following blog

8.2 Mechanics

To better understand the mechanics of and the necessity behind content-type negotiation, let’s look at a very simple use case, using the following message handler as an example. For simplicity, let’s also assume that this is the only handler in the application (no internal pipeline).

@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public String handle(Person person) {..}

The handler above expects a Person as its argument and produces a String as its output. For the framework to pass the incoming Message to this handler, it has to transform the payload of the Message from the wire format to the Person type. In other words, the framework must locate and apply the appropriate MessageConverter. To do that, the framework needs some instructions from the user. One of these instructions is already provided by the signature of the handler method itself (the Person type). In theory that should be enough, and in some cases it is, but for the majority of use cases the framework needs an additional piece of information to select the appropriate MessageConverter. That missing piece is contentType.

Spring Cloud Stream provides three mechanisms to define contentType, listed here in order of precedence:

1. HEADER - the contentType can be communicated through the Message itself. By providing a contentType header, you declare the content type to use to locate and apply the appropriate MessageConverter.

2. BINDING - the contentType can be set per destination binding via the spring.cloud.stream.bindings.input.content-type property. Note that the input segment in the property name corresponds to the name of the destination binding, which is “input” in our case. This approach lets you declare, per binding, the content type to use to locate and apply the appropriate MessageConverter.

3. DEFAULT - if contentType is present neither in the Message header nor on the binding, the default application/json content type is used to locate and apply the appropriate MessageConverter.

As mentioned, the list above also reflects the order of precedence when more than one mechanism supplies a content type. For example, a content type provided in a header takes precedence over any other content type. The same applies to a content type set per binding, which essentially lets you override the default content type. If neither is set, the framework falls back to a sensible default, which was chosen based on community feedback.
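The precedence rules above can be modeled in a few lines of plain Java. This is only an illustrative sketch, not framework code: the class name ContentTypeResolution and the method resolve are hypothetical, and the headers are represented by a simple Map rather than by a Spring Message.

```java
import java.util.Map;

// Illustrative model only; this is not Spring Cloud Stream code.
final class ContentTypeResolution {

    static final String DEFAULT_CONTENT_TYPE = "application/json";

    // headers: the message headers.
    // bindingContentType: the per-binding property value, or null when unset.
    static String resolve(Map<String, Object> headers, String bindingContentType) {
        Object fromHeader = headers.get("contentType");
        if (fromHeader != null) {
            return fromHeader.toString();   // 1. HEADER takes precedence
        }
        if (bindingContentType != null) {
            return bindingContentType;      // 2. BINDING overrides the default
        }
        return DEFAULT_CONTENT_TYPE;        // 3. DEFAULT
    }
}
```

In this model, a message that carries a contentType header of text/plain resolves to text/plain even if the binding is configured with a different content type.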

Another reason for making application/json the default stems from the interoperability requirements driven by distributed microservices architectures where producer and consumer not only run in different JVMs, but can also run on different non-JVM platforms.

Once the non-void handler method returns, and unless the return value is already a Message, a new Message is constructed with the return value as the payload. It inherits the headers of the input Message, less the ones defined in SpringIntegrationProperties.messageHandlerNotPropagatedHeaders. By default, only one header is set there: contentType. This means that the new Message does not have a contentType header set, which ensures that the contentType can evolve. You can always opt to return a Message from the handler method, in which case you can set any header you wish.

If there is an internal pipeline, the Message is sent to the next handler, where it goes through the same conversion process. If there is no internal pipeline, or you have reached the end of it, the Message is sent to the output destination.
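The header inheritance described above can be pictured with the following plain Java sketch. It is a simplified model under stated assumptions: the class HeaderPropagation and the constant NOT_PROPAGATED are hypothetical stand-ins for what the framework does internally, and headers are modeled as a Map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Illustrative model only; the framework performs this step internally.
final class HeaderPropagation {

    // Stand-in for the not-propagated headers setting, which by default
    // holds only contentType.
    static final Set<String> NOT_PROPAGATED = Set.of("contentType");

    // Copies the input headers and drops the ones that must not propagate.
    static Map<String, Object> outputHeaders(Map<String, Object> inputHeaders) {
        Map<String, Object> result = new HashMap<>(inputHeaders);
        result.keySet().removeAll(NOT_PROPAGATED);
        return result;
    }
}
```

Because contentType is dropped, the output Message goes through content type resolution again instead of silently reusing the input's content type.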

8.2.1 Content type vs. argument type

As mentioned, for the framework to select the appropriate MessageConverter, it requires the argument type and, optionally, content type information. The logic for selecting the appropriate MessageConverter resides with the argument resolvers (HandlerMethodArgumentResolvers), which run right before the invocation of the user-defined handler method (that is when the actual argument type is known to the framework). If the argument type does NOT match the type of the current payload, the framework delegates to the stack of pre-configured MessageConverters to see if any one of them can convert the payload. As you can see, the Object fromMessage(Message<?> message, Class<?> targetClass); operation of the MessageConverter takes targetClass as one of its arguments. The framework also ensures that the provided Message always contains a contentType header: if one was not there already, it injects either the one set per binding or the default one. The combination of contentType and argument type is the mechanism by which the framework determines whether a message can be converted to a target type. If no appropriate MessageConverter is found, an exception is thrown, which you can address by adding a custom MessageConverter (more on this later).

But what if the payload type matches the target type declared by the handler method? In this case there is nothing to convert, and the payload is passed unmodified. While this sounds straightforward and logical, keep in mind handler methods that take Message<?> or Object as an argument. By declaring the target type to be Object (which every Java object is an instance of), you are essentially forfeiting the conversion process.

In other words:

Note

Do NOT expect a Message to be converted into some type based on the contentType only. Remember that the contentType is complementary to the target type. It is a hint, if you will, which a MessageConverter may or may not take into consideration.
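To make the pass-through rule concrete, here is a minimal plain Java sketch of the type check. The class ArgumentTypeCheck and the method passesThrough are hypothetical names used for illustration; the framework's actual check lives in its argument resolvers.

```java
// Illustrative model only; names here are hypothetical.
final class ArgumentTypeCheck {

    // True when the payload can be handed to the handler as-is, so that
    // no MessageConverter needs to be consulted.
    static boolean passesThrough(Object payload, Class<?> targetType) {
        return targetType.isInstance(payload);
    }
}
```

With a byte[] payload, passesThrough returns true for a target type of Object or byte[], and false for String. This is why a handler that declares Object receives the raw wire payload regardless of the contentType.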

8.2.2 Message Converters

MessageConverters define two methods:

Object fromMessage(Message<?> message, Class<?> targetClass);

Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);

It is important to understand the contract of these methods and their usage specifically in the context of Spring Cloud Stream.

The fromMessage method converts an incoming Message to an argument type. The payload of the Message could be of any type, and it is up to the actual implementation of the MessageConverter to support multiple types. For example, a JSON converter may support byte[], String, and other payload types. This is important when the application contains an internal pipeline (i.e., input → handler1 → handler2 →. . . → output) and the output of the upstream handler results in a Message which may not be in the initial wire format.

However. . .

The toMessage method has a stricter contract and must always convert the Message to the wire format: byte[].

So for all intents and purposes (and especially when implementing your own converter) you might as well look at them as:

Object fromMessage(Message<?> message, Class<?> targetClass);

Message<byte[]> toMessage(Object payload, @Nullable MessageHeaders headers);
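The asymmetry between the two methods (lenient on input, strict on output) can be sketched in plain Java as follows. This is an illustrative model, not a MessageConverter implementation: the class TextPayloadCodec and its methods are hypothetical, and UTF-8 text is assumed as the wire encoding.

```java
import java.nio.charset.StandardCharsets;

// Illustrative model only; a real converter would implement
// org.springframework.messaging.converter.MessageConverter.
final class TextPayloadCodec {

    // Lenient on input: accepts either the wire format or a String, since
    // an upstream handler in a pipeline may have produced either.
    static String fromPayload(Object payload) {
        if (payload instanceof byte[]) {
            return new String((byte[]) payload, StandardCharsets.UTF_8);
        }
        if (payload instanceof String) {
            return (String) payload;
        }
        return null; // unsupported payload type: signals "cannot convert"
    }

    // Strict on output: always produces the wire format.
    static byte[] toWire(Object payload) {
        return String.valueOf(payload).getBytes(StandardCharsets.UTF_8);
    }
}
```

Returning null for an unsupported payload mirrors the convention that a converter signals it cannot perform a conversion, leaving the decision to the next converter in the stack.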

8.3 Provided MessageConverters

As mentioned earlier, the framework already provides a stack of MessageConverters to handle the most common use cases. Below is the ordered list of provided MessageConverters.

Note

The order matters: the framework locates the appropriate MessageConverter by iterating through the list, asking each converter whether it can convert, and using the first one that can.

  1. ApplicationJsonMessageMarshallingConverter - variation of the org.springframework.messaging.converter.MappingJackson2MessageConverter. Supports conversion of the payload of the Message from String or byte[].
  2. TupleJsonMessageConverter - [DEPRECATED] Supports conversion of the payload of the Message from org.springframework.tuple.Tuple.
  3. ByteArrayMessageConverter - Supports conversion of the payload of the Message from byte[] to byte[] for cases when contentType is set to application/octet-stream. Essentially a pass through and exists primarily for backward compatibility.
  4. ObjectStringMessageConverter - Supports conversion of any type to a String, when contentType is text/plain. Invokes Object’s toString() method or if payload is byte[] then new String(byte[]).
  5. JavaSerializationMessageConverter - [DEPRECATED] Supports conversion based on java serialization when contentType is application/x-java-serialized-object.
  6. KryoMessageConverter - [DEPRECATED] Supports conversion based on kryo serialization when contentType is application/x-java-object.
  7. JsonUnmarshallingConverter - Similar to the ApplicationJsonMessageMarshallingConverter. Supports conversion of any type when contentType is application/x-java-object. Expects the actual type information to be embedded in the contentType as an attribute (e.g., application/x-java-object;type=foo.bar.Baz).

If no appropriate converter is found, the framework throws an exception. When that happens, you should check your code and configuration and ensure you did not miss anything (for example, that you provided a contentType via the binding or a header). However, most likely you are dealing with an uncommon case (such as a custom contentType) that the current stack of provided MessageConverters does not know how to convert. If that is the case, you can add a custom MessageConverter.
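The "first converter that can convert wins" lookup described in this section can be modeled as below. This is a sketch under stated assumptions: each converter is reduced to a function that returns null when it cannot convert, the class ConverterChain is hypothetical, and IllegalStateException is only a stand-in for whatever exception the framework actually throws.

```java
import java.util.List;
import java.util.function.BiFunction;

// Illustrative model only; a converter is reduced to a function that
// returns null when it cannot convert.
final class ConverterChain {

    static Object convert(List<BiFunction<Object, Class<?>, Object>> converters,
                          Object payload, Class<?> targetType) {
        for (BiFunction<Object, Class<?>, Object> converter : converters) {
            Object result = converter.apply(payload, targetType);
            if (result != null) {
                return result; // the first converter that succeeds wins
            }
        }
        // Stand-in for the framework's exception when nothing matches.
        throw new IllegalStateException("No converter found for " + targetType.getName());
    }
}
```

Because iteration stops at the first success, reordering the list can change the result when more than one converter supports the same target type.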

8.4 User defined Message Converters

Spring Cloud Stream exposes a mechanism to define and register additional MessageConverters. All you need to do is implement org.springframework.messaging.converter.MessageConverter, configure it as a @Bean, and annotate it with @StreamMessageConverter. It is then added to the existing stack of MessageConverters. The @StreamMessageConverter qualifier annotation prevents other converters that may be present in the application context from being picked up.

Note

Custom MessageConverters are added to the head of the existing stack. As a result, custom MessageConverters take precedence over the existing ones, which lets you override the existing converters as well as add to them.

Here is an example of creating a message converter bean to support new content type application/bar:

@EnableBinding(Sink.class)
@SpringBootApplication
public static class SinkApplication {

    ...

    @Bean
    @StreamMessageConverter
    public MessageConverter customMessageConverter() {
        return new MyCustomMessageConverter();
    }
}

public class MyCustomMessageConverter extends AbstractMessageConverter {

    public MyCustomMessageConverter() {
        super(new MimeType("application", "bar"));
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        return (Bar.class.equals(clazz));
    }

    @Override
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
        Object payload = message.getPayload();
        return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
    }
}
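The effect of adding custom converters to the head of the stack can be pictured with the following plain Java sketch. It is only a model: converters are represented by their names, and the class ConverterStack and the method withCustomFirst are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only; converters are represented by their names.
final class ConverterStack {

    // Custom converters go first, so they are consulted before the
    // converters provided by the framework.
    static List<String> withCustomFirst(List<String> custom, List<String> provided) {
        List<String> stack = new ArrayList<>(custom);
        stack.addAll(provided);
        return stack;
    }
}
```

Since lookup stops at the first converter that can convert, a custom converter placed this way can shadow a provided converter that supports the same content type and target type.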

Spring Cloud Stream also provides support for Avro-based converters and schema evolution. See the specific section for details.