Error Handling
Apache Kafka Streams provides native support for handling deserialization exceptions.
For details on this support, please see the Apache Kafka Streams documentation on deserialization exception handlers.
Out of the box, Apache Kafka Streams provides two kinds of deserialization exception handlers - LogAndContinueExceptionHandler and LogAndFailExceptionHandler.
As the names indicate, the former logs the error and continues processing the next records, while the latter logs the error and fails.
LogAndFailExceptionHandler is the default deserialization exception handler.
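For reference, in a plain Kafka Streams application these handlers are typically set through the Streams configuration, for example in a properties file:
default.deserialization.exception.handler=org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
The Kafka Streams binder exposes the same capability through its own properties, as described below.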
Handling Deserialization Exceptions in the Binder
The Kafka Streams binder allows you to specify the deserialization exception handlers above using the following property.
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue
or
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail
In addition to the above two deserialization exception handlers, the binder also provides a third one for sending the erroneous records (poison pills) to a DLQ (dead letter queue) topic. Here is how you enable this DLQ exception handler.
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq
When the above property is set, all records that fail deserialization are automatically sent to the DLQ topic.
You can set the topic name where the DLQ messages are published as described below.
You can provide an implementation for DlqDestinationResolver, which is a functional interface.
DlqDestinationResolver takes the ConsumerRecord and the exception as inputs and lets you specify a topic name as the output.
By gaining access to the Kafka ConsumerRecord, the record headers can be introspected in the implementation of the BiFunction.
Here is an example of providing an implementation for DlqDestinationResolver.
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
    return (rec, ex) -> {
        if (rec.topic().equals("word1")) {
            return "topic1-dlq";
        }
        else {
            return "topic2-dlq";
        }
    };
}
One important thing to keep in mind when providing an implementation for DlqDestinationResolver is that the provisioner in the binder will not auto-create topics for the application.
This is because there is no way for the binder to infer the names of all the DLQ topics the implementation might send to.
Therefore, if you provide DLQ names using this strategy, it is the application’s responsibility to ensure that those topics are created beforehand.
If a DlqDestinationResolver is present in the application as a bean, it takes higher precedence.
If you do not want to follow this approach and would rather provide a static DLQ name using configuration, you can set the following property.
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)
If this is set, then the error records are sent to the topic custom-dlq.
If the application is not using either of the above strategies, then the binder creates a DLQ topic with the name error.<input-topic-name>.<application-id>.
For instance, if your binding’s destination topic is inputTopic and the application ID is process-applicationId, then the default DLQ topic is error.inputTopic.process-applicationId.
It is always recommended to explicitly create a DLQ topic for each input binding if it is your intention to enable DLQ.
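One way to pre-create such DLQ topics in a Spring Boot application is to declare them as NewTopic beans, which Spring for Apache Kafka's KafkaAdmin provisions at startup. Here is a minimal sketch, assuming the default DLQ name from the example above and that the Kafka admin support is auto-configured; the name, partition count, and replication factor are illustrative.
@Bean
public NewTopic dlqTopic() {
    // Pre-create the DLQ topic used by the input binding (sizing values are illustrative)
    return TopicBuilder.name("error.inputTopic.process-applicationId")
            .partitions(1)
            .replicas(1)
            .build();
}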
DLQ per input consumer binding
The property spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler is applicable for the entire application.
This implies that if there are multiple functions in the same application, this property is applied to all of them.
However, if you have multiple processors or multiple input bindings within a single processor, then you can use the finer-grained DLQ control that the binder provides per input consumer binding.
If you have the following processor,
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
    ...
}
and you only want to enable DLQ on the first input binding and skipAndContinue on the second binding, then you can do so on the consumer as below.
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: skipAndContinue
Setting deserialization exception handlers this way has a higher precedence than setting them at the binder level.
DLQ partitioning
By default, records are published to the Dead-Letter topic using the same partition as the original record. This means the Dead-Letter topic must have at least as many partitions as the original topic.
To change this behavior, add a DlqPartitionFunction implementation as a @Bean to the application context.
Only one such bean can be present.
The function is provided with the consumer group (which is the same as the application ID in most situations), the failed ConsumerRecord, and the exception.
For example, if you always want to route to partition 0, you might use:
@Bean
public DlqPartitionFunction partitionFunction() {
    return (group, record, ex) -> 0;
}
If you set a consumer binding’s dlqPartitions property to 1 (and the binder’s minPartitionCount is equal to 1), there is no need to supply a DlqPartitionFunction; the framework will always use partition 0.
If you set a consumer binding’s dlqPartitions property to a value greater than 1 (or the binder’s minPartitionCount is greater than 1), you must provide a DlqPartitionFunction bean, even if the partition count is the same as the original topic’s.
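For example, if the DLQ topic has fewer partitions than the original topic, a DlqPartitionFunction along the following lines could wrap the original record's partition around the DLQ partition count; the count of 3 used here is an assumption for illustration.
@Bean
public DlqPartitionFunction dlqPartitionFunction() {
    // Route to the original record's partition modulo the assumed DLQ partition count of 3
    return (group, record, ex) -> record.partition() % 3;
}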
A couple of things to keep in mind when using the exception handling feature in the Kafka Streams binder:
- The property spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler is applicable for the entire application. This implies that if there are multiple functions in the same application, this property is applied to all of them.
- The exception handling for deserialization works consistently with both native deserialization and framework-provided message conversion.
Handling Production Exceptions in the Binder
Unlike the support for deserialization exception handlers described above, the binder does not provide such first-class mechanisms for handling production exceptions.
However, you can still configure production exception handlers using the StreamsBuilderFactoryBean customizer, which is described in more detail in a subsequent section below.
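For illustration, here is a minimal sketch of such a customizer. It assumes the StreamsBuilderFactoryBeanConfigurer hook from Spring for Apache Kafka (covered in the section mentioned above) and a hypothetical MyProductionExceptionHandler class that implements Kafka Streams' ProductionExceptionHandler.
@Bean
public StreamsBuilderFactoryBeanConfigurer productionExceptionHandlerConfigurer() {
    // Register a production exception handler on the Streams configuration built by the binder
    return factoryBean -> factoryBean.getStreamsConfiguration()
            .put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                    MyProductionExceptionHandler.class);
}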
Runtime Error Handling
When it comes to handling errors from application code, i.e. from the business logic execution, it is usually up to the application to handle them, because the Kafka Streams binder does not have a way to interfere with the application code.
However, to make things a bit easier for the application, the binder provides a convenient RecordRecoverableProcessor, through which you can dictate how you want to handle application-level errors.
Consider the following code.
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
    return input -> input
        .map(...);
}
If the business code inside your map call above throws an exception, it is your responsibility to handle that error.
This is where RecordRecoverableProcessor comes in handy.
By default, RecordRecoverableProcessor simply logs the error and lets the application move on.
Let’s say that you want to publish the failed record to a DLT rather than handling it within the application.
In that case, you must use a custom implementation of RecordRecoverableProcessor called DltAwareProcessor.
Here is how you can do that.
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process(DltPublishingContext dltPublishingContext) {
    return input -> input
        .process(() -> new DltAwareProcessor<>(record -> {
            throw new RuntimeException("error");
        }, "hello-dlt-1", dltPublishingContext));
}
The business logic code from the original map call has now been moved into a KStream#process method call, which takes a ProcessorSupplier.
We then pass in the custom DltAwareProcessor, which is capable of publishing to a DLT.
The constructor for DltAwareProcessor above takes three parameters - a Function that takes the input record and contains the business logic operation as part of the Function body, the DLT topic, and finally a DltPublishingContext.
When the Function’s lambda expression throws an exception, the DltAwareProcessor sends the input record to the DLT.
The DltPublishingContext provides DltAwareProcessor with the necessary publishing infrastructure beans.
DltPublishingContext is auto-configured by the binder, so that you can inject it directly into the application.
If you do not want the binder to publish failed records to a DLT, then you must use the RecordRecoverableProcessor directly instead of the DltAwareProcessor.
You can provide your own recoverer as a BiConsumer that takes the input Record and the exception as arguments.
Assume a scenario in which you do not want to send the record to the DLT, but simply log the message and move on.
Below is an example of how you can accomplish that.
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
    return input -> input
        .process(() -> new RecordRecoverableProcessor<>(record -> {
            throw new RuntimeException("error");
        },
        (record, exception) -> {
            // Handle the record
        }));
}
In this case, when the record fails, RecordRecoverableProcessor uses the user-provided recoverer, which is a BiConsumer that takes the failed record and the thrown exception as arguments.
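If the default behavior mentioned earlier (log the error and move on) is sufficient, you can omit the recoverer altogether. The following sketch assumes a single-argument RecordRecoverableProcessor constructor that takes only the business-logic Function and falls back to the default logging recoverer.
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
    return input -> input
        // No recoverer supplied; under the stated assumption, failures are logged and skipped
        .process(() -> new RecordRecoverableProcessor<>(record -> {
            // Business logic that may throw; the record is passed through unchanged here
            return record;
        }));
}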
Handling Record Keys in DltAwareProcessor
When sending failed records to a DLT using DltAwareProcessor, if you want to send the record keys to the DLT topic, then you need to set the proper serializer on the DLT binding.
This is because DltAwareProcessor uses StreamBridge, which uses the regular Kafka binder (message-channel based), which by default uses a ByteArraySerializer for keys.
In the case of record values, Spring Cloud Stream converts the payload to a proper byte[]; however, that is not the case with keys, as it simply passes along what it received in the header as the key.
If you are providing a non-byte-array key, that might cause class cast exceptions; to avoid them, you need to set a serializer on the DLT binding as below.
Assume that the DLT destination is hello-dlt-1 and the record key is of String datatype.
spring.cloud.stream.kafka.bindings.hello-dlt-1.producer.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer