This version is still in development and is not considered stable yet. For the latest stable version, please use Spring for Apache Kafka 4.0.5! |
Kafka Queues (Share Consumer)
Starting with version 4.0, Spring for Apache Kafka provides support for Kafka Queues through share consumers, implementing KIP-932 (Queues for Kafka). As of Spring for Apache Kafka 4.1, backed by Apache Kafka 4.2, share consumers are promoted to full production support.
Kafka Queues enable a different consumption model compared to traditional consumer groups. Instead of the partition-based assignment model where each partition is exclusively assigned to one consumer, share consumers can cooperatively consume from the same partitions, with records being distributed among the consumers in the share group.
Share Consumer Factory
The ShareConsumerFactory is responsible for creating share consumer instances.
Spring Kafka provides the DefaultShareConsumerFactory implementation.
Configuration
You can configure a DefaultShareConsumerFactory similar to how you configure a regular ConsumerFactory:
@Bean
public ShareConsumerFactory<String, String> shareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-share-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultShareConsumerFactory<>(props);
}
Constructor Options
The DefaultShareConsumerFactory provides several constructor options:
// Basic configuration
new DefaultShareConsumerFactory<>(configs);
// With deserializer suppliers
new DefaultShareConsumerFactory<>(configs, keyDeserializerSupplier, valueDeserializerSupplier);
// With deserializer instances
new DefaultShareConsumerFactory<>(configs, keyDeserializer, valueDeserializer, configureDeserializers);
Deserializer Configuration
You can configure deserializers in several ways:
- Via Configuration Properties (recommended for simple cases):
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- Via Setters:
  factory.setKeyDeserializer(new StringDeserializer());
  factory.setValueDeserializer(new StringDeserializer());
- Via Suppliers (for cases where deserializers need to be created per consumer):
  factory.setKeyDeserializerSupplier(() -> new StringDeserializer());
  factory.setValueDeserializerSupplier(() -> new StringDeserializer());
Set configureDeserializers to false if your deserializers are already fully configured and should not be reconfigured by the factory.
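For example, when the deserializers are fully configured by hand before being handed to the factory, pass false as the last constructor argument. A minimal sketch, in which Order is a hypothetical domain type:

```java
// Deserializers fully configured up front; 'Order' is a hypothetical domain type
StringDeserializer keyDeserializer = new StringDeserializer();
JsonDeserializer<Order> valueDeserializer = new JsonDeserializer<>(Order.class).ignoreTypeHeaders();

// The final 'false' tells the factory not to call configure() on the deserializers again
ShareConsumerFactory<String, Order> factory =
        new DefaultShareConsumerFactory<>(props, keyDeserializer, valueDeserializer, false);
```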
Lifecycle Listeners
You can add listeners to monitor the lifecycle of share consumers:
factory.addListener(new ShareConsumerFactory.Listener<String, String>() {
@Override
public void consumerAdded(String id, ShareConsumer<String, String> consumer) {
// Called when a new consumer is created
System.out.println("Consumer added: " + id);
}
@Override
public void consumerRemoved(String id, ShareConsumer<String, String> consumer) {
// Called when a consumer is closed
System.out.println("Consumer removed: " + id);
}
});
Share Message Listener Containers
ShareKafkaMessageListenerContainer
The ShareKafkaMessageListenerContainer provides a container for share consumers with support for concurrent processing:
@Bean
public ShareKafkaMessageListenerContainer<String, String> container(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ContainerProperties containerProps = new ContainerProperties("my-topic");
containerProps.setGroupId("my-share-group");
ShareKafkaMessageListenerContainer<String, String> container =
new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);
container.setupMessageListener(new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println("Received: " + record.value());
}
});
return container;
}
Container Properties
Share containers support a subset of the container properties available for regular consumers:
- topics: Array of topic names to subscribe to
- groupId: The share group ID
- clientId: The client ID for the consumer
- shareAckMode: The acknowledgment mode (EXPLICIT, MANUAL, or IMPLICIT; see Record Acknowledgment)
- shareAcknowledgmentTimeout: Timeout for detecting unacknowledged records in MANUAL mode
- syncShareCommits: Whether to use commitSync() (default) or commitAsync() for share consumer acknowledgments (see Synchronous vs Asynchronous Commits)
- kafkaConsumerProperties: Additional consumer properties
Share consumers do not support all of the container properties available to regular consumers; in particular, there is no topic-pattern subscription, manual partition assignment, or batch listener support (see Differences from Regular Consumers).
Lifecycle Events
The ShareKafkaMessageListenerContainer publishes Spring application events across the consumer lifecycle:
- ConsumerStartingEvent: published when a share consumer thread is first started, before it starts polling.
- ConsumerStartedEvent: published when a share consumer is about to start polling.
- ConsumerFailedToStartEvent: published when a share consumer fails to start (for example, the underlying ShareConsumer could not be created). Any already-constructed ShareConsumer instances from the same startup attempt are closed quietly and do not publish stopping or stopped events because they never reached the poll loop.
- ShareConsumerStoppingEvent: published by each share consumer just before its underlying ShareConsumer is closed. Unlike ConsumerStoppingEvent, this event carries a ShareConsumer reference (not a Consumer) because ShareConsumer is a separate client API and is not assigned to specific partitions.
- ConsumerStoppedEvent: published after the share consumer is closed, with a reason:
  - NORMAL - the container was stopped.
  - ABNORMAL - the consumer thread exited because of an Exception.
  - ERROR - a java.lang.Error was thrown.
These events can be consumed with @EventListener (see Event Consumption) to integrate share containers with the same monitoring and restart patterns available for regular containers.
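For instance, a minimal sketch of an @EventListener bean reacting to these events (the bean and logger names are illustrative):

```java
@Component
public class ShareContainerEvents {

    private static final Logger logger = LoggerFactory.getLogger(ShareContainerEvents.class);

    @EventListener
    public void stopping(ShareConsumerStoppingEvent event) {
        logger.info("Share consumer stopping");
    }

    @EventListener
    public void stopped(ConsumerStoppedEvent event) {
        // Reason is NORMAL, ABNORMAL, or ERROR; ABNORMAL/ERROR may warrant alerting or a restart
        logger.warn("Share consumer stopped, reason: {}", event.getReason());
    }
}
```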
Concurrency
The ShareKafkaMessageListenerContainer supports concurrent processing by creating multiple consumer threads within a single container.
Each thread runs its own ShareConsumer instance that participates in the same share group.
Unlike traditional consumer groups where concurrency involves partition distribution, share consumers leverage Kafka’s record-level distribution at the broker. This means multiple consumer threads in the same container work together as part of the share group, with the Kafka broker distributing records across all consumer instances.
Concurrency is Additive Across Application Instances: From the share group's perspective, each consumer thread is a separate member of the group, so concurrency adds up across application instances. For example, two application instances each running a container with concurrency = 5 contribute ten consumers to the share group. This means setting concurrency controls the number of consumers per application instance, not per share group.
Configuring Concurrency Programmatically
@Bean
public ShareKafkaMessageListenerContainer<String, String> concurrentContainer(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ContainerProperties containerProps = new ContainerProperties("my-topic");
containerProps.setGroupId("my-share-group");
ShareKafkaMessageListenerContainer<String, String> container =
new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);
// Set concurrency to create 5 consumer threads
container.setConcurrency(5);
container.setupMessageListener(new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println("Received on " + Thread.currentThread().getName() + ": " + record.value());
}
});
return container;
}
Configuring Concurrency via Factory
You can set default concurrency at the factory level, which applies to all containers created by that factory:
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
// Set default concurrency for all containers created by this factory
factory.setConcurrency(3);
return factory;
}
Per-Listener Concurrency
The concurrency setting can be overridden per listener using the concurrency attribute:
@Component
public class ConcurrentShareListener {
@KafkaListener(
topics = "high-throughput-topic",
containerFactory = "shareKafkaListenerContainerFactory",
groupId = "my-share-group",
concurrency = "10" // Override factory default
)
public void listen(ConsumerRecord<String, String> record) {
// This listener will use 10 consumer threads
System.out.println("Processing: " + record.value());
}
}
Concurrency Considerations
- Thread Safety: Each consumer thread has its own ShareConsumer instance and manages its own acknowledgments independently
- Client IDs: Each consumer thread receives a unique client ID with a numeric suffix (e.g., myContainer-0, myContainer-1, etc.)
- Metrics: Metrics from all consumer threads are aggregated and accessible via container.metrics()
- Lifecycle: All consumer threads start and stop together as a unit
- Work Distribution: The Kafka broker handles record distribution across all consumer instances in the share group
- Acknowledgment: Each thread independently manages acknowledgments for its own records; unacknowledged records in one thread do not block other threads
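The aggregated metrics can be inspected at runtime. A sketch, assuming container.metrics() returns a map keyed by consumer client ID, as it does for the regular listener containers:

```java
// Print one line per metric, per consumer thread; names are standard Kafka client metrics
container.metrics().forEach((clientId, clientMetrics) ->
        clientMetrics.forEach((name, metric) ->
                System.out.println(clientId + " " + name.name() + " = " + metric.metricValue())));
```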
Concurrency with MANUAL Acknowledgment
Concurrency works seamlessly with ShareAckMode.MANUAL.
Each consumer thread independently tracks and acknowledges its own records:
@KafkaListener(
topics = "order-queue",
containerFactory = "manualShareKafkaListenerContainerFactory",
groupId = "order-processors",
concurrency = "5"
)
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
try {
processOrderLogic(record.value());
acknowledgment.acknowledge(); // ACCEPT
}
catch (RetryableException e) {
acknowledgment.release(); // Will be redelivered
}
catch (Exception e) {
acknowledgment.reject(); // Permanent failure
}
}
Record Acquisition and Distribution Behavior: Share consumers use a pull-based model where each consumer thread calls poll() independently; the broker distributes available records by placing a time-limited acquisition lock on each record it delivers. While records are acquired by one consumer, they are not available to other consumers. When the acquisition lock expires, unacknowledged records automatically return to the "Available" state and can be delivered to another consumer. The broker also limits the number of records that can be acquired per partition at a time (the group.share.partition.max.record.locks configuration), which bounds the number of in-flight records in the share group. The practical implication for concurrency is that record distribution is managed entirely by the broker; individual consumer threads simply poll and acknowledge.
Annotation-Driven Listeners
@KafkaListener with Share Consumers
You can use @KafkaListener with share consumers by configuring a ShareKafkaListenerContainerFactory:
@Configuration
@EnableKafka
public class ShareConsumerConfig {
@Bean
public ShareConsumerFactory<String, String> shareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultShareConsumerFactory<>(props);
}
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
}
}
Then use it in your listener:
@Component
public class ShareMessageListener {
@KafkaListener(
topics = "my-queue-topic",
containerFactory = "shareKafkaListenerContainerFactory",
groupId = "my-share-group"
)
public void listen(ConsumerRecord<String, String> record) {
System.out.println("Received from queue: " + record.value());
// Record is automatically acknowledged with ACCEPT
}
}
Share Group Offset Reset
Unlike regular consumer groups, share groups use a different configuration for offset reset behavior. You can configure this programmatically:
private void configureShareGroup(String bootstrapServers, String groupId) throws Exception {
Map<String, Object> adminProps = new HashMap<>();
adminProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (Admin admin = Admin.create(adminProps)) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
ConfigEntry configEntry = new ConfigEntry("share.auto.offset.reset", "earliest");
Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
configResource, List.of(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET))
);
admin.incrementalAlterConfigs(configs).all().get();
}
}
Record Acknowledgment
Spring Kafka exposes three acknowledgment modes through ContainerProperties.ShareAckMode, controlling how records are acknowledged after processing.
EXPLICIT (Default): Container-Managed Acknowledgment
In EXPLICIT mode the container takes full responsibility for acknowledgment.
After the listener returns normally, the container sends ACCEPT for the record.
On listener exceptions, the ShareConsumerRecordRecoverer decides the outcome (ACCEPT, RELEASE, or REJECT; default: REJECT).
This is the closest analogue to disabling auto.commit on a regular consumer — the container manages all acknowledgment decisions without any listener involvement.
This is the default mode; no additional configuration is required:
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
// EXPLICIT is the default — no shareAckMode configuration needed
return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
}
@KafkaListener(topics = "my-queue-topic", containerFactory = "shareKafkaListenerContainerFactory",
groupId = "my-share-group")
public void listen(ConsumerRecord<String, String> record) {
// Container sends ACCEPT on success; recoverer decides on error (default: REJECT)
process(record.value());
}
MANUAL: Listener-Managed Acknowledgment
In MANUAL mode the listener drives every per-record acknowledgment decision.
Each record is delivered with a non-null ShareAcknowledgment instance that the listener must call exactly once with a terminal operation (acknowledge(), release(), or reject()).
Subsequent polls are blocked until all records from the previous poll are acknowledged.
Use MANUAL mode when your business logic determines the acknowledgment outcome record by record.
@Bean
public ShareKafkaListenerContainerFactory<String, String> manualShareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
factory.getContainerProperties().setShareAckMode(ContainerProperties.ShareAckMode.MANUAL);
return factory;
}
The listener must accept a ShareAcknowledgment parameter (that is, implement AcknowledgingShareConsumerAwareMessageListener):
@KafkaListener(topics = "order-queue", containerFactory = "manualShareKafkaListenerContainerFactory")
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
try {
processOrderLogic(record.value());
acknowledgment.acknowledge(); // ACCEPT — success
}
catch (TransientException e) {
acknowledgment.release(); // RELEASE — redelivery
}
catch (Exception e) {
acknowledgment.reject(); // REJECT — permanent failure
}
}
IMPLICIT: Kafka Client Implicit Mode
In IMPLICIT mode, acknowledgment is delegated entirely to the Kafka broker, which automatically accepts all acquired records regardless of processing outcome.
No per-record acknowledgment API is available; the ShareAcknowledgment argument is always null.
This maps directly to setting share.acknowledgement.mode=implicit in the Kafka client configuration.
@Bean
public ShareKafkaListenerContainerFactory<String, String> implicitShareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
factory.getContainerProperties().setShareAckMode(ContainerProperties.ShareAckMode.IMPLICIT);
return factory;
}
In IMPLICIT mode, listener exceptions do not influence acknowledgment: the broker accepts every acquired record regardless of the processing outcome, so there is no recovery path (no RELEASE or REJECT).
Acknowledgment Mode Comparison
| Mode | Who acknowledges | On success | On listener error |
|---|---|---|---|
| EXPLICIT | Container | Container sends ACCEPT | Recoverer decides (default: REJECT) |
| MANUAL | Listener code | Listener calls acknowledge() | Listener calls release() or reject() |
| IMPLICIT | Kafka broker | Broker auto-ACCEPTs | Broker auto-ACCEPTs (no recovery) |
Migration from 4.0
In 4.0, share consumer acknowledgment was controlled by setExplicitShareAcknowledgment(boolean).
In 4.1, this is replaced by setShareAckMode(ShareAckMode).
The deprecated method still works: true maps to MANUAL, false maps to EXPLICIT.
Default behavior is unchanged.
The 4.0 default (setExplicitShareAcknowledgment(false)) was container-managed acknowledgment, which is exactly what ShareAckMode.EXPLICIT does.
No action is required for applications using the default.
If you used setExplicitShareAcknowledgment(true), replace it:
// Before (4.0)
factory.getContainerProperties().setExplicitShareAcknowledgment(true);
// After (4.1)
factory.getContainerProperties().setShareAckMode(ContainerProperties.ShareAckMode.MANUAL);
If you set share.acknowledgement.mode=implicit in the factory configuration (via ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG), note that in 4.0 this setting had no effect because the container always called consumer.acknowledge() regardless.
In 4.1, the container detects this conflict, logs a warning, and overrides the factory setting with explicit mode.
To use Kafka client implicit mode, set it explicitly on the container:
factory.getContainerProperties().setShareAckMode(ContainerProperties.ShareAckMode.IMPLICIT);
Acknowledgment Types
Share consumers support four acknowledgment types:
- ACCEPT — Record processed successfully; mark as completed.
- RELEASE — Temporary failure; make the record available for redelivery to this or another consumer.
- REJECT — Permanent failure; archive the record and do not redeliver.
- RENEW — Extend the acquisition lock (non-terminal). Use renew() when processing may exceed the broker’s lock duration (group.share.record.lock.duration.ms, default 30 seconds). A terminal acknowledgment (acknowledge(), release(), or reject()) is still required.
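As an illustration of renew(), a listener that periodically extends the lock while working through a long record (the chunking helpers are hypothetical):

```java
@KafkaListener(topics = "slow-queue", containerFactory = "manualShareKafkaListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record, ShareAcknowledgment ack) {
    try {
        for (String chunk : splitIntoChunks(record.value())) { // hypothetical helper
            processChunk(chunk);                               // hypothetical helper
            ack.renew(); // non-terminal: extend the acquisition lock between chunks
        }
        ack.acknowledge(); // a terminal acknowledgment is still required
    }
    catch (Exception e) {
        ack.reject();
    }
}
```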
ShareAcknowledgment API
The ShareAcknowledgment interface is only non-null in ShareAckMode.MANUAL mode:
public interface ShareAcknowledgment {
void acknowledge(); // ACCEPT — record successfully processed
void release(); // RELEASE — redelivery on transient failure
void reject(); // REJECT — permanent failure, do not retry
void renew(); // RENEW (non-terminal) — extend acquisition lock
}
Listener Interfaces
Basic Message Listener
Use the standard MessageListener in EXPLICIT (default) or IMPLICIT mode:
@KafkaListener(topics = "my-topic", containerFactory = "shareKafkaListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record) {
// In EXPLICIT mode (default): container sends ACCEPT on success, REJECT on error
process(record.value());
}
AcknowledgingShareConsumerAwareMessageListener
This interface provides access to the ShareConsumer instance and, in MANUAL mode, to the ShareAcknowledgment.
The acknowledgment parameter is non-null only in MANUAL mode; it is null in EXPLICIT and IMPLICIT modes.
EXPLICIT Mode (Default — acknowledgment is null)
@KafkaListener(
topics = "my-topic",
containerFactory = "shareKafkaListenerContainerFactory" // EXPLICIT by default
)
public void listen(ConsumerRecord<String, String> record,
@Nullable ShareAcknowledgment acknowledgment,
ShareConsumer<?, ?> consumer) {
// acknowledgment is null in EXPLICIT mode — container handles ACCEPT/REJECT automatically
process(record.value());
// Access consumer metrics if needed
Map<MetricName, ? extends Metric> metrics = consumer.metrics();
}
MANUAL Mode (acknowledgment is non-null)
@KafkaListener(
topics = "my-topic",
containerFactory = "manualShareKafkaListenerContainerFactory" // ShareAckMode.MANUAL
)
public void listen(ConsumerRecord<String, String> record,
@Nullable ShareAcknowledgment acknowledgment,
ShareConsumer<?, ?> consumer) {
// acknowledgment is non-null in MANUAL mode
try {
processRecord(record);
acknowledgment.acknowledge(); // ACCEPT
}
catch (RetryableException e) {
acknowledgment.release(); // Will be redelivered
}
catch (Exception e) {
acknowledgment.reject(); // Permanent failure
}
}
Acknowledgment Constraints (MANUAL Mode)
In ShareAckMode.MANUAL, the container enforces the following constraints:
- Poll Blocking: Subsequent polls are blocked until all records from the previous poll are acknowledged.
- Terminal Acknowledgment: Each record must receive exactly one terminal acknowledgment (acknowledge(), release(), or reject()). You may call renew() zero or more times before that to extend the acquisition lock.
- Error Handling: If processing throws an exception, the outcome is determined by the ShareConsumerRecordRecoverer (default: REJECT).
In ShareAckMode.MANUAL, failing to acknowledge records will block further message processing.
Always ensure records are acknowledged in all code paths, including exception handlers.
Acknowledgment Timeout Detection
To help identify missing acknowledgments, Spring Kafka provides configurable timeout detection in MANUAL mode.
When a record is not acknowledged within the specified timeout, a warning is logged with details about the unacknowledged record.
@Bean
public ShareKafkaListenerContainerFactory<String, String> manualShareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
factory.getContainerProperties().setShareAckMode(ContainerProperties.ShareAckMode.MANUAL);
// Set acknowledgment timeout (default is 30 seconds)
factory.getContainerProperties().setShareAcknowledgmentTimeout(Duration.ofSeconds(30));
return factory;
}
When a record exceeds the timeout, you’ll see a warning like:
WARN: Record not acknowledged within timeout (30 seconds). In ShareAckMode.MANUAL you must call ack.acknowledge(), ack.release(), or ack.reject() for every record (call ack.renew() to extend the lock; a terminal ack is still required). Unacknowledged record: topic='my-topic', partition=0, offset=42
This feature helps identify missing acknowledgment calls before they cause "no new records consumed" symptoms due to blocked polls.
Synchronous vs Asynchronous Commits
By default, the share container uses commitSync() to ensure acknowledgments are durable before proceeding.
Set syncShareCommits to false to use commitAsync() when slightly relaxed ack-durability is acceptable in exchange for higher throughput.
@Bean
public ShareKafkaListenerContainerFactory<String, String> asyncCommitFactory(
ShareConsumerFactory<String, String> consumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(consumerFactory);
factory.getContainerProperties().setSyncShareCommits(false); // use commitAsync()
return factory;
}
| Property value | Kafka API called | Trade-off |
|---|---|---|
| syncShareCommits = true (default) | commitSync() | Durable — acknowledgments are guaranteed persisted before the next poll |
| syncShareCommits = false | commitAsync() | Higher throughput — slight risk of redelivery if the consumer crashes before the async commit completes |
Error Handling
The share container handles errors at two levels: during poll() (poll-level) and when the listener throws (listener-level).
Poll-Level Error Handling
If consumer.poll() throws an exception, the container handles it so the consumer thread does not stop:
- RecordDeserializationException: Thrown when a record cannot be deserialized (for example, invalid UTF-8 for StringDeserializer). The container catches it, logs a warning, and continues polling. In EXPLICIT and MANUAL modes, the container REJECTs the affected record using the topic, partition, and offset from the exception. In IMPLICIT mode no explicit acknowledgment call is made — the broker auto-accepts the record. Subsequent records are processed normally.
- CorruptRecordException: Thrown when CRC validation fails (for example, with check.crcs enabled). The Kafka client automatically rejects the corrupt batch. The container catches the exception, logs it, and continues polling.
Without this handling, a single bad record would terminate the consumer thread.
Listener-Level Error Handling: ShareConsumerRecordRecoverer
When the listener throws an exception, the container delegates to a ShareConsumerRecordRecoverer to decide whether to ACCEPT, RELEASE, or REJECT the failed record.
The default is ShareConsumerRecordRecoverer.REJECTING (log and REJECT); ShareConsumerRecordRecoverer.RELEASING is also available (log and RELEASE for redelivery).
You can provide a custom recoverer to RELEASE records for transient failures (for example, downstream timeouts) so they can be redelivered to another consumer, while REJECTing permanent failures.
Configuring a Custom Recoverer
Set the recoverer on the container or on the factory so it applies to all containers created by that factory:
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
factory.setShareConsumerRecordRecoverer((record, ex) -> {
if (ex instanceof TransientException || ex.getCause() instanceof TimeoutException) {
return AcknowledgeType.RELEASE;
}
return AcknowledgeType.REJECT;
});
return factory;
}
You can also set the recoverer on an individual container via container.setShareConsumerRecordRecoverer(recoverer).
Recoverer Behavior
- The recoverer must not return RENEW; the container treats it as REJECT.
- If the recoverer itself throws, the container falls back to REJECT for that record.
- The default recoverer logs at ERROR and returns REJECT for every exception.
Relation to Poison Message Protection
Broker-side delivery count (see Poison Message Protection and Delivery Count) limits how many times a record can be acquired.
Even if your recoverer always returns RELEASE, the broker will eventually archive the record after the configured limit (default 5), so poison messages cannot loop forever.
Difference from Traditional Container Error Handling
Share consumers do not use CommonErrorHandler.
That interface is designed for partition-based consumers (seek, remaining records, offset commit).
Share consumers use acknowledgment-based recovery: the recoverer only decides ACCEPT, RELEASE, or REJECT; the container performs the acknowledgment.
There is no seek or dead-letter topic integration in the share container.
Acknowledgment Examples (MANUAL Mode)
Mixed Acknowledgment Patterns
@KafkaListener(topics = "order-processing", containerFactory = "manualShareKafkaListenerContainerFactory")
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
String orderData = record.value();
try {
if (isValidOrder(orderData)) {
if (processOrder(orderData)) {
acknowledgment.acknowledge(); // ACCEPT — success
}
else {
acknowledgment.release(); // RELEASE — temporary failure, retry later
}
}
else {
acknowledgment.reject(); // REJECT — invalid order, don't retry
}
}
catch (Exception e) {
acknowledgment.reject(); // REJECT — unexpected failure
}
}
Conditional Acknowledgment
@KafkaListener(topics = "data-validation", containerFactory = "manualShareKafkaListenerContainerFactory")
public void validateData(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
ValidationResult result = validator.validate(record.value());
switch (result.getStatus()) {
case VALID -> acknowledgment.acknowledge(); // ACCEPT
case INVALID_RETRYABLE -> acknowledgment.release(); // RELEASE
case INVALID_PERMANENT -> acknowledgment.reject(); // REJECT
}
}
Poison Message Protection and Delivery Count
KIP-932 includes broker-side poison message protection to prevent unprocessable records from being endlessly redelivered.
How It Works
Every time a record is acquired by a consumer in a share group, the broker increments an internal delivery count. The first acquisition sets the delivery count to 1, and each subsequent acquisition increments it. When the delivery count reaches the configured limit (default: 5), the record moves to Archived state and is not eligible for additional delivery attempts.
Delivery Count is Not Exposed to Applications: The delivery count is maintained internally by the broker and is not exposed to consumer applications. This is an intentional design decision in KIP-932. The delivery count is approximate and serves as a poison message protection mechanism, not a precise redelivery counter. Applications cannot query or access this value through any API.
For application-level retry logic, use the acknowledgment types:
- RELEASE - Make record available for redelivery (contributes to delivery count)
- REJECT - Mark as permanently failed (does not cause redelivery)
- ACCEPT - Successfully processed (does not cause redelivery)
The broker automatically prevents endless redelivery once group.share.delivery.count.limit is reached, moving the record to Archived state.
Retry Strategy Recommendations
Here is an example of how to use the various acknowledgment types based on exception type.
@KafkaListener(topics = "orders", containerFactory = "manualShareKafkaListenerContainerFactory")
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment ack) {
try {
// Attempt to process the order
orderService.process(record.value());
ack.acknowledge(); // ACCEPT - successfully processed
}
catch (TransientException e) {
// Temporary failure (network issue, service unavailable, etc.)
// Release the record for redelivery
// Broker will retry up to group.share.delivery.count.limit times
logger.warn("Transient error processing order, will retry: {}", e.getMessage());
ack.release(); // RELEASE - make available for retry
}
catch (ValidationException e) {
// Permanent semantic error (invalid data format, business rule violation, etc.)
// Do not retry - this record will never succeed
logger.error("Invalid order data, rejecting: {}", e.getMessage());
ack.reject(); // REJECT - permanent failure, do not retry
}
catch (Exception e) {
// Unknown error - typically safer to reject to avoid infinite loops
// But could also release if you suspect it might be transient
logger.error("Unexpected error processing order, rejecting: {}", e.getMessage());
ack.reject(); // REJECT - avoid poison message loops
}
}
The broker’s poison message protection ensures that even if you always use RELEASE for errors, records won’t be retried endlessly.
They will automatically be archived after reaching the delivery attempt limit.
Differences from Regular Consumers
Share consumers differ from regular consumers in several key ways:
- No Partition Assignment: Share consumers cannot be assigned specific partitions
- No Topic Patterns: Share consumers do not support subscribing to topic patterns
- Cooperative Consumption: Multiple consumers in the same share group can consume from the same partitions simultaneously
- Record-Level Acknowledgment: Supports explicit acknowledgment with ACCEPT, RELEASE, and REJECT types
- Different Group Management: Share groups use different coordinator protocols
- No Batch Processing: Share consumers process records individually, not in batches
- Broker-Side Retry Management: Delivery count tracking and poison message protection are managed by the broker, not exposed to applications