Message Headers

The 0.11.0.0 client introduced support for headers in messages. As of version 2.0, Spring for Apache Kafka now supports mapping these headers to and from spring-messaging MessageHeaders.

Previous versions mapped ConsumerRecord and ProducerRecord to spring-messaging Message<?>, where the value property is mapped to and from the payload and other properties (topic, partition, and so on) were mapped to headers. This is still the case, but additional (arbitrary) headers can now be mapped.

Apache Kafka headers have a simple API, shown in the following interface definition:

public interface Header {

    String key();

    byte[] value();

}

The KafkaHeaderMapper strategy is provided to map header entries between Kafka Headers and MessageHeaders. Its interface definition is as follows:

public interface KafkaHeaderMapper {

    void fromHeaders(MessageHeaders headers, Headers target);

    void toHeaders(Headers source, Map<String, Object> target);

}

The SimpleKafkaHeaderMapper maps raw headers as byte[], with configuration options for conversion to String values.

The DefaultKafkaHeaderMapper maps the key to the MessageHeaders header name and, in order to support rich header types for outbound messages, JSON conversion is performed. A "special" header (with a key of spring_json_header_types) contains a JSON map of <key>:<type>. This header is used on the inbound side to provide appropriate conversion of each header value to the original type.

On the inbound side, all Kafka Header instances are mapped to MessageHeaders. On the outbound side, by default, all MessageHeaders are mapped, except id, timestamp, and the headers that map to ConsumerRecord properties.

You can specify which headers are to be mapped for outbound messages, by providing patterns to the mapper. The following listing shows a number of example mappings:

public DefaultKafkaHeaderMapper() { (1)
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { (2)
    ...
}

public DefaultKafkaHeaderMapper(String... patterns) { (3)
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { (4)
    ...
}
1 Uses a default Jackson ObjectMapper and maps most headers, as discussed before the example.
2 Uses the provided Jackson ObjectMapper and maps most headers, as discussed before the example.
3 Uses a default Jackson ObjectMapper and maps headers according to the provided patterns.
4 Uses the provided Jackson ObjectMapper and maps headers according to the provided patterns.

Patterns are rather simple and can contain a leading wildcard (*), a trailing wildcard, or both (for example, *.cat.*). You can negate patterns with a leading !. The first pattern that matches a header name (whether positive or negative) wins.

When you provide your own patterns, we recommend including !id and !timestamp, since these headers are read-only on the inbound side.

By default, the mapper deserializes only classes in java.lang and java.util. You can trust other (or all) packages by adding trusted packages with the addTrustedPackages method. If you receive messages from untrusted sources, you may wish to add only those packages you trust. To trust all packages, you can use mapper.addTrustedPackages("*").
Mapping String header values in a raw form is useful when communicating with systems that are not aware of the mapper’s JSON format.

Starting with version 2.2.5, you can specify that certain string-valued headers should not be mapped using JSON, but to/from a raw byte[]. The AbstractKafkaHeaderMapper has new properties; mapAllStringsOut when set to true, all string-valued headers will be converted to byte[] using the charset property (default UTF-8). In addition, there is a property rawMappedHeaders, which is a map of header name : boolean; if the map contains a header name, and the header contains a String value, it will be mapped as a raw byte[] using the charset. This map is also used to map raw incoming byte[] headers to String using the charset if, and only if, the boolean in the map value is true. If the boolean is false, or the header name is not in the map with a true value, the incoming header is simply mapped as the raw unmapped header.

The following test case illustrates this mechanism.

@Test
public void testSpecificStringConvert() {
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    Map<String, Boolean> rawMappedHeaders = new HashMap<>();
    rawMappedHeaders.put("thisOnesAString", true);
    rawMappedHeaders.put("thisOnesBytes", false);
    mapper.setRawMappedHeaders(rawMappedHeaders);
    Map<String, Object> headersMap = new HashMap<>();
    headersMap.put("thisOnesAString", "thing1");
    headersMap.put("thisOnesBytes", "thing2");
    headersMap.put("alwaysRaw", "thing3".getBytes());
    MessageHeaders headers = new MessageHeaders(headersMap);
    Headers target = new RecordHeaders();
    mapper.fromHeaders(headers, target);
    assertThat(target).containsExactlyInAnyOrder(
            new RecordHeader("thisOnesAString", "thing1".getBytes()),
            new RecordHeader("thisOnesBytes", "thing2".getBytes()),
            new RecordHeader("alwaysRaw", "thing3".getBytes()));
    headersMap.clear();
    mapper.toHeaders(target, headersMap);
    assertThat(headersMap).contains(
            entry("thisOnesAString", "thing1"),
            entry("thisOnesBytes", "thing2".getBytes()),
            entry("alwaysRaw", "thing3".getBytes()));
}

Both header mappers map all inbound headers, by default. Starting with version 2.8.8, the patterns, can also applied to inbound mapping. To create a mapper for inbound mapping, use one of the static methods on the respective mapper:

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}

public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}

For example:

DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");

This will exclude all headers beginning with abc and include all others.

By default, the DefaultKafkaHeaderMapper is used in the MessagingMessageConverter and BatchMessagingMessageConverter, as long as Jackson is on the classpath.

With the batch converter, the converted headers are available in the KafkaHeaders.BATCH_CONVERTED_HEADERS as a List<Map<String, Object>> where the map in a position of the list corresponds to the data position in the payload.

If there is no converter (either because Jackson is not present or it is explicitly set to null), the headers from the consumer record are provided unconverted in the KafkaHeaders.NATIVE_HEADERS header. This header is a Headers object (or a List<Headers> in the case of the batch converter), where the position in the list corresponds to the data position in the payload.

Certain types are not suitable for JSON serialization, and a simple toString() serialization might be preferred for these types. The DefaultKafkaHeaderMapper has a method called addToStringClasses() that lets you supply the names of classes that should be treated this way for outbound mapping. During inbound mapping, they are mapped as String. By default, only org.springframework.util.MimeType and org.springframework.http.MediaType are mapped this way.
Starting with version 2.3, handling of String-valued headers is simplified. Such headers are no longer JSON encoded, by default (i.e. they do not have enclosing "..." added). The type is still added to the JSON_TYPES header so the receiving system can convert back to a String (from byte[]). The mapper can handle (decode) headers produced by older versions (it checks for a leading "); in this way an application using 2.3 can consume records from older versions.
To be compatible with earlier versions, set encodeStrings to true, if records produced by a version using 2.3 might be consumed by applications using earlier versions. When all applications are using 2.3 or higher, you can leave the property at its default value of false.
@Bean
MessagingMessageConverter converter() {
    MessagingMessageConverter converter = new MessagingMessageConverter();
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    mapper.setEncodeStrings(true);
    converter.setHeaderMapper(mapper);
    return converter;
}

If using Spring Boot, it will auto configure this converter bean into the auto-configured KafkaTemplate; otherwise you should add this converter to the template.