This part of the reference documentation details the various components that comprise Spring for Apache Kafka. The main chapter covers the core classes to develop a Kafka application with Spring.
The KafkaTemplate
wraps a producer and provides convenience methods to send data to kafka topics.
Both asynchronous and synchronous methods are provided, with the async methods returning a Future
.
// Async methods Future<RecordMetadata> convertAndSend(V data); Future<RecordMetadata> convertAndSend(K key, V data); Future<RecordMetadata> convertAndSend(int partition, K key, V data); Future<RecordMetadata> convertAndSend(String topic, V data); Future<RecordMetadata> convertAndSend(String topic, K key, V data); Future<RecordMetadata> convertAndSend(String topic, int partition, K key, V data); // Sync methods RecordMetadata syncConvertAndSend(V data) throws InterruptedException, ExecutionException; RecordMetadata syncConvertAndSend(K key, V data) throws InterruptedException, ExecutionException; RecordMetadata syncConvertAndSend(int partition, K key, V data) throws InterruptedException, ExecutionException; RecordMetadata syncConvertAndSend(String topic, V data) throws InterruptedException, ExecutionException; RecordMetadata syncConvertAndSend(String topic, K key, V data) throws InterruptedException, ExecutionException; RecordMetadata syncConvertAndSend(String topic, int partition, K key, V data) throws InterruptedException, ExecutionException; // Flush the producer. void flush();
To use the template, configure a producer factory and provide it in the template’s constructor:
@Bean public ProducerFactory<Integer, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); ... return props; } @Bean public KafkaTemplate<Integer, String> kafkaTemplate() { return new KafkaTemplate<Integer, String>(producerFactory()); }
The template can also be configured using standard <bean/>
definitions.
Then, to use the template, simply invoke one of its methods.
Optionally, you can configure the KafkaTemplate
with a ProducerListener
to get an async callback with the
results of the send (success or failure) instead of waiting for the Future
to complete.
Messages can be received by configuring a MessageListenerContainer
and providing a MessageListener
, or by
using the @KafkaListener
annotation.
Two MessageListenerContainer
implementations are provided:
KafkaMessageListenerContainer
ConcurrentMessageListenerContainer
The KafkaMessageListenerContainer
receives all message from all topics/partitions on a single thread.
The ConcurrentMessageListenerContainer
delegates to 1 or more KafkaMessageListenerContainer
s to provide
multi-threaded consumption.
The following constructors are available.
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, TopicPartition... topicPartitions) public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, String... topics) public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, Pattern topicPattern)
Each takes a ConsumerFactory
and information about topics and partitions.
The first takes a list of TopicPartition
arguments to explicitly instruct the container which partitions to use
(using the consumer assign()
method).
The second takes a list of topics and Kafka allocates the partitions based on the group.id
property - distributing
partitions across the group.
The third is similar to the second, but uses a regex Pattern
to select the topics.
The constructors are similar to the KafkaListenerContainer
:
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, TopicPartition... topicPartitions) public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, String... topics) public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, Pattern topicPattern)
It also has a property concurrency
, e.g. container.setConcurrency(3)
will create 3
KafkaMessageListenerContainer
s.
For the second and third container, kafka will distribute the partitions across the consumers.
For the first constructor, the ConcurrentMessageListenerContainer
distributes the TopicPartition
s across the
delegate KafkaMessageListenerContainer
s.
If, say, 6 TopicPartition
s are provided and the concurrency
is 3; each container will get 2 partitions.
For 5 TopicPartition
s, 2 containers will get 2 partitions and the third will get 1.
If the concurrency
is greater than the number of TopicPartitions
, the concurrency
will be adjusted down such that
each container will get one partition.
Several options are provided for committing offsets.
If the enable.auto.commit
consumer property is true, kafka will auto-commit the offsets according to its
configuration.
If it is false, the containers support the following AckMode
s.
The consumer poll()
method will return one or more ConsumerRecords
; the MessageListener
is called for each record;
the following describes the action taken by the container for each AckMode
:
commitAsync()
when the listener returns after processing the record.
commitAsync()
when all the records returned by the poll()
have been processed.
commitAsync()
when all the records returned by the poll()
have been processed as long as the ackTime
since the last commit has been exceeded.
commitAsync()
when all the records returned by the poll()
have been processed as long as ackCount
records have been received since the last commit.
AcknowledgingMessageListener
) is responsible to acknowledge()
the Acknowledgment
;
after which, the same semantics as COUNT_TIME
are applied.
commitAsync()`
immediately when the Acknowledgment.acknowledge()
method is called by the
listener - must be executed on the container’s thread.
Note | |
---|---|
|
public interface AcknowledgingMessageListener<K, V> { void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment); } public interface Acknowledgment { void acknowledge(); }
This gives the listener control over when offsets are committed.
The @KafkaListener
annotation provides a mechanism for simple POJO listeners:
public class Listener { @KafkaListener(id = "foo", topics = "myTopic") public void listen(String data) { ... } }
This mechanism requires a listener container factory, which is used to configure the underlying
ConcurrentMessageListenerContainer
: by default, a bean with name kafkaListenerContainerFactory
is expected.
@Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { SimpleKafkaListenerContainerFactory<Integer, String> factory = new SimpleKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); return factory; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString()); ... return props; }
You can also configure POJO listeners with explicit topics and partitions:
@KafkaListener(id = "bar", topicPartitions = { @TopicPartition(topic = "topic1", partitions = { "0", "1" }), @TopicPartition(topic = "topic2", partitions = { "0", "1" }) }) public void listen(ConsumerRecord<?, ?> record) { ... }
When using manual AckMode
, the listener can also be provided with the Acknowledgment
; this example also shows
how to use a different container factory.
@KafkaListener(id = "baz", topics = "myTopic", containerFactory = "kafkaManualAckListenerContainerFactory") public void listen(String data, Acknowledgment ack) { ... ack.acknowledge(); }
The spring-kafka-test
jar contains some useful utilities to assist with testing your applications.
o.s.kafka.test.utils.KafkaUtils
provides some static methods to set up producer and consumer properties:
/** * Set up test properties for an {@code <Integer, String>} consumer. * @param group the group id. * @param autoCommit the auto commit. * @param embeddedKafka a {@link KafkaEmbedded} instance. * @return the properties. */ public static Map<String, Object> consumerProps(String group, String autoCommit, KafkaEmbedded embeddedKafka) { ... } /** * Set up test properties for an {@code <Integer, String>} producer. * @param embeddedKafka a {@link KafkaEmbedded} instance. * @return the properties. */ public static Map<String, Object> senderProps(KafkaEmbedded embeddedKafka) { ... }
A JUnit @Rule
is provided that creates an embedded kafka server.
/** * Create embedded Kafka brokers. * @param count the number of brokers. * @param controlledShutdown passed into TestUtils.createBrokerConfig. * @param topics the topics to create (2 partitions per). */ public KafkaEmbedded(int count, boolean controlledShutdown, String... topics) { ... } /** * * Create embedded Kafka brokers. * @param count the number of brokers. * @param controlledShutdown passed into TestUtils.createBrokerConfig. * @param partitions partitions per topic. * @param topics the topics to create. */ public KafkaEmbedded(int count, boolean controlledShutdown, int partitions, String... topics) { ... }
The o.s.kafka.test.hamcrest.KafkaMatchers
provides the following matchers:
/** * @param key the key * @param <K> the type. * @return a Matcher that matches the key in a consumer record. */ public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key) { ... } /** * @param value the value. * @param <V> the type. * @return a Matcher that matches the value in a consumer record. */ public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) { ... } /** * @param partition the partition. * @return a Matcher that matches the partition in a consumer record. */ public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition) { ... }
/** * @param key the key * @param <K> the type. * @return a Condition that matches the key in a consumer record. */ public static <K> Condition<ConsumerRecord<K, ?>> key(K key) { ... } /** * @param value the value. * @param <V> the type. * @return a Condition that matches the value in a consumer record. */ public static <V> Condition<ConsumerRecord<?, V>> value(V value) { ... } /** * @param partition the partition. * @return a Condition that matches the partition in a consumer record. */ public static Condition<ConsumerRecord<?, ?>> partition(int partition) { ... }
Putting it all together:
public class KafkaTemplateTests { private static final String TEMPLATE_TOPIC = "templateTopic"; @ClassRule public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEMPLATE_TOPIC); @Test public void testTemplate() throws Exception { Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka); DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(consumerProps); KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, TEMPLATE_TOPIC); final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>(); container.setMessageListener(new MessageListener<Integer, String>() { @Override public void onMessage(ConsumerRecord<Integer, String> record) { System.out.println(record); records.add(record); } }); container.setBeanName("templateTests"); container.start(); ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); Map<String, Object> senderProps = KafkaTestUtils.senderProps(embeddedKafka); ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps); KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf); template.setDefaultTopic(TEMPLATE_TOPIC); template.syncConvertAndSend("foo"); assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo")); template.syncConvertAndSend(0, 2, "bar"); ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS); assertThat(received, hasKey(2)); assertThat(received, hasPartition(0)); assertThat(received, hasValue("bar")); template.syncConvertAndSend(TEMPLATE_TOPIC, 0, 2, "baz"); received = records.poll(10, TimeUnit.SECONDS); assertThat(received, hasKey(2)); assertThat(received, hasPartition(0)); assertThat(received, hasValue("baz")); } }
The above uses the hamcrest matchers; with AssertJ
, the final part looks like this…
... assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo")); template.syncConvertAndSend(0, 2, "bar"); ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS); assertThat(received).has(key(2)); assertThat(received).has(partition(0)); assertThat(received).has(value("bar")); template.syncConvertAndSend(TEMPLATE_TOPIC, 0, 2, "baz"); received = records.poll(10, TimeUnit.SECONDS); assertThat(received).has(key(2)); assertThat(received).has(partition(0)); assertThat(received).has(value("baz")); } }