Apache Kafka Streams Support

Starting with version 1.1.4, Spring for Apache Kafka provides first-class support for Kafka Streams. To use it from a Spring application, the kafka-streams jar must be present on classpath. It is an optional dependency of the Spring for Apache Kafka project and is not downloaded transitively.

Basics

The reference Apache Kafka Streams documentation suggests the following way of using the API:

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.

StreamsBuilder builder = ...;  // when using the Kafka Streams DSL

// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;

KafkaStreams streams = new KafkaStreams(builder, config);

// Start the Kafka Streams instance
streams.start();

// Stop the Kafka Streams instance
streams.close();

So, we have two main components:

  • StreamsBuilder: With an API to build KStream (or KTable) instances.

  • KafkaStreams: To manage the lifecycle of those instances.

All KStream instances exposed to a KafkaStreams instance by a single StreamsBuilder are started and stopped at the same time, even if they have different logic. In other words, all streams defined by a StreamsBuilder are tied with a single lifecycle control. Once a KafkaStreams instance has been closed by streams.close(), it cannot be restarted. Instead, a new KafkaStreams instance to restart stream processing must be created.

Spring Management

To simplify using Kafka Streams from the Spring application context perspective and use the lifecycle management through a container, Spring for Apache Kafka introduces StreamsBuilderFactoryBean. This is an AbstractFactoryBean implementation to expose a StreamsBuilder singleton instance as a bean. The following example creates such a bean:

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
Starting with version 2.2, the stream configuration is now provided as a KafkaStreamsConfiguration object rather than a StreamsConfig.

The StreamsBuilderFactoryBean also implements SmartLifecycle to manage the lifecycle of an internal KafkaStreams instance. Similar to the Kafka Streams API, you must define the KStream instances before you start the KafkaStreams. That also applies for the Spring API for Kafka Streams. Therefore, when you use default autoStartup = true on the StreamsBuilderFactoryBean, you must declare KStream instances on the StreamsBuilder before the application context is refreshed. For example, KStream can be a regular bean definition, while the Kafka Streams API is used without any impacts. The following example shows how to do so:

@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
    KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    // Fluent KStream API
    return stream;
}

If you would like to control the lifecycle manually (for example, stopping and starting by some condition), you can reference the StreamsBuilderFactoryBean bean directly by using the factory bean (&) prefix. Since StreamsBuilderFactoryBean uses its internal KafkaStreams instance, it is safe to stop and restart it again. A new KafkaStreams is created on each start(). You might also consider using different StreamsBuilderFactoryBean instances, if you would like to control the lifecycles for KStream instances separately.

You also can specify KafkaStreams.StateListener, Thread.UncaughtExceptionHandler, and StateRestoreListener options on the StreamsBuilderFactoryBean, which are delegated to the internal KafkaStreams instance. Also, apart from setting those options indirectly on StreamsBuilderFactoryBean, starting with version 2.1.5, you can use a KafkaStreamsCustomizer callback interface to configure an inner KafkaStreams instance. Note that KafkaStreamsCustomizer overrides the options provided by StreamsBuilderFactoryBean. If you need to perform some KafkaStreams operations directly, you can access that internal KafkaStreams instance by using StreamsBuilderFactoryBean.getKafkaStreams(). You can autowire StreamsBuilderFactoryBean bean by type, but you should be sure to use the full type in the bean definition, as the following example shows:

@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

Alternatively, you can add @Qualifier for injection by name if you use interface bean definition. The following example shows how to do so:

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

Starting with version 2.4.1, the factory bean has a new property infrastructureCustomizer with type KafkaStreamsInfrastructureCustomizer; this allows customization of the StreamsBuilder (e.g. to add a state store) and/or the Topology before the stream is created.

public interface KafkaStreamsInfrastructureCustomizer {

    void configureBuilder(StreamsBuilder builder);

    void configureTopology(Topology topology);

}

Default no-op implementations are provided to avoid having to implement both methods if one is not required.

A CompositeKafkaStreamsInfrastructureCustomizer is provided, for when you need to apply multiple customizers.

KafkaStreams Micrometer Support

Introduced in version 2.5.3, you can configure a KafkaStreamsMicrometerListener to automatically register micrometer meters for the KafkaStreams object managed by the factory bean:

streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
        Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));

Streams JSON Serialization and Deserialization

For serializing and deserializing data when reading or writing to topics or state stores in JSON format, Spring for Apache Kafka provides a JsonSerde implementation that uses JSON, delegating to the JsonSerializer and JsonDeserializer described in Serialization, Deserialization, and Message Conversion. The JsonSerde implementation provides the same configuration options through its constructor (target type or ObjectMapper). In the following example, we use the JsonSerde to serialize and deserialize the Cat payload of a Kafka stream (the JsonSerde can be used in a similar fashion wherever an instance is required):

stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");

When constructing the serializer/deserializer programmatically for use in the producer/consumer factory, since version 2.3, you can use the fluent API, which simplifies configuration.

stream.through(
    new JsonSerde<>(MyKeyType.class)
        .forKeys()
        .noTypeInfo(),
    new JsonSerde<>(MyValueType.class)
        .noTypeInfo(),
    "myTypes");

Using KafkaStreamBrancher

