This part of the reference documentation details the various components that comprise Spring for Apache Kafka. The main chapter covers the core classes to develop a Kafka application with Spring.
The KafkaTemplate
wraps a producer and provides convenience methods to send data to kafka topics.
Both asynchronous and synchronous methods are provided, with the async methods returning a Future
.
ListenableFuture<SendResult<K, V>> sendDefault(V data); ListenableFuture<SendResult<K, V>> sendDefault(K key, V data); ListenableFuture<SendResult<K, V>> sendDefault(int partition, K key, V data); ListenableFuture<SendResult<K, V>> send(String topic, V data); ListenableFuture<SendResult<K, V>> send(String topic, K key, V data); ListenableFuture<SendResult<K, V>> send(String topic, int partition, V data); ListenableFuture<SendResult<K, V>> send(String topic, int partition, K key, V data); ListenableFuture<SendResult<K, V>> send(Message<?> message); // Flush the producer. void flush();
The first 3 methods require that a default topic has been provided to the template.
To use the template, configure a producer factory and provide it in the template’s constructor:
@Bean public ProducerFactory<Integer, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); ... return props; } @Bean public KafkaTemplate<Integer, String> kafkaTemplate() { return new KafkaTemplate<Integer, String>(producerFactory()); }
The template can also be configured using standard <bean/>
definitions.
Then, to use the template, simply invoke one of its methods.
When using the methods with a Message<?>
parameter, topic, partition and key information is provided in a message
header:
KafkaHeaders.TOPIC
KafkaHeaders.PARTITION_ID
KafkaHeaders.MESSAGE_KEY
with the message payload being the data.
Optionally, you can configure the KafkaTemplate
with a ProducerListener
to get an async callback with the
results of the send (success or failure) instead of waiting for the Future
to complete.
public interface ProducerListener<K, V> { void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata); void onError(String topic, Integer partition, K key, V value, Exception exception); boolean isInterestedInSuccess(); }
By default, the template is configured with a LoggingProducerListener
which logs errors and does nothing when the
send is successful.
onSuccess
is only called if isInterestedInSuccess
returns true
.
For convenience, the abstract ProducerListenerAdapter
is provided in case you only want to implement one of the
methods.
It returns false
for isInterestedInSuccess
.
Notice that the send methods return a ListenableFuture<SendResult>
.
You can register a callback with the listener to receive the result of the send asynchronously.
ListenableFuture<SendResult<Integer, String>> future = template.send("foo"); future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() { @Override public void onSuccess(SendResult<Integer, String> result) { ... } @Override public void onFailure(Throwable ex) { ... } });
The SendResult
has two properties, a ProducerRecord
and RecordMetadata
; refer to the Kafka API documentation
for information about those objects.
If you wish to block the sending thread, to await the result, you can invoke the future’s get()
method.
You may wish to invoke flush()
before waiting or, for convenience, the template has a constructor with an autoFlush
parameter which will cause the template to flush()
on each send.
Note, however that flushing will likely significantly reduce performance.
Messages can be received by configuring a MessageListenerContainer
and providing a MessageListener
, or by
using the @KafkaListener
annotation.
Two MessageListenerContainer
implementations are provided:
KafkaMessageListenerContainer
ConcurrentMessageListenerContainer
The KafkaMessageListenerContainer
receives all message from all topics/partitions on a single thread.
The ConcurrentMessageListenerContainer
delegates to 1 or more KafkaMessageListenerContainer
s to provide
multi-threaded consumption.
The following constructors are available.
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties) public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties, TopicPartitionInitialOffset... topicPartitions)
Each takes a ConsumerFactory
and information about topics and partitions, as well as other configuration in a ContainerProperties
object.
The second constructor is used by the ConcurrentMessageListenerContainer
(see below) to distribute TopicPartitionInitialOffset
across the consumer instances.
ContainerProperties
has the following constructors:
public ContainerProperties(TopicPartitionInitialOffset... topicPartitions) public ContainerProperties(String... topics) public ContainerProperties(Pattern topicPattern)
The first takes an array of TopicPartitionInitialOffset
arguments to explicitly instruct the container which partitions to use
(using the consumer assign()
method), and with an optional initial offset: a positive value is an absolute offset; a negative value is relative to the current last offset within a partition.
The offsets are applied when the container is started.
The second takes an array of topics and Kafka allocates the partitions based on the group.id
property - distributing
partitions across the group.
The third uses a regex Pattern
to select the topics.
Refer to the JavaDocs for ContainerProperties
for more information about the various properties that can be set.
The single constructor is similar to the first KafkaListenerContainer
constructor:
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
It also has a property concurrency
, e.g. container.setConcurrency(3)
will create 3 KafkaMessageListenerContainer
s.
For the first constructor, kafka will distribute the partitions across the consumers.
For the second constructor, the ConcurrentMessageListenerContainer
distributes the TopicPartition
s across the
delegate KafkaMessageListenerContainer
s.
If, say, 6 TopicPartition
s are provided and the concurrency
is 3; each container will get 2 partitions.
For 5 TopicPartition
s, 2 containers will get 2 partitions and the third will get 1.
If the concurrency
is greater than the number of TopicPartitions
, the concurrency
will be adjusted down such that
each container will get one partition.
Several options are provided for committing offsets.
If the enable.auto.commit
consumer property is true, kafka will auto-commit the offsets according to its
configuration.
If it is false, the containers support the following AckMode
s.
The consumer poll()
method will return one or more ConsumerRecords
; the MessageListener
is called for each record;
the following describes the action taken by the container for each AckMode
:
poll()
have been processed.
poll()
have been processed as long as the ackTime
since the last commit has been exceeded.
poll()
have been processed as long as ackCount
records have been received since the last commit.
AcknowledgingMessageListener
) is responsible to acknowledge()
the Acknowledgment
;
after which, the same semantics as BATCH
are applied.
Acknowledgment.acknowledge()
method is called by the
listener.
Note | |
---|---|
|
The commitSync()
or commitAsync()
method on the consumer is used, depending on the syncCommits
container property.
public interface AcknowledgingMessageListener<K, V> { void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment); } public interface Acknowledgment { void acknowledge(); }
This gives the listener control over when offsets are committed.
The @KafkaListener
annotation provides a mechanism for simple POJO listeners:
public class Listener { @KafkaListener(id = "foo", topics = "myTopic") public void listen(String data) { ... } }
This mechanism requires a listener container factory, which is used to configure the underlying
ConcurrentMessageListenerContainer
: by default, a bean with name kafkaListenerContainerFactory
is expected.
@Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString()); ... return props; }
Notice that to set container properties, you must use the getContainerProperties()
method on the factory.
It is used as a template for the actual properties injected into the container.
You can also configure POJO listeners with explicit topics and partitions (and, optionally, their initial offsets):
@KafkaListener(id = "bar", topicPartitions = { @TopicPartition(topic = "topic1", partitions = { "0", "1" }), @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) }) public void listen(ConsumerRecord<?, ?> record) { ... }
Each partition can be specified in the partitions
or partitionOffsets
attribute, but not both.
When using manual AckMode
, the listener can also be provided with the Acknowledgment
; this example also shows
how to use a different container factory.
@KafkaListener(id = "baz", topics = "myTopic", containerFactory = "kafkaManualAckListenerContainerFactory") public void listen(String data, Acknowledgment ack) { ... ack.acknowledge(); }
Finally, metadata about the message is available from message headers:
@KafkaListener(id = "qux", topicPattern = "myTopic1") public void listen(@Payload String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { ... }
In certain scenarios, such as rebalancing, a message may be redelivered that has already been processed. The framework cannot know whether such a message has been processed or not, that is an application-level function. This is known as the Idempotent Receiver pattern and Spring Integration provides an implementation thereof.
The Spring for Apache Kafka project also provides some assistance by means of the FilteringMessageListenerAdapter
class, which can wrap your MessageListener
.
This class takes an implementation of RecordFilterStrategy
where you implement the filter
method to signal
that a message is a duplicate and should be discarded.
A FilteringAcknowledgingMessageListenerAdapter
is also provided for wrapping an AcknowledgingMessageListener
.
This has an additional property ackDiscarded
which indicates whether the adapter should acknowledge the discarded record; it is true
by default.
When using @KafkaListener
, set the RecordFilterStrategy
(and optionally ackDiscarded
) on the container factory and the listener will be wrapped in the appropriate filtering adapter.
If your listener throws an exception, the default behavior is to invoke the ErrorHandler
, if configured, or logged otherwise.
To retry deliveries, convenient listener adapters - RetryingMessageListenerAdapter
and RetryingAcknowledgingMessageListenerAdapter
are provided, depending on whether you are using a MessageListener
or an AcknowledgingMessageListener
.
These can be configured with a RetryTemplate
and RecoveryCallback<Void>
- see the spring-retry
project for information about these components.
If a recovery callback is not provided, the exception is thrown to the container after retries are exhausted.
In that case, the ErrorHandler
will be invoked, if configured, or logged otherwise.
When using @KafkaListener
, set the RetryTemplate
(and optionally recoveryCallback
) on the container factory and the listener will be wrapped in the appropriate retrying adapter.
Apache Kafka provides a high-level API for serializing/deserializing record values as well as their keys.
It is present with the org.apache.kafka.common.serialization.Serializer<T>
and
org.apache.kafka.common.serialization.Deserializer<T>
abstractions with some built-in implementations.
Meanwhile we can specify simple (de)serializer classes using Producer and/or Consumer configuration properties, e.g.:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); ... props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
for more complex or particular cases, the KafkaConsumer
, and therefore KafkaProducer
, provides overloaded
constructors to accept (De)Serializer
instances for keys
and/or values
, respectively.
To meet this API, the DefaultKafkaProducerFactory
and DefaultKafkaConsumerFactory
also provide properties to allow
to inject a custom (De)Serializer
to target Producer
/Consumer
.
For this purpose Spring for Apache Kafka also provides JsonSerializer
/JsonDeserializer
implementations based on the
Jackson JSON processor.
When JsonSerializer
is pretty simple and just lets to write any Java object as a JSON byte[]
, the JsonDeserializer
requires an additional Class<?> targetType
argument to allow to deserializer consumed byte[]
to the proper target
object.
The JsonDeserializer
can be extended to the particular generic type, when the last one is resolved at runtime,
instead of compile-time additional type
argument:
JsonDeserializer<Bar> barDeserializer = new JsonDeserializer<>(Bar.class); ... JsonDeserializer<Foo> fooDeserializer = new JsonDeserializer<Foo>() { };
Both JsonSerializer
and JsonDeserializer
can be customized with provided ObjectMapper
.
Plus you can extend them to implement some particular configuration logic in the
configure(Map<String, ?> configs, boolean isKey)
method.
Although Serializer
/Deserializer
API is pretty simple and flexible from the low-level Kafka Consumer
and
Producer
perspective, it is not enough on the Messaging level, where KafkaTemplate
and @KafkaListener
are present.
To easy convert to/from org.springframework.messaging.Message
, Spring for Apache Kafka provides MessageConverter
abstraction with the MessagingMessageConverter
implementation and its StringJsonMessageConverter
customization.
The MessageConverter
can be injected into KafkaTemplate
instance directly and via
AbstractKafkaListenerContainerFactory
bean definition for the @KafkaListener.containerFactory()
property:
@Bean public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setMessageConverter(new StringJsonMessageConverter()); return factory; } ... @KafkaListener(topics = "jsonData", containerFactory = "kafkaJsonListenerContainerFactory") public void jsonListener(Foo foo) { ... }
While efficient, one problem with asynchronous consumers is detecting when they are idle - users might want to take some action if no messages arrive for some period of time.
You can configure the listener container to publish a ListenerContainerIdleEvent
when some time passes with no message delivery.
While the container is idle, an event will be published every idleEventInterval
milliseconds.
To configure this feature, set the idleEventInterval
on the container:
@Bean public KafKaMessageListenerContainer(ConnectionFactory connectionFactory) { ContainerProperties containerProps = new ContainerProperties("topic1", "topic2"); ... containerProps.setIdleEventInterval(60000L); ... KafKaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(...); return container; }
Or, for a @KafkaListener
…
@Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); ... factory.getContainerProperties().setIdleEventInterval(60000L); ... return factory; }
In each of these cases, an event will be published once per minute while the container is idle.
You can capture these events by implementing ApplicationListener
- either a general listener, or one narrowed to only receive this specific event.
You can also use @EventListener
, introduced in Spring Framework 4.2.
The following example combines the @KafkaListener
and @EventListener
into a single class.
It’s important to understand that the application listener will get events for all containers so you may need to
check the listener id if you want to take specific action based on which container is idle.
You can also use the @EventListener
condition
for this purpose.
The events have 4 properties:
source
- the listener container instance
id
- the listener id (or container bean name)
idleTime
- the time the container had been idle when the event was published
topicPartitions
- the topics/partitions that the container was assigned at the time the event was generated
public class Listener { @KafkaListener(id = "qux", topics = "annotated") public void listen4(@Payload String foo, Acknowledgment ack) { ... } @EventListener(condition = "event.listenerId.startsWith('qux-')") public void eventHandler(ListenerContainerIdleEvent event) { this.event = event; eventLatch.countDown(); } }
Important | |
---|---|
Event listeners will see events for all containers; so, in the example above, we narrow the events received based on the listener ID.
Since containers created for the |
Caution | |
---|---|
If you wish to use the idle event to stop the lister container, you should not call |
When using Log Compaction, it is possible to send and receive messages with null
payloads which identifies the deletion of a key.
Starting with version 1.0.3, this is now fully supported.
To send a null
payload using the KafkaTemplate
simply pass null into the value argument of the send()
methods.
One exception to this is the send(Message<?> message)
variant.
Since spring-messaging
Message<?>
cannot have a null
payload, a special payload type KafkaNull
is used and the framework will send null
.
For convenience, the static KafkaNull.INSTANCE
is provided.
When using a message listener container, the received ConsumerRecord
will have a null
value()
.
To configure the @KafkaListener
to handle null
payloads, you must use the @Payload
annotation with required = false
; you will usually also need the key so your application knows which key was "deleted":
@KafkaListener(id = "deletableListener", topics = "myTopic") public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) { // value == null represents key deletion }
When using a class-level @KafkaListener
, some additional configuration is needed - a @KafkaHandler
method with a KafkaNull
payload:
@KafkaListener(id = "multi", topics = "myTopic") static class MultiListenerBean { private final CountDownLatch latch1 = new CountDownLatch(2); @KafkaHandler public void listen(String foo) { ... } @KafkaHandler public void listen(Integer bar) { ... } @KafkaHandler public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) { ... } }
The spring-kafka-test
jar contains some useful utilities to assist with testing your applications.
o.s.kafka.test.utils.KafkaUtils
provides some static methods to set up producer and consumer properties:
/** * Set up test properties for an {@code <Integer, String>} consumer. * @param group the group id. * @param autoCommit the auto commit. * @param embeddedKafka a {@link KafkaEmbedded} instance. * @return the properties. */ public static Map<String, Object> consumerProps(String group, String autoCommit, KafkaEmbedded embeddedKafka) { ... } /** * Set up test properties for an {@code <Integer, String>} producer. * @param embeddedKafka a {@link KafkaEmbedded} instance. * @return the properties. */ public static Map<String, Object> senderProps(KafkaEmbedded embeddedKafka) { ... }
A JUnit @Rule
is provided that creates an embedded kafka server.
/** * Create embedded Kafka brokers. * @param count the number of brokers. * @param controlledShutdown passed into TestUtils.createBrokerConfig. * @param topics the topics to create (2 partitions per). */ public KafkaEmbedded(int count, boolean controlledShutdown, String... topics) { ... } /** * * Create embedded Kafka brokers. * @param count the number of brokers. * @param controlledShutdown passed into TestUtils.createBrokerConfig. * @param partitions partitions per topic. * @param topics the topics to create. */ public KafkaEmbedded(int count, boolean controlledShutdown, int partitions, String... topics) { ... }
The embedded kafka class has a utility method allowing you to consume for all the topics it created:
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka); DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>( consumerProps); Consumer<Integer, String> consumer = cf.createConsumer(); embeddedKafka.consumeFromAllEmbeddedTopics(consumer);
The KafkaTestUtils
has some utility methods to fetch results from the consumer:
/** * Poll the consumer, expecting a single record for the specified topic. * @param consumer the consumer. * @param topic the topic. * @return the record. * @throws org.junit.ComparisonFailure if exactly one record is not received. */ public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) { ... } /** * Poll the consumer for records. * @param consumer the consumer. * @return the records. */ public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) { ... }
Usage:
... template.sendDefault(0, 2, "bar"); ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic"); ...
When the embedded server is started by JUnit, it sets a system property spring.embedded.kafka.brokers
to the address
of the broker(s).
A convenient constant KafkaEmbedded.SPRING_EMBEDDED_KAFKA_BROKERS
is provided for this property.
The o.s.kafka.test.hamcrest.KafkaMatchers
provides the following matchers:
/** * @param key the key * @param <K> the type. * @return a Matcher that matches the key in a consumer record. */ public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key) { ... } /** * @param value the value. * @param <V> the type. * @return a Matcher that matches the value in a consumer record. */ public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) { ... } /** * @param partition the partition. * @return a Matcher that matches the partition in a consumer record. */ public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition) { ... }
/** * @param key the key * @param <K> the type. * @return a Condition that matches the key in a consumer record. */ public static <K> Condition<ConsumerRecord<K, ?>> key(K key) { ... } /** * @param value the value. * @param <V> the type. * @return a Condition that matches the value in a consumer record. */ public static <V> Condition<ConsumerRecord<?, V>> value(V value) { ... } /** * @param partition the partition. * @return a Condition that matches the partition in a consumer record. */ public static Condition<ConsumerRecord<?, ?>> partition(int partition) { ... }
Putting it all together:
public class KafkaTemplateTests { private static final String TEMPLATE_TOPIC = "templateTopic"; @ClassRule public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEMPLATE_TOPIC); @Test public void testTemplate() throws Exception { Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka); DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(consumerProps); KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, TEMPLATE_TOPIC); final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>(); container.setMessageListener(new MessageListener<Integer, String>() { @Override public void onMessage(ConsumerRecord<Integer, String> record) { System.out.println(record); records.add(record); } }); container.setBeanName("templateTests"); container.start(); ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); Map<String, Object> senderProps = KafkaTestUtils.senderProps(embeddedKafka); ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps); KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf); template.setDefaultTopic(TEMPLATE_TOPIC); template.sendDefault("foo"); assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo")); template.sendDefault(0, 2, "bar"); ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS); assertThat(received, hasKey(2)); assertThat(received, hasPartition(0)); assertThat(received, hasValue("bar")); template.send(TEMPLATE_TOPIC, 0, 2, "baz"); received = records.poll(10, TimeUnit.SECONDS); assertThat(received, hasKey(2)); assertThat(received, hasPartition(0)); assertThat(received, hasValue("baz")); } }
The above uses the hamcrest matchers; with AssertJ
, the final part looks like this…
... assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo")); template.sendDefault(0, 2, "bar"); ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS); assertThat(received).has(key(2)); assertThat(received).has(partition(0)); assertThat(received).has(value("bar")); template.send(TEMPLATE_TOPIC, 0, 2, "baz"); received = records.poll(10, TimeUnit.SECONDS); assertThat(received).has(key(2)); assertThat(received).has(partition(0)); assertThat(received).has(value("baz")); } }