Change History
What’s New in 3.2 Since 3.1
This section covers the changes made from version 3.1 to version 3.2. For changes in earlier versions, see Change History.
Kafka Client Version
This version requires the 3.7.0 kafka-clients.
The 3.7.0 version of Kafka client introduces the new consumer group protocol.
For more details and its limitations, see KIP-848.
The new consumer group protocol is an early access release and is not meant to be used in production.
In this version, it is recommended for testing purposes only.
Therefore, Spring for Apache Kafka supports this new consumer group protocol only to the extent of the testing-level support available in the kafka-client
itself.
By default, Spring for Apache Kafka uses the classic consumer group protocol; when testing the new consumer group protocol, you need to opt in via the group.protocol property on the consumer.
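As a rough sketch of the opt-in, the following consumer factory sets the group.protocol property from the Kafka 3.7 client; the bootstrap address, group id, and deserializers are placeholders for your own configuration.

```java
@Bean
public DefaultKafkaConsumerFactory<String, String> kip848ConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "kip848-test");             // placeholder
    // Opt in to the new (KIP-848) consumer group protocol; "classic" remains the default.
    props.put("group.protocol", "consumer");
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer());
}
```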
Testing Support Changes
The kraft
mode is disabled in EmbeddedKafka
by default and users wanting to use the kraft
mode must enable it.
This is due to certain instabilities observed while using EmbeddedKafka
in kraft
mode, especially when testing the new consumer group protocol.
The new consumer group protocol is only supported in kraft mode; because of this, testing the new protocol needs to be done against a real Kafka cluster and not one based on the KafkaClusterTestKit, which EmbeddedKafka is based upon.
In addition, some other race conditions were observed while running multiple KafkaListener
methods with EmbeddedKafka
in kraft
mode.
Until these issues are resolved, the kraft
default on EmbeddedKafka
will remain as false
.
Kafka Streams Interactive Query Support
A new API, KafkaStreamsInteractiveQuerySupport, provides access to queryable stores used in Kafka Streams interactive queries.
See Kafka Streams Interactive Support for more details.
TransactionIdSuffixStrategy
A new TransactionIdSuffixStrategy
interface was introduced to manage transactional.id
suffix.
The default implementation is DefaultTransactionIdSuffixStrategy; when maxCache is set greater than zero, transactional.id suffixes can be reused within a specific range; otherwise, suffixes are generated on the fly by incrementing a counter.
See Fixed TransactionIdSuffix for more information.
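A minimal sketch of opting in to suffix reuse, assuming the setTransactionIdSuffixStrategy setter on DefaultKafkaProducerFactory; the prefix, cache size, and the producerConfigs() helper are illustrative.

```java
@Bean
public DefaultKafkaProducerFactory<String, String> producerFactory() {
    DefaultKafkaProducerFactory<String, String> pf =
            new DefaultKafkaProducerFactory<>(producerConfigs()); // producerConfigs() is a hypothetical helper
    pf.setTransactionIdPrefix("tx-");
    // Reuse up to 5 transactional.id suffixes per prefix; with maxCache = 0,
    // suffixes are generated by incrementing a counter instead.
    pf.setTransactionIdSuffixStrategy(new DefaultTransactionIdSuffixStrategy(5));
    return pf;
}
```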
Async @KafkaListener Return
@KafkaListener (and @KafkaHandler) methods can now have asynchronous return types, including CompletableFuture<?>, Mono<?>, and Kotlin suspend functions.
See Async Returns for more information.
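For example, a record listener can now return a CompletableFuture; the topic name and the process(..) method below are illustrative.

```java
@KafkaListener(id = "asyncListener", topics = "asyncTopic")
public CompletableFuture<Void> listen(String data) {
    // The listener result (or failure) is handled when the future completes.
    return CompletableFuture.runAsync(() -> process(data)); // process(..) is a hypothetical method
}
```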
Routing of messages to custom DLTs based on thrown exceptions
It’s now possible to redirect messages to custom DLTs based on the type of exception thrown during message processing.
Rules for the redirection are set either via the RetryableTopic.exceptionBasedDltRouting
or the RetryTopicConfigurationBuilder.dltRoutingRules
.
Custom DLTs are created automatically, just like the other retry and dead-letter topics.
See Routing of messages to custom DLTs based on thrown exceptions for more information.
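A hedged sketch of the annotation-based routing described above: records that fail with a deserialization-related exception are routed to a DLT whose name includes the custom suffix. The topic name, attempt count, and listener body are illustrative, and the exact DLT name follows the framework's suffixing rules.

```java
@RetryableTopic(
        attempts = "3",
        exceptionBasedDltRouting = {
                // Records failing with this exception type go to the custom-suffixed DLT.
                @ExceptionBasedDltDestination(suffix = "-deserialization",
                        exceptions = { DeserializationException.class })
        })
@KafkaListener(topics = "orders")
public void listen(String payload) {
    process(payload); // hypothetical business logic
}
```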
Deprecating ContainerProperties transactionManager property
The transactionManager property in ContainerProperties is deprecated in favor of KafkaAwareTransactionManager, a narrower type than the general PlatformTransactionManager.
See ContainerProperties and Transaction Synchronization.
After Rollback Processing
A new AfterRollbackProcessor
API processBatch
is provided.
See After-rollback Processor for more information.
Change @RetryableTopic SameIntervalTopicReuseStrategy default value
The default value of the @RetryableTopic property SameIntervalTopicReuseStrategy has been changed to SINGLE_TOPIC.
See Single Topic for maxInterval Exponential Delay.
Non-blocking retries support class level @KafkaListener
Non-blocking retries now support a class-level @KafkaListener. See Non-Blocking Retries.
RetryTopicConfigurationProvider supports processing @RetryableTopic on a class.
A new public API is provided to find the RetryTopicConfiguration.
See Find RetryTopicConfiguration
RetryTopicConfigurer supports processing MultiMethodKafkaListenerEndpoint.
The RetryTopicConfigurer now supports processing and registering MultiMethodKafkaListenerEndpoint.
The MultiMethodKafkaListenerEndpoint provides getters and setters for the defaultMethod and methods properties.
The EndpointCustomizer has been modified to work strictly with MethodKafkaListenerEndpoint types.
The EndpointHandlerMethod has new constructors to construct an instance for the provided bean.
A new class, EndpointHandlerMultiMethod, handles multiple methods for retrying endpoints.
New API method to seek to an offset based on a user-provided function
ConsumerCallback
provides a new API to seek to an offset based on a user-defined function, which takes the current offset in the consumer as an argument.
See Seek API Docs for more details.
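As an illustration, a listener extending AbstractConsumerSeekAware could use the callback's function-based seek (the exact overload signature is assumed from the description above) to move each assigned partition back a few records; the topic and arithmetic are illustrative.

```java
public class RewindingListener extends AbstractConsumerSeekAware {

    @KafkaListener(id = "rewind", topics = "someTopic") // illustrative topic
    public void listen(String in) {
        // ...
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        super.onPartitionsAssigned(assignments, callback); // keep the base class's callback tracking
        // Compute each partition's target offset from its current offset (here: 10 records back).
        assignments.keySet().forEach(tp ->
                callback.seek(tp.topic(), tp.partition(), current -> Math.max(0, current - 10)));
    }
}
```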
@PartitionOffset support for SeekPosition
A seekPosition property has been added to @PartitionOffset to support TopicPartitionOffset.SeekPosition.
See manual-assignment for more details.
New constructor in TopicPartitionOffset that accepts a function to compute the offset to seek to
TopicPartitionOffset
has a new constructor that takes a user-provided function to compute the offset to seek to.
When this constructor is used, the framework calls the function with the input argument of the current consumer offset position.
See Seek API Docs for more details.
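A short sketch, assuming the new constructor takes a TopicPartition plus a function over the current offset as described above; the topic name and the arithmetic are illustrative.

```java
// Seek this manually assigned partition to 50 records before the consumer's current position.
TopicPartitionOffset tpo = new TopicPartitionOffset(
        new TopicPartition("numbers", 0),
        current -> Math.max(0, current - 50));
```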
Spring Boot application name as default client ID prefix
For Spring Boot applications which define an application name, this name is now used as a default prefix for auto-generated client IDs for certain client types. See Default client ID prefixes for more details.
Enhanced Retrieval of MessageListenerContainers
ListenerContainerRegistry provides two new APIs to dynamically find and filter MessageListenerContainer instances:
getListenerContainersMatching(Predicate<String> idMatcher) to filter by ID, and
getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> matcher) to filter by ID and container properties.
See @KafkaListener
Lifecycle Management’s API Docs for more information.
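Using the signatures above, a sketch of both lookups against an injected registry (the KafkaListenerEndpointRegistry is the standard ListenerContainerRegistry implementation); the "billing-" id prefix is illustrative.

```java
// registry is an injected KafkaListenerEndpointRegistry (a ListenerContainerRegistry)
Collection<MessageListenerContainer> billing =
        registry.getListenerContainersMatching(id -> id.startsWith("billing-"));

Collection<MessageListenerContainer> pausedBilling =
        registry.getListenerContainersMatching((id, container) ->
                id.startsWith("billing-") && container.isContainerPaused());
```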
Enhanced observation by providing more tracing tags
KafkaTemplateObservation provides more tracing tags (low cardinality).
KafkaListenerObservation provides a new API to find high-cardinality key names and more tracing tags (high or low cardinality).
See Micrometer Observation for more information.
What’s New in 3.1 Since 3.0
This section covers the changes made from version 3.0 to version 3.1. For changes in earlier versions, see Change History.
EmbeddedKafkaBroker
An additional implementation is now provided to use KRaft instead of ZooKeeper.
See Embedded Kafka Broker for more information.
JsonDeserializer
When a deserialization exception occurs, the SerializationException
message no longer contains the data with the form Can’t deserialize data [[123, 34, 98, 97, 122, …
; an array of numerical values for each data byte is not useful and can be verbose for large data.
When used with an ErrorHandlingDeserializer
, the DeserializationException
sent to the error handler contains the data
property which contains the raw data that could not be deserialized.
When not used with an ErrorHandlingDeserializer
, the KafkaConsumer
will continually emit exceptions for the same record showing the topic/partition/offset and the cause thrown by Jackson.
ContainerPostProcessor
Post-processing can be applied on a listener container by specifying the bean name of a ContainerPostProcessor
on the @KafkaListener
annotation.
This occurs after the container has been created and after any ContainerCustomizer configured on the container factory.
See Container Factory for more information.
ErrorHandlingDeserializer
You can now add a Validator
to this deserializer; if the delegate Deserializer
successfully deserializes the object, but that object fails validation, an exception is thrown similar to a deserialization exception occurring.
This allows the original raw data to be passed to the error handler.
See Using ErrorHandlingDeserializer
for more information.
Retryable Topics
The suffix has changed from -retry-5000 to -retry when using @RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC).
If you want to keep the -retry-5000 suffix, use @RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2").
See Topic Naming for more information.
Listener Container Changes
When manually assigning partitions, with a null
consumer group.id
, the AckMode
is now automatically coerced to MANUAL
.
See Manually Assigning All Partitions for more information.
What’s New in 3.0 Since 2.9
Exactly Once Semantics
EOSMode.V1
(aka ALPHA
) is no longer supported.
When using transactions, the minimum broker version is 2.5.
See Exactly Once Semantics and KIP-447 for more information.
Observation
Enabling observation for timers and tracing using Micrometer is now supported. See Observation for more information.
Native Images
Support for creating native images is provided. See Native Images for more information.
Global Single Embedded Kafka
The embedded Kafka (EmbeddedKafkaBroker
) can now be started as a single global instance for the whole test plan.
See Using the Same Broker(s) for Multiple Test Classes for more information.
Retryable Topics Changes
This feature is no longer considered experimental (as far as its API is concerned); the feature itself has been supported since 2.7, but with a greater-than-normal possibility of breaking API changes.
The bootstrapping of the Non-Blocking Retries infrastructure beans has changed in this release to avoid some timing problems that occurred in some applications during application initialization.
You can now set a different concurrency
for the retry containers; by default, the concurrency is the same as the main container.
@RetryableTopic
can now be used as a meta-annotation on custom annotations, including support for @AliasFor
properties.
See Configuration for more information.
The default replication factor for the retry topics is now -1
(use broker default).
If your broker is earlier than version 2.4, you will now need to explicitly set the property.
You can now configure multiple @RetryableTopic
listeners on the same topic in the same application context.
Previously, this was not possible.
See Multiple Listeners, Same Topic(s) for more information.
There are breaking API changes in RetryTopicConfigurationSupport
; specifically, if you override the bean definition methods for destinationTopicResolver
, kafkaConsumerBackoffManager
and/or retryTopicConfigurer
;
these methods now require an ObjectProvider<RetryTopicComponentFactory>
parameter.
Listener Container Changes
Events related to consumer authentication and authorization failures are now published by the container. See Application Events for more information.
You can now customize the thread names used by consumer threads. See Container Thread Naming for more information.
The container property restartAfterAuthException
has been added.
See Listener Container Properties for more information.
KafkaTemplate
Changes
The futures returned by this class are now CompletableFuture
s instead of ListenableFuture
s.
See Using KafkaTemplate
.
ReplyingKafkaTemplate
Changes
The futures returned by this class are now CompletableFuture
s instead of ListenableFuture
s.
See Using ReplyingKafkaTemplate
and Request/Reply with Message<?>
s.
@KafkaListener
Changes
You can now use a custom correlation header which will be echoed in any reply message.
See the note at the end of Using ReplyingKafkaTemplate
for more information.
You can now manually commit parts of a batch before the entire batch is processed. See Committing Offsets for more information.
KafkaHeaders
Changes
Four constants in KafkaHeaders
that were deprecated in 2.9.x have now been removed.
- Instead of MESSAGE_KEY, use KEY.
- Instead of PARTITION_ID, use PARTITION.
Similarly, RECEIVED_MESSAGE_KEY is replaced by RECEIVED_KEY and RECEIVED_PARTITION_ID is replaced by RECEIVED_PARTITION.
Testing Changes
Version 3.0.7 introduced a MockConsumerFactory
and MockProducerFactory
.
See Mock Consumer and Producer for more information.
Starting with version 3.0.10, the embedded Kafka broker, by default, sets the Spring Boot property spring.kafka.bootstrap-servers
to the address(es) of the embedded broker(s).
What’s New in 2.9 since 2.8
Error Handler Changes
The DefaultErrorHandler
can now be configured to pause the container for one poll and use the remaining results from the previous poll, instead of seeking to the offsets of the remaining records.
See DefaultErrorHandler for more information.
The DefaultErrorHandler
now has a BackOffHandler
property.
See Back Off Handlers for more information.
Listener Container Changes
interceptBeforeTx
now works with all transaction managers (previously it was only applied when a KafkaAwareTransactionManager
was used).
See [interceptBeforeTx].
A new container property pauseImmediate
is provided which allows the container to pause the consumer after the current record is processed, instead of after all the records from the previous poll have been processed.
See [pauseImmediate].
Events related to consumer authentication and authorization failures are now published by the container. See Application Events for more information.
Header Mapper Changes
You can now configure which inbound headers should be mapped. Also available in version 2.8.8 or later. See Message Headers for more information.
KafkaTemplate
Changes
In 3.0, the futures returned by this class will be CompletableFuture
s instead of ListenableFuture
s.
See Using KafkaTemplate
for assistance in transitioning when using this release.
ReplyingKafkaTemplate
Changes
The template now provides a method to wait for assignment on the reply container, to avoid a race when sending a request before the reply container is initialized.
Also available in version 2.8.8 or later.
See Using ReplyingKafkaTemplate
.
In 3.0, the futures returned by this class will be CompletableFuture
s instead of ListenableFuture
s.
See Using ReplyingKafkaTemplate
and Request/Reply with Message<?>
s for assistance in transitioning when using this release.
What’s New in 2.8 Since 2.7
This section covers the changes made from version 2.7 to version 2.8. For changes in earlier versions, see Change History.
Package Changes
Classes and interfaces related to type mapping have been moved from …support.converter
to …support.mapping
.
- AbstractJavaTypeMapper
- ClassMapper
- DefaultJackson2JavaTypeMapper
- Jackson2JavaTypeMapper
Out of Order Manual Commits
The listener container can now be configured to accept manual offset commits out of order (usually asynchronously). The container will defer the commit until the missing offset is acknowledged. See Manually Committing Offsets for more information.
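A hedged sketch of enabling this, assuming the asyncAcks container property together with a manual AckMode; the consumerFactory() helper is illustrative.

```java
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory()); // hypothetical helper
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    // Allow acknowledgments to arrive out of order; the commit is deferred until gaps are filled.
    factory.getContainerProperties().setAsyncAcks(true);
    return factory;
}
```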
@KafkaListener
Changes
It is now possible to specify whether the listener method is a batch listener on the method itself. This allows the same container factory to be used for both record and batch listeners.
See [batch-listeners] for more information.
Batch listeners can now handle conversion exceptions.
See Conversion Errors with Batch Error Handlers for more information.
RecordFilterStrategy
, when used with batch listeners, can now filter the entire batch in one call.
See the note at the end of [batch-listeners] for more information.
The @KafkaListener
annotation now has the filter
attribute, to override the container factory’s RecordFilterStrategy
for just this listener.
The @KafkaListener
annotation now has the info
attribute; this is used to populate the new listener container property listenerInfo
.
This is then used to populate a KafkaHeaders.LISTENER_INFO
header in each record which can be used in RecordInterceptor
, RecordFilterStrategy
, or the listener itself.
See Listener Info Header and AbstractMessageListenerContainer Properties for more information.
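For example (topic, id, and info text are illustrative), the info value set on the annotation can be read back from the KafkaHeaders.LISTENER_INFO header:

```java
@KafkaListener(id = "billing", topics = "billing-events", info = "billing listener")
public void listen(String data, @Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
    // listenerInfo contains "billing listener" for every record delivered to this listener
    System.out.println(listenerInfo + ": " + data);
}
```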
KafkaTemplate
Changes
You can now receive a single record, given the topic, partition and offset.
See Using KafkaTemplate
to Receive for more information.
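For example (topic, partition, and offset are illustrative):

```java
// Fetch exactly one record, identified by topic, partition, and offset.
ConsumerRecord<String, String> record = template.receive("orders", 0, 42L);
```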
CommonErrorHandler
Added
The legacy GenericErrorHandler
and its sub-interface hierarchies for record and batch listeners have been replaced by a new single interface CommonErrorHandler
with implementations corresponding to most legacy implementations of GenericErrorHandler
.
See Container Error Handlers and Migrating Custom Legacy Error Handler Implementations to CommonErrorHandler
for more information.
Listener Container Changes
The interceptBeforeTx
container property is now true
by default.
The authorizationExceptionRetryInterval
property has been renamed to authExceptionRetryInterval
and now applies to AuthenticationExceptions in addition to the AuthorizationExceptions it applied to previously.
Both exceptions are considered fatal and the container will stop by default, unless this property is set.
See Using KafkaMessageListenerContainer
and Listener Container Properties for more information.
Serializer/Deserializer Changes
The DelegatingByTopicSerializer
and DelegatingByTopicDeserializer
are now provided.
See Delegating Serializer and Deserializer for more information.
DeadLetterPublishingRecoverer
Changes
The property stripPreviousExceptionHeaders
is now true
by default.
There are now several techniques to customize which headers are added to the output record.
See Managing Dead Letter Record Headers for more information.
Retryable Topics Changes
Now you can use the same factory for retryable and non-retryable topics. See Specifying a ListenerContainerFactory for more information.
There’s now a manageable global list of fatal exceptions that will make the failed record go straight to the DLT. Refer to Exception Classifier to see how to manage it.
You can now use blocking and non-blocking retries in conjunction. See Combining Blocking and Non-Blocking Retries for more information.
The KafkaBackOffException thrown when using the retryable topics feature is now logged at DEBUG level. See Changing KafkaBackOffException Logging Level if you need to change the logging level back to WARN or set it to any other level.
Changes between 2.6 and 2.7
Kafka Client Version
This version requires the 2.7.0 kafka-clients
.
It is also compatible with the 2.8.0 clients, since version 2.7.1; see Override Spring Boot Dependencies.
Non-Blocking Delayed Retries Using Topics
This significant new feature is added in this release. When strict ordering is not important, failed deliveries can be sent to another topic to be consumed later. A series of such retry topics can be configured, with increasing delays. See Non-Blocking Retries for more information.
Listener Container Changes
The onlyLogRecordMetadata
container property is now true
by default.
A new container property stopImmediate
is now available.
See Listener Container Properties for more information.
Error handlers that use a BackOff
between delivery attempts (e.g. SeekToCurrentErrorHandler
and DefaultAfterRollbackProcessor
) will now exit the back off interval soon after the container is stopped, rather than delaying the stop.
Error handlers and after rollback processors that extend FailedRecordProcessor
can now be configured with one or more RetryListener
s to receive information about retry and recovery progress.
The RecordInterceptor
now has additional methods called after the listener returns (normally, or by throwing an exception).
It also has a sub-interface ConsumerAwareRecordInterceptor
.
In addition, there is now a BatchInterceptor
for batch listeners.
See Message Listener Containers for more information.
@KafkaListener
Changes
You can now validate the payload parameter of @KafkaHandler
methods (class-level listeners).
See @KafkaListener
@Payload
Validation for more information.
You can now set the rawRecordHeader
property on the MessagingMessageConverter
and BatchMessagingMessageConverter
which causes the raw ConsumerRecord
to be added to the converted Message<?>
.
This is useful, for example, if you wish to use a DeadLetterPublishingRecoverer
in a listener error handler.
See Listener Error Handlers for more information.
You can now modify @KafkaListener
annotations during application initialization.
See @KafkaListener
Attribute Modification for more information.
DeadLetterPublishingRecoverer
Changes
Now, if both the key and value fail deserialization, the original values are published to the DLT.
Previously, the value was populated but the key DeserializationException
remained in the headers.
There is a breaking API change, if you subclassed the recoverer and overrode the createProducerRecord
method.
In addition, the recoverer verifies that the partition selected by the destination resolver actually exists before publishing to it.
See Publishing Dead-letter Records for more information.
ChainedKafkaTransactionManager
is Deprecated
See Transactions for more information.
ReplyingKafkaTemplate
Changes
There is now a mechanism to examine a reply and fail the future exceptionally if some condition exists.
Support for sending and receiving spring-messaging
Message<?>
s has been added.
See Using ReplyingKafkaTemplate
for more information.
Kafka Streams Changes
By default, the StreamsBuilderFactoryBean
is now configured to not clean up local state.
See Configuration for more information.
KafkaAdmin
Changes
New methods createOrModifyTopics
and describeTopics
have been added.
KafkaAdmin.NewTopics
has been added to facilitate configuring multiple topics in a single bean.
See [configuring-topics] for more information.
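For example, several topics can be declared in a single bean (names and settings are illustrative):

```java
@Bean
public KafkaAdmin.NewTopics topics() {
    return new KafkaAdmin.NewTopics(
            TopicBuilder.name("defaults").build(),
            TopicBuilder.name("fourPartitions").partitions(4).build(),
            TopicBuilder.name("allOptions").partitions(10).replicas(2).build());
}
```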
MessageConverter
Changes
It is now possible to add a spring-messaging
SmartMessageConverter
to the MessagingMessageConverter
, allowing content negotiation based on the contentType
header.
See Spring Messaging Message Conversion for more information.
Sequencing @KafkaListener
s
See Starting @KafkaListener
s in Sequence for more information.
ExponentialBackOffWithMaxRetries
A new BackOff
implementation is provided, making it more convenient to configure the max retries.
See ExponentialBackOffWithMaxRetries
Implementation for more information.
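A brief sketch of configuring it (intervals and retry count are illustrative); the resulting BackOff can then be passed to an error handler or after-rollback processor.

```java
// At most 6 retries, with delays of 1s, 2s, 4s, 8s, 10s, 10s (capped by maxInterval).
ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(6);
backOff.setInitialInterval(1_000L);
backOff.setMultiplier(2.0);
backOff.setMaxInterval(10_000L);
```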
Conditional Delegating Error Handlers
These new error handlers can be configured to delegate to different error handlers, depending on the exception type. See Delegating Error Handler for more information.
Changes between 2.5 and 2.6
Listener Container Changes
The default EOSMode
is now BETA
.
See Exactly Once Semantics for more information.
Various error handlers (that extend FailedRecordProcessor
) and the DefaultAfterRollbackProcessor
now reset the BackOff
if recovery fails.
In addition, you can now select the BackOff
to use based on the failed record and/or exception.
You can now configure an adviceChain
in the container properties.
See Listener Container Properties for more information.
When the container is configured to publish ListenerContainerIdleEvent
s, it now publishes a ListenerContainerNoLongerIdleEvent
when a record is received after publishing an idle event.
See Application Events and Detecting Idle and Non-Responsive Consumers for more information.
@KafkaListener Changes
When using manual partition assignment, you can now specify a wildcard for determining which partitions should be reset to the initial offset.
In addition, if the listener implements ConsumerSeekAware
, onPartitionsAssigned()
is called after the manual assignment.
(Also added in version 2.5.5).
See Explicit Partition Assignment for more information.
Convenience methods have been added to AbstractConsumerSeekAware
to make seeking easier.
See [seek] for more information.
ErrorHandler Changes
Subclasses of FailedRecordProcessor
(e.g. SeekToCurrentErrorHandler
, DefaultAfterRollbackProcessor
, RecoveringBatchErrorHandler
) can now be configured to reset the retry state if the exception is a different type to that which occurred previously with this record.
Producer Factory Changes
You can now set a maximum age for producers after which they will be closed and recreated. See Transactions for more information.
You can now update the configuration map after the DefaultKafkaProducerFactory
has been created.
This might be useful, for example, if you have to update SSL key/trust store locations after a credentials change.
See Using DefaultKafkaProducerFactory
for more information.
Changes between 2.4 and 2.5
This section covers the changes made from version 2.4 to version 2.5. For changes in earlier versions, see Change History.
Consumer/Producer Factory Changes
The default consumer and producer factories can now invoke a callback whenever a consumer or producer is created or closed. Implementations for native Micrometer metrics are provided. See Factory Listeners for more information.
You can now change bootstrap server properties at runtime, enabling failover to another Kafka cluster. See Connecting to Kafka for more information.
StreamsBuilderFactoryBean
Changes
The factory bean can now invoke a callback whenever a KafkaStreams is created or destroyed.
An implementation for native Micrometer metrics is provided.
See KafkaStreams Micrometer Support for more information.
Delivery Attempts Header
There is now an option to add a header which tracks delivery attempts when using certain error handlers and after rollback processors. See Delivery Attempts Header for more information.
@KafkaListener Changes
Default reply headers will now be populated automatically if needed when a @KafkaListener
return type is Message<?>
.
See Reply Type Message<?> for more information.
The KafkaHeaders.RECEIVED_MESSAGE_KEY
is no longer populated with a null
value when the incoming record has a null
key; the header is omitted altogether.
@KafkaListener
methods can now specify a ConsumerRecordMetadata
parameter instead of using discrete headers for metadata such as topic, partition, etc.
See Consumer Record Metadata for more information.
Listener Container Changes
The assignmentCommitOption
container property is now LATEST_ONLY_NO_TX
by default.
See Listener Container Properties for more information.
The subBatchPerPartition
container property is now true
by default when using transactions.
See Transactions for more information.
A new RecoveringBatchErrorHandler
is now provided.
Static group membership is now supported. See Message Listener Containers for more information.
When incremental/cooperative rebalancing is configured, if offsets fail to commit with a non-fatal RebalanceInProgressException
, the container will attempt to re-commit the offsets for the partitions that remain assigned to this instance after the rebalance is completed.
The default error handler is now the SeekToCurrentErrorHandler
for record listeners and RecoveringBatchErrorHandler
for batch listeners.
See Container Error Handlers for more information.
You can now control the level at which exceptions intentionally thrown by standard error handlers are logged. See Container Error Handlers for more information.
The getAssignmentsByClientId()
method has been added, making it easier to determine which consumers in a concurrent container are assigned which partition(s).
See Listener Container Properties for more information.
You can now suppress logging entire ConsumerRecord
s in error, debug logs etc.
See onlyLogRecordMetadata
in Listener Container Properties.
KafkaTemplate Changes
The KafkaTemplate
can now maintain micrometer timers.
See Monitoring for more information.
The KafkaTemplate
can now be configured with ProducerConfig
properties to override those in the producer factory.
See Using KafkaTemplate
for more information.
A RoutingKafkaTemplate
has now been provided.
See Using RoutingKafkaTemplate
for more information.
You can now use KafkaSendCallback
instead of ListenerFutureCallback
to get a narrower exception, making it easier to extract the failed ProducerRecord
.
See Using KafkaTemplate
for more information.
Kafka String Serializer/Deserializer
New ToStringSerializer
/StringDeserializer
s as well as an associated SerDe
are now provided.
See String serialization for more information.
JsonDeserializer
The JsonDeserializer
now has more flexibility to determine the deserialization type.
See Using Methods to Determine Types for more information.
Delegating Serializer/Deserializer
The DelegatingSerializer
can now handle "standard" types, when the outbound record has no header.
See Delegating Serializer and Deserializer for more information.
Testing Changes
The KafkaTestUtils.consumerProps()
helper method now sets ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
to earliest
by default.
See JUnit for more information.
Changes between 2.3 and 2.4
Kafka Client Version
This version requires the 2.4.0 kafka-clients
or higher and supports the new incremental rebalancing feature.
ConsumerAwareRebalanceListener
Like ConsumerRebalanceListener
, this interface now has an additional method onPartitionsLost
.
Refer to the Apache Kafka documentation for more information.
Unlike the ConsumerRebalanceListener
, the default implementation does not call onPartitionsRevoked
.
Instead, the listener container will call that method after it has called onPartitionsLost
; you should not, therefore, do the same when implementing ConsumerAwareRebalanceListener
.
See the IMPORTANT note at the end of Rebalancing Listeners for more information.
KafkaTemplate
The KafkaTemplate
now supports non-transactional publishing alongside transactional.
See KafkaTemplate
Transactional and non-Transactional Publishing for more information.
AggregatingReplyingKafkaTemplate
The releaseStrategy
is now a BiConsumer
.
It is now called after a timeout (as well as when records arrive); the second parameter is true
in the case of a call after a timeout.
See Aggregating Multiple Replies for more information.
Listener Container
The ContainerProperties
provides an authorizationExceptionRetryInterval
option to let the listener container to retry after any AuthorizationException
is thrown by the KafkaConsumer
.
See its JavaDocs and Using KafkaMessageListenerContainer
for more information.
@KafkaListener
The @KafkaListener
annotation has a new property splitIterables
; its default is true.
When a replying listener returns an Iterable, this property controls whether the result is sent as a single record or as a record for each element.
See Forwarding Listener Results using @SendTo
for more information.
Batch listeners can now be configured with a BatchToRecordAdapter
; this allows, for example, the batch to be processed in a transaction while the listener gets one record at a time.
With the default implementation, a ConsumerRecordRecoverer
can be used to handle errors within the batch, without stopping the processing of the entire batch - this might be useful when using transactions.
See Transactions with Batch Listeners for more information.
Kafka Streams
The StreamsBuilderFactoryBean
accepts a new property KafkaStreamsInfrastructureCustomizer
.
This allows configuration of the builder and/or topology before the stream is created.
See Spring Management for more information.
Changes Between 2.2 and 2.3
This section covers the changes made from version 2.2 to version 2.3.
Tips, Tricks and Examples
A new chapter Tips, Tricks and Examples has been added. Please submit GitHub issues and/or pull requests for additional entries in that chapter.
Configuration Changes
Starting with version 2.3.4, the missingTopicsFatal
container property is false by default.
When this is true, the application fails to start if the broker is down; many users were affected by this change; given that Kafka is a high-availability platform, we did not anticipate that starting an application with no active brokers would be a common use case.
Producer and Consumer Factory Changes
The DefaultKafkaProducerFactory
can now be configured to create a producer per thread.
You can also provide Supplier<Serializer>
instances in the constructor as an alternative to either configured classes (which require no-arg constructors), or constructing with Serializer
instances, which are then shared between all Producers.
See Using DefaultKafkaProducerFactory
for more information.
The same option is available with Supplier<Deserializer>
instances in DefaultKafkaConsumerFactory
.
See Using KafkaMessageListenerContainer
for more information.
Listener Container Changes
Previously, error handlers received ListenerExecutionFailedException
(with the actual listener exception as the cause
) when the listener was invoked using a listener adapter (such as @KafkaListener
s).
Exceptions thrown by native GenericMessageListener
s were passed to the error handler unchanged.
Now a ListenerExecutionFailedException
is always the argument (with the actual listener exception as the cause
), which provides access to the container’s group.id
property.
Because the listener container has its own mechanism for committing offsets, it prefers the Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
to be false
.
It now sets it to false automatically unless specifically set in the consumer factory or the container’s consumer property overrides.
The ackOnError
property is now false
by default.
It is now possible to obtain the consumer’s group.id
property in the listener method.
See Obtaining the Consumer group.id
for more information.
The container has a new property recordInterceptor
allowing records to be inspected or modified before invoking the listener.
A CompositeRecordInterceptor
is also provided in case you need to invoke multiple interceptors.
See Message Listener Containers for more information.
The ConsumerSeekAware
has new methods allowing you to perform seeks relative to the beginning, end, or current position and to seek to the first offset greater than or equal to a timestamp.
See [seek] for more information.
A convenience class AbstractConsumerSeekAware
is now provided to simplify seeking.
See [seek] for more information.
The ContainerProperties
provides an idleBetweenPolls
option to let the main loop in the listener container to sleep between KafkaConsumer.poll()
calls.
See its JavaDocs and Using KafkaMessageListenerContainer
for more information.
When using AckMode.MANUAL
(or MANUAL_IMMEDIATE
) you can now cause a redelivery by calling nack
on the Acknowledgment
.
See Committing Offsets for more information.
Listener performance can now be monitored using Micrometer Timer
s.
See Monitoring for more information.
The containers now publish additional consumer lifecycle events relating to startup. See Application Events for more information.
Transactional batch listeners can now support zombie fencing. See Transactions for more information.
The listener container factory can now be configured with a ContainerCustomizer
to further configure each container after it has been created and configured.
See Container factory for more information.
ErrorHandler Changes
The SeekToCurrentErrorHandler
now treats certain exceptions as fatal and disables retry for those, invoking the recoverer on first failure.
The SeekToCurrentErrorHandler
and SeekToCurrentBatchErrorHandler
can now be configured to apply a BackOff
(thread sleep) between delivery attempts.
Starting with version 2.3.2, recovered records' offsets will be committed when the error handler returns after recovering a failed record.
The DeadLetterPublishingRecoverer
, when used in conjunction with an ErrorHandlingDeserializer
, now sets the payload of the message sent to the dead-letter topic, to the original value that could not be deserialized.
Previously, it was null
and user code needed to extract the DeserializationException
from the message headers.
See Publishing Dead-letter Records for more information.
TopicBuilder
A new class TopicBuilder
is provided for more convenient creation of NewTopic
@Bean
s for automatic topic provisioning.
See [configuring-topics] for more information.
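For example (topic name and settings are illustrative):

```java
@Bean
public NewTopic myTopic() {
    return TopicBuilder.name("my-topic")
            .partitions(10)
            .replicas(3)
            .compact() // sets cleanup.policy=compact
            .build();
}
```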
Kafka Streams Changes
You can now perform additional configuration of the StreamsBuilderFactoryBean
created by @EnableKafkaStreams
.
See Streams Configuration for more information.
A RecoveringDeserializationExceptionHandler
is now provided which allows records with deserialization errors to be recovered.
It can be used in conjunction with a DeadLetterPublishingRecoverer
to send these records to a dead-letter topic.
See Recovery from Deserialization Exceptions for more information.
The HeaderEnricher
transformer has been provided, using SpEL to generate the header values.
See Header Enricher for more information.
The MessagingTransformer
has been provided.
This allows a Kafka streams topology to interact with a spring-messaging component, such as a Spring Integration flow.
See MessagingProcessor
and [Calling a Spring Integration Flow from a KStream
] for more information.
JSON Component Changes
Now all the JSON-aware components are configured by default with a Jackson ObjectMapper
produced by the JacksonUtils.enhancedObjectMapper()
.
The JsonDeserializer
now provides TypeReference
-based constructors for better handling of target generic container types.
Also a JacksonMimeTypeModule
has been introduced for serialization of org.springframework.util.MimeType
to plain string.
See its JavaDocs and Serialization, Deserialization, and Message Conversion for more information.
A ByteArrayJsonMessageConverter
has been provided as well as a new super class for all Json converters, JsonMessageConverter
.
Also, a StringOrBytesSerializer
is now available; it can serialize byte[]
, Bytes
and String
values in ProducerRecord
s.
See Spring Messaging Message Conversion for more information.
The JsonSerializer
, JsonDeserializer
and JsonSerde
now have fluent APIs to make programmatic configuration simpler.
See the javadocs, Serialization, Deserialization, and Message Conversion, and Streams JSON Serialization and Deserialization for more information.
ReplyingKafkaTemplate
When a reply times out, the future is completed exceptionally with a KafkaReplyTimeoutException
instead of a KafkaException
.
Also, an overloaded sendAndReceive
method is now provided that allows specifying the reply timeout on a per message basis.
AggregatingReplyingKafkaTemplate
Extends the ReplyingKafkaTemplate
by aggregating replies from multiple receivers.
See Aggregating Multiple Replies for more information.
Transaction Changes
You can now override the producer factory’s transactionIdPrefix
on the KafkaTemplate
and KafkaTransactionManager
.
See transactionIdPrefix
for more information.
New Delegating Serializer/Deserializer
The framework now provides a delegating Serializer
and Deserializer
, utilizing a header to enable producing and consuming records with multiple key/value types.
See Delegating Serializer and Deserializer for more information.
New Retrying Deserializer
The framework now provides a delegating RetryingDeserializer
, to retry deserialization when transient errors such as network problems might occur.
See Retrying Deserializer for more information.
Changes Between 2.1 and 2.2
Class and Package Changes
The ContainerProperties
class has been moved from org.springframework.kafka.listener.config
to org.springframework.kafka.listener
.
The AckMode
enum has been moved from AbstractMessageListenerContainer
to ContainerProperties
.
The setBatchErrorHandler()
and setErrorHandler()
methods have been moved from ContainerProperties
to both AbstractMessageListenerContainer
and AbstractKafkaListenerContainerFactory
.
After Rollback Processing
A new AfterRollbackProcessor
strategy is provided.
See After-rollback Processor for more information.
ConcurrentKafkaListenerContainerFactory
Changes
You can now use the ConcurrentKafkaListenerContainerFactory
to create and configure any ConcurrentMessageListenerContainer
, not only those for @KafkaListener
annotations.
See Container factory for more information.
Listener Container Changes
A new container property (missingTopicsFatal
) has been added.
See Using KafkaMessageListenerContainer
for more information.
A ConsumerStoppedEvent
is now emitted when a consumer stops.
See Thread Safety for more information.
Batch listeners can optionally receive the complete ConsumerRecords<?, ?>
object instead of a List<ConsumerRecord<?, ?>>.
See [batch-listeners] for more information.
The DefaultAfterRollbackProcessor
and SeekToCurrentErrorHandler
can now recover (skip) records that keep failing and, by default, do so after 10 failures.
They can be configured to publish failed records to a dead-letter topic.
Starting with version 2.2.4, the consumer’s group ID can be used while selecting the dead letter topic name.
The ConsumerStoppingEvent
has been added.
See Application Events for more information.
The SeekToCurrentErrorHandler
can now be configured to commit the offset of a recovered record when the container is configured with AckMode.MANUAL_IMMEDIATE
(since 2.2.4).
@KafkaListener Changes
You can now override the concurrency
and autoStartup
properties of the listener container factory by setting properties on the annotation.
You can now add configuration to determine which headers (if any) are copied to a reply message.
See @KafkaListener
Annotation for more information.
You can now use @KafkaListener
as a meta-annotation on your own annotations.
See @KafkaListener
as a Meta Annotation for more information.
It is now easier to configure a Validator
for @Payload
validation.
See @KafkaListener
@Payload
Validation for more information.
You can now specify kafka consumer properties directly on the annotation; these will override any properties with the same name defined in the consumer factory (since version 2.2.4). See Annotation Properties for more information.
Header Mapping Changes
Headers of type MimeType
and MediaType
are now mapped as simple strings in the RecordHeader
value.
Previously, they were mapped as JSON and only MimeType
was decoded.
MediaType
could not be decoded.
They are now simple strings for interoperability.
Also, the DefaultKafkaHeaderMapper
has a new addToStringClasses
method, allowing the specification of types that should be mapped by using toString()
instead of JSON.
See Message Headers for more information.
Embedded Kafka Changes
The KafkaEmbedded
class and its KafkaRule
interface have been deprecated in favor of the EmbeddedKafkaBroker
and its JUnit 4 EmbeddedKafkaRule
wrapper.
The @EmbeddedKafka
annotation now populates an EmbeddedKafkaBroker
bean instead of the deprecated KafkaEmbedded
.
This change allows the use of @EmbeddedKafka
in JUnit 5 tests.
The @EmbeddedKafka
annotation now has the attribute ports
to specify the port that populates the EmbeddedKafkaBroker
.
See Testing Applications for more information.
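A minimal JUnit 5 sketch (topic name, port, empty configuration, and the AssertJ assertion are illustrative); the broker bean registered by the annotation can simply be autowired:

```java
@SpringJUnitConfig
@EmbeddedKafka(partitions = 1, ports = 9092, topics = "cats")
class EmbeddedKafkaBrokerTests {

    @Configuration
    static class Config { // empty context; the broker bean comes from @EmbeddedKafka
    }

    @Autowired
    EmbeddedKafkaBroker broker;

    @Test
    void brokerStarts() {
        // The bootstrap servers string can be passed to test producers and consumers.
        assertThat(broker.getBrokersAsString()).isNotEmpty();
    }
}
```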
JsonSerializer/Deserializer Enhancements
You can now provide type mapping information by using producer and consumer properties.
New constructors are available on the deserializer to allow overriding the type header information with the supplied target type.
The JsonDeserializer
now removes any type information headers by default.
You can now configure the JsonDeserializer
to ignore type information headers by using a Kafka property (since 2.2.3).
See Serialization, Deserialization, and Message Conversion for more information.
Kafka Streams Changes
The streams configuration bean must now be a KafkaStreamsConfiguration
object instead of a StreamsConfig
object.
The StreamsBuilderFactoryBean
has been moved from package …core
to …config
.
The KafkaStreamBrancher
has been introduced for a better end-user experience when conditional branches are built on top of a KStream instance.
See Apache Kafka Streams Support and Configuration for more information.
Transactional ID
When a transaction is started by the listener container, the transactional.id
is now the transactionIdPrefix
appended with <group.id>.<topic>.<partition>
.
This change allows proper fencing of zombies, as described here.
Changes Between 2.0 and 2.1
Kafka Client Version
This version requires the 1.0.0 kafka-clients
or higher.
The 1.1.x client is supported natively in version 2.2.
JSON Improvements
The StringJsonMessageConverter
and JsonSerializer
now add type information in Headers
, letting the converter and JsonDeserializer
create specific types on reception, based on the message itself rather than a fixed configured type.
See Serialization, Deserialization, and Message Conversion for more information.
Container Stopping Error Handlers
Container error handlers are now provided for both record and batch listeners that treat any exceptions thrown by the listener as fatal; they stop the container. See Handling Exceptions for more information.
Pausing and Resuming Containers
The listener containers now have pause()
and resume()
methods (since version 2.1.3).
See Pausing and Resuming Listener Containers for more information.
Stateful Retry
Starting with version 2.1.3, you can configure stateful retry. See Stateful Retry for more information.
Client ID
Starting with version 2.1.1, you can now set the client.id
prefix on @KafkaListener
.
Previously, to customize the client ID, you needed a separate consumer factory (and container factory) per listener.
The prefix is suffixed with -n
to provide unique client IDs when you use concurrency.
Logging Offset Commits
By default, logging of topic offset commits is performed with the DEBUG
logging level.
Starting with version 2.1.2, a new property in ContainerProperties
called commitLogLevel
lets you specify the log level for these messages.
See Using KafkaMessageListenerContainer
for more information.
Default @KafkaHandler
Starting with version 2.1.3, you can designate one of the @KafkaHandler
annotations on a class-level @KafkaListener
as the default.
See @KafkaListener
on a Class for more information.
ReplyingKafkaTemplate
Starting with version 2.1.3, a subclass of KafkaTemplate
is provided to support request/reply semantics.
See Using ReplyingKafkaTemplate
for more information.
ChainedKafkaTransactionManager
Version 2.1.3 introduced the ChainedKafkaTransactionManager
.
(It is now deprecated).
Migration Guide from 2.0
See the 2.0 to 2.1 Migration guide.
Changes Between 1.3 and 2.0
Spring Framework and Java Versions
The Spring for Apache Kafka project now requires Spring Framework 5.0 and Java 8.
@KafkaListener
Changes
You can now annotate @KafkaListener
methods (and classes and @KafkaHandler
methods) with @SendTo
.
If the method returns a result, it is forwarded to the specified topic.
See Forwarding Listener Results using @SendTo
for more information.
Message Listeners
Message listeners can now be aware of the Consumer
object.
See [message-listeners] for more information.
Using ConsumerAwareRebalanceListener
Rebalance listeners can now access the Consumer
object during rebalance notifications.
See Rebalancing Listeners for more information.
Changes Between 1.2 and 1.3
Support for Transactions
The 0.11.0.0 client library added support for transactions.
The KafkaTransactionManager
and other support for transactions have been added.
See Transactions for more information.
Support for Headers
The 0.11.0.0 client library added support for message headers.
These can now be mapped to and from spring-messaging
MessageHeaders
.
See Message Headers for more information.
Creating Topics
The 0.11.0.0 client library provides an AdminClient
, which you can use to create topics.
The KafkaAdmin
uses this client to automatically add topics defined as @Bean
instances.
Support for Kafka Timestamps
KafkaTemplate
now supports an API to add records with timestamps.
New KafkaHeaders
have been introduced regarding timestamp
support.
Also, new KafkaConditions.timestamp()
and KafkaMatchers.hasTimestamp()
testing utilities have been added.
See Using KafkaTemplate
, @KafkaListener
Annotation, and Testing Applications for more details.
@KafkaListener
Changes
You can now configure a KafkaListenerErrorHandler
to handle exceptions.
See Handling Exceptions for more information.
By default, the @KafkaListener
id
property is now used as the group.id
property, overriding the property configured in the consumer factory (if present).
Further, you can explicitly configure the groupId
on the annotation.
Previously, you would have needed a separate container factory (and consumer factory) to use different group.id
values for listeners.
To restore the previous behavior of using the factory configured group.id
, set the idIsGroup
property on the annotation to false
.
@EmbeddedKafka
Annotation
For convenience, a test class-level @EmbeddedKafka
annotation is provided, to register KafkaEmbedded
as a bean.
See Testing Applications for more information.
Kerberos Configuration
Support for configuring Kerberos is now provided. See JAAS and Kerberos for more information.
Changes Between 1.0 and 1.1
Batch Listeners
Listeners can be configured to receive the entire batch of messages returned by the consumer.poll()
operation, rather than one at a time.
Initial Offset
When explicitly assigning partitions, you can now configure the initial offset relative to the current position for the consumer group, rather than absolute or relative to the current end.
Seek
You can now seek the position of each topic or partition. You can use this to set the initial position during initialization when group management is in use and Kafka assigns the partitions. You can also seek when an idle container is detected or at any arbitrary point in your application’s execution. See [seek] for more information.