Error Handling

Apache Kafka Streams provides the capability to natively handle exceptions that occur during deserialization. For details on this support, please refer to the Apache Kafka Streams documentation. Out of the box, Apache Kafka Streams provides two kinds of deserialization exception handlers - LogAndContinueExceptionHandler and LogAndFailExceptionHandler. As the names indicate, the former logs the error and continues processing the next records, while the latter logs the error and fails. LogAndFailExceptionHandler is the default deserialization exception handler.

Handling Deserialization Exceptions in the Binder

Kafka Streams binder allows you to specify the deserialization exception handlers above using the following property.

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue

or

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail

In addition to the above two deserialization exception handlers, the binder also provides a third one for sending the erroneous records (poison pills) to a DLQ (dead letter queue) topic. Here is how you enable this DLQ exception handler.

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq

When the above property is set, all records that fail deserialization are automatically sent to the DLQ topic.

You can set the topic name to which the DLQ messages are published as described below.

You can provide an implementation of DlqDestinationResolver, which is a functional interface. DlqDestinationResolver takes the ConsumerRecord and the exception as inputs and lets you specify a topic name as the output. By gaining access to the Kafka ConsumerRecord, the record headers can be introspected in the implementation of the BiFunction.

Here is an example of providing an implementation for DlqDestinationResolver.

@Bean
public DlqDestinationResolver dlqDestinationResolver() {
    return (rec, ex) -> {
        if (rec.topic().equals("word1")) {
            return "topic1-dlq";
        }
        else {
            return "topic2-dlq";
        }
    };
}

One important thing to keep in mind when providing an implementation for DlqDestinationResolver is that the provisioner in the binder will not auto-create topics for the application. This is because there is no way for the binder to infer the names of all the DLQ topics the implementation might send to. Therefore, if you provide DLQ names using this strategy, it is the application’s responsibility to ensure that those topics are created beforehand.
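
For example, a minimal sketch of provisioning the DLQ topics used by the resolver above is to declare them as NewTopic beans (this assumes spring-kafka's TopicBuilder and an auto-configured KafkaAdmin are available so the topics are created on startup; partition and replica counts are hypothetical).

@Bean
public NewTopic topic1Dlq() {
    // Hypothetical settings; adjust partitions and replicas to match your environment.
    return TopicBuilder.name("topic1-dlq").partitions(1).replicas(1).build();
}

@Bean
public NewTopic topic2Dlq() {
    return TopicBuilder.name("topic2-dlq").partitions(1).replicas(1).build();
}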

If a DlqDestinationResolver bean is present in the application, it takes higher precedence. If you do not want to follow this approach and would rather provide a static DLQ name through configuration, you can set the following property.

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)

If this is set, then the error records are sent to the topic custom-dlq. If the application does not use either of the above strategies, then the binder creates a DLQ topic with the name error.<input-topic-name>.<application-id>. For instance, if your binding’s destination topic is inputTopic and the application ID is process-applicationId, then the default DLQ topic is error.inputTopic.process-applicationId. It is always recommended to explicitly create a DLQ topic for each input binding if it is your intention to enable DLQ.

DLQ per input consumer binding

The property spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler is applicable for the entire application. This implies that if there are multiple functions in the same application, this property is applied to all of them. However, if you have multiple processors or multiple input bindings within a single processor, then you can use the finer-grained DLQ control that the binder provides per input consumer binding.

If you have the following processor,

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}

and you only want to enable DLQ on the first input binding and skipAndContinue on the second binding, then you can do so on the consumer as below.

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: skipAndContinue

Setting deserialization exception handlers this way has a higher precedence than setting at the binder level.
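
For example, with the binder-level handler set to logAndFail, the following binding-level setting still sends failed records from the process-in-0 binding to the DLQ, while all other bindings fall back to logAndFail (the binding name is illustrative).

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq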

DLQ partitioning

By default, records are published to the Dead-Letter topic using the same partition as the original record. This means the Dead-Letter topic must have at least as many partitions as the original topic.

To change this behavior, add a DlqPartitionFunction implementation as a @Bean to the application context. Only one such bean can be present. The function is provided with the consumer group (which is the same as the application ID in most situations), the failed ConsumerRecord and the exception. For example, if you always want to route to partition 0, you might use:

@Bean
public DlqPartitionFunction partitionFunction() {
    return (group, record, ex) -> 0;
}

If you set a consumer binding’s dlqPartitions property to 1 (and the binder’s minPartitionCount is equal to 1), there is no need to supply a DlqPartitionFunction; the framework will always use partition 0. If you set a consumer binding’s dlqPartitions property to a value greater than 1 (or the binder’s minPartitionCount is greater than 1), you must provide a DlqPartitionFunction bean, even if the partition count is the same as the original topic’s.
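
For instance, a minimal sketch of such a bean that simply routes each failed record to the same partition it came from (assuming, as noted above, that the DLQ topic has at least as many partitions as the original topic) could look like the following.

@Bean
public DlqPartitionFunction sameAsSourcePartitionFunction() {
    // Reuse the failed record's original partition number for the DLQ record.
    return (group, record, ex) -> record.partition();
}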

Here are a couple of things to keep in mind when using the exception handling feature in the Kafka Streams binder.

  • The property spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler is applicable for the entire application. This implies that if there are multiple functions in the same application, this property is applied to all of them.

  • The exception handling for deserialization works consistently with native deserialization and framework-provided message conversion.

Handling Production Exceptions in the Binder

Unlike the support for deserialization exception handlers described above, the binder does not provide a first-class mechanism for handling production exceptions. However, you can still configure production exception handlers using the StreamsBuilderFactoryBean customizer, which is described in more detail in a subsequent section below.
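
As a preview, here is a minimal sketch of one possible approach: registering Kafka Streams' built-in DefaultProductionExceptionHandler through a spring-kafka StreamsBuilderFactoryBeanConfigurer bean (the handler class and configuration key come from Apache Kafka Streams itself; you can substitute your own ProductionExceptionHandler implementation).

@Bean
public StreamsBuilderFactoryBeanConfigurer productionExceptionHandlerConfigurer() {
    // Adds default.production.exception.handler to the Kafka Streams configuration
    // used by the binder-managed StreamsBuilderFactoryBean.
    return factoryBean -> factoryBean.getStreamsConfiguration().put(
            StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
            DefaultProductionExceptionHandler.class);
}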

Runtime Error Handling

When it comes to handling errors from application code, i.e. from the business logic execution, it is usually up to the application to handle them, because the Kafka Streams binder does not have a way to interfere with the application code. However, to make things a bit easier for the application, the binder provides a convenient RecordRecoverableProcessor, with which you can dictate how application-level errors should be handled.

Consider the following code.

@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
    return input -> input
        .map(...);
}

If the business code inside your map call above throws an exception, it is your responsibility to handle that error. This is where RecordRecoverableProcessor comes in handy. By default, RecordRecoverableProcessor simply logs the error and lets the application move on. Suppose that you want to publish the failed record to a DLT rather than handling it within the application. In that case, you must use an extended implementation of RecordRecoverableProcessor called DltAwareProcessor. Here is how you can do that.

@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process(DltPublishingContext dltPublishingContext) {
    return input -> input
        .process(() -> new DltAwareProcessor<>(record -> {
            throw new RuntimeException("error");
        }, "hello-dlt-1", dltPublishingContext));
}

The business logic code from the original map call has now been moved into a KStream#process method call, which takes a ProcessorSupplier. We then pass in the custom DltAwareProcessor, which is capable of publishing to a DLT. The constructor for DltAwareProcessor above takes three parameters - a Function that takes the input record and contains the business logic operation as part of the Function body, the DLT topic name, and finally a DltPublishingContext. When the Function’s lambda expression throws an exception, the DltAwareProcessor sends the input record to the DLT. The DltPublishingContext provides DltAwareProcessor with the necessary publishing infrastructure beans. The DltPublishingContext is autoconfigured by the binder, so that you can inject it directly into the application.

If you do not want the binder to publish failed records to a DLT, you must use the RecordRecoverableProcessor directly instead of the DltAwareProcessor. You can provide your own recoverer as a BiConsumer that takes the input Record and the exception as arguments. Assume a scenario in which you do not want to send the record to the DLT, but simply want to log the message and move on. Below is an example of how you can accomplish that.

@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
    return input -> input
        .process(() -> new RecordRecoverableProcessor<>(record -> {
            throw new RuntimeException("error");
        },
        (record, exception) -> {
            // Handle the record
        }));
}

In this case, when the record fails, the RecordRecoverableProcessor uses the user-provided recoverer, which is a BiConsumer that takes the failed record and the exception thrown as arguments.

Handling Record Keys in DltAwareProcessor

When sending failed records to a DLT using DltAwareProcessor, if you want to send the record keys to the DLT topic, then you need to set the proper serializer on the DLT binding. This is because DltAwareProcessor uses StreamBridge, which uses the regular Kafka binder (message-channel based), which by default uses a ByteArraySerializer for keys. In the case of record values, Spring Cloud Stream converts the payload to a proper byte[]; however, that is not the case with keys, as it simply passes along what it received in the header as the key. If you are providing a non-byte-array key, that might cause class cast exceptions; to avoid them, set a serializer on the DLT binding as shown below.

The following assumes that the DLT destination is hello-dlt-1 and that the record key is of type String.

spring.cloud.stream.kafka.bindings.hello-dlt-1.producer.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer