To allow you to propagate information about the content type of produced messages, Spring Cloud Stream attaches a contentType header to outbound messages by default.
For middleware that does not directly support headers, Spring Cloud Stream provides its own mechanism of automatically wrapping outbound messages in an envelope of its own.
For middleware that does support headers, Spring Cloud Stream applications may receive messages with a given content type from non-Spring Cloud Stream applications.
The content type resolution process has been redesigned for Spring Cloud Stream 2.0.
Please read the migrating from 1.3 section to understand the changes when interacting with applications using earlier versions of the framework.
The framework depends on a contentType header being present in order to know how to serialize or deserialize a payload.
Spring Cloud Stream allows you to declaratively configure type conversion for inputs and outputs using the spring.cloud.stream.bindings.<channelName>.content-type property of a binding.
Note that general type conversion may also be accomplished easily by using a transformer inside your application.
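As a minimal illustration of the declarative configuration described above, the following properties fragment sets a content type on two bindings. It assumes the default channel names input and output from the Sink and Source interfaces; substitute your own channel names as needed.

```properties
# Assumed channel names: "input" (Sink) and "output" (Source)
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.output.content-type=text/plain;charset=UTF-8
```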
![]() | Note |
---|---|
For both input and output channel, setting a contentType via a property or via annotation only triggers the |
![]() | Tip |
---|---|
Although contentType became a required property, the framework will set a default value of |
The content-type values are parsed as media types, e.g., application/json or text/plain;charset=UTF-8.
MIME types are especially useful for indicating how to convert to String or byte[] content.
Spring Cloud Stream also uses MIME type format to represent Java types, using the general type application/x-java-object with a type parameter.
For example, application/x-java-object;type=java.util.Map or application/x-java-object;type=com.bar.Foo can be set as the content-type property of an input binding.
In addition, Spring Cloud Stream provides custom MIME types, notably application/x-spring-tuple to specify a Tuple.
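To make the media type format concrete, here is a rough plain-Java sketch of how a content-type value breaks down into a type, a subtype, and parameters such as type or charset. The ContentTypeSketch class and its members are hypothetical names invented for this illustration; this is a simplified stand-in, not the parser the framework uses.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

/** Simplified, hypothetical media type holder; not the framework's own parser. */
final class ContentTypeSketch {
    final String type;
    final String subtype;
    final Map<String, String> parameters;

    private ContentTypeSketch(String type, String subtype, Map<String, String> parameters) {
        this.type = type;
        this.subtype = subtype;
        this.parameters = Collections.unmodifiableMap(parameters);
    }

    /** Splits "type/subtype;key=value;..." into its parts. */
    static ContentTypeSketch parse(String value) {
        String[] parts = value.split(";");
        String[] typeAndSubtype = parts[0].trim().split("/", 2);
        if (typeAndSubtype.length != 2) {
            throw new IllegalArgumentException("Not a media type: " + value);
        }
        Map<String, String> parameters = new LinkedHashMap<>();
        for (int i = 1; i < parts.length; i++) {
            String[] keyAndValue = parts[i].trim().split("=", 2);
            if (keyAndValue.length == 2) {
                // Parameter names are case-insensitive; values are kept as written.
                parameters.put(keyAndValue[0].trim().toLowerCase(Locale.ROOT), keyAndValue[1].trim());
            }
        }
        return new ContentTypeSketch(
                typeAndSubtype[0].toLowerCase(Locale.ROOT),
                typeAndSubtype[1].toLowerCase(Locale.ROOT),
                parameters);
    }

    /** Returns the Java class name carried by the "type" parameter, or null if absent. */
    String javaTypeName() {
        return parameters.get("type");
    }
}
```

In application code you would normally rely on Spring's org.springframework.util.MimeType (for example MimeType.valueOf(...) and getParameter("type")) rather than parsing by hand.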
You can configure the content type of a message channel by using the spring.cloud.stream.bindings.<channelName>.content-type property or by using the @Input and @Output annotations.
By doing so, even if you send a POJO with no contentType information, the framework sets the contentType message header to the value configured for the channel.
However, if you send a Message<T> and set the contentType header manually, that header takes precedence over the configured property value.
This is valid for both input and output channels: the message header always takes precedence over the default contentType configured for the channel.
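The precedence rule can be summarized in a few lines of plain Java. This is only an illustrative model under stated assumptions: ContentTypePrecedence and resolve are hypothetical names, and the framework default is passed in as a parameter rather than hard-coded.

```java
/** Hypothetical model of the precedence rule; not framework code. */
final class ContentTypePrecedence {

    /**
     * Picks the effective content type: the message header wins,
     * then the value configured for the channel, then the framework default.
     */
    static String resolve(String headerValue, String channelValue, String frameworkDefault) {
        if (headerValue != null && !headerValue.isEmpty()) {
            return headerValue;
        }
        if (channelValue != null && !channelValue.isEmpty()) {
            return channelValue;
        }
        return frameworkDefault;
    }
}
```

For instance, resolve("text/plain", "application/json", "application/json") yields text/plain, because the header set on the message overrides the channel configuration.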
Starting with version 2.0, the framework no longer tries to infer a contentType based on the payload T of a Message<T>.
It instead uses the contentType header (or the default provided by the framework) to select the right MessageConverter to serialize the payload into byte[].
The contentType you set is a hint to activate the corresponding MessageConverter. The converter can then modify the contentType to augment the information, as is the case with the Kryo and Avro converters.
For outbound messages, if your payload is of type byte[], the framework skips the conversion logic and writes those bytes to the wire as they are.
In this case, if the contentType header of the message is absent, the framework sets it to the default value specified for the channel.
![]() | Tip |
---|---|
If you intend to bypass conversion, just make sure you set the appropriate |
The following snippet shows how you can bypass conversion and set the correct contentType header.
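    @Autowired
    private Source source;

    public void sendImageData(File f) throws Exception {
        // Read the raw image bytes; a byte[] payload bypasses conversion.
        byte[] data = Files.readAllBytes(f.toPath());
        // Pick the content type from the file extension.
        MimeType mimeType = (f.getName().endsWith("gif")) ? MimeTypeUtils.IMAGE_GIF : MimeTypeUtils.IMAGE_JPEG;
        // Set the contentType header explicitly so that consumers know how to read the bytes.
        source.output().send(MessageBuilder.withPayload(data)
                .setHeader(MessageHeaders.CONTENT_TYPE, mimeType)
                .build());
    }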
Regardless of the contentType used, the result is always a Message<byte[]> with a contentType header set. This is what gets passed to the binder to be sent over the wire.
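The outbound behavior described above (a byte[] payload is passed through untouched, anything else is converted to bytes) can be modeled with a short plain-Java sketch. OutboundSketch and toWire are hypothetical names, and the String-based conversion is only a stand-in for whichever MessageConverter the contentType would actually select.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical model of the outbound path; not framework code. */
final class OutboundSketch {

    /** Returns the bytes that would be handed to the binder for the given payload. */
    static byte[] toWire(Object payload) {
        if (payload instanceof byte[]) {
            // Raw bytes skip conversion and are written as they are.
            return (byte[]) payload;
        }
        // Stand-in for a real converter: render the payload as UTF-8 text.
        return String.valueOf(payload).getBytes(StandardCharsets.UTF_8);
    }
}
```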
content-type header | MessageConverter | content-type augmented | Supported types | Comments |
---|---|---|---|---|
application/json | CustomMappingJackson2MessageConverter | application/json | POJO, primitives and Strings that represent JSON data | It’s the default converter if none is specified. Note that if you send a raw String it will be quoted |
text/plain | ObjectStringMessageConverter | text/plain | Invokes toString() of the object | |
application/x-spring-tuple | TupleJsonMessageConverter | application/x-spring-tuple | org.springframework.tuple.Tuple | |
application/x-java-serialized-object | JavaSerializationMessageConverter | application/x-java-serialized-object | Any Java type that implements Serializable | This converter uses Java native serialization. Receivers of this data must have the same class on the classpath. |
application/x-java-object | KryoMessageConverter | application/x-java-object;type=<Class being serialized> | Any Java type that can be serialized using Kryo | Receivers of this data must have the same class on the classpath. |
application/avro | AvroMessageConverter | application/avro | A Generic or SpecificRecord from Avro types, a POJO if reflection is used | Avro needs an associated schema to write/read data. Refer to the section of the docs on Avro for details on how to use it properly. |
For input channels, Spring Cloud Stream uses @StreamListener and @ServiceActivator content handling to support the conversion.
It does so by checking either the channel content-type, set via the @Input(contentType="text/plain") annotation or via the spring.cloud.stream.bindings.<channel>.contentType property, or the presence of a contentType header.
The framework checks the contentType set for the Message, selects the appropriate MessageConverter, and applies the conversion, passing the type of the handler method argument as the target type.
If a converter does not support the target type, it returns null. If all configured converters return null, a MessageConversionException is thrown.
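The "try each converter, fail if all return null" behavior can be sketched in plain Java as follows. Everything here is a simplified model under stated assumptions: ConverterChainSketch, PayloadConverter, and the two sample converters are hypothetical, and IllegalStateException stands in for the MessageConversionException that the framework throws.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

/** Hypothetical model of converter selection; not framework code. */
final class ConverterChainSketch {

    /** A converter returns null when it does not support the content type or target type. */
    interface PayloadConverter {
        Object fromBytes(byte[] payload, String contentType, Class<?> targetType);
    }

    /** Converts text/plain payloads to String. */
    static final PayloadConverter TEXT = (payload, contentType, targetType) -> {
        if (contentType.startsWith("text/plain") && targetType == String.class) {
            return new String(payload, StandardCharsets.UTF_8);
        }
        return null;
    };

    /** Converts text/plain payloads to Integer. */
    static final PayloadConverter INTEGER_TEXT = (payload, contentType, targetType) -> {
        if (contentType.startsWith("text/plain") && targetType == Integer.class) {
            return Integer.valueOf(new String(payload, StandardCharsets.UTF_8).trim());
        }
        return null;
    };

    /** Asks each converter in turn; the first non-null result wins. */
    static Object convert(List<PayloadConverter> converters, byte[] payload,
                          String contentType, Class<?> targetType) {
        for (PayloadConverter converter : converters) {
            Object result = converter.fromBytes(payload, contentType, targetType);
            if (result != null) {
                return result;
            }
        }
        // Stand-in for the framework's MessageConversionException.
        throw new IllegalStateException(
                "No converter for " + contentType + " -> " + targetType.getName());
    }
}
```

With both sample converters registered, a text/plain payload of "42" converts to either a String or an Integer depending on the target type, while a request for a Long target fails because every converter returns null.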
Just as with output channels, if your method payload argument is of type Message<byte[]>, byte[], or Message<?>, conversion is skipped and you get the raw bytes from the wire, plus the corresponding headers.
![]() | Tip |
---|---|
Remember, the MessageHeader always takes precedence over the annotation or property configuration. |
content-type header | MessageConverter | Supported target type | Comments |
---|---|---|---|
application/json | CustomMappingJackson2MessageConverter | POJO or String | |
text/plain | ObjectStringMessageConverter | String | |
application/x-spring-tuple | TupleJsonMessageConverter | org.springframework.tuple.Tuple | |
application/x-java-serialized-object | JavaSerializationMessageConverter | Any Java type that implements Serializable | |
application/x-java-object | KryoMessageConverter | Any Java type that can be serialized using Kryo | |
application/avro | AvroMessageConverter | A Generic or SpecificRecord from Avro types, a POJO if reflection is used | Avro needs an associated schema to write/read data. Refer to the section of the docs on Avro for details on how to use it properly. |
Besides the conversions that it supports out of the box, Spring Cloud Stream also supports registering your own message conversion implementations.
This allows you to send and receive data in a variety of custom formats, including binary, and associate them with specific content types.
Spring Cloud Stream registers all beans of type org.springframework.messaging.converter.MessageConverter that are qualified with the @StreamConverter annotation as custom message converters, along with the out of the box message converters.
![]() | Note |
---|---|
The framework requires the |
If your message converter needs to work with a specific content-type and target class (for both input and output), then the message converter needs to extend org.springframework.messaging.converter.AbstractMessageConverter.
For conversion when using @StreamListener, a message converter that implements org.springframework.messaging.converter.MessageConverter would suffice.
Here is an example of creating a message converter bean (with the content-type application/bar) inside a Spring Cloud Stream application:
    @EnableBinding(Sink.class)
    @SpringBootApplication
    public static class SinkApplication {

        ...

        // Registers the converter alongside the out of the box message converters.
        @Bean
        @StreamConverter
        public MessageConverter customMessageConverter() {
            return new MyCustomMessageConverter();
        }
    }

    public class MyCustomMessageConverter extends AbstractMessageConverter {

        public MyCustomMessageConverter() {
            // Handle messages whose contentType is application/bar.
            super(new MimeType("application", "bar"));
        }

        @Override
        protected boolean supports(Class<?> clazz) {
            return (Bar.class == clazz);
        }

        @Override
        protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
            Object payload = message.getPayload();
            // Pass a Bar through unchanged; otherwise build one from the raw bytes.
            return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
        }
    }
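The converter above refers to a Bar type that this section does not define. As a minimal sketch, assuming only that Bar wraps the raw payload bytes and offers a byte[] constructor (the field and accessor names are hypothetical), it might look like this:

```java
import java.util.Arrays;

/** Hypothetical payload type assumed by the converter example; lives in the same package. */
final class Bar {

    private final byte[] data;

    Bar(byte[] data) {
        // Copy defensively so later changes to the source array do not leak in.
        this.data = Arrays.copyOf(data, data.length);
    }

    byte[] getData() {
        return Arrays.copyOf(data, data.length);
    }
}
```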
Spring Cloud Stream also provides support for Avro-based converters and schema evolution. See the specific section for details.
The @StreamListener annotation provides a convenient way of converting incoming messages without the need to specify the content type of an input channel.
During dispatching to methods annotated with @StreamListener, a conversion is applied automatically if the argument requires it.
For example, consider a message with the String content {"greeting":"Hello, world"} and a content-type header of application/json that is received on the input channel.
The following application receives it:
    public class GreetingMessage {

        String greeting;

        public String getGreeting() {
            return greeting;
        }

        public void setGreeting(String greeting) {
            this.greeting = greeting;
        }
    }

    @EnableBinding(Sink.class)
    @EnableAutoConfiguration
    public static class GreetingSink {

        // The JSON payload is converted to the declared argument type.
        @StreamListener(Sink.INPUT)
        public void receive(GreetingMessage greeting) {
            // handle GreetingMessage
        }
    }
The argument of the method will be populated automatically with the POJO containing the unmarshalled form of the JSON String.