The KafkaStreamBrancher class introduces a more convenient way to build conditional branches on top of KStream.

Consider the following example that does not use KafkaStreamBrancher:

KStream<String, String>[] branches = builder.stream("source").branch(
        (key, value) -> value.contains("A"),
        (key, value) -> value.contains("B"),
        (key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");

The following example uses KafkaStreamBrancher:

new KafkaStreamBrancher<String, String>()
        .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
        .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
        //default branch should not necessarily be defined in the end of the chain!
        .defaultBranch(ks -> ks.to("C"))
        .onTopOf(builder.stream("source"));
        //onTopOf method returns the provided stream so we can continue with method chaining

Configuration

To configure the Kafka Streams environment, the StreamsBuilderFactoryBean requires a KafkaStreamsConfiguration instance. See the Apache Kafka documentation for all possible options.

Starting with version 2.2, the stream configuration is now provided as a KafkaStreamsConfiguration object, rather than as a StreamsConfig.

To avoid boilerplate code for most cases, especially when you develop microservices, Spring for Apache Kafka provides the @EnableKafkaStreams annotation, which you should place on a @Configuration class. All you need is to declare a KafkaStreamsConfiguration bean named defaultKafkaStreamsConfig. A StreamsBuilderFactoryBean bean, named defaultKafkaStreamsBuilder, is automatically declared in the application context. You can declare and use any additional StreamsBuilderFactoryBean beans as well. You can perform additional customization of that bean, by providing a bean that implements StreamsBuilderFactoryBeanConfigurer. If there are multiple such beans, they will be applied according to their Ordered.order property.

By default, when the factory bean is stopped, the KafkaStreams.cleanUp() method is called. Starting with version 2.1.2, the factory bean has additional constructors, taking a CleanupConfig object that has properties to let you control whether the cleanUp() method is called during start() or stop() or neither. Starting with version 2.7, the default is to never clean up local state.

Header Enricher

Version 3.0 added the HeaderEnricherProcessor extension of ContextualProcessor; providing the same functionality as the deprecated HeaderEnricher which implemented the deprecated Transformer interface. This can be used to add headers within the stream processing; the header values are SpEL expressions; the root object of the expression evaluation has 3 properties:

  • record - the org.apache.kafka.streams.processor.api.Record (key, value, timestamp, headers)

  • key - the key of the current record

  • value - the value of the current record

  • context - the ProcessorContext, allowing access to the current record metadata

The expressions must return a byte[] or a String (which will be converted to byte[] using UTF-8).

To use the enricher within a stream:

.process(() -> new HeaderEnricherProcessor(expressions))

The processor does not change the key or value; it simply adds headers.

You need a new instance for each record.
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))

Here is a simple example, adding one literal header and one variable:

Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
        .process(() -> supplier)
        .to(OUTPUT);

MessagingProcessor

Version 3.0 added the MessagingProcessor extension of ContextualProcessor, providing the same functionality as the deprecated MessagingTransformer which implemented the deprecated Transformer interface. This allows a Kafka Streams topology to interact with a Spring Messaging component, such as a Spring Integration flow. The transformer requires an implementation of MessagingFunction.

@FunctionalInterface
public interface MessagingFunction {

    Message<?> exchange(Message<?> message);

}

Spring Integration automatically provides an implementation using its GatewayProxyFactoryBean. It also requires a MessagingMessageConverter to convert the key, value and metadata (including headers) to/from a Spring Messaging Message<?>. See [Calling a Spring Integration Flow from a KStream] for more information.

Recovery from Deserialization Exceptions

Version 2.3 introduced the RecoveringDeserializationExceptionHandler which can take some action when a deserialization exception occurs. Refer to the Kafka documentation about DeserializationExceptionHandler, of which the RecoveringDeserializationExceptionHandler is an implementation. The RecoveringDeserializationExceptionHandler is configured with a ConsumerRecordRecoverer implementation. The framework provides the DeadLetterPublishingRecoverer which sends the failed record to a dead-letter topic. See Publishing Dead-letter Records for more information about this recoverer.

To configure the recoverer, add the following properties to your streams configuration:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            RecoveringDeserializationExceptionHandler.class);
    props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
    ...
    return new KafkaStreamsConfiguration(props);
}

@Bean
public DeadLetterPublishingRecoverer recoverer() {
    return new DeadLetterPublishingRecoverer(kafkaTemplate(),
            (record, ex) -> new TopicPartition("recovererDLQ", -1));
}

Of course, the recoverer() bean can be your own implementation of ConsumerRecordRecoverer.

Kafka Streams Example

The following example combines all the topics we have covered in this chapter:

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public StreamsBuilderFactoryBeanConfigurer configurer() {
        return fb -> fb.setStateListener((newState, oldState) -> {
            System.out.println("State transition from " + oldState + " to " + newState);
        });
    }

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
        KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
        stream
                .mapValues((ValueMapper<String, String>) String::toUpperCase)
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMillis(1_000)))
                .reduce((String value1, String value2) -> value1 + value2,
                		Named.as("windowStore"))
                .toStream()
                .map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
                .filter((i, s) -> s.length() > 40)
                .to("streamingTopic2");

        stream.print(Printed.toSysOut());

        return stream;
    }

}