Apache Kafka Streams Support

Starting with version 1.1.4, Spring for Apache Kafka provides first-class support for Kafka Streams. To use it from a Spring application, the kafka-streams jar must be present on classpath. It is an optional dependency of the Spring for Apache Kafka project and is not downloaded transitively.

Basics

The reference Apache Kafka Streams documentation suggests the following way of using the API:

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.

StreamsBuilder builder = ...;  // when using the Kafka Streams DSL

// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;

KafkaStreams streams = new KafkaStreams(builder, config);

// Start the Kafka Streams instance
streams.start();

// Stop the Kafka Streams instance
streams.close();

So, we have two main components:

  • StreamsBuilder: With an API to build KStream (or KTable) instances.

  • KafkaStreams: To manage the lifecycle of those instances.

All KStream instances exposed to a KafkaStreams instance by a single StreamsBuilder are started and stopped at the same time, even if they have different logic. In other words, all streams defined by a StreamsBuilder are tied with a single lifecycle control. Once a KafkaStreams instance has been closed by streams.close(), it cannot be restarted. Instead, a new KafkaStreams instance to restart stream processing must be created.

Spring Management

To simplify using Kafka Streams from the Spring application context perspective and use the lifecycle management through a container, Spring for Apache Kafka introduces StreamsBuilderFactoryBean. This is an AbstractFactoryBean implementation to expose a StreamsBuilder singleton instance as a bean. The following example creates such a bean:

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
Starting with version 2.2, the stream configuration is now provided as a KafkaStreamsConfiguration object rather than a StreamsConfig.

The StreamsBuilderFactoryBean also implements SmartLifecycle to manage the lifecycle of an internal KafkaStreams instance. Similar to the Kafka Streams API, you must define the KStream instances before you start the KafkaStreams. That also applies for the Spring API for Kafka Streams. Therefore, when you use default autoStartup = true on the StreamsBuilderFactoryBean, you must declare KStream instances on the StreamsBuilder before the application context is refreshed. For example, KStream can be a regular bean definition, while the Kafka Streams API is used without any impacts. The following example shows how to do so:

@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
    KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    // Fluent KStream API
    return stream;
}

If you would like to control the lifecycle manually (for example, stopping and starting by some condition), you can reference the StreamsBuilderFactoryBean bean directly by using the factory bean (&) prefix. Since StreamsBuilderFactoryBean uses its internal KafkaStreams instance, it is safe to stop and restart it again. A new KafkaStreams is created on each start(). You might also consider using different StreamsBuilderFactoryBean instances, if you would like to control the lifecycles for KStream instances separately.

You also can specify KafkaStreams.StateListener, Thread.UncaughtExceptionHandler, and StateRestoreListener options on the StreamsBuilderFactoryBean, which are delegated to the internal KafkaStreams instance. Also, apart from setting those options indirectly on StreamsBuilderFactoryBean, starting with version 2.1.5, you can use a KafkaStreamsCustomizer callback interface to configure an inner KafkaStreams instance. Note that KafkaStreamsCustomizer overrides the options provided by StreamsBuilderFactoryBean. If you need to perform some KafkaStreams operations directly, you can access that internal KafkaStreams instance by using StreamsBuilderFactoryBean.getKafkaStreams(). You can autowire StreamsBuilderFactoryBean bean by type, but you should be sure to use the full type in the bean definition, as the following example shows:

@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

Alternatively, you can add @Qualifier for injection by name if you use interface bean definition. The following example shows how to do so:

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

Starting with version 2.4.1, the factory bean has a new property infrastructureCustomizer with type KafkaStreamsInfrastructureCustomizer; this allows customization of the StreamsBuilder (e.g. to add a state store) and/or the Topology before the stream is created.

public interface KafkaStreamsInfrastructureCustomizer {

    void configureBuilder(StreamsBuilder builder);

    void configureTopology(Topology topology);

}

