Change History

What’s New in 3.2 Since 3.1

This section covers the changes made from version 3.1 to version 3.2. For changes in earlier versions, see Change History.

Kafka Client Version

This version requires the 3.7.0 kafka-clients. Version 3.7.0 of the Kafka client introduces the new consumer group protocol. For more details and its limitations, see KIP-848. The new consumer group protocol is an early access release and is not meant for production use; in this version it is recommended for testing purposes only. Spring for Apache Kafka therefore supports the new consumer group protocol only to the extent that the kafka-clients library itself supports it for testing. By default, Spring for Apache Kafka uses the classic consumer group protocol; to test the new protocol, opt in via the group.protocol property on the consumer.
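
As a minimal sketch of the opt-in described above, the following builds a consumer configuration map using only the JDK. The group.protocol key comes from the text; the value consumer (as opposed to the default classic), the broker address, the group id, and the class name are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; not part of Spring for Apache Kafka.
class GroupProtocolConfigSketch {

    // Builds consumer properties that opt in to the new consumer group protocol.
    static Map<String, Object> consumerProps(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Assumed value: "consumer" selects the KIP-848 protocol; "classic" is the default.
        props.put("group.protocol", "consumer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps("localhost:9092", "kip848-test"));
    }
}
```

A map like this could then be handed to whichever consumer factory the test uses; leaving the key out keeps the classic protocol.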

Testing Support Changes

The kraft mode is disabled in EmbeddedKafka by default; users who want to use kraft mode must enable it. This is due to certain instabilities observed while using EmbeddedKafka in kraft mode, especially when testing the new consumer group protocol. The new consumer group protocol is supported only in kraft mode, so testing it must be done against a real Kafka cluster rather than one based on the KafkaClusterTestKit, on which EmbeddedKafka is based. In addition, some other race conditions were observed while running multiple KafkaListener methods with EmbeddedKafka in kraft mode. Until these issues are resolved, the kraft default on EmbeddedKafka will remain false.

Kafka Streams Interactive Query Support

A new API, KafkaStreamsInteractiveQuerySupport, is provided for accessing queryable stores used in Kafka Streams interactive queries. See Kafka Streams Interactive Support for more details.

TransactionIdSuffixStrategy

A new TransactionIdSuffixStrategy interface was introduced to manage the transactional.id suffix. The default implementation is DefaultTransactionIdSuffixStrategy: when maxCache is set greater than zero, it can reuse transactional.id suffixes within a specific range; otherwise, suffixes are generated on the fly by incrementing a counter. See Fixed TransactionIdSuffix for more information.
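
The two modes can be illustrated with a small JDK-only sketch. This is a hypothetical stand-in, not the library's DefaultTransactionIdSuffixStrategy: the class name, the method names acquire and release, and the choice to throw when the range is exhausted are all assumptions.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in; not the library's DefaultTransactionIdSuffixStrategy.
class SuffixStrategySketch {

    private final int maxCache;
    private final AtomicInteger counter = new AtomicInteger();
    private final Deque<String> free = new ArrayDeque<>();

    SuffixStrategySketch(int maxCache) {
        this.maxCache = maxCache;
        // With a cache, pre-fill the fixed range of suffixes 0..maxCache-1.
        for (int i = 0; i < maxCache; i++) {
            free.addLast(String.valueOf(i));
        }
    }

    // Hands out a suffix: from the fixed range when caching, else from a counter.
    synchronized String acquire() {
        if (maxCache > 0) {
            String suffix = free.pollFirst();
            if (suffix == null) {
                // Assumption: an exhausted range is reported as an error.
                throw new IllegalStateException("No suffix available in range of " + maxCache);
            }
            return suffix;
        }
        return String.valueOf(counter.getAndIncrement());
    }

    // Returns a suffix to the pool so it can be reused; a no-op without a cache.
    synchronized void release(String suffix) {
        if (maxCache > 0) {
            free.addLast(suffix);
        }
    }

    public static void main(String[] args) {
        SuffixStrategySketch cached = new SuffixStrategySketch(2);
        String first = cached.acquire();
        cached.release(first);
        System.out.println("reused suffix: " + cached.acquire());
    }
}
```

With a positive maxCache the set of transactional.id values stays bounded, which is the point of reusing suffixes; with zero, every acquisition yields a fresh value.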

Async @KafkaListener Return

@KafkaListener (and @KafkaHandler) methods can now return asynchronous types, including CompletableFuture<?> and Mono<?>; Kotlin suspend functions are also supported. See Async Returns for more information.
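
The shape of such a method can be sketched with the JDK alone. The class and method names below are hypothetical, and the @KafkaListener annotation appears only in a comment so that the block compiles without Spring on the classpath.

```java
import java.util.Locale;
import java.util.concurrent.CompletableFuture;

// Hypothetical listener class; names are illustrative only.
class AsyncListenerSketch {

    // In an application this method would carry @KafkaListener
    // (omitted here so the sketch compiles with the JDK alone).
    CompletableFuture<String> handle(String payload) {
        // Do the work off the calling thread and complete the future later.
        return CompletableFuture.supplyAsync(() -> payload.toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(new AsyncListenerSketch().handle("hello").join());
    }
}
```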

Routing of messages to custom DLTs based on thrown exceptions

It is now possible to redirect messages to custom DLTs based on the type of exception thrown during message processing. Rules for the redirection are set via either RetryableTopic.exceptionBasedDltRouting or RetryTopicConfigurationBuilder.dltRoutingRules. Custom DLTs are created automatically, as are the other retry and dead-letter topics. See Routing of messages to custom DLTs based on thrown exceptions for more information.
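
The idea behind exception-based routing can be sketched in plain Java as an ordered list of rules that maps an exception type to a topic suffix. This is an illustration of the concept only, not the framework's implementation; the class name, the method names, the first-match-wins ordering, and the suffixes are assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical router; not the framework's DLT routing implementation.
class DltRoutingSketch {

    private final Map<Class<? extends Throwable>, String> rules = new LinkedHashMap<>();
    private final String defaultSuffix;

    DltRoutingSketch(String defaultSuffix) {
        this.defaultSuffix = defaultSuffix;
    }

    // Registers a rule; rules are evaluated in registration order.
    DltRoutingSketch route(Class<? extends Throwable> type, String suffix) {
        rules.put(type, suffix);
        return this;
    }

    // Returns the DLT name for the first rule whose type matches the exception.
    String resolve(String topic, Throwable thrown) {
        for (Map.Entry<Class<? extends Throwable>, String> rule : rules.entrySet()) {
            if (rule.getKey().isInstance(thrown)) {
                return topic + rule.getValue();
            }
        }
        return topic + defaultSuffix;
    }

    public static void main(String[] args) {
        DltRoutingSketch router = new DltRoutingSketch("-dlt")
                .route(IllegalArgumentException.class, "-validation-dlt");
        System.out.println(router.resolve("orders", new IllegalArgumentException("bad input")));
    }
}
```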

Deprecating ContainerProperties transactionManager property

The transactionManager property in ContainerProperties is deprecated in favor of KafkaAwareTransactionManager, a narrower type than the general PlatformTransactionManager. See ContainerProperties and Transaction Synchronization.

After Rollback Processing

A new AfterRollbackProcessor API processBatch is provided. See After-rollback Processor for more information.

Change @RetryableTopic SameIntervalTopicReuseStrategy default value

The default value of the @RetryableTopic property SameIntervalTopicReuseStrategy is now SINGLE_TOPIC. See Single Topic for maxInterval Exponential Delay.

Non-blocking retries support class level @KafkaListener

Non-blocking retries now support @KafkaListener on a class. See Non-Blocking Retries.

RetryTopicConfigurationProvider now supports processing @RetryableTopic on a class.

A new public API is provided to find RetryTopicConfiguration. See Find RetryTopicConfiguration.

RetryTopicConfigurer support for processing MultiMethodKafkaListenerEndpoint

The RetryTopicConfigurer now supports processing and registering MultiMethodKafkaListenerEndpoint. The MultiMethodKafkaListenerEndpoint provides getters and setters for the defaultMethod and methods properties. The EndpointCustomizer has been modified so that it applies strictly to MethodKafkaListenerEndpoint types. The EndpointHandlerMethod adds new constructors that construct an instance for the provided bean. A new class, EndpointHandlerMultiMethod, is provided to handle multiple methods for retrying endpoints.

New API method to seek to an offset based on a user provided function

ConsumerCallback provides a new API to seek to an offset based on a user-defined function, which takes the current offset in the consumer as an argument. See Seek API Docs for more details.

@PartitionOffset support for SeekPosition

A seekPosition property has been added to @PartitionOffset to support TopicPartitionOffset.SeekPosition. See manual-assignment for more details.

New constructor in TopicPartitionOffset that accepts a function to compute the offset to seek to

TopicPartitionOffset has a new constructor that takes a user-provided function to compute the offset to seek to. When this constructor is used, the framework calls the function with the input argument of the current consumer offset position. See Seek API Docs for more details.
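
The function-based seek described in this and the earlier seek entry can be sketched with a plain java.util.function.Function that receives the current offset and returns the target offset. The helper below is hypothetical and is not the framework's API; clamping the result at zero is an assumption added for the illustration.

```java
import java.util.function.Function;

// Hypothetical helper; not the framework's seek API.
class OffsetComputeSketch {

    // Applies the user function to the current position and clamps at zero (assumed).
    static long seekTarget(long currentOffset, Function<Long, Long> offsetComputeFunction) {
        long target = offsetComputeFunction.apply(currentOffset);
        return Math.max(0L, target);
    }

    public static void main(String[] args) {
        // Rewind ten records from wherever the consumer currently is.
        System.out.println(seekTarget(100L, current -> current - 10));
    }
}
```

The benefit of passing a function rather than a fixed number is that the caller does not need to know the consumer's position in advance.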

Spring Boot application name as default client ID prefix

For Spring Boot applications which define an application name, this name is now used as a default prefix for auto-generated client IDs for certain client types. See Default client ID prefixes for more details.

Enhanced Retrieval of MessageListenerContainers

ListenerContainerRegistry provides two new APIs to dynamically find and filter MessageListenerContainer instances: getListenerContainersMatching(Predicate<String> idMatcher) filters by ID, and getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> matcher) filters by ID and container properties.
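
How the two matcher styles relate can be shown with a small JDK-only stand-in for a registry. The class RegistrySketch, its register and matching methods, and the type parameter C (playing the role of MessageListenerContainer) are hypothetical and exist only for this illustration.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical stand-in for a registry; C plays the role of MessageListenerContainer.
class RegistrySketch<C> {

    private final Map<String, C> containers = new LinkedHashMap<>();

    void register(String id, C container) {
        containers.put(id, container);
    }

    // Filters by ID only; delegates to the two-argument variant.
    Collection<C> matching(Predicate<String> idMatcher) {
        return matching((id, container) -> idMatcher.test(id));
    }

    // Filters by ID and by properties of the container itself.
    Collection<C> matching(BiPredicate<String, C> matcher) {
        return containers.entrySet().stream()
                .filter(entry -> matcher.test(entry.getKey(), entry.getValue()))
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        RegistrySketch<Boolean> registry = new RegistrySketch<>();
        registry.register("orders-1", true);
        registry.register("audit-1", false);
        System.out.println(registry.matching(id -> id.startsWith("orders-")).size());
    }
}
```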

Enhanced observation by providing more tracing tags

KafkaTemplateObservation provides more tracing tags (low cardinality). KafkaListenerObservation provides a new API to find high cardinality key names and more tracing tags (high or low cardinality). See Micrometer Observation.

What’s New in 3.1 Since 3.0

This section covers the changes made from version 3.0 to version 3.1. For changes in earlier versions, see Change History.

Kafka Client Version

This version requires the 3.6.0 kafka-clients.

EmbeddedKafkaBroker

An additional implementation is now provided to use Kraft instead of Zookeeper. See Embedded Kafka Broker for more information.

JsonDeserializer

When a deserialization exception occurs, the SerializationException message no longer contains the data with the form Can’t deserialize data [[123, 34, 98, 97, 122, …​; an array of numerical values for each data byte is not useful and can be verbose for large data. When used with an ErrorHandlingDeserializer, the DeserializationException sent to the error handler contains the data property which contains the raw data that could not be deserialized. When not used with an ErrorHandlingDeserializer, the KafkaConsumer will continually emit exceptions for the same record showing the topic/partition/offset and the cause thrown by Jackson.

ContainerPostProcessor

Post-processing can be applied to a listener container by specifying the bean name of a ContainerPostProcessor on the @KafkaListener annotation. This occurs after the container has been created and after any ContainerCustomizer configured on the container factory has been applied. See Container Factory for more information.

ErrorHandlingDeserializer

You can now add a Validator to this deserializer; if the delegate Deserializer successfully deserializes the object, but that object fails validation, an exception is thrown, similar to when a deserialization exception occurs. This allows the original raw data to be passed to the error handler. See Using ErrorHandlingDeserializer for more information.

Retryable Topics

The suffix -retry-5000 is changed to -retry when using @RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC). If you want to keep the -retry-5000 suffix, use @RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2"). See Topic Naming for more information.

Listener Container Changes

When manually assigning partitions, with a null consumer group.id, the AckMode is now automatically coerced to MANUAL. See Manually Assigning All Partitions for more information.

What’s New in 3.0 Since 2.9

Kafka Client Version

This version requires the 3.3.1 kafka-clients.

Exactly Once Semantics

EOSMode.V1 (aka ALPHA) is no longer supported.

When using transactions, the minimum broker version is 2.5.

See Exactly Once Semantics and KIP-447 for more information.

Observation

Enabling observation for timers and tracing using Micrometer is now supported. See Observation for more information.

Native Images

Support for creating native images is provided. See Native Images for more information.

Global Single Embedded Kafka

The embedded Kafka (EmbeddedKafkaBroker) can now be started as a single global instance for the whole test plan. See Using the Same Broker(s) for Multiple Test Classes for more information.

Retryable Topics Changes

This feature is no longer considered experimental (as far as its API is concerned). The feature itself has been supported since 2.7, but with a greater than normal possibility of breaking API changes.

The bootstrapping of Non-Blocking Retries infrastructure beans has changed in this release to avoid some timing problems that occurred in some applications during application initialization.

You can now set a different concurrency for the retry containers; by default, the concurrency is the same as the main container.

@RetryableTopic can now be used as a meta-annotation on custom annotations, including support for @AliasFor properties.

See Configuration for more information.

The default replication factor for the retry topics is now -1 (use broker default). If your broker is earlier than version 2.4, you will now need to explicitly set the property.

You can now configure multiple @RetryableTopic listeners on the same topic in the same application context. Previously, this was not possible. See Multiple Listeners, Same Topic(s) for more information.

There are breaking API changes in RetryTopicConfigurationSupport. Specifically, if you override the bean definition methods for destinationTopicResolver, kafkaConsumerBackoffManager and/or retryTopicConfigurer, these methods now require an ObjectProvider<RetryTopicComponentFactory> parameter.

Listener Container Changes

Events related to consumer authentication and authorization failures are now published by the container. See Application Events for more information.

You can now customize the thread names used by consumer threads. See Container Thread Naming for more information.

The container property restartAfterAuthException has been added. See Listener Container Properties for more information.

KafkaTemplate Changes

The futures returned by this class are now CompletableFutures instead of ListenableFutures. See Using KafkaTemplate.

ReplyingKafkaTemplate Changes

The futures returned by this class are now CompletableFutures instead of ListenableFutures. See Using ReplyingKafkaTemplate and Request/Reply with Message<?>s.

@KafkaListener Changes

You can now use a custom correlation header which will be echoed in any reply message. See the note at the end of Using ReplyingKafkaTemplate for more information.

You can now manually commit parts of a batch before the entire batch is processed. See Committing Offsets for more information.

KafkaHeaders Changes

Four constants in KafkaHeaders that were deprecated in 2.9.x have now been removed.

  • Instead of MESSAGE_KEY, use KEY.

  • Instead of PARTITION_ID, use PARTITION.

Similarly, RECEIVED_MESSAGE_KEY is replaced by RECEIVED_KEY and RECEIVED_PARTITION_ID is replaced by RECEIVED_PARTITION.

Testing Changes

Version 3.0.7 introduced a MockConsumerFactory and MockProducerFactory. See Mock Consumer and Producer for more information.

Starting with version 3.0.10, the embedded Kafka broker, by default, sets the Spring Boot property spring.kafka.bootstrap-servers to the address(es) of the embedded broker(s).

What’s New in 2.9 since 2.8

Kafka Client Version

This version requires the 3.2.0 kafka-clients.

Error Handler Changes

The DefaultErrorHandler can now be configured to pause the container for one poll and use the remaining results from the previous poll, instead of seeking to the offsets of the remaining records. See DefaultErrorHandler for more information.

The DefaultErrorHandler now has a BackOffHandler property. See Back Off Handlers for more information.

Listener Container Changes

interceptBeforeTx now works with all transaction managers (previously it was only applied when a KafkaAwareTransactionManager was used). See [interceptBeforeTx].

A new container property pauseImmediate is provided which allows the container to pause the consumer after the current record is processed, instead of after all the records from the previous poll have been processed. See [pauseImmediate].

Events related to consumer authentication and authorization

Header Mapper Changes

You can now configure which inbound headers should be mapped. Also available in version 2.8.8 or later. See Message Headers for more information.

KafkaTemplate Changes

In 3.0, the futures returned by this class will be CompletableFutures instead of ListenableFutures. See Using KafkaTemplate for assistance in transitioning when using this release.

ReplyingKafkaTemplate Changes

The template now provides a method to wait for assignment on the reply container, to avoid a race when sending a request before the reply container is initialized. Also available in version 2.8.8 or later. See Using ReplyingKafkaTemplate.

In 3.0, the futures returned by this class will be CompletableFutures instead of ListenableFutures. See Using ReplyingKafkaTemplate and Request/Reply with Message<?>s for assistance in transitioning when using this release.

What’s New in 2.8 Since 2.7

This section covers the changes made from version 2.7 to version 2.8. For changes in earlier versions, see Change History.

Kafka Client Version

This version requires the 3.0.0 kafka-clients.

Package Changes

Classes and interfaces related to type mapping have been moved from …​support.converter to …​support.mapping.

  • AbstractJavaTypeMapper

  • ClassMapper

  • DefaultJackson2JavaTypeMapper

  • Jackson2JavaTypeMapper

Out of Order Manual Commits

The listener container can now be configured to accept manual offset commits out of order (usually asynchronously). The container will defer the commit until the missing offset is acknowledged. See Manually Committing Offsets for more information.
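
The deferral logic can be illustrated with a small JDK-only tracker that only advances the committable position once every earlier offset has been acknowledged. This is a sketch of the general technique, not the container's implementation; the class name, the acknowledge method, and the use of -1 to signal that a gap remains are assumptions.

```java
import java.util.TreeSet;

// Hypothetical tracker; not the listener container's implementation.
class OutOfOrderCommitSketch {

    private final TreeSet<Long> pending = new TreeSet<>();
    private long nextToCommit; // offset of the next record that must be acknowledged

    OutOfOrderCommitSketch(long firstOffset) {
        this.nextToCommit = firstOffset;
    }

    // Records an acknowledgment and returns the offset to commit, or -1 if a gap remains.
    synchronized long acknowledge(long offset) {
        pending.add(offset);
        boolean advanced = false;
        // Consume the contiguous run of acknowledged offsets starting at nextToCommit.
        while (pending.remove(nextToCommit)) {
            nextToCommit++;
            advanced = true;
        }
        return advanced ? nextToCommit : -1L;
    }

    public static void main(String[] args) {
        OutOfOrderCommitSketch tracker = new OutOfOrderCommitSketch(10L);
        System.out.println(tracker.acknowledge(11L)); // gap at 10 remains
        System.out.println(tracker.acknowledge(10L)); // gap filled, commit can advance
    }
}
```

The returned value follows the Kafka convention that a committed offset is the offset of the next record to consume.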

@KafkaListener Changes

It is now possible to specify whether the listener method is a batch listener on the method itself. This allows the same container factory to be used for both record and batch listeners.

See [batch-listeners] for more information.

Batch listeners can now handle conversion exceptions.

RecordFilterStrategy, when used with batch listeners, can now filter the entire batch in one call. See the note at the end of [batch-listeners] for more information.

The @KafkaListener annotation now has the filter attribute, to override the container factory’s RecordFilterStrategy for just this listener.

The @KafkaListener annotation now has the info attribute; this is used to populate the new listener container property listenerInfo. This is then used to populate a KafkaHeaders.LISTENER_INFO header in each record which can be used in RecordInterceptor, RecordFilterStrategy, or the listener itself. See Listener Info Header and AbstractMessageListenerContainer Properties for more information.

KafkaTemplate Changes

You can now receive a single record, given the topic, partition and offset. See Using KafkaTemplate to Receive for more information.

CommonErrorHandler Added

The legacy GenericErrorHandler and its sub-interface hierarchies for record and batch listeners have been replaced by a new single interface, CommonErrorHandler, with implementations corresponding to most legacy implementations of GenericErrorHandler. See Container Error Handlers and Migrating Custom Legacy Error Handler Implementations to CommonErrorHandler for more information.

Listener Container Changes

The interceptBeforeTx container property is now true by default.

The authorizationExceptionRetryInterval property has been renamed to authExceptionRetryInterval and now applies to AuthenticationExceptions in addition to AuthorizationExceptions, to which it applied previously. Both exceptions are considered fatal and the container will stop by default, unless this property is set.

Serializer/Deserializer Changes

The DelegatingByTopicSerializer and DelegatingByTopicDeserializer are now provided. See Delegating Serializer and Deserializer for more information.

DeadLetterPublishingRecoverer Changes

The property stripPreviousExceptionHeaders is now true by default.

There are now several techniques to customize which headers are added to the output record.

See Managing Dead Letter Record Headers for more information.

Retryable Topics Changes

Now you can use the same factory for retryable and non-retryable topics. See Specifying a ListenerContainerFactory for more information.

There’s now a manageable global list of fatal exceptions that will make the failed record go straight to the DLT. Refer to Exception Classifier to see how to manage it.

You can now use blocking and non-blocking retries in conjunction. See Combining Blocking and Non-Blocking Retries for more information.

The KafkaBackOffException thrown when using the retryable topics feature is now logged at DEBUG level. See Changing KafkaBackOffException Logging Level if you need to change the logging level back to WARN or set it to any other level.

Changes between 2.6 and 2.7

Kafka Client Version

This version requires the 2.7.0 kafka-clients. It is also compatible with the 2.8.0 clients, since version 2.7.1; see Override Spring Boot Dependencies.

Non-Blocking Delayed Retries Using Topics

This significant new feature is added in this release. When strict ordering is not important, failed deliveries can be sent to another topic to be consumed later. A series of such retry topics can be configured, with increasing delays. See Non-Blocking Retries for more information.

Listener Container Changes

The onlyLogRecordMetadata container property is now true by default.

A new container property stopImmediate is now available.

See Listener Container Properties for more information.

Error handlers that use a BackOff between delivery attempts (e.g. SeekToCurrentErrorHandler and DefaultAfterRollbackProcessor) will now exit the back off interval soon after the container is stopped, rather than delaying the stop.

Error handlers and after rollback processors that extend FailedRecordProcessor can now be configured with one or more RetryListeners to receive information about retry and recovery progress.

The RecordInterceptor now has additional methods called after the listener returns (normally, or by throwing an exception). It also has a sub-interface ConsumerAwareRecordInterceptor. In addition, there is now a BatchInterceptor for batch listeners. See Message Listener Containers for more information.

@KafkaListener Changes

You can now validate the payload parameter of @KafkaHandler methods (class-level listeners). See @KafkaListener @Payload Validation for more information.

You can now set the rawRecordHeader property on the MessagingMessageConverter and BatchMessagingMessageConverter which causes the raw ConsumerRecord to be added to the converted Message<?>. This is useful, for example, if you wish to use a DeadLetterPublishingRecoverer in a listener error handler. See Listener Error Handlers for more information.

You can now modify @KafkaListener annotations during application initialization. See @KafkaListener Attribute Modification for more information.

DeadLetterPublishingRecoverer Changes

Now, if both the key and value fail deserialization, the original values are published to the DLT. Previously, the value was populated but the key DeserializationException remained in the headers. This is a breaking API change if you subclassed the recoverer and overrode the createProducerRecord method.

In addition, the recoverer verifies that the partition selected by the destination resolver actually exists before publishing to it.

See Publishing Dead-letter Records for more information.

ChainedKafkaTransactionManager is Deprecated

See Transactions for more information.

ReplyingKafkaTemplate Changes

There is now a mechanism to examine a reply and fail the future exceptionally if some condition exists.

Support for sending and receiving spring-messaging Message<?>s has been added.

See Using ReplyingKafkaTemplate for more information.

Kafka Streams Changes

By default, the StreamsBuilderFactoryBean is now configured to not clean up local state. See Configuration for more information.

KafkaAdmin Changes

New methods createOrModifyTopics and describeTopics have been added. KafkaAdmin.NewTopics has been added to facilitate configuring multiple topics in a single bean. See [configuring-topics] for more information.

MessageConverter Changes

It is now possible to add a spring-messaging SmartMessageConverter to the MessagingMessageConverter, allowing content negotiation based on the contentType header. See Spring Messaging Message Conversion for more information.

Sequencing @KafkaListeners

See Starting @KafkaListeners in Sequence for more information.

ExponentialBackOffWithMaxRetries

A new BackOff implementation is provided, making it more convenient to configure the max retries. See ExponentialBackOffWithMaxRetries Implementation for more information.
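
The kind of delay sequence such a back off produces can be sketched with plain arithmetic. The helper below is hypothetical and does not call the framework's class; it assumes the common exponential scheme in which each interval is the previous one times a multiplier, capped at a maximum, and it stops after a fixed number of retries.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; illustrates the arithmetic only.
class BackOffSketch {

    // Returns the delay before each retry, growing by multiplier and capped at maxInterval.
    static List<Long> intervals(long initialInterval, double multiplier, long maxInterval, int maxRetries) {
        List<Long> result = new ArrayList<>();
        long current = initialInterval;
        for (int i = 0; i < maxRetries; i++) {
            result.add(Math.min(current, maxInterval));
            current = Math.min((long) (current * multiplier), maxInterval);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(intervals(1000L, 2.0, 10_000L, 6));
    }
}
```

Expressing the limit as a retry count avoids having to work out the total elapsed time that corresponds to the desired number of attempts.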

Conditional Delegating Error Handlers

These new error handlers can be configured to delegate to different error handlers, depending on the exception type. See Delegating Error Handler for more information.

Changes between 2.5 and 2.6

Kafka Client Version

This version requires the 2.6.0 kafka-clients.

Listener Container Changes

The default EOSMode is now BETA. See Exactly Once Semantics for more information.

Various error handlers (that extend FailedRecordProcessor) and the DefaultAfterRollbackProcessor now reset the BackOff if recovery fails. In addition, you can now select the BackOff to use based on the failed record and/or exception.

You can now configure an adviceChain in the container properties. See Listener Container Properties for more information.

When the container is configured to publish ListenerContainerIdleEvents, it now publishes a ListenerContainerNoLongerIdleEvent when a record is received after publishing an idle event. See Application Events and Detecting Idle and Non-Responsive Consumers for more information.

@KafkaListener Changes

When using manual partition assignment, you can now specify a wildcard for determining which partitions should be reset to the initial offset. In addition, if the listener implements ConsumerSeekAware, onPartitionsAssigned() is called after the manual assignment. (Also added in version 2.5.5). See Explicit Partition Assignment for more information.

Convenience methods have been added to AbstractConsumerSeekAware to make seeking easier. See [seek] for more information.

ErrorHandler Changes

Subclasses of FailedRecordProcessor (e.g. SeekToCurrentErrorHandler, DefaultAfterRollbackProcessor, RecoveringBatchErrorHandler) can now be configured to reset the retry state if the exception is a different type to that which occurred previously with this record.

Producer Factory Changes

You can now set a maximum age for producers after which they will be closed and recreated. See Transactions for more information.

You can now update the configuration map after the DefaultKafkaProducerFactory has been created. This might be useful, for example, if you have to update SSL key/trust store locations after a credentials change. See Using DefaultKafkaProducerFactory for more information.

Changes between 2.4 and 2.5

This section covers the changes made from version 2.4 to version 2.5. For changes in earlier versions, see Change History.

Consumer/Producer Factory Changes

The default consumer and producer factories can now invoke a callback whenever a consumer or producer is created or closed. Implementations for native Micrometer metrics are provided. See Factory Listeners for more information.

You can now change bootstrap server properties at runtime, enabling failover to another Kafka cluster. See Connecting to Kafka for more information.

StreamsBuilderFactoryBean Changes

The factory bean can now invoke a callback whenever a KafkaStreams is created or destroyed. An implementation for native Micrometer metrics is provided. See KafkaStreams Micrometer Support for more information.

Kafka Client Version

This version requires the 2.5.0 kafka-clients.

Class/Package Changes

SeekUtils has been moved from the o.s.k.support package to o.s.k.listener.

Delivery Attempts Header

There is now an option to add a header which tracks delivery attempts when using certain error handlers and after rollback processors. See Delivery Attempts Header for more information.

@KafkaListener Changes

Default reply headers will now be populated automatically if needed when a @KafkaListener return type is Message<?>. See Reply Type Message<?> for more information.

The KafkaHeaders.RECEIVED_MESSAGE_KEY is no longer populated with a null value when the incoming record has a null key; the header is omitted altogether.

@KafkaListener methods can now specify a ConsumerRecordMetadata parameter instead of using discrete headers for metadata such as topic, partition, etc. See Consumer Record Metadata for more information.

Listener Container Changes

The assignmentCommitOption container property is now LATEST_ONLY_NO_TX by default. See Listener Container Properties for more information.

The subBatchPerPartition container property is now true by default when using transactions. See Transactions for more information.

A new RecoveringBatchErrorHandler is now provided.

Static group membership is now supported. See Message Listener Containers for more information.

When incremental/cooperative rebalancing is configured, if offsets fail to commit with a non-fatal RebalanceInProgressException, the container will attempt to re-commit the offsets for the partitions that remain assigned to this instance after the rebalance is completed.

The default error handler is now the SeekToCurrentErrorHandler for record listeners and RecoveringBatchErrorHandler for batch listeners. See Container Error Handlers for more information.

You can now control the level at which exceptions intentionally thrown by standard error handlers are logged. See Container Error Handlers for more information.

The getAssignmentsByClientId() method has been added, making it easier to determine which consumers in a concurrent container are assigned which partition(s). See Listener Container Properties for more information.

You can now suppress logging entire ConsumerRecords in error and debug logs, etc. See onlyLogRecordMetadata in Listener Container Properties.

KafkaTemplate Changes

The KafkaTemplate can now maintain micrometer timers. See Monitoring for more information.

The KafkaTemplate can now be configured with ProducerConfig properties to override those in the producer factory. See Using KafkaTemplate for more information.

A RoutingKafkaTemplate has now been provided. See Using RoutingKafkaTemplate for more information.

You can now use KafkaSendCallback instead of ListenableFutureCallback to get a narrower exception, making it easier to extract the failed ProducerRecord. See Using KafkaTemplate for more information.

Kafka String Serializer/Deserializer

New ToStringSerializer/StringDeserializers, as well as an associated SerDe, are now provided. See String serialization for more information.

JsonDeserializer

The JsonDeserializer now has more flexibility to determine the deserialization type. See Using Methods to Determine Types for more information.

Delegating Serializer/Deserializer

The DelegatingSerializer can now handle "standard" types, when the outbound record has no header. See Delegating Serializer and Deserializer for more information.

Testing Changes

The KafkaTestUtils.consumerProps() helper method now sets ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to earliest by default. See JUnit for more information.

Changes between 2.3 and 2.4

Kafka Client Version

This version requires the 2.4.0 kafka-clients or higher and supports the new incremental rebalancing feature.

ConsumerAwareRebalanceListener

Like ConsumerRebalanceListener, this interface now has an additional method onPartitionsLost. Refer to the Apache Kafka documentation for more information.

Unlike the ConsumerRebalanceListener, the default implementation does not call onPartitionsRevoked. Instead, the listener container will call that method after it has called onPartitionsLost; you should not, therefore, do the same when implementing ConsumerAwareRebalanceListener.

See the IMPORTANT note at the end of Rebalancing Listeners for more information.

GenericErrorHandler

The isAckAfterHandle() default implementation now returns true.

KafkaTemplate

The KafkaTemplate now supports non-transactional publishing alongside transactional. See KafkaTemplate Transactional and non-Transactional Publishing for more information.

AggregatingReplyingKafkaTemplate

The releaseStrategy is now a BiConsumer. It is now called after a timeout (as well as when records arrive); the second parameter is true in the case of a call after a timeout.

See Aggregating Multiple Replies for more information.

Listener Container

The ContainerProperties provides an authorizationExceptionRetryInterval option to let the listener container retry after any AuthorizationException is thrown by the KafkaConsumer. See its JavaDocs and Using KafkaMessageListenerContainer for more information.

@KafkaListener

The @KafkaListener annotation has a new property, splitIterables, which defaults to true. When a replying listener returns an Iterable, this property controls whether the result is sent as a single record or as one record per element. See Forwarding Listener Results using @SendTo for more information.

Batch listeners can now be configured with a BatchToRecordAdapter; this allows, for example, the batch to be processed in a transaction while the listener gets one record at a time. With the default implementation, a ConsumerRecordRecoverer can be used to handle errors within the batch, without stopping the processing of the entire batch - this might be useful when using transactions. See Transactions with Batch Listeners for more information.

Kafka Streams

The StreamsBuilderFactoryBean accepts a new property KafkaStreamsInfrastructureCustomizer. This allows configuration of the builder and/or topology before the stream is created. See Spring Management for more information.

Changes Between 2.2 and 2.3

This section covers the changes made from version 2.2 to version 2.3.

Tips, Tricks and Examples

A new chapter Tips, Tricks and Examples has been added. Please submit GitHub issues and/or pull requests for additional entries in that chapter.

Kafka Client Version

This version requires the 2.3.0 kafka-clients or higher.

Class/Package Changes

TopicPartitionInitialOffset is deprecated in favor of TopicPartitionOffset.

Configuration Changes

Starting with version 2.3.4, the missingTopicsFatal container property is false by default. When this is true, the application fails to start if the broker is down, and many users were affected by that behavior. Given that Kafka is a high-availability platform, we did not anticipate that starting an application with no active brokers would be a common use case.

Producer and Consumer Factory Changes

The DefaultKafkaProducerFactory can now be configured to create a producer per thread. You can also provide Supplier<Serializer> instances in the constructor as an alternative to either configured classes (which require no-arg constructors), or constructing with Serializer instances, which are then shared between all Producers. See Using DefaultKafkaProducerFactory for more information.

The same option is available with Supplier<Deserializer> instances in DefaultKafkaConsumerFactory. See Using KafkaMessageListenerContainer for more information.

Listener Container Changes

Previously, error handlers received ListenerExecutionFailedException (with the actual listener exception as the cause) when the listener was invoked using a listener adapter (such as @KafkaListener methods). Exceptions thrown by native GenericMessageListener implementations were passed to the error handler unchanged. Now a ListenerExecutionFailedException is always the argument (with the actual listener exception as the cause), which provides access to the container’s group.id property.

Because the listener container has its own mechanism for committing offsets, it prefers the Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to be false. It now sets it to false automatically unless specifically set in the consumer factory or the container’s consumer property overrides.

The ackOnError property is now false by default.

It is now possible to obtain the consumer’s group.id property in the listener method. See Obtaining the Consumer group.id for more information.

The container has a new property recordInterceptor allowing records to be inspected or modified before invoking the listener. A CompositeRecordInterceptor is also provided in case you need to invoke multiple interceptors. See Message Listener Containers for more information.

The ConsumerSeekAware has new methods allowing you to perform seeks relative to the beginning, end, or current position and to seek to the first offset greater than or equal to a timestamp. See [seek] for more information.

A convenience class AbstractConsumerSeekAware is now provided to simplify seeking. See [seek] for more information.

The ContainerProperties provides an idleBetweenPolls option that lets the main loop in the listener container sleep between KafkaConsumer.poll() calls. See its JavaDocs and Using KafkaMessageListenerContainer for more information.

When using AckMode.MANUAL (or MANUAL_IMMEDIATE) you can now cause a redelivery by calling nack on the Acknowledgment. See Committing Offsets for more information.

Listener performance can now be monitored using Micrometer Timers. See Monitoring for more information.

The containers now publish additional consumer lifecycle events relating to startup. See Application Events for more information.

Transactional batch listeners can now support zombie fencing. See Transactions for more information.

The listener container factory can now be configured with a ContainerCustomizer to further configure each container after it has been created and configured. See Container factory for more information.

ErrorHandler Changes

The SeekToCurrentErrorHandler now treats certain exceptions as fatal and disables retry for those, invoking the recoverer on first failure.

The SeekToCurrentErrorHandler and SeekToCurrentBatchErrorHandler can now be configured to apply a BackOff (thread sleep) between delivery attempts.

Starting with version 2.3.2, recovered records' offsets will be committed when the error handler returns after recovering a failed record.

The DeadLetterPublishingRecoverer, when used in conjunction with an ErrorHandlingDeserializer, now sets the payload of the message sent to the dead-letter topic to the original value that could not be deserialized. Previously, it was null and user code needed to extract the DeserializationException from the message headers. See Publishing Dead-letter Records for more information.

TopicBuilder

A new class TopicBuilder is provided for more convenient creation of NewTopic @Bean definitions for automatic topic provisioning. See [configuring-topics] for more information.

Kafka Streams Changes

You can now perform additional configuration of the StreamsBuilderFactoryBean created by @EnableKafkaStreams. See Streams Configuration for more information.

A RecoveringDeserializationExceptionHandler is now provided which allows records with deserialization errors to be recovered. It can be used in conjunction with a DeadLetterPublishingRecoverer to send these records to a dead-letter topic. See Recovery from Deserialization Exceptions for more information.

The HeaderEnricher transformer has been provided, using SpEL to generate the header values. See Header Enricher for more information.

The MessagingTransformer has been provided. This allows a Kafka Streams topology to interact with a spring-messaging component, such as a Spring Integration flow. See MessagingProcessor and [Calling a Spring Integration Flow from a KStream] for more information.

JSON Component Changes

All the JSON-aware components are now configured by default with a Jackson ObjectMapper produced by JacksonUtils.enhancedObjectMapper(). The JsonDeserializer now provides TypeReference-based constructors for better handling of target generic container types. Also, a JacksonMimeTypeModule has been introduced for serialization of org.springframework.util.MimeType to a plain string. See its JavaDocs and Serialization, Deserialization, and Message Conversion for more information.

A ByteArrayJsonMessageConverter has been provided as well as a new super class for all JSON converters, JsonMessageConverter. Also, a StringOrBytesSerializer is now available; it can serialize byte[], Bytes and String values in ProducerRecords. See Spring Messaging Message Conversion for more information.

The JsonSerializer, JsonDeserializer and JsonSerde now have fluent APIs to make programmatic configuration simpler. See the javadocs, Serialization, Deserialization, and Message Conversion, and Streams JSON Serialization and Deserialization for more information.

ReplyingKafkaTemplate

When a reply times out, the future is completed exceptionally with a KafkaReplyTimeoutException instead of a KafkaException.

Also, an overloaded sendAndReceive method is now provided that allows specifying the reply timeout on a per-message basis.

AggregatingReplyingKafkaTemplate

The AggregatingReplyingKafkaTemplate extends the ReplyingKafkaTemplate by aggregating replies from multiple receivers. See Aggregating Multiple Replies for more information.

Transaction Changes

You can now override the producer factory’s transactionIdPrefix on the KafkaTemplate and KafkaTransactionManager. See transactionIdPrefix for more information.

New Delegating Serializer/Deserializer

The framework now provides a delegating Serializer and Deserializer, utilizing a header to enable producing and consuming records with multiple key/value types. See Delegating Serializer and Deserializer for more information.

New Retrying Deserializer

The framework now provides a delegating RetryingDeserializer, to retry deserialization when transient errors such as network problems might occur. See Retrying Deserializer for more information.

Changes Between 2.1 and 2.2

Kafka Client Version

This version requires the 2.0.0 kafka-clients or higher.

Class and Package Changes

The ContainerProperties class has been moved from org.springframework.kafka.listener.config to org.springframework.kafka.listener.

The AckMode enum has been moved from AbstractMessageListenerContainer to ContainerProperties.

The setBatchErrorHandler() and setErrorHandler() methods have been moved from ContainerProperties to both AbstractMessageListenerContainer and AbstractKafkaListenerContainerFactory.

After Rollback Processing

A new AfterRollbackProcessor strategy is provided. See After-rollback Processor for more information.

ConcurrentKafkaListenerContainerFactory Changes

You can now use the ConcurrentKafkaListenerContainerFactory to create and configure any ConcurrentMessageListenerContainer, not only those for @KafkaListener annotations. See Container factory for more information.

Listener Container Changes

A new container property (missingTopicsFatal) has been added. See Using KafkaMessageListenerContainer for more information.

A ConsumerStoppedEvent is now emitted when a consumer stops. See Thread Safety for more information.

Batch listeners can optionally receive the complete ConsumerRecords<?, ?> object instead of a List<ConsumerRecord<?, ?>>. See [batch-listeners] for more information.

The DefaultAfterRollbackProcessor and SeekToCurrentErrorHandler can now recover (skip) records that keep failing, and, by default, do so after 10 failures. They can be configured to publish failed records to a dead-letter topic.

Starting with version 2.2.4, the consumer’s group ID can be used while selecting the dead letter topic name.

The ConsumerStoppingEvent has been added. See Application Events for more information.

The SeekToCurrentErrorHandler can now be configured to commit the offset of a recovered record when the container is configured with AckMode.MANUAL_IMMEDIATE (since 2.2.4).

@KafkaListener Changes

You can now override the concurrency and autoStartup properties of the listener container factory by setting properties on the annotation. You can now add configuration to determine which headers (if any) are copied to a reply message. See @KafkaListener Annotation for more information.

You can now use @KafkaListener as a meta-annotation on your own annotations. See @KafkaListener as a Meta Annotation for more information.

It is now easier to configure a Validator for @Payload validation. See @KafkaListener @Payload Validation for more information.

You can now specify Kafka consumer properties directly on the annotation; these will override any properties with the same name defined in the consumer factory (since version 2.2.4). See Annotation Properties for more information.

Header Mapping Changes

Headers of type MimeType and MediaType are now mapped as simple strings in the RecordHeader value. Previously, they were mapped as JSON and only MimeType was decoded. MediaType could not be decoded. They are now simple strings for interoperability.

Also, the DefaultKafkaHeaderMapper has a new addToStringClasses method, allowing the specification of types that should be mapped by using toString() instead of JSON. See Message Headers for more information.

Embedded Kafka Changes

The KafkaEmbedded class and its KafkaRule interface have been deprecated in favor of the EmbeddedKafkaBroker and its JUnit 4 EmbeddedKafkaRule wrapper. The @EmbeddedKafka annotation now populates an EmbeddedKafkaBroker bean instead of the deprecated KafkaEmbedded. This change allows the use of @EmbeddedKafka in JUnit 5 tests. The @EmbeddedKafka annotation now has the attribute ports to specify the port that populates the EmbeddedKafkaBroker. See Testing Applications for more information.

JsonSerializer/Deserializer Enhancements

You can now provide type mapping information by using producer and consumer properties.

New constructors are available on the deserializer to allow overriding the type header information with the supplied target type.

The JsonDeserializer now removes any type information headers by default.

You can now configure the JsonDeserializer to ignore type information headers by using a Kafka property (since 2.2.3).

Kafka Streams Changes

The streams configuration bean must now be a KafkaStreamsConfiguration object instead of a StreamsConfig object.

The StreamsBuilderFactoryBean has been moved from package …​core to …​config.

The KafkaStreamBrancher has been introduced for a better end-user experience when conditional branches are built on top of a KStream instance.

See Apache Kafka Streams Support and Configuration for more information.

Transactional ID

When a transaction is started by the listener container, the transactional.id is now the transactionIdPrefix appended with <group.id>.<topic>.<partition>. This change allows proper fencing of zombies, as described here.
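The naming scheme can be sketched as simple string composition. This is an illustration only, not the container's code; the prefix, group, and topic values are hypothetical.

```java
class TransactionalIdSketch {

    // Builds the id as: transactionIdPrefix + <group.id>.<topic>.<partition>
    public static String transactionalId(String transactionIdPrefix, String groupId,
            String topic, int partition) {
        return transactionIdPrefix + groupId + "." + topic + "." + partition;
    }

    public static void main(String[] args) {
        // Hypothetical values, chosen only for illustration.
        System.out.println(transactionalId("tx-", "orders-group", "orders", 0));
        // prints tx-orders-group.orders.0
    }
}
```

Because the id depends only on the group, topic, and partition, whichever instance is assigned the partition after a rebalance uses the same transactional.id, which is what lets the broker fence the previous producer.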

Changes Between 2.0 and 2.1

Kafka Client Version

This version requires the 1.0.0 kafka-clients or higher.

The 1.1.x client is supported natively in version 2.2.

JSON Improvements

The StringJsonMessageConverter and JsonSerializer now add type information in Headers, letting the converter and JsonDeserializer create specific types on reception, based on the message itself rather than a fixed configured type. See Serialization, Deserialization, and Message Conversion for more information.

Container Stopping Error Handlers

Container error handlers are now provided for both record and batch listeners that treat any exceptions thrown by the listener as fatal. They stop the container. See Handling Exceptions for more information.

Pausing and Resuming Containers

The listener containers now have pause() and resume() methods (since version 2.1.3). See Pausing and Resuming Listener Containers for more information.

Stateful Retry

Starting with version 2.1.3, you can configure stateful retry. See Stateful Retry for more information.

Client ID

Starting with version 2.1.1, you can now set the client.id prefix on @KafkaListener. Previously, to customize the client ID, you needed a separate consumer factory (and container factory) per listener. The prefix is suffixed with -n to provide unique client IDs when you use concurrency.
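As a rough illustration of the resulting client IDs, the following sketch derives one ID per concurrent consumer from a prefix. It is not the framework's code, and it assumes that n is a zero-based index; the prefix value is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class ClientIdSketch {

    // Derives one client.id per consumer by appending "-n" to the prefix.
    // Assumption: n is a zero-based index of the consumer within the container.
    public static List<String> clientIds(String prefix, int concurrency) {
        List<String> ids = new ArrayList<>();
        for (int n = 0; n < concurrency; n++) {
            ids.add(prefix + "-" + n);
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(clientIds("orders", 3)); // [orders-0, orders-1, orders-2]
    }
}
```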

Logging Offset Commits

By default, logging of topic offset commits is performed with the DEBUG logging level. Starting with version 2.1.2, a new property in ContainerProperties called commitLogLevel lets you specify the log level for these messages. See Using KafkaMessageListenerContainer for more information.

Default @KafkaHandler

Starting with version 2.1.3, you can designate one of the @KafkaHandler annotations on a class-level @KafkaListener as the default. See @KafkaListener on a Class for more information.

ReplyingKafkaTemplate

Starting with version 2.1.3, a subclass of KafkaTemplate is provided to support request/reply semantics. See Using ReplyingKafkaTemplate for more information.

ChainedKafkaTransactionManager

Version 2.1.3 introduced the ChainedKafkaTransactionManager. It is now deprecated.

Migration Guide from 2.0

See the 2.0 to 2.1 Migration guide.

Changes Between 1.3 and 2.0

Spring Framework and Java Versions

The Spring for Apache Kafka project now requires Spring Framework 5.0 and Java 8.

@KafkaListener Changes

You can now annotate @KafkaListener methods (and classes and @KafkaHandler methods) with @SendTo. If the method returns a result, it is forwarded to the specified topic. See Forwarding Listener Results using @SendTo for more information.

Message Listeners

Message listeners can now be aware of the Consumer object. See [message-listeners] for more information.

Using ConsumerAwareRebalanceListener

Rebalance listeners can now access the Consumer object during rebalance notifications. See Rebalancing Listeners for more information.

Changes Between 1.2 and 1.3

Support for Transactions

The 0.11.0.0 client library added support for transactions. The KafkaTransactionManager and other support for transactions have been added. See Transactions for more information.

Support for Headers

The 0.11.0.0 client library added support for message headers. These can now be mapped to and from spring-messaging MessageHeaders. See Message Headers for more information.

Creating Topics

The 0.11.0.0 client library provides an AdminClient, which you can use to create topics. The KafkaAdmin uses this client to automatically add topics defined as @Bean instances.

Support for Kafka Timestamps

KafkaTemplate now supports an API to add records with timestamps. New KafkaHeaders have been introduced regarding timestamp support. Also, new KafkaConditions.timestamp() and KafkaMatchers.hasTimestamp() testing utilities have been added. See Using KafkaTemplate, @KafkaListener Annotation, and Testing Applications for more details.

@KafkaListener Changes

You can now configure a KafkaListenerErrorHandler to handle exceptions. See Handling Exceptions for more information.

By default, the @KafkaListener id property is now used as the group.id property, overriding the property configured in the consumer factory (if present). Further, you can explicitly configure the groupId on the annotation. Previously, you would have needed a separate container factory (and consumer factory) to use different group.id values for listeners. To restore the previous behavior of using the factory configured group.id, set the idIsGroup property on the annotation to false.
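The precedence described above can be sketched as a small resolution function. This is an illustrative model rather than the framework's implementation; resolveGroupId and its parameters are hypothetical names that mirror the annotation attributes and the factory setting.

```java
class GroupIdSketch {

    // Resolves the effective group.id for a listener:
    // 1. an explicit groupId on the annotation wins;
    // 2. otherwise the id is used when idIsGroup is true;
    // 3. otherwise the consumer factory's group.id applies.
    public static String resolveGroupId(String groupId, String id, boolean idIsGroup,
            String factoryGroupId) {
        if (groupId != null && !groupId.isEmpty()) {
            return groupId;
        }
        if (idIsGroup && id != null && !id.isEmpty()) {
            return id;
        }
        return factoryGroupId;
    }

    public static void main(String[] args) {
        System.out.println(resolveGroupId("", "listener1", true, "factoryGroup"));   // listener1
        System.out.println(resolveGroupId("", "listener1", false, "factoryGroup"));  // factoryGroup
        System.out.println(resolveGroupId("explicit", "listener1", true, "factoryGroup")); // explicit
    }
}
```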

@EmbeddedKafka Annotation

For convenience, a test class-level @EmbeddedKafka annotation is provided, to register KafkaEmbedded as a bean. See Testing Applications for more information.

Kerberos Configuration

Support for configuring Kerberos is now provided. See JAAS and Kerberos for more information.

Changes Between 1.1 and 1.2

This version uses the 0.10.2.x client.

Changes Between 1.0 and 1.1

Kafka Client

This version uses the Apache Kafka 0.10.x.x client.

Batch Listeners

Listeners can be configured to receive the entire batch of messages returned by the consumer.poll() operation, rather than one at a time.

Null Payloads

Null payloads are used to “delete” keys when you use log compaction.

Initial Offset

When explicitly assigning partitions, you can now configure the initial offset relative to the current position for the consumer group, rather than absolute or relative to the current end.

Seek

You can now seek the position of each topic or partition. You can use this to set the initial position during initialization when group management is in use and Kafka assigns the partitions. You can also seek when an idle container is detected or at any arbitrary point in your application’s execution. See [seek] for more information.