Consuming Batches

Starting with version 3.0, when spring.cloud.stream.bindings.<name>.consumer.batch-mode is set to true, all of the records received by polling the Kafka Consumer are presented to the listener method as a List<?>. Otherwise, the method is called with one record at a time. The size of the batch is controlled by the Kafka consumer properties max.poll.records, fetch.min.bytes, and fetch.max.wait.ms; refer to the Kafka documentation for more information.

When receiving the batches, the following type signatures are allowed:

List<Person>
Message<List<Person>>

With the first option (List<Person>), the listener does not receive any message headers. With the second type signature (Message<List<Person>>), the headers can be accessed; however, each header value is still in the form of a Collection. Consider the following example.

Assume that the Message contains a list with ten Person objects. The MessageHeaders of the Message is a map in which each key is a header name and each value is a list. This list contains the values of that header in the same order as the payload list. Therefore, it is up to the application to look up the correct header value in the MessageHeaders map by using the index of the record in the payload list.
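
The index-based lookup described above can be sketched in plain Java. This is only an illustration, not binder code: the Person class, the describe method, and the sample offsets are hypothetical, and a plain Map<String, Object> stands in for MessageHeaders (which itself implements Map<String, Object>). The header name kafka_offset is used as an example of a header that is assumed to hold one value per record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class BatchHeaderAccess {

    // Hypothetical payload type standing in for the Person class used above.
    static final class Person {
        final String name;

        Person(String name) {
            this.name = name;
        }
    }

    // Pairs each payload element with the header value found at the same index.
    // 'headers' stands in for the MessageHeaders of a Message<List<Person>>.
    static List<String> describe(List<Person> payload, Map<String, Object> headers, String headerName) {
        Object raw = headers.get(headerName);
        if (!(raw instanceof List)) {
            throw new IllegalArgumentException("Header '" + headerName + "' is not a list");
        }
        List<?> values = (List<?>) raw;
        if (values.size() != payload.size()) {
            throw new IllegalStateException("Header list and payload list differ in size");
        }
        List<String> result = new ArrayList<>();
        for (int i = 0; i < payload.size(); i++) {
            Person person = payload.get(i);
            // A payload element may be null, so guard before reading its name.
            String name = (person == null) ? "null" : person.name;
            result.add(name + "@" + values.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Person> payload = List.of(new Person("Ada"), new Person("Grace"));
        Map<String, Object> headers = Map.of("kafka_offset", List.of(41L, 42L));
        System.out.println(describe(payload, headers, "kafka_offset")); // prints [Ada@41, Grace@42]
    }
}
```

In a real listener, the payload and headers would come from message.getPayload() and message.getHeaders() respectively. The size check is a defensive choice for this sketch, so that a header that is not tracked per record fails loudly instead of being misaligned silently.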

Note that a type signature in the form of List<Message<Person>> is not allowed when consuming in batch mode.

Starting with version 4.0.2, the binder supports DLQ capabilities when consuming in batch mode. Keep in mind that when DLQ is used on a consumer binding that is in batch mode, all the records received from the previous poll are delivered to the DLQ topic.

Retry within the binder is not supported when using batch mode, so maxAttempts is overridden to 1. You can configure a DefaultErrorHandler (using a ListenerContainerCustomizer) to achieve functionality similar to retry in the binder. You can also use a manual AckMode and call Acknowledgment.nack(index, sleep) to commit the offsets for a partial batch and have the remaining records redelivered. Refer to the Spring for Apache Kafka documentation for more information about these techniques.

When KafkaNull objects are received in batch mode, the received list contains a null element in place of each KafkaNull object. This is true for both the List<Person> and the Message<List<Person>> type signatures.
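
Because the batch may contain null elements, a listener should check each element before using it. The following is a minimal sketch in plain Java, assuming a functional-style listener of type Consumer<List<Person>>; the Person class, the batchListener factory, and the two collecting lists are hypothetical and exist only so that the sketch produces observable output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class NullAwareBatch {

    // Hypothetical payload type standing in for the Person class used above.
    static final class Person {
        final String name;

        Person(String name) {
            this.name = name;
        }
    }

    // Builds a batch listener that records the names of non-null elements
    // and the positions of null elements (records that arrived as KafkaNull).
    static Consumer<List<Person>> batchListener(List<String> names, List<Integer> nullIndexes) {
        return batch -> {
            for (int i = 0; i < batch.size(); i++) {
                Person person = batch.get(i);
                if (person == null) {
                    nullIndexes.add(i); // handle the null record here, or skip it
                    continue;
                }
                names.add(person.name);
            }
        };
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>();
        List<Integer> nullIndexes = new ArrayList<>();
        // Arrays.asList is used because List.of rejects null elements.
        List<Person> batch = Arrays.asList(new Person("Ada"), null, new Person("Grace"));
        batchListener(names, nullIndexes).accept(batch);
        System.out.println(names);       // prints [Ada, Grace]
        System.out.println(nullIndexes); // prints [1]
    }
}
```

In an application, the Consumer would typically be exposed as a bean and the batch would be supplied by the binder. Keeping the index of each null element is useful when the corresponding header values need to be looked up by position.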