Serialization, Deserialization, and Message Conversion

Overview

Apache Kafka provides a high-level API for serializing and deserializing record values as well as their keys. It is exposed through the org.apache.kafka.common.serialization.Serializer<T> and org.apache.kafka.common.serialization.Deserializer<T> abstractions, along with some built-in implementations. You can specify the serializer and deserializer classes by using Producer or Consumer configuration properties. The following example shows how to do so:

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

For more complex or particular cases, the KafkaConsumer (and, likewise, the KafkaProducer) provides overloaded constructors that accept Deserializer (respectively, Serializer) instances for keys and values.

When you use this API, the DefaultKafkaProducerFactory and DefaultKafkaConsumerFactory also provide properties (through constructors or setter methods) to inject custom Serializer and Deserializer instances into the target Producer or Consumer. You can also pass in Supplier<Serializer> or Supplier<Deserializer> instances through constructors; these Suppliers are called on creation of each Producer or Consumer.

String serialization

Since version 2.5, Spring for Apache Kafka provides ToStringSerializer and ParseStringDeserializer classes that use the String representation of entities. The serializer relies on the toString method of the entity; the deserializer relies on a Function<String> or BiFunction<String, Headers> to parse the String and populate the properties of an instance. Usually, this function invokes a static method on the class, such as parse:

ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);

By default, the ToStringSerializer is configured to convey type information about the serialized entity in the record Headers. You can disable this by setting the addTypeInfo property to false. This information can be used by ParseStringDeserializer on the receiving side.

  • ToStringSerializer.ADD_TYPE_INFO_HEADERS (default true): You can set it to false to disable this feature on the ToStringSerializer (sets the addTypeInfo property).

ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
    byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
    String entityType = new String(header);

    if (entityType.contains("Thing")) {
        return Thing.parse(str);
    }
    else {
        // ...parsing logic
    }
});

You can configure the Charset used to convert String to/from byte[] with the default being UTF-8.

You can configure the deserializer with the name of the parser method using ConsumerConfig properties:

  • ParseStringDeserializer.KEY_PARSER

  • ParseStringDeserializer.VALUE_PARSER

The properties must contain the fully qualified name of the class followed by the method name, separated by a period (.). The method must be static and have a signature of either (String, Headers) or (String).
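
The property format described above can be illustrated by splitting the value at its last period and resolving the method reflectively. The following is a minimal sketch of that idea using only the JDK; it is not the library's implementation. The ParserMethodSketch class and its resolve helper are hypothetical names, and the sketch handles only the (String) signature.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.function.Function;

// Hypothetical helper, for illustration only; not part of Spring for Apache Kafka.
final class ParserMethodSketch {

    // Splits "fully.qualified.ClassName.methodName" at the last period and finds the method.
    private static Method findMethod(String property) {
        int lastDot = property.lastIndexOf('.');
        if (lastDot <= 0 || lastDot == property.length() - 1) {
            throw new IllegalArgumentException("Expected <class name>.<method name>: " + property);
        }
        try {
            Class<?> type = Class.forName(property.substring(0, lastDot));
            return type.getMethod(property.substring(lastDot + 1), String.class);
        }
        catch (ReflectiveOperationException ex) {
            throw new IllegalArgumentException("Cannot resolve parser method: " + property, ex);
        }
    }

    // Returns a function that invokes the static parser method named by the property value.
    static Function<String, Object> resolve(String property) {
        Method method = findMethod(property);
        if (!Modifier.isStatic(method.getModifiers())) {
            throw new IllegalArgumentException("Parser method must be static: " + property);
        }
        return text -> {
            try {
                return method.invoke(null, text);
            }
            catch (ReflectiveOperationException ex) {
                throw new IllegalStateException("Parser method failed for: " + text, ex);
            }
        };
    }

    public static void main(String[] args) {
        // java.lang.Integer.valueOf(String) stands in for a domain parser such as Thing.parse.
        Function<String, Object> parser = resolve("java.lang.Integer.valueOf");
        System.out.println(parser.apply("42"));
    }
}
```

In a real application you would only set the property; the sketch is meant to show why the method has to be static and publicly reachable by name.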

A ToFromStringSerde is also provided, for use with Kafka Streams.

JSON

Spring for Apache Kafka also provides JsonSerializer and JsonDeserializer implementations that are based on the Jackson JSON object mapper. The JsonSerializer allows writing any Java object as a JSON byte[]. The JsonDeserializer requires an additional Class<?> targetType argument to allow the deserialization of a consumed byte[] to the proper target object. The following example shows how to create a JsonDeserializer:

JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class);

You can customize both JsonSerializer and JsonDeserializer with an ObjectMapper. You can also extend them to implement some particular configuration logic in the configure(Map<String, ?> configs, boolean isKey) method.

Starting with version 2.3, all the JSON-aware components are configured by default with a JacksonUtils.enhancedObjectMapper() instance, which comes with the MapperFeature.DEFAULT_VIEW_INCLUSION and DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES features disabled. Such an instance is also supplied with well-known modules for custom data types, such as Java time and Kotlin support. See the JacksonUtils.enhancedObjectMapper() JavaDocs for more information. This method also registers an org.springframework.kafka.support.JacksonMimeTypeModule, which serializes org.springframework.util.MimeType objects into plain strings for inter-platform compatibility over the network. A JacksonMimeTypeModule can be registered as a bean in the application context, and it will be auto-configured into the Spring Boot ObjectMapper instance.

Also starting with version 2.3, the JsonDeserializer provides TypeReference-based constructors for better handling of target generic container types.

Starting with version 2.1, you can convey type information in record Headers, allowing the handling of multiple types. In addition, you can configure the serializer and deserializer by using the following Kafka properties. They have no effect if you have provided Serializer and Deserializer instances for KafkaProducer and KafkaConsumer, respectively.

Configuration Properties

  • JsonSerializer.ADD_TYPE_INFO_HEADERS (default true): You can set it to false to disable this feature on the JsonSerializer (sets the addTypeInfo property).

  • JsonSerializer.TYPE_MAPPINGS (default empty): See Mapping Types.

  • JsonDeserializer.USE_TYPE_INFO_HEADERS (default true): You can set it to false to ignore headers set by the serializer.

  • JsonDeserializer.REMOVE_TYPE_INFO_HEADERS (default true): You can set it to false to retain headers set by the serializer.

  • JsonDeserializer.KEY_DEFAULT_TYPE: Fallback type for deserialization of keys if no header information is present.

  • JsonDeserializer.VALUE_DEFAULT_TYPE: Fallback type for deserialization of values if no header information is present.

  • JsonDeserializer.TRUSTED_PACKAGES (default java.util, java.lang): Comma-delimited list of package patterns allowed for deserialization. * means deserializing all.

  • JsonDeserializer.TYPE_MAPPINGS (default empty): See Mapping Types.

  • JsonDeserializer.KEY_TYPE_METHOD (default empty): See Using Methods to Determine Types.

  • JsonDeserializer.VALUE_TYPE_METHOD (default empty): See Using Methods to Determine Types.

Starting with version 2.2, the type information headers (if added by the serializer) are removed by the deserializer. You can revert to the previous behavior by setting the removeTypeHeaders property to false, either directly on the deserializer or with the configuration property described earlier.

Starting with version 2.8, if you construct the serializer or deserializer programmatically as shown in Programmatic Construction, the above properties will be applied by the factories, as long as you have not set any properties explicitly (using set*() methods or using the fluent API). Previously, when creating programmatically, the configuration properties were never applied; this is still the case if you explicitly set properties on the object directly.

Mapping Types

Starting with version 2.2, when using JSON, you can now provide type mappings by using the properties in the preceding list. Previously, you had to customize the type mapper within the serializer and deserializer. Mappings consist of a comma-delimited list of token:className pairs. On outbound, the payload’s class name is mapped to the corresponding token. On inbound, the token in the type header is mapped to the corresponding class name.

The following example creates a set of mappings:

senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.Hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.Hat");

The corresponding objects must be compatible.
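
To make the mapping format concrete, the following sketch parses a comma-delimited list of token:className pairs into a map, once for each side. It uses only the JDK and is not the library's parsing code; TypeMappingsSketch and parse are hypothetical names chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only; the library performs its own parsing.
final class TypeMappingsSketch {

    // Parses "token:className, token:className" into an ordered token -> class name map.
    static Map<String, String> parse(String mappings) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : mappings.split(",")) {
            String[] parts = entry.trim().split(":", 2);
            if (parts.length != 2 || parts[0].trim().isEmpty() || parts[1].trim().isEmpty()) {
                throw new IllegalArgumentException("Expected token:className but got: " + entry);
            }
            result.put(parts[0].trim(), parts[1].trim());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> producerSide = parse("cat:com.mycat.Cat, hat:com.myhat.Hat");
        Map<String, String> consumerSide = parse("cat:com.yourcat.Cat, hat:com.yourhat.Hat");
        // Only the token travels in the type header; each side resolves it to its own class.
        System.out.println(producerSide.get("cat") + " <-> " + consumerSide.get("cat"));
    }
}
```

Because only the token is shared, the producer and consumer can use different classes for the same token, provided the JSON they read and write is compatible.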

If you use Spring Boot, you can provide these properties in the application.properties (or yaml) file. The following example shows how to do so:

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat

You can perform only simple configuration with properties. For more advanced configuration (such as using a custom ObjectMapper in the serializer and deserializer), you should use the producer and consumer factory constructors that accept a pre-built serializer and deserializer. The following Spring Boot example overrides the default factories:

@Bean
public ConsumerFactory<String, Thing> kafkaConsumerFactory(JsonDeserializer customValueDeserializer) {
    Map<String, Object> properties = new HashMap<>();
    // properties.put(..., ...)
    // ...
    return new DefaultKafkaConsumerFactory<>(properties,
        new StringDeserializer(), customValueDeserializer);
}

@Bean
public ProducerFactory<String, Thing> kafkaProducerFactory(JsonSerializer customValueSerializer) {
    Map<String, Object> properties = new HashMap<>();
    // properties.put(..., ...)
    // ...
    return new DefaultKafkaProducerFactory<>(properties,
        new StringSerializer(), customValueSerializer);
}

Setters are also provided, as an alternative to using these constructors.

Starting with version 2.2, you can explicitly configure the deserializer to use the supplied target type and ignore type information in headers by using one of the overloaded constructors that have a boolean useHeadersIfPresent argument (which is true by default). The following example shows how to do so:

DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
        new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));

Using Methods to Determine Types

Starting with version 2.5, you can configure the deserializer, via properties, to invoke a method to determine the target type. If present, this overrides any of the other techniques discussed above. This can be useful if the data is published by an application that does not use the Spring serializer and you need to deserialize to different types depending on the data or on other headers. Set these properties to the method name: a fully qualified class name followed by the method name, separated by a period (.). The method must be declared as public static, have one of three signatures, (String topic, byte[] data, Headers headers), (byte[] data, Headers headers) or (byte[] data), and return a Jackson JavaType.

  • JsonDeserializer.KEY_TYPE_METHOD : spring.json.key.type.method

  • JsonDeserializer.VALUE_TYPE_METHOD : spring.json.value.type.method

You can use arbitrary headers or inspect the data to determine the type.

// static, so that the static type method below can reference them
static final JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);

static final JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);

public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
    // {"thisIsAFieldInThing1":"value", ...
    if (data[21] == '1') {
        return thing1Type;
    }
    else {
        return thing2Type;
    }
}

For more sophisticated data inspection consider using JsonPath or similar but, the simpler the test to determine the type, the more efficient the process will be.

The following is an example of creating the deserializer programmatically (when providing the consumer factory with the deserializer in the constructor):

JsonDeserializer<Object> deser = new JsonDeserializer<>()
        .trustedPackages("*")
        .typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);

...

public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
    ...
}

Programmatic Construction

When constructing the serializer/deserializer programmatically for use in the producer/consumer factory, since version 2.3, you can use the fluent API, which simplifies configuration.

@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
    Map<String, Object> props = new HashMap<>();
    // props.put(..., ...)
    // ...
    DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
        new JsonSerializer<MyKeyType>()
            .forKeys()
            .noTypeInfo(),
        new JsonSerializer<MyValueType>()
            .noTypeInfo());
    return pf;
}

@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
    Map<String, Object> props = new HashMap<>();
    // props.put(..., ...)
    // ...
    DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
        new JsonDeserializer<>(MyKeyType.class)
            .forKeys()
            .ignoreTypeHeaders(),
        new JsonDeserializer<>(MyValueType.class)
            .ignoreTypeHeaders());
    return cf;
}

To provide type mapping programmatically, similar to Using Methods to Determine Types, use the typeFunction property.

JsonDeserializer<Object> deser = new JsonDeserializer<>()
        .trustedPackages("*")
        .typeFunction(MyUtils::thingOneOrThingTwo);

Alternatively, as long as you don’t use the fluent API to configure properties, or set them using set*() methods, the factories will configure the serializer/deserializer using the configuration properties; see Configuration Properties.

Delegating Serializer and Deserializer

Using Headers

Version 2.3 introduced the DelegatingSerializer and DelegatingDeserializer, which allow producing and consuming records with different key and/or value types. Producers must set a header DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR to a selector value that is used to select which serializer to use for the value and DelegatingSerializer.KEY_SERIALIZATION_SELECTOR for the key; if a match is not found, an IllegalStateException is thrown.

For incoming records, the deserializer uses the same headers to select the deserializer to use; if a match is not found or the header is not present, the raw byte[] is returned.
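
The selection rules described above can be sketched with plain maps of functions. This is an illustration using only the JDK, not the DelegatingSerializer or DelegatingDeserializer implementation; SelectorDelegationSketch and its methods are hypothetical names, and the selector is passed as a plain String instead of being read from a record header.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch, for illustration only.
final class SelectorDelegationSketch {

    // Outbound: the selector must match a configured serializer, otherwise it is an error.
    static byte[] serialize(Map<String, Function<Object, byte[]>> delegates, String selector, Object value) {
        Function<Object, byte[]> delegate = selector == null ? null : delegates.get(selector);
        if (delegate == null) {
            throw new IllegalStateException("No serializer found for selector '" + selector + "'");
        }
        return delegate.apply(value);
    }

    // Inbound: a missing or unmatched selector yields the raw bytes unchanged.
    static Object deserialize(Map<String, Function<byte[], Object>> delegates, String selector, byte[] data) {
        Function<byte[], Object> delegate = selector == null ? null : delegates.get(selector);
        return delegate == null ? data : delegate.apply(data);
    }

    // Sample inbound delegates: "text" decodes UTF-8, "length" reports the payload size.
    static Map<String, Function<byte[], Object>> sampleDeserializers() {
        Map<String, Function<byte[], Object>> delegates = new LinkedHashMap<>();
        delegates.put("text", data -> new String(data, StandardCharsets.UTF_8));
        delegates.put("length", data -> data.length);
        return delegates;
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(deserialize(sampleDeserializers(), "text", payload));
        // An unknown selector returns the very same byte[] instance.
        System.out.println(deserialize(sampleDeserializers(), "unknown", payload) == payload);
    }
}
```

The asymmetry is deliberate in the description above: an unknown selector on the producer side is an error, while on the consumer side the raw bytes are handed through.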

You can configure the map of selector to Serializer / Deserializer via a constructor, or you can configure it via Kafka producer/consumer properties with the keys DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG and DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG. For the serializer, the producer property can be a Map<String, Object> where the key is the selector and the value is a Serializer instance, a serializer Class or the class name. The property can also be a String of comma-delimited map entries, as shown below.

For the deserializer, the consumer property can be a Map<String, Object> where the key is the selector and the value is a Deserializer instance, a deserializer Class or the class name. The property can also be a String of comma-delimited map entries, as shown below.

To configure using properties, use the following syntax:

producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
    "thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer");

consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
    "thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer");

Producers would then set the DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR header to thing1 or thing2.

This technique supports sending different types to the same topic (or different topics).

Starting with version 2.5.1, it is not necessary to set the selector header if the type (key or value) is one of the standard types supported by Serdes (Long, Integer, etc). Instead, the serializer sets the header to the class name of the type. It is not necessary to configure serializers or deserializers for these types; they are created (once) dynamically.

For another technique to send different types to different topics, see Using RoutingKafkaTemplate.

By Type

Version 2.8 introduced the DelegatingByTypeSerializer.

@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config,
            null, new DelegatingByTypeSerializer(Map.of(
                    byte[].class, new ByteArraySerializer(),
                    Bytes.class, new BytesSerializer(),
                    String.class, new StringSerializer())));
}

Starting with version 2.8.3, you can configure the serializer to check whether the map key is assignable from the target object, which is useful when a delegate serializer can serialize subclasses. In this case, if there are ambiguous matches, an ordered Map, such as a LinkedHashMap, should be provided.
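
The reason an ordered Map matters can be seen in a small sketch of an assignability-based lookup. The code below uses only the JDK and is not the DelegatingByTypeSerializer implementation; ByTypeDelegationSketch and its methods are hypothetical names, and the delegates are plain functions rather than Kafka serializers.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch, for illustration only.
final class ByTypeDelegationSketch {

    // Returns the first delegate whose key type is assignable from the class of the value.
    static Function<Object, byte[]> findDelegate(Map<Class<?>, Function<Object, byte[]>> delegates, Object value) {
        for (Map.Entry<Class<?>, Function<Object, byte[]>> entry : delegates.entrySet()) {
            if (entry.getKey().isAssignableFrom(value.getClass())) {
                return entry.getValue();
            }
        }
        throw new IllegalStateException("No delegate for " + value.getClass().getName());
    }

    // Integer is listed before Number, so the more specific entry wins for Integer values.
    static Map<Class<?>, Function<Object, byte[]>> sampleDelegates() {
        Map<Class<?>, Function<Object, byte[]>> delegates = new LinkedHashMap<>();
        delegates.put(Integer.class, value -> ("int:" + value).getBytes(StandardCharsets.UTF_8));
        delegates.put(Number.class, value -> ("num:" + value).getBytes(StandardCharsets.UTF_8));
        return delegates;
    }

    public static void main(String[] args) {
        Map<Class<?>, Function<Object, byte[]>> delegates = sampleDelegates();
        // An Integer matches both entries; iteration order decides which delegate is used.
        System.out.println(new String(findDelegate(delegates, 7).apply(7), StandardCharsets.UTF_8));
        // A Long matches only the Number entry.
        System.out.println(new String(findDelegate(delegates, 7L).apply(7L), StandardCharsets.UTF_8));
    }
}
```

With an unordered map, the Integer value could be routed to either delegate, which is the ambiguity the ordered Map avoids.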

By Topic

Starting with version 2.8, the DelegatingByTopicSerializer and DelegatingByTopicDeserializer allow selection of a serializer/deserializer based on the topic name. Regex Patterns are used to look up the instance to use. The map can be configured using a constructor, or via properties (a comma-delimited list of pattern:serializer).

producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
            "topic[0-4]:" + ByteArraySerializer.class.getName()
        + ", topic[5-9]:" + StringSerializer.class.getName());
...
consumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
            "topic[0-4]:" + ByteArrayDeserializer.class.getName()
        + ", topic[5-9]:" + StringDeserializer.class.getName());

Use KEY_SERIALIZATION_TOPIC_CONFIG when using this for keys.

@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config,
            new IntegerSerializer(),
            new DelegatingByTopicSerializer(Map.of(
                    Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
                    Pattern.compile("topic[5-9]"), new StringSerializer()),
                    new JsonSerializer<Object>()));  // default
}

You can specify a default serializer/deserializer to use when there is no pattern match using DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULT and DelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT.

An additional property DelegatingByTopicSerialization.CASE_SENSITIVE (default true), when set to false makes the topic lookup case insensitive.
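
The pattern lookup, the default delegate, and the case-sensitivity switch described above can be sketched as follows. This uses only the JDK and is not the library's code; ByTopicLookupSketch is a hypothetical name, delegates are represented by their class names, and the sketch assumes that a pattern must match the whole topic name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical sketch, for illustration only.
final class ByTopicLookupSketch {

    // Parses "pattern:className, pattern:className"; patterns ignore case when caseSensitive is false.
    static Map<Pattern, String> parse(String config, boolean caseSensitive) {
        Map<Pattern, String> delegates = new LinkedHashMap<>();
        int flags = caseSensitive ? 0 : Pattern.CASE_INSENSITIVE;
        for (String entry : config.split(",")) {
            String[] parts = entry.trim().split(":", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Expected pattern:className but got: " + entry);
            }
            delegates.put(Pattern.compile(parts[0].trim(), flags), parts[1].trim());
        }
        return delegates;
    }

    // Returns the delegate for the first matching pattern, or the default when nothing matches.
    static String lookup(Map<Pattern, String> delegates, String topic, String defaultDelegate) {
        for (Map.Entry<Pattern, String> entry : delegates.entrySet()) {
            if (entry.getKey().matcher(topic).matches()) {
                return entry.getValue();
            }
        }
        return defaultDelegate;
    }

    public static void main(String[] args) {
        String config = "topic[0-4]:ByteArraySerializer, topic[5-9]:StringSerializer";
        System.out.println(lookup(parse(config, true), "topic3", "JsonSerializer"));
        System.out.println(lookup(parse(config, true), "TOPIC3", "JsonSerializer"));
        System.out.println(lookup(parse(config, false), "TOPIC3", "JsonSerializer"));
    }
}
```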

Retrying Deserializer

The RetryingDeserializer uses a delegate Deserializer and RetryTemplate to retry deserialization when the delegate might have transient errors, such as network issues, during deserialization.

ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
    new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
    new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));

Starting with version 3.1.2, you can optionally set a RecoveryCallback on the RetryingDeserializer.

Refer to the spring-retry project for configuration of the RetryTemplate with a retry policy, back off policy, etc.
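
As a rough illustration of what retrying around a delegate means, the following JDK-only sketch retries a deserialization function a fixed number of times. It is not how RetryingDeserializer or RetryTemplate are implemented, and it omits back-off and recovery entirely; RetryingSketch, deserializeWithRetry, and failingFirst are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical sketch, for illustration only.
final class RetryingSketch {

    // Calls the delegate until it succeeds or maxAttempts is exhausted, then rethrows the last failure.
    static <T> T deserializeWithRetry(Function<byte[], T> delegate, byte[] data, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return delegate.apply(data);
            }
            catch (RuntimeException ex) {
                lastFailure = ex; // remember the failure and try again
            }
        }
        throw lastFailure;
    }

    // Sample delegate that fails the given number of times before decoding the bytes as UTF-8.
    static Function<byte[], String> failingFirst(int failures) {
        AtomicInteger remaining = new AtomicInteger(failures);
        return data -> {
            if (remaining.getAndDecrement() > 0) {
                throw new IllegalStateException("transient failure");
            }
            return new String(data, StandardCharsets.UTF_8);
        };
    }

    public static void main(String[] args) {
        byte[] payload = "ok".getBytes(StandardCharsets.UTF_8);
        // Two transient failures are absorbed by a budget of three attempts.
        System.out.println(deserializeWithRetry(failingFirst(2), payload, 3));
    }
}
```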

Spring Messaging Message Conversion

Although the Serializer and Deserializer API is quite simple and flexible from the low-level Kafka Consumer and Producer perspective, you might need more flexibility at the Spring Messaging level, when using either @KafkaListener or Spring Integration’s Apache Kafka Support. To let you easily convert to and from org.springframework.messaging.Message, Spring for Apache Kafka provides a MessageConverter abstraction with the MessagingMessageConverter implementation and its JsonMessageConverter (and subclasses) customization. You can inject the MessageConverter into a KafkaTemplate instance directly, or into the listener container by using an AbstractKafkaListenerContainerFactory bean definition that is referenced by the @KafkaListener.containerFactory() property. The following example shows how to do so:

@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordMessageConverter(new JsonMessageConverter());
    return factory;
}
...
@KafkaListener(topics = "jsonData",
                containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}

When using Spring Boot, simply define the converter as a @Bean and Spring Boot auto configuration will wire it into the auto-configured template and container factory.

When you use a @KafkaListener, the parameter type is provided to the message converter to assist with the conversion.

This type inference can be achieved only when the @KafkaListener annotation is declared at the method level. With a class-level @KafkaListener, the payload type is used to select which @KafkaHandler method to invoke, so it must already have been converted before the method can be chosen.

On the consumer side, you can configure a JsonMessageConverter; it can handle ConsumerRecord values of type byte[], Bytes and String so should be used in conjunction with a ByteArrayDeserializer, BytesDeserializer or StringDeserializer. (byte[] and Bytes are more efficient because they avoid an unnecessary byte[] to String conversion). You can also configure the specific subclass of JsonMessageConverter corresponding to the deserializer, if you so wish.

On the producer side, when you use Spring Integration or the KafkaTemplate.send(Message<?> message) method (see Using KafkaTemplate), you must configure a message converter that is compatible with the configured Kafka Serializer.

  • StringJsonMessageConverter with StringSerializer

  • BytesJsonMessageConverter with BytesSerializer

  • ByteArrayJsonMessageConverter with ByteArraySerializer

Again, using byte[] or Bytes is more efficient because they avoid a String to byte[] conversion.

For convenience, starting with version 2.3, the framework also provides a StringOrBytesSerializer which can serialize all three value types so it can be used with any of the message converters.

Starting with version 2.7.1, message payload conversion can be delegated to a spring-messaging SmartMessageConverter; this enables conversion, for example, to be based on the MessageHeaders.CONTENT_TYPE header.

The KafkaMessageConverter.fromMessage() method is called for outbound conversion to a ProducerRecord with the message payload in the ProducerRecord.value() property. The KafkaMessageConverter.toMessage() method is called for inbound conversion from ConsumerRecord with the payload being the ConsumerRecord.value() property. The SmartMessageConverter.toMessage() method is called to create a new outbound Message<?> from the Message passed to fromMessage() (usually by KafkaTemplate.send(Message<?> msg)). Similarly, in the KafkaMessageConverter.toMessage() method, after the converter has created a new Message<?> from the ConsumerRecord, the SmartMessageConverter.fromMessage() method is called and then the final inbound message is created with the newly converted payload. In either case, if the SmartMessageConverter returns null, the original message is used.

When the default converter is used in the KafkaTemplate and listener container factory, you configure the SmartMessageConverter by calling setMessagingConverter() on the template and via the contentTypeConverter property on @KafkaListener methods.

Examples:

template.setMessagingConverter(mySmartConverter);

@KafkaListener(id = "withSmartConverter", topics = "someTopic",
    contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
    ...
}

Using Spring Data Projection Interfaces

Starting with version 2.1.1, you can convert JSON to a Spring Data Projection interface instead of a concrete type. This allows very selective and loosely coupled bindings to data, including the lookup of values from multiple places inside the JSON document. For example, the following interface can be defined as the message payload type:

interface SomeSample {

  @JsonPath({ "$.username", "$.user.name" })
  String getUsername();

}

@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
    String username = in.getUsername();
    ...
}

By default, accessor methods are used to look up the property name as a field in the received JSON document. The @JsonPath expression allows customization of the value lookup, and even lets you define multiple JSON Path expressions to look up values from multiple places until an expression returns an actual value.

To enable this feature, use a ProjectingMessageConverter configured with an appropriate delegate converter (used for outbound conversion and converting non-projection interfaces). You must also add org.springframework.data:spring-data-commons and com.jayway.jsonpath:json-path to the classpath.

When used as the parameter to a @KafkaListener method, the interface type is automatically passed to the converter as normal.

Using ErrorHandlingDeserializer

When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because it occurs before the poll() returns. To solve this problem, the ErrorHandlingDeserializer has been introduced. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer returns a null value and a DeserializationException in a header that contains the cause and the raw bytes. When you use a record-level MessageListener, if the ConsumerRecord contains a DeserializationException header for either the key or value, the container’s ErrorHandler is called with the failed ConsumerRecord. The record is not passed to the listener.

Alternatively, you can configure the ErrorHandlingDeserializer to create a custom value by providing a failedDeserializationFunction, which is a Function<FailedDeserializationInfo, T>. This function is invoked to create an instance of T, which is passed to the listener in the usual fashion. An object of type FailedDeserializationInfo, which contains all the contextual information, is provided to the function. You can find the DeserializationException (as a serialized Java object) in headers. See the Javadoc for the ErrorHandlingDeserializer for more information.
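
The two behaviors described above (a null value with the failure recorded, or a substitute value built by a function) can be sketched with a small wrapper around a delegate. This JDK-only code is an illustration, not the ErrorHandlingDeserializer implementation: ErrorCapturingSketch and Result are hypothetical names, the failure is returned in a result object instead of a record header, and a BiFunction over the raw bytes and the exception stands in for Function<FailedDeserializationInfo, T>.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical sketch, for illustration only.
final class ErrorCapturingSketch {

    // Outcome of one attempt: the value, plus the cause and raw bytes when the delegate failed.
    static final class Result<T> {
        final T value;          // null on failure unless a fallback supplied a substitute
        final Exception cause;  // null when deserialization succeeded
        final byte[] rawData;   // original bytes, kept only on failure

        Result(T value, Exception cause, byte[] rawData) {
            this.value = value;
            this.cause = cause;
            this.rawData = rawData;
        }
    }

    // Never throws for delegate failures; the failure is captured in the result instead.
    static <T> Result<T> deserialize(Function<byte[], T> delegate, byte[] data,
            BiFunction<byte[], Exception, T> fallback) {
        try {
            return new Result<>(delegate.apply(data), null, null);
        }
        catch (RuntimeException ex) {
            T substitute = fallback == null ? null : fallback.apply(data, ex);
            return new Result<>(substitute, ex, data);
        }
    }

    // Sample delegate: parses the bytes as a decimal integer.
    static Integer parseInt(byte[] data) {
        return Integer.valueOf(new String(data, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] bad = "oops".getBytes(StandardCharsets.UTF_8);
        Result<Integer> plain = deserialize(ErrorCapturingSketch::parseInt, bad, null);
        System.out.println(plain.value + " / " + plain.cause.getClass().getSimpleName());
        Result<Integer> substituted = deserialize(ErrorCapturingSketch::parseInt, bad, (raw, ex) -> -1);
        System.out.println(substituted.value);
    }
}
```

Keeping the raw bytes alongside the cause is what later allows an error handler or dead-letter publisher to forward the original payload.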

You can use the DefaultKafkaConsumerFactory constructor that takes key and value Deserializer objects and wire in appropriate ErrorHandlingDeserializer instances that you have configured with the proper delegates. Alternatively, you can use consumer configuration properties (which are used by the ErrorHandlingDeserializer) to instantiate the delegates. The property names are ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS and ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS. The property value can be a class or class name. The following example shows how to set these properties:

... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey");
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example");
return new DefaultKafkaConsumerFactory<>(props);

The following example uses a failedDeserializationFunction.

public class BadThing extends Thing {

  private final FailedDeserializationInfo failedDeserializationInfo;

  public BadThing(FailedDeserializationInfo failedDeserializationInfo) {
    this.failedDeserializationInfo = failedDeserializationInfo;
  }

  public FailedDeserializationInfo getFailedDeserializationInfo() {
    return this.failedDeserializationInfo;
  }

}

public class FailedThingProvider implements Function<FailedDeserializationInfo, Thing> {

  @Override
  public Thing apply(FailedDeserializationInfo info) {
    return new BadThing(info);
  }

}

The preceding example uses the following configuration:

...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class);
...

If the consumer is configured with an ErrorHandlingDeserializer, it is important to configure the KafkaTemplate and its producer with a serializer that can handle normal objects as well as raw byte[] values, which result from deserialization exceptions. The generic value type of the template should be Object. One technique is to use the DelegatingByTypeSerializer; an example follows:

@Bean
public ProducerFactory<String, Object> producerFactory() {
  return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
    new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
          MyNormalObject.class, new JsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
  return new KafkaTemplate<>(producerFactory());
}

When using an ErrorHandlingDeserializer with a batch listener, you must check for the deserialization exceptions in message headers. When used with a DefaultBatchErrorHandler, you can use that header to determine which record failed and communicate the failure to the error handler via a BatchListenerFailedException.

@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
    for (int i = 0; i < in.size(); i++) {
        Thing thing = in.get(i);
        if (thing == null
                && headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
            // Declared outside the try block so that it is still in scope for the throw below
            DeserializationException deserEx = null;
            try {
                deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
                        headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
                if (deserEx != null) {
                    logger.error(deserEx, "Record at index " + i + " could not be deserialized");
                }
            }
            catch (Exception ex) {
                logger.error(ex, "Record at index " + i + " could not be deserialized");
            }
            throw new BatchListenerFailedException("Deserialization", deserEx, i);
        }
        process(thing);
    }
}

SerializationUtils.byteArrayToDeserializationException() can be used to convert the header to a DeserializationException.

When consuming List<ConsumerRecord<?, ?>>, SerializationUtils.getExceptionFromHeader() is used instead:

@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
    for (int i = 0; i < in.size(); i++) {
        ConsumerRecord<String, Thing> rec = in.get(i);
        if (rec.value() == null) {
            DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
                    SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
            if (deserEx != null) {
                logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
                throw new BatchListenerFailedException("Deserialization", deserEx, i);
            }
        }
        process(rec.value());
    }
}

If you are also using a DeadLetterPublishingRecoverer, the record published for a DeserializationException will have a record.value() of type byte[]; this should not be serialized. Consider using a DelegatingByTypeSerializer configured to use a ByteArraySerializer for byte[] and the normal serializer (Json, Avro, etc) for all other types.

Starting with version 3.1, you can add a Validator to the ErrorHandlingDeserializer. If the delegate Deserializer successfully deserializes the object, but that object fails validation, an exception is thrown, just as if a deserialization exception had occurred. This allows the original raw data to be passed to the error handler. When creating the deserializer yourself, simply call setValidator; if you configure the deserializer using properties, set the consumer configuration property ErrorHandlingDeserializer.VALIDATOR_CLASS to the class or the fully qualified class name of your Validator. When using Spring Boot, this property name is spring.kafka.consumer.properties.spring.deserializer.validator.class.
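As an illustration of the programmatic variant, the following sketch wires a Validator into an ErrorHandlingDeserializer that delegates to a JsonDeserializer. The Thing record, the ThingValidator class, and the "name must not be blank" rule are hypothetical and exist only to make the example complete; the choice of JsonDeserializer as the delegate is likewise an assumption.

```java
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.validation.Errors;
import org.springframework.validation.Validator;

class ThingValidation {

    // Hypothetical payload type.
    record Thing(String name) { }

    // Hypothetical validator: rejects a Thing whose name is missing or blank.
    static class ThingValidator implements Validator {

        @Override
        public boolean supports(Class<?> clazz) {
            return Thing.class.isAssignableFrom(clazz);
        }

        @Override
        public void validate(Object target, Errors errors) {
            Thing thing = (Thing) target;
            if (thing.name() == null || thing.name().isBlank()) {
                // Register a global error on the deserialized object.
                errors.reject("thing.name.missing", "name must not be blank");
            }
        }
    }

    // Builds the value deserializer by hand and attaches the validator with setValidator.
    static ErrorHandlingDeserializer<Thing> valueDeserializer() {
        ErrorHandlingDeserializer<Thing> deserializer =
                new ErrorHandlingDeserializer<>(new JsonDeserializer<>(Thing.class));
        deserializer.setValidator(new ThingValidator());
        return deserializer;
    }
}
```

When the deserializer is configured through properties instead, the same validator class would be referenced with ErrorHandlingDeserializer.VALIDATOR_CLASS; in that case it presumably needs a no-argument constructor so that the framework can instantiate it.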

Payload Conversion with Batch Listeners

You can also use a JsonMessageConverter within a BatchMessagingMessageConverter to convert batch messages when you use a batch listener container factory. See "Serialization, Deserialization, and Message Conversion" and "Spring Messaging Message Conversion" for more information.

By default, the type for the conversion is inferred from the listener argument. If you configure the JsonMessageConverter with a DefaultJackson2JavaTypeMapper that has its TypePrecedence set to TYPE_ID (instead of the default INFERRED), the converter uses the type information in headers (if present) instead. This allows, for example, listener methods to be declared with interfaces instead of concrete classes. Also, the type mapper supports mapping, so the deserialization can be to a different type than the source (as long as the data is compatible). This is also useful when you use class-level @KafkaListener instances where the payload must have already been converted to determine which method to invoke. The following example creates beans that use this method:

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
    return factory;
}

@Bean
public JsonMessageConverter converter() {
    return new JsonMessageConverter();
}
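The converter bean above uses the default INFERRED precedence. A variant that prefers the type information in headers, as described earlier, might look like the following sketch. The Event interface, the OrderCreated record, and the "orderCreated" type id are hypothetical names chosen for illustration, and the sketch assumes the producer writes that id into the type header (named __TypeId__ by default).

```java
import java.util.Map;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.converter.JsonMessageConverter;
import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper;

@Configuration
class TypeIdConverterConfig {

    // Hypothetical types: listeners can declare List<Event> while the header selects the concrete class.
    interface Event { }

    record OrderCreated(String orderId) implements Event { }

    @Bean
    JsonMessageConverter typeIdConverter() {
        DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
        // Prefer the type id found in the record headers over the listener's parameter type.
        typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
        // Map a logical type id to a local class, so producer and consumer need not share class names.
        typeMapper.setIdClassMapping(Map.of("orderCreated", OrderCreated.class));

        JsonMessageConverter converter = new JsonMessageConverter();
        converter.setTypeMapper(typeMapper);
        return converter;
    }
}
```

Passing this bean to the BatchMessagingMessageConverter in place of the plain JsonMessageConverter leaves the rest of the container factory configuration unchanged.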

Note that, for this to work, the method signature for the conversion target must be a container object with a single generic parameter type, such as the following:

@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

Note that you can still access the batch headers.

If the batch converter has a record converter that supports it, you can also receive a list of messages where the payloads are converted according to the generic type. The following example shows how to do so:

@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen(List<Message<Foo>> fooMessages) {
    ...
}

ConversionService Customization

Starting with version 2.1.1, the org.springframework.core.convert.ConversionService used by the default org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory to resolve parameters for the invocation of a listener method is supplied with all beans that implement any of the following interfaces:

  • org.springframework.core.convert.converter.Converter

  • org.springframework.core.convert.converter.GenericConverter

  • org.springframework.format.Formatter

This lets you further customize listener deserialization without changing the default configuration for ConsumerFactory and KafkaListenerContainerFactory.

Setting a custom MessageHandlerMethodFactory on the KafkaListenerEndpointRegistrar through a KafkaListenerConfigurer bean disables this feature.
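As a rough sketch of this mechanism, the following configuration registers a Converter bean so that a listener parameter can be declared with a custom type. TenantId, the "tenantId" header name, and the topic and listener id are all hypothetical. The sketch also assumes that the header value reaches the argument resolver as a byte[]; depending on the header mapper configuration it may arrive as a different type, in which case the converter's source type would need to change accordingly.

```java
import java.nio.charset.StandardCharsets;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.convert.converter.Converter;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Header;

@Configuration
class TenantConversionConfig {

    // Hypothetical value type used as a listener parameter.
    record TenantId(String value) { }

    // A named class (rather than a lambda) keeps the generic types visible to the ConversionService.
    static class BytesToTenantIdConverter implements Converter<byte[], TenantId> {

        @Override
        public TenantId convert(byte[] source) {
            return new TenantId(new String(source, StandardCharsets.UTF_8));
        }
    }

    // Picked up because it is a bean implementing Converter.
    @Bean
    BytesToTenantIdConverter bytesToTenantIdConverter() {
        return new BytesToTenantIdConverter();
    }

    // Hypothetical listener bean whose header parameter relies on the converter above.
    @Bean
    TenantListener tenantListener() {
        return new TenantListener();
    }

    static class TenantListener {

        @KafkaListener(id = "tenants", topics = "tenants")
        void listen(String payload, @Header("tenantId") TenantId tenant) {
            // Application logic would go here.
        }
    }
}
```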

Adding Custom HandlerMethodArgumentResolver to @KafkaListener

Starting with version 2.4.2, you can add your own HandlerMethodArgumentResolver to resolve custom method parameters. To do so, implement KafkaListenerConfigurer and call setCustomMethodArgumentResolvers() on the KafkaListenerEndpointRegistrar, as the following example shows:

@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setCustomMethodArgumentResolvers(
            new HandlerMethodArgumentResolver() {

                @Override
                public boolean supportsParameter(MethodParameter parameter) {
                    return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
                }

                @Override
                public Object resolveArgument(MethodParameter parameter, Message<?> message) {
                    return new CustomMethodArgument(
                        message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
                    );
                }
            }
        );
    }

}

You can also completely replace the framework’s argument resolution by adding a custom MessageHandlerMethodFactory to the KafkaListenerEndpointRegistrar bean. If you do this, and your application needs to handle tombstone records with a null value() (for example, from a compacted topic), you should add a KafkaNullAwarePayloadArgumentResolver to the factory. It must be the last resolver, because it supports all types and can match arguments without a @Payload annotation. If you are using a DefaultMessageHandlerMethodFactory, set this resolver as the last custom resolver; the factory ensures that it is used before the standard PayloadMethodArgumentResolver, which has no knowledge of KafkaNull payloads.