Configuration Options

This section contains the configuration options used by the Apache Kafka binder.

For common configuration options and properties pertaining to the binder, see the binding properties in core documentation.

Kafka Binder Properties

spring.cloud.stream.kafka.binder.brokers

A list of brokers to which the Kafka binder connects.

Default: localhost.

spring.cloud.stream.kafka.binder.defaultBrokerPort

brokers allows hosts specified with or without port information (for example, host1,host2:port2). This sets the default port when no port is configured in the broker list.

Default: 9092.

spring.cloud.stream.kafka.binder.configuration

Key/Value map of client properties (both producers and consumer) passed to all clients created by the binder. Due to the fact that these properties are used by both producers and consumers, usage should be restricted to common properties — for example, security settings. Unknown Kafka producer or consumer properties provided through this configuration are filtered out and not allowed to propagate. Properties here supersede any properties set in boot.

Default: Empty map.

spring.cloud.stream.kafka.binder.consumerProperties

Key/Value map of arbitrary Kafka client consumer properties. In addition to support known Kafka consumer properties, unknown consumer properties are allowed here as well. Properties here supersede any properties set in boot and in the configuration property above.

Default: Empty map.

spring.cloud.stream.kafka.binder.headers

The list of custom headers that are transported by the binder. Only required when communicating with older applications (⇐ 1.3.x) with a kafka-clients version < 0.11.0.0. Newer versions support headers natively.

Default: empty.

spring.cloud.stream.kafka.binder.healthTimeout

The time to wait to get partition information, in seconds. Health reports as down if this timer expires.

Default: 60.

spring.cloud.stream.kafka.binder.requiredAcks

The number of required acks on the broker. See the Kafka documentation for the producer acks property.

Default: 1.

spring.cloud.stream.kafka.binder.minPartitionCount

Effective only if autoCreateTopics or autoAddPartitions is set. The global minimum number of partitions that the binder configures on topics on which it produces or consumes data. It can be superseded by the partitionCount setting of the producer or by the value of instanceCount * concurrency settings of the producer (if either is larger).

Default: 1.

spring.cloud.stream.kafka.binder.producerProperties

Key/Value map of arbitrary Kafka client producer properties. In addition to support known Kafka producer properties, unknown producer properties are allowed here as well. Properties here supersede any properties set in boot and in the configuration property above.

Default: Empty map.

spring.cloud.stream.kafka.binder.replicationFactor

The replication factor of auto-created topics if autoCreateTopics is active. Can be overridden on each binding.

If you are using Kafka broker versions prior to 2.4, then this value should be set to at least 1. Starting with version 3.0.8, the binder uses -1 as the default value, which indicates that the broker 'default.replication.factor' property will be used to determine the number of replicas. Check with your Kafka broker admins to see if there is a policy in place that requires a minimum replication factor, if that’s the case then, typically, the default.replication.factor will match that value and -1 should be used, unless you need a replication factor greater than the minimum.

Default: -1.

spring.cloud.stream.kafka.binder.autoCreateTopics

If set to true, the binder creates new topics automatically. If set to false, the binder relies on the topics being already configured. In the latter case, if the topics do not exist, the binder fails to start.

This setting is independent of the auto.create.topics.enable setting of the broker and does not influence it. If the server is set to auto-create topics, they may be created as part of the metadata retrieval request, with default broker settings.

Default: true.

spring.cloud.stream.kafka.binder.autoAddPartitions

If set to true, the binder creates new partitions if required. If set to false, the binder relies on the partition size of the topic being already configured. If the partition count of the target topic is smaller than the expected value, the binder fails to start.

Default: false.

spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix

Enables transactions in the binder. See transaction.id in the Kafka documentation and Transactions in the spring-kafka documentation. When transactions are enabled, individual producer properties are ignored and all producers use the spring.cloud.stream.kafka.binder.transaction.producer.* properties.

Default null (no transactions)

spring.cloud.stream.kafka.binder.transaction.producer.*

