Apache Kafka Streams Support
Starting with version 1.1.4, Spring for Apache Kafka provides first-class support for Kafka Streams.
To use it from a Spring application, the kafka-streams
jar must be present on classpath.
It is an optional dependency of the Spring for Apache Kafka project and is not downloaded transitively.
Basics
The reference Apache Kafka Streams documentation suggests the following way of using the API:
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.
StreamsBuilder builder = ...; // when using the Kafka Streams DSL
// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;
KafkaStreams streams = new KafkaStreams(builder, config);
// Start the Kafka Streams instance
streams.start();
// Stop the Kafka Streams instance
streams.close();
So, we have two main components:
-
StreamsBuilder
: With an API to buildKStream
(orKTable
) instances. -
KafkaStreams
: To manage the lifecycle of those instances.
All KStream instances exposed to a KafkaStreams instance by a single StreamsBuilder are started and stopped at the same time, even if they have different logic.
In other words, all streams defined by a StreamsBuilder are tied with a single lifecycle control.
Once a KafkaStreams instance has been closed by streams.close() , it cannot be restarted.
Instead, a new KafkaStreams instance to restart stream processing must be created.
|
Spring Management
To simplify using Kafka Streams from the Spring application context perspective and use the lifecycle management through a container, Spring for Apache Kafka introduces StreamsBuilderFactoryBean
.
This is an AbstractFactoryBean
implementation to expose a StreamsBuilder
singleton instance as a bean.
The following example creates such a bean:
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
Starting with version 2.2, the stream configuration is now provided as a KafkaStreamsConfiguration object rather than a StreamsConfig .
|
The StreamsBuilderFactoryBean
also implements SmartLifecycle
to manage the lifecycle of an internal KafkaStreams
instance.
Similar to the Kafka Streams API, you must define the KStream
instances before you start the KafkaStreams
.
That also applies for the Spring API for Kafka Streams.
Therefore, when you use default autoStartup = true
on the StreamsBuilderFactoryBean
, you must declare KStream
instances on the StreamsBuilder
before the application context is refreshed.
For example, KStream
can be a regular bean definition, while the Kafka Streams API is used without any impacts.
The following example shows how to do so:
@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
// Fluent KStream API
return stream;
}
If you would like to control the lifecycle manually (for example, stopping and starting by some condition), you can reference the StreamsBuilderFactoryBean
bean directly by using the factory bean (&
) prefix.
Since StreamsBuilderFactoryBean
uses its internal KafkaStreams
instance, it is safe to stop and restart it again.
A new KafkaStreams
is created on each start()
.
You might also consider using different StreamsBuilderFactoryBean
instances, if you would like to control the lifecycles for KStream
instances separately.
You also can specify KafkaStreams.StateListener
, Thread.UncaughtExceptionHandler
, and StateRestoreListener
options on the StreamsBuilderFactoryBean
, which are delegated to the internal KafkaStreams
instance.
Also, apart from setting those options indirectly on StreamsBuilderFactoryBean
, starting with version 2.1.5, you can use a KafkaStreamsCustomizer
callback interface to configure an inner KafkaStreams
instance.
Note that KafkaStreamsCustomizer
overrides the options provided by StreamsBuilderFactoryBean
.
If you need to perform some KafkaStreams
operations directly, you can access that internal KafkaStreams
instance by using StreamsBuilderFactoryBean.getKafkaStreams()
.
You can autowire StreamsBuilderFactoryBean
bean by type, but you should be sure to use the full type in the bean definition, as the following example shows:
@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
Alternatively, you can add @Qualifier
for injection by name if you use interface bean definition.
The following example shows how to do so:
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
Starting with version 2.4.1, the factory bean has a new property infrastructureCustomizer
with type KafkaStreamsInfrastructureCustomizer
; this allows customization of the StreamsBuilder
(e.g. to add a state store) and/or the Topology
before the stream is created.
public interface KafkaStreamsInfrastructureCustomizer {
void configureBuilder(StreamsBuilder builder);
void configureTopology(Topology topology);
}
Default no-op implementations are provided to avoid having to implement both methods if one is not required.
A CompositeKafkaStreamsInfrastructureCustomizer
is provided, for when you need to apply multiple customizers.
KafkaStreams Micrometer Support
Introduced in version 2.5.3, you can configure a KafkaStreamsMicrometerListener
to automatically register micrometer meters for the KafkaStreams
object managed by the factory bean:
streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
Streams JSON Serialization and Deserialization
For serializing and deserializing data when reading or writing to topics or state stores in JSON format, Spring for Apache Kafka provides a JsonSerde
implementation that uses JSON, delegating to the JsonSerializer
and JsonDeserializer
described in Serialization, Deserialization, and Message Conversion.
The JsonSerde
implementation provides the same configuration options through its constructor (target type or ObjectMapper
).
In the following example, we use the JsonSerde
to serialize and deserialize the Cat
payload of a Kafka stream (the JsonSerde
can be used in a similar fashion wherever an instance is required):
stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");
When constructing the serializer/deserializer programmatically for use in the producer/consumer factory, since version 2.3, you can use the fluent API, which simplifies configuration.
stream.through(
new JsonSerde<>(MyKeyType.class)
.forKeys()
.noTypeInfo(),
new JsonSerde<>(MyValueType.class)
.noTypeInfo(),
"myTypes");
Using KafkaStreamBrancher
The KafkaStreamBrancher
class introduces a more convenient way to build conditional branches on top of KStream
.
Consider the following example that does not use KafkaStreamBrancher
:
KStream<String, String>[] branches = builder.stream("source").branch(
(key, value) -> value.contains("A"),
(key, value) -> value.contains("B"),
(key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");
The following example uses KafkaStreamBrancher
:
new KafkaStreamBrancher<String, String>()
.branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
.branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
//default branch should not necessarily be defined in the end of the chain!
.defaultBranch(ks -> ks.to("C"))
.onTopOf(builder.stream("source"));
//onTopOf method returns the provided stream so we can continue with method chaining
Configuration
To configure the Kafka Streams environment, the StreamsBuilderFactoryBean
requires a KafkaStreamsConfiguration
instance.
See the Apache Kafka documentation for all possible options.
Starting with version 2.2, the stream configuration is now provided as a KafkaStreamsConfiguration object, rather than as a StreamsConfig .
|
To avoid boilerplate code for most cases, especially when you develop microservices, Spring for Apache Kafka provides the @EnableKafkaStreams
annotation, which you should place on a @Configuration
class.
All you need is to declare a KafkaStreamsConfiguration
bean named defaultKafkaStreamsConfig
.
A StreamsBuilderFactoryBean
bean, named defaultKafkaStreamsBuilder
, is automatically declared in the application context.
You can declare and use any additional StreamsBuilderFactoryBean
beans as well.
You can perform additional customization of that bean, by providing a bean that implements StreamsBuilderFactoryBeanConfigurer
.
If there are multiple such beans, they will be applied according to their Ordered.order
property.
By default, when the factory bean is stopped, the KafkaStreams.cleanUp()
method is called.
Starting with version 2.1.2, the factory bean has additional constructors, taking a CleanupConfig
object that has properties to let you control whether the cleanUp()
method is called during start()
or stop()
or neither.
Starting with version 2.7, the default is to never clean up local state.
Header Enricher
Version 3.0 added the HeaderEnricherProcessor
extension of ContextualProcessor
; providing the same functionality as the deprecated HeaderEnricher
which implemented the deprecated Transformer
interface.
This can be used to add headers within the stream processing; the header values are SpEL expressions; the root object of the expression evaluation has 3 properties:
-
record
- theorg.apache.kafka.streams.processor.api.Record
(key
,value
,timestamp
,headers
) -
key
- the key of the current record -
value
- the value of the current record -
context
- theProcessorContext
, allowing access to the current record metadata
The expressions must return a byte[]
or a String
(which will be converted to byte[]
using UTF-8
).
To use the enricher within a stream:
.process(() -> new HeaderEnricherProcessor(expressions))
The processor does not change the key
or value
; it simply adds headers.
You need a new instance for each record. |
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))
Here is a simple example, adding one literal header and one variable:
Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
.process(() -> supplier)
.to(OUTPUT);
MessagingProcessor
Version 3.0 added the MessagingProcessor
extension of ContextualProcessor
, providing the same functionality as the deprecated MessagingTransformer
which implemented the deprecated Transformer
interface.
This allows a Kafka Streams topology to interact with a Spring Messaging component, such as a Spring Integration flow.
The transformer requires an implementation of MessagingFunction
.
@FunctionalInterface
public interface MessagingFunction {
Message<?> exchange(Message<?> message);
}
Spring Integration automatically provides an implementation using its GatewayProxyFactoryBean
.
It also requires a MessagingMessageConverter
to convert the key, value and metadata (including headers) to/from a Spring Messaging Message<?>
.
See [Calling a Spring Integration Flow from a KStream
] for more information.
Recovery from Deserialization Exceptions
Version 2.3 introduced the RecoveringDeserializationExceptionHandler
which can take some action when a deserialization exception occurs.
Refer to the Kafka documentation about DeserializationExceptionHandler
, of which the RecoveringDeserializationExceptionHandler
is an implementation.
The RecoveringDeserializationExceptionHandler
is configured with a ConsumerRecordRecoverer
implementation.
The framework provides the DeadLetterPublishingRecoverer
which sends the failed record to a dead-letter topic.
See Publishing Dead-letter Records for more information about this recoverer.
To configure the recoverer, add the following properties to your streams configuration:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
...
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
RecoveringDeserializationExceptionHandler.class);
props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
...
return new KafkaStreamsConfiguration(props);
}
@Bean
public DeadLetterPublishingRecoverer recoverer() {
return new DeadLetterPublishingRecoverer(kafkaTemplate(),
(record, ex) -> new TopicPartition("recovererDLQ", -1));
}
Of course, the recoverer()
bean can be your own implementation of ConsumerRecordRecoverer
.
Kafka Streams Example
The following example combines all the topics we have covered in this chapter:
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return new KafkaStreamsConfiguration(props);
}
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> fb.setStateListener((newState, oldState) -> {
System.out.println("State transition from " + oldState + " to " + newState);
});
}
@Bean
public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
stream
.mapValues((ValueMapper<String, String>) String::toUpperCase)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(1_000)))
.reduce((String value1, String value2) -> value1 + value2,
Named.as("windowStore"))
.toStream()
.map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
.filter((i, s) -> s.length() > 40)
.to("streamingTopic2");
stream.print(Printed.toSysOut());
return stream;
}
}