Default no-op implementations are provided to avoid having to implement both methods if one is not required.

A CompositeKafkaStreamsInfrastructureCustomizer is provided, for when you need to apply multiple customizers.

KafkaStreams Micrometer Support

Introduced in version 2.5.3, you can configure a KafkaStreamsMicrometerListener to automatically register micrometer meters for the KafkaStreams object managed by the factory bean:

streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
        Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));

Streams JSON Serialization and Deserialization

For serializing and deserializing data when reading or writing to topics or state stores in JSON format, Spring for Apache Kafka provides a JsonSerde implementation that uses JSON, delegating to the JsonSerializer and JsonDeserializer described in Serialization, Deserialization, and Message Conversion. The JsonSerde implementation provides the same configuration options through its constructor (target type or ObjectMapper). In the following example, we use the JsonSerde to serialize and deserialize the Cat payload of a Kafka stream (the JsonSerde can be used in a similar fashion wherever an instance is required):

stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");

When constructing the serializer/deserializer programmatically for use in the producer/consumer factory, since version 2.3, you can use the fluent API, which simplifies configuration.

stream.through(
    new JsonSerde<>(MyKeyType.class)
        .forKeys()
        .noTypeInfo(),
    new JsonSerde<>(MyValueType.class)
        .noTypeInfo(),
    "myTypes");

Using KafkaStreamBrancher

The KafkaStreamBrancher class introduces a more convenient way to build conditional branches on top of KStream.

Consider the following example that does not use KafkaStreamBrancher:

KStream<String, String>[] branches = builder.stream("source").branch(
        (key, value) -> value.contains("A"),
        (key, value) -> value.contains("B"),
        (key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");

The following example uses KafkaStreamBrancher:

new KafkaStreamBrancher<String, String>()
        .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
        .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
        //default branch should not necessarily be defined in the end of the chain!
        .defaultBranch(ks -> ks.to("C"))
        .onTopOf(builder.stream("source"));
        //onTopOf method returns the provided stream so we can continue with method chaining

Configuration

To configure the Kafka Streams environment, the StreamsBuilderFactoryBean requires a KafkaStreamsConfiguration instance. See the Apache Kafka documentation for all possible options.

Starting with version 2.2, the stream configuration is now provided as a KafkaStreamsConfiguration object, rather than as a StreamsConfig.

To avoid boilerplate code for most cases, especially when you develop microservices, Spring for Apache Kafka provides the @EnableKafkaStreams annotation, which you should place on a @Configuration class. All you need is to declare a KafkaStreamsConfiguration bean named defaultKafkaStreamsConfig. A StreamsBuilderFactoryBean bean, named defaultKafkaStreamsBuilder, is automatically declared in the application context. You can declare and use any additional StreamsBuilderFactoryBean beans as well. You can perform additional customization of that bean, by providing a bean that implements StreamsBuilderFactoryBeanConfigurer. If there are multiple such beans, they will be applied according to their Ordered.order property.

Cleanup & Stop configuration

When the factory is stopped, the KafkaStreams.close() is called with 2 parameters :

  • closeTimeout : how long to to wait for the threads to shutdown (defaults to DEFAULT_CLOSE_TIMEOUT set to 10 seconds). Can be configured using StreamsBuilderFactoryBean.setCloseTimeout().

  • leaveGroupOnClose : to trigger consumer leave call from the group (defaults to false). Can be configured using StreamsBuilderFactoryBean.setLeaveGroupOnClose().

By default, when the factory bean is stopped, the KafkaStreams.cleanUp() method is called. Starting with version 2.1.2, the factory bean has additional constructors, taking a CleanupConfig object that has properties to let you control whether the cleanUp() method is called during start() or stop() or neither. Starting with version 2.7, the default is to never clean up local state.

Header Enricher

Version 3.0 added the HeaderEnricherProcessor extension of ContextualProcessor; providing the same functionality as the deprecated HeaderEnricher which implemented the deprecated Transformer interface. This can be used to add headers within the stream processing; the header values are SpEL expressions; the root object of the expression evaluation has 3 properties:

  • record - the org.apache.kafka.streams.processor.api.Record (key, value, timestamp, headers)

  • key - the key of the current record

  • value - the value of the current record

  • context - the ProcessorContext, allowing access to the current record metadata

The expressions must return a byte[] or a String (which will be converted to byte[] using UTF-8).

To use the enricher within a stream:

.process(() -> new HeaderEnricherProcessor(expressions))

The processor does not change the key or value; it simply adds headers.

You need a new instance for each record.
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))

