
Configuration Options

This section contains the configuration options used by the Kafka Streams binder.

For common configuration options and properties pertaining to the binder, refer to the core documentation.

Kafka Streams Binder Properties

The following properties are available at the binder level and must be prefixed with spring.cloud.stream.kafka.streams.binder. Any Kafka binder properties reused in the Kafka Streams binder must be prefixed with spring.cloud.stream.kafka.streams.binder instead of spring.cloud.stream.kafka.binder. The only exception to this rule is the Kafka bootstrap server property, for which either prefix works.

configuration

Map of key/value pairs containing properties pertaining to the Apache Kafka Streams API. This property must be prefixed with spring.cloud.stream.kafka.streams.binder, giving spring.cloud.stream.kafka.streams.binder.configuration as the full property name. The following are some examples of using this property:

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000

For more information about all the properties that may go into the streams configuration, see the StreamsConfig JavaDocs in the Apache Kafka Streams docs. Any configuration that you can set through StreamsConfig can be set through this property. Because this is a binder-level property, it applies to the entire application: if you have more than one processor in the application, all of them acquire these properties. For properties such as application.id, which should differ between processors, this becomes problematic, so carefully examine how the properties from StreamsConfig are mapped when using this binder-level configuration property.

functions.<function-bean-name>.applicationId

Applicable only for functional-style processors. Sets the application ID per function in the application. When the application contains multiple functions, this is a convenient way to give each one its own application ID.
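
As a minimal sketch, assuming two hypothetical function beans named process and analyze (these names and the ID values are illustrative, not taken from this section), each function could be given its own application ID like this:

```properties
# 'process' and 'analyze' are assumed function bean names; the ID values are placeholders
spring.cloud.stream.kafka.streams.binder.functions.process.applicationId=process-app-id
spring.cloud.stream.kafka.streams.binder.functions.analyze.applicationId=analyze-app-id
```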

functions.<function-bean-name>.configuration

Applicable only for functional-style processors. Map of key/value pairs containing properties pertaining to the Apache Kafka Streams API. This is similar to the binder-level configuration property described above, but it applies only to the named function. Use it when you have multiple processors and want certain configuration to apply only to particular functions. All StreamsConfig properties can be used here.
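
For illustration, assuming a hypothetical function bean named process, function-scoped Kafka Streams settings might look like the following. The chosen values are arbitrary examples, not recommendations.

```properties
# Applies only to the assumed function bean 'process', not to other functions in the application
spring.cloud.stream.kafka.streams.binder.functions.process.configuration.commit.interval.ms=500
spring.cloud.stream.kafka.streams.binder.functions.process.configuration.num.stream.threads=2
```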

brokers

Broker URL

Default: localhost

zkNodes

Zookeeper URL

Default: localhost

deserializationExceptionHandler

Deserialization error handler type. This handler is applied at the binder level and therefore applies to all input bindings in the application. It can also be controlled in a more fine-grained way at the consumer binding level. Possible values are logAndContinue, logAndFail, skipAndContinue, or sendToDlq.

Default: logAndFail
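
A minimal sketch of overriding the default for every input binding in the application, using one of the values listed above:

```properties
# Binder-wide: records that fail deserialization on any input binding are sent to a DLQ
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler=sendToDlq
```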

applicationId

Convenient way to set the application.id for the Kafka Streams application globally at the binder level. If the application contains multiple functions, the application ID should instead be set separately for each function. See above, where setting the application ID is discussed in detail.

Default: application will generate a static application ID. See the application ID section for more details.

stateStoreRetry.maxAttempts

Max attempts for trying to connect to a state store.

Default: 1

stateStoreRetry.backoffPeriod

Backoff period when trying to connect to a state store on a retry.

Default: 1000 ms
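
The two retry properties are typically set together. The sketch below assumes that backoffPeriod takes a plain number of milliseconds, as the default above suggests; the values themselves are arbitrary.

```properties
# Try up to 5 times to connect to a state store, waiting 2 seconds between attempts
# (assumption: backoffPeriod is expressed in milliseconds)
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts=5
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backoffPeriod=2000
```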

consumerProperties

Arbitrary consumer properties at the binder level.

producerProperties

Arbitrary producer properties at the binder level.
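
As a hedged sketch, assuming both properties are bound as maps keyed by the Kafka client property name, binder-level client settings might be written as follows. The keys max.poll.records and compression.type are standard Kafka client settings chosen purely as examples.

```properties
# Assumption: the text after 'consumerProperties.' / 'producerProperties.' is used as the Kafka client property key
spring.cloud.stream.kafka.streams.binder.consumerProperties.max.poll.records=100
spring.cloud.stream.kafka.streams.binder.producerProperties.compression.type=gzip
```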

includeStoppedProcessorsForHealthCheck

When the bindings for a processor are stopped through the actuator, that processor does not participate in the health check by default. Set this property to true to include all processors in the health check, including those that are currently stopped through the bindings actuator endpoint.

Default: false

Kafka Streams Producer Properties

The following properties are only available for Kafka Streams producers and must be prefixed with spring.cloud.stream.kafka.streams.bindings.<binding-name>.producer. For convenience, if there are multiple output bindings and they all require a common value, that value can be configured by using the prefix spring.cloud.stream.kafka.streams.default.producer.

keySerde

Key serde to use.

Default: See the above discussion on message de/serialization

valueSerde

Value serde to use.

Default: See the above discussion on message de/serialization

useNativeEncoding

Flag to enable or disable native encoding.

Default: true.

streamPartitionerBeanName

Custom outbound partitioner bean name to be used at the producer. Applications can provide a custom StreamPartitioner as a Spring bean, and the name of this bean can be provided to the producer to use instead of the default one.

Default: See the discussion above on outbound partition support.
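
A minimal sketch, assuming an output binding named process-out-0 and a StreamPartitioner bean registered under the hypothetical name customPartitioner:

```properties
# 'process-out-0' and 'customPartitioner' are assumed names for illustration
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName=customPartitioner
```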

producedAs

Custom name for the sink component to which the processor is producing.

Default: none (generated by Kafka Streams)

Kafka Streams Consumer Properties

The following properties are available for Kafka Streams consumers and must be prefixed with spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer. For convenience, if there are multiple input bindings and they all require a common value, that value can be configured by using the prefix spring.cloud.stream.kafka.streams.default.consumer.
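
To illustrate the two prefixes, the sketch below sets a common key serde for all input bindings and a value serde for a single binding. The binding name process-in-0 is an assumption; the serde classes are the standard ones shipped with Apache Kafka.

```properties
# Shared by every input binding
spring.cloud.stream.kafka.streams.default.consumer.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde
# Applies only to the assumed binding 'process-in-0'
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=org.apache.kafka.common.serialization.Serdes$LongSerde
```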

applicationId

Sets the application.id per input binding.

Default: See above.

keySerde

Key serde to use.

Default: See the above discussion on message de/serialization

valueSerde

Value serde to use.

Default: See the above discussion on message de/serialization

materializedAs

Name of the state store to materialize when using incoming KTable types.

Default: none.
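
For example, assuming a function whose second input (binding process-in-1, a hypothetical name) is a KTable, the incoming table could be materialized into a named store like this:

```properties
# 'process-in-1' and 'incoming-store' are assumed names for illustration
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs=incoming-store
```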

useNativeDecoding

Flag to enable or disable native decoding.

Default: true.

dlqName

DLQ topic name.

Default: See above on the discussion of error handling and DLQ.

startOffset

Offset to start from if there is no committed offset to consume from. This is mostly used when the consumer is consuming from a topic for the first time. Kafka Streams uses earliest as the default strategy and the binder uses the same default. This can be overridden to latest using this property.

Default: earliest.

Note: Using resetOffsets on the consumer has no effect on the Kafka Streams binder. Unlike the message-channel-based binder, the Kafka Streams binder does not seek to the beginning or end on demand.
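
A minimal sketch of overriding the start offset, assuming an input binding named process-in-0:

```properties
# Start from the latest offset when no committed offset exists for this binding
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.startOffset=latest
```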

deserializationExceptionHandler

Deserialization error handler type. This handler is applied per consumer binding, as opposed to the binder-level property described before. Possible values are logAndContinue, logAndFail, skipAndContinue, or sendToDlq.

Default: logAndFail
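
As a sketch of the per-binding form, the following combines this property with dlqName for one assumed input binding (process-in-0). The DLQ topic name is a placeholder.

```properties
# Only the assumed binding 'process-in-0' routes deserialization failures to a DLQ
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler=sendToDlq
# Placeholder topic name
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName=process-input-dlq
```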

timestampExtractorBeanName

Specific timestamp extractor bean name to be used at the consumer. Applications can provide a TimestampExtractor as a Spring bean, and the name of this bean can be provided to the consumer to use instead of the default one.

Default: See the discussion above on timestamp extractors.

eventTypes

Comma-separated list of supported event types for this binding.

Default: none

eventTypeHeaderKey

Event type header key on each incoming record through this binding.

Default: event_type
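
The two event-type properties can be combined as sketched below. The binding name process-in-0, the event type names, and the custom header key are all assumptions made for illustration.

```properties
# Event types supported on the assumed binding 'process-in-0'
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=order-created,order-updated
# Use a custom header key instead of the default 'event_type'
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event_type
```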

consumedAs

Custom name for the source component from which the processor is consuming.

Default: none (generated by Kafka Streams)

Special note on concurrency

In Kafka Streams, you can control the number of threads a processor can create by using the num.stream.threads property. You can set it through the various configuration options described above at the binder, functions, producer, or consumer level. You can also use the concurrency property that core Spring Cloud Stream provides for this purpose. When using it, set it on the consumer; when you have more than one input binding, set it on the first input binding. For example, spring.cloud.stream.bindings.process-in-0.consumer.concurrency is translated to num.stream.threads by the binder. If you have multiple processors and only some of them define binding-level concurrency, those without it fall back to the binder-wide property specified through spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads. If this binder configuration is not available either, the application uses the default set by Kafka Streams.
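
The fallback behavior described above can be sketched as follows, assuming two processors whose first input bindings are process-in-0 and analyze-in-0 (analyze is a hypothetical second function):

```properties
# Binding-level concurrency for 'process'; the binder translates it to num.stream.threads=3
spring.cloud.stream.bindings.process-in-0.consumer.concurrency=3
# 'analyze' defines no binding-level concurrency, so it falls back to this binder-wide value
spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads=2
```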