6. Content Type and Transformation

To allow you to propagate information about the content type of produced messages, Spring Cloud Stream attaches, by default, a contentType header to outbound messages. For middleware that does not directly support headers, Spring Cloud Stream provides its own mechanism of automatically wrapping outbound messages in an envelope of its own. For middleware that does support headers, Spring Cloud Stream applications may receive messages with a given content type from non-Spring Cloud Stream applications.

Spring Cloud Stream can handle messages based on this information in two ways:

Spring Cloud Stream allows you to declaratively configure type conversion for inputs and outputs using the spring.cloud.stream.bindings.<channelName>.content-type property of a binding. Note that general type conversion may also be accomplished easily by using a transformer inside your application. Currently, Spring Cloud Stream natively supports the following type conversions commonly used in streams:

Where JSON represents either a byte array or String payload containing JSON. Currently, Objects may be converted from a JSON byte array or String. Converting to JSON always produces a String.

6.1 MIME types

content-type values are parsed as media types, e.g., application/json or text/plain;charset=UTF-8. MIME types are especially useful for indicating how to convert to String or byte[] content. Spring Cloud Stream also uses MIME type format to represent Java types, using the general type application/x-java-object with a type parameter. For example, application/x-java-object;type=java.util.Map or application/x-java-object;type=com.bar.Foo can be set as the content-type property of an input binding. In addition, Spring Cloud Stream provides custom MIME types, notably, application/x-spring-tuple to specify a Tuple.

6.2 MIME types and Java types

The type conversions Spring Cloud Stream provides out of the box are summarized in the following table: 'Source Payload' means the payload before conversion and 'Target Payload' means the 'payload' after conversion. The type conversion can occur either on the 'producer' side (output) or at the 'consumer' side (input).

Source PayloadTarget Payloadcontent-type header (source message)content-type header (after conversion)Comments

POJO

JSON String

ignored

application/json

 

Tuple

JSON String

ignored

application/json

JSON is tailored for Tuple

POJO

String (toString())

ignored

text/plain, java.lang.String

 

POJO

byte[] (java.io serialized)

ignored

application/x-java-serialized-object

 

JSON byte[] or String

POJO

application/json (or none)

application/x-java-object

 

byte[] or String

Serializable

application/x-java-serialized-object

application/x-java-object

 

JSON byte[] or String

Tuple

application/json (or none)

application/x-spring-tuple

 

byte[]

String

any

text/plain, java.lang.String

will apply any Charset specified in the content-type header

String

byte[]

any

application/octet-stream

will apply any Charset specified in the content-type header

[Note]Note

Conversion applies to payloads that require type conversion. For example, if an application produces an XML string with outputType=application/json, the payload will not be converted from XML to JSON. This is because the payload send to the outbound channel is already a String so no conversion will be applied at runtime. It is also important to note that when using the default serialization mechanism, the payload class must be shared between the sending and receiving application, and compatible with the binary content. This can create issues when application code changes independently in the two applications, as the binary format and code may become incompatible.

[Tip]Tip

While conversion is supported for both inbound and outbound channels, it is especially recommended to be used for the conversion of outbound messages. For the conversion of inbound messages, especially when the target is a POJO, the @StreamListener support will perform the conversion automatically.

6.3 Customizing message conversion

Besides the conversions that it supports out of the box, Spring Cloud Stream also supports registering your own message conversion implementations. This allows you to send and receive data in a variety of custom formats, including binary, and associate them with specific contentTypes. Spring Cloud Stream registers all the beans of type org.springframework.messaging.converter.MessageConverter as custom message converters along with the out of the box message converters.

If your message converter needs to work with a specific content-type and target class (for both input and output), then the message converter needs to extend org.springframework.messaging.converter.AbstractMessageConverter. For conversion when using @StreamListener, a message converter that implements org.springframework.messaging.converter.MessageConverter would suffice.

Here is an example of creating a message converter bean (with the content-type application/bar) inside a Spring Cloud Stream application:

@EnableBinding(Sink.class)
@SpringBootApplication
public static class SinkApplication {

  ...

  @Bean
  public MessageConverter customMessageConverter() {
    return new MyCustomMessageConverter();
  }
public class MyCustomMessageConverter extends AbstractMessageConverter {

	public MyCustomMessageConverter() {
		super(new MimeType("application", "bar"));
	}

