
Record serialization and deserialization

Kafka Streams binder allows you to serialize and deserialize records in two ways. One is the native serialization and deserialization facilities provided by Kafka and the other is the message conversion capabilities of the Spring Cloud Stream framework. Let's look at some details.

Inbound deserialization

Keys are always deserialized using native Serdes.

For values, by default, deserialization on the inbound is natively performed by Kafka. Please note that this is a major change in the default behavior from previous versions of Kafka Streams binder, where the deserialization was done by the framework.

Kafka Streams binder will try to infer matching Serde types by looking at the type signature of the java.util.function.Function or Consumer bean. Here is the order in which it matches the Serdes.

  • If the application provides a bean of type Serde whose return type is parameterized with the actual type of the incoming key or value, then it will use that Serde for inbound deserialization. For example, if you have the following in the application, the binder detects that the incoming value type for the KStream matches a type that is parameterized on a Serde bean, and uses that Serde for inbound deserialization.

@Bean
public Serde<Foo> customSerde() {
 ...
}

@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
    return input -> input; // any processing logic on the KStream
}
  • Next, it looks at the types and sees if they are one of the types exposed by Kafka Streams. If so, it uses them. Here are the Serde types that the binder will try to match from Kafka Streams.

    Integer, Long, Short, Double, Float, byte[], UUID and String.
  • If none of the Serdes provided by Kafka Streams match the types, then it will use the JsonSerde provided by Spring Kafka. In this case, the binder assumes that the types are JSON friendly. This is useful if you have multiple value objects as inputs, since the binder will internally infer them to the correct Java types. Before falling back to the JsonSerde though, the binder checks the default Serdes set in the Kafka Streams configuration to see if there is a Serde that it can match with the incoming KStream's types. The sketch below illustrates built-in type matching and the JsonSerde fallback.
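
For illustration, consider the following function (the Order type and its getAmount accessor are hypothetical). The binder matches the built-in String Serde for the key and falls back to the JsonSerde for the Order value, since neither a Serde bean nor a built-in type matches it:

@Bean
public Function<KStream<String, Order>, KStream<String, Long>> totals() {
    // key: built-in String Serde; inbound value: JsonSerde fallback for Order;
    // outbound value: built-in Long Serde
    return input -> input.mapValues(order -> order.getAmount());
}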

If none of the above strategies work, then the application must provide the Serdes through configuration. This can be done in two ways - binding or default.

First, the binder will look to see whether a Serde is provided at the binding level. For example, if you have the following processor,

@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}

then, you can provide a binding level Serde using the following:

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

If you provide Serdes per input binding as above, then they take higher precedence and the binder will stay away from any Serde inference.
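
For illustration, a binding level Serde such as the CustomKeySerde referenced above could be a thin wrapper around existing serializers. Here is a minimal sketch, assuming a CustomKey POJO and Spring Kafka's JSON serializer/deserializer (the class itself is not provided by the binder):

public class CustomKeySerde extends Serdes.WrapperSerde<CustomKey> {

    public CustomKeySerde() {
        // delegate serialization of CustomKey to Spring Kafka's JSON support
        super(new JsonSerializer<>(), new JsonDeserializer<>(CustomKey.class));
    }
}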

If you want the default key/value Serdes to be used for inbound deserialization, you can set them at the binder level.

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
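
For example, assuming String keys and JSON friendly values (the classes shown here are only one possible choice):

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde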

If you don’t want the native decoding provided by Kafka, you can rely on the message conversion features that Spring Cloud Stream provides. Since native decoding is the default, in order to let Spring Cloud Stream deserialize the inbound value object, you need to explicitly disable native decoding.

For example, if you have the same BiFunction processor as above, you need to set spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding to false. You need to disable native decoding for all the inputs individually; otherwise, native decoding will still be applied for those you do not disable.
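
For the BiFunction processor above, which has two input bindings, disabling native decoding on both inputs looks like this:

spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false
spring.cloud.stream.bindings.process-in-1.consumer.nativeDecoding: false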

By default, Spring Cloud Stream will use application/json as the content type and an appropriate JSON message converter. You can use custom message converters by using the following property and an appropriate MessageConverter bean.

spring.cloud.stream.bindings.process-in-0.contentType
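
If you do use a custom content type, a matching converter can be registered simply as a bean; Spring Cloud Stream appends user-defined MessageConverter beans to its default converter stack. The following is only a rough sketch: the application/foo content type, the Foo payload class, and its fromBytes method are hypothetical.

@Bean
public MessageConverter fooConverter() {
    return new AbstractMessageConverter(new MimeType("application", "foo")) {

        @Override
        protected boolean supports(Class<?> clazz) {
            // only handle the hypothetical Foo payload type
            return Foo.class.equals(clazz);
        }

        @Override
        protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
            // hypothetical parsing of the raw byte[] payload into a Foo
            return Foo.fromBytes((byte[]) message.getPayload());
        }
    };
}

with the binding pointing at that content type:

spring.cloud.stream.bindings.process-in-0.contentType: application/foo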

Outbound serialization

Outbound serialization pretty much follows the same rules as above for inbound deserialization. As with the inbound deserialization, one major change from the previous versions of Spring Cloud Stream is that the serialization on the outbound is handled by Kafka natively. Prior to version 3.0 of the binder, this was done by the framework itself.

Keys on the outbound are always serialized by Kafka using a matching Serde that is inferred by the binder. If it can’t infer the type of the key, then that needs to be specified using configuration.

Value Serdes are inferred using the same rules used for inbound deserialization. First it checks to see if the outbound type matches a Serde bean provided in the application. If not, it checks to see if it matches a Serde exposed by Kafka, such as Integer, Long, Short, Double, Float, byte[], UUID, and String. If that doesn't work, then it falls back to the JsonSerde provided by the Spring Kafka project, but first looks at the default Serde configuration to see if there is a match. Keep in mind that all of this happens transparently to the application. If none of these work, then the user has to provide the Serde to use through configuration.

Let's say you are using the same BiFunction processor as above. Then you can configure outbound key/value Serdes as follows.

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

If Serde inference fails and no binding level Serdes are provided, then the binder falls back to the JsonSerde, but first looks at the default Serdes for a match.

Default Serdes are configured in the same way as described above under inbound deserialization.

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde

If your application uses the branching feature and has multiple output bindings, then these have to be configured per binding. Once again, if the binder is capable of inferring the Serde types, you don’t need to do this configuration.
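
For instance, a branching processor with two output bindings might configure the value Serde per binding along the following lines (the binding names and the Avro Serde are only illustrative):

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.process-out-1.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde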

If you don't want the native encoding provided by Kafka, but want to use the framework provided message conversion, then you need to explicitly disable native encoding, since native encoding is the default. For example, if you have the same BiFunction processor as above, you need to set spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding to false. You need to disable native encoding for all the outputs individually in the case of branching. Otherwise, native encoding will still be applied for those you do not disable.
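
In the case of branching with two output bindings, for example, that means disabling native encoding on each of them:

spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false
spring.cloud.stream.bindings.process-out-1.producer.nativeEncoding: false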

When conversion is done by Spring Cloud Stream, by default, it will use application/json as the content type and an appropriate JSON message converter. You can use custom message converters by using the following property and a corresponding MessageConverter bean.

spring.cloud.stream.bindings.process-out-0.contentType

When native encoding/decoding is disabled, the binder does not do any Serde inference as it does in the native case. Applications need to explicitly provide all the configuration options. For that reason, it is generally advised to stay with the default options for de/serialization and stick with the native de/serialization provided by Kafka Streams when you write Spring Cloud Stream Kafka Streams applications. The one scenario in which you must use the message conversion capabilities provided by the framework is when your upstream producer is using a specific serialization strategy. In that case, you want to use a matching deserialization strategy, as the native mechanisms may fail. When relying on the default Serde mechanism, the applications must ensure that the binder has a way to correctly map the inbound and outbound with a proper Serde, as otherwise things might fail.

It is worth mentioning that the data de/serialization approaches outlined above are only applicable at the edges of your processors, that is, inbound and outbound. Your business logic might still need to call Kafka Streams APIs that explicitly need Serde objects. Those are still the responsibility of the application and must be handled accordingly by the developer.
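
For instance, a groupBy/count in the middle of a topology still needs Serdes supplied by the application. Here is a minimal sketch, assuming the hypothetical Order value type used earlier:

@Bean
public Function<KStream<String, Order>, KStream<String, Long>> orderCounts() {
    return input -> input
            // the binder handles the Serdes at the inbound and outbound edges, but this
            // repartition and state store require the application to pass Serdes explicitly
            .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Order.class)))
            .count(Materialized.as("order-counts"))
            .toStream();
}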