The class ContainerProperties has been moved from org.springframework.kafka.listener.config to org.springframework.kafka.listener.
The enum AckMode has been moved from AbstractMessageListenerContainer to ContainerProperties.
The setBatchErrorHandler() and setErrorHandler() methods have been moved from ContainerProperties to AbstractMessageListenerContainer (and AbstractKafkaListenerContainerFactory).
A new AfterRollbackProcessor strategy is provided.
See the section called “After Rollback Processor” for more information.
The ConcurrentKafkaListenerContainerFactory can now be used to create and configure any ConcurrentMessageListenerContainer, not just those for @KafkaListener annotations.
See the section called “Container factory” for more information.
A new container property, missingTopicsFatal, has been added.
See the section called “KafkaMessageListenerContainer” for more information.
A ConsumerStoppedEvent is now emitted when a consumer terminates.
See the section called “Thread Safety” for more information.
Batch listeners can optionally receive the complete ConsumerRecords<?, ?> object instead of a List<ConsumerRecord<?, ?>>.
See the section called “Batch listeners” for more information.
The DefaultAfterRollbackProcessor and SeekToCurrentErrorHandler can now recover (skip) records that keep failing; by default, they do so after 10 failures.
They can be configured to publish failed records to a dead-letter topic.
See the section called “After Rollback Processor”, the section called “Seek To Current Container Error Handlers” and the section called “Publishing Dead-Letter Records” for more information.
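The "recover after N failures" behavior described above can be pictured with a small counter keyed by topic, partition and offset. The following is only a minimal sketch under stated assumptions: FailureTrackerSketch and shouldRecover are hypothetical names, and this is not the code used by DefaultAfterRollbackProcessor or SeekToCurrentErrorHandler.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration only; not the spring-kafka implementation. */
final class FailureTrackerSketch {

    private final int maxFailures;

    // Failure count per record, keyed by topic, partition and offset.
    private final Map<String, Integer> failures = new HashMap<>();

    FailureTrackerSketch(int maxFailures) {
        this.maxFailures = maxFailures;
    }

    /**
     * Records one failed delivery of the given record.
     * Returns true once the record has failed maxFailures times,
     * meaning it should be recovered (skipped) instead of redelivered.
     */
    boolean shouldRecover(String topic, int partition, long offset) {
        String key = topic + "-" + partition + "@" + offset;
        int count = failures.merge(key, 1, Integer::sum);
        if (count >= maxFailures) {
            failures.remove(key); // forget the record once it is recovered
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        FailureTrackerSketch tracker = new FailureTrackerSketch(10);
        for (int attempt = 1; attempt <= 10; attempt++) {
            boolean recover = tracker.shouldRecover("orders", 0, 42L);
            System.out.println("attempt " + attempt + " -> recover=" + recover);
        }
    }
}
```

In this sketch the first nine failures return false (the record would be redelivered) and the tenth returns true. A recoverer that publishes to a dead-letter topic would be invoked at that point.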
You can now override the concurrency and autoStartup properties of the listener container factory by setting properties on the annotation.
See the section called “@KafkaListener Annotation” for more information.
Headers of type MimeType and MediaType are now mapped as simple strings in the RecordHeader value.
Previously, they were mapped as JSON and only MimeType was decoded; MediaType could not be decoded.
They are now simple strings for interoperability.
Also, the DefaultKafkaHeaderMapper has a new method, addToStringClasses, which lets you specify types that should be mapped by using toString() instead of JSON.
See Section 4.1.6, “Message Headers” for more information.
The KafkaEmbedded class and its KafkaRule interface have been deprecated in favor of the EmbeddedKafkaBroker and its JUnit 4 EmbeddedKafkaRule wrapper.
The @EmbeddedKafka annotation now populates an EmbeddedKafkaBroker bean instead of the deprecated KafkaEmbedded.
This allows the use of @EmbeddedKafka in JUnit 5 tests.
See Section 4.3, “Testing Applications” for more information.
You can now provide type mapping information using producer/consumer properties.
New constructors are available on the deserializer to allow overriding the type header information with the supplied target type.
The JsonDeserializer will now remove any type information headers by default.
See Section 4.1.5, “Serialization/Deserialization and Message Conversion” for more information.
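As an illustration of supplying type mappings through producer and consumer properties, the sketch below builds two plain Properties objects. It rests on assumptions: the key "spring.json.type.mapping" is assumed to be the value behind the serializer and deserializer TYPE_MAPPINGS constants (check the constants in your version), and the token and class names are hypothetical.

```java
import java.util.Properties;

/** Hypothetical illustration; property key and class names are assumptions. */
final class TypeMappingPropsSketch {

    // Assumed property key; prefer the library constant if it is available.
    static final String TYPE_MAPPINGS = "spring.json.type.mapping";

    /** Producer side: maps short tokens to the producer's own classes. */
    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty(TYPE_MAPPINGS,
                "order:com.example.producer.Order,refund:com.example.producer.Refund");
        return props;
    }

    /** Consumer side: maps the same tokens to the consumer's classes. */
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty(TYPE_MAPPINGS,
                "order:com.example.consumer.Order,refund:com.example.consumer.Refund");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty(TYPE_MAPPINGS));
        System.out.println(consumerProps().getProperty(TYPE_MAPPINGS));
    }
}
```

The point of the token indirection is that the two sides can use different class names or packages while agreeing only on the short tokens.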
The streams configuration bean must now be a simple Properties object instead of a StreamsConfig.
The StreamsBuilderFactoryBean has been moved from package ...core to ...config.
See Section 4.2, “Kafka Streams Support” for more information.
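A minimal sketch of what such a Properties-based streams configuration might contain is shown below. The keys "application.id" and "bootstrap.servers" are standard Kafka Streams settings; the values, the class name, and the method name are hypothetical, and the bean registration itself is not shown.

```java
import java.util.Properties;

/** Hypothetical illustration of a Properties-based streams configuration. */
final class StreamsPropsSketch {

    /** Builds the plain Properties object that would be exposed as the streams configuration bean. */
    static Properties streamsProperties() {
        Properties props = new Properties();
        // Identifies the streams application; example value only.
        props.setProperty("application.id", "demo-streams-app");
        // Broker address; example value only.
        props.setProperty("bootstrap.servers", "localhost:9092");
        return props;
    }

    public static void main(String[] args) {
        streamsProperties().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```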
When a transaction is started by the listener container, the transactional.id is now the transactionIdPrefix appended with <group.id>.<topic>.<partition>.
This is to allow proper fencing of zombies as described here.
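The naming scheme above can be sketched as a simple string concatenation. This is an illustration of the format only, assuming the prefix is used verbatim with no extra separator; the class and method names are hypothetical, and the container's actual code is not shown here.

```java
/** Hypothetical illustration of the transactional.id format described above. */
final class TransactionalIdSketch {

    /** Returns transactionIdPrefix followed by group.id, topic and partition, joined with dots. */
    static String transactionalId(String transactionIdPrefix, String groupId,
                                  String topic, int partition) {
        return transactionIdPrefix + groupId + "." + topic + "." + partition;
    }

    public static void main(String[] args) {
        // prints "tx-orderGroup.orders.3"
        System.out.println(transactionalId("tx-", "orderGroup", "orders", 3));
    }
}
```

Because the identifier is derived from the group, topic and partition, a new instance that takes over a partition uses the same transactional.id as the previous owner, which is what lets the broker fence the older producer.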