Global producer properties for producers in a transactional binder. See spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix and Kafka Producer Properties and the general producer properties supported by all binders.

Default: See individual producer properties.

spring.cloud.stream.kafka.binder.headerMapperBeanName

The bean name of a KafkaHeaderMapper used for mapping spring-messaging headers to and from Kafka headers. Use this, for example, if you wish to customize the trusted packages in a BinderHeaderMapper bean that uses JSON deserialization for the headers. If this custom BinderHeaderMapper bean is not made available to the binder using this property, then the binder will look for a header mapper bean with the name kafkaBinderHeaderMapper that is of type BinderHeaderMapper before falling back to a default BinderHeaderMapper created by the binder.

Default: none.

spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader

Flag to set the binder health as down, when any partitions on the topic, regardless of the consumer that is receiving data from it, is found without a leader.

Default: true.

spring.cloud.stream.kafka.binder.certificateStoreDirectory

When the truststore or keystore certificate location is given as a non-local file system resource (resources supported by org.springframework.core.io.Resource e.g. CLASSPATH, HTTP, etc.), the binder copies the resource from the path (which is convertible to org.springframework.core.io.Resource) to a location on the filesystem. This is true for both broker level certificates (ssl.truststore.location and ssl.keystore.location) and certificates intended for schema registry (schema.registry.ssl.truststore.location and schema.registry.ssl.keystore.location). Keep in mind that the truststore and keystore location paths must be provided under spring.cloud.stream.kafka.binder.configuration…​. For example, spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location, spring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location, etc. The file will be copied to the location specified as the value for this property which must be an existing directory on the filesystem that is writable by the process running the application. If this value is not set and the certificate file is a non-local file system resource, then it will be copied to System’s temp directory as returned by System.getProperty("java.io.tmpdir"). This is also true, if this value is present, but the directory cannot be found on the filesystem or is not writable.

Default: none.

spring.cloud.stream.kafka.binder.metrics.defaultOffsetLagMetricsEnabled

When set to true, the offset lag metric of each consumer topic is computed whenever the metric is accessed. When set to false only the periodically calculated offset lag is used.

Default: true

spring.cloud.stream.kafka.binder.metrics.offsetLagMetricsInterval

The interval in which the offset lag for each consumer topic is computed. This value is used whenever metrics.defaultOffsetLagMetricsEnabled is disabled or its computation is taking too long.

Default: 60 seconds

spring.cloud.stream.kafka.binder.enableObservation

Enable Micrometer observation registry on all the bindings in this binder.

Default: false

spring.cloud.stream.kafka.binder.healthIndicatorConsumerGroup

KafkaHealthIndicator metadata consumer group.id. This consumer is used by the HealthIndicator to query the metadata about the topics in use.

Default: none.

Kafka Consumer Properties

The following properties are available for Kafka consumers only and must be prefixed with spring.cloud.stream.kafka.bindings.<channelName>.consumer..

To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of spring.cloud.stream.kafka.default.consumer.<property>=<value>.
admin.configuration

Since version 2.1.1, this property is deprecated in favor of topic.properties, and support for it will be removed in a future version.

admin.replicas-assignment

Since version 2.1.1, this property is deprecated in favor of topic.replicas-assignment, and support for it will be removed in a future version.

admin.replication-factor

Since version 2.1.1, this property is deprecated in favor of topic.replication-factor, and support for it will be removed in a future version.

autoRebalanceEnabled

When true, topic partitions is automatically rebalanced between the members of a consumer group. When false, each consumer is assigned a fixed set of partitions based on spring.cloud.stream.instanceCount and spring.cloud.stream.instanceIndex. This requires both the spring.cloud.stream.instanceCount and spring.cloud.stream.instanceIndex properties to be set appropriately on each launched instance. The value of the spring.cloud.stream.instanceCount property must typically be greater than 1 in this case.

Default: true.

ackEachRecord

