To allow you to propagate information about the content type of produced messages, Spring Cloud Stream attaches, by default, a contentType
header to outbound messages.
For middleware that does not directly support headers, Spring Cloud Stream provides its own mechanism of automatically wrapping outbound messages in an envelope of its own.
For middleware that does support headers, Spring Cloud Stream applications may receive messages with a given content type from non-Spring Cloud Stream applications.
Spring Cloud Stream can handle messages based on this information in two ways: through the contentType settings on inbound and outbound channels, and through the argument mapping performed for methods annotated with @StreamListener.
Spring Cloud Stream allows you to declaratively configure type conversion for inputs and outputs using the spring.cloud.stream.bindings.<channelName>.content-type
property of a binding.
Note that general type conversion may also be accomplished easily by using a transformer inside your application.
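As a minimal sketch, the property described above might be set in application.properties as follows. The channel name output is an assumption made for illustration; substitute the name of your own outbound channel.

```properties
# Assumes an outbound channel named "output"; payloads sent to it are converted to JSON
spring.cloud.stream.bindings.output.content-type=application/json
```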
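Currently, Spring Cloud Stream natively supports the following type conversions commonly used in streams: JSON to/from POJO, JSON to/from Tuple, Object to/from byte[] (using Java serialization, which requires the object to be Serializable), String to/from byte[], and Object to plain text (by invoking the object's toString() method).
Here, JSON represents either a byte array or a String payload containing JSON. Currently, objects may be converted from a JSON byte array or String. Converting to JSON always produces a String.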
If no content-type
property is set on an outbound channel, Spring Cloud Stream will serialize the payload using a serializer based on the Kryo serialization framework.
Deserializing messages at the destination requires the payload class to be present on the receiver’s classpath.
content-type values are parsed as media types, e.g., application/json or text/plain;charset=UTF-8.
MIME types are especially useful for indicating how to convert to String or byte[] content.
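To illustrate how a charset parameter in a media type can drive String and byte[] conversion, here is a minimal plain-JDK sketch. The class ContentTypeCharsetSketch and its methods are hypothetical stand-ins written for this example, not the framework's own parsing code, and the UTF-8 fallback is an assumption of the sketch.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of Spring Cloud Stream.
final class ContentTypeCharsetSketch {

    // Extracts the charset parameter from a content type such as
    // "text/plain;charset=UTF-8". Falls back to UTF-8 when no charset
    // parameter is present (an assumption of this sketch).
    static Charset charsetOf(String contentType) {
        String[] parts = contentType.split(";");
        for (int i = 1; i < parts.length; i++) {
            String[] keyValue = parts[i].trim().split("=", 2);
            if (keyValue.length == 2 && keyValue[0].trim().equalsIgnoreCase("charset")) {
                return Charset.forName(keyValue[1].trim());
            }
        }
        return StandardCharsets.UTF_8;
    }

    // byte[] to String, decoding with the charset named in the content type.
    static String toText(byte[] payload, String contentType) {
        return new String(payload, charsetOf(contentType));
    }

    // String to byte[], encoding with the charset named in the content type.
    static byte[] toBytes(String payload, String contentType) {
        return payload.getBytes(charsetOf(contentType));
    }

    public static void main(String[] args) {
        String contentType = "text/plain;charset=ISO-8859-1";
        byte[] bytes = toBytes("caf\u00e9", contentType);
        System.out.println(bytes.length); // 4: one byte per character in ISO-8859-1
        System.out.println(toText(bytes, contentType).equals("caf\u00e9")); // true
    }
}
```

The same four characters would occupy five bytes in UTF-8, which is why the charset carried in the content type matters on both sides of the conversion.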
Spring Cloud Stream also uses MIME type format to represent Java types, using the general type application/x-java-object
with a type
parameter.
For example, application/x-java-object;type=java.util.Map
or application/x-java-object;type=com.bar.Foo
can be set as the content-type
property of an input binding.
In addition, Spring Cloud Stream provides custom MIME types, notably, application/x-spring-tuple
to specify a Tuple.
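For instance, an input binding that should receive a specific Java type could be configured along these lines. This is a sketch that assumes an inbound channel named input and reuses the com.bar.Foo class from the example above; both names are placeholders.

```properties
# Assumes an inbound channel named "input" and that com.bar.Foo is on the consumer's classpath
spring.cloud.stream.bindings.input.content-type=application/x-java-object;type=com.bar.Foo
```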
The type conversions that Spring Cloud Stream provides out of the box are summarized in the following table. 'Source Payload' is the payload before conversion, and 'Target Payload' is the payload after conversion. The conversion can occur on either the producer side (output) or the consumer side (input).
Source Payload | Target Payload | content-type header (source message) | content-type header (after conversion) | Comments |
---|---|---|---|---|
POJO | JSON String | ignored | application/json | |
Tuple | JSON String | ignored | application/json | JSON is tailored for Tuple |
POJO | String (toString()) | ignored | text/plain, java.lang.String | |
POJO | byte[] (java.io serialized) | ignored | application/x-java-serialized-object | |
JSON byte[] or String | POJO | application/json (or none) | application/x-java-object | |
byte[] or String | Serializable | application/x-java-serialized-object | application/x-java-object | |
JSON byte[] or String | Tuple | application/json (or none) | application/x-spring-tuple | |
byte[] | String | any | text/plain, java.lang.String | will apply any Charset specified in the content-type header |
String | byte[] | any | application/octet-stream | will apply any Charset specified in the content-type header |
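The rows that mention application/x-java-serialized-object rely on standard Java serialization. The following plain-JDK sketch shows what such a round trip looks like; the JavaSerializationSketch class and its nested Greeting payload are hypothetical names used only for illustration, and the framework's own converter may differ in detail.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical illustration of a POJO -> byte[] -> POJO round trip
// using java.io serialization; not part of Spring Cloud Stream.
final class JavaSerializationSketch {

    // Hypothetical payload type. It must implement Serializable and be
    // available on the classpath of both the producer and the consumer.
    static final class Greeting implements Serializable {
        private static final long serialVersionUID = 1L;
        final String text;

        Greeting(String text) {
            this.text = text;
        }
    }

    // Producer side: POJO to byte[] (java.io serialized).
    static byte[] serialize(Serializable payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(payload);
        }
        return bytes.toByteArray();
    }

    // Consumer side: byte[] back to an object. Throws ClassNotFoundException
    // when the payload class is missing from the classpath.
    static Object deserialize(byte[] payload) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] wire = serialize(new Greeting("Hello, world"));
        Greeting copy = (Greeting) deserialize(wire);
        System.out.println(copy.text); // prints "Hello, world"
    }
}
```

Because the byte stream encodes the class name and serialVersionUID, both applications need a compatible version of the payload class.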
Note: Conversion applies only to payloads that require type conversion. For example, if an application produces an XML string with outputType=application/json, the payload will not be converted from XML to JSON. This is because the payload sent to the outbound channel is already a String, so no conversion is applied at runtime. It is also important to note that when using the default serialization mechanism, the payload class must be shared between the sending and receiving applications and must be compatible with the binary content. This can create issues when the code changes independently in the two applications, because the binary format and the code may become incompatible.
Tip: While conversion is supported for both inbound and outbound channels, it is especially recommended for outbound messages.
For the conversion of inbound messages, especially when the target is a POJO, the @StreamListener support performs the conversion automatically.
Besides the conversions that it supports out of the box, Spring Cloud Stream also supports registering your own message conversion implementations.
This allows you to send and receive data in a variety of custom formats, including binary, and associate them with specific contentTypes.
Spring Cloud Stream registers all the beans of type org.springframework.messaging.converter.MessageConverter
as custom message converters along with the out of the box message converters.
If your message converter needs to work with a specific content-type and target class (for both input and output), then it must extend org.springframework.messaging.converter.AbstractMessageConverter.
For conversion when using @StreamListener, a message converter that implements org.springframework.messaging.converter.MessageConverter is sufficient.
Here is an example of creating a message converter bean (with the content-type application/bar) inside a Spring Cloud Stream application:
    @EnableBinding(Sink.class)
    @SpringBootApplication
    public static class SinkApplication {

        ...

        // Picked up as a custom converter because it is a bean of type MessageConverter
        @Bean
        public MessageConverter customMessageConverter() {
            return new MyCustomMessageConverter();
        }
    }

    public class MyCustomMessageConverter extends AbstractMessageConverter {

        public MyCustomMessageConverter() {
            // This converter handles the content type application/bar
            super(new MimeType("application", "bar"));
        }

        @Override
        protected boolean supports(Class<?> clazz) {
            return (Bar.class == clazz);
        }

        @Override
        protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
            Object payload = message.getPayload();
            // Pass a Bar through unchanged; otherwise build one from the raw bytes
            return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
        }
    }
Spring Cloud Stream also provides support for Avro-based converters and schema evolution. See the specific section for details.
The @StreamListener
annotation provides a convenient way for converting incoming messages without the need to specify the content type of an input channel.
During the dispatching process to methods annotated with @StreamListener, a conversion is applied automatically if the argument requires it.
For example, consider a message with the String content {"greeting":"Hello, world"} and a content-type header of application/json that is received on the input channel.
The following application receives it:
    public class GreetingMessage {

        String greeting;

        public String getGreeting() {
            return greeting;
        }

        public void setGreeting(String greeting) {
            this.greeting = greeting;
        }
    }

    @EnableBinding(Sink.class)
    @EnableAutoConfiguration
    public static class GreetingSink {

        @StreamListener(Sink.INPUT)
        public void receive(GreetingMessage greeting) {
            // handle GreetingMessage
        }
    }
The argument of the method will be populated automatically with the POJO containing the unmarshalled form of the JSON String.