Here is a simple example, adding one literal header and one variable:

Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
        .process(() -> supplier)
        .to(OUTPUT);

MessagingProcessor

Version 3.0 added the MessagingProcessor extension of ContextualProcessor, providing the same functionality as the deprecated MessagingTransformer which implemented the deprecated Transformer interface. This allows a Kafka Streams topology to interact with a Spring Messaging component, such as a Spring Integration flow. The transformer requires an implementation of MessagingFunction.

@FunctionalInterface
public interface MessagingFunction {

    Message<?> exchange(Message<?> message);

}

Spring Integration automatically provides an implementation using its GatewayProxyFactoryBean. It also requires a MessagingMessageConverter to convert the key, value and metadata (including headers) to/from a Spring Messaging Message<?>. See [Calling a Spring Integration Flow from a KStream] for more information.

Recovery from Deserialization Exceptions

Version 2.3 introduced the RecoveringDeserializationExceptionHandler which can take some action when a deserialization exception occurs. Refer to the Kafka documentation about DeserializationExceptionHandler, of which the RecoveringDeserializationExceptionHandler is an implementation. The RecoveringDeserializationExceptionHandler is configured with a ConsumerRecordRecoverer implementation. The framework provides the DeadLetterPublishingRecoverer which sends the failed record to a dead-letter topic. See Publishing Dead-letter Records for more information about this recoverer.

To configure the recoverer, add the following properties to your streams configuration:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            RecoveringDeserializationExceptionHandler.class);
    props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
    ...
    return new KafkaStreamsConfiguration(props);
}

@Bean
public DeadLetterPublishingRecoverer recoverer() {
    return new DeadLetterPublishingRecoverer(kafkaTemplate(),
            (record, ex) -> new TopicPartition("recovererDLQ", -1));
}

Of course, the recoverer() bean can be your own implementation of ConsumerRecordRecoverer.

Interactive Query Support

Starting with version 3.2, Spring for Apache Kafka provides basic facilities required for interactive queries in Kafka Streams. Interactive queries are useful in stateful Kafka Streams applications since they provide a way to constantly query the stateful stores in the application. Thus, if an application wants to materialize the current view of the system under consideration, interactive queries provide a way to do that. To learn more about interacive queries, see this article. The support in Spring for Apache Kafka is centered around an API called KafkaStreamsInteractiveQueryService which is a facade around interactive queries APIs in Kafka Streams library. An application can create an instance of this service as a bean and then later on use it to retrieve the state store by its name.

The following code snippet shows an example.

@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    return kafkaStreamsInteractiveQueryService;
}

Assuming that a Kafka Streams application has a state store called app-store, then that store can be retrieved via the KafkStreamsInteractiveQuery API as show below.

@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

ReadOnlyKeyValueStore<Object, Object>  appStore = interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());

Once an application gains access to the state store, then it can query from it for key-value information.

In this case, the state store that the application uses is a read-only key value store. There are other types of state stores that a Kafka Streams application can use. For instance, if an application prefers to query a window based store, it can build that store in the Kafka Streams application business logic and then later on retrieve it. Because of this reason, the API to retrieve the queryable store in KafkaStreamsInteractiveQueryService has a generic store type signature, so that the end-user can assign the proper type.

Here is the type signature from the API.

public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType)