When autoCommitOffset is true, this setting dictates whether to commit the offset after each record is processed. By default, offsets are committed after all records in the batch of records returned by consumer.poll() have been processed. The number of records returned by a poll can be controlled with the max.poll.records Kafka property, which is set through the consumer configuration property. Setting this to true may cause a degradation in performance, but doing so reduces the likelihood of redelivered records when a failure occurs. Also, see the binder requiredAcks property, which also affects the performance of committing offsets. This property is deprecated as of 3.1 in favor of using ackMode. If the ackMode is not set and batch mode is not enabled, RECORD ackMode will be used.

Default: false.

autoCommitOffset

Starting with version 3.1, this property is deprecated. See ackMode for more details on alternatives. Whether to autocommit offsets when a message has been processed. If set to false, a header with the key kafka_acknowledgment of the type org.springframework.kafka.support.Acknowledgment header is present in the inbound message. Applications may use this header for acknowledging messages. See the examples section for details. When this property is set to false, Kafka binder sets the ack mode to org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL and the application is responsible for acknowledging records. Also see ackEachRecord.

Default: true.

ackMode

Specify the container ack mode. This is based on the AckMode enumeration defined in Spring Kafka. If ackEachRecord property is set to true and consumer is not in batch mode, then this will use the ack mode of RECORD, otherwise, use the provided ack mode using this property.

autoCommitOnError

In pollable consumers, if set to true, it always auto commits on error. If not set (the default) or false, it will not auto commit in pollable consumers. Note that this property is only applicable for pollable consumers.

Default: not set.

resetOffsets

Whether to reset offsets on the consumer to the value provided by startOffset. Must be false if a KafkaBindingRebalanceListener is provided; see rebalance listener See reset-offsets for more information about this property.

Default: false.

startOffset

The starting offset for new groups. Allowed values: earliest and latest. If the consumer group is set explicitly for the consumer 'binding' (through spring.cloud.stream.bindings.<channelName>.group), 'startOffset' is set to earliest. Otherwise, it is set to latest for the anonymous consumer group. See reset-offsets for more information about this property.

Default: null (equivalent to earliest).

enableDlq

When set to true, it enables DLQ behavior for the consumer. By default, messages that result in errors are forwarded to a topic named error.<destination>.<group>. The DLQ topic name can be configurable by setting the dlqName property or by defining a @Bean of type DlqDestinationResolver. This provides an alternative option to the more common Kafka replay scenario for the case when the number of errors is relatively small and replaying the entire original topic may be too cumbersome. See kafka dlq processing for more information. Starting with version 2.0, messages sent to the DLQ topic are enhanced with the following headers: x-original-topic, x-exception-message, and x-exception-stacktrace as byte[]. By default, a failed record is sent to the same partition number in the DLQ topic as the original record. See dlq partition selection for how to change that behavior. Not allowed when destinationIsPattern is true.

Default: false.

dlqPartitions

When enableDlq is true, and this property is not set, a dead letter topic with the same number of partitions as the primary topic(s) is created. Usually, dead-letter records are sent to the same partition in the dead-letter topic as the original record. This behavior can be changed; see dlq partition selection. If this property is set to 1 and there is no DqlPartitionFunction bean, all dead-letter records will be written to partition 0. If this property is greater than 1, you MUST provide a DlqPartitionFunction bean. Note that the actual partition count is affected by the binder’s minPartitionCount property.

Default: none

configuration

Map with a key/value pair containing generic Kafka consumer properties. In addition to having Kafka consumer properties, other configuration properties can be passed here. For example some properties needed by the application such as spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar. The bootstrap.servers property cannot be set here; use multi-binder support if you need to connect to multiple clusters.

Default: Empty map.

dlqName

The name of the DLQ topic to receive the error messages.

Default: null (If not specified, messages that result in errors are forwarded to a topic named error.<destination>.<group>).

dlqProducerProperties

Using this, DLQ-specific producer properties can be set. All the properties available through kafka producer properties can be set through this property. When native decoding is enabled on the consumer (i.e., useNativeDecoding: true) , the application must provide corresponding key/value serializers for DLQ. This must be provided in the form of dlqProducerProperties.configuration.key.serializer and dlqProducerProperties.configuration.value.serializer.

