Apache Kafka Streams Support
Starting with version 1.1.4, Spring for Apache Kafka provides first-class support for Kafka Streams.
To use it from a Spring application, the kafka-streams jar must be present on the classpath.
It is an optional dependency of the Spring for Apache Kafka project and is not downloaded transitively.
Basics
The reference Apache Kafka Streams documentation suggests the following way of using the API:
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.
StreamsBuilder builder = ...; // when using the Kafka Streams DSL
// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;
KafkaStreams streams = new KafkaStreams(builder.build(), config);
// Start the Kafka Streams instance
streams.start();
// Stop the Kafka Streams instance
streams.close();
So, we have two main components:
- StreamsBuilder: With an API to build KStream (or KTable) instances.
- KafkaStreams: To manage the lifecycle of those instances.
All KStream instances exposed to a KafkaStreams instance by a single StreamsBuilder are started and stopped at the same time, even if they have different logic.
In other words, all streams defined by a StreamsBuilder are tied with a single lifecycle control.
Once a KafkaStreams instance has been closed by streams.close() , it cannot be restarted.
Instead, a new KafkaStreams instance to restart stream processing must be created.
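The restart constraint can be sketched with the plain Kafka Streams API as follows. This is only an illustration, not part of Spring for Apache Kafka: the RestartableStreams class and its method names are hypothetical, and the sketch assumes that the same Topology and Properties can be reused for each new KafkaStreams instance.

```java
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;

// Hypothetical helper: creates a fresh KafkaStreams for every start
public class RestartableStreams {

    private final Topology topology;
    private final Properties config;
    private KafkaStreams streams;

    public RestartableStreams(Topology topology, Properties config) {
        this.topology = topology;
        this.config = config;
    }

    public synchronized void start() {
        if (this.streams == null) {
            // A closed KafkaStreams cannot be started again, so build a new instance
            this.streams = new KafkaStreams(this.topology, this.config);
            this.streams.start();
        }
    }

    public synchronized void stop() {
        if (this.streams != null) {
            this.streams.close();
            // Drop the closed instance; the next start() creates a new one
            this.streams = null;
        }
    }
}
```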
Spring Management
To simplify using Kafka Streams from the Spring application context perspective and use the lifecycle management through a container, Spring for Apache Kafka introduces StreamsBuilderFactoryBean.
This is an AbstractFactoryBean implementation to expose a StreamsBuilder singleton instance as a bean.
The following example creates such a bean:
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
Starting with version 2.2, the stream configuration is now provided as a KafkaStreamsConfiguration object rather than a StreamsConfig .
The StreamsBuilderFactoryBean also implements SmartLifecycle to manage the lifecycle of an internal KafkaStreams instance.
Similar to the Kafka Streams API, you must define the KStream instances before you start the KafkaStreams.
That also applies to the Spring API for Kafka Streams.
Therefore, when you use the default autoStartup = true on the StreamsBuilderFactoryBean, you must declare KStream instances on the StreamsBuilder before the application context is refreshed.
For example, KStream can be a regular bean definition, while the Kafka Streams API is used without any impacts.
The following example shows how to do so:
@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
    KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    // Fluent KStream API
    return stream;
}
If you would like to control the lifecycle manually (for example, stopping and starting by some condition), you can reference the StreamsBuilderFactoryBean bean directly by using the factory bean (&) prefix.
Since StreamsBuilderFactoryBean uses its internal KafkaStreams instance, it is safe to stop and restart it again.
A new KafkaStreams is created on each start().
You might also consider using different StreamsBuilderFactoryBean instances, if you would like to control the lifecycles for KStream instances separately.
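As a minimal sketch of manual lifecycle control, the component below stops and starts the factory bean on demand. The StreamsLifecycleController class and its method names are hypothetical; the sketch assumes that the factory bean is declared with its full StreamsBuilderFactoryBean type, so that it can be injected by type.

```java
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.stereotype.Component;

// Hypothetical component that toggles stream processing at runtime
@Component
public class StreamsLifecycleController {

    private final StreamsBuilderFactoryBean factoryBean;

    public StreamsLifecycleController(StreamsBuilderFactoryBean factoryBean) {
        this.factoryBean = factoryBean;
    }

    public void stopStreams() {
        if (this.factoryBean.isRunning()) {
            // Closes the internal KafkaStreams instance
            this.factoryBean.stop();
        }
    }

    public void startStreams() {
        if (!this.factoryBean.isRunning()) {
            // Creates and starts a new internal KafkaStreams instance
            this.factoryBean.start();
        }
    }
}
```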
You can also specify KafkaStreams.StateListener, Thread.UncaughtExceptionHandler, and StateRestoreListener options on the StreamsBuilderFactoryBean, which are delegated to the internal KafkaStreams instance.
Also, apart from setting those options indirectly on StreamsBuilderFactoryBean, starting with version 2.1.5, you can use a KafkaStreamsCustomizer callback interface to configure an inner KafkaStreams instance.
Note that KafkaStreamsCustomizer overrides the options provided by StreamsBuilderFactoryBean.
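A customizer can be registered on the factory bean as in the following sketch. The configuration class name and the logging are illustrative only; the sketch assumes the setKafkaStreamsCustomizer setter on StreamsBuilderFactoryBean and uses the customizer as a lambda.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

@Configuration
public class CustomizedStreamsConfig {

    @Bean
    public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
        StreamsBuilderFactoryBean factoryBean = new StreamsBuilderFactoryBean(streamsConfig);
        // The callback receives the internal KafkaStreams instance before it is started
        factoryBean.setKafkaStreamsCustomizer(kafkaStreams ->
                kafkaStreams.setStateListener((newState, oldState) ->
                        System.out.println("State transition from " + oldState + " to " + newState)));
        return factoryBean;
    }
}
```

Because the customizer is applied to the inner instance directly, a state listener set this way takes the place of one set through the factory bean's own setter.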
If you need to perform some KafkaStreams operations directly, you can access that internal KafkaStreams instance by using StreamsBuilderFactoryBean.getKafkaStreams().
You can autowire the StreamsBuilderFactoryBean bean by type, but you should be sure to use the full type in the bean definition, as the following example shows:
@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
Alternatively, you can add @Qualifier for injection by name if you use an interface bean definition.
The following example shows how to do so:
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
Starting with version 2.4.1, the factory bean has a new property infrastructureCustomizer with type KafkaStreamsInfrastructureCustomizer; this allows customization of the StreamsBuilder (e.g. to add a state store) and/or the Topology before the stream is created.
public interface KafkaStreamsInfrastructureCustomizer {
    void configureBuilder(StreamsBuilder builder);
    void configureTopology(Topology topology);
}
Default no-op implementations are provided to avoid having to implement both methods if one is not required.
A CompositeKafkaStreamsInfrastructureCustomizer is provided, for when you need to apply multiple customizers.
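The state store case mentioned above might look like the following sketch. The store name my-store, the serdes, and the configuration class name are assumptions chosen for illustration; only configureBuilder is overridden, relying on the default no-op configureTopology.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.state.Stores;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.KafkaStreamsInfrastructureCustomizer;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

@Configuration
public class StateStoreConfig {

    @Bean
    public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
        StreamsBuilderFactoryBean factoryBean = new StreamsBuilderFactoryBean(streamsConfig);
        factoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {

            @Override
            public void configureBuilder(StreamsBuilder builder) {
                // Register a persistent key-value store (hypothetical name) on the builder
                builder.addStateStore(Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore("my-store"),
                        Serdes.String(),
                        Serdes.Long()));
            }
        });
        return factoryBean;
    }
}
```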
KafkaStreams Micrometer Support
Introduced in version 2.5.3, you can configure a KafkaStreamsMicrometerListener to automatically register micrometer meters for the KafkaStreams object managed by the factory bean:
streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
        Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
Streams JSON Serialization and Deserialization
For serializing and deserializing data when reading or writing to topics or state stores in JSON format, Spring for Apache Kafka provides a JsonSerde implementation that uses JSON, delegating to the JsonSerializer and JsonDeserializer described in Serialization, Deserialization, and Message Conversion.
The JsonSerde implementation provides the same configuration options through its constructor (target type or ObjectMapper).
In the following example, we use the JsonSerde to serialize and deserialize the Cat payload of a Kafka stream (the JsonSerde can be used in a similar fashion wherever an instance is required):
stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");
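As one more place where a serde instance is required, the sketch below passes a JsonSerde to Consumed and Produced when reading from and writing to topics. The Cat class shape, the topic names, and the configuration class name are hypothetical and only serve the illustration.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonSerde;

@Configuration
public class CatStreamConfig {

    // Hypothetical payload type; public fields keep the sketch short
    public static class Cat {
        public String name;
        public int age;
    }

    @Bean
    public KStream<Integer, Cat> catStream(StreamsBuilder builder) {
        JsonSerde<Cat> catSerde = new JsonSerde<>(Cat.class);
        // Read JSON values from the source topic
        KStream<Integer, Cat> cats =
                builder.stream("cats", Consumed.with(Serdes.Integer(), catSerde));
        // Write the filtered records back as JSON
        cats.filter((id, cat) -> cat != null && cat.age > 1)
                .to("adult-cats", Produced.with(Serdes.Integer(), catSerde));
        return cats;
    }
}
```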
When constructing the serializer/deserializer programmatically for use in the producer/consumer factory, since version 2.3, you can use the fluent API, which simplifies configuration.
stream.through(
        new JsonSerde<>(MyKeyType.class)
                .forKeys()
                .noTypeInfo(),
        new JsonSerde<>(MyValueType.class)
                .noTypeInfo(),
        "myTypes");
Using KafkaStreamBrancher
The KafkaStreamBrancher class introduces a more convenient way to build conditional branches on top of KStream.
Consider the following example that does not use KafkaStreamBrancher:
KStream<String, String>[] branches = builder.stream("source").branch(
        (key, value) -> value.contains("A"),
        (key, value) -> value.contains("B"),
        (key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");
The following example uses KafkaStreamBrancher:
new KafkaStreamBrancher<String, String>()
        .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
        .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
        // the default branch does not have to be defined at the end of the chain
        .defaultBranch(ks -> ks.to("C"))
        .onTopOf(builder.stream("source"));
        // the onTopOf method returns the provided stream, so we can continue with method chaining
Configuration
To configure the Kafka Streams environment, the StreamsBuilderFactoryBean requires a KafkaStreamsConfiguration instance.
See the Apache Kafka documentation for all possible options.
Starting with version 2.2, the stream configuration is now provided as a KafkaStreamsConfiguration object, rather than as a StreamsConfig .
To avoid boilerplate code for most cases, especially when you develop microservices, Spring for Apache Kafka provides the @EnableKafkaStreams annotation, which you should place on a @Configuration class.
All you need is to declare a KafkaStreamsConfiguration bean named defaultKafkaStreamsConfig.
A StreamsBuilderFactoryBean bean, named defaultKafkaStreamsBuilder, is automatically declared in the application context.
You can declare and use any additional StreamsBuilderFactoryBean beans as well.
You can perform additional customization of that bean by providing a bean that implements StreamsBuilderFactoryBeanConfigurer.
If there are multiple such beans, they will be applied according to their Ordered.order property.
Cleanup & Stop configuration
When the factory is stopped, KafkaStreams.close() is called with two parameters:
- closeTimeout: how long to wait for the threads to shut down (defaults to DEFAULT_CLOSE_TIMEOUT, which is set to 10 seconds). Can be configured using StreamsBuilderFactoryBean.setCloseTimeout().
- leaveGroupOnClose: whether to trigger a consumer leave call from the group (defaults to false). Can be configured using StreamsBuilderFactoryBean.setLeaveGroupOnClose().
By default, when the factory bean is stopped, the KafkaStreams.cleanUp() method is called.
Starting with version 2.1.2, the factory bean has additional constructors, taking a CleanupConfig object that has properties to let you control whether the cleanUp() method is called during start() or stop() or neither.
Starting with version 2.7, the default is to never clean up local state.
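The stop and cleanup settings described in this section could be combined as in the sketch below. It assumes the two-argument CleanupConfig constructor (cleanup on start, cleanup on stop) and assumes that setCloseTimeout takes the timeout as a number of seconds; the chosen values and the configuration class name are illustrative.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.CleanupConfig;

@Configuration
public class StreamsShutdownConfig {

    @Bean
    public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
        // Clean up local state on start(), but not on stop()
        CleanupConfig cleanupConfig = new CleanupConfig(true, false);
        StreamsBuilderFactoryBean factoryBean = new StreamsBuilderFactoryBean(streamsConfig, cleanupConfig);
        // Assumption: the value is interpreted as seconds
        factoryBean.setCloseTimeout(30);
        // Ask the consumers to leave the group when the streams are closed
        factoryBean.setLeaveGroupOnClose(true);
        return factoryBean;
    }
}
```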
Header Enricher
Version 3.0 added the HeaderEnricherProcessor extension of ContextualProcessor, providing the same functionality as the deprecated HeaderEnricher, which implemented the deprecated Transformer interface.
This can be used to add headers within the stream processing. The header values are SpEL expressions; the root object of the expression evaluation has four properties:
- record - the org.apache.kafka.streams.processor.api.Record (key, value, timestamp, headers)
- key - the key of the current record
- value - the value of the current record
- context - the ProcessorContext, allowing access to the current record metadata
The expressions must return a byte[] or a String (which will be converted to byte[] using UTF-8).
To use the enricher within a stream:
.process(() -> new HeaderEnricherProcessor(expressions))
The processor does not change the key or value; it simply adds headers.
You need a new instance for each record.
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))
Here is a simple example, adding one literal header and one variable:
Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
// The offset is not a property of the Record, so it is read from the record metadata exposed by the context
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + context.recordMetadata().get().offset()"));
// The supplier returns a new processor each time it is called
ProcessorSupplier<String, String, String, String> supplier = () -> new HeaderEnricherProcessor<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
        .process(supplier)
        .to(OUTPUT);
MessagingProcessor
Version 3.0 added the MessagingProcessor extension of ContextualProcessor, providing the same functionality as the deprecated MessagingTransformer, which implemented the deprecated Transformer interface.
This allows a Kafka Streams topology to interact with a Spring Messaging component, such as a Spring Integration flow.
The processor requires an implementation of MessagingFunction.
@FunctionalInterface
public interface MessagingFunction {
    Message<?> exchange(Message<?> message);
}
Spring Integration automatically provides an implementation using its GatewayProxyFactoryBean.
It also requires a MessagingMessageConverter to convert the key, value and metadata (including headers) to/from a Spring Messaging Message<?>.
See Calling a Spring Integration Flow from a KStream for more information.
Recovery from Deserialization Exceptions
Version 2.3 introduced the RecoveringDeserializationExceptionHandler, which can take some action when a deserialization exception occurs.
Refer to the Kafka documentation about DeserializationExceptionHandler, of which the RecoveringDeserializationExceptionHandler is an implementation.
The RecoveringDeserializationExceptionHandler is configured with a ConsumerRecordRecoverer implementation.
The framework provides the DeadLetterPublishingRecoverer, which sends the failed record to a dead-letter topic.
See Publishing Dead-letter Records for more information about this recoverer.
To configure the recoverer, add the following properties to your streams configuration:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            RecoveringDeserializationExceptionHandler.class);
    props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
    ...
    return new KafkaStreamsConfiguration(props);
}
@Bean
public DeadLetterPublishingRecoverer recoverer() {
    return new DeadLetterPublishingRecoverer(kafkaTemplate(),
            (record, ex) -> new TopicPartition("recovererDLQ", -1));
}
Of course, the recoverer() bean can be your own implementation of ConsumerRecordRecoverer.
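A custom recoverer can be as small as a lambda, since ConsumerRecordRecoverer is a functional interface that receives the failed record and the exception. The following sketch only logs the failure and skips the record; the configuration class name and the log format are illustrative.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;

@Configuration
public class LoggingRecovererConfig {

    @Bean
    public ConsumerRecordRecoverer recoverer() {
        // Log where the undeserializable record came from and why it failed
        return (record, exception) -> System.err.printf(
                "Skipping record from %s-%d at offset %d: %s%n",
                record.topic(), record.partition(), record.offset(), exception.getMessage());
    }
}
```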
Interactive Query Support
Starting with version 3.2, Spring for Apache Kafka provides basic facilities required for interactive queries in Kafka Streams.
Interactive queries are useful in stateful Kafka Streams applications since they provide a way to constantly query the stateful stores in the application.
Thus, if an application wants to materialize the current view of the system under consideration, interactive queries provide a way to do that.
To learn more about interactive queries, see this article.
The support in Spring for Apache Kafka is centered around an API called KafkaStreamsInteractiveQueryService, which is a facade around the interactive queries APIs in the Kafka Streams library.
An application can create an instance of this service as a bean and then later on use it to retrieve the state store by its name.
The following code snippet shows an example.
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    return kafkaStreamsInteractiveQueryService;
}
Assuming that a Kafka Streams application has a state store called app-store, then that store can be retrieved via the KafkaStreamsInteractiveQueryService API as shown below.
@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;
ReadOnlyKeyValueStore<Object, Object> appStore = interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());
Once an application gains access to the state store, then it can query from it for key-value information.
In this case, the state store that the application uses is a read-only key value store.
There are other types of state stores that a Kafka Streams application can use.
For instance, if an application prefers to query a window based store, it can build that store in the Kafka Streams application business logic and then later on retrieve it.
For this reason, the API to retrieve the queryable store in KafkaStreamsInteractiveQueryService has a generic store type signature, so that the end-user can assign the proper type.
Here is the type signature from the API.
public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType)
When calling this method, the user can specifically ask for the proper state store type, as we have done in the above example.
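For the window based store mentioned earlier, the same method can be called with a window store type. In this sketch, the store name window-store, the String and Long types, and the WindowCountQuery class are assumptions; the store itself would have to be built elsewhere in the topology.

```java
import java.time.Instant;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.springframework.kafka.streams.KafkaStreamsInteractiveQueryService;
import org.springframework.stereotype.Service;

// Hypothetical service that sums windowed counts for a key
@Service
public class WindowCountQuery {

    private final KafkaStreamsInteractiveQueryService interactiveQueryService;

    public WindowCountQuery(KafkaStreamsInteractiveQueryService interactiveQueryService) {
        this.interactiveQueryService = interactiveQueryService;
    }

    public long totalFor(String key, Instant from, Instant to) {
        // The store type argument determines the returned type
        ReadOnlyWindowStore<String, Long> store = this.interactiveQueryService
                .retrieveQueryableStore("window-store", QueryableStoreTypes.<String, Long>windowStore());
        long total = 0;
        // The iterator holds resources and must be closed
        try (WindowStoreIterator<Long> windows = store.fetch(key, from, to)) {
            while (windows.hasNext()) {
                // The key is the window start timestamp, the value is the stored count
                KeyValue<Long, Long> window = windows.next();
                total += window.value;
            }
        }
        return total;
    }
}
```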
Retrying State Store Retrieval
When trying to retrieve the state store using the KafkaStreamsInteractiveQueryService, there is a chance that the state store might not be found for various reasons.
If those reasons are transitory, KafkaStreamsInteractiveQueryService provides an option to retry the retrieval of the state store by letting you inject a custom RetryTemplate.
By default, the RetryTemplate that is used in KafkaStreamsInteractiveQueryService uses a maximum of three attempts with a fixed backoff of one second.
Here is how you can inject a custom RetryTemplate into KafkaStreamsInteractiveQueryService with a maximum of ten attempts.
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
    RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
    retryTemplate.setRetryPolicy(retryPolicy);
    kafkaStreamsInteractiveQueryService.setRetryTemplate(retryTemplate);
    return kafkaStreamsInteractiveQueryService;
}
Querying Remote State Stores
The API shown above for retrieving the state store, retrieveQueryableStore, is intended for locally available key-value state stores.
In production settings, Kafka Streams applications are most likely distributed based on the number of partitions.
If a topic has four partitions and there are four instances of the same Kafka Streams processor running, then each instance may be responsible for processing a single partition from the topic.
In this scenario, calling retrieveQueryableStore may not give the correct result that an instance is looking for, although it might return a valid store.
Let’s assume that the topic with four partitions has data about various keys and a single partition is always responsible for a specific key.
If the instance that is calling retrieveQueryableStore is looking for information about a key that this instance does not host, then it will not receive any data.
This is because the current Kafka Streams instance does not know anything about this key.
To fix this, the calling instance first needs to make sure that it has the host information for the Kafka Streams processor instance where the particular key is hosted.
This can be retrieved from any Kafka Streams instance under the same application.id as below.
@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;
HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer());
In the example code above, the calling instance is querying for a particular key, 12345, from the state store named app-store.
The API also needs a corresponding key serializer, which in this case is the IntegerSerializer.
Kafka Streams looks through all its instances under the same application.id and tries to find which instance hosts this particular key.
Once found, it returns that host information as a HostInfo object.
This is what the API looks like:
public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer)
When using multiple instances of the Kafka Streams processors of the same application.id in a distributed way like this, the application is supposed to provide an RPC layer where the state stores can be queried over an RPC endpoint such as a REST one.
See this article for more details on this.
When using Spring for Apache Kafka, it is very easy to add a Spring based REST endpoint by using the spring-web technologies.
Once there is a REST endpoint, it can be used to query the state stores from any Kafka Streams instance, provided that the HostInfo where the key is hosted is known to the instance.
If the instance hosting the key is the current instance, then the application does not need to call the RPC mechanism, but can rather make an in-JVM call.
However, the trouble is that an application may not know that the instance that is making the call is where the key is hosted because a particular server may lose a partition due to a consumer rebalance.
To fix this issue, KafkaStreamsInteractiveQueryService provides a convenient API method, getCurrentKafkaStreamsApplicationHostInfo(), that returns the current HostInfo.
The idea is that the application can first acquire information about where the key is held, and then compare the HostInfo with the one about the current instance.
If the HostInfo data matches, then it can proceed with a simple JVM call via retrieveQueryableStore; otherwise, it goes with the RPC option.
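The local-versus-remote decision described above can be sketched as follows. The AppStoreLookup class, the REST path /state/app-store/{key}, the String value type, and the use of RestTemplate are all assumptions for illustration; the remote instance would need to expose a matching endpoint.

```java
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.kafka.streams.KafkaStreamsInteractiveQueryService;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

// Hypothetical lookup service that routes a query to the instance hosting the key
@Service
public class AppStoreLookup {

    private final KafkaStreamsInteractiveQueryService interactiveQueryService;
    private final RestTemplate restTemplate = new RestTemplate();

    public AppStoreLookup(KafkaStreamsInteractiveQueryService interactiveQueryService) {
        this.interactiveQueryService = interactiveQueryService;
    }

    public String find(Integer key) {
        HostInfo keyHost = this.interactiveQueryService
                .getKafkaStreamsApplicationHostInfo("app-store", key, new IntegerSerializer());
        HostInfo currentHost = this.interactiveQueryService.getCurrentKafkaStreamsApplicationHostInfo();
        if (keyHost.equals(currentHost)) {
            // The key is hosted by this instance: query the local store in the same JVM
            ReadOnlyKeyValueStore<Integer, String> store = this.interactiveQueryService
                    .retrieveQueryableStore("app-store", QueryableStoreTypes.<Integer, String>keyValueStore());
            return store.get(key);
        }
        // The key is hosted elsewhere: call the (hypothetical) REST endpoint of that instance
        String url = "http://" + keyHost.host() + ":" + keyHost.port() + "/state/app-store/" + key;
        return this.restTemplate.getForObject(url, String.class);
    }
}
```

Because a rebalance can move the partition between the host lookup and the store query, a production version would likely retry the lookup when the local query fails.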
Kafka Streams Example
The following example combines the various topics we have covered in this chapter:
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public StreamsBuilderFactoryBeanConfigurer configurer() {
        return fb -> fb.setStateListener((newState, oldState) -> {
            System.out.println("State transition from " + oldState + " to " + newState);
        });
    }

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
        KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
        stream
                .mapValues((ValueMapper<String, String>) String::toUpperCase)
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMillis(1_000)))
                .reduce((String value1, String value2) -> value1 + value2,
                        Named.as("windowStore"))
                .toStream()
                .map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
                .filter((i, s) -> s.length() > 40)
                .to("streamingTopic2");
        stream.print(Printed.toSysOut());
        return stream;
    }

}