3. Reference

This part of the reference documentation details the various components that comprise Spring for Apache Kafka. The main chapter covers the core classes to develop a Kafka application with Spring.

3.1 Using Spring for Apache Kafka

3.1.1 Sending Messages with the KafkaTemplate

The KafkaTemplate wraps a producer and provides convenience methods to send data to Kafka topics. Both asynchronous and synchronous methods are provided; the asynchronous methods return a Future.

// Async methods

Future<RecordMetadata> convertAndSend(V data);

Future<RecordMetadata> convertAndSend(K key, V data);

Future<RecordMetadata> convertAndSend(int partition, K key, V data);

Future<RecordMetadata> convertAndSend(String topic, V data);

Future<RecordMetadata> convertAndSend(String topic, K key, V data);

Future<RecordMetadata> convertAndSend(String topic, int partition, K key, V data);


// Sync methods


RecordMetadata syncConvertAndSend(V data)
                                 throws InterruptedException, ExecutionException;

RecordMetadata syncConvertAndSend(K key, V data)
                                 throws InterruptedException, ExecutionException;

RecordMetadata syncConvertAndSend(int partition, K key, V data)
                                 throws InterruptedException, ExecutionException;

RecordMetadata syncConvertAndSend(String topic, V data)
                                 throws InterruptedException, ExecutionException;

RecordMetadata syncConvertAndSend(String topic, K key, V data)
                                 throws InterruptedException, ExecutionException;

RecordMetadata syncConvertAndSend(String topic, int partition, K key, V data)
                                 throws InterruptedException, ExecutionException;

// Flush the producer.

void flush();

To use the template, configure a producer factory and provide it in the template’s constructor:

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    ...
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory());
}

The template can also be configured using standard <bean/> definitions.

Then, to use the template, simply invoke one of its methods.

Optionally, you can configure the KafkaTemplate with a ProducerListener to get an async callback with the results of the send (success or failure) instead of waiting for the Future to complete.
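
The relationship between the asynchronous methods, the synchronous methods, and a listener callback can be sketched in plain Java. This is a minimal illustration only, not the KafkaTemplate implementation: FakeTemplate, SendListener, RecordingListener, and the String result are hypothetical stand-ins (the real methods return RecordMetadata), and the sketch assumes that a synchronous send simply blocks on the Future returned by the asynchronous one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class SendStylesSketch {

    /** Hypothetical callback, standing in for a ProducerListener. */
    interface SendListener {
        void onSuccess(String topic, String value);
        void onError(String topic, String value, Exception exception);
    }

    /** Hypothetical template that "sends" by completing a Future in memory. */
    static class FakeTemplate {
        private final SendListener listener;

        FakeTemplate(SendListener listener) {
            this.listener = listener;
        }

        // Asynchronous style: the caller gets a Future and decides when to wait.
        Future<String> convertAndSend(String topic, String value) {
            CompletableFuture<String> result = new CompletableFuture<>();
            if (value == null) {
                Exception failure = new IllegalArgumentException("value must not be null");
                result.completeExceptionally(failure);
                listener.onError(topic, value, failure);
            } else {
                result.complete(topic + ":" + value);
                listener.onSuccess(topic, value);
            }
            return result;
        }

        // Synchronous style (assumption): block until the Future completes.
        String syncConvertAndSend(String topic, String value)
                throws InterruptedException, ExecutionException {
            return convertAndSend(topic, value).get();
        }
    }

    /** Listener that records the outcomes it is told about. */
    static class RecordingListener implements SendListener {
        final List<String> events = new ArrayList<>();

        @Override
        public void onSuccess(String topic, String value) {
            events.add("success " + topic);
        }

        @Override
        public void onError(String topic, String value, Exception exception) {
            events.add("error " + topic);
        }
    }

    public static void main(String[] args) throws Exception {
        RecordingListener listener = new RecordingListener();
        FakeTemplate template = new FakeTemplate(listener);
        // The caller blocks here until the result is available.
        System.out.println(template.syncConvertAndSend("myTopic", "foo"));
        // The caller does not wait; the failure is reported to the listener.
        template.convertAndSend("myTopic", null);
        System.out.println(listener.events);
    }
}
```

With a listener in place, the caller of the asynchronous method never has to call get() on the Future to learn whether the send succeeded.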

3.1.2 Receiving Messages

Messages can be received by configuring a MessageListenerContainer and providing a MessageListener, or by using the @KafkaListener annotation.

Message Listener Containers

Two MessageListenerContainer implementations are provided:

  • KafkaMessageListenerContainer
  • ConcurrentMessageListenerContainer

The KafkaMessageListenerContainer receives all messages from all topics/partitions on a single thread. The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainers to provide multi-threaded consumption.

KafkaMessageListenerContainer

The following constructors are available.

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                                                        TopicPartition... topicPartitions)

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, String... topics)

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                                                        Pattern topicPattern)

Each takes a ConsumerFactory and information about topics and partitions.

The first takes a list of TopicPartition arguments to explicitly instruct the container which partitions to use (using the consumer assign() method). The second takes a list of topics and Kafka allocates the partitions based on the group.id property - distributing partitions across the group. The third is similar to the second, but uses a regex Pattern to select the topics.
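
To make the pattern-based selection concrete, here is a small plain-Java sketch of how a regex Pattern picks topics out of a set of names. It does not use the container or the Kafka client; the topic names and the select helper are hypothetical, and the sketch assumes that a topic is selected only when its whole name matches the pattern.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class TopicPatternSketch {

    /** Returns the topics whose whole name matches the pattern (hypothetical helper). */
    static List<String> select(Pattern topicPattern, List<String> availableTopics) {
        return availableTopics.stream()
                .filter(topic -> topicPattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical topic names, for illustration only.
        List<String> topics = Arrays.asList("orders.eu", "orders.us", "payments.eu", "orders");
        // "orders" itself is not selected because the pattern requires a dot after it.
        System.out.println(select(Pattern.compile("orders\\..*"), topics)); // [orders.eu, orders.us]
    }
}
```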

ConcurrentMessageListenerContainer

The constructors are similar to those of the KafkaMessageListenerContainer:

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, TopicPartition... topicPartitions)

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, String... topics)

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, Pattern topicPattern)

It also has a concurrency property; for example, container.setConcurrency(3) will create 3 KafkaMessageListenerContainers.

For the second and third constructors, Kafka will distribute the partitions across the consumers. For the first constructor, the ConcurrentMessageListenerContainer distributes the TopicPartitions across the delegate KafkaMessageListenerContainers.

If, say, 6 TopicPartitions are provided and the concurrency is 3, each container will get 2 partitions. For 5 TopicPartitions, 2 containers will get 2 partitions and the third will get 1. If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.
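
The arithmetic above can be sketched in plain Java. This is an illustration of the described behavior, not the container's actual code: the class and method names are hypothetical, partitions are represented by their indexes, and which specific partitions land on which container is an assumption (only the per-container counts come from the text).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spring for Apache Kafka.
class PartitionDistributionSketch {

    /** Returns, for each delegate container, the partition indexes it would receive. */
    static List<List<Integer>> distribute(int partitionCount, int concurrency) {
        // Concurrency is adjusted down so that no container is left without a partition.
        int containers = Math.min(concurrency, partitionCount);
        List<List<Integer>> assignments = new ArrayList<>();
        int perContainer = containers == 0 ? 0 : partitionCount / containers;
        int remainder = containers == 0 ? 0 : partitionCount % containers;
        int next = 0;
        for (int i = 0; i < containers; i++) {
            // The first 'remainder' containers take one extra partition (assumed ordering).
            int size = perContainer + (i < remainder ? 1 : 0);
            List<Integer> assigned = new ArrayList<>();
            for (int j = 0; j < size; j++) {
                assigned.add(next++);
            }
            assignments.add(assigned);
        }
        return assignments;
    }

    public static void main(String[] args) {
        System.out.println(distribute(6, 3)); // [[0, 1], [2, 3], [4, 5]]
        System.out.println(distribute(5, 3)); // [[0, 1], [2, 3], [4]]
        System.out.println(distribute(2, 3)); // [[0], [1]]
    }
}
```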

Committing Offsets

Several options are provided for committing offsets. If the enable.auto.commit consumer property is true, Kafka will auto-commit the offsets according to its configuration. If it is false, the containers support the following AckModes.

The consumer poll() method returns one or more ConsumerRecords, and the MessageListener is called for each record. The following describes the action taken by the container for each AckMode:

  • RECORD - call commitAsync() when the listener returns after processing the record.
  • BATCH - call commitAsync() when all the records returned by the poll() have been processed.
  • TIME - call commitAsync() when all the records returned by the poll() have been processed as long as the ackTime since the last commit has been exceeded.
  • COUNT - call commitAsync() when all the records returned by the poll() have been processed as long as ackCount records have been received since the last commit.
  • COUNT_TIME - similar to TIME and COUNT but the commit is performed if either condition is true.
  • MANUAL - the message listener (AcknowledgingMessageListener) is responsible for calling acknowledge() on the Acknowledgment, after which the same semantics as COUNT_TIME are applied.
  • MANUAL_IMMEDIATE - call commitAsync() immediately when the Acknowledgment.acknowledge() method is called by the listener; acknowledge() must be executed on the container’s thread.
Note

MANUAL and MANUAL_IMMEDIATE require the listener to be an AcknowledgingMessageListener.

public interface AcknowledgingMessageListener<K, V> {

	void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment);

}

public interface Acknowledgment {

	void acknowledge();

}

This gives the listener control over when offsets are committed.
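
The batch-level AckMode rules listed earlier in this section (BATCH, TIME, COUNT, and COUNT_TIME) can be sketched as a small decision function in plain Java. This is a hedged illustration, not the container's code: AckModeSketch, its Mode enum, and batchProcessed are hypothetical names, time is passed in explicitly as milliseconds, and RECORD and the MANUAL modes are left out because they commit per record or depend on the listener's acknowledgments.

```java
// Hypothetical sketch, not part of Spring for Apache Kafka.
class AckModeSketch {

    /** Subset of the modes that decide once a poll() batch has been processed. */
    enum Mode { BATCH, TIME, COUNT, COUNT_TIME }

    private final Mode mode;
    private final int ackCount;
    private final long ackTimeMillis;
    private int recordsSinceCommit;
    private long lastCommitMillis;

    AckModeSketch(Mode mode, int ackCount, long ackTimeMillis, long startMillis) {
        this.mode = mode;
        this.ackCount = ackCount;
        this.ackTimeMillis = ackTimeMillis;
        this.lastCommitMillis = startMillis;
    }

    /**
     * Called once all records returned by one poll() have been processed.
     * Returns true when a commit would be issued at that point.
     */
    boolean batchProcessed(int recordsInBatch, long nowMillis) {
        recordsSinceCommit += recordsInBatch;
        boolean countReached = recordsSinceCommit >= ackCount;
        boolean timeExceeded = nowMillis - lastCommitMillis > ackTimeMillis;
        boolean commit;
        switch (mode) {
            case BATCH:
                commit = true;
                break;
            case TIME:
                commit = timeExceeded;
                break;
            case COUNT:
                commit = countReached;
                break;
            default: // COUNT_TIME: either condition is enough
                commit = countReached || timeExceeded;
                break;
        }
        if (commit) {
            // Both counters restart from the commit.
            recordsSinceCommit = 0;
            lastCommitMillis = nowMillis;
        }
        return commit;
    }

    public static void main(String[] args) {
        AckModeSketch count = new AckModeSketch(Mode.COUNT, 5, 0, 0);
        System.out.println(count.batchProcessed(3, 10)); // false: only 3 records so far
        System.out.println(count.batchProcessed(3, 20)); // true: 6 records since last commit
    }
}
```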

@KafkaListener Annotation

The @KafkaListener annotation provides a mechanism for simple POJO listeners:

public class Listener {

    @KafkaListener(id = "foo", topics = "myTopic")
    public void listen(String data) {
        ...
    }

}

This mechanism requires a listener container factory, which is used to configure the underlying ConcurrentMessageListenerContainer. By default, a bean with name kafkaListenerContainerFactory is expected.

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                    kafkaListenerContainerFactory() {
    SimpleKafkaListenerContainerFactory<Integer, String> factory =
                            new SimpleKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(3);
    return factory;
}

@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
    ...
    return props;
}

You can also configure POJO listeners with explicit topics and partitions:

@KafkaListener(id = "bar", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = { "0", "1" })
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

When using manual AckMode, the listener can also be provided with the Acknowledgment; this example also shows how to use a different container factory.

@KafkaListener(id = "baz", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}

3.2 Testing Applications

3.2.1 Introduction

The spring-kafka-test jar contains some useful utilities to assist with testing your applications.

3.2.2 JUnit

o.s.kafka.test.utils.KafkaTestUtils provides some static methods to set up producer and consumer properties:

/**
 * Set up test properties for an {@code <Integer, String>} consumer.
 * @param group the group id.
 * @param autoCommit the auto commit.
 * @param embeddedKafka a {@link KafkaEmbedded} instance.
 * @return the properties.
 */
public static Map<String, Object> consumerProps(String group, String autoCommit,
                                       KafkaEmbedded embeddedKafka) { ... }

/**
 * Set up test properties for an {@code <Integer, String>} producer.
 * @param embeddedKafka a {@link KafkaEmbedded} instance.
 * @return the properties.
 */
public static Map<String, Object> senderProps(KafkaEmbedded embeddedKafka) { ... }

A JUnit @Rule is provided that creates an embedded Kafka server.

/**
 * Create embedded Kafka brokers.
 * @param count the number of brokers.
 * @param controlledShutdown passed into TestUtils.createBrokerConfig.
 * @param topics the topics to create (2 partitions per).
 */
public KafkaEmbedded(int count, boolean controlledShutdown, String... topics) { ... }

/**
 *
 * Create embedded Kafka brokers.
 * @param count the number of brokers.
 * @param controlledShutdown passed into TestUtils.createBrokerConfig.
 * @param partitions partitions per topic.
 * @param topics the topics to create.
 */
public KafkaEmbedded(int count, boolean controlledShutdown, int partitions, String... topics) { ... }

3.2.3 Hamcrest Matchers

The o.s.kafka.test.hamcrest.KafkaMatchers provides the following matchers:

/**
 * @param key the key
 * @param <K> the type.
 * @return a Matcher that matches the key in a consumer record.
 */
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key) { ... }

/**
 * @param value the value.
 * @param <V> the type.
 * @return a Matcher that matches the value in a consumer record.
 */
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) { ... }

/**
 * @param partition the partition.
 * @return a Matcher that matches the partition in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition) { ... }

3.2.4 AssertJ Conditions

/**
 * @param key the key
 * @param <K> the type.
 * @return a Condition that matches the key in a consumer record.
 */
public static <K> Condition<ConsumerRecord<K, ?>> key(K key) { ... }

/**
 * @param value the value.
 * @param <V> the type.
 * @return a Condition that matches the value in a consumer record.
 */
public static <V> Condition<ConsumerRecord<?, V>> value(V value) { ... }

/**
 * @param partition the partition.
 * @return a Condition that matches the partition in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> partition(int partition) { ... }

3.2.5 Example

Putting it all together:

public class KafkaTemplateTests {

    private static final String TEMPLATE_TOPIC = "templateTopic";

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEMPLATE_TOPIC);

    @Test
    public void testTemplate() throws Exception {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
        DefaultKafkaConsumerFactory<Integer, String> cf =
                            new DefaultKafkaConsumerFactory<Integer, String>(consumerProps);
        KafkaMessageListenerContainer<Integer, String> container =
                            new KafkaMessageListenerContainer<>(cf, TEMPLATE_TOPIC);
        final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
        container.setMessageListener(new MessageListener<Integer, String>() {

            @Override
            public void onMessage(ConsumerRecord<Integer, String> record) {
                System.out.println(record);
                records.add(record);
            }

        });
        container.setBeanName("templateTests");
        container.start();
        ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
        Map<String, Object> senderProps = KafkaTestUtils.senderProps(embeddedKafka);
        ProducerFactory<Integer, String> pf =
                             new DefaultKafkaProducerFactory<Integer, String>(senderProps);
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
        template.setDefaultTopic(TEMPLATE_TOPIC);
        template.syncConvertAndSend("foo");
        assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));
        template.syncConvertAndSend(0, 2, "bar");
        ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("bar"));
        template.syncConvertAndSend(TEMPLATE_TOPIC, 0, 2, "baz");
        received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("baz"));
    }

}

The above uses the Hamcrest matchers; with AssertJ, the final part looks like this:

...
        assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
        template.syncConvertAndSend(0, 2, "bar");
        ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received).has(key(2));
        assertThat(received).has(partition(0));
        assertThat(received).has(value("bar"));
        template.syncConvertAndSend(TEMPLATE_TOPIC, 0, 2, "baz");
        received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received).has(key(2));
        assertThat(received).has(partition(0));
        assertThat(received).has(value("baz"));
    }
}