	@Override
  protected boolean supports(Class<?> clazz) {
    return (Bar.class == clazz);
  }

	@Override
	protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
		Object payload = message.getPayload();
		return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
	}
}

6.4 Schema-based message converters

Spring Cloud Stream provides support for schema-based message converters through its spring-cloud-stream-schema module. Currently, the only serialization format supported out of the box is Apache Avro, with more formats to be added in future versions.

6.4.1 Apache Avro Message Converters

The spring-cloud-stream-schema module contains two types of message converters that can be used for Apache Avro serialization:

  • converters using the class information of the serialized/deserialized objects, or a schema with a location known at startup;
  • converters using a schema registry - they locate the schemas at runtime, as well as dynamically registering new schemas as domain objects evolve.

Converters with schema support

The AvroSchemaMessageConverter supports serializing and deserializing messages either using a predefined schema or by using the schema information available in the class (either reflectively, or contained in the SpecificRecord). If the target type of the conversion is a GenericRecord, then a schema must be set.

For using it, you can simply add it to the application context, optionally specifying one ore more MimeTypes to associate it with. The default MimeType is application/avro.

Here is an example of configuring it in a sink application registering the Apache Avro MessageConverter, without a predefined schema:

@EnableBinding(Sink.class)
@SpringBootApplication
public static class SinkApplication {

  ...

  @Bean
  public MessageConverter userMessageConverter() {
      return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
  }
}

Conversely, here is an application that registers a converter with a predefined schema, to be found on the classpath:

@EnableBinding(Sink.class)
@SpringBootApplication
public static class SinkApplication {

  ...

  @Bean
  public MessageConverter userMessageConverter() {
      AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
      converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
      return converter;
  }
}

In order to understand the schema registry client converter, we will describe the schema registry support first.

6.5 Schema Registry Support

Most serialization models, especially the ones that aim for portability across different platforms and languages, rely on a schema that describes how the data is serialized in the binary payload. In order to serialize the data and then to interpret it, both the sending and receiving sides must have access to a schema that describes the binary format. In certain cases, the schema can be inferred from the payload type on serialization, or from the target type on deserialization, but in a lot of cases applications benefit from having access to an explicit schema that describes the binary data format. A schema registry allows you to store schema information in a textual format (typically JSON) and makes that information accessible to various applications that need it to receive and send data in binary format. A schema is referenceable as a tuple consisting of:

  • a subject that is the logical name of the schema;
  • the schema version;
  • the schema format which describes the binary format of the data.

6.5.1 Schema Registry Server

Spring Cloud Stream provides a schema registry server implementation. In order to use it, you can simply add the spring-cloud-stream-schema-server artifact to your project and use the @EnableSchemaRegistryServer annotation, adding the schema registry server REST controller to your application. This annotation is intended to be used with Spring Boot web applications, and the listening port of the server is controlled by the server.port setting. The spring.cloud.stream.schema.server.path setting can be used to control the root path of the schema server (especially when it is embedded in other applications). The spring.cloud.stream.schema.server.allowSchemaDeletion boolean setting enables the deletion of schema. By default this is disabled.

The schema registry server uses a relational database to store the schemas. By default, it uses an embedded database. You can customize the schema storage using the Spring Boot SQL database and JDBC configuration options.

A Spring Boot application enabling the schema registry looks as follows:

@SpringBootApplication
@EnableSchemaRegistryServer
public class SchemaRegistryServerApplication {
	public static void main(String[] args) {
		SpringApplication.run(SchemaRegistryServerApplication.class, args);
	}
}

Schema Registry Server API

The Schema Registry Server API consists of the following operations:

POST /

Register a new schema.

Accepts JSON payload with the following fields:

  • subject the schema subject;
  • format the schema format;
  • definition the schema definition.

Response is a schema object in JSON format, with the following fields:

  • id the schema id;
  • subject the schema subject;
  • format the schema format;
  • version the schema version;
  • definition the schema definition.
GET /{subject}/{format}/{version}

Retrieve an existing schema by its subject, format and version.

Response is a schema object in JSON format, with the following fields:

  • id the schema id;
  • subject the schema subject;
  • format the schema format;
  • version the schema version;
  • definition the schema definition.
GET /{subject}/{format}

Retrieve a list of existing schema by its subject and format.

