Asynchronous @KafkaListener Return Types
Starting with version 3.2, @KafkaListener (and @KafkaHandler) methods can be specified with asynchronous return types, letting the reply be sent asynchronously. Supported return types include CompletableFuture<?>, Mono<?> and Kotlin suspend functions.
@KafkaListener(id = "myListener", topics = "myTopic")
public CompletableFuture<String> listen(String data) {
    ...
    CompletableFuture<String> future = new CompletableFuture<>();
    future.complete("done");
    return future;
}

@KafkaListener(id = "myListener", topics = "myTopic")
public Mono<Void> listen(String data) {
    ...
    return Mono.empty();
}
When async return types are detected, the AckMode is automatically set to MANUAL and out-of-order commits are enabled; the record is acknowledged when the async operation completes.
When the async result is completed with an error, whether the message is recovered or not depends on the container error handler.
If an exception occurs within the listener method that prevents creation of the async result object, you MUST catch that exception and return an appropriate return object that will cause the message to be acknowledged or recovered.
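One way to follow the rule above is sketched here in plain Java, so that it runs without Spring on the classpath. The class name SafeAsyncListener and the helper prepare are hypothetical, and in a real application the listen method would carry the @KafkaListener annotation. A synchronous failure is caught and turned into a failed future rather than being thrown, so the caller always receives an async result object.

```java
import java.util.concurrent.CompletableFuture;

class SafeAsyncListener {

    // In a real application this method would be annotated with
    // @KafkaListener(id = "myListener", topics = "myTopic").
    public CompletableFuture<String> listen(String data) {
        try {
            // Synchronous work that may throw before any future exists.
            String prepared = prepare(data);
            // The asynchronous part; the result completes the future later.
            return CompletableFuture.supplyAsync(() -> prepared.toUpperCase());
        }
        catch (Exception ex) {
            // Never let the exception escape: return a future that is
            // already completed exceptionally instead.
            return CompletableFuture.failedFuture(ex);
        }
    }

    // Hypothetical pre-processing step, used only for illustration.
    private String prepare(String data) {
        if (data == null || data.trim().isEmpty()) {
            throw new IllegalArgumentException("empty payload");
        }
        return data.trim();
    }
}
```

With this shape, an invalid payload yields a future that is completed with an error, so whether the message is recovered is decided by the container error handler, as described above.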
If a KafkaListenerErrorHandler is configured on a listener with an async return type (including Kotlin suspend functions), the error handler is invoked after a failure.
See Handling Exceptions for more information about this error handler and its purpose.