For the latest stable version, please use Spring for Apache Kafka 3.3.1! |
Accessing Delivery Attempts
To access blocking and non-blocking delivery attempts, add these headers to your @KafkaListener
method signature:
@Header(KafkaHeaders.DELIVERY_ATTEMPT) int blockingAttempts,
@Header(name = RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, required = false) Integer nonBlockingAttempts
Blocking delivery attempts are only provided if you set ContainerProperties
's deliveryAttemptHeader to true
.
Note that the non blocking attempts will be null
for the initial delivery.
Starting with version 3.0.10, a convenient KafkaMessageHeaderAccessor
is provided to allow simpler access to these headers; the accessor can be provided as a parameter for the listener method:
@RetryableTopic(backoff = @Backoff(...)) @KafkaListener(id = "dh1", topics = "dh1") void listen(Thing thing, KafkaMessageHeaderAccessor accessor) { ... }
Use accessor.getBlockingRetryDeliveryAttempt()
and accessor.getNonBlockingRetryDeliveryAttempt()
to get the values.
The accessor will throw an IllegalStateException
if blocking retries are not enabled; for non-blocking retries, the accessor returns 1
for the initial delivery.