Default: Default Kafka producer properties.

standardHeaders

Indicates which standard headers are populated by the inbound channel adapter. Allowed values: none, id, timestamp, or both. Useful if using native deserialization and the first component to receive a message needs an id (such as an aggregator that is configured to use a JDBC message store).

Default: none

converterBeanName

The name of a bean that implements RecordMessageConverter. Used in the inbound channel adapter to replace the default MessagingMessageConverter.

Default: null

idleEventInterval

The interval, in milliseconds, between events indicating that no messages have recently been received. Use an ApplicationListener<ListenerContainerIdleEvent> to receive these events. See pause-resume for a usage example.

Default: 30000

destinationIsPattern

When true, the destination is treated as a regular expression Pattern used to match topic names by the broker. When true, topics are not provisioned, and enableDlq is not allowed, because the binder does not know the topic names during the provisioning phase. Note, the time taken to detect new topics that match the pattern is controlled by the consumer property metadata.max.age.ms, which (at the time of writing) defaults to 300,000ms (5 minutes). This can be configured using the configuration property above.

Default: false

topic.properties

A Map of Kafka topic properties used when provisioning new topics — for example, spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0

Default: none.

topic.replicas-assignment

A Map<Integer, List<Integer>> of replica assignments, with the key being the partition and the value being the assignments. Used when provisioning new topics. See the NewTopic Javadocs in the kafka-clients jar.

Default: none.

topic.replication-factor

The replication factor to use when provisioning topics. Overrides the binder-wide setting. Ignored if replicas-assignments is present.

Default: none (the binder-wide default of -1 is used).

pollTimeout

Timeout used for polling in pollable consumers.

Default: 5 seconds.

transactionManager

Bean name of a KafkaAwareTransactionManager used to override the binder’s transaction manager for this binding. Usually needed if you want to synchronize another transaction with the Kafka transaction, using the ChainedKafkaTransactionManaager. To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.

Default: none.

txCommitRecovered

When using a transactional binder, the offset of a recovered record (e.g. when retries are exhausted and the record is sent to a dead letter topic) will be committed via a new transaction, by default. Setting this property to false suppresses committing the offset of recovered record.

Default: true.

commonErrorHandlerBeanName

CommonErrorHandler bean name to use per consumer binding. When present, this user provided CommonErrorHandler takes precedence over any other error handlers defined by the binder. This is a handy way to express error handlers, if the application does not want to use a ListenerContainerCustomizer and then check the destination/group combination to set an error handler.

Default: none.

Kafka Producer Properties

The following properties are available for Kafka producers only and must be prefixed with spring.cloud.stream.kafka.bindings.<channelName>.producer..

To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of spring.cloud.stream.kafka.default.producer.<property>=<value>.
admin.configuration

Since version 2.1.1, this property is deprecated in favor of topic.properties, and support for it will be removed in a future version.

admin.replicas-assignment

Since version 2.1.1, this property is deprecated in favor of topic.replicas-assignment, and support for it will be removed in a future version.

admin.replication-factor

Since version 2.1.1, this property is deprecated in favor of topic.replication-factor, and support for it will be removed in a future version.

bufferSize

Upper limit, in bytes, of how much data the Kafka producer attempts to batch before sending.

Default: 16384.

sync

Whether the producer is synchronous.

Default: false.

sendTimeoutExpression

A SpEL expression evaluated against the outgoing message used to evaluate the time to wait for ack when synchronous publish is enabled — for example, headers['mySendTimeout']. The value of the timeout is in milliseconds. With versions before 3.0, the payload could not be used unless native encoding was being used because, by the time this expression was evaluated, the payload was already in the form of a byte[]. Now, the expression is evaluated before the payload is converted.

Default: none.

batchTimeout

How long the producer waits to allow more messages to accumulate in the same batch before sending the messages. (Normally, the producer does not wait at all and simply sends all the messages that accumulated while the previous send was in progress.) A non-zero value may increase throughput at the expense of latency.

