Event type based routing in Kafka Streams applications
The routing functions available in regular message-channel-based binders are not supported in the Kafka Streams binder. However, the Kafka Streams binder still provides routing capabilities through the event type record header on inbound records.
To enable routing based on event types, the application must provide the following property:
spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.eventTypes
The value can be a comma-separated list of event types.
For example, let's assume we have this function:
@Bean
public Function<KStream<Integer, Foo>, KStream<Integer, Foo>> process() {
    return input -> input;
}
Let us also assume that we want the business logic in this function to be executed only if the incoming record has the event type foo or bar.
That can be expressed as follows, using the eventTypes property on the binding:
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,bar
Now, when the application runs, the binder checks each incoming record for the header event_type and sees whether its value is foo or bar.
If it finds neither of them, the function execution is skipped.
By default, the binder expects the record header key to be event_type, but that can be changed per binding.
For instance, to change the header key on this binding to my_event instead of the default, set the following property:
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event
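The header check described above can be modeled roughly as follows. This is a minimal sketch in plain Java, not the binder's actual implementation: the class EventTypeMatcher is hypothetical, headers are simplified to a map from key to raw bytes, and both the UTF-8 decoding of the header value and the trimming of whitespace around the comma-separated entries are assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper that models the event type check; not part of the binder API.
final class EventTypeMatcher {

    // Default header key, as stated in the text above.
    static final String DEFAULT_HEADER_KEY = "event_type";

    private final String headerKey;
    private final Set<String> eventTypes;

    // eventTypesProperty mirrors the comma-separated property value, e.g. "foo,bar".
    EventTypeMatcher(String headerKey, String eventTypesProperty) {
        this.headerKey = headerKey;
        this.eventTypes = Arrays.stream(eventTypesProperty.split(","))
                .map(String::trim)            // assumption: surrounding whitespace is ignored
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
    }

    // Returns true when the header is present and its value is one of the configured types.
    boolean matches(Map<String, byte[]> headers) {
        byte[] raw = headers.get(headerKey);
        if (raw == null) {
            return false; // header missing: the function would be skipped
        }
        // Assumption: the header value is a UTF-8 encoded string.
        return eventTypes.contains(new String(raw, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        EventTypeMatcher matcher = new EventTypeMatcher(DEFAULT_HEADER_KEY, "foo,bar");
        Map<String, byte[]> fooRecord =
                Map.of("event_type", "foo".getBytes(StandardCharsets.UTF_8));
        Map<String, byte[]> bazRecord =
                Map.of("event_type", "baz".getBytes(StandardCharsets.UTF_8));
        System.out.println(matcher.matches(fooRecord)); // true
        System.out.println(matcher.matches(bazRecord)); // false
    }
}
```

Passing my_event as the first constructor argument corresponds to overriding eventTypeHeaderKey on the binding: a record that only carries the default event_type header would then no longer match.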
When the event routing feature is used, the Kafka Streams binder deserializes all incoming records with the byte array Serde.
Only if the record headers match one of the configured event types does it then use the actual Serde, either configured or inferred, to perform the proper deserialization.
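To illustrate the deferred deserialization described above, here is a simplified model in plain Java. It is only a sketch under stated assumptions: DeferredDeserialization, route, and parseInt are hypothetical names, the "actual Serde" is replaced by a plain function from bytes to a value, and the event type is passed in directly rather than read from record headers.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

// Hypothetical model of "keep raw bytes first, deserialize only on a match".
final class DeferredDeserialization {

    // Stand-in for the actual Serde: parses a UTF-8 encoded decimal integer.
    static Integer parseInt(byte[] raw) {
        return Integer.parseInt(new String(raw, StandardCharsets.UTF_8));
    }

    // The value stays a byte array until the event type matches; only then is
    // the real deserializer applied. Non-matching records are never decoded.
    static <T> Optional<T> route(byte[] rawValue,
                                 String eventType,
                                 Set<String> acceptedTypes,
                                 Function<byte[], T> deserializer) {
        if (eventType == null || !acceptedTypes.contains(eventType)) {
            return Optional.empty(); // skipped without touching the payload
        }
        // Any deserialization failure surfaces here, after the routing decision.
        return Optional.ofNullable(deserializer.apply(rawValue));
    }

    public static void main(String[] args) {
        Set<String> accepted = Set.of("foo", "bar");
        byte[] good = "42".getBytes(StandardCharsets.UTF_8);
        byte[] bad = "oops".getBytes(StandardCharsets.UTF_8);
        System.out.println(route(good, "foo", accepted, DeferredDeserialization::parseInt));
        System.out.println(route(bad, "baz", accepted, DeferredDeserialization::parseInt));
    }
}
```

In this model a malformed payload with a non-matching event type passes through silently, while a malformed payload with a matching event type fails only inside route, that is, later than the point where the record was first read.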
This introduces issues if you set a deserialization exception handler on the binding, because the expected deserialization only happens further down the stack, causing unexpected errors.
To address this issue, you can set the following property on the binding to force the binder to use the configured or inferred Serde instead of the byte array Serde:
spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.useConfiguredSerdeWhenRoutingEvents
This way, the application can detect deserialization issues right away when using the event routing feature and handle them appropriately.