When calling this method, the user can specifially ask for the proper state store type, as we have done in the above example.

Retrying State Store Retrieval

When trying to retrieve the state store using the KafkaStreamsInteractiveQueryService, there is a chance that the state store might not be found for various reasons. If those reasons are transitory, KafkaStreamsInteractiveQueryService provides an option to retry the retrieval of the state store by allowing to inject a custom RetryTemplate. By default, the RetryTemmplate that is used in KafkaStreamsInteractiveQueryService uses a maximum attempts of three with a fixed backoff of one second.

Here is how you can inject a custom RetryTemmplate into KafkaStreamsInteractiveQueryService with the maximum attempts of ten.

@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
    RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
    retryTemplate.setRetryPolicy(retryPolicy);
    kafkaStreamsInteractiveQueryService.setRetryTemplate(retryTemplate);
    return kafkaStreamsInteractiveQueryService;
}

Querying Remote State Stores

The API shown above for retrieving the state store - retrieveQueryableStore is intended for locally available key-value state stores. In productions settings, Kafka Streams applications are most likely distributed based on the number of partitions. If a topic has four partitions and there are four instances of the same Kafka Streams processor running, then each instance maybe responsible for processing a single partition from the topic. In this scenario, calling retrieveQueryableStore may not give the correct result that an instance is looking for, although it might return a valid store. Let’s assume that the topic with four partitions has data about various keys and a single partition is always responsible for a specific key. If the instance that is calling retrieveQueryableStore is looking for information about a key that this instance does not host, then it will not receive any data. This is because the current Kafka Streams instance does not know anything about this key. To fix this, the calling instance first needs to make sure that they have the host information for the Kafka Streams processor instance where the particular key is hosted. This can be retrieved from any Kafka Streams instance under the same application.id as below.

@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer());

In the example code above, the calling instance is querying for a particular key 12345 from the state-store named app-store. The API also needs a corresponding key serializer, which in this case is the IntegerSerializer. Kafka Streams looks through all it’s instances under the same application.id and tries to find which instance hosts this particular key, Once found, it returns that host information as a HostInfo object.

This is how the API looks like:

public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer)

When using multiple instances of the Kafka Streams processors of the same application.id in a distributed way like this, the application is supposed to provide an RPC layer where the state stores can be queried over an RPC endpoint such as a REST one. See this article for more details on this. When using Spring for Apache Kafka, it is very easy to add a Spring based REST endpoint by using the spring-web technologies. Once there is a REST endpoint, then that can be used to query the state stores from any Kafka Streams instance, given the HostInfo where the key is hosted is known to the instance.

If the key hosting the instance is the current instance, then the application does not need to call the RPC mechanism, but rather make an in-JVM call. However, the trouble is that an application may not know that the instance that is making the call is where the key is hosted because a particular server may lose a partition due to a consumer rebalance. To fix this issue, KafkaStreamsInteractiveQueryService provides a convenient API for querying the current host information via an API method getCurrentKafkaStreamsApplicationHostInfo() that returns the current HostInfo. The idea is that the application can first acquire information about where the key is held, and then compare the HostInfo with the one about the current instance. If the HostInfo data matches, then it can proceed with a simple JVM call via the retrieveQueryableStore, otherwise go with the RPC option.

Kafka Streams Example

The following example combines the various topics we have covered in this chapter:

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public StreamsBuilderFactoryBeanConfigurer configurer() {
        return fb -> fb.setStateListener((newState, oldState) -> {
            System.out.println("State transition from " + oldState + " to " + newState);
        });
    }

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
        KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
        stream
                .mapValues((ValueMapper<String, String>) String::toUpperCase)
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMillis(1_000)))
                .reduce((String value1, String value2) -> value1 + value2,
                		Named.as("windowStore"))
                .toStream()
                .map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
                .filter((i, s) -> s.length() > 40)
                .to("streamingTopic2");

        stream.print(Printed.toSysOut());

        return stream;
    }

}