Pulsar Headers

Pulsar does not have a first-class “header” concept. Instead, it provides a map for custom user properties as well as methods to access the message metadata typically stored in a message header (e.g. id and event-time). As such, the terms “Pulsar message header” and “Pulsar message metadata” are used interchangeably. The list of available message metadata (headers) can be found in PulsarHeaders.java.

Spring Headers

Spring Messaging provides first-class “header” support via its MessageHeaders abstraction.

The Pulsar message metadata can be consumed as Spring message headers. The list of available headers can be found in PulsarHeaders.java.

Accessing in Single Record based Consumer

The following example shows how you can access the various Pulsar Headers in an application that uses the single record mode of consuming:

@PulsarListener(topics = "simpleListenerWithHeaders")
void simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
                @Header(PulsarHeaders.RAW_DATA) byte[] rawData,
                @Header("foo") String foo) {

}

In the preceding example, we access the values for the messageId and rawData message metadata as well as a custom message property named foo. The Spring @Header annotation is used for each header field.

You can also use Pulsar’s Message as the envelope to carry the payload. When doing so, you can directly call the corresponding methods on the Pulsar message to retrieve the metadata. However, as a convenience, you can also retrieve it by using the @Header annotation. Note that you can also use the Spring messaging Message envelope to carry the payload and then retrieve the Pulsar headers by using @Header.

Accessing in Batch Record based Consumer

In this section, we see how to access the various Pulsar Headers in an application that uses a batch consumer:

@PulsarListener(topics = "simpleBatchListenerWithHeaders", batch = true)
void simpleBatchListenerWithHeaders(List<String> data,
                @Header(PulsarHeaders.MESSAGE_ID) List<MessageId> messageIds,
                @Header(PulsarHeaders.TOPIC_NAME) List<String> topicNames,
                @Header("foo") List<String> fooValues) {

}

In the preceding example, we consume the data as a List<String>. When extracting the various headers, we do so as a List<> as well. Spring for Apache Pulsar ensures that the headers list corresponds to the data list.
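As a rough illustration of that correspondence, the following plain-Java sketch pairs each payload with the header value at the same position. The class and method names are hypothetical and not part of the framework; the sketch only assumes that element i of a header list belongs to element i of the data list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spring for Apache Pulsar.
class BatchHeaderAlignmentSketch {

    // Pairs each payload with the header value found at the same index.
    static List<String> describe(List<String> data, List<String> topicNames) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < data.size(); i++) {
            lines.add(data.get(i) + " <- " + topicNames.get(i));
        }
        return lines;
    }
}
```

Inside a batch listener, the same index-based loop could be applied to the data, messageIds, topicNames, and fooValues parameters.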

You can also extract headers in the same manner when you use the batch listener and receive payloads as List<org.apache.pulsar.client.api.Message<?>>, org.apache.pulsar.client.api.Messages<?>, or org.springframework.messaging.Message<?>.

Message Header Mapping

The PulsarHeaderMapper strategy is provided to map headers to and from Pulsar user properties and Spring MessageHeaders.

Its interface definition is as follows:

public interface PulsarHeaderMapper {

	Map<String, String> toPulsarHeaders(MessageHeaders springHeaders);

	MessageHeaders toSpringHeaders(Message<?> pulsarMessage);
}

The framework provides a couple of mapper implementations.

  • The JsonPulsarHeaderMapper maps headers as JSON in order to support rich header types and is the default when the Jackson JSON library is on the classpath.

  • The ToStringPulsarHeaderMapper maps headers as strings using the toString() method on the header values and is the fallback mapper.
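To make the difference between the two strategies more concrete, here is a minimal plain-Java sketch of the toString() approach. It is not the framework's implementation; the class and method names are hypothetical, and skipping null values is an assumption made for the sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of toString()-based mapping, not the framework class.
class ToStringMappingSketch {

    // Converts each header value to a String by calling toString() on it.
    static Map<String, String> toStringProperties(Map<String, Object> headers) {
        Map<String, String> properties = new LinkedHashMap<>();
        headers.forEach((key, value) -> {
            if (value != null) { // assumption: null values are skipped
                properties.put(key, value.toString());
            }
        });
        return properties;
    }
}
```

With this approach, an Integer header with the value 42 travels as the string "42", so the original type cannot be recovered from the property alone. That limitation is what the JSON-based mapping addresses.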

JSON Header Mapper

The JsonPulsarHeaderMapper uses a “special” header (with a key of spring_json_header_types) that contains a JSON map of <key>:<type>. This header is used on the inbound side (Pulsar → Spring) to provide appropriate conversion of each header value to the original type.
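The idea behind the types header can be sketched in plain Java as follows. This is a simplified illustration only: the real mapper relies on Jackson, whereas this sketch restores a handful of types by hand, and the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of a <key>:<type> companion map, not the framework class.
class HeaderTypesSketch {

    // Outbound: record the class name of each (non-null) header value under its key.
    static Map<String, String> recordTypes(Map<String, Object> headers) {
        Map<String, String> types = new LinkedHashMap<>();
        headers.forEach((key, value) -> types.put(key, value.getClass().getName()));
        return types;
    }

    // Inbound: use the recorded type to convert the transported string back.
    static Object restore(String rawValue, String typeName) {
        switch (typeName) {
            case "java.lang.Integer":
                return Integer.valueOf(rawValue);
            case "java.lang.Long":
                return Long.valueOf(rawValue);
            case "java.lang.Boolean":
                return Boolean.valueOf(rawValue);
            default:
                return rawValue; // unknown types stay as strings in this sketch
        }
    }
}
```

In this sketch, a header named count with the Integer value 42 is recorded as count : java.lang.Integer on the way out, and that entry is what allows the inbound side to turn the string "42" back into an Integer.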

Trusted Packages

By default, the JSON mapper deserializes classes in all packages. However, if you receive messages from untrusted sources, you may wish to add only those packages you trust via the trustedPackages property on a custom configured JsonPulsarHeaderMapper bean you provide.
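Conceptually, a trusted-package check is a test on the package portion of the class name recorded for a header. The following plain-Java sketch shows one way such a check could work. It is not the framework's code: the names are hypothetical, and both treating an empty set as "trust everything" and trusting subpackages are assumptions made for the sketch.

```java
import java.util.Set;

// Hypothetical illustration of a trusted-package check, not the framework class.
class TrustedPackagesSketch {

    static boolean isTrusted(String className, Set<String> trustedPackages) {
        if (trustedPackages.isEmpty()) {
            return true; // assumption: no restriction configured means all packages are trusted
        }
        int lastDot = className.lastIndexOf('.');
        if (lastDot < 0) {
            return false; // no package to compare against
        }
        String packageName = className.substring(0, lastDot);
        for (String trusted : trustedPackages) {
            // assumption: subpackages of a trusted package are trusted as well
            if (packageName.equals(trusted) || packageName.startsWith(trusted + ".")) {
                return true;
            }
        }
        return false;
    }
}
```

With com.example.events as the only trusted package, com.example.events.OrderCreated passes the check, while org.other.Payload does not.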

ToString Classes

Certain types are not suitable for JSON serialization, and a simple toString() serialization might be preferred for these types. The JsonPulsarHeaderMapper provides addToStringClasses(), which lets you supply the names of classes that should be treated this way for outbound mapping. During inbound mapping, they are mapped as String. By default, only org.springframework.util.MimeType and org.springframework.http.MediaType are mapped this way.

Custom ObjectMapper

The JSON mapper uses a reasonably configured Jackson 2 ObjectMapper to handle serialization of header values. To use a custom object mapper instead, you only need to provide an ObjectMapper bean with the name pulsarHeaderObjectMapper. For example:

@Configuration(proxyBeanMethods = false)
static class PulsarHeadersCustomObjectMapperTestConfig {

    @Bean(name = "pulsarHeaderObjectMapper")
    ObjectMapper customObjectMapper() {
        var objectMapper = new ObjectMapper();
        // do things with your special header object mapper here
        return objectMapper;
    }
}

The object mapper in the example above should be an instance of com.fasterxml.jackson.databind.ObjectMapper, not the shaded org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectMapper.
The same limitations regarding Jackson 2 vs. Jackson 3 apply here.

Inbound/Outbound Patterns

On the inbound side, by default, all Pulsar headers (message metadata plus user properties) are mapped to MessageHeaders. On the outbound side, by default, all MessageHeaders are mapped, except id, timestamp, and the headers that represent the Pulsar message metadata (i.e. the headers that are prefixed with pulsar_message_).

You can specify which headers are mapped for inbound and outbound messages by configuring the inboundPatterns and outboundPatterns on a mapper bean you provide. You can include Pulsar message metadata headers on the outbound messages by adding the exact header name to the outboundPatterns, as patterns are not supported for metadata headers.

Patterns are rather simple and can contain a leading wildcard (*), a trailing wildcard, or both (for example, *.cat.*). You can negate patterns with a leading !. The first pattern that matches a header name (whether positive or negative) wins.

When you provide your own patterns, we recommend including !id and !timestamp, since these headers are read-only on the inbound side.
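The matching rules described above (leading and trailing wildcards, negation with !, and first match wins) can be sketched in plain Java as follows. This is an illustration of the rules as stated, not the framework's matcher. The class and method names are hypothetical, and treating a header that matches no pattern as "not mapped" is an assumption.

```java
import java.util.List;

// Hypothetical illustration of the pattern rules, not the framework's matcher.
class HeaderPatternSketch {

    // Matches a name against a pattern with an optional leading and/or trailing '*'.
    static boolean simpleMatch(String pattern, String name) {
        if (pattern.equals("*")) {
            return true;
        }
        boolean leading = pattern.startsWith("*");
        boolean trailing = pattern.endsWith("*");
        String core = pattern.substring(leading ? 1 : 0, pattern.length() - (trailing ? 1 : 0));
        if (leading && trailing) {
            return name.contains(core);
        }
        if (leading) {
            return name.endsWith(core);
        }
        if (trailing) {
            return name.startsWith(core);
        }
        return name.equals(core);
    }

    // Walks the patterns in order; the first match decides, and '!' negates it.
    static boolean shouldMap(List<String> patterns, String headerName) {
        for (String pattern : patterns) {
            boolean negated = pattern.startsWith("!");
            String body = negated ? pattern.substring(1) : pattern;
            if (simpleMatch(body, headerName)) {
                return !negated;
            }
        }
        return false; // assumption: a header that matches no pattern is not mapped
    }
}
```

Given the patterns !id, !timestamp, *.cat.*, and foo* in that order, the sketch rejects id and timestamp, accepts my.cat.name and foobar, and rejects bar because no pattern matches it. Order matters: with !foo* placed before *, foobar is rejected even though * would also match it.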