4.0.5-SNAPSHOT
Reference Guide
This guide describes the Apache Kafka implementation of the Spring Cloud Stream Binder. It contains information about its design, usage, and configuration options, as well as information on how the Spring Cloud Stream concepts map onto Apache Kafka specific constructs. In addition, this guide explains the Kafka Streams binding capabilities of Spring Cloud Stream.
1. Apache Kafka Binder
1.1. Usage
To use the Apache Kafka binder, add spring-cloud-stream-binder-kafka as a dependency to your Spring Cloud Stream application, as shown in the following example for Maven:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
Alternatively, you can use the Spring Cloud Stream Kafka Starter, as shown in the following example for Maven:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
1.2. Overview
The following image shows a simplified diagram of how the Apache Kafka binder operates:
The Apache Kafka Binder implementation maps each destination to an Apache Kafka topic. The consumer group maps directly to the same Apache Kafka concept. Partitioning also maps directly to Apache Kafka partitions.
The binder currently uses the Apache Kafka kafka-clients version 3.1.0.
This client can communicate with older brokers (see the Kafka documentation), but certain features may not be available.
For example, with versions earlier than 0.11.x.x, native headers are not supported.
Also, 0.11.x.x does not support the autoAddPartitions property.
1.3. Configuration Options
This section contains the configuration options used by the Apache Kafka binder.
For common configuration options and properties pertaining to the binder, see the binding properties in core documentation.
1.3.1. Kafka Binder Properties
- spring.cloud.stream.kafka.binder.brokers
-
A list of brokers to which the Kafka binder connects.
Default: localhost.
- spring.cloud.stream.kafka.binder.defaultBrokerPort
-
brokers allows hosts specified with or without port information (for example, host1,host2:port2). This sets the default port when no port is configured in the broker list.
Default: 9092.
- spring.cloud.stream.kafka.binder.configuration
-
Key/Value map of client properties (for both producers and consumers) passed to all clients created by the binder. Because these properties are used by both producers and consumers, usage should be restricted to common properties, for example, security settings. Unknown Kafka producer or consumer properties provided through this configuration are filtered out and not allowed to propagate. Properties here supersede any properties set in boot.
Default: Empty map.
- spring.cloud.stream.kafka.binder.consumerProperties
-
Key/Value map of arbitrary Kafka client consumer properties. In addition to supporting known Kafka consumer properties, unknown consumer properties are allowed here as well. Properties here supersede any properties set in boot and in the configuration property above.
Default: Empty map.
- spring.cloud.stream.kafka.binder.headers
-
The list of custom headers that are transported by the binder. Only required when communicating with older applications (<= 1.3.x) with a kafka-clients version < 0.11.0.0. Newer versions support headers natively.
Default: empty.
- spring.cloud.stream.kafka.binder.healthTimeout
-
The time to wait to get partition information, in seconds. Health reports as down if this timer expires.
Default: 10.
- spring.cloud.stream.kafka.binder.requiredAcks
-
The number of required acks on the broker. See the Kafka documentation for the producer acks property.
Default: 1.
- spring.cloud.stream.kafka.binder.minPartitionCount
-
Effective only if autoCreateTopics or autoAddPartitions is set. The global minimum number of partitions that the binder configures on topics on which it produces or consumes data. It can be superseded by the partitionCount setting of the producer or by the value of instanceCount * concurrency settings of the producer (if either is larger).
Default: 1.
- spring.cloud.stream.kafka.binder.producerProperties
-
Key/Value map of arbitrary Kafka client producer properties. In addition to supporting known Kafka producer properties, unknown producer properties are allowed here as well. Properties here supersede any properties set in boot and in the configuration property above.
Default: Empty map.
- spring.cloud.stream.kafka.binder.replicationFactor
-
The replication factor of auto-created topics if autoCreateTopics is active. Can be overridden on each binding.
If you are using Kafka broker versions prior to 2.4, then this value should be set to at least 1. Starting with version 3.0.8, the binder uses -1 as the default value, which indicates that the broker default.replication.factor property will be used to determine the number of replicas. Check with your Kafka broker admins to see if there is a policy in place that requires a minimum replication factor. If that is the case, then, typically, the default.replication.factor will match that value and -1 should be used, unless you need a replication factor greater than the minimum.
Default: -1.
- spring.cloud.stream.kafka.binder.autoCreateTopics
-
If set to true, the binder creates new topics automatically. If set to false, the binder relies on the topics being already configured. In the latter case, if the topics do not exist, the binder fails to start.
This setting is independent of the auto.create.topics.enable setting of the broker and does not influence it. If the server is set to auto-create topics, they may be created as part of the metadata retrieval request, with default broker settings.
Default: true.
- spring.cloud.stream.kafka.binder.autoAddPartitions
-
If set to true, the binder creates new partitions if required. If set to false, the binder relies on the partition size of the topic being already configured. If the partition count of the target topic is smaller than the expected value, the binder fails to start.
Default: false.
- spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
-
Enables transactions in the binder. See transaction.id in the Kafka documentation and Transactions in the spring-kafka documentation. When transactions are enabled, individual producer properties are ignored and all producers use the spring.cloud.stream.kafka.binder.transaction.producer.* properties.
Default: null (no transactions).
- spring.cloud.stream.kafka.binder.transaction.producer.*
-
Global producer properties for producers in a transactional binder. See spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix and Kafka Producer Properties and the general producer properties supported by all binders.
Default: See individual producer properties.
- spring.cloud.stream.kafka.binder.headerMapperBeanName
-
The bean name of a KafkaHeaderMapper used for mapping spring-messaging headers to and from Kafka headers. Use this, for example, if you wish to customize the trusted packages in a BinderHeaderMapper bean that uses JSON deserialization for the headers. If this custom BinderHeaderMapper bean is not made available to the binder using this property, then the binder will look for a header mapper bean with the name kafkaBinderHeaderMapper that is of type BinderHeaderMapper before falling back to a default BinderHeaderMapper created by the binder.
Default: none.
- spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader
-
Flag to set the binder health as down when any partition on the topic, regardless of the consumer that is receiving data from it, is found without a leader.
Default: false.
- spring.cloud.stream.kafka.binder.certificateStoreDirectory
-
When the truststore or keystore certificate location is given as a non-local file system resource (resources supported by org.springframework.core.io.Resource, for example, CLASSPATH, HTTP, and so on), the binder copies the resource from the path (which is convertible to org.springframework.core.io.Resource) to a location on the filesystem. This is true for both broker-level certificates (ssl.truststore.location and ssl.keystore.location) and certificates intended for schema registry (schema.registry.ssl.truststore.location and schema.registry.ssl.keystore.location). Keep in mind that the truststore and keystore location paths must be provided under spring.cloud.stream.kafka.binder.configuration…. For example, spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location, spring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location, etc. The file will be copied to the location specified as the value for this property, which must be an existing directory on the filesystem that is writable by the process running the application. If this value is not set and the certificate file is a non-local file system resource, then it will be copied to the system's temp directory as returned by System.getProperty("java.io.tmpdir"). This is also true if this value is present, but the directory cannot be found on the filesystem or is not writable.
Default: none.
- spring.cloud.stream.kafka.binder.metrics.defaultOffsetLagMetricsEnabled
-
When set to true, the offset lag metric of each consumer topic is computed whenever the metric is accessed. When set to false, only the periodically calculated offset lag is used.
Default: true
- spring.cloud.stream.kafka.binder.metrics.offsetLagMetricsInterval
-
The interval in which the offset lag for each consumer topic is computed. This value is used whenever metrics.defaultOffsetLagMetricsEnabled is disabled or its computation is taking too long.
Default: 60 seconds
- spring.cloud.stream.kafka.binder.enableObservation
-
Enable Micrometer observation registry on all the bindings in this binder.
Default: false
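The precedence described for configuration and consumerProperties above can be pictured as a layered merge. The following is a minimal sketch in plain Java, assuming only what the descriptions state (Boot properties are overridden by binder.configuration, which is overridden by binder.consumerProperties). The class and method names are hypothetical, and this is not the binder's internal code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: models the documented precedence, not the binder's implementation. */
class ConsumerPropertyLayers {

    /** Later layers overwrite earlier ones: Boot, then binder.configuration, then binder.consumerProperties. */
    static Map<String, String> effectiveConsumerConfig(Map<String, String> bootProperties,
            Map<String, String> binderConfiguration,
            Map<String, String> binderConsumerProperties) {
        Map<String, String> merged = new LinkedHashMap<>(bootProperties);
        merged.putAll(binderConfiguration);
        merged.putAll(binderConsumerProperties);
        return merged;
    }

    public static void main(String[] args) {
        // Hypothetical values chosen only to show which layer wins.
        Map<String, String> boot = Map.of("security.protocol", "PLAINTEXT", "max.poll.records", "500");
        Map<String, String> configuration = Map.of("security.protocol", "SASL_SSL");
        Map<String, String> consumerProperties = Map.of("max.poll.records", "100");
        System.out.println(effectiveConsumerConfig(boot, configuration, consumerProperties));
    }
}
```

In this sketch, security.protocol ends up as SASL_SSL and max.poll.records as 100, because each later layer replaces keys set by an earlier one.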
1.3.2. Kafka Consumer Properties
To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of spring.cloud.stream.kafka.default.consumer.<property>=<value>.
The following properties are available for Kafka consumers only and must use the spring.cloud.stream.kafka.bindings.<channelName>.consumer. prefix.
- admin.configuration
-
Since version 2.1.1, this property is deprecated in favor of topic.properties, and support for it will be removed in a future version.
- admin.replicas-assignment
-
Since version 2.1.1, this property is deprecated in favor of topic.replicas-assignment, and support for it will be removed in a future version.
- admin.replication-factor
-
Since version 2.1.1, this property is deprecated in favor of topic.replication-factor, and support for it will be removed in a future version.
- autoRebalanceEnabled
-
When true, topic partitions are automatically rebalanced between the members of a consumer group. When false, each consumer is assigned a fixed set of partitions based on spring.cloud.stream.instanceCount and spring.cloud.stream.instanceIndex. This requires both the spring.cloud.stream.instanceCount and spring.cloud.stream.instanceIndex properties to be set appropriately on each launched instance. The value of the spring.cloud.stream.instanceCount property must typically be greater than 1 in this case.
Default: true.
- ackEachRecord
-
When autoCommitOffset is true, this setting dictates whether to commit the offset after each record is processed. By default, offsets are committed after all records in the batch of records returned by consumer.poll() have been processed. The number of records returned by a poll can be controlled with the max.poll.records Kafka property, which is set through the consumer configuration property. Setting this to true may cause a degradation in performance, but doing so reduces the likelihood of redelivered records when a failure occurs. Also, see the binder requiredAcks property, which also affects the performance of committing offsets. This property is deprecated as of 3.1 in favor of using ackMode. If the ackMode is not set and batch mode is not enabled, RECORD ackMode will be used.
Default: false.
- autoCommitOffset
-
Starting with version 3.1, this property is deprecated. See ackMode for more details on alternatives. Whether to autocommit offsets when a message has been processed. If set to false, a header with the key kafka_acknowledgment of the type org.springframework.kafka.support.Acknowledgment is present in the inbound message. Applications may use this header for acknowledging messages. See the examples section for details. When this property is set to false, the Kafka binder sets the ack mode to org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL and the application is responsible for acknowledging records. Also see ackEachRecord.
Default: true.
- ackMode
-
Specify the container ack mode. This is based on the AckMode enumeration defined in Spring Kafka. If the ackEachRecord property is set to true and the consumer is not in batch mode, then the RECORD ack mode is used; otherwise, the ack mode provided through this property is used.
- autoCommitOnError
-
In pollable consumers, if set to true, it always auto commits on error. If not set (the default) or false, it will not auto commit in pollable consumers. Note that this property is only applicable for pollable consumers.
Default: not set.
- resetOffsets
-
Whether to reset offsets on the consumer to the value provided by startOffset. Must be false if a KafkaBindingRebalanceListener is provided; see Using a KafkaBindingRebalanceListener. See Resetting Offsets for more information about this property.
Default: false.
- startOffset
-
The starting offset for new groups. Allowed values: earliest and latest. If the consumer group is set explicitly for the consumer 'binding' (through spring.cloud.stream.bindings.<channelName>.group), 'startOffset' is set to earliest. Otherwise, it is set to latest for the anonymous consumer group. See Resetting Offsets for more information about this property.
Default: null (equivalent to earliest).
- enableDlq
-
When set to true, it enables DLQ behavior for the consumer. By default, messages that result in errors are forwarded to a topic named error.<destination>.<group>. The DLQ topic name can be configured by setting the dlqName property or by defining a @Bean of type DlqDestinationResolver. This provides an alternative option to the more common Kafka replay scenario for the case when the number of errors is relatively small and replaying the entire original topic may be too cumbersome. See Dead-Letter Topic Processing for more information. Starting with version 2.0, messages sent to the DLQ topic are enhanced with the following headers: x-original-topic, x-exception-message, and x-exception-stacktrace as byte[]. By default, a failed record is sent to the same partition number in the DLQ topic as the original record. See Dead-Letter Topic Partition Selection for how to change that behavior. Not allowed when destinationIsPattern is true.
Default: false.
- dlqPartitions
-
When enableDlq is true, and this property is not set, a dead letter topic with the same number of partitions as the primary topic(s) is created. Usually, dead-letter records are sent to the same partition in the dead-letter topic as the original record. This behavior can be changed; see Dead-Letter Topic Partition Selection. If this property is set to 1 and there is no DlqPartitionFunction bean, all dead-letter records will be written to partition 0. If this property is greater than 1, you MUST provide a DlqPartitionFunction bean. Note that the actual partition count is affected by the binder's minPartitionCount property.
Default: none
- configuration
-
Map with a key/value pair containing generic Kafka consumer properties. In addition to Kafka consumer properties, other configuration properties can be passed here, for example, properties needed by the application such as spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar. The bootstrap.servers property cannot be set here; use multi-binder support if you need to connect to multiple clusters.
Default: Empty map.
- dlqName
-
The name of the DLQ topic to receive the error messages.
Default: null (If not specified, messages that result in errors are forwarded to a topic named error.<destination>.<group>).
- dlqProducerProperties
-
Using this, DLQ-specific producer properties can be set. All the properties available through Kafka producer properties can be set through this property. When native decoding is enabled on the consumer (that is, useNativeDecoding: true), the application must provide corresponding key/value serializers for DLQ. These must be provided in the form of dlqProducerProperties.configuration.key.serializer and dlqProducerProperties.configuration.value.serializer.
Default: Default Kafka producer properties.
- standardHeaders
-
Indicates which standard headers are populated by the inbound channel adapter. Allowed values: none, id, timestamp, or both. Useful if using native deserialization and the first component to receive a message needs an id (such as an aggregator that is configured to use a JDBC message store).
Default: none
- converterBeanName
-
The name of a bean that implements RecordMessageConverter. Used in the inbound channel adapter to replace the default MessagingMessageConverter.
Default: null
- idleEventInterval
-
The interval, in milliseconds, between events indicating that no messages have recently been received. Use an ApplicationListener<ListenerContainerIdleEvent> to receive these events. See Example: Pausing and Resuming the Consumer for a usage example.
Default: 30000
- destinationIsPattern
-
When true, the destination is treated as a regular expression Pattern used to match topic names by the broker. When true, topics are not provisioned, and enableDlq is not allowed, because the binder does not know the topic names during the provisioning phase. Note, the time taken to detect new topics that match the pattern is controlled by the consumer property metadata.max.age.ms, which (at the time of writing) defaults to 300,000ms (5 minutes). This can be configured using the configuration property above.
Default: false
- topic.properties
-
A Map of Kafka topic properties used when provisioning new topics, for example, spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0
Default: none.
- topic.replicas-assignment
-
A Map<Integer, List<Integer>> of replica assignments, with the key being the partition and the value being the assignments. Used when provisioning new topics. See the NewTopic Javadocs in the kafka-clients jar.
Default: none.
- topic.replication-factor
-
The replication factor to use when provisioning topics. Overrides the binder-wide setting. Ignored if replicas-assignment is present.
Default: none (the binder-wide default of -1 is used).
- pollTimeout
-
Timeout used for polling in pollable consumers.
Default: 5 seconds.
- transactionManager
-
Bean name of a KafkaAwareTransactionManager used to override the binder's transaction manager for this binding. Usually needed if you want to synchronize another transaction with the Kafka transaction, using the ChainedKafkaTransactionManager. To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.
Default: none.
- txCommitRecovered
-
When using a transactional binder, the offset of a recovered record (for example, when retries are exhausted and the record is sent to a dead letter topic) will be committed via a new transaction, by default. Setting this property to false suppresses committing the offset of the recovered record.
Default: true.
- commonErrorHandlerBeanName
-
CommonErrorHandler bean name to use per consumer binding. When present, this user-provided CommonErrorHandler takes precedence over any other error handlers defined by the binder. This is a handy way to express error handlers, if the application does not want to use a ListenerContainerCustomizer and then check the destination/group combination to set an error handler.
Default: none.
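To make the autoRebalanceEnabled=false case above more concrete, here is a small sketch of how a fixed partition set could be derived from instanceCount and instanceIndex. It assumes a simple modulo distribution (partition p goes to the instance whose index equals p % instanceCount). That rule is an assumption for illustration, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: assumes partitions are spread across instances by modulo. */
class StaticPartitionAssignment {

    /** Returns the partitions that the instance with the given index would listen to. */
    static List<Integer> partitionsFor(int partitionCount, int instanceCount, int instanceIndex) {
        if (instanceCount < 1 || instanceIndex < 0 || instanceIndex >= instanceCount) {
            throw new IllegalArgumentException("instanceIndex must be in [0, instanceCount)");
        }
        List<Integer> assigned = new ArrayList<>();
        for (int partition = 0; partition < partitionCount; partition++) {
            if (partition % instanceCount == instanceIndex) {
                assigned.add(partition);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Six partitions shared by three instances.
        for (int index = 0; index < 3; index++) {
            System.out.println("instanceIndex " + index + " -> " + partitionsFor(6, 3, index));
        }
    }
}
```

Under this assumption, every instance must be started with the same instanceCount and a distinct instanceIndex; otherwise some partitions would be consumed twice or not at all.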
1.3.3. Resetting Offsets
When an application starts, the initial position in each assigned partition depends on two properties: startOffset and resetOffsets.
If resetOffsets is false, normal Kafka consumer auto.offset.reset semantics apply.
That is, if there is no committed offset for a partition for the binding's consumer group, the position is earliest or latest.
By default, bindings with an explicit group use earliest, and anonymous bindings (with no group) use latest.
These defaults can be overridden by setting the startOffset binding property.
There will be no committed offset(s) the first time the binding is started with a particular group.
The other condition where no committed offset exists is if the offset has expired.
With modern brokers (since 2.1), and default broker properties, the offsets are expired 7 days after the last member leaves the group.
See the offsets.retention.minutes broker property for more information.
When resetOffsets is true, the binder applies similar semantics to those that apply when there is no committed offset on the broker, as if this binding has never consumed from the topic; that is, any current committed offset is ignored.
Following are two use cases when this might be used.
-
Consuming from a compacted topic containing key/value pairs. Set resetOffsets to true and startOffset to earliest; the binding will perform a seekToBeginning on all newly assigned partitions.
-
Consuming from a topic containing events, where you are only interested in events that occur while this binding is running. Set resetOffsets to true and startOffset to latest; the binding will perform a seekToEnd on all newly assigned partitions.
If a rebalance occurs after the initial assignment, the seeks will only be performed on any newly assigned partitions that were not assigned during the initial assignment.
For more control over topic offsets, see Using a KafkaBindingRebalanceListener; when a listener is provided, resetOffsets should not be set to true, because that will cause an error.
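The rules in this section can be condensed into a small decision function. The following sketch uses plain Java with hypothetical type and method names. It only restates the behavior described above (committed offsets are used unless resetOffsets is true, and the fallback is earliest for explicit groups and latest for anonymous ones) and is not taken from the binder's source.

```java
/** Illustrative only: a restatement of the documented rules, with hypothetical names. */
class InitialPositionSketch {

    enum StartOffset { EARLIEST, LATEST }

    enum Position { COMMITTED_OFFSET, BEGINNING, END }

    /** An unset startOffset falls back to earliest for explicit groups and latest for anonymous ones. */
    static StartOffset effectiveStartOffset(StartOffset configured, boolean explicitGroup) {
        if (configured != null) {
            return configured;
        }
        return explicitGroup ? StartOffset.EARLIEST : StartOffset.LATEST;
    }

    /** Where consumption starts for a newly assigned partition. */
    static Position initialPosition(boolean resetOffsets, boolean hasCommittedOffset,
            StartOffset configured, boolean explicitGroup) {
        if (!resetOffsets && hasCommittedOffset) {
            return Position.COMMITTED_OFFSET; // normal Kafka behavior
        }
        // Either nothing was committed, or resetOffsets tells the binder to ignore it.
        return effectiveStartOffset(configured, explicitGroup) == StartOffset.EARLIEST
                ? Position.BEGINNING
                : Position.END;
    }

    public static void main(String[] args) {
        // Compacted-topic use case: always re-read from the beginning.
        System.out.println(initialPosition(true, true, StartOffset.EARLIEST, true));
        // Live-events use case: only records that arrive from now on.
        System.out.println(initialPosition(true, true, StartOffset.LATEST, true));
    }
}
```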
1.3.4. Consuming Batches
Starting with version 3.0, when spring.cloud.stream.bindings.<name>.consumer.batch-mode is set to true, all of the records received by polling the Kafka Consumer will be presented as a List<?> to the listener method.
Otherwise, the method will be called with one record at a time.
The size of the batch is controlled by the Kafka consumer properties max.poll.records, fetch.min.bytes, and fetch.max.wait.ms; refer to the Kafka documentation for more information.
Starting with version 4.0.2, the binder supports DLQ capabilities when consuming in batch mode.
Keep in mind that, when using DLQ on a consumer binding that is in batch mode, all the records received from the previous poll will be delivered to the DLQ topic.
Retry within the binder is not supported when using batch mode, so maxAttempts will be overridden to 1.
You can configure a DefaultErrorHandler (using a ListenerContainerCustomizer) to achieve similar functionality to retry in the binder.
You can also use a manual AckMode and call Acknowledgment.nack(index, sleep) to commit the offsets for a partial batch and have the remaining records redelivered.
Refer to the Spring for Apache Kafka documentation for more information about these techniques.
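The effect of a partial-batch nack can be illustrated with a small simulation. This sketch uses no Spring or Kafka types. It assumes that the records before the given index have their offsets committed, while the record at the index and all later records are redelivered. The class and method names are hypothetical.

```java
import java.util.List;

/** Simulation only: mirrors the assumed outcome of a nack at a given index in a batch. */
class PartialBatchSketch {

    /** Records before the failed index; their offsets would be committed. */
    static <T> List<T> committed(List<T> batch, int failedIndex) {
        return batch.subList(0, failedIndex);
    }

    /** The failed record and everything after it; these would be redelivered. */
    static <T> List<T> redelivered(List<T> batch, int failedIndex) {
        return batch.subList(failedIndex, batch.size());
    }

    public static void main(String[] args) {
        List<String> batch = List.of("r0", "r1", "r2", "r3");
        int failedIndex = 2; // hypothetical: processing "r2" failed
        System.out.println("committed:   " + committed(batch, failedIndex));
        System.out.println("redelivered: " + redelivered(batch, failedIndex));
    }
}
```

In a real listener, the failing index would be the position in the List<?> at which processing threw an exception; the Spring for Apache Kafka documentation is the authority on the exact behavior.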
1.3.5. Kafka Producer Properties
To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of spring.cloud.stream.kafka.default.producer.<property>=<value>.
The following properties are available for Kafka producers only and must use the spring.cloud.stream.kafka.bindings.<channelName>.producer. prefix.
- admin.configuration
-
Since version 2.1.1, this property is deprecated in favor of topic.properties, and support for it will be removed in a future version.
- admin.replicas-assignment
-
Since version 2.1.1, this property is deprecated in favor of topic.replicas-assignment, and support for it will be removed in a future version.
- admin.replication-factor
-
Since version 2.1.1, this property is deprecated in favor of topic.replication-factor, and support for it will be removed in a future version.
- bufferSize
-
Upper limit, in bytes, of how much data the Kafka producer attempts to batch before sending.
Default: 16384.
- sync
-
Whether the producer is synchronous.
Default: false.
- sendTimeoutExpression
-
A SpEL expression evaluated against the outgoing message used to evaluate the time to wait for ack when synchronous publish is enabled, for example, headers['mySendTimeout']. The value of the timeout is in milliseconds. With versions before 3.0, the payload could not be used unless native encoding was being used because, by the time this expression was evaluated, the payload was already in the form of a byte[]. Now, the expression is evaluated before the payload is converted.
Default: none.
- batchTimeout
-
How long the producer waits to allow more messages to accumulate in the same batch before sending the messages. (Normally, the producer does not wait at all and simply sends all the messages that accumulated while the previous send was in progress.) A non-zero value may increase throughput at the expense of latency.
Default: 0.
- messageKeyExpression
-
A SpEL expression evaluated against the outgoing message used to populate the key of the produced Kafka message, for example, headers['myKey']. With versions before 3.0, the payload could not be used unless native encoding was being used because, by the time this expression was evaluated, the payload was already in the form of a byte[]. Now, the expression is evaluated before the payload is converted. In the case of a regular processor (Function<String, String> or Function<Message<?>, Message<?>>), if the produced key needs to be the same as the incoming key from the topic, this property can be set as below.
spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey']
There is an important caveat to keep in mind for reactive functions. In that case, it is up to the application to manually copy the headers from the incoming messages to outbound messages. You can set the header, for example, myKey, and use headers['myKey'] as suggested above or, for convenience, simply set the KafkaHeaders.MESSAGE_KEY header, and you do not need to set this property at all.
Default: none.
- headerPatterns
-
A comma-delimited list of simple patterns to match Spring messaging headers to be mapped to the Kafka Headers in the ProducerRecord. Patterns can begin or end with the wildcard character (asterisk). Patterns can be negated by prefixing with !. Matching stops after the first match (positive or negative). For example, !ask,as* will pass ash but not ask. id and timestamp are never mapped.
Default: * (all headers - except the id and timestamp)
- configuration
-
Map with a key/value pair containing generic Kafka producer properties. The bootstrap.servers property cannot be set here; use multi-binder support if you need to connect to multiple clusters.
Default: Empty map.
- topic.properties
-
A Map of Kafka topic properties used when provisioning new topics, for example, spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0
- topic.replicas-assignment
-
A Map<Integer, List<Integer>> of replica assignments, with the key being the partition and the value being the assignments. Used when provisioning new topics. See the NewTopic Javadocs in the kafka-clients jar.
Default: none.
- topic.replication-factor
-
The replication factor to use when provisioning topics. Overrides the binder-wide setting. Ignored if replicas-assignment is present.
Default: none (the binder-wide default of -1 is used).
- useTopicHeader
-
Set to true to override the default binding destination (topic name) with the value of the KafkaHeaders.TOPIC message header in the outbound message. If the header is not present, the default binding destination is used.
Default: false.
- recordMetadataChannel
-
The bean name of a MessageChannel to which successful send results should be sent; the bean must exist in the application context. The message sent to the channel is the sent message (after conversion, if any) with an additional header KafkaHeaders.RECORD_METADATA. The header contains a RecordMetadata object provided by the Kafka client; it includes the partition and offset where the record was written in the topic.
RecordMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)
Failed sends go to the producer error channel (if configured); see Error Channels.
Default: null.
The Kafka binder uses the partitionCount setting of the producer as a hint to create a topic with the given partition count (in conjunction with minPartitionCount; the larger of the two values is used).
Exercise caution when configuring both minPartitionCount for a binder and partitionCount for an application, as the larger value is used.
If a topic already exists with a smaller partition count and autoAddPartitions is disabled (the default), the binder fails to start.
If a topic already exists with a smaller partition count and autoAddPartitions is enabled, new partitions are added.
If a topic already exists with a larger number of partitions than the maximum of minPartitionCount and partitionCount, the existing partition count is used.
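The partition count rules in the preceding note can be summarized in a few lines of code. This is a minimal sketch in plain Java that follows those rules as written. The class and method names are hypothetical, and the exception type is chosen only for illustration.

```java
/** Illustrative only: restates the partition count rules from the note above. */
class PartitionCountSketch {

    /** Resolves the partition count the binder ends up with for an existing topic. */
    static int resolveForExistingTopic(int existingPartitions, int minPartitionCount,
            int partitionCount, boolean autoAddPartitions) {
        int required = Math.max(minPartitionCount, partitionCount);
        if (existingPartitions >= required) {
            return existingPartitions; // the existing (larger or equal) count is used
        }
        if (autoAddPartitions) {
            return required; // new partitions are added up to the required count
        }
        throw new IllegalStateException("Topic has " + existingPartitions
                + " partitions but " + required + " are required and autoAddPartitions is disabled");
    }

    public static void main(String[] args) {
        System.out.println(resolveForExistingTopic(8, 1, 4, false));
        System.out.println(resolveForExistingTopic(2, 1, 4, true));
    }
}
```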
- compression
-
Set the compression.type producer property. Supported values are none, gzip, snappy, lz4 and zstd. If you override the kafka-clients jar to 2.1.0 (or later), as discussed in the Spring for Apache Kafka documentation, and wish to use zstd compression, use spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd.
Default: none.
- transactionManager
-
Bean name of a KafkaAwareTransactionManager used to override the binder's transaction manager for this binding. Usually needed if you want to synchronize another transaction with the Kafka transaction, using the ChainedKafkaTransactionManager. To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.
Default: none.
- closeTimeout
-
Timeout in number of seconds to wait for when closing the producer.
Default: 30
- allowNonTransactional
-
Normally, all output bindings associated with a transactional binder will publish in a new transaction, if one is not already in process. This property allows you to override that behavior. If set to true, records published to this output binding will not be run in a transaction, unless one is already in process.
Default: false
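The headerPatterns matching rules described above (leading or trailing wildcard, negation with !, first match wins, id and timestamp never mapped) can be sketched in plain Java. This is a simplified stand-in written for illustration, not the binder's matcher, and the class and method names are hypothetical.

```java
import java.util.List;

/** Illustrative only: a simplified matcher for the headerPatterns rules. */
class HeaderPatternSketch {

    /** Supports a wildcard at the start and/or end of the pattern, as the property description allows. */
    static boolean simpleMatch(String pattern, String value) {
        if (pattern.equals("*")) {
            return true;
        }
        boolean leading = pattern.startsWith("*");
        boolean trailing = pattern.endsWith("*");
        if (leading && trailing) {
            return value.contains(pattern.substring(1, pattern.length() - 1));
        }
        if (leading) {
            return value.endsWith(pattern.substring(1));
        }
        if (trailing) {
            return value.startsWith(pattern.substring(0, pattern.length() - 1));
        }
        return pattern.equals(value);
    }

    /** The first matching pattern decides; a negated match rejects the header. */
    static boolean isMapped(List<String> patterns, String header) {
        if (header.equals("id") || header.equals("timestamp")) {
            return false; // never mapped
        }
        for (String raw : patterns) {
            boolean negated = raw.startsWith("!");
            String pattern = negated ? raw.substring(1) : raw;
            if (simpleMatch(pattern, header)) {
                return !negated;
            }
        }
        return false; // no pattern matched
    }

    public static void main(String[] args) {
        List<String> patterns = List.of("!ask", "as*");
        System.out.println("ash mapped: " + isMapped(patterns, "ash"));
        System.out.println("ask mapped: " + isMapped(patterns, "ask"));
    }
}
```

Because matching stops at the first hit, the order of the patterns matters: with as*,!ask the header ask would be mapped, since as* matches before the negation is reached.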
1.3.6. Usage examples
In this section, we show the use of the preceding properties for specific scenarios.
Example: Setting ackMode to MANUAL and Relying on Manual Acknowledgement
This example illustrates how one may manually acknowledge offsets in a consumer application.
This example requires that spring.cloud.stream.kafka.bindings.input.consumer.ackMode be set to MANUAL.
Use the corresponding input channel name for your example.
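import java.util.function.Consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;

@SpringBootApplication
public class ManuallyAcknowledgingConsumer {

    public static void main(String[] args) {
        SpringApplication.run(ManuallyAcknowledgingConsumer.class, args);
    }

    @Bean
    public Consumer<Message<?>> process() {
        return message -> {
            // The acknowledgment header is available when the ack mode is MANUAL.
            Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
            if (acknowledgment != null) {
                System.out.println("Acknowledgment provided");
                acknowledgment.acknowledge();
            }
        };
    }
}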
Example: Security Configuration
Apache Kafka 0.9 supports secure connections between client and brokers.
To take advantage of this feature, follow the guidelines in the Apache Kafka Documentation as well as the Kafka 0.9 security guidelines from the Confluent documentation.
Use the spring.cloud.stream.kafka.binder.configuration option to set security properties for all clients created by the binder.
For example, to set security.protocol to SASL_SSL, set the following property:
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
All the other security properties can be set in a similar manner.
When using Kerberos, follow the instructions in the reference documentation for creating and referencing the JAAS configuration.
Spring Cloud Stream supports passing JAAS configuration information to the application by using a JAAS configuration file and using Spring Boot properties.
Using JAAS Configuration Files
The JAAS and (optionally) krb5 file locations can be set for Spring Cloud Stream applications by using system properties. The following example shows how to launch a Spring Cloud Stream application with SASL and Kerberos by using a JAAS configuration file:
java -Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf -jar log.jar \
--spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
Using Spring Boot Properties
As an alternative to having a JAAS configuration file, Spring Cloud Stream provides a mechanism for setting up the JAAS configuration for Spring Cloud Stream applications by using Spring Boot properties.
The following properties can be used to configure the login context of the Kafka client:
- spring.cloud.stream.kafka.binder.jaas.loginModule
-
The login module name. Not necessary to be set in normal cases.
Default:
com.sun.security.auth.module.Krb5LoginModule
.
- spring.cloud.stream.kafka.binder.jaas.controlFlag
-
The control flag of the login module.
Default:
required
.
- spring.cloud.stream.kafka.binder.jaas.options
-
Map of key/value pairs containing the login module options.
Default: Empty map.
The following example shows how to launch a Spring Cloud Stream application with SASL and Kerberos by using Spring Boot configuration properties:
java -jar log.jar --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.autoCreateTopics=false \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
--spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
--spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
--spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
--spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM
The preceding example represents the equivalent of the following JAAS file:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/kafka_client.keytab"
principal="kafka-client-1@EXAMPLE.COM";
};
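The correspondence between the jaas.* properties and the JAAS file can be sketched in plain Java. The class below is a hypothetical helper written for illustration only; it is not part of the binder, and the quoting rule it applies (booleans bare, everything else quoted) is an assumption chosen to reproduce the file shown above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not a binder class): renders the JAAS entry that a set of
// loginModule / controlFlag / options values corresponds to.
final class JaasEntrySketch {

    static String render(String loginModule, String controlFlag, Map<String, String> options) {
        StringBuilder sb = new StringBuilder("KafkaClient {\n");
        sb.append("    ").append(loginModule).append(' ').append(controlFlag);
        for (Map.Entry<String, String> option : options.entrySet()) {
            sb.append("\n    ").append(option.getKey()).append('=').append(quoteIfNeeded(option.getValue()));
        }
        // The last option is terminated by ';', then the entry is closed.
        return sb.append(";\n};\n").toString();
    }

    // Assumption: boolean values stay bare, all other values are quoted.
    private static String quoteIfNeeded(String value) {
        boolean isBoolean = "true".equals(value) || "false".equals(value);
        return isBoolean ? value : "\"" + value + "\"";
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in the order they were declared.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("useKeyTab", "true");
        options.put("storeKey", "true");
        options.put("keyTab", "/etc/security/keytabs/kafka_client.keytab");
        options.put("principal", "kafka-client-1@EXAMPLE.COM");
        System.out.print(render("com.sun.security.auth.module.Krb5LoginModule", "required", options));
    }
}
```

The defaults for loginModule and controlFlag are passed explicitly here so that the mapping from each property to each line of the file is visible.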
If the required topics already exist on the broker or will be created by an administrator, auto-creation can be turned off and only the client JAAS properties need to be set.
Do not mix JAAS configuration files and Spring Boot properties in the same application.
If the -Djava.security.auth.login.config system property is already present, Spring Cloud Stream ignores the Spring Boot properties.
Be careful when using autoCreateTopics and autoAddPartitions with Kerberos.
Applications often use principals that do not have administrative rights in Kafka and ZooKeeper.
Consequently, relying on Spring Cloud Stream to create or modify topics may fail.
In secure environments, we strongly recommend creating topics and managing ACLs administratively by using Kafka tooling.
Multi-binder configuration and JAAS
When connecting to multiple clusters, each of which requires a separate JAAS configuration, set the JAAS configuration by using the property sasl.jaas.config
.
When this property is present in the application, it takes precedence over the other strategies mentioned above.
See KIP-85 for more details.
For example, if you have two clusters in your application with separate JAAS configuration, then the following is a template that you can use:
spring.cloud.stream:
  binders:
    kafka1:
      type: kafka
      environment:
        spring:
          cloud:
            stream:
              kafka:
                binder:
                  brokers: localhost:9092
                  configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"
    kafka2:
      type: kafka
      environment:
        spring:
          cloud:
            stream:
              kafka:
                binder:
                  brokers: localhost:9093
                  configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user1\" password=\"user1-secret\";"
  kafka.binder:
    configuration:
      security.protocol: SASL_PLAINTEXT
      sasl.mechanism: PLAIN
Note that both the Kafka clusters, and the sasl.jaas.config
values for each of them are different in the above configuration.
See this sample application for more details on how to set up and run such an application.
Example: Pausing and Resuming the Consumer
If you wish to suspend consumption but not cause a partition rebalance, you can pause and resume the consumer.
This is facilitated by managing the binding lifecycle as shown in Binding visualization and control in the Spring Cloud Stream documentation, using State.PAUSED
and State.RESUMED
.
To resume, you can use an ApplicationListener
(or @EventListener
method) to receive ListenerContainerIdleEvent
instances.
The frequency at which events are published is controlled by the idleEventInterval
property.
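As a hedged illustration, the idle event interval could be set per consumer binding as follows. The binding name process-in-0 is hypothetical, and the value is assumed to be in milliseconds; check the consumer properties section for the authoritative definition.

```properties
# Hypothetical binding name; publish an idle event after roughly 5 seconds without records
spring.cloud.stream.kafka.bindings.process-in-0.consumer.idleEventInterval=5000
```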
1.4. Transactional Binder
Enable transactions by setting spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
to a non-empty value, e.g. tx-
.
When used in a processor application, the consumer starts the transaction; any records sent on the consumer thread participate in the same transaction.
When the listener exits normally, the listener container will send the offset to the transaction and commit it.
A common producer factory is used for all producer bindings configured using spring.cloud.stream.kafka.binder.transaction.producer.*
properties; individual binding Kafka producer properties are ignored.
Normal binder retries (and dead lettering) are not supported with transactions because the retries will run in the original transaction, which may be rolled back and any published records will be rolled back too.
When retries are enabled (the common property maxAttempts is greater than one) the retry properties are used to configure a DefaultAfterRollbackProcessor to enable retries at the container level.
Similarly, instead of publishing dead-letter records within the transaction, this functionality is moved to the listener container, again via the DefaultAfterRollbackProcessor which runs after the main transaction has rolled back.
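A minimal configuration sketch for enabling the transactional binder is shown here. The prefix value tx- comes from the text above; the second line is only an illustration of where shared producer settings go, and the choice of acks is an assumption, not a requirement stated in this guide.

```properties
# Any non-empty prefix enables transactions in the binder
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix=tx-
# Shared settings for the common transactional producer factory (illustrative value)
spring.cloud.stream.kafka.binder.transaction.producer.configuration.acks=all
```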
If you wish to use transactions in a source application, or from some arbitrary thread for producer-only transactions (e.g. @Scheduled
method), you must get a reference to the transactional producer factory and define a KafkaTransactionManager
bean using it.
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
@Value("${unique.tx.id.per.instance}") String txId) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTransactionManager<byte[], byte[]> tm = new KafkaTransactionManager<>(pf);
tm.setTransactionIdPrefix(txId);
return tm;
}
Notice that we get a reference to the binder using the BinderFactory
; use null
in the first argument when there is only one binder configured.
If more than one binder is configured, use the binder name to get the reference.
Once we have a reference to the binder, we can obtain a reference to the ProducerFactory
and create a transaction manager.
Then you would use normal Spring transaction support, e.g. TransactionTemplate
or @Transactional
, for example:
public static class Sender {
@Transactional
public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
}
}
If you wish to synchronize producer-only transactions with those from some other transaction manager, use a ChainedTransactionManager
.
If you deploy multiple instances of your application, each instance needs a unique transactionIdPrefix.
1.5. Error Channels
Starting with version 1.3, the binder unconditionally sends exceptions to an error channel for each consumer destination and can also be configured to send async producer send failures to an error channel. See this section on error handling for more information.
The payload of the ErrorMessage
for a send failure is a KafkaSendFailureException
with properties:
-
failedMessage
: The Spring Messaging Message<?>
that failed to be sent.
-
record
: The raw ProducerRecord
that was created from the failedMessage
There is no automatic handling of producer exceptions (such as sending to a Dead-Letter queue). You can consume these exceptions with your own Spring Integration flow.
1.6. Kafka Metrics
Kafka binder module exposes the following metrics:
spring.cloud.stream.binder.kafka.offset
: This metric indicates how many messages have not yet been consumed from a given binder’s topic by a given consumer group.
The metrics provided are based on the Micrometer library.
The binder creates the KafkaBinderMetrics
bean if Micrometer is on the classpath and no other such bean is provided by the application.
The metric contains the consumer group information, topic and the actual lag in committed offset from the latest offset on the topic.
This metric is particularly useful for providing auto-scaling feedback to a PaaS platform.
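The lag described above can be sketched as simple arithmetic over per-partition offsets. The class below is a hypothetical illustration, not the binder's implementation; it assumes the lag is summed across partitions and that a partition without a committed offset is counted from offset 0.

```java
import java.util.Map;

// Hypothetical illustration of consumer lag: latest offset minus committed offset,
// summed over all partitions of a topic for one consumer group.
final class ConsumerLagSketch {

    static long totalLag(Map<Integer, Long> latestOffsets, Map<Integer, Long> committedOffsets) {
        long lag = 0;
        for (Map.Entry<Integer, Long> latest : latestOffsets.entrySet()) {
            // Assumption: no committed offset means nothing has been consumed yet.
            long committed = committedOffsets.getOrDefault(latest.getKey(), 0L);
            lag += Math.max(0L, latest.getValue() - committed);
        }
        return lag;
    }

    public static void main(String[] args) {
        Map<Integer, Long> latest = Map.of(0, 100L, 1, 50L);
        Map<Integer, Long> committed = Map.of(0, 90L);
        System.out.println("lag = " + totalLag(latest, committed));
    }
}
```

An auto-scaler would typically compare such a value against a threshold to decide whether more consumer instances are needed.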
The metric collection behaviour can be configured by setting properties in the spring.cloud.stream.kafka.binder.metrics
namespace.
Refer to the Kafka binder properties section for more information.
To prevent KafkaBinderMetrics
from creating the necessary infrastructure (such as consumers) and reporting the metrics, provide the following component in the application.
@Component
class NoOpBindingMeters {
NoOpBindingMeters(MeterRegistry registry) {
registry.config().meterFilter(
MeterFilter.denyNameStartsWith(KafkaBinderMetrics.OFFSET_LAG_METRIC_NAME));
}
}
More details on how to suppress meters selectively can be found here.
1.7. Tombstone Records (null record values)
When using compacted topics, a record with a null
value (also called a tombstone record) represents the deletion of a key.
To receive such messages in a Spring Cloud Stream function, you can use the following strategy.
@Bean
public Function<Message<Person>, String> myFunction() {
return value -> {
Object v = value.getPayload();
String className = v.getClass().getName();
if (className.equals("org.springframework.kafka.support.KafkaNull")) {
// this is a tombstone record
}
else {
// continue with processing
}
};
}
1.8. Using a KafkaBindingRebalanceListener
Applications may wish to seek topics/partitions to arbitrary offsets when the partitions are initially assigned, or perform other operations on the consumer.
Starting with version 2.1, if you provide a single KafkaBindingRebalanceListener
bean in the application context, it will be wired into all Kafka consumer bindings.
public interface KafkaBindingRebalanceListener {
/**
* Invoked by the container before any pending offsets are committed.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
*/
default void onPartitionsRevokedBeforeCommit(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions) {
}
/**
* Invoked by the container after any pending offsets are committed.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
*/
default void onPartitionsRevokedAfterCommit(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
}
/**
* Invoked when partitions are initially assigned or after a rebalance.
* Applications might only want to perform seek operations on an initial assignment.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
* @param initial true if this is the initial assignment.
*/
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
boolean initial) {
}
}
You cannot set the resetOffsets
consumer property to true
when you provide a rebalance listener.
1.9. Retry and Dead Letter Processing
By default, when you configure retry (e.g. maxAttempts
) and enableDlq
in a consumer binding, these functions are performed within the binder, with no participation by the listener container or Kafka consumer.
There are situations where it is preferable to move this functionality to the listener container, such as:
-
The aggregate of retries and delays will exceed the consumer’s
max.poll.interval.ms
property, potentially causing a partition rebalance.
-
You wish to publish the dead letter to a different Kafka cluster.
-
You wish to add retry listeners to the error handler.
-
…
To configure moving this functionality from the binder to the container, define a @Bean
of type ListenerContainerWithDlqAndRetryCustomizer
.
This interface has the following methods:
/**
* Configure the container.
* @param container the container.
* @param destinationName the destination name.
* @param group the group.
* @param dlqDestinationResolver a destination resolver for the dead letter topic (if
* enableDlq).
* @param backOff the backOff using retry properties (if configured).
* @see #retryAndDlqInBinding(String, String)
*/
void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group,
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
@Nullable BackOff backOff);
/**
* Return false to move retries and DLQ from the binding to a customized error handler
* using the retry metadata and/or a {@code DeadLetterPublishingRecoverer} when
* configured via
* {@link #configure(AbstractMessageListenerContainer, String, String, BiFunction, BackOff)}.
* @param destinationName the destination name.
* @param group the group.
* @return false to disable retries and DLQ in the binding
*/
default boolean retryAndDlqInBinding(String destinationName, String group) {
return true;
}
The destination resolver and BackOff
are created from the binding properties (if configured). The KafkaTemplate
uses configuration from spring.kafka….
properties. You can then use these to create a custom error handler and dead letter publisher; for example:
@Bean
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
return new ListenerContainerWithDlqAndRetryCustomizer() {
@Override
public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
String group,
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
@Nullable BackOff backOff) {
if (destinationName.equals("topicWithLongTotalRetryConfig")) {
ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template,
dlqDestinationResolver);
container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
}
}
@Override
public boolean retryAndDlqInBinding(String destinationName, String group) {
return !destinationName.contains("topicWithLongTotalRetryConfig");
}
};
}
Now, only each individual retry delay, rather than the aggregate of all retries and delays, needs to stay below the consumer’s max.poll.interval.ms
property.
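The difference between the aggregate delay and a single delay can be checked with a little arithmetic. The sketch below is a hypothetical helper, not binder code; it assumes an exponential back-off driven by values equivalent to maxAttempts, backOffInitialInterval, backOffMultiplier and backOffMaxInterval, and it ignores the time spent in the listener itself.

```java
// Hypothetical helper: compares the sum of all back-off delays with the longest
// single delay, under an assumed capped exponential back-off.
final class RetryDelaySketch {

    // Delay before retry number `retry` (1-based), capped at maxMs.
    static long delayMs(int retry, long initialMs, double multiplier, long maxMs) {
        double delay = initialMs * Math.pow(multiplier, retry - 1);
        return (long) Math.min(delay, (double) maxMs);
    }

    // maxAttempts includes the first delivery, so there are maxAttempts - 1 delays.
    static long aggregateDelayMs(int maxAttempts, long initialMs, double multiplier, long maxMs) {
        long total = 0;
        for (int retry = 1; retry < maxAttempts; retry++) {
            total += delayMs(retry, initialMs, multiplier, maxMs);
        }
        return total;
    }

    static long longestDelayMs(int maxAttempts, long initialMs, double multiplier, long maxMs) {
        long longest = 0;
        for (int retry = 1; retry < maxAttempts; retry++) {
            longest = Math.max(longest, delayMs(retry, initialMs, multiplier, maxMs));
        }
        return longest;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000; // Kafka's default max.poll.interval.ms
        long aggregate = aggregateDelayMs(10, 60_000, 2.0, 120_000);
        long longest = longestDelayMs(10, 60_000, 2.0, 120_000);
        System.out.println("aggregate exceeds poll interval: " + (aggregate > maxPollIntervalMs));
        System.out.println("longest single delay fits: " + (longest < maxPollIntervalMs));
    }
}
```

With these illustrative values, retrying in the binder would hold the consumer thread for longer than the poll interval, whereas container-level retries only need each individual delay to fit.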
When working with several binders, the ListenerContainerWithDlqAndRetryCustomizer bean gets overridden by the DefaultBinderFactory. For the bean to apply, you need to use a BinderCustomizer to set the container customizer (see [binder-customizer]):
@Bean
public BinderCustomizer binderCustomizer(ListenerContainerWithDlqAndRetryCustomizer containerCustomizer) {
return (binder, binderName) -> {
if (binder instanceof KafkaMessageChannelBinder kafkaMessageChannelBinder) {
kafkaMessageChannelBinder.setContainerCustomizer(containerCustomizer);
}
else if (binder instanceof KStreamBinder) {
...
}
else if (binder instanceof RabbitMessageChannelBinder) {
...
}
};
}
1.10. Customizing Consumer and Producer configuration
If you want advanced customization of consumer and producer configuration that is used for creating ConsumerFactory
and ProducerFactory
in Kafka,
you can implement the following customizers.
-
ConsumerConfigCustomizer
-
ProducerConfigCustomizer
Both of these interfaces provide a way to configure the config map used for consumer and producer properties.
For example, if you want to gain access to a bean that is defined at the application level, you can inject that in the implementation of the configure
method.
When the binder discovers that these customizers are available as beans, it will invoke the configure
method right before creating the consumer and producer factories.
Both of these interfaces also provide access to both the binding and destination names so that they can be accessed while customizing producer and consumer properties.
1.11. Customizing AdminClient Configuration
As with consumer and producer config customization above, applications can also customize the configuration for admin clients by providing an AdminClientConfigCustomizer
.
The configure method of AdminClientConfigCustomizer provides access to the admin client properties, which you can then customize further.
The binder’s Kafka topic provisioner gives the highest precedence to the properties supplied through this customizer.
Here is an example of providing this customizer bean.
@Bean
public AdminClientConfigCustomizer adminClientConfigCustomizer() {
return props -> {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
};
}
1.12. Custom Kafka Binder Health Indicator
Kafka binder activates a default health indicator when Spring Boot actuator is on the classpath.
This health indicator checks the health of the binder and any communication issues with the Kafka broker.
If an application wants to disable this default health check implementation and include a custom implementation, then it can provide an implementation for KafkaBinderHealth
interface.
KafkaBinderHealth
is a marker interface that extends HealthIndicator
.
In the custom implementation, it must provide an implementation for the health()
method.
The custom implementation must be present in the application configuration as a bean.
When the binder discovers the custom implementation, it will use that instead of the default implementation.
Here is an example of such a custom implementation bean in the application.
@Bean
public KafkaBinderHealth kafkaBinderHealthIndicator() {
return new KafkaBinderHealth() {
@Override
public Health health() {
// custom implementation details; the return below is only a placeholder.
return Health.up().build();
}
};
}
1.13. Dead-Letter Topic Processing
1.13.1. Dead-Letter Topic Partition Selection
By default, records are published to the Dead-Letter topic using the same partition as the original record. This means the Dead-Letter topic must have at least as many partitions as the original topic.
To change this behavior, add a DlqPartitionFunction
implementation as a @Bean
to the application context.
Only one such bean can be present.
The function is provided with the consumer group, the failed ConsumerRecord
and the exception.
For example, if you always want to route to partition 0, you might use:
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
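When the Dead-Letter topic has fewer partitions than the original topic, the function body needs some mapping from the original partition to a valid Dead-Letter partition. The following is one possible sketch of such a mapping in plain Java; the class and method names are hypothetical, and the modulo strategy is an illustrative choice rather than anything the binder prescribes.

```java
// Hypothetical mapping: fold the original partition into the smaller range of
// Dead-Letter partitions, so records from one source partition stay together.
final class DlqPartitionSketch {

    static int dlqPartition(int originalPartition, int dlqPartitionCount) {
        if (originalPartition < 0 || dlqPartitionCount < 1) {
            throw new IllegalArgumentException("partition must be >= 0 and count must be >= 1");
        }
        return originalPartition % dlqPartitionCount;
    }

    public static void main(String[] args) {
        for (int partition = 0; partition < 8; partition++) {
            System.out.println(partition + " -> " + dlqPartition(partition, 4));
        }
    }
}
```

Inside a DlqPartitionFunction bean, such a helper could be called as (group, record, ex) -> DlqPartitionSketch.dlqPartition(record.partition(), 4), where 4 stands in for the Dead-Letter topic's actual partition count.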
If you set a consumer binding’s dlqPartitions property to 1 (and the binder’s minPartitionCount is equal to 1), there is no need to supply a DlqPartitionFunction; the framework will always use partition 0.
If you set a consumer binding’s dlqPartitions property to a value greater than 1 (or the binder’s minPartitionCount is greater than 1), you must provide a DlqPartitionFunction bean, even if the partition count is the same as the original topic’s.
It is also possible to define a custom name for the DLQ topic.
To do so, add an implementation of DlqDestinationResolver
as a @Bean
to the application context.
When the binder detects such a bean, it takes precedence; otherwise, the binder uses the dlqName
property.
If neither is found, the name defaults to error.<destination>.<group>
.
Here is an example of DlqDestinationResolver
as a @Bean
.
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
return (rec, ex) -> {
if (rec.topic().equals("word1")) {
return "topic1-dlq";
}
else {
return "topic2-dlq";
}
};
}
One important thing to keep in mind when providing an implementation for DlqDestinationResolver
is that the provisioner in the binder will not auto create topics for the application.
This is because there is no way for the binder to infer the names of all the DLQ topics the implementation might send to.
Therefore, if you provide DLQ names using this strategy, it is the application’s responsibility to ensure that those topics are created beforehand.
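The order of precedence for the Dead-Letter topic name described in this section can be summarized in a few lines of plain Java. This is a hypothetical illustration of the rule only; the binder's own resolution code is not shown in this guide, and the null/empty handling is an assumption.

```java
// Hypothetical summary of the naming precedence: resolver result first,
// then the dlqName property, then the error.<destination>.<group> default.
final class DlqNameSketch {

    static String resolve(String resolverResult, String dlqName, String destination, String group) {
        if (resolverResult != null) {
            return resolverResult; // a DlqDestinationResolver bean takes precedence
        }
        if (dlqName != null && !dlqName.isEmpty()) {
            return dlqName; // otherwise the dlqName property is used
        }
        return "error." + destination + "." + group; // default name
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, null, "so8400out", "so8400"));
        System.out.println(resolve(null, "my-dlq", "so8400out", "so8400"));
        System.out.println(resolve("topic1-dlq", "my-dlq", "so8400out", "so8400"));
    }
}
```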
1.13.2. Handling Records in a Dead-Letter Topic
Because the framework cannot anticipate how users would want to dispose of dead-lettered messages, it does not provide any standard mechanism to handle them. If the reason for the dead-lettering is transient, you may wish to route the messages back to the original topic. However, if the problem is a permanent issue, that could cause an infinite loop. The sample Spring Boot application within this topic is an example of how to route those messages back to the original topic, but it moves them to a “parking lot” topic after three attempts. The application is another spring-cloud-stream application that reads from the dead-letter topic. It exits when no messages are received for 5 seconds.
The examples assume the original destination is so8400out
and the consumer group is so8400
.
There are a couple of strategies to consider:
-
Consider running the rerouting only when the main application is not running. Otherwise, the retries for transient errors are used up very quickly.
-
Alternatively, use a two-stage approach: Use this application to route to a third topic and another to route from there back to the main topic.
The following code listings show the sample application:
spring.cloud.stream.bindings.reRoute-in-0.group=so8400replay
spring.cloud.stream.bindings.reRoute-in-0.destination=error.so8400out.so8400
spring.cloud.stream.bindings.reRoute-out-0.destination=so8400out
spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest
spring.cloud.stream.kafka.binder.headers=x-retries
@SpringBootApplication
public class ReRouteDlqKApplication implements CommandLineRunner {
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) {
SpringApplication.run(ReRouteDlqKApplication.class, args).close();
}
private final AtomicInteger processed = new AtomicInteger();
@Autowired
private StreamBridge streamBridge;
@Bean
public Function<Message<?>, Message<?>> reRoute() {
return failed -> {
processed.incrementAndGet();
Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
if (retries == null) {
System.out.println("First retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, 1)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION))
.build();
}
else if (retries < 3) {
System.out.println("Another retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, retries + 1)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION))
.build();
}
else {
System.out.println("Retries exhausted for " + failed);
streamBridge.send("parkingLot", MessageBuilder.fromMessage(failed)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION))
.build());
}
return null;
};
}
@Override
public void run(String... args) throws Exception {
while (true) {
int count = this.processed.get();
Thread.sleep(5000);
if (count == this.processed.get()) {
System.out.println("Idle, exiting");
return;
}
}
}
}
1.14. Partitioning with the Kafka Binder
Apache Kafka supports topic partitioning natively.
Sometimes it is advantageous to send data to specific partitions — for example, when you want to strictly order message processing (all messages for a particular customer should go to the same partition).
The following example shows how to configure the producer and consumer side:
@SpringBootApplication
public class KafkaPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"foo1", "bar1", "qux1",
"foo2", "bar2", "qux2",
"foo3", "bar3", "qux3",
"foo4", "bar4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
.web(WebApplicationType.NONE)
.run(args);
}
@Bean
public Supplier<Message<?>> generate() {
return () -> {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
};
}
}
spring:
  cloud:
    stream:
      bindings:
        generate-out-0:
          destination: partitioned.topic
          producer:
            partition-key-expression: headers['partitionKey']
            partition-count: 12
It is important to keep in mind that, since Apache Kafka supports partitioning natively, there is no need to rely on binder partitioning as described above unless you are using custom partition keys as in the example or an expression that involves the payload itself.
The binder-provided partitioning selection is otherwise intended for middleware technologies that do not support native partitioning.
Note that the example above uses a custom key called partitionKey, which determines the partition, so binder partitioning is appropriate in this case.
When using native Kafka partitioning, that is, when you do not provide the partition-key-expression, Apache Kafka selects the partition, which by default is the hash of the record key modulo the number of available partitions.
To add a key to an outbound record, set the KafkaHeaders.KEY header to the desired key value in a spring-messaging Message<?>.
By default, when no record key is provided, Apache Kafka will choose a partition based on the logic described in the Apache Kafka Documentation.
The topic must be provisioned to have enough partitions to achieve the desired concurrency for all consumer groups.
The above configuration supports up to 12 consumer instances (6 if their concurrency is 2, 4 if their concurrency is 3, and so on).
It is generally best to “over-provision” the partitions to allow for future increases in consumers or concurrency.
The preceding configuration uses the default partitioning (key.hashCode() % partitionCount).
This may or may not provide a suitably balanced algorithm, depending on the key values. In particular, note that this partitioning strategy differs from the default used by a standalone Kafka producer, such as the one used by Kafka Streams, meaning that the same key value may balance differently across partitions when produced by those clients.
You can override this default by using the partitionSelectorExpression or partitionSelectorClass properties.
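The default formula quoted above can be tried out in isolation. The sketch below is a hypothetical stand-alone version of key.hashCode() % partitionCount; the Math.abs guard that keeps the result non-negative is an assumption added here so that the value is always a valid partition index.

```java
// Hypothetical stand-alone version of the default partition formula.
final class DefaultPartitionSketch {

    static int partitionFor(Object key, int partitionCount) {
        // Assumption: negative remainders are folded into the valid range with Math.abs.
        return Math.abs(key.hashCode() % partitionCount);
    }

    public static void main(String[] args) {
        String[] keys = {"foo1", "bar1", "qux1", "foo2"};
        for (String key : keys) {
            // The same key always maps to the same partition for a fixed partition count.
            System.out.println(key + " -> partition " + partitionFor(key, 12));
        }
    }
}
```

Running this against a sample of real key values is a quick way to see how evenly they spread across the configured partition count.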
Since partitions are natively handled by Kafka, no special configuration is needed on the consumer side. Kafka allocates partitions across the instances.
The partitionCount for a Kafka topic may change at runtime (for example, due to an administration task). After such a change, the calculated partitions will be different (for example, the new partitions will then be used). Runtime changes of the partition count are supported since Spring Cloud Stream 4.0.3. See also the spring.kafka.producer.properties.metadata.max.age.ms parameter to configure the update interval. Due to some limitations, this mechanism cannot be used with a partition-key-expression that references the payload of a message; it is disabled in that case. The overall behavior is disabled by default and can be enabled with the configuration parameter producer.dynamicPartitionUpdatesEnabled=true.
The following Spring Boot application listens to a Kafka stream and prints (to the console) the partition ID to which each message goes:
@SpringBootApplication
public class KafkaPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
.web(WebApplicationType.NONE)
.run(args);
}
@Bean
public Consumer<Message<String>> listen() {
return message -> {
int partition = (int) message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION);
System.out.println(message + " received from partition " + partition);
};
}
}
spring:
  cloud:
    stream:
      bindings:
        listen-in-0:
          destination: partitioned.topic
          group: myGroup
You can add instances as needed.
Kafka rebalances the partition allocations.
If the instance count (or instance count * concurrency
) exceeds the number of partitions, some consumers are idle.
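The relationship between partitions, instances and concurrency is plain integer arithmetic, sketched here with hypothetical helper names. It assumes every instance in the group uses the same concurrency.

```java
// Hypothetical arithmetic for sizing a consumer group against a partition count.
final class ConsumerCapacitySketch {

    // Number of instances whose consumer threads can all be assigned a partition.
    static int fullyBusyInstances(int partitions, int concurrency) {
        return partitions / concurrency;
    }

    // Consumer threads left without a partition once the group is larger than the topic.
    static int idleConsumers(int partitions, int instances, int concurrency) {
        return Math.max(0, instances * concurrency - partitions);
    }

    public static void main(String[] args) {
        System.out.println(fullyBusyInstances(12, 2));
        System.out.println(fullyBusyInstances(12, 3));
        System.out.println(idleConsumers(12, 5, 3));
    }
}
```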
2. Reactive Kafka Binder
Kafka binder in Spring Cloud Stream provides a dedicated reactive binder based on the Reactor Kafka project.
This reactive Kafka binder enables full end-to-end reactive capabilities such as backpressure, reactive streams etc. in applications based on Apache Kafka.
When your Spring Cloud Stream Kafka application is written using reactive types (Flux
, Mono
etc.), it is recommended to use this reactive Kafka binder instead of the regular message channel based Kafka binder.
2.1. Maven Coordinates
The following are the Maven coordinates for the reactive Kafka binder.
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-reactive</artifactId>
</dependency>
2.2. Basic Example using the Reactive Kafka Binder
In this section, we show some basic code snippets for writing a reactive Kafka application using the reactive binder and details around them.
@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
return s -> s.map(String::toUpperCase);
}
You can use the above uppercase
function with both message channel based Kafka binder (spring-cloud-stream-binder-kafka
) as well as the reactive Kafka binder (spring-cloud-stream-binder-kafka-reactive
), the topic of discussion in this section.
When using this function with the regular Kafka binder, although you are using reactive types in the application (i.e., in the uppercase
function), you only get the reactive streams within the execution of your function.
Outside the function’s execution context, there are no reactive benefits, since the underlying binder is not based on the reactive stack.
Therefore, although this might look like it is bringing a full end-to-end reactive stack, this application is only partially reactive.
Now assume that you are using the proper reactive binder for Kafka - spring-cloud-stream-binder-kafka-reactive
with the above function’s application.
This binder implementation will give the full reactive benefits all the way from consumption on the top end to publishing at the bottom end of the chain.
This is because the underlying binder is built on top of Reactor Kafka's core APIs.
On the consumer side, it makes use of the KafkaReceiver which is a reactive implementation of a Kafka consumer.
Similarly, on the producer side, it uses KafkaSender API which is the reactive implementation of a Kafka producer.
Since the reactive Kafka binder is built upon a proper reactive Kafka API, applications get the full benefits of using reactive technologies.
Things like automatic back pressure, among other reactive capabilities, are built-in for the application when using this reactive Kafka binder.
Starting with version 4.0.2, you can customize the ReceiverOptions
and SenderOptions
by providing one or more ReceiverOptionsCustomizer
or SenderOptionsCustomizer
beans respectively.
They are BiFunction
s which receive the binding name and initial options, returning the customized options.
The interfaces extend Ordered
so the customizers will be applied in the order required, when more than one are present.
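The ordering behavior described above can be modeled in plain Java. This is only a sketch under stated assumptions: it does not use the binder's `ReceiverOptions` or `ReceiverOptionsCustomizer` types, the options are represented as a simple map, and the names `CustomizerChainSketch`, `OptionsCustomizer`, `setting`, and `applyAll` are hypothetical. It also assumes that lower order values are applied first, which is the usual convention for Spring's `Ordered`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of an ordered customizer chain. It does NOT use the binder's
// real types; every name in this class is hypothetical.
final class CustomizerChainSketch {

    // Stand-in for a customizer: (binding name, options) -> options, plus an order value.
    interface OptionsCustomizer
            extends BiFunction<String, Map<String, String>, Map<String, String>> {
        int getOrder();
    }

    // Builds a customizer that sets one option on a copy of its input.
    static OptionsCustomizer setting(int order, String key, String value) {
        return new OptionsCustomizer() {
            @Override
            public int getOrder() {
                return order;
            }

            @Override
            public Map<String, String> apply(String bindingName, Map<String, String> options) {
                Map<String, String> copy = new LinkedHashMap<>(options);
                copy.put(key, value);
                return copy;
            }
        };
    }

    // Sorts by order (assumption: lower values run first) and threads the options through.
    static Map<String, String> applyAll(String bindingName, Map<String, String> initial,
            List<OptionsCustomizer> customizers) {
        List<OptionsCustomizer> sorted = new ArrayList<>(customizers);
        sorted.sort(Comparator.comparingInt(OptionsCustomizer::getOrder));
        Map<String, String> current = initial;
        for (OptionsCustomizer customizer : sorted) {
            current = customizer.apply(bindingName, current);
        }
        return current;
    }
}
```

In this model, when two customizers set the same option, the one with the higher order value runs last and therefore wins.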
The binder does not commit offsets by default.
Starting with version 4.0.2, the KafkaHeaders.ACKNOWLEDGMENT header contains a ReceiverOffset object which allows you to cause the offset to be committed by calling its acknowledge() or commit() methods.
@Bean
public Consumer<Flux<Message<String>>> consume() {
return flux -> flux
.doOnNext(msg -> {
process(msg.getPayload());
msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge();
})
.subscribe();
}
Refer to the reactor-kafka
documentation and javadocs for more information.
In addition, starting with version 4.0.3, the Kafka consumer property reactiveAtMostOnce
can be set to true
and the binder will automatically commit the offsets before records returned by each poll are processed.
Also, starting with version 4.0.3, you can set the consumer property reactiveAutoCommit
to true
and the binder will automatically commit the offsets after the records returned by each poll are processed.
In these cases, the acknowledgment header is not present.
4.0.2 also provided reactiveAutoCommit, but the implementation was incorrect; it behaved similarly to reactiveAtMostOnce.
The following is an example of how to use reactiveAutoCommit
.
@Bean
Consumer<Flux<Flux<ConsumerRecord<?, String>>>> input() {
return flux -> flux
.doOnNext(inner -> inner
.doOnNext(val -> {
log.info(val.value());
})
.subscribe())
.subscribe();
}
Note that reactor-kafka
returns a Flux<Flux<ConsumerRecord<?, ?>>>
when using auto commit.
Given that Spring has no access to the contents of the inner flux, the application must deal with the native ConsumerRecord
; there is no message conversion or conversion service applied to the contents.
This requires the use of native decoding (by specifying a Deserializer
of the appropriate type in the configuration) to return record keys/values of the desired types.
2.3. Consuming Records in the Raw Format
In the above uppercase
function, we are consuming the record as Flux<String>
and then producing it as Flux<String>
.
There might be occasions in which you need to receive the record in the original received format - the ReceiverRecord
.
Here is such a function.
@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
return s -> s.map(rec -> new String(rec.value()).toLowerCase());
}
In this function, note that we are consuming the record as Flux<ReceiverRecord<byte[], byte[]>>
and then producing it as Flux<String>
.
ReceiverRecord
is the basic received record which is a specialized Kafka ConsumerRecord
in Reactor Kafka.
When using the reactive Kafka binder, the above function will give you access to the ReceiverRecord
type for each incoming record.
However, in this case, you need to provide a custom implementation for a RecordMessageConverter.
By default, the reactive Kafka binder uses a MessagingMessageConverter that converts the payload and headers from the ConsumerRecord
.
Therefore, by the time your handler method receives it, the payload is already extracted from the received record and passed on to the method, as in the case of the first function we looked at above.
By providing a custom RecordMessageConverter
implementation in the application, you can override the default behavior.
For example, if you want to consume the record as raw Flux<ReceiverRecord<byte[], byte[]>>
, then you can provide the following bean definition in the application.
@Bean
RecordMessageConverter fullRawReceivedRecord() {
return new RecordMessageConverter() {
private final RecordMessageConverter converter = new MessagingMessageConverter();
@Override
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
Consumer<?, ?> consumer, Type payloadType) {
return MessageBuilder.withPayload(record).build();
}
@Override
public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
return this.converter.fromMessage(message, defaultTopic);
}
};
}
Then, you need to instruct the framework to use this converter for the required binding.
Here is an example based on our lowercase
function.
spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRawReceivedRecord
lowercase-in-0
is the input binding name for our lowercase
function.
For the outbound (lowercase-out-0
), we still use the regular MessagingMessageConverter
.
In the toMessage
implementation above, we receive the raw ConsumerRecord
(ReceiverRecord
since we are in a reactive binder context) and then wrap it inside a Message
.
Then that message payload which is the ReceiverRecord
is provided to the user method.
If reactiveAutoCommit
is false
(default), call rec.receiverOffset().acknowledge()
(or commit()
) to cause the offset to be committed; if reactiveAutoCommit
is true
, the flux supplies ConsumerRecord
s instead.
Refer to the reactor-kafka
documentation and javadocs for more information.
2.4. Concurrency
When using reactive functions with the reactive Kafka binder, if you set concurrency on the consumer binding, then the binder creates as many dedicated KafkaReceiver
objects as provided by the concurrency value.
In other words, this creates multiple reactive streams with separate Flux
implementations.
This could be useful when you are consuming records from a partitioned topic.
For example, assume that the incoming topic has at least three partitions. Then you can set the following property.
spring.cloud.stream.bindings.lowercase-in-0.consumer.concurrency=3
That will create three dedicated KafkaReceiver
objects that generate three separate Flux
implementations and then stream them to the handler method.
2.5. Multiplex
Starting with version 4.0.3, the common consumer property multiplex
is now supported by the reactive binder, where a single binding can consume from multiple topics.
When false
(default), a separate binding is created for each topic specified in a comma-delimited list in the common destination
property.
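The difference between the two settings can be pictured with a small plain-Java model. This is only an illustration: it does not call the binder, and `MultiplexSketch` and `topicsPerBinding` are hypothetical names. Each inner list stands for the topics that one binding would consume.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative model only: each inner list is the set of topics one binding consumes.
final class MultiplexSketch {

    static List<List<String>> topicsPerBinding(String destination, boolean multiplex) {
        // Split the comma-delimited destination value into topic names.
        List<String> topics = new ArrayList<>();
        for (String topic : destination.split(",")) {
            topics.add(topic.trim());
        }
        List<List<String>> bindings = new ArrayList<>();
        if (multiplex) {
            // multiplex=true: a single binding consumes every listed topic.
            bindings.add(topics);
        } else {
            // multiplex=false (default): one binding per topic.
            for (String topic : topics) {
                bindings.add(Arrays.asList(topic));
            }
        }
        return bindings;
    }
}
```

For a destination of `topic-1,topic-2`, the model yields two single-topic bindings by default and one two-topic binding when `multiplex` is `true`.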
2.6. Destination is Pattern
Starting with version 4.0.3, the destination-is-pattern
Kafka binding consumer property is now supported.
The receiver options are configured with a regex Pattern
, allowing the binding to consume from any topic that matches the pattern.
2.7. Sender Result Channel
Starting with version 4.0.3, you can configure the resultMetadataChannel
to receive SenderResult<?>
s to determine success/failure of sends.
The SenderResult
contains correlationMetadata
to allow you to correlate results with sends; it also contains RecordMetadata
, which indicates the TopicPartition
and offset of the sent record.
The resultMetadataChannel
must be a FluxMessageChannel
instance.
Here is an example of how to use this feature, with correlation metadata of type Integer
:
@Bean
FluxMessageChannel sendResults() {
return new FluxMessageChannel();
}
@ServiceActivator(inputChannel = "sendResults")
void handleResults(SenderResult<Integer> result) {
if (result.exception() != null) {
failureFor(result);
}
else {
successFor(result);
}
}
To set the correlation metadata on an output record, set the CORRELATION_ID
header:
streamBridge.send("words1", MessageBuilder.withPayload("foobar")
.setCorrelationId(42)
.build());
When using the feature with a Function
, the function output type must be a Message<?>
with the correlation id header set to the desired value.
Metadata should be unique, at least for the duration of the send.
3. Kafka Streams Binder
3.1. Usage
To use the Kafka Streams binder, add it to your Spring Cloud Stream application using the following Maven coordinates:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
A quick way to bootstrap a new project for Kafka Streams binder is to use Spring Initializr and then select "Cloud Streams" and "Spring for Kafka Streams" as shown below
3.2. Overview
Spring Cloud Stream includes a binder implementation designed explicitly for Apache Kafka Streams binding. With this native integration, a Spring Cloud Stream "processor" application can directly use the Apache Kafka Streams APIs in the core business logic.
Kafka Streams binder implementation builds on the foundations provided by the Spring for Apache Kafka project.
Kafka Streams binder provides binding capabilities for the three major types in Kafka Streams - KStream
, KTable
and GlobalKTable
.
Kafka Streams applications typically follow a model in which records are read from an inbound topic, business logic is applied, and the transformed records are written to an outbound topic. Alternatively, a Processor application with no outbound destination can be defined as well.
In the following sections, we are going to look at the details of Spring Cloud Stream’s integration with Kafka Streams.
3.3. Programming Model
When using the programming model provided by the Kafka Streams binder, you can use either the high-level Streams DSL or a mix of the DSL and the lower-level Processor API.
When mixing the higher- and lower-level APIs, this is usually achieved by invoking the transform
or process
API methods on KStream
.
3.3.1. Functional Style
Starting with Spring Cloud Stream 3.0.0
, Kafka Streams binder allows the applications to be designed and developed using the functional programming style that is available in Java 8.
This means that the applications can be concisely represented as a lambda expression of types java.util.function.Function
or java.util.function.Consumer
.
Let’s take a very basic example.
@SpringBootApplication
public class SimpleConsumerApplication {
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input ->
input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
Albeit simple, this is a complete standalone Spring Boot application that is leveraging Kafka Streams for stream processing.
This is a consumer application with no outbound binding and only a single inbound binding.
The application consumes data and it simply logs the information from the KStream
key and value on the standard output.
The application contains the SpringBootApplication
annotation and a method that is marked as Bean
.
The bean method is of type java.util.function.Consumer
which is parameterized with KStream
.
Then in the implementation, we are returning a Consumer object that is essentially a lambda expression.
Inside the lambda expression, the code for processing the data is provided.
In this application, there is a single input binding that is of type KStream
.
The binder creates this binding for the application with a name process-in-0
, i.e. the name of the function bean name followed by a dash character (-
) and the literal in
followed by another dash and then the ordinal position of the parameter.
You use this binding name to set other properties such as destination.
For example, spring.cloud.stream.bindings.process-in-0.destination=my-topic
.
If the destination property is not set on the binding, a topic is created with the same name as the binding (if there are sufficient privileges for the application) or that topic is expected to be already available.
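The naming convention for input bindings can be written down as a tiny helper. This is a sketch for illustration only: `BindingNames`, `input`, and `destinationProperty` are hypothetical names and are not part of the binder. It simply reproduces the `<function bean name>-in-<index>` pattern and the destination property key shown above.

```java
// Hypothetical helper, not part of the binder: derives default input binding names
// and the matching destination property key.
final class BindingNames {

    // <function bean name> + "-in-" + <ordinal position of the parameter>
    static String input(String functionBeanName, int index) {
        return functionBeanName + "-in-" + index;
    }

    // Property key used to point a binding at a topic.
    static String destinationProperty(String bindingName) {
        return "spring.cloud.stream.bindings." + bindingName + ".destination";
    }
}
```

For the `process` bean above, `BindingNames.input("process", 0)` gives `process-in-0`, and passing that name to `destinationProperty` gives the key used in the example.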
Once built as an uber-jar (e.g., kstream-consumer-app.jar
), you can run the above example as follows.
java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic
If the application defines its functional beans using Spring’s Component
annotation, the binder also supports that model.
The above functional bean could be rewritten as below.
@Component("process")
public class SimpleConsumer implements java.util.function.Consumer<KStream<Object, String>> {
@Override
public void accept(KStream<Object, String> input) {
input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
Here is another example: a full processor with both input and output bindings. This is the classic word-count example, in which the application receives data from a topic and computes the number of occurrences of each word in a tumbling time window.
@SpringBootApplication
public class WordCountProcessorApplication {
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
return input -> input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
.count(Materialized.as("word-counts-state-store"))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))));
}
public static void main(String[] args) {
SpringApplication.run(WordCountProcessorApplication.class, args);
}
}
Here again, this is a complete Spring Boot application. The difference here from the first application is that the bean method is of type java.util.function.Function
.
The first parameterized type for the Function
is for the input KStream
and the second one is for the output.
In the method body, a lambda expression is provided that is of type Function
and as implementation, the actual business logic is given.
Similar to the previously discussed Consumer based application, the input binding here is named as process-in-0
by default. For the output, the binding name is automatically also set to process-out-0
.
Once built as an uber-jar (e.g., wordcount-processor.jar
), you can run the above example like the following.
java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts
This application will consume messages from the Kafka topic words
and the computed results are published to an output
topic counts
.
Spring Cloud Stream will ensure that the messages from both the incoming and outgoing topics are automatically bound as KStream objects. As a developer, you can exclusively focus on the business aspects of the code, i.e. writing the logic required in the processor. Setting up Kafka Streams specific configuration required by the Kafka Streams infrastructure is automatically handled by the framework.
The two examples we saw above have a single KStream
input binding. In both cases, the bindings received the records from a single topic.
If you want to multiplex multiple topics into a single KStream
binding, you can provide comma-separated Kafka topics as the destination, as shown below.
spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3
In addition, you can also provide topic patterns as destinations if you want to match topics against a regular expression.
spring.cloud.stream.bindings.process-in-0.destination=input.*
Multiple Input Bindings
Many non-trivial Kafka Streams applications often consume data from more than one topic through multiple bindings.
For instance, one topic is consumed as KStream
and another as KTable
or GlobalKTable
.
There are many reasons why an application might want to receive data as a table type.
Think of a use-case where the underlying topic is populated through a change data capture (CDC) mechanism from a database or perhaps the application only cares about the latest updates for downstream processing.
If the application specifies that the data needs to be bound as KTable
or GlobalKTable
, then Kafka Streams binder will properly bind the destination to a KTable
or GlobalKTable
and make them available for the application to operate upon.
We will look at how multiple input bindings are handled in the Kafka Streams binder in a few different scenarios.
BiFunction in Kafka Streams Binder
Here is an example where we have two inputs and an output. In this case, the application can leverage java.util.function.BiFunction
.
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
regionWithClicks.getClicks()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.toStream());
}
Here again, the basic theme is the same as in the previous examples, but here we have two inputs.
Java’s BiFunction
support is used to bind the inputs to the desired destinations.
The default binding names generated by the binder for the inputs are process-in-0
and process-in-1
respectively. The default output binding is process-out-0
.
In this example, the first parameter of BiFunction
is bound as a KStream
for the first input and the second parameter is bound as a KTable
for the second input.
BiConsumer in Kafka Streams Binder
If there are two inputs, but no outputs, in that case we can use java.util.function.BiConsumer
as shown below.
@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
return (userClicksStream, userRegionsTable) -> {};
}
Beyond two inputs
What if you have more than two inputs? There are situations in which you need more than two inputs. In that case, the binder allows you to chain partial functions. In functional programming jargon, this technique is generally known as currying. With the functional programming support added as part of Java 8, Java now enables you to write curried functions. Spring Cloud Stream Kafka Streams binder can make use of this feature to enable multiple input bindings.
Let’s see an example.
@Bean
public Function<KStream<Long, Order>,
Function<GlobalKTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
return orders -> (
customers -> (
products -> (
orders.join(customers,
(orderId, order) -> order.getCustomerId(),
(order, customer) -> new CustomerOrder(customer, order))
.join(products,
(orderId, customerOrder) -> customerOrder
.productId(),
(customerOrder, product) -> {
EnrichedOrder enrichedOrder = new EnrichedOrder();
enrichedOrder.setProduct(product);
enrichedOrder.setCustomer(customerOrder.customer);
enrichedOrder.setOrder(customerOrder.order);
return enrichedOrder;
})
)
)
);
}
Let’s look at the details of the binding model presented above.
In this model, we have 3 partially applied functions on the inbound. Let’s call them f(x)
, f(y)
and f(z)
.
If we expand these functions in the sense of true mathematical functions, it will look like this: f(x) → f(y) → f(z) → KStream<Long, EnrichedOrder>
.
The x
variable stands for KStream<Long, Order>
, the y
variable stands for GlobalKTable<Long, Customer>
and the z
variable stands for GlobalKTable<Long, Product>
.
The first function f(x)
has the first input binding of the application (KStream<Long, Order>
) and its output is the function, f(y).
The function f(y)
has the second input binding for the application (GlobalKTable<Long, Customer>
) and its output is yet another function, f(z)
.
The input for the function f(z)
is the third input for the application (GlobalKTable<Long, Product>
) and its output is KStream<Long, EnrichedOrder>
which is the final output binding for the application.
The input from the three partial functions which are KStream
, GlobalKTable
, GlobalKTable
respectively are available for you in the method body for implementing the business logic as part of the lambda expression.
Input bindings are named as enrichOrder-in-0
, enrichOrder-in-1
and enrichOrder-in-2
respectively. Output binding is named as enrichOrder-out-0
.
With curried functions, you can have virtually any number of inputs. However, keep in mind that more than a small number of inputs, each with its own partially applied function as above, can make Java code hard to read. Therefore, if your Kafka Streams application requires more than a handful of input bindings and you want to use this functional model, you may want to rethink your design and decompose the application appropriately.
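The currying mechanics are independent of Kafka Streams and can be tried in plain Java. The following is a minimal sketch that uses `String` in place of `KStream` and `GlobalKTable`; the class name `CurriedEnrichment` and the method `enrich` are hypothetical and only mirror the f(x) → f(y) → f(z) shape described above.

```java
import java.util.function.Function;

// Plain-Java illustration of a curried function with three inputs.
// Strings stand in for the KStream and GlobalKTable types used by the binder.
final class CurriedEnrichment {

    // order -> (customer -> (product -> result)): each apply() supplies one input
    // and returns the function that expects the next one.
    static Function<String, Function<String, Function<String, String>>> enrich() {
        return order -> customer -> product ->
                order + " for " + customer + " containing " + product;
    }
}
```

Calling `CurriedEnrichment.enrich().apply("order-1").apply("alice").apply("widget")` supplies the three inputs one at a time; only the last `apply` produces the final value, which corresponds to the single output binding.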
Output Bindings
Kafka Streams binder allows types of either KStream
or KTable
as output bindings.
Behind the scenes, the binder uses the to
method on KStream
to send the resultant records to the output topic.
If the application provides a KTable
as output in the function, the binder still uses this technique by delegating to the to
method of KStream
.
For example both functions below will work:
@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
return KStream::toTable;
}
@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
return KTable::toStream;
}
Multiple Output Bindings
Kafka Streams allows writing outbound data into multiple topics. This feature is known as branching in Kafka Streams.
When using multiple output bindings, you need to provide an array of KStream (KStream[]
) as the outbound return type.
Here is an example:
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input -> {
final Map<String, KStream<Object, WordCount>> stringKStreamMap = input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
.count(Materialized.as("WordCounts-branch"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))))
.split()
.branch(isEnglish)
.branch(isFrench)
.branch(isSpanish)
.noDefaultBranch();
return stringKStreamMap.values().toArray(new KStream[0]);
};
}
The programming model remains the same, however the outbound parameterized type is KStream[]
.
The default output binding names are process-out-0
, process-out-1
, process-out-2
respectively for the function above.
The reason why the binder generates three output bindings is because it detects the length of the returned KStream
array as three.
Note that in this example, we provide a noDefaultBranch()
; if we had used defaultBranch()
instead, that would have required an extra output binding, essentially returning a KStream
array of length four.
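The relationship between the number of branches and the generated output bindings can be sketched as follows. This is a plain-Java illustration, not binder code: `BranchBindingSketch` and `outputBindings` are hypothetical names, and the fourth binding name for the default-branch case is an assumption that follows the same `-out-<index>` pattern.

```java
// Hypothetical helper: lists the default output binding names for a branching function.
final class BranchBindingSketch {

    static String[] outputBindings(String functionBeanName, int branchCount, boolean defaultBranch) {
        // A default branch adds one more KStream to the returned array.
        int total = defaultBranch ? branchCount + 1 : branchCount;
        String[] names = new String[total];
        for (int i = 0; i < total; i++) {
            names[i] = functionBeanName + "-out-" + i;
        }
        return names;
    }
}
```

With three predicates and `noDefaultBranch()`, the model returns `process-out-0` through `process-out-2`; with a default branch it returns four names.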
Summary of Function based Programming Styles for Kafka Streams
In summary, the following table shows the various options that can be used in the functional paradigm.
Number of Inputs | Number of Outputs | Component to use |
---|---|---|
1 | 0 | java.util.function.Consumer |
2 | 0 | java.util.function.BiConsumer |
1 | 1..n | java.util.function.Function |
2 | 1..n | java.util.function.BiFunction |
>= 3 | 0..n | Use curried functions |
-
In the case of more than one output in this table, the type simply becomes
KStream[]
.
Function composition in Kafka Streams binder
Kafka Streams binder supports minimal forms of functional composition for linear topologies.
Using the Java functional API support, you can write multiple functions and then compose them on your own using the andThen
method.
For example, assume that you have the following two functions.
@Bean
public Function<KStream<String, String>, KStream<String, String>> foo() {
return input -> input.peek((s, s2) -> {});
}
@Bean
public Function<KStream<String, String>, KStream<String, Long>> bar() {
return input -> input.mapValues(value -> (long) value.length());
}
Even without the functional composition support in the binder, you can compose these two functions as below.
@Bean
public Function<KStream<String, String>, KStream<String, Long>> composed() {
return foo().andThen(bar());
}
Then you can provide definitions of the form spring.cloud.function.definition=foo;bar;composed
.
With the functional composition support in the binder, you don’t need to write this third function in which you are doing explicit function composition.
You can simply do this instead:
spring.cloud.function.definition=foo|bar
You can even do this:
spring.cloud.function.definition=foo|bar;foo;bar
The composed function’s default binding names in this example become foobar-in-0
and foobar-out-0
.
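The composition itself is ordinary `java.util.function.Function` chaining, which can be tried without Kafka Streams. The sketch below uses `String` and `Integer` in place of `KStream` types; `CompositionSketch`, `composed`, and `composedName` are hypothetical names. The name derivation (dropping the `|` separator) is an assumption that is consistent with the `foobar-in-0` example above, not a statement about the framework's internals.

```java
import java.util.function.Function;

// Plain-Java illustration of composing two functions with andThen.
final class CompositionSketch {

    static Function<String, String> foo() {
        return String::trim;
    }

    static Function<String, Integer> bar() {
        return String::length;
    }

    // Equivalent in spirit to the definition foo|bar: foo runs first, then bar.
    static Function<String, Integer> composed() {
        return foo().andThen(bar());
    }

    // Assumption: the composed name is the definition with the pipe removed.
    static String composedName(String definition) {
        return definition.replace("|", "");
    }
}
```

Here `composed()` trims its input and then measures its length, and `composedName("foo|bar") + "-in-0"` reproduces the input binding name from the example.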
Limitations of functional composition in Kafka Streams binder
When you have a java.util.function.Function
bean, it can be composed with another function or multiple functions.
The same function bean can be composed with a java.util.function.Consumer
as well. In this case, consumer is the last component composed.
A function can be composed with multiple functions, then end with a java.util.function.Consumer
bean as well.
When composing the beans of type java.util.function.BiFunction
, the BiFunction
must be the first function in the definition.
The composed entities must be either of type java.util.function.Function
or java.util.function.Consumer
.
In other words, you cannot take a BiFunction
bean and then compose with another BiFunction
.
You cannot compose with types of BiConsumer
or definitions where Consumer
is the first component.
You also cannot compose with functions where the output is an array (KStream[]
for branching) unless this is the last component in the definition.
The very first Function
or BiFunction
in the function definition may also use a curried form.
For example, the following is possible.
@Bean
public Function<KStream<String, String>, Function<KTable<String, String>, KStream<String, String>>> curriedFoo() {
return a -> b ->
a.join(b, (value1, value2) -> value1 + value2);
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> bar() {
return input -> input.mapValues(value -> value + "From-anotherFooFunc");
}
and the function definition could be curriedFoo|bar
.
Behind the scenes, the binder will create two input bindings for the curried function, and an output binding based on the final function in the definition.
The default input bindings in this case are going to be curriedFoobar-in-0
and curriedFoobar-in-1
.
The default output binding for this example becomes curriedFoobar-out-0
.
Special note on using KTable
as output in function composition
Let’s say you have the following two functions.
@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
return KStream::toTable;
}
@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
return KTable::toStream;
}
You can compose them as foo|bar
, but keep in mind that the second function (bar
in this case) must have a KTable
as input since the first function (foo
) has KTable
as output.
3.4. Ancillaries to the programming model
3.4.1. Multiple Kafka Streams processors within a single application
The binder allows you to have multiple Kafka Streams processors within a single Spring Cloud Stream application. You can have an application as below.
@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
...
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
...
}
In this case, the binder will create 3 separate Kafka Streams objects with different application IDs (more on this below). However, if you have more than one processor in the application, you have to tell Spring Cloud Stream which functions need to be activated. Here is how you activate the functions.
spring.cloud.function.definition: process;anotherProcess;yetAnotherProcess
If you do not want certain functions to be activated right away, you can remove them from this list.
This is also true when you have a single Kafka Streams processor and other types of Function
beans in the same application that are handled through a different binder (for example, a function bean that is based on the regular Kafka Message Channel binder).
3.4.2. Kafka Streams Application ID
Application id is a mandatory property that you need to provide for a Kafka Streams application. Spring Cloud Stream Kafka Streams binder allows you to configure this application id in multiple ways.
If you only have one single processor in the application, then you can set this at the binder level using the following property:
spring.cloud.stream.kafka.streams.binder.applicationId
.
As a convenience, if you only have a single processor, you can also use spring.application.name
as the property to delegate the application id.
If you have multiple Kafka Streams processors in the application, then you need to set the application id per processor. In the case of the functional model, you can attach it to each function as a property.
For example, imagine that you have the following functions.
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
...
}
and
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
Then you can set the application id for each, using the following binder level properties.
spring.cloud.stream.kafka.streams.binder.functions.process.applicationId
and
spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId
Setting the application id at the binding level also works for the function-based model. However, setting it per function at the binder level, as shown above, is much easier if you are using the functional model.
For production deployments, it is highly recommended to explicitly specify the application ID through configuration. This is especially critical if you are auto scaling your application, in which case you need to make sure that you are deploying each instance with the same application ID.
If the application does not provide an application ID, the binder will auto generate a static application ID for you.
This is convenient in development scenarios as it avoids the need for explicitly providing the application ID.
The generated application ID in this manner will be static over application restarts.
In the case of the functional model, the generated application ID will be the function bean name followed by the literal applicationID
, for example, process-applicationID
if process
is the function bean name.
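The fallback described above can be summarized in a few lines of plain Java. This is a sketch only: `ApplicationIdSketch`, `generated`, and `resolve` are hypothetical names, and the binder's actual lookup across its various properties is not modeled here. It shows only that an explicitly configured ID is used when present and that a name derived from the function bean is used otherwise.

```java
import java.util.Optional;

// Hypothetical model of the application ID fallback; not the binder's implementation.
final class ApplicationIdSketch {

    // Generated form: <function bean name>-applicationID
    static String generated(String functionBeanName) {
        return functionBeanName + "-applicationID";
    }

    // Use the configured ID if one was provided; otherwise fall back to the generated one.
    static String resolve(Optional<String> configured, String functionBeanName) {
        return configured.orElseGet(() -> generated(functionBeanName));
    }
}
```

Because the generated value depends only on the bean name, it stays the same across restarts, which matches the static behavior described above.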
Summary of setting Application ID
-
By default, the binder will auto generate the application ID per function.
-
If you have a single processor, then you can use
spring.kafka.streams.applicationId
, spring.application.name
or spring.cloud.stream.kafka.streams.binder.applicationId
.
-
If you have multiple processors, then the application ID can be set per function using the property -
spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId
.
3.4.3. Overriding the default binding names generated by the binder with the functional style
By default, the binder uses the strategy discussed above to generate the binding name when using the functional style, i.e. <function-bean-name>-<in>|<out>-[0..n], for example, process-in-0, process-out-0, etc. If you want to override those binding names, you can do that by specifying the following properties.
spring.cloud.stream.function.bindings.<default binding name>
. Default binding name is the original binding name generated by the binder.
For example, let’s say you have this function.
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
Binder will generate bindings with names, process-in-0
, process-in-1
and process-out-0
.
Now, if you want to change them to something else completely, maybe more domain specific binding names, then you can do so as below.
spring.cloud.stream.function.bindings.process-in-0=users
spring.cloud.stream.function.bindings.process-in-1=regions
and
spring.cloud.stream.function.bindings.process-out-0=clicks
After that, you must set all the binding level properties on these new binding names.
Please keep in mind that with the functional programming model described above, adhering to the default binding names make sense in most situations. The only reason you may still want to do this overriding is when you have larger number of configuration properties and you want to map the bindings to something more domain friendly.
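The default naming convention described in this section can be sketched in plain Java. This is only an illustration of the `<function-bean-name>-in-<n>` and `<function-bean-name>-out-<n>` pattern; the class `DefaultBindingNames` and its method are hypothetical and are not part of the binder API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; the binder computes these names internally.
final class DefaultBindingNames {

    // Builds <function-bean-name>-in-<n> names first, then <function-bean-name>-out-<n> names.
    static List<String> forFunction(String functionBeanName, int inputs, int outputs) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < inputs; i++) {
            names.add(functionBeanName + "-in-" + i);
        }
        for (int i = 0; i < outputs; i++) {
            names.add(functionBeanName + "-out-" + i);
        }
        return names;
    }

    public static void main(String[] args) {
        // A BiFunction bean named "process" has two inputs and one output.
        System.out.println(forFunction("process", 2, 1));
    }
}
```

For the BiFunction above, this yields process-in-0, process-in-1 and process-out-0, which are the keys you would use under spring.cloud.stream.function.bindings.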
3.4.4. Setting up bootstrap server configuration
When running Kafka Streams applications, you must provide the Kafka broker server information.
If you don’t provide this information, the binder expects the broker to be running at the default localhost:9092.
If that is not the case, you need to override it. There are two ways to do that.
-
Using the Boot property spring.kafka.bootstrapServers
-
Using the binder-level property spring.cloud.stream.kafka.streams.binder.brokers
For the binder-level property, you can also use the broker property provided through the regular Kafka binder, spring.cloud.stream.kafka.binder.brokers.
The Kafka Streams binder first checks whether the Kafka Streams binder-specific broker property (spring.cloud.stream.kafka.streams.binder.brokers) is set and, if it is not found, looks for spring.cloud.stream.kafka.binder.brokers.
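The binder-level lookup order can be sketched as follows. This is a simplified plain-Java model, not the binder's code: the class `BrokerLookup` is hypothetical, properties are represented as a simple map, and the Boot property spring.kafka.bootstrapServers is deliberately left out because this section does not state how it ranks against the binder-level properties.

```java
import java.util.Map;

// Hypothetical model of the binder-level broker lookup described above.
final class BrokerLookup {

    static final String STREAMS_BROKERS = "spring.cloud.stream.kafka.streams.binder.brokers";
    static final String KAFKA_BROKERS = "spring.cloud.stream.kafka.binder.brokers";
    static final String DEFAULT_BROKERS = "localhost:9092";

    // Prefers the Kafka Streams binder property, then the regular Kafka binder
    // property, and finally falls back to the default broker address.
    static String resolve(Map<String, String> props) {
        String streamsBrokers = props.get(STREAMS_BROKERS);
        if (streamsBrokers != null) {
            return streamsBrokers;
        }
        return props.getOrDefault(KAFKA_BROKERS, DEFAULT_BROKERS);
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of(KAFKA_BROKERS, "broker-a:9092")));
    }
}
```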
3.5. Record serialization and deserialization
The Kafka Streams binder allows you to serialize and deserialize records in two ways. One is the native serialization and deserialization facilities provided by Kafka, and the other is the message conversion capabilities of the Spring Cloud Stream framework. Let’s look at some details.
3.5.1. Inbound deserialization
Keys are always deserialized using native Serdes.
For values, by default, deserialization on the inbound is performed natively by Kafka. Note that this is a major change in default behavior from previous versions of the Kafka Streams binder, where deserialization was done by the framework.
The Kafka Streams binder tries to infer matching Serde types by looking at the type signature of java.util.function.Function|Consumer.
Here is the order in which it matches the Serdes.
-
If the application provides a bean of type Serde and its return type is parameterized with the actual type of the incoming key or value, then the binder uses that Serde for inbound deserialization. For example, if you have the following in the application, the binder detects that the incoming value type for the KStream matches the type that a Serde bean is parameterized on, and uses that bean for inbound deserialization.
@Bean
public Serde<Foo> customSerde() {
...
}
@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
...
}
-
Next, it looks at the types and checks whether they are among the types exposed by Kafka Streams. If so, it uses the corresponding Serdes. Here are the Serde types that the binder tries to match from Kafka Streams.
Integer, Long, Short, Double, Float, byte[], UUID and String.
-
If none of the Serdes provided by Kafka Streams match the types, then it uses the JsonSerde provided by Spring Kafka. In this case, the binder assumes that the types are JSON friendly. This is useful if you have multiple value objects as inputs, since the binder internally infers the correct Java types. Before falling back to the JsonSerde, though, the binder checks the default Serdes set in the Kafka Streams configuration to see whether one of them matches the incoming KStream’s types.
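The matching order above can be summarized with a small plain-Java model. It is a sketch under simplifying assumptions and not the binder's implementation: the class `SerdeInferenceSketch` is hypothetical, Serde beans are modeled as a map from target type to bean name, the configured default Serde is modeled by the single type it handles, and the result is just a label describing which strategy would apply.

```java
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical model of the inference order; real Serde objects are replaced by labels.
final class SerdeInferenceSketch {

    // Types for which Kafka Streams ships a Serde, as listed above.
    static final Set<Class<?>> BUILT_IN = Set.of(Integer.class, Long.class, Short.class,
            Double.class, Float.class, byte[].class, UUID.class, String.class);

    static String resolve(Class<?> type, Map<Class<?>, String> serdeBeans, Class<?> defaultSerdeType) {
        if (serdeBeans.containsKey(type)) {
            return "bean:" + serdeBeans.get(type);      // 1. Serde bean parameterized on the type
        }
        if (BUILT_IN.contains(type)) {
            return "kafka:" + type.getSimpleName();     // 2. Serde exposed by Kafka Streams
        }
        if (type.equals(defaultSerdeType)) {
            return "default";                           // 3. matching default Serde from configuration
        }
        return "JsonSerde";                             // 4. fallback
    }

    public static void main(String[] args) {
        System.out.println(resolve(String.class, Map.of(), null));
    }
}
```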
If none of the above strategies work, then the application must provide the Serdes through configuration.
This can be configured in two ways: at the binding level or as a default.
First, the binder checks whether a Serde is provided at the binding level.
For example, if you have the following processor,
@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}
then you can provide a binding-level Serde using the following:
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
If you provide Serdes as above per input binding, they take higher precedence, and the binder does not attempt any Serde inference.
If you want the default key/value Serdes to be used for inbound deserialization, you can do so at the binder level.
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
If you don’t want the native decoding provided by Kafka, you can rely on the message conversion features that Spring Cloud Stream provides. Since native decoding is the default, in order to let Spring Cloud Stream deserialize the inbound value object, you need to explicitly disable native decoding.
For example, if you have the same BiFunction processor as above, set spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false
You need to disable native decoding for each input individually. Otherwise, native decoding is still applied to the inputs for which you do not disable it.
By default, Spring Cloud Stream uses application/json as the content type and an appropriate JSON message converter.
You can use custom message converters by using the following property together with an appropriate MessageConverter bean.
spring.cloud.stream.bindings.process-in-0.contentType
3.5.2. Outbound serialization
Outbound serialization follows largely the same rules as inbound deserialization described above. As with inbound deserialization, one major change from previous versions of Spring Cloud Stream is that serialization on the outbound is handled natively by Kafka. Before version 3.0 of the binder, this was done by the framework itself.
Keys on the outbound are always serialized by Kafka using a matching Serde that is inferred by the binder.
If the binder cannot infer the type of the key, then the Serde needs to be specified through configuration.
Value Serdes are inferred using the same rules used for inbound deserialization.
First, the binder checks whether the outbound type matches a Serde bean provided in the application.
If not, it checks whether the type matches a Serde exposed by Kafka, such as Integer, Long, Short, Double, Float, byte[], UUID and String.
If that doesn’t work, then it falls back to the JsonSerde provided by the Spring Kafka project, but first looks at the default Serde configuration to see if there is a match.
Keep in mind that all of this happens transparently to the application.
If none of these work, then the user has to provide the Serde to use through configuration.
Let’s say you are using the same BiFunction processor as above. Then you can configure the outbound key/value Serdes as follows.
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
If Serde inference fails and no binding-level Serdes are provided, then the binder falls back to the JsonSerde, but first looks at the default Serdes for a match.
Default Serdes are configured in the same way as described above for deserialization.
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
If your application uses the branching feature and has multiple output bindings, then these have to be configured per binding.
Once again, if the binder is capable of inferring the Serde types, you don’t need this configuration.
If you don’t want the native encoding provided by Kafka and want to use the message conversion provided by the framework instead, then you need to explicitly disable native encoding, since native encoding is the default.
For example, if you have the same BiFunction processor as above, set spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false
In the case of branching, you need to disable native encoding for each output individually. Otherwise, native encoding is still applied to the outputs for which you don’t disable it.
When conversion is done by Spring Cloud Stream, by default, it uses application/json as the content type and an appropriate JSON message converter.
You can use custom message converters by using the following property together with a corresponding MessageConverter bean.
spring.cloud.stream.bindings.process-out-0.contentType
When native encoding/decoding is disabled, the binder does not do any inference as it does for native Serdes.
Applications need to explicitly provide all the configuration options.
For that reason, it is generally advised to stay with the default options and stick with the native de/serialization provided by Kafka Streams when you write Spring Cloud Stream Kafka Streams applications.
The one scenario in which you must use the message conversion capabilities provided by the framework is when your upstream producer uses a specific serialization strategy.
In that case, you want to use a matching deserialization strategy, as native mechanisms may fail.
When relying on the default Serde mechanism, the application must ensure that the binder can correctly map the inbound and outbound types to a proper Serde, as otherwise things might fail.
It is worth mentioning that the data de/serialization approaches outlined above apply only at the edges of your processors, that is, inbound and outbound.
Your business logic might still need to call Kafka Streams APIs that explicitly need Serde objects.
Those are still the responsibility of the application and must be handled accordingly by the developer.
3.6. Error Handling
Apache Kafka Streams provides the capability for natively handling exceptions from deserialization errors.
For details on this support, please see this.
Out of the box, Apache Kafka Streams provides two kinds of deserialization exception handlers: LogAndContinueExceptionHandler and LogAndFailExceptionHandler.
As the names indicate, the former logs the error and continues processing the next records, and the latter logs the error and fails. LogAndFailExceptionHandler is the default deserialization exception handler.
3.6.1. Handling Deserialization Exceptions in the Binder
The Kafka Streams binder allows you to specify the deserialization exception handlers above using the following property.
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue
or
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail
In addition to the above two deserialization exception handlers, the binder also provides a third one for sending the erroneous records (poison pills) to a DLQ (dead letter queue) topic. Here is how you enable this DLQ exception handler.
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq
When the above property is set, all records that fail deserialization are automatically sent to the DLQ topic.
You can set the name of the topic to which the DLQ messages are published as follows.
You can provide an implementation of DlqDestinationResolver, which is a functional interface.
DlqDestinationResolver takes the ConsumerRecord and the exception as inputs and lets you return a topic name as the output.
Because the implementation has access to the Kafka ConsumerRecord, it can inspect the record headers when choosing the topic.
Here is an example of providing an implementation of DlqDestinationResolver.
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
return (rec, ex) -> {
if (rec.topic().equals("word1")) {
return "topic1-dlq";
}
else {
return "topic2-dlq";
}
};
}
One important thing to keep in mind when providing an implementation of DlqDestinationResolver is that the provisioner in the binder does not auto-create topics for the application.
This is because there is no way for the binder to infer the names of all the DLQ topics the implementation might send to.
Therefore, if you provide DLQ names using this strategy, it is the application’s responsibility to ensure that those topics are created beforehand.
If a DlqDestinationResolver is present in the application as a bean, it takes higher precedence.
If you do not want to follow this approach and would rather provide a static DLQ name through configuration, you can set the following property.
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)
If this is set, then the error records are sent to the topic custom-dlq.
If the application uses neither of the above strategies, then the binder creates a DLQ topic with the name error.<input-topic-name>.<application-id>.
For instance, if your binding’s destination topic is inputTopic and the application ID is process-applicationId, then the default DLQ topic is error.inputTopic.process-applicationId.
It is always recommended to explicitly create a DLQ topic for each input binding if you intend to enable DLQ.
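The precedence between the three ways of naming the DLQ topic can be sketched as follows. This is a simplified plain-Java model rather than the binder's code: the class `DlqNameSketch` is hypothetical, and the resolver is modeled as a function of the input topic name and the exception, whereas the real DlqDestinationResolver receives the full ConsumerRecord.

```java
import java.util.function.BiFunction;

// Hypothetical model of how the DLQ topic name is chosen.
final class DlqNameSketch {

    static String resolve(BiFunction<String, Exception, String> resolver, String dlqName,
                          String inputTopic, String applicationId, Exception ex) {
        if (resolver != null) {
            return resolver.apply(inputTopic, ex);   // 1. resolver bean takes precedence
        }
        if (dlqName != null) {
            return dlqName;                          // 2. static dlqName property on the binding
        }
        return "error." + inputTopic + "." + applicationId;   // 3. default name
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, null, "inputTopic", "process-applicationId",
                new RuntimeException("bad record")));
    }
}
```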
3.6.2. DLQ per input consumer binding
The property spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler
is applicable for the entire application.
This implies that if there are multiple functions in the same application, this property is applied to all of them.
However, if you have multiple processors or multiple input bindings within a single processor, then you can use the finer-grained DLQ control that the binder provides per input consumer binding.
If you have the following processor,
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
and you only want to enable DLQ on the first input binding and skipAndContinue on the second binding, then you can do so on the consumer as below.
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: skipAndContinue
Setting deserialization exception handlers this way has a higher precedence than setting at the binder level.
3.6.3. DLQ partitioning
By default, records are published to the Dead-Letter topic using the same partition as the original record. This means the Dead-Letter topic must have at least as many partitions as the original topic.
To change this behavior, add a DlqPartitionFunction implementation as a @Bean to the application context.
Only one such bean can be present.
The function is provided with the consumer group (which is the same as the application ID in most situations), the failed ConsumerRecord and the exception.
For example, if you always want to route to partition 0, you might use:
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
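As a variation on the fixed-partition example above, a partition function could fold the original partition into a smaller DLQ partition count. The following plain-Java sketch shows only that arithmetic. The class and method names are hypothetical, and wiring the logic into an actual DlqPartitionFunction bean (which receives the group, the failed record and the exception) is left out.

```java
// Hypothetical helper showing one possible partition-selection strategy for a DLQ topic.
final class DlqPartitionSketch {

    // Maps the partition of the failed record onto the range [0, dlqPartitionCount).
    static int partitionFor(int originalPartition, int dlqPartitionCount) {
        if (dlqPartitionCount <= 0) {
            throw new IllegalArgumentException("dlqPartitionCount must be positive");
        }
        return Math.floorMod(originalPartition, dlqPartitionCount);
    }

    public static void main(String[] args) {
        // A record from partition 5 of the input topic, with a 3-partition DLQ topic.
        System.out.println(partitionFor(5, 3));
    }
}
```

With this strategy the DLQ topic can have fewer partitions than the input topic, at the cost of mixing records from several input partitions in one DLQ partition.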
If you set a consumer binding’s dlqPartitions property to 1 (and the binder’s minPartitionCount is equal to 1), there is no need to supply a DlqPartitionFunction; the framework will always use partition 0.
If you set a consumer binding’s dlqPartitions property to a value greater than 1 (or the binder’s minPartitionCount is greater than 1), you must provide a DlqPartitionFunction bean, even if the partition count is the same as the original topic’s.
A couple of things to keep in mind when using the exception handling feature in the Kafka Streams binder.
-
The property spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler applies to the entire application. This implies that if there are multiple functions in the same application, this property is applied to all of them.
-
The exception handling for deserialization works consistently with native deserialization and framework-provided message conversion.
3.6.4. Handling Production Exceptions in the Binder
Unlike the support for deserialization exception handlers described above, the binder does not provide first-class mechanisms for handling production exceptions.
However, you can still configure production exception handlers using the StreamsBuilderFactoryBean customizer, which is described in more detail in a later section.
3.7. Retrying critical business logic
There are scenarios in which you might want to retry parts of your business logic that are critical to the application.
For example, the Kafka Streams processor may make an external call to a relational database or invoke a REST endpoint.
These calls can fail for various reasons, such as network issues or remote service unavailability.
Often, these failures resolve themselves if you try the call again.
By default, Kafka Streams binder creates RetryTemplate
beans for all the input bindings.
If the function has the following signature,
@Bean
public java.util.function.Consumer<KStream<Object, String>> process()
and with default binding name, the RetryTemplate
will be registered as process-in-0-RetryTemplate
.
This is following the convention of binding name (process-in-0
) followed by the literal -RetryTemplate
.
In the case of multiple input bindings, there will be a separate RetryTemplate
bean available per binding.
If there is a custom RetryTemplate
bean available in the application and provided through spring.cloud.stream.bindings.<binding-name>.consumer.retryTemplateName
, then that takes precedence over any input binding level retry template configuration properties.
Once the RetryTemplate
from the binding is injected into the application, it can be used to retry any critical sections of the application.
Here is an example:
@Bean
public java.util.function.Consumer<KStream<Object, String>> process(@Lazy @Qualifier("process-in-0-RetryTemplate") RetryTemplate retryTemplate) {
return input -> input
.process(() -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext processorContext) {
}
@Override
public void process(Object o, String s) {
retryTemplate.execute(context -> {
//Critical business logic goes here.
return null;
});
}
@Override
public void close() {
}
});
}
Or you can use a custom RetryTemplate
as below.
@EnableAutoConfiguration
public static class CustomRetryTemplateApp {
@Bean
@StreamRetryTemplate
RetryTemplate fooRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
RetryPolicy retryPolicy = new SimpleRetryPolicy(4);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1);
retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input -> input
.process(() -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext processorContext) {
}
@Override
public void process(Object o, String s) {
fooRetryTemplate().execute(context -> {
//Critical business logic goes here.
return null;
});
}
@Override
public void close() {
}
});
}
}
Note that when retries are exhausted, by default, the last exception is thrown, causing the processor to terminate.
If you wish to handle the exception and continue processing, you can add a RecoveryCallback to the execute method.
Here is an example.
retryTemplate.execute(context -> {
//Critical business logic goes here.
return null;
}, context -> {
//Recovery logic goes here.
return null;
});
Refer to the Spring Retry project for more information about the RetryTemplate, retry policies, backoff policies and more.
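To make the semantics above concrete, here is a minimal plain-Java stand-in for "retry, then either rethrow or recover". It is not Spring Retry and does not use RetryTemplate; the class `RetrySketch` and its method are hypothetical, there is no backoff, and only RuntimeException is retried, which are all simplifying assumptions.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in illustrating retry with an optional recovery step.
final class RetrySketch {

    static <T> T executeWithRetry(int maxAttempts, Supplier<T> action,
                                  Function<RuntimeException, T> recovery) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();          // success ends the loop immediately
            } catch (RuntimeException e) {
                last = e;                     // remember the failure and try again
            }
        }
        if (recovery != null) {
            return recovery.apply(last);      // retries exhausted: recover and continue
        }
        throw last;                           // retries exhausted: propagate the last exception
    }

    public static void main(String[] args) {
        String result = executeWithRetry(4, () -> {
            throw new IllegalStateException("remote service unavailable");
        }, e -> "fallback");
        System.out.println(result);
    }
}
```

Without the recovery function, the last exception propagates to the caller, which corresponds to the processor terminating in the scenario described above.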
3.8. State Store
State stores are created automatically by Kafka Streams when the high-level DSL is used and calls are made that trigger a state store.
If you want to materialize an incoming KTable binding as a named state store, you can do so by using the following strategy.
Let’s say you have the following function.
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
Then, by setting the following property, the incoming KTable data is materialized into the named state store.
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store
You can define custom state stores as beans in your application and those will be detected and added to the Kafka Streams builder by the binder. Especially when the processor API is used, you need to register a state store manually. In order to do so, you can create the StateStore as a bean in the application. Here are examples of defining such beans.
@Bean
public StoreBuilder myStore() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
Serdes.Long());
}
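@Bean
public StoreBuilder otherStore() {
return Stores.windowStoreBuilder(
// Kafka Streams 3.x takes Duration arguments: retention period, then window size.
Stores.persistentWindowStore("other-store",
Duration.ofMillis(3L), Duration.ofMillis(3L), false), Serdes.Long(),
Serdes.Long());
}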
These state stores can then be accessed by the application directly.
During the bootstrap, the above beans will be processed by the binder and passed on to the Streams builder object.
Accessing the state store:
Processor<Object, Product>() {
WindowStore<Object, String> state;
@Override
public void init(ProcessorContext processorContext) {
state = (WindowStore)processorContext.getStateStore("other-store");
}
...
}
This will not work when it comes to registering global state stores.
In order to register a global state store, please see the section below on customizing StreamsBuilderFactoryBean
.
3.9. Interactive Queries
The Kafka Streams binder API exposes a class called InteractiveQueryService to interactively query the state stores.
You can access this as a Spring bean in your application. An easy way to get access to this bean from your application is to autowire the bean.
@Autowired
private InteractiveQueryService interactiveQueryService;
Once you have access to this bean, you can query the particular state store that you are interested in. See below.
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
interactiveQueryService.getQueryableStore("my-store", QueryableStoreTypes.keyValueStore());
During startup, the above method call to retrieve the store might fail, for example because the state store is still being initialized. In such cases, it is useful to retry the operation. The Kafka Streams binder provides a simple retry mechanism to accommodate this.
The following two properties control this retrying.
-
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - Default is 1.
-
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - Default is 1000 milliseconds.
If there are multiple instances of the Kafka Streams application running, then before you can query them interactively, you need to identify which application instance hosts the particular key that you are querying.
InteractiveQueryService
API provides methods for identifying the host information.
In order for this to work, you must configure the property application.server
as below:
spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>
Here are some code snippets:
org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
key, keySerializer);
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
//query from the store that is locally available
}
else {
//query from the remote host
}
For more information on these host finding methods, please see the Javadoc on the methods. For these methods also, during startup, if the underlying KafkaStreams objects are not ready, they might throw exceptions. The aforementioned retry properties are applicable for these methods as well.
3.9.1. Other API methods available through the InteractiveQueryService
Use the following API method to retrieve the KeyQueryMetadata
object associated with the combination of given store and key.
public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer)
Use the following API method to retrieve the KafkaStreams object associated with the combination of given store and key.
public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer)
3.9.2. Customizing Store Query Parameters
Sometimes you need to fine-tune the store query parameters before querying the store through InteractiveQueryService.
For this purpose, starting with version 4.0.1 of the binder, you can provide a bean of type StoreQueryParametersCustomizer, which is a functional interface with a customize method that takes a StoreQueryParameters as the argument.
Here is its method signature.
StoreQueryParameters<T> customize(StoreQueryParameters<T> storeQueryParameters);
Using this approach, applications can further customize the StoreQueryParameters, for example by enabling stale stores.
When this bean is present in the application, InteractiveQueryService calls its customize method before querying the state store.
Keep in mind that there must be a unique bean for StoreQueryParametersCustomizer available in the application.
3.10. Health Indicator
The health indicator requires the dependency spring-boot-starter-actuator. For Maven, use:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
Spring Cloud Stream Kafka Streams Binder provides a health indicator to check the state of the underlying streams threads.
Spring Cloud Stream defines a property management.health.binders.enabled
to enable the health indicator. See the
Spring Cloud Stream documentation.
The health indicator provides the following details for each stream thread’s metadata:
-
Thread name
-
Thread state: CREATED, RUNNING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, PENDING_SHUTDOWN or DEAD
-
Active tasks: task ID and partitions
-
Standby tasks: task ID and partitions
By default, only the global status is visible (UP or DOWN). To show the details, the property management.endpoint.health.show-details must be set to ALWAYS or WHEN_AUTHORIZED.
For more details about the health information, see the Spring Boot Actuator documentation.
The status of the health indicator is UP if all the Kafka threads registered are in the RUNNING state.
Since there are three individual binders in Kafka Streams binder (KStream
, KTable
and GlobalKTable
), all of them will report the health status.
When enabling show-details
, some of the information reported may be redundant.
When there are multiple Kafka Streams processors present in the same application, then the health checks will be reported for all of them and will be categorized by the application ID of Kafka Streams.
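The rule that the indicator reports UP only when every registered thread is RUNNING can be modeled in a few lines. This is a plain-Java sketch, not the binder's health indicator: the class `HealthSketch` and its enum are hypothetical, and treating an empty thread list as UP is an assumption of this sketch only.

```java
import java.util.List;

// Hypothetical model of the aggregate health rule described in this section.
final class HealthSketch {

    enum ThreadState { CREATED, RUNNING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, PENDING_SHUTDOWN, DEAD }

    // UP only if every thread is RUNNING; any other state makes the result DOWN.
    static String status(List<ThreadState> threadStates) {
        boolean allRunning = threadStates.stream().allMatch(s -> s == ThreadState.RUNNING);
        return allRunning ? "UP" : "DOWN";
    }

    public static void main(String[] args) {
        System.out.println(status(List.of(ThreadState.RUNNING, ThreadState.PARTITIONS_ASSIGNED)));
    }
}
```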
3.11. Accessing Kafka Streams Metrics
Spring Cloud Stream Kafka Streams binder provides Kafka Streams metrics which can be exported through a Micrometer MeterRegistry
.
For Spring Boot version 2.2.x, the metrics support is provided through a custom Micrometer metrics implementation by the binder. For Spring Boot version 2.3.x, the Kafka Streams metrics support is provided natively through Micrometer.
When accessing metrics through the Boot actuator endpoint, make sure to add metrics to the property management.endpoints.web.exposure.include.
Then you can access /actuator/metrics to get a list of all the available metrics, each of which can then be accessed individually through the same URI (/actuator/metrics/<metric-name>).
3.12. Mixing high level DSL and low level Processor API
Kafka Streams provides two variants of APIs.
It has a higher-level, DSL-like API in which you can chain various operations in a style that may be familiar to many functional programmers.
Kafka Streams also gives access to a low-level Processor API.
The Processor API is very powerful and lets you control things at a much lower level, but it is imperative in nature.
The Kafka Streams binder for Spring Cloud Stream allows you to use either the high-level DSL or a mix of the DSL and the Processor API.
Mixing both of these variants gives you a lot of options to control various use cases in an application.
Applications can use the transform or process API method calls to get access to the Processor API.
Here is a look at how you can combine the DSL and the Processor API in a Spring Cloud Stream application using the process API.
@Bean
public Consumer<KStream<Object, String>> process() {
return input ->
input.process(() -> new Processor<Object, String>() {
// Keep the context so that process() can use it later.
private ProcessorContext context;
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(Object key, String value) {
//business logic
}
@Override
public void close() {
}
});
}
Here is an example using the transform
API.
@Bean
public Consumer<KStream<Object, String>> process() {
return input ->
input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
@Override
public void init(ProcessorContext context) {
}
@Override
public void close() {
}
@Override
public KeyValue<Object, String> transform(Object key, String value) {
// business logic - return the (possibly transformed) key/value pair
return KeyValue.pair(key, value);
}
});
}
The process API method call is a terminal operation, while the transform API is non-terminal and gives you a potentially transformed KStream with which you can continue further processing using either the DSL or the Processor API.
3.13. Partition support on the outbound
A Kafka Streams processor usually sends the processed output to an outbound Kafka topic.
If the outbound topic is partitioned and the processor needs to send the outgoing data to particular partitions, the application needs to provide a bean of type StreamPartitioner.
See StreamPartitioner for more details.
Let’s see some examples.
This is the same processor we have already seen multiple times:
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
...
}
Here is the output binding destination:
spring.cloud.stream.bindings.process-out-0.destination: outputTopic
If the topic outputTopic has 4 partitions and you don’t provide a partitioning strategy, Kafka Streams uses its default partitioning strategy, which may not be the outcome you want, depending on the particular use case.
Let’s say you want to send any key that matches spring to partition 0, cloud to partition 1, stream to partition 2, and everything else to partition 3.
This is what you need to do in the application.
@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
return (t, k, v, n) -> {
if (k.equals("spring")) {
return 0;
}
else if (k.equals("cloud")) {
return 1;
}
else if (k.equals("stream")) {
return 2;
}
else {
return 3;
}
};
}
This is a rudimentary implementation; however, you have access to the key and value of the record, the topic name and the total number of partitions. Therefore, you can implement complex partitioning strategies if need be.
You also need to provide the name of this bean in the application configuration.
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner
Each output topic in the application needs to be configured separately like this.
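As an alternative to the hard-coded key mapping shown earlier, a partitioner could derive the partition from the key's hash and the partition count it is given. The sketch below shows only that computation in plain Java. The class `HashPartitionSketch` is hypothetical, routing null keys to partition 0 is an arbitrary choice for this sketch, and the hash used here is `String.hashCode()`, which is not the hashing used by Kafka's default partitioner.

```java
// Hypothetical helper showing a key-hash partitioning rule.
final class HashPartitionSketch {

    // Returns a partition in [0, numPartitions) that is stable for a given key.
    static int partition(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (key == null) {
            return 0;   // assumption of this sketch: null keys go to partition 0
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(partition("spring", 4));
    }
}
```

Inside a StreamPartitioner lambda such as the one above, the same expression could be applied to the key and the partition count passed to the function.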
3.14. StreamsBuilderFactoryBean Additional Customizations
It is often required to customize the StreamsBuilderFactoryBean
that creates the KafkaStreams
objects.
Based on the underlying support provided by Spring Kafka, the binder allows you to customize the StreamsBuilderFactoryBean
.
You can use the org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer
from Spring for Apache Kafka project to customize/configure the StreamsBuilderFactoryBean
itself.
Here is an example of using the StreamsBuilderFactoryBeanConfigurer
.
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return sfb -> sfb.setStateListener((newState, oldState) -> {
//Do some action here!
});
}
The above is shown as an illustration of the things you can do to configure the StreamsBuilderFactoryBean
.
You can essentially call any available mutation operations from StreamsBuilderFactoryBean
to configure it.
This configurer will be invoked by the binder right before the factory bean is started.
Once you get access to the StreamsBuilderFactoryBean
, you can also customize the underlying KafkaStreams
object via the KafkaStreamsCustomizer
.
Here is a blueprint for doing so.
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
};
}
KafkaStreamsCustomizer
will be called by the StreamsBuilderFactoryBean
right before the underlying KafkaStreams
gets started.
There can only be one StreamsBuilderFactoryBeanConfigurer
in the entire application.
How, then, do we account for multiple Kafka Streams processors, given that each of them is backed by its own StreamsBuilderFactoryBean
object?
If the customization needs to differ between processors, the application needs to filter on the application ID.
For example:
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return factoryBean -> {
if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals("processor1-application-id")) {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
}
};
}
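The application ID check above dereferences the result of getProperty(...) directly, which would fail if the property were absent. A null-safe variant of the comparison can be sketched in plain Java as follows. It assumes that the streams configuration is available as a java.util.Properties object and relies on "application.id" being the value of StreamsConfig.APPLICATION_ID_CONFIG. The class name ApplicationIdFilter and the method matches are hypothetical names used only for this illustration.

```java
import java.util.Properties;

/** Null-safe application ID comparison; ApplicationIdFilter is a hypothetical helper. */
final class ApplicationIdFilter {

    // Same string as StreamsConfig.APPLICATION_ID_CONFIG in Kafka Streams.
    static final String APPLICATION_ID_KEY = "application.id";

    private ApplicationIdFilter() {
    }

    /**
     * Returns true only if the configuration carries exactly the expected
     * application ID. A missing configuration or missing property yields false
     * instead of a NullPointerException.
     */
    static boolean matches(Properties streamsConfiguration, String expectedApplicationId) {
        if (streamsConfiguration == null || expectedApplicationId == null) {
            return false;
        }
        return expectedApplicationId.equals(streamsConfiguration.getProperty(APPLICATION_ID_KEY));
    }
}
```

Inside the configurer, the condition could then read ApplicationIdFilter.matches(factoryBean.getStreamsConfiguration(), "processor1-application-id").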
3.14.1. Using Customizer to register a global state store
As mentioned above, the binder does not provide a first class way to register global state stores as a feature. For that, you need to use the customizer. Here is how that can be done.
@Bean
public StreamsBuilderFactoryBeanConfigurer customizer() {
return fb -> {
try {
final StreamsBuilder streamsBuilder = fb.getObject();
streamsBuilder.addGlobalStore(...);
}
catch (Exception e) {
// Log or rethrow here; an empty catch block hides registration failures.
}
};
}
Again, if you have multiple processors, you want to attach the global state store to the right StreamsBuilder
by filtering out the other StreamsBuilderFactoryBean
objects using the application id as outlined above.
3.14.2. Using customizer to register a production exception handler
In the error handling section, we indicated that the binder does not provide a first class way to deal with production exceptions.
Though that is the case, you can still use the StreamsBuilderFactoryBean
customizer to register production exception handlers. See below.
@Bean
public StreamsBuilderFactoryBeanConfigurer customizer() {
return fb -> {
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProductionExceptionHandler.class);
};
}
Once again, if you have multiple processors, you may want to set it appropriately against the correct StreamsBuilderFactoryBean
.
You can also add such production exception handlers through a configuration property (see below for more on that), but the customizer is an option if you prefer a programmatic approach.
3.15. Timestamp extractor
Kafka Streams allows you to control the processing of the consumer records based on various notions of timestamp.
By default, Kafka Streams extracts the timestamp metadata embedded in the consumer record.
You can change this default behavior by providing a different TimestampExtractor
implementation per input binding.
Here are some details on how that can be done.
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, Order>>>> process() {
return orderStream ->
customers ->
products -> orderStream;
}
@Bean
public TimestampExtractor timestampExtractor() {
return new WallclockTimestampExtractor();
}
Then you set the above TimestampExtractor
bean name per consumer binding.
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.timestampExtractorBeanName=timestampExtractor
If you skip an input consumer binding for setting a custom timestamp extractor, that consumer will use the default settings.
3.16. Multi binders with Kafka Streams based binders and regular Kafka Binder
You can have an application where you have both a function/consumer/supplier that is based on the regular Kafka binder and a Kafka Streams based processor. However, you cannot mix both of them within a single function or consumer.
Here is an example, where you have both binder based components within the same application.
@Bean
public Function<String, String> process() {
return s -> s;
}
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {
return input -> input;
}
These are the relevant parts of the configuration:
spring.cloud.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
Things become a bit more complex if the same application deals with two different Kafka clusters. For example, the regular process
function acts upon both Kafka cluster 1 and cluster 2 (receiving data from cluster 1 and sending to cluster 2), while the Kafka Streams processor acts upon Kafka cluster 2.
In that case, you have to use the multi-binder facilities provided by Spring Cloud Stream.
Here is how your configuration may change in that scenario.
# multi binder configuration
spring.cloud.stream.binders.kafka1.type=kafka
# Replace kafkaCluster-1 with the appropriate IP of the cluster
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1}
spring.cloud.stream.binders.kafka2.type=kafka
# Replace kafkaCluster-2 with the appropriate IP of the cluster
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type=kstream
# Replace kafkaCluster-2 with the appropriate IP of the cluster
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.function.definition=process;kstreamProcess
# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
# source from cluster 1
spring.cloud.stream.bindings.process-in-0.binder=kafka1
spring.cloud.stream.bindings.process-out-0.destination=bar
# send to cluster 2
spring.cloud.stream.bindings.process-out-0.binder=kafka2
# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3
Pay attention to the above configuration.
There are two kinds of binders, but three binders in all: the first is the regular Kafka binder based on cluster 1 (kafka1
), the second is another Kafka binder based on cluster 2 (kafka2
), and the third is the kstream
one (kafka3
).
The first processor in the application receives data from kafka1
and publishes to kafka2
; both binders are regular Kafka binders, but they point to different clusters.
The second processor, which is a Kafka Streams processor, consumes data from kafka3
, which points to the same cluster as kafka2
but is a different binder type.
Since there are three different binder types available in the Kafka Streams family of binders - kstream
, ktable
and globalktable
- if your application has multiple bindings based on any of these binders, the binder type needs to be provided explicitly.
For example, if you have a processor as below,
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
...
}
then, this has to be configured in a multi binder scenario as the following. Please note that this is only needed if you have a true multi-binder scenario where there are multiple processors dealing with multiple clusters within a single application. In that case, the binders need to be explicitly provided with the bindings to distinguish from other processor’s binder types and clusters.
spring.cloud.stream.binders.kafka1.type=kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type=ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type=globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
# kstream
spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1
# ktable
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2
# globalktable
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3
# kstream
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1
# rest of the configuration is omitted.
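When settings such as the binder assignments above are kept in a .properties file, it helps to remember that java.util.Properties recognizes # as a comment marker only at the start of a line. Text that follows a value on the same line is read as part of that value, so annotations are safest on their own lines. The sketch below demonstrates this with the JDK class only; it assumes that the loader used by your application treats the format the same way. The class name PropertiesCommentCheck and the shortened key a.binder are illustrative.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Shows how java.util.Properties treats a '#' that follows a value. */
final class PropertiesCommentCheck {

    private PropertiesCommentCheck() {
    }

    /** Parses the given .properties content and returns the value stored under key. */
    static String valueOf(String content, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(content));
        }
        catch (IOException e) {
            // A StringReader does not fail in practice; rethrow unchecked to keep callers simple.
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key);
    }

    public static void main(String[] args) {
        // Trailing text stays in the value.
        System.out.println(valueOf("a.binder=kafka1 # source from cluster 1\n", "a.binder"));
        // A comment on its own line is ignored.
        System.out.println(valueOf("# source from cluster 1\na.binder=kafka1\n", "a.binder"));
    }
}
```

In the first call the binder name would carry the trailing annotation text, which would not match any configured binder; in the second call the value is just kafka1.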
3.17. State Cleanup
By default, no local state is cleaned up when the binding is stopped.
This matches the behavior of Spring Kafka from version 2.7 onward.
See Spring Kafka documentation for more details.
To modify this behavior simply add a single CleanupConfig
@Bean
(configured to clean up on start, stop, or neither) to the application context; the bean will be detected and wired into the factory bean.
3.18. Kafka Streams topology visualization
Kafka Streams binder provides the following actuator endpoints for retrieving the topology description, which you can then visualize using external tools.
/actuator/kafkastreamstopology
/actuator/kafkastreamstopology/<application-id of the processor>
You need to include the actuator and web dependencies from Spring Boot to access these endpoints.
Further, you also need to add kafkastreamstopology
to management.endpoints.web.exposure.include
property.
By default, the kafkastreamstopology
endpoint is disabled.
3.19. Event type based routing in Kafka Streams applications
Routing functions available in regular message channel based binders are not supported in Kafka Streams binder. However, Kafka Streams binder still provides routing capabilities through the event type record header on the inbound records.
To enable routing based on event types, the application must provide the following property.
spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.eventTypes
.
This can be a comma separated value.
For example, let us assume we have this function:
@Bean
public Function<KStream<Integer, Foo>, KStream<Integer, Foo>> process() {
return input -> input;
}
Let us also assume that we want the business logic in this function to be executed only if the incoming record has the event type foo
or bar
.
That can be expressed as below using the eventTypes
property on the binding.
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,bar
Now, when the application runs, the binder checks each incoming record for the header event_type
and checks whether its value is foo
or bar
.
If it finds neither of them, the function execution is skipped.
By default, the binder expects the record header key to be event_type
, but that can be changed per binding.
For instance, if we want to change the header key on this binding to my_event
instead of the default, that can be changed as below.
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event
.
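The matching rule described above can be pictured with a small plain-Java sketch: split the comma-separated eventTypes value once, then compare the header value of each record against the resulting set. This is only an illustration of the rule, not the binder's actual implementation. The class name EventTypeFilter is hypothetical, and decoding the header as UTF-8 and trimming whitespace around the configured types are assumptions of this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

/** Illustrative event type matcher; EventTypeFilter is a hypothetical name. */
final class EventTypeFilter {

    private final Set<String> acceptedTypes;

    /** Accepts a comma-separated list such as "foo,bar". */
    EventTypeFilter(String commaSeparatedEventTypes) {
        this.acceptedTypes = Arrays.stream(commaSeparatedEventTypes.split(","))
                .map(String::trim)             // assumption: surrounding whitespace is ignored
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toUnmodifiableSet());
    }

    /**
     * Returns true if the raw header value names one of the accepted event
     * types. A record without the header (null value) is never accepted.
     */
    boolean accepts(byte[] headerValue) {
        if (headerValue == null) {
            return false;
        }
        return this.acceptedTypes.contains(new String(headerValue, StandardCharsets.UTF_8));
    }
}
```

With new EventTypeFilter("foo,bar"), a record whose header value is foo or bar is accepted, while a record with any other value, or without the header, is skipped.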
When the event routing feature is used, the Kafka Streams binder uses the byte array Serde
to deserialize all incoming records.
Only when the record headers match the event type does it use the actual Serde
to do a proper deserialization, using either the configured or the inferred Serde
.
This introduces issues if you set a deserialization exception handler on the binding, because the expected deserialization only happens further down the stack, causing unexpected errors.
In order to address this issue, you can set the following property on the binding to force the binder to use the configured or inferred Serde
instead of byte array Serde
.
spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.useConfiguredSerdeWhenRoutingEvents
This way, the application can detect deserialization issues right away when using the event routing feature and can take appropriate handling decisions.
3.20. Binding visualization and control in Kafka Streams binder
Starting with version 3.1.2, Kafka Streams binder supports binding visualization and control.
The only two lifecycle phases supported are STOPPED
and STARTED
.
The lifecycle phases PAUSED
and RESUMED
are not available in Kafka Streams binder.
In order to activate binding visualization and control, the application needs to include the following two dependencies.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
If you prefer using webflux, you can then include spring-boot-starter-webflux
instead of the standard web dependency.
In addition, you also need to set the following property:
management.endpoints.web.exposure.include=bindings
To illustrate this feature further, let us use the following application as a guide:
@SpringBootApplication
public class KafkaStreamsApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaStreamsApplication.class, args);
}
@Bean
public Consumer<KStream<String, String>> consumer() {
return s -> s.foreach((key, value) -> System.out.println(value));
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> function() {
return ks -> ks;
}
}
As we can see, the application has two Kafka Streams functions: one is a consumer and the other is a function.
The consumer binding is named by default as consumer-in-0
.
Similarly, for the function, the input binding is function-in-0
and the output binding is function-out-0
.
Once the application is started, we can find details about the bindings using the following bindings endpoint.
curl http://localhost:8080/actuator/bindings | jq .
[
{
"bindingName": "consumer-in-0",
"name": "consumer-in-0",
"group": "consumer-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": true,
"extendedInfo": {}
},
{
"bindingName": "function-in-0",
"name": "function-in-0",
"group": "function-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": true,
"extendedInfo": {}
},
{
"bindingName": "function-out-0",
"name": "function-out-0",
"group": "function-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": false,
"extendedInfo": {}
}
]
The details about all three bindings can be found above.
Let us now stop the consumer-in-0 binding.
curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
At this point, no records will be received through this binding.
Start the binding again.
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
When there are multiple bindings present on a single function, invoking these operations on any of those bindings will work.
This is because all the bindings on a single function are backed by the same StreamsBuilderFactoryBean
.
Therefore, for the function above, either function-in-0
or function-out-0
will work.
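The curl calls above can also be issued from Java. The following sketch uses only the JDK HTTP client (Java 11 or later) to build the same POST request. The class name BindingStateClient and its method names are hypothetical, and the base URL http://localhost:8080 is taken from the curl examples.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Builds and sends state changes to the bindings actuator endpoint; hypothetical helper. */
final class BindingStateClient {

    private BindingStateClient() {
    }

    /** Builds the equivalent of: curl -d '{"state":"..."}' -H "Content-Type: application/json" -X POST ... */
    static HttpRequest changeStateRequest(String baseUrl, String bindingName, String state) {
        String body = "{\"state\":\"" + state + "\"}";
        return HttpRequest.newBuilder(URI.create(baseUrl + "/actuator/bindings/" + bindingName))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    /** Sends the request and returns the HTTP status code; the response body is discarded. */
    static int send(HttpRequest request) throws IOException, InterruptedException {
        return HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.discarding())
                .statusCode();
    }
}
```

For example, BindingStateClient.send(BindingStateClient.changeStateRequest("http://localhost:8080", "consumer-in-0", "STOPPED")) would stop the consumer binding, provided the application is running and the bindings endpoint is exposed.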
3.21. Manually starting Kafka Streams processors
Spring Cloud Stream Kafka Streams binder offers an abstraction called StreamsBuilderFactoryManager
on top of the StreamsBuilderFactoryBean
from Spring for Apache Kafka.
This manager API is used for controlling the multiple StreamsBuilderFactoryBean
per processor in a binder based application.
Therefore, when using the binder, if you manually want to control the auto starting of the various StreamsBuilderFactoryBean
objects in the application, you need to use StreamsBuilderFactoryManager
.
You can use the property spring.kafka.streams.auto-startup
and set this to false
in order to turn off auto starting of the processors.
Then, in the application, you can use something as below to start the processors using StreamsBuilderFactoryManager
.
@Bean
public ApplicationRunner runner(StreamsBuilderFactoryManager sbfm) {
return args -> {
sbfm.start();
};
}
This feature is handy when you want your application to start in the main thread and let Kafka Streams processors start separately.
For example, when you have a large state store that needs to be restored, starting the processors normally, as is the default, may block your application from starting.
If you are using some sort of liveness probe mechanism (for example on Kubernetes), it may think that the application is down and attempt a restart.
In order to correct this, you can set spring.kafka.streams.auto-startup
to false
and follow the approach above.
Keep in mind that, when using the Spring Cloud Stream binder, you are not directly dealing with StreamsBuilderFactoryBean
from Spring for Apache Kafka, rather StreamsBuilderFactoryManager
, as the StreamsBuilderFactoryBean
objects are internally managed by the binder.
3.22. Manually starting Kafka Streams processors selectively
While the approach laid out above will unconditionally apply auto start false
to all the Kafka Streams processors in the application through StreamsBuilderFactoryManager
, it is often desirable that only individually selected Kafka Streams processors are not auto started.
For instance, let us assume that you have three different functions (processors) in your application and for one of the processors, you do not want to start it as part of the application startup.
Here is an example of such a situation.
@Bean
public Function<KStream<?, ?>, KStream<?, ?>> process1() {
}
@Bean
public Consumer<KStream<?, ?>> process2() {
}
@Bean
public BiFunction<KStream<?, ?>, KTable<?, ?>, KStream<?, ?>> process3() {
}
In the scenario above, if you set spring.kafka.streams.auto-startup
to false
, then none of the processors will auto start during the application startup.
In that case, you have to programmatically start them as described above by calling start()
on the underlying StreamsBuilderFactoryManager
.
However, if we have a use case to selectively disable only one processor, then you have to set auto-startup
on the individual binding for that processor.
Let us assume that we don’t want our process3
function to auto start.
This is a BiFunction
with two input bindings - process3-in-0
and process3-in-1
.
In order to avoid auto start for this processor, you can pick any of these input bindings and set auto-startup
on them.
It does not matter which binding you pick, because both share the same factory bean; setting auto-startup
to false
on one of them is sufficient, although you may prefer to set it on both for clarity.
Here is the Spring Cloud Stream property that you can use to disable auto startup for this processor.
spring.cloud.stream.bindings.process3-in-0.consumer.auto-startup: false
or
spring.cloud.stream.bindings.process3-in-1.consumer.auto-startup: false
Then, you can manually start the processor either using the REST endpoint or using the BindingsEndpoint
API as shown below.
For this, you need to ensure that you have the Spring Boot actuator dependency on the classpath.
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/process3-in-0
or
@Autowired
BindingsEndpoint endpoint;
@Bean
public ApplicationRunner runner() {
return args -> {
endpoint.changeState("process3-in-0", State.STARTED);
};
}
See this section from the reference docs for more details on this mechanism.
When controlling the bindings by disabling auto-startup as described in this section, please note that this is only available for consumer bindings.
In other words, setting it on the producer binding, process3-out-0, has no effect on the auto starting of the processor, although this producer binding uses the same StreamsBuilderFactoryBean as the consumer bindings.
3.23. Tracing using Spring Cloud Sleuth
When Spring Cloud Sleuth is on the classpath of a Spring Cloud Stream Kafka Streams binder based application, both its consumer and producer are automatically instrumented with tracing information.
However, in order to trace any application specific operations, those need to be explicitly instrumented by the user code.
This can be done by injecting the KafkaStreamsTracing
bean from Spring Cloud Sleuth in the application and then invoking various Kafka Streams operations through this injected bean.
Here are some examples of using it.
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> clicks(KafkaStreamsTracing kafkaStreamsTracing) {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.transformValues(kafkaStreamsTracing.peek("span-1", (key, value) -> LOG.info("key/value: " + key + "/" + value)))
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.transform(kafkaStreamsTracing.map("span-2", (key, value) -> {
LOG.info("Click Info: " + value.getRegion() + "/" + value.getClicks());
return new KeyValue<>(value.getRegion(),
value.getClicks());
}))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum, Materialized.as(CLICK_UPDATES))
.toStream());
}
In the example above, there are two places where it adds explicit tracing instrumentation.
First, we are logging the key/value information from the incoming KStream
.
When this information is logged, the associated span and trace IDs get logged as well so that a monitoring system can track them and correlate with the same span id.
Second, when we call a map
operation, instead of calling it directly on the KStream
class, we wrap it inside a transform
operation and then call map
from KafkaStreamsTracing
.
In this case also, the logged message will contain the span ID and trace ID.
Here is another example, where we use the low-level transformer API for accessing the various Kafka Streams headers. When spring-cloud-sleuth is on the classpath, all the tracing headers can also be accessed like this.
@Bean
public Function<KStream<String, String>, KStream<String, String>> process(KafkaStreamsTracing kafkaStreamsTracing) {
return input -> input.transform(kafkaStreamsTracing.transformer(
"transformer-1",
() -> new Transformer<String, String, KeyValue<String, String>>() {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue<String, String> transform(String key, String value) {
LOG.info("Headers: " + this.context.headers());
LOG.info("K/V:" + key + "/" + value);
// More transformations, business logic execution, etc. go here.
return KeyValue.pair(key, value);
}
@Override
public void close() {
}
}));
}
3.24. Configuration Options
This section contains the configuration options used by the Kafka Streams binder.
For common configuration options and properties pertaining to binder, refer to the core documentation.
3.24.1. Kafka Streams Binder Properties
The following properties are available at the binder level and must be prefixed with spring.cloud.stream.kafka.streams.binder.
Any Kafka binder provided properties re-used in Kafka Streams binder must be prefixed with spring.cloud.stream.kafka.streams.binder
instead of spring.cloud.stream.kafka.binder
.
The only exception to this rule is when defining the Kafka bootstrap server property in which case either prefix works.
- configuration
-
Map with a key/value pair containing properties pertaining to Apache Kafka Streams API. This property must be prefixed with
spring.cloud.stream.kafka.streams.binder.
. Following are some examples of using this property.
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
For more information about all the properties that may go into streams configuration, see StreamsConfig
JavaDocs in Apache Kafka Streams docs.
All configuration that you can set from StreamsConfig
can be set through this.
When using this property, it is applicable against the entire application since this is a binder level property.
If you have more than one processor in the application, all of them will acquire these properties.
In the case of properties like application.id
, this will become problematic and therefore you have to carefully examine how the properties from StreamsConfig
are mapped using this binder level configuration
property.
- functions.<function-bean-name>.applicationId
-
Applicable only for functional style processors. This can be used for setting application ID per function in the application. In the case of multiple functions, this is a handy way to set the application ID.
- functions.<function-bean-name>.configuration
-
Applicable only for functional style processors. Map with a key/value pair containing properties pertaining to Apache Kafka Streams API. This is similar to the binder level
configuration
property described above, but this level of configuration
property is restricted only against the named function. When you have multiple processors and you want to restrict access to the configuration based on particular functions, you might want to use this. All StreamsConfig
properties can be used here.
- brokers
-
Broker URL
Default:
localhost
- zkNodes
-
Zookeeper URL
Default:
localhost
- deserializationExceptionHandler
-
Deserialization error handler type. This handler is applied at the binder level and thus applied against all input bindings in the application. There is a way to control it in a more fine-grained way at the consumer binding level. Possible values are -
logAndContinue
, logAndFail
, skipAndContinue
or sendToDlq
Default:
logAndFail
- applicationId
-
Convenient way to set the application.id for the Kafka Streams application globally at the binder level. If the application contains multiple functions, then the application id should be set differently. See above where setting the application id is discussed in detail.
Default: application will generate a static application ID. See the application ID section for more details.
- stateStoreRetry.maxAttempts
-
Max attempts for trying to connect to a state store.
Default: 1
- stateStoreRetry.backoffPeriod
-
Backoff period when trying to connect to a state store on a retry.
Default: 1000 ms
- consumerProperties
-
Arbitrary consumer properties at the binder level.
- producerProperties
-
Arbitrary producer properties at the binder level.
- includeStoppedProcessorsForHealthCheck
-
When the bindings for a processor are stopped through the actuator, that processor does not participate in the health check by default. Set this property to
true
to enable health check for all processors, including the ones that are currently stopped through the bindings actuator endpoint.
Default: false
3.24.2. Kafka Streams Producer Properties
The following properties are only available for Kafka Streams producers and must be prefixed with spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.
For convenience, if there are multiple output bindings and they all require a common value, that can be configured by using the prefix spring.cloud.stream.kafka.streams.default.producer.
.
- keySerde
-
key serde to use
Default: See the above discussion on message de/serialization
- valueSerde
-
value serde to use
Default: See the above discussion on message de/serialization
- useNativeEncoding
-
flag to enable/disable native encoding
Default:
true
.
- streamPartitionerBeanName
-
Custom outbound partitioner bean name to be used at the producer. Applications can provide custom
StreamPartitioner
as a Spring bean and the name of this bean can be provided to the producer to use instead of the default one.
Default: See the discussion above on outbound partition support.
- producedAs
-
Custom name for the sink component to which the processor is producing to.
Default:
none
(generated by Kafka Streams)
3.24.3. Kafka Streams Consumer Properties
The following properties are available for Kafka Streams consumers and must be prefixed with spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.
For convenience, if there are multiple input bindings and they all require a common value, that can be configured by using the prefix spring.cloud.stream.kafka.streams.default.consumer.
.
- applicationId
-
Setting application.id per input binding.
Default: See above.
- keySerde
-
key serde to use
Default: See the above discussion on message de/serialization
- valueSerde
-
value serde to use
Default: See the above discussion on message de/serialization
- materializedAs
-
state store to materialize when using incoming KTable types
Default:
none
.
- useNativeDecoding
-
flag to enable/disable native decoding
Default:
true
.
- dlqName
-
DLQ topic name.
Default: See above on the discussion of error handling and DLQ.
- startOffset
-
Offset to start from if there is no committed offset to consume from. This is mostly used when the consumer is consuming from a topic for the first time. Kafka Streams uses
earliest
as the default strategy and the binder uses the same default. This can be overridden to latest
using this property.
Default:
earliest
.
Note: Using resetOffsets
on the consumer does not have any effect on Kafka Streams binder.
Unlike the message channel based binder, Kafka Streams binder does not seek to beginning or end on demand.
- deserializationExceptionHandler
-
Deserialization error handler type. This handler is applied per consumer binding as opposed to the binder level property described before. Possible values are -
logAndContinue
, logAndFail
, skipAndContinue
or sendToDlq
Default:
logAndFail
- timestampExtractorBeanName
-
Specific time stamp extractor bean name to be used at the consumer. Applications can provide
TimestampExtractor
as a Spring bean and the name of this bean can be provided to the consumer to use instead of the default one.
Default: See the discussion above on timestamp extractors.
- eventTypes
-
Comma separated list of supported event types for this binding.
Default:
none
- eventTypeHeaderKey
-
Event type header key on each incoming record through this binding.
Default:
event_type
- consumedAs
-
Custom name for the source component from which the processor is consuming from.
Default:
none
(generated by Kafka Streams)
3.24.4. Special note on concurrency
In Kafka Streams, you can control the number of threads a processor can create using the num.stream.threads
property.
You can do this using the various configuration
options described above at the binder, function, producer, or consumer level.
You can also use the concurrency
property that core Spring Cloud Stream provides for this purpose.
When using this, you need to use it on the consumer.
When you have more than one input binding, set this on the first input binding.
For example, when you set spring.cloud.stream.bindings.process-in-0.consumer.concurrency
, the binder translates it to num.stream.threads
.
If you have multiple processors and one processor defines binding level concurrency, but not the others, the processors without binding level concurrency fall back to the binder wide property specified through
spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads
.
If this binder configuration is not available, then the application will use the default set by Kafka Streams.
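The fallback order described above can be sketched in plain Java. This is only an illustration of the resolution order, not the binder's code: the class and method names are hypothetical, function level configuration is left out for brevity, and the final fallback of 1 is the Kafka Streams default for num.stream.threads.

```java
import java.util.Map;

final class StreamThreadsResolutionSketch {

    // Kafka Streams' own default for num.stream.threads.
    static final int KAFKA_STREAMS_DEFAULT = 1;

    /**
     * @param bindingConcurrency  concurrency set on the processor's first input binding, or null if unset
     * @param binderConfiguration entries under spring.cloud.stream.kafka.streams.binder.configuration
     * @return the number of stream threads the processor would end up with
     */
    static int resolve(Integer bindingConcurrency, Map<String, String> binderConfiguration) {
        // 1. Binding level concurrency wins when present.
        if (bindingConcurrency != null) {
            return bindingConcurrency;
        }
        // 2. Otherwise fall back to the binder wide property.
        String binderWide = binderConfiguration.get("num.stream.threads");
        if (binderWide != null) {
            return Integer.parseInt(binderWide);
        }
        // 3. Otherwise Kafka Streams applies its own default.
        return KAFKA_STREAMS_DEFAULT;
    }
}
```

With a binder wide value of 2, a processor that sets concurrency to 3 on its first input binding resolves to 3, while a processor without binding level concurrency resolves to 2.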
4. Tips, Tricks and Recipes
4.1. Simple DLQ with Kafka
4.1.1. Problem Statement
As a developer, I want to write a consumer application that processes records from a Kafka topic. However, if some error occurs in processing, I don’t want the application to stop completely. Instead, I want to send the record in error to a DLT (Dead-Letter-Topic) and then continue processing new records.
4.1.2. Solution
The solution for this problem is to use the DLQ feature in Spring Cloud Stream. For the purposes of this discussion, let us assume that the following is our processor function.
@Bean
public Consumer<byte[]> processData() {
    return s -> {
        throw new RuntimeException();
    };
}
This is a very trivial function that throws an exception for all the records that it processes, but you can take this function and extend it to any other similar situations.
In order to send the records in error to a DLT, we need to provide the following configuration.
spring.cloud.stream:
  bindings:
    processData-in-0:
      group: my-group
      destination: input-topic
  kafka:
    bindings:
      processData-in-0:
        consumer:
          enableDlq: true
          dlqName: input-topic-dlq
In order to activate DLQ, the application must provide a group name.
Anonymous consumers cannot use the DLQ facilities.
We also need to enable DLQ by setting the enableDlq
property on the Kafka consumer binding to true
.
Finally, we can optionally provide the DLT name by providing the dlqName
on the Kafka consumer binding, which otherwise defaults to error.input-topic.my-group
in this case.
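The naming rule can be written down as a small plain Java helper. This is a sketch of the rule as stated in this recipe (an explicit dlqName wins, otherwise error.<destination>.<group>, and a group is mandatory); the class and method names are hypothetical and this is not the binder's implementation.

```java
final class DlqNameSketch {

    /**
     * @param dlqName     the configured dlqName, or null when not provided
     * @param destination the destination of the consumer binding
     * @param group       the consumer group of the binding
     * @return the topic that failed records would be sent to
     */
    static String resolve(String dlqName, String destination, String group) {
        // Anonymous consumers (no group) cannot use the DLQ facilities.
        if (group == null || group.isBlank()) {
            throw new IllegalArgumentException("DLQ requires a consumer group");
        }
        // An explicit dlqName takes precedence over the generated default.
        if (dlqName != null && !dlqName.isBlank()) {
            return dlqName;
        }
        return "error." + destination + "." + group;
    }
}
```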
Note that in the example consumer provided above, the type of the payload is byte[]
.
By default, the DLQ producer in Kafka binder expects the payload of type byte[]
.
If that is not the case, then we need to provide the configuration for the proper serializer.
For example, let us re-write the consumer function as below:
@Bean
public Consumer<String> processData() {
    return s -> {
        throw new RuntimeException();
    };
}
Now, we need to tell Spring Cloud Stream how we want to serialize the data when writing to the DLT. Here is the modified configuration for this scenario:
spring.cloud.stream:
  bindings:
    processData-in-0:
      group: my-group
      destination: input-topic
  kafka:
    bindings:
      processData-in-0:
        consumer:
          enableDlq: true
          dlqName: input-topic-dlq
          dlqProducerProperties:
            configuration:
              value.serializer: org.apache.kafka.common.serialization.StringSerializer
4.2. DLQ with Advanced Retry Options
4.2.1. Problem Statement
This is similar to the recipe above, but as a developer I would like to configure the way retries are handled.
4.2.2. Solution
If you followed the above recipe, then you get the default retry options built into the Kafka binder when the processing encounters an error.
By default, the binder retries for a maximum of 3 attempts, with an initial delay of one second, a multiplier of 2.0 applied at each back off, and a maximum delay of 10 seconds. You can change all of these settings through the following properties:
spring.cloud.stream.bindings.processData-in-0.consumer.maxAttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultiplier
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval
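To see how these four settings interact, the following plain Java sketch computes the waits between attempts for an exponential back off. It is an illustration under the assumption that maxAttempts counts the first delivery and that each wait is capped at the maximum interval; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class RetryBackOffSketch {

    /**
     * Returns the wait (in milliseconds) before each retry.
     * maxAttempts includes the first delivery, so there are maxAttempts - 1 waits.
     */
    static List<Long> delaysMillis(int maxAttempts, long initialInterval, double multiplier, long maxInterval) {
        List<Long> delays = new ArrayList<>();
        double interval = initialInterval;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            // Cap the wait at the maximum interval, then grow it for the next round.
            delays.add((long) Math.min(interval, maxInterval));
            interval *= multiplier;
        }
        return delays;
    }
}
```

With the defaults (3 attempts, 1000 ms, 2.0, 10000 ms), the record is retried after 1000 ms and again after 2000 ms before it is considered failed.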
If you want, you can also provide a list of retryable exceptions by providing a map of boolean values. For example,
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false
By default, any exceptions not listed in the map above will be retried. If that is not desired, then you can disable that by providing,
spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false
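The interplay between retryableExceptions and defaultRetryable can be pictured with the plain Java sketch below. It is a simplified model, not the framework's classifier: the class name is hypothetical, and walking up the class hierarchy so that a listed superclass also covers its subclasses is an assumption of this sketch.

```java
import java.util.Map;

final class RetryableClassifierSketch {

    /**
     * @param error               the exception thrown by the function
     * @param retryableExceptions the explicit map of exception type to retryable flag
     * @param defaultRetryable    the answer for exceptions that are not listed
     */
    static boolean isRetryable(Throwable error,
            Map<Class<? extends Throwable>, Boolean> retryableExceptions,
            boolean defaultRetryable) {
        // Check the exception type first, then each of its superclasses.
        for (Class<?> type = error.getClass(); type != null; type = type.getSuperclass()) {
            Boolean listed = retryableExceptions.get(type);
            if (listed != null) {
                return listed;
            }
        }
        // Nothing listed for this exception: use the default.
        return defaultRetryable;
    }
}
```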
You can also provide your own RetryTemplate
and mark it as @StreamRetryTemplate
which will be scanned and used by the binder.
This is useful when you want more sophisticated retry strategies and policies.
If you have multiple @StreamRetryTemplate
beans, then you can specify which one your binding wants by using the property,
spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>
4.3. Handling Deserialization errors with DLQ
4.3.1. Problem Statement
I have a processor that encounters a deserialization exception in Kafka consumer. I would expect that the Spring Cloud Stream DLQ mechanism will catch that scenario, but it does not. How can I handle this?
4.3.2. Solution
The normal DLQ mechanism offered by Spring Cloud Stream will not help when the Kafka consumer throws an irrecoverable deserialization exception.
This is because this exception happens even before the consumer’s poll()
method returns.
The Spring for Apache Kafka project offers some great ways to help the binder with this situation.
Let us explore those.
Assuming this is our function:
@Bean
public Consumer<String> functionName() {
    return s -> {
        System.out.println(s);
    };
}
It is a trivial function that takes a String
parameter.
We want to bypass the message converters provided by Spring Cloud Stream and want to use native deserializers instead.
In the case of String
types, it does not make much sense, but for more complex types like AVRO etc. you have to rely on external deserializers and therefore want to delegate the conversion to Kafka.
Now when the consumer receives the data, let us assume that there is a bad record that causes a deserialization error, maybe someone passed an Integer
instead of a String
for example.
In that case, if you don’t do something in the application, the exception will be propagated through the chain and your application will exit eventually.
In order to handle this, you can add a ListenerContainerCustomizer
@Bean
that configures a DefaultErrorHandler
.
This DefaultErrorHandler
is configured with a DeadLetterPublishingRecoverer
.
We also need to configure an ErrorHandlingDeserializer
for the consumer.
That sounds like a lot of complex things, but in reality, it boils down to these 3 beans in this case.
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
    return (container, dest, group) -> {
        container.setCommonErrorHandler(errorHandler);
    };
}

@Bean
public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
    return new DefaultErrorHandler(deadLetterPublishingRecoverer);
}

@Bean
public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
    return new DeadLetterPublishingRecoverer(bytesTemplate);
}
Let us analyze each of them.
The first one is the ListenerContainerCustomizer
bean that takes a DefaultErrorHandler
.
The container is now customized with that particular error handler.
You can learn more about container customization here.
The second bean is the DefaultErrorHandler
that is configured to publish failed records to a DLT
.
See here for more details on DefaultErrorHandler
.
The third bean is the DeadLetterPublishingRecoverer
that is ultimately responsible for sending to the DLT
.
By default, the DLT
topic is named as the ORIGINAL_TOPIC_NAME.DLT.
You can change that though.
See the docs for more details.
We also need to configure an ErrorHandlingDeserializer through application config.
The ErrorHandlingDeserializer
delegates to the actual deserializer.
In case of errors, it sets the key or value of the record to null and includes the raw bytes of the message.
It then sets the exception in a header and passes this record to the listener, which then calls the registered error handler.
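The delegation pattern described here can be sketched in plain Java without any Kafka types. The class below is a hypothetical stand-in, not the ErrorHandlingDeserializer itself: it wraps a delegate function, and on failure it returns a null value together with the raw bytes and the exception instead of letting the exception escape.

```java
import java.util.function.Function;

final class DelegatingDeserializerSketch<T> {

    // Outcome of one deserialization attempt.
    static final class Result<V> {
        final V value;          // null when deserialization failed
        final byte[] rawBytes;  // the original bytes, kept only on failure
        final Exception error;  // the failure, which the real class exposes through a header

        Result(V value, byte[] rawBytes, Exception error) {
            this.value = value;
            this.rawBytes = rawBytes;
            this.error = error;
        }
    }

    private final Function<byte[], T> delegate;

    DelegatingDeserializerSketch(Function<byte[], T> delegate) {
        this.delegate = delegate;
    }

    Result<T> deserialize(byte[] data) {
        try {
            // Happy path: the delegate does the actual work.
            return new Result<>(delegate.apply(data), null, null);
        }
        catch (RuntimeException e) {
            // Failure path: hand back a null value plus everything an error handler needs.
            return new Result<>(null, data, e);
        }
    }
}
```

Because the failure is captured rather than thrown, the poll loop keeps going and the error handler registered on the container can decide what to do with the bad record.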
Following is the configuration required:
spring.cloud.stream:
  function:
    definition: functionName
  bindings:
    functionName-in-0:
      group: group-name
      destination: input-topic
      consumer:
        use-native-decoding: true
  kafka:
    bindings:
      functionName-in-0:
        consumer:
          enableDlq: true
          dlqName: dlq-topic
          dlqProducerProperties:
            configuration:
              value.serializer: org.apache.kafka.common.serialization.StringSerializer
          configuration:
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
We are providing the ErrorHandlingDeserializer
through the configuration
property on the binding.
We are also indicating that the actual deserializer to delegate is the StringDeserializer
.
Keep in mind that none of the dlq properties above are relevant for the discussion in this recipe. They are meant only for addressing application level errors.
4.4. Basic offset management in Kafka binder
4.4.1. Problem Statement
I want to write a Spring Cloud Stream Kafka consumer application and I am not sure how it manages Kafka consumer offsets. Can you explain?
4.4.2. Solution
We encourage you to read the docs section on this to get a thorough understanding of it.
Here it is in a nutshell:
Kafka supports two types of offsets to start with by default - earliest
and latest
.
Their semantics are self-explanatory from their names.
Assume you are running the consumer for the first time.
If you omit the group.id in your Spring Cloud Stream application, then it becomes an anonymous consumer.
Whenever you have an anonymous consumer, the Spring Cloud Stream application by default starts from the latest
available offset in the topic partition.
On the other hand, if you explicitly specify a group.id, then by default, the Spring Cloud Stream application will start from the earliest
available offset in the topic partition.
In both cases above (consumers with explicit groups and anonymous groups), the starting offset can be switched around by using the property spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset
and setting it to either earliest
or latest
.
Now, assume that you already ran the consumer before and are now starting it again.
In this case, the starting offset semantics in the above case do not apply as the consumer finds an already committed offset for the consumer group (In the case of an anonymous consumer, although the application does not provide a group.id, the binder will auto generate one for you).
It simply picks up from the last committed offset onward.
This is true, even when you have a startOffset
value provided.
However, you can override the default behavior where the consumer starts from the last committed offset by using the resetOffsets
property.
In order to do that, set the property spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets
to true
(which is false
by default).
Then make sure you provide the startOffset
value (either earliest
or latest
).
When you do that, each time you start the consumer application, it behaves as if it were starting for the first time and ignores any committed offsets for the partition.
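The rules in this recipe can be condensed into one small decision function. The plain Java sketch below is only a summary of the text above: the class and method names are hypothetical, and falling back to the group based default when resetOffsets is true but startOffset is missing is an assumption of the sketch.

```java
final class StartPositionSketch {

    /**
     * @param anonymous          true when the application did not provide a group
     * @param hasCommittedOffset true when the group already has a committed offset for the partition
     * @param resetOffsets       value of the resetOffsets property
     * @param startOffset        "earliest", "latest", or null when not configured
     * @return "committed", "earliest", or "latest"
     */
    static String resolve(boolean anonymous, boolean hasCommittedOffset, boolean resetOffsets, String startOffset) {
        // A committed offset wins unless resetOffsets asks to ignore it.
        if (hasCommittedOffset && !resetOffsets) {
            return "committed";
        }
        // An explicit startOffset overrides the defaults.
        if (startOffset != null) {
            return startOffset;
        }
        // Defaults: anonymous consumers start at latest, named groups at earliest.
        return anonymous ? "latest" : "earliest";
    }
}
```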
4.5. Seeking to arbitrary offsets in Kafka
4.5.1. Problem Statement
Using Kafka binder, I know that it can set the offset to either earliest
or latest
, but I have a requirement to seek the offset to something in the middle, an arbitrary offset.
Is there a way to achieve this using Spring Cloud Stream Kafka binder?
4.5.2. Solution
Previously we saw how Kafka binder allows you to tackle basic offset management. By default, the binder does not allow you to rewind to an arbitrary offset, at least through the mechanism we saw in that recipe. However, there are some low-level strategies that the binder provides to achieve this use case. Let’s explore them.
First of all, when you want to reset to an arbitrary offset other than earliest
or latest
, make sure to leave the resetOffsets
configuration at its default, which is false
.
Then you have to provide a custom bean of type KafkaBindingRebalanceListener
, which will be injected into all consumer bindings.
It is an interface that comes with a few default methods, but here is the method that we are interested in:
/**
 * Invoked when partitions are initially assigned or after a rebalance. Applications
 * might only want to perform seek operations on an initial assignment. While the
 * 'initial' argument is true for each thread (when concurrency is greater than 1),
 * implementations should keep track of exactly which partitions have been sought.
 * There is a race in that a rebalance could occur during startup and so a topic/
 * partition that has been sought on one thread may be re-assigned to another
 * thread and you may not wish to re-seek it at that time.
 * @param bindingName the name of the binding.
 * @param consumer the consumer.
 * @param partitions the partitions.
 * @param initial true if this is the initial assignment on the current thread.
 */
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
        Collection<TopicPartition> partitions, boolean initial) {
    // do nothing
}
Let us look at the details.
In essence, this method will be invoked each time during the initial assignment for a topic partition or after a rebalance.
For better illustration, let us assume that our topic is foo
and it has 4 partitions.
Initially, we are only starting a single consumer in the group and this consumer will consume from all partitions.
When the consumer starts for the first time, all 4 partitions are initially assigned.
However, we do not want the partitions to start consuming at the default (earliest
since we define a group); rather, for each partition, we want to consume after seeking to an arbitrary offset.
Imagine that you have a business case to consume from certain offsets as below.
Partition    Start offset
0            1000
1            2000
2            2000
3            1000
This could be achieved by implementing the above method as below.
@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {
    Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
    topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
    topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);
    if (initial) {
        partitions.forEach(tp -> {
            if (topicPartitionOffset.containsKey(tp)) {
                final Long offset = topicPartitionOffset.get(tp);
                try {
                    consumer.seek(tp, offset);
                }
                catch (Exception e) {
                    // Handle exceptions carefully.
                }
            }
        });
    }
}
This is just a rudimentary implementation.
Real world use cases are much more complex than this and you need to adjust accordingly, but this certainly gives you a basic sketch.
When consumer seek
fails, it may throw some runtime exceptions and you need to decide what to do in those cases.
4.5.3. What if we start a second consumer with the same group id?
When we add a second consumer, a rebalance will occur and some partitions will be moved around.
Let’s say that the new consumer gets partitions 2
and 3
.
When this new Spring Cloud Stream consumer calls this onPartitionsAssigned
method, it will see that this is the initial assignment for partition 2
and 3
on this consumer.
Therefore, it will do the seek operation because of the conditional check on the initial
argument.
In the case of the first consumer, it now only has partitions 0
and 1
.
However, for this consumer it was simply a rebalance event and not considered an initial assignment.
Thus, it will not re-seek to the given offsets because of the conditional check on the initial
argument.
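The Javadoc of onPartitionsAssigned recommends keeping track of exactly which partitions have been sought, because with concurrency greater than 1 a partition sought on one thread may later be assigned to another thread of the same application. One way to do that is sketched below in plain Java. The class is hypothetical, partitions are keyed by a simple "topic-partition" string instead of TopicPartition to keep the sketch free of Kafka types, and the tracker only covers threads inside a single application instance.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class SeekOnceTrackerSketch {

    private final Map<String, Long> targetOffsets;

    // Thread-safe set of partitions that have already been sought by some thread.
    private final Set<String> alreadySought = ConcurrentHashMap.newKeySet();

    SeekOnceTrackerSketch(Map<String, Long> targetOffsets) {
        this.targetOffsets = Map.copyOf(targetOffsets);
    }

    /**
     * Returns the offset to seek to, or empty when no seek should happen:
     * not an initial assignment, no target configured, or already sought.
     */
    OptionalLong offsetToSeek(String topic, int partition, boolean initial) {
        String key = topic + "-" + partition;
        Long target = targetOffsets.get(key);
        if (!initial || target == null) {
            return OptionalLong.empty();
        }
        // add() returns false when another thread has already claimed this partition.
        return alreadySought.add(key) ? OptionalLong.of(target) : OptionalLong.empty();
    }
}
```

Inside onPartitionsAssigned, an implementation would call offsetToSeek for each assigned partition and invoke consumer.seek only when a value is present.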
4.6. How do I manually acknowledge using Kafka binder?
4.6.1. Problem Statement
Using Kafka binder, I want to manually acknowledge messages in my consumer. How do I do that?
4.6.2. Solution
By default, Kafka binder delegates to the default commit settings in Spring for Apache Kafka project.
The default ackMode
in Spring Kafka is batch
.
See here for more details on that.
There are situations in which you want to disable this default commit behavior and rely on manual commits. The following steps allow you to do that.
Set the property spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode
to either MANUAL
or MANUAL_IMMEDIATE
.
When it is set like that, then there will be a header called kafka_acknowledgment
(from KafkaHeaders.ACKNOWLEDGMENT
) present in the message received by the consumer method.
For example, imagine this as your consumer method.
@Bean
public Consumer<Message<String>> myConsumer() {
    return message -> {
        // The header is only present when ackMode is MANUAL or MANUAL_IMMEDIATE.
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
            System.out.println("Acknowledgment provided");
            acknowledgment.acknowledge();
        }
    };
}
Then you set the property spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode
to MANUAL
or MANUAL_IMMEDIATE
.
4.7. How do I override the default binding names in Spring Cloud Stream?
4.7.1. Problem Statement
Spring Cloud Stream creates default bindings based on the function definition and signature, but how do I override these to more domain friendly names?
4.7.2. Solution
Assume that following is your function signature.
@Bean
public Function<String, String> uppercase() {
    ...
}
By default, Spring Cloud Stream will create the bindings as below.
-
uppercase-in-0
-
uppercase-out-0
You can override these binding names with more meaningful ones by using the following properties.
spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out
After this, all binding properties must be set on the new names, my-transformer-in
and my-transformer-out
.
Here is another example with Kafka Streams and multiple inputs.
@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
    ...
}
By default, Spring Cloud Stream will create three different binding names for this function.
-
processOrder-in-0
-
processOrder-in-1
-
processOrder-out-0
You have to use these binding names each time you want to set some configuration on these bindings. If you would rather use more domain-friendly and readable binding names, you might choose something like the following:
-
orders
-
accounts
-
enrichedOrders
You can do that by setting these three properties:
-
spring.cloud.stream.function.bindings.processOrder-in-0=orders
-
spring.cloud.stream.function.bindings.processOrder-in-1=accounts
-
spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders
Once you do that, it overrides the default binding names and any properties that you want to set on them must be on these new binding names.
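The naming scheme and the effect of the overrides can be illustrated with a short plain Java sketch. The class and method names are hypothetical; the sketch only models the <functionName>-in-<index> and <functionName>-out-<index> pattern shown above and a map that stands in for the spring.cloud.stream.function.bindings.* properties.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class BindingNamesSketch {

    // Default names follow the <functionName>-in-<index> and <functionName>-out-<index> pattern.
    static List<String> defaultNames(String functionName, int inputs, int outputs) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < inputs; i++) {
            names.add(functionName + "-in-" + i);
        }
        for (int i = 0; i < outputs; i++) {
            names.add(functionName + "-out-" + i);
        }
        return names;
    }

    // Applies the overrides (default name -> custom name); names without an override are kept.
    static List<String> effectiveNames(String functionName, int inputs, int outputs, Map<String, String> overrides) {
        List<String> names = new ArrayList<>();
        for (String name : defaultNames(functionName, inputs, outputs)) {
            names.add(overrides.getOrDefault(name, name));
        }
        return names;
    }
}
```

For processOrder with two inputs and one output, the three overrides above turn the default names into orders, accounts, and enrichedOrders, in that order.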
4.8. How do I send a message key as part of my record?
4.8.1. Problem Statement
I need to send a key along with the payload of the record, is there a way to do that in Spring Cloud Stream?
4.8.2. Solution
You often need to send an associative data structure, such as a map entry, as a record with a key and a value. Spring Cloud Stream allows you to do that in a straightforward manner. The following is a basic blueprint for doing this, but you may want to adapt it to your particular use case.
Here is a sample producer method (aka Supplier
).
@Bean
public Supplier<Message<String>> supplier() {
    return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
This is a trivial function that sends a message with a String
payload, but also with a key.
Note that we set the key as a message header using KafkaHeaders.MESSAGE_KEY
.
If you want to take the key from a header other than the default kafka_messageKey
, specify this property in the configuration:
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']
Please note that we use the binding name supplier-out-0
because that is derived from our function name. Update it to match your own function.
Then, we use this new header key when we produce the message.
4.9. How do I use native serializer and deserializer instead of message conversion done by Spring Cloud Stream?
4.9.1. Problem Statement
Instead of using the message converters in Spring Cloud Stream, I want to use native Serializer and Deserializer in Kafka. By default, Spring Cloud Stream takes care of this conversion using its internal built-in message converters. How can I bypass this and delegate the responsibility to Kafka?
4.9.2. Solution
This is really easy to do.
All you have to do is to provide the following property to enable native serialization.
spring.cloud.stream.bindings.<binding-name>.producer.useNativeEncoding: true
Then, you need to also set the serializers. There are a couple of ways to do this.
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
or using the binder configuration.
spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
When you use the binder configuration, the serializers apply to all bindings, whereas setting them at the binding level applies them to that binding only.
On the deserializing side, you just need to provide the deserializers as configuration.
For example,
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
You can also set them at the binder level.
There is an optional property that you can set to force native decoding.
spring.cloud.stream.bindings.<binding-name>.consumer.useNativeDecoding: true
However, in the case of Kafka binder, this is unnecessary, as by the time it reaches the binder, Kafka already deserializes them using the configured deserializers.
4.10. Explain how offset resetting works in Kafka Streams binder
4.10.1. Problem Statement
By default, Kafka Streams binder always starts from the earliest offset for a new consumer. Sometimes, it is beneficial or required by the application to start from the latest offset. Kafka Streams binder allows you to do that.
4.10.2. Solution
Before we look at the solution, let us look at the following scenario.
@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer() {
    return (s, t) -> s.join(t, ...)
            ...;
}
We have a BiConsumer
bean that requires two input bindings.
In this case, the first binding is for a KStream
and the second one is for a KTable
.
When running this application for the first time, by default, both bindings start from the earliest
offset.
What if I want to start from the latest
offset due to some requirements?
You can do this by enabling the following properties.
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest
If you want only one binding to start from the latest
offset and the other to consume from the default earliest
, then leave the latter binding out from the configuration.
Keep in mind that, once there are committed offsets, these settings are not honored and the committed offsets take precedence.
4.11. Keeping track of successful sending of records (producing) in Kafka
4.11.1. Problem Statement
I have a Kafka producer application and I want to keep track of all my successful sends.
4.11.2. Solution
Let us assume that we have the following supplier in the application.
@Bean
public Supplier<Message<String>> supplier() {
    return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
Then, we need to define a new MessageChannel
bean to capture all the successful send information.
@Bean
public MessageChannel fooRecordChannel() {
    return new DirectChannel();
}
Next, define this property in the application configuration to provide the bean name for the recordMetadataChannel
.
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel
At this point, information about each successful send will be sent to the fooRecordChannel
.
You can write an IntegrationFlow
as below to see the information.
@Bean
public IntegrationFlow integrationFlow() {
    return f -> f.channel("fooRecordChannel")
            .handle((payload, messageHeaders) -> payload);
}
In the handle
method, the payload is what got sent to Kafka and the message headers contain a special key called kafka_recordMetadata
.
Its value is a RecordMetadata
that contains information about topic partition, current offset etc.
4.12. Adding custom header mapper in Kafka
4.12.1. Problem Statement
I have a Kafka producer application that sets some headers, but they are missing in the consumer application. Why is that?
4.12.2. Solution
Under normal circumstances, this should be fine.
Imagine you have the following producer.
@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}
On the consumer side, you should still see the header "foo", and the following should not give you any issues.
@Bean
public Consumer<Message<String>> consume() {
    return s -> {
        final String foo = (String) s.getHeaders().get("foo");
        System.out.println(foo);
    };
}
If you provide a custom header mapper in the application, then this won’t work.
Let’s say you have an empty KafkaHeaderMapper
in the application.
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
        }
    };
}
If that is your implementation, then you will miss the foo
header on the consumer.
Chances are that you have some logic inside those KafkaHeaderMapper
methods.
You need the following to populate the foo
header.
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            // Copy the foo header, if present, to the outbound Kafka record.
            final String foo = (String) headers.get("foo");
            if (foo != null) {
                target.add("foo", foo.getBytes());
            }
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            // Restore the foo header, if present, on the inbound message.
            final Header foo = source.lastHeader("foo");
            if (foo != null) {
                target.put("foo", new String(foo.value()));
            }
        }
    };
}
That will properly populate the foo
header from the producer to consumer.
4.12.3. Special note on the id header
In Spring Cloud Stream, the id
header is a special header, but some applications may want to have special custom id headers - something like custom-id
or ID
or Id
.
The first one (custom-id
) will propagate without any custom header mapper from producer to consumer.
However, if you produce with a variant of the framework reserved id
header - such as ID
, Id
, iD
etc. then you will run into issues with the internals of the framework.
See this StackOverflow thread for more context on this use case.
In that case, you must use a custom KafkaHeaderMapper
to map the case-sensitive id header.
For example, let’s say you have the following producer.
@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}
The header Id
above will be gone from the consuming side as it clashes with the framework id
header.
You can provide a custom KafkaHeaderMapper
to solve this issue.
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            // Copy the case-sensitive Id header, if present, to the outbound record.
            final String myId = (String) headers.get("Id");
            if (myId != null) {
                target.add("Id", myId.getBytes());
            }
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            // Restore the Id header, if present, on the inbound message.
            final Header idHeader = source.lastHeader("Id");
            if (idHeader != null) {
                target.put("Id", new String(idHeader.value()));
            }
        }
    };
}
By doing this, both id
and Id
headers will be available from the producer to the consumer side.
4.13. Producing to multiple topics in transaction
4.13.1. Problem Statement
How do I produce transactional messages to multiple Kafka topics?
For more context, see this StackOverflow question.
4.13.2. Solution
Use transactional support in Kafka binder for transactions and then provide an AfterRollbackProcessor
.
In order to produce to multiple topics, use StreamBridge
API.
Below are the code snippets for this:
@Autowired
StreamBridge bridge;

@Bean
Consumer<String> input() {
    return str -> {
        System.out.println(str);
        this.bridge.send("left", str.toUpperCase());
        this.bridge.send("right", str.toLowerCase());
        if (str.equals("Fail")) {
            throw new RuntimeException("test");
        }
    };
}

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
    return (container, dest, group) -> {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
                MessageChannel.class)).getTransactionalProducerFactory();
        KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
        DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
        container.setAfterRollbackProcessor(rollbackProcessor);
    };
}

DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
    return new DefaultAfterRollbackProcessor<>(
            new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}
4.13.3. Required Configuration
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right
spring.cloud.stream.bindings.input-in-0.consumer.maxAttempts=1
In order to test, you can use the following:
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
    return args -> {
        System.in.read();
        template.send("input", "Fail".getBytes());
        template.send("input", "Good".getBytes());
    };
}
Some important notes:
Please ensure that you don’t have any DLQ settings on the application configuration as we manually configure DLT (By default it will be published to a topic named input.DLT
based on the initial consumer function).
Also, set the maxAttempts
on the consumer binding to 1
in order to avoid retries by the binder.
The record will be tried a maximum of 3 times in the example above (the initial try plus the 2 attempts in the FixedBackOff
).
See the StackOverflow thread for more details on how to test this code.
If you are using Spring Cloud Stream to test it by adding more consumer functions, make sure to set the isolation-level
on the consumer binding to read-committed
.
This StackOverflow thread is also related to this discussion.
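The reason for setting maxAttempts to 1 can be made concrete with a little arithmetic. The plain Java sketch below is a back-of-the-envelope model, not framework code: the class and method names are hypothetical, and it assumes that binder level retries run inside every delivery made by the container, so the two mechanisms multiply.

```java
final class RollbackRetrySketch {

    /**
     * @param binderMaxAttempts value of maxAttempts on the consumer binding (includes the first try)
     * @param backOffMaxRetries second argument of FixedBackOff, the number of redeliveries after a rollback
     * @return how many times the function is invoked for a record that always fails
     */
    static long totalInvocations(int binderMaxAttempts, long backOffMaxRetries) {
        // Deliveries made by the container: the initial one plus the redeliveries.
        long deliveries = 1 + backOffMaxRetries;
        // Each delivery is attempted binderMaxAttempts times by the binder (assumption of this sketch).
        return binderMaxAttempts * deliveries;
    }
}
```

With maxAttempts set to 1 and FixedBackOff(2000L, 2L), a failing record is processed 3 times before it is published to the DLT. Under the stated assumption, leaving maxAttempts at its default of 3 would raise that to 9.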
4.14. Pitfalls to avoid when running multiple pollable consumers
4.14.1. Problem Statement
How can I run multiple instances of the pollable consumers and generate unique client.id
for each instance?
4.14.2. Solution
Assume that I have the following definition:
spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group
When running the application, the Kafka consumer generates a client.id (something like consumer-my-group-1
).
For each instance of the application that is running, this client.id
will be the same, causing unexpected issues.
In order to fix this, you can add the following property on each instance of the application:
spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}
See this GitHub issue for more details.