Consuming Batches
Starting with version 3.0, when spring.cloud.stream.bindings.<name>.consumer.batch-mode is set to true, all of the records received by polling the Kafka Consumer will be presented as a List<?> to the listener method.
Otherwise, the method will be called with one record at a time.
The size of the batch is controlled by the Kafka consumer properties max.poll.records, fetch.min.bytes, and fetch.max.wait.ms; refer to the Kafka documentation for more information.
When receiving the batches, the following type signatures are allowed:
List<Person>
Message<List<Person>>
In the first option of List<Person>, the listener will not get any message headers.
If the second type signature (Message<List<Person>>) is used, then the headers can be accessed; however, all the headers are still in the form of a Collection.
Let’s take the following example.
Assume that the Message contains a list with ten Person objects.
The MessageHeaders of the Message contains a map of headers, with the header name as the key and a list as the value.
This list contains the values of that header in the same order as the payload list.
Therefore, it is up to the application to correctly access the header from the MessageHeaders map based on the iteration of the payload list.
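The index-based lookup described above can be sketched as follows. This is a minimal illustration that uses only JDK types: because MessageHeaders implements Map<String, Object>, the map parameter stands in for message.getHeaders() and the list for message.getPayload(). The class and method names are hypothetical, and kafka_offset is only a sample header name assumed to be present in the batch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchHeaderPairing {

    /**
     * Returns the value of the named header that belongs to the payload
     * element at the given index, or null if the header is missing, is not
     * a list, or has no entry at that index.
     */
    public static Object headerAt(Map<String, Object> headers, String name, int index) {
        Object value = headers.get(name);
        if (value instanceof List) {
            List<?> values = (List<?>) value;
            if (index >= 0 && index < values.size()) {
                return values.get(index);
            }
        }
        return null;
    }

    /** Walks the payload list and pairs each element with its header value by index. */
    public static <T> List<String> describe(List<T> payload, Map<String, Object> headers, String headerName) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < payload.size(); i++) {
            lines.add(payload.get(i) + " @ " + headerName + "=" + headerAt(headers, headerName, i));
        }
        return lines;
    }

    public static void main(String[] args) {
        // Simulated batch: two payload items and one header holding one value per item.
        Map<String, Object> headers = new HashMap<>();
        headers.put("kafka_offset", Arrays.asList(10L, 11L)); // sample header name (assumption)
        List<String> payload = Arrays.asList("alice", "bob");
        describe(payload, headers, "kafka_offset").forEach(System.out::println);
    }
}
```

In a real listener with the Message<List<Person>> signature, the same helper could be called with message.getPayload() and message.getHeaders().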
Note that type signatures in the form of List<Message<Person>> are not allowed when consuming in batch mode.
Starting with version 4.0.2, the binder supports DLQ capabilities when consuming in batch mode.
Keep in mind that, when using DLQ on a consumer binding that is in batch mode, all the records received from the previous poll will be delivered to the DLQ topic.
Retry within the binder is not supported when using batch mode, so maxAttempts will be overridden to 1.
You can configure a DefaultErrorHandler (using a ListenerContainerCustomizer) to achieve similar functionality to retry in the binder.
You can also use a manual AckMode and call Acknowledgment.nack(index, sleep) to commit the offsets for a partial batch and have the remaining records redelivered.
Refer to the Spring for Apache Kafka documentation for more information about these techniques.
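To make the partial-batch idea concrete, here is a minimal JDK-only sketch of the bookkeeping an application might do before calling nack. It processes records in order and reports the index of the first failure; that index is what would be passed to Acknowledgment.nack(index, sleep). The class and method names are hypothetical, and the binder and container wiring is intentionally left out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class PartialBatchProcessing {

    /**
     * Processes the records in order and returns the index of the first
     * record whose handler threw, or -1 if every record was processed.
     */
    public static <T> int firstFailedIndex(List<T> batch, Consumer<T> handler) {
        for (int i = 0; i < batch.size(); i++) {
            try {
                handler.accept(batch.get(i));
            } catch (RuntimeException ex) {
                return i; // records before i succeeded; i and later need redelivery
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("a", "b", "bad", "c");
        int failed = firstFailedIndex(batch, record -> {
            if (record.equals("bad")) {
                throw new IllegalStateException("cannot process " + record);
            }
        });
        // In a real listener, a non-negative index would be handed to nack(...),
        // and -1 would lead to a normal acknowledgment of the whole batch.
        System.out.println("first failed index: " + failed);
    }
}
```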
When receiving KafkaNull objects in batch mode, the received list will contain a null element for the corresponding KafkaNull object.
This is true for both List<Person> and Message<List<Person>> style type signatures.
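Because of these null elements, a batch listener should not assume that every list entry can be dereferenced. The following JDK-only sketch shows one way to separate the null entries from regular records; the class and method names are hypothetical, and treating a null as a tombstone to be counted is an assumption about the application's intent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NullTolerantBatch {

    /** Returns the non-null records of the batch, preserving their order. */
    public static <T> List<T> nonNull(List<T> batch) {
        List<T> records = new ArrayList<>();
        for (T item : batch) {
            if (item != null) {
                records.add(item);
            }
        }
        return records;
    }

    /** Counts the null entries, each of which stands for a KafkaNull in the batch. */
    public static int countNulls(List<?> batch) {
        int nulls = 0;
        for (Object item : batch) {
            if (item == null) {
                nulls++;
            }
        }
        return nulls;
    }

    public static void main(String[] args) {
        // Arrays.asList is used because List.of rejects null elements.
        List<String> batch = Arrays.asList("alice", null, "bob");
        System.out.println("records: " + nonNull(batch) + ", nulls: " + countNulls(batch));
    }
}
```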
Observability when consuming in batch mode
When consuming records in batches, the observation tracing propagation feature is not supported directly.
This is because the Spring for Apache Kafka library that is used by the Kafka binder does not support tracing on batch listeners; it is only supported for record listeners.
In a batch listener, the received records could be from multiple topics/partitions and from multiple producers where adding tracing information was optional.
Since there may not be any correlations between records in the batch, the framework cannot make any assumptions about tracing them, such as providing them as a single trace ID, etc.
If you use the type signature of Message<List<String>>, you can then get a header called kafka_batchConvertedHeaders, which contains a list with the same number of entries as your payload.
Each entry in this list is a Map that contains the tracing headers for the corresponding record.
However, it is up to the application to iterate over this properly and start an observation.
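As a rough sketch of that iteration, the following JDK-only helper reads one tracing header per record from kafka_batchConvertedHeaders. The class and method names are hypothetical; traceparent is only an assumed header name (the actual name depends on the propagation format in use), and the value is handled as either a byte[] or a String because the type depends on how the headers were mapped. Starting the observation itself is left to the application.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class BatchTracingHeaders {

    /**
     * Returns one tracing header value per record in the batch, in payload
     * order. An entry is null when that record carries no such header.
     */
    public static List<String> tracingValues(Map<String, Object> headers, String tracingHeaderName, int batchSize) {
        Object converted = headers.get("kafka_batchConvertedHeaders");
        List<?> perRecord = new ArrayList<>();
        if (converted instanceof List) {
            perRecord = (List<?>) converted;
        }
        List<String> result = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            Object value = null;
            if (i < perRecord.size() && perRecord.get(i) instanceof Map) {
                value = ((Map<?, ?>) perRecord.get(i)).get(tracingHeaderName);
            }
            result.add(asString(value));
        }
        return result;
    }

    /** Converts a raw header value to text; byte arrays are decoded as UTF-8. */
    static String asString(Object value) {
        if (value instanceof byte[]) {
            return new String((byte[]) value, StandardCharsets.UTF_8);
        }
        return value == null ? null : value.toString();
    }
}
```

With a Message<List<String>> listener, this could be called as tracingValues(message.getHeaders(), "traceparent", message.getPayload().size()), after which the application decides how to start an observation for each non-null value.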