Response is a list of schemas with each schema object in JSON format, with the following fields:

  • id the schema id;
  • subject the schema subject;
  • format the schema format;
  • version the schema version;
  • definition the schema definition.
GET /schemas/{id}

Retrieve an existing schema by its id.

Response is a schema object in JSON format, with the following fields:

  • id the schema id;
  • subject the schema subject;
  • format the schema format;
  • version the schema version;
  • definition the schema definition.
DELETE /{subject}/{format}/{version}

Delete an existing schema by its subject, format and version.

DELETE /schemas/{id}

Delete an existing schema by its id.

DELETE /{subject}

Delete existing schemas by their subject.

[Note]Note

This note applies to users of Spring Cloud Stream 1.1.0.RELEASE only. Spring Cloud Stream 1.1.0.RELEASE used the table name schema for storing Schema objects, which is a keyword in a number of database implementations. To avoid any conflicts in the future, starting with 1.1.1.RELEASE we have opted for the name SCHEMA_REPOSITORY for the storage table. Any Spring Cloud Stream 1.1.0.RELEASE users that are upgrading are advised to migrate their existing schemas to the new table before upgrading.

6.5.2 Schema Registry Client

The client-side abstraction for interacting with schema registry servers is the SchemaRegistryClient interface, with the following structure:

public interface SchemaRegistryClient {

	SchemaRegistrationResponse register(String subject, String format, String schema);

	String fetch(SchemaReference schemaReference);

	String fetch(Integer id);

}

Spring Cloud Stream provides out of the box implementations for interacting with its own schema server, as well as for interacting with the Confluent Schema Registry.

A client for the Spring Cloud Stream schema registry can be configured using the @EnableSchemaRegistryClient as follows:

  @EnableBinding(Sink.class)
  @SpringBootApplication
  @EnableSchemaRegistryClient
  public static class AvroSinkApplication {
    ...
  }
[Note]Note

The default converter is optimized to cache not only the schemas from the remote server but also the parse() and toString() methods that are quite expensive. Because of this, it uses a DefaultSchemaRegistryClient that does not caches responses. If you intend to use the client directly on your code, you can request a bean that also caches responses to be created. To do that, just add the property spring.cloud.stream.schemaRegistryClient.cached=true to your application properties.

6.5.3 Avro Schema Registry Client Message Converters

For Spring Boot applications that have a SchemaRegistryClient bean registered with the application context, Spring Cloud Stream will auto-configure an Apache Avro message converter that uses the schema registry client for schema management. This eases schema evolution, as applications that receive messages can get easy access to a writer schema that can be reconciled with their own reader schema.

For outbound messages, the MessageConverter will be activated if the content type of the channel is set to application/*+avro, e.g.:

spring.cloud.stream.bindings.output.contentType=application/*+avro

During the outbound conversion, the message converter will try to infer the schemas of the outbound messages based on their type and register them to a subject based on the payload type using the SchemaRegistryClient. If an identical schema is already found, then a reference to it will be retrieved. If not, the schema will be registered and a new version number will be provided. The message will be sent with a contentType header using the scheme application/[prefix].[subject].v[version]+avro, where prefix is configurable and subject is deduced from the payload type.

For example, a message of the type User may be sent as a binary payload with a content type of application/vnd.user.v2+avro, where user is the subject and 2 is the version number.

When receiving messages, the converter will infer the schema reference from the header of the incoming message and will try to retrieve it. The schema will be used as the writer schema in the deserialization process.

6.6 @StreamListener and Message Conversion

The @StreamListener annotation provides a convenient way for converting incoming messages without the need to specify the content type of an input channel. During the dispatching process to methods annotated with @StreamListener, a conversion will be applied automatically if the argument requires it.

For example, let’s consider a message with the String content {"greeting":"Hello, world"} and a content-type header of application/json is received on the input channel. Let us consider the following application that receives it:

public class GreetingMessage {

  String greeting;

  public String getGreeting() {
    return greeting;
  }

  public void setGreeting(String greeting) {
    this.greeting = greeting;
  }
}

@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class GreetingSink {

		@StreamListener(Sink.INPUT)
		public void receive(Greeting greeting) {
			// handle Greeting
		}
	}

The argument of the method will be populated automatically with the POJO containing the unmarshalled form of the JSON String.