Default: 0.

messageKeyExpression

A SpEL expression evaluated against the outgoing message used to populate the key of the produced Kafka message — for example, headers['myKey']. With versions before 3.0, the payload could not be used unless native encoding was being used because, by the time this expression was evaluated, the payload was already in the form of a byte[]. Now, the expression is evaluated before the payload is converted. In the case of a regular processor (Function<String, String> or Function<Message<?>, Message<?>), if the produced key needs to be same as the incoming key from the topic, this property can be set as below. spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey'] There is an important caveat to keep in mind for reactive functions. In that case, it is up to the application to manually copy the headers from the incoming messages to outbound messages. You can set the header, e.g. myKey and use headers['myKey'] as suggested above or, for convenience, simply set the KafkaHeaders.MESSAGE_KEY header, and you do not need to set this property at all.

Default: none.

headerPatterns

A comma-delimited list of simple patterns to match Spring messaging headers to be mapped to the Kafka Headers in the ProducerRecord. Patterns can begin or end with the wildcard character (asterisk). Patterns can be negated by prefixing with !. Matching stops after the first match (positive or negative). For example !ask,as* will pass ash but not ask. id and timestamp are never mapped.

Default: * (all headers - except the id and timestamp)

configuration

Map with a key/value pair containing generic Kafka producer properties. The bootstrap.servers property cannot be set here; use multi-binder support if you need to connect to multiple clusters.

Default: Empty map.

topic.properties

A Map of Kafka topic properties used when provisioning new topics — for example, spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0

topic.replicas-assignment

A Map<Integer, List<Integer>> of replica assignments, with the key being the partition and the value being the assignments. Used when provisioning new topics. See the NewTopic Javadocs in the kafka-clients jar.

Default: none.

topic.replication-factor

The replication factor to use when provisioning topics. Overrides the binder-wide setting. Ignored if replicas-assignments is present.

Default: none (the binder-wide default of -1 is used).

useTopicHeader

Set to true to override the default binding destination (topic name) with the value of the KafkaHeaders.TOPIC message header in the outbound message. If the header is not present, the default binding destination is used.

Default: false.

recordMetadataChannel

The bean name of a MessageChannel to which successful send results should be sent; the bean must exist in the application context. The message sent to the channel is the sent message (after conversion, if any) with an additional header KafkaHeaders.RECORD_METADATA. The header contains a RecordMetadata object provided by the Kafka client; it includes the partition and offset where the record was written in the topic.

ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)

Failed sends go the producer error channel (if configured); see Kafka error channels.

Default: null.

The Kafka binder uses the partitionCount setting of the producer as a hint to create a topic with the given partition count (in conjunction with the minPartitionCount, the maximum of the two being the value being used). Exercise caution when configuring both minPartitionCount for a binder and partitionCount for an application, as the larger value is used. If a topic already exists with a smaller partition count and autoAddPartitions is disabled (the default), the binder fails to start. If a topic already exists with a smaller partition count and autoAddPartitions is enabled, new partitions are added. If a topic already exists with a larger number of partitions than the maximum of (minPartitionCount or partitionCount), the existing partition count is used.
compression

Set the compression.type producer property. Supported values are none, gzip, snappy, lz4 and zstd. If you override the kafka-clients jar to 2.1.0 (or later), as discussed in the Spring for Apache Kafka documentation, and wish to use zstd compression, use spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd.

Default: none.

transactionManager

Bean name of a KafkaAwareTransactionManager used to override the binder’s transaction manager for this binding. Usually needed if you want to synchronize another transaction with the Kafka transaction, using the ChainedKafkaTransactionManaager. To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.

Default: none.

closeTimeout

Timeout in number of seconds to wait for when closing the producer.

Default: 30

allowNonTransactional

Normally, all output bindings associated with a transactional binder will publish in a new transaction, if one is not already in process. This property allows you to override that behavior. If set to true, records published to this output binding will not be run in a transaction, unless one is already in process.

Default: false