This version is still in development and is not considered stable yet. For the latest stable version, please use Spring for Apache Kafka 3.3.10!
Kafka Queues (Share Consumer)
Starting with version 4.0, Spring for Apache Kafka provides support for Kafka Queues through share consumers, which are part of Apache Kafka 4.0.0 and implement KIP-932 (Queues for Kafka). This feature is currently in early access.
Kafka Queues enable a different consumption model compared to traditional consumer groups. Instead of the partition-based assignment model where each partition is exclusively assigned to one consumer, share consumers can cooperatively consume from the same partitions, with records being distributed among the consumers in the share group.
Share Consumer Factory
The ShareConsumerFactory is responsible for creating share consumer instances.
Spring Kafka provides the DefaultShareConsumerFactory implementation.
Configuration
You can configure a DefaultShareConsumerFactory in the same way as a regular ConsumerFactory:
@Bean
public ShareConsumerFactory<String, String> shareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-share-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultShareConsumerFactory<>(props);
}
Constructor Options
The DefaultShareConsumerFactory provides several constructor options:
// Basic configuration
new DefaultShareConsumerFactory<>(configs);
// With deserializer suppliers
new DefaultShareConsumerFactory<>(configs, keyDeserializerSupplier, valueDeserializerSupplier);
// With deserializer instances
new DefaultShareConsumerFactory<>(configs, keyDeserializer, valueDeserializer, configureDeserializers);
Deserializer Configuration
You can configure deserializers in several ways:
-
Via Configuration Properties (recommended for simple cases):
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
Via Setters:
factory.setKeyDeserializer(new StringDeserializer());
factory.setValueDeserializer(new StringDeserializer());
-
Via Suppliers (for cases where deserializers need to be created per consumer):
factory.setKeyDeserializerSupplier(() -> new StringDeserializer());
factory.setValueDeserializerSupplier(() -> new StringDeserializer());
Set configureDeserializers to false if your deserializers are already fully configured and should not be reconfigured by the factory.
Lifecycle Listeners
You can add listeners to monitor the lifecycle of share consumers:
factory.addListener(new ShareConsumerFactory.Listener<String, String>() {
@Override
public void consumerAdded(String id, ShareConsumer<String, String> consumer) {
// Called when a new consumer is created
System.out.println("Consumer added: " + id);
}
@Override
public void consumerRemoved(String id, ShareConsumer<String, String> consumer) {
// Called when a consumer is closed
System.out.println("Consumer removed: " + id);
}
});
Share Message Listener Containers
ShareKafkaMessageListenerContainer
The ShareKafkaMessageListenerContainer provides a container for share consumers with support for concurrent processing:
@Bean
public ShareKafkaMessageListenerContainer<String, String> container(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ContainerProperties containerProps = new ContainerProperties("my-topic");
containerProps.setGroupId("my-share-group");
ShareKafkaMessageListenerContainer<String, String> container =
new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);
container.setupMessageListener(new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println("Received: " + record.value());
}
});
return container;
}
Container Properties
Share containers support a subset of the container properties available for regular consumers:
-
topics: Array of topic names to subscribe to
-
groupId: The share group ID
-
clientId: The client ID for the consumer
-
kafkaConsumerProperties: Additional consumer properties
Share consumers do not support explicit partition assignment or topic patterns.
Concurrency
The ShareKafkaMessageListenerContainer supports concurrent processing by creating multiple consumer threads within a single container.
Each thread runs its own ShareConsumer instance that participates in the same share group.
Unlike traditional consumer groups where concurrency involves partition distribution, share consumers leverage Kafka’s record-level distribution at the broker. This means multiple consumer threads in the same container work together as part of the share group, with the Kafka broker distributing records across all consumer instances.
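Concurrency is Additive Across Application Instances: From the share group's perspective, each ShareConsumer instance is a separate member, regardless of which application instance it runs in. The total number of members in the group is therefore the sum of the concurrency settings of all running application instances, so raising the concurrency of one container adds members in the same way as starting additional application instances.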
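As a rough illustration of this additive behavior, the following plain-Java sketch counts the members that several application instances contribute to one share group. The class name and the concurrency values are hypothetical, and the sketch does not use any Spring for Apache Kafka API.

```java
import java.util.Arrays;

// Hypothetical helper for illustration only; not part of Spring for Apache Kafka.
final class ShareGroupSizing {

    private ShareGroupSizing() {
    }

    // Each consumer thread is one share group member, so the members
    // contributed by the individual application instances simply add up.
    static int totalMembers(int... concurrencyPerInstance) {
        return Arrays.stream(concurrencyPerInstance).sum();
    }
}
```

For example, two instances that each run with a concurrency of 3 would appear to the broker as six members of the same share group.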
Configuring Concurrency Programmatically
@Bean
public ShareKafkaMessageListenerContainer<String, String> concurrentContainer(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ContainerProperties containerProps = new ContainerProperties("my-topic");
containerProps.setGroupId("my-share-group");
ShareKafkaMessageListenerContainer<String, String> container =
new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);
// Set concurrency to create 5 consumer threads
container.setConcurrency(5);
container.setupMessageListener(new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println("Received on " + Thread.currentThread().getName() + ": " + record.value());
}
});
return container;
}
Configuring Concurrency via Factory
You can set default concurrency at the factory level, which applies to all containers created by that factory:
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
// Set default concurrency for all containers created by this factory
factory.setConcurrency(3);
return factory;
}
Per-Listener Concurrency
The concurrency setting can be overridden per listener using the concurrency attribute:
@Component
public class ConcurrentShareListener {
@KafkaListener(
topics = "high-throughput-topic",
containerFactory = "shareKafkaListenerContainerFactory",
groupId = "my-share-group",
concurrency = "10" // Override factory default
)
public void listen(ConsumerRecord<String, String> record) {
// This listener will use 10 consumer threads
System.out.println("Processing: " + record.value());
}
}
Concurrency Considerations
-
Thread Safety: Each consumer thread has its own ShareConsumer instance and manages its own acknowledgments independently
-
Client IDs: Each consumer thread receives a unique client ID with a numeric suffix (e.g., myContainer-0, myContainer-1, etc.)
-
Metrics: Metrics from all consumer threads are aggregated and accessible via container.metrics()
-
Lifecycle: All consumer threads start and stop together as a unit
-
Work Distribution: The Kafka broker handles record distribution across all consumer instances in the share group
-
Explicit Acknowledgment: Each thread independently manages acknowledgments for its records; unacknowledged records in one thread don’t block other threads
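The client ID scheme mentioned in the list above can be pictured with a small plain-Java sketch. It only mirrors the documented naming pattern (base name plus a numeric suffix); the class and method names are hypothetical, and how the container derives the IDs internally is an assumption.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical illustration of the "<name>-<index>" client ID pattern.
final class ClientIdNaming {

    private ClientIdNaming() {
    }

    // Produces one client ID per consumer thread, numbered from 0.
    static List<String> clientIds(String baseName, int concurrency) {
        return IntStream.range(0, concurrency)
                .mapToObj(index -> baseName + "-" + index)
                .collect(Collectors.toList());
    }
}
```

Calling clientIds("myContainer", 3) yields myContainer-0, myContainer-1 and myContainer-2, which matches the examples given in the list.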
Concurrency with Explicit Acknowledgment
Concurrency works seamlessly with explicit acknowledgment mode. Each consumer thread independently tracks and acknowledges its own records:
@KafkaListener(
topics = "order-queue",
containerFactory = "explicitShareKafkaListenerContainerFactory",
groupId = "order-processors",
concurrency = "5"
)
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
try {
// Process the order
processOrderLogic(record.value());
acknowledgment.acknowledge(); // ACCEPT
}
catch (RetryableException e) {
acknowledgment.release(); // Will be redelivered
}
catch (Exception e) {
acknowledgment.reject(); // Permanent failure
}
}
Record Acquisition and Distribution Behavior: Share consumers use a pull-based model in which each consumer thread calls poll() to fetch records from the broker.
While records are acquired by one consumer, they are not available to other consumers. When the acquisition lock expires, unacknowledged records automatically return to the "Available" state and can be delivered to another consumer. The broker also limits the number of records that can be acquired per partition at any one time.
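The acquisition lock described above can be modelled in a few lines of plain Java. This is a simplified sketch of the idea, not broker code: the class name, the two-state model and the 30-second lock duration used in the usage note are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of per-record acquisition locks.
final class AcquisitionLockModel {

    enum State { AVAILABLE, ACQUIRED }

    private final long lockDurationMs;

    // Offset -> time (in ms) at which the current lock expires.
    private final Map<Long, Long> lockExpiryByOffset = new HashMap<>();

    AcquisitionLockModel(long lockDurationMs) {
        this.lockDurationMs = lockDurationMs;
    }

    // A consumer can acquire a record only while no unexpired lock exists.
    boolean tryAcquire(long offset, long nowMs) {
        if (stateOf(offset, nowMs) == State.ACQUIRED) {
            return false;
        }
        this.lockExpiryByOffset.put(offset, nowMs + this.lockDurationMs);
        return true;
    }

    // Once the lock has expired, the record counts as available again.
    State stateOf(long offset, long nowMs) {
        Long expiry = this.lockExpiryByOffset.get(offset);
        return (expiry != null && nowMs < expiry) ? State.ACQUIRED : State.AVAILABLE;
    }
}
```

With a lock duration of 30000 ms, a record acquired at time 0 cannot be acquired by a second consumer at 10000 ms, but becomes available again at 30000 ms if it was never acknowledged.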
Annotation-Driven Listeners
@KafkaListener with Share Consumers
You can use @KafkaListener with share consumers by configuring a ShareKafkaListenerContainerFactory:
@Configuration
@EnableKafka
public class ShareConsumerConfig {
@Bean
public ShareConsumerFactory<String, String> shareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultShareConsumerFactory<>(props);
}
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
}
}
Then use it in your listener:
@Component
public class ShareMessageListener {
@KafkaListener(
topics = "my-queue-topic",
containerFactory = "shareKafkaListenerContainerFactory",
groupId = "my-share-group"
)
public void listen(ConsumerRecord<String, String> record) {
System.out.println("Received from queue: " + record.value());
// Record is automatically acknowledged with ACCEPT
}
}
Share Group Offset Reset
Unlike regular consumer groups, share groups use a different configuration for offset reset behavior. You can configure this programmatically:
private void configureShareGroup(String bootstrapServers, String groupId) throws Exception {
Map<String, Object> adminProps = new HashMap<>();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (Admin admin = Admin.create(adminProps)) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
ConfigEntry configEntry = new ConfigEntry("share.auto.offset.reset", "earliest");
Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
configResource, List.of(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET))
);
admin.incrementalAlterConfigs(configs).all().get();
}
}
Record Acknowledgment
Share consumers support two acknowledgment modes that control how records are acknowledged after processing.
Implicit Acknowledgment (Default)
In implicit mode, records are automatically acknowledged based on processing outcome:
-
Successful processing: Records are acknowledged as ACCEPT
-
Processing errors: Records are acknowledged as REJECT
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
// Implicit mode is the default - no additional configuration needed
return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
}
Explicit Acknowledgment
In explicit mode, the application must manually acknowledge each record using the provided ShareAcknowledgment.
There are two ways to configure explicit acknowledgment mode:
Option 1: Using Kafka Client Configuration
@Bean
public ShareConsumerFactory<String, String> explicitShareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"); // Official Kafka client config
return new DefaultShareConsumerFactory<>(props);
}
A ShareKafkaListenerContainerFactory created from this consumer factory detects the explicit acknowledgment mode from the consumer factory configuration.
Option 2: Using Spring Container Configuration
@Bean
public ShareKafkaListenerContainerFactory<String, String> explicitShareKafkaListenerContainerFactory(
        ShareConsumerFactory<String, String> shareConsumerFactory) {
    ShareKafkaListenerContainerFactory<String, String> factory =
            new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
    // Enable explicit acknowledgment through the container properties
    factory.getContainerProperties().setExplicitShareAcknowledgment(true);
    return factory;
}
Configuration Precedence
When both configuration methods are used, Spring Kafka follows this precedence order (highest to lowest):
-
Container Properties: containerProperties.setExplicitShareAcknowledgment(true/false)
-
Consumer Config: ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG ("implicit" or "explicit")
-
Default: false (implicit acknowledgment)
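The precedence order above can be expressed as a small decision function. The following is only a sketch of the documented rules in plain Java: the class name is hypothetical, the nullable Boolean stands in for "not set on the container properties", and the literal key share.acknowledgement.mode is assumed to be the value behind ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG.

```java
import java.util.Map;

// Hypothetical resolver that mirrors the documented precedence order.
final class AckModeResolver {

    // Assumed literal value of ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG.
    static final String MODE_KEY = "share.acknowledgement.mode";

    private AckModeResolver() {
    }

    // containerSetting is null when nothing was set on the container properties.
    static boolean isExplicit(Boolean containerSetting, Map<String, Object> consumerConfig) {
        if (containerSetting != null) {
            return containerSetting; // 1. container properties win
        }
        Object mode = consumerConfig.get(MODE_KEY);
        if (mode != null) {
            return "explicit".equalsIgnoreCase(mode.toString()); // 2. consumer config
        }
        return false; // 3. default: implicit acknowledgment
    }
}
```

Under these assumptions, a container setting of false overrides a consumer configuration of "explicit", and an empty configuration resolves to implicit acknowledgment.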
Acknowledgment Types
Share consumers support three acknowledgment types:
-
ACCEPT: Record processed successfully, mark as completed
-
RELEASE: Temporary failure, make record available for redelivery
-
REJECT: Permanent failure, do not retry
ShareAcknowledgment API
The ShareAcknowledgment interface provides methods for explicit acknowledgment:
public interface ShareAcknowledgment {
void acknowledge();
void release();
void reject();
}
Listener Interfaces
Share consumers support specialized listener interfaces for different use cases:
Basic Message Listener
Use the standard MessageListener for simple cases:
@KafkaListener(topics = "my-topic", containerFactory = "shareKafkaListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record) {
System.out.println("Received: " + record.value());
// Automatically acknowledged in implicit mode
}
AcknowledgingShareConsumerAwareMessageListener
This interface provides access to the ShareConsumer instance with optional acknowledgment support.
The acknowledgment parameter is nullable and depends on the container’s acknowledgment mode:
Implicit Mode Example (acknowledgment is null)
@KafkaListener(
topics = "my-topic",
containerFactory = "shareKafkaListenerContainerFactory" // Implicit mode by default
)
public void listen(ConsumerRecord<String, String> record,
@Nullable ShareAcknowledgment acknowledgment,
ShareConsumer<?, ?> consumer) {
// In implicit mode, acknowledgment is null
System.out.println("Received: " + record.value());
// Access consumer metrics if needed
Map<MetricName, ? extends Metric> metrics = consumer.metrics();
// Record is auto-acknowledged as ACCEPT on success, REJECT on error
}
Explicit Mode Example (acknowledgment is non-null)
@Component
public class ExplicitAckListener {
@KafkaListener(
topics = "my-topic",
containerFactory = "explicitShareKafkaListenerContainerFactory"
)
public void listen(ConsumerRecord<String, String> record,
@Nullable ShareAcknowledgment acknowledgment,
ShareConsumer<?, ?> consumer) {
// In explicit mode, acknowledgment is non-null
try {
processRecord(record);
acknowledgment.acknowledge(); // ACCEPT
}
catch (RetryableException e) {
acknowledgment.release(); // Will be redelivered
}
catch (Exception e) {
acknowledgment.reject(); // Permanent failure
}
}
private void processRecord(ConsumerRecord<String, String> record) {
// Business logic here
}
}
Acknowledgment Constraints
In explicit acknowledgment mode, the container enforces important constraints:
-
Poll Blocking: Subsequent polls are blocked until all records from the previous poll are acknowledged.
-
One-time Acknowledgment: Each record can only be acknowledged once.
-
Error Handling: If processing throws an exception, the record is automatically acknowledged as REJECT.
In explicit mode, failing to acknowledge records will block further message processing. Always ensure records are acknowledged in all code paths.
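The poll-blocking and one-time acknowledgment constraints can be sketched with a small bookkeeping class in plain Java. This is a hypothetical model rather than the container's implementation; in particular, the choice of IllegalStateException for a repeated acknowledgment is an assumption.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical bookkeeping for records that still await acknowledgment.
final class PendingAcks {

    private final Set<Long> pendingOffsets = new HashSet<>();

    // Called when a record from the latest poll is handed to the listener.
    void delivered(long offset) {
        this.pendingOffsets.add(offset);
    }

    // Each record may be acknowledged exactly once.
    void acknowledge(long offset) {
        if (!this.pendingOffsets.remove(offset)) {
            throw new IllegalStateException("Record " + offset + " is not pending acknowledgment");
        }
    }

    // The next poll is allowed only when nothing is left unacknowledged.
    boolean canPoll() {
        return this.pendingOffsets.isEmpty();
    }
}
```

In this model, a single forgotten acknowledgment keeps canPoll() at false, which corresponds to the stalled consumption described in the warning above.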
Acknowledgment Timeout Detection
To help identify missing acknowledgments, Spring Kafka provides configurable timeout detection. When a record is not acknowledged within the specified timeout, a warning is logged with details about the unacknowledged record.
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
// Set acknowledgment timeout (default is 30 seconds)
factory.getContainerProperties().setShareAcknowledgmentTimeout(Duration.ofSeconds(30));
return factory;
}
When a record exceeds the timeout, you’ll see a warning like:
WARN: Record not acknowledged within timeout (30 seconds). In explicit acknowledgment mode, you must call ack.acknowledge(), ack.release(), or ack.reject() for every record.
This feature helps developers quickly identify when acknowledgment calls are missing from their code, preventing the common issue of "Spring Kafka does not consume new records any more" due to forgotten acknowledgments.
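Conceptually, timeout detection only needs the delivery time of each unacknowledged record. The sketch below shows one way such a check could work in plain Java; the class and method names are hypothetical and do not reflect how the container implements the feature.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical tracker that reports records left unacknowledged for too long.
final class AckTimeoutChecker {

    private final Duration timeout;

    // Offset -> time at which the record was handed to the listener.
    private final Map<Long, Instant> deliveredAt = new LinkedHashMap<>();

    AckTimeoutChecker(Duration timeout) {
        this.timeout = timeout;
    }

    void delivered(long offset, Instant when) {
        this.deliveredAt.put(offset, when);
    }

    void acknowledged(long offset) {
        this.deliveredAt.remove(offset);
    }

    // Returns the offsets whose waiting time strictly exceeds the timeout.
    List<Long> overdue(Instant now) {
        List<Long> result = new ArrayList<>();
        for (Map.Entry<Long, Instant> entry : this.deliveredAt.entrySet()) {
            if (Duration.between(entry.getValue(), now).compareTo(this.timeout) > 0) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}
```

A periodic task could call overdue(Instant.now()) and log a warning for every offset returned, which is roughly the behavior the warning message above describes.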
Acknowledgment Examples
Mixed Acknowledgment Patterns
@KafkaListener(topics = "order-processing", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
String orderId = record.key();
String orderData = record.value();
try {
if (isValidOrder(orderData)) {
if (processOrder(orderData)) {
acknowledgment.acknowledge(); // Success - ACCEPT
}
else {
acknowledgment.release(); // Temporary failure - retry later
}
}
else {
acknowledgment.reject(); // Invalid order - don't retry
}
}
catch (Exception e) {
// Exception automatically triggers REJECT
throw e;
}
}
Conditional Acknowledgment
@KafkaListener(topics = "data-validation", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void validateData(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
    ValidationResult result = validator.validate(record.value());
    switch (result.getStatus()) {
        case VALID:
            acknowledgment.acknowledge(); // ACCEPT
            break;
        case INVALID_RETRYABLE:
            acknowledgment.release(); // RELEASE - make available for redelivery
            break;
        case INVALID_PERMANENT:
            acknowledgment.reject(); // REJECT - do not retry
            break;
    }
}
Poison Message Protection and Delivery Count
KIP-932 includes broker-side poison message protection to prevent unprocessable records from being endlessly redelivered.
How It Works
Every time a record is acquired by a consumer in a share group, the broker increments an internal delivery count. The first acquisition sets the delivery count to 1, and each subsequent acquisition increments it. When the delivery count reaches the configured limit (default: 5), the record moves to Archived state and is not eligible for additional delivery attempts.
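The delivery count mechanism can be illustrated with a minimal state model in plain Java. This is a hypothetical sketch of the behavior described above, not broker code; the class name and the three-state simplification are assumptions.

```java
// Hypothetical model of a single record's delivery count in a share group.
final class DeliveryCountModel {

    enum State { AVAILABLE, ACQUIRED, ARCHIVED }

    private final int deliveryAttemptLimit;

    private int deliveryCount;

    private State state = State.AVAILABLE;

    DeliveryCountModel(int deliveryAttemptLimit) {
        this.deliveryAttemptLimit = deliveryAttemptLimit;
    }

    // Every acquisition increments the delivery count; the first sets it to 1.
    boolean acquire() {
        if (this.state != State.AVAILABLE) {
            return false;
        }
        this.deliveryCount++;
        this.state = State.ACQUIRED;
        return true;
    }

    // A released record is redelivered only while the limit is not reached.
    void release() {
        if (this.state != State.ACQUIRED) {
            throw new IllegalStateException("Record is not acquired");
        }
        this.state = (this.deliveryCount >= this.deliveryAttemptLimit)
                ? State.ARCHIVED : State.AVAILABLE;
    }

    int deliveryCount() {
        return this.deliveryCount;
    }

    State state() {
        return this.state;
    }
}
```

With a limit of 5, the record in this model can be acquired and released five times; after the fifth release it is archived and no further acquisition succeeds.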
Configuration
The maximum delivery attempts can be configured per share group using the Admin API:
private void configureMaxDeliveryAttempts(String bootstrapServers, String groupId) throws Exception {
Map<String, Object> adminProps = new HashMap<>();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (Admin admin = Admin.create(adminProps)) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
// Default is 5, adjust based on your retry tolerance
ConfigEntry maxAttempts = new ConfigEntry("group.share.delivery.attempt.limit", "10");
Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
configResource, List.of(new AlterConfigOp(maxAttempts, AlterConfigOp.OpType.SET))
);
admin.incrementalAlterConfigs(configs).all().get();
}
}
Delivery Count is Not Exposed to Applications: The delivery count is maintained internally by the broker and is not exposed to consumer applications. This is an intentional design decision in KIP-932. The delivery count is approximate and serves as a poison message protection mechanism, not a precise redelivery counter. Applications cannot query or access this value through any API. For application-level retry logic, use the acknowledgment types: RELEASE for temporary failures that should be redelivered, and REJECT for permanent failures that should not be retried.
The broker automatically prevents endless redelivery once the delivery attempt limit (group.share.delivery.attempt.limit) is reached.
Retry Strategy Recommendations
@KafkaListener(topics = "orders", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment ack) {
try {
// Attempt to process the order
orderService.process(record.value());
ack.acknowledge(); // ACCEPT - successfully processed
}
catch (TransientException e) {
// Temporary failure (network issue, service unavailable, etc.)
// Release the record for redelivery
// Broker will retry up to group.share.delivery.attempt.limit times
logger.warn("Transient error processing order, will retry: {}", e.getMessage());
ack.release(); // RELEASE - make available for retry
}
catch (ValidationException e) {
// Permanent semantic error (invalid data format, business rule violation, etc.)
// Do not retry - this record will never succeed
logger.error("Invalid order data, rejecting: {}", e.getMessage());
ack.reject(); // REJECT - permanent failure, do not retry
}
catch (Exception e) {
// Unknown error - typically safer to reject to avoid infinite loops
// But could also release if you suspect it might be transient
logger.error("Unexpected error processing order, rejecting: {}", e.getMessage());
ack.reject(); // REJECT - avoid poison message loops
}
}
The broker's poison message protection ensures that even if you always use RELEASE for errors, records won't be retried endlessly.
They will automatically be archived after reaching the delivery attempt limit.
Differences from Regular Consumers
Share consumers differ from regular consumers in several key ways:
-
No Partition Assignment: Share consumers cannot be assigned specific partitions
-
No Topic Patterns: Share consumers do not support subscribing to topic patterns
-
Cooperative Consumption: Multiple consumers in the same share group can consume from the same partitions simultaneously
-
Record-Level Acknowledgment: Supports explicit acknowledgment with ACCEPT, RELEASE, and REJECT types
-
Different Group Management: Share groups use different coordinator protocols
-
No Batch Processing: Share consumers process records individually, not in batches
-
Broker-Side Retry Management: Delivery count tracking and poison message protection are managed by the broker, not exposed to applications
Limitations and Considerations
Current Limitations
-
In preview: This feature is in preview mode and may change in future versions
-
No Message Converters: Message converters are not yet supported for share consumers
-
No Batch Listeners: Batch processing is not supported with share consumers
-
Poll Constraints: In explicit acknowledgment mode, unacknowledged records block subsequent polls within each consumer thread