This version is still in development and is not considered stable yet. For the latest stable version, please use Spring for Apache Kafka 3.2.4!

Quick Tour

Prerequisites: You must install and run Apache Kafka. Then you must put the Spring for Apache Kafka (spring-kafka) JAR and all of its dependencies on your classpath. The easiest way to do that is to declare a dependency in your build tool.

If you are not using Spring Boot, declare the spring-kafka jar as a dependency in your project.

compile 'org.springframework.kafka:spring-kafka:3.3.0-SNAPSHOT'
When using Spring Boot, (and you haven’t used to create your project), omit the version and Boot will automatically bring in the correct version that is compatible with your Boot version:
implementation 'org.springframework.kafka:spring-kafka'

However, the quickest way to get started is to use (or the wizards in Spring Tool Suits and Intellij IDEA) and create a project, selecting 'Spring for Apache Kafka' as a dependency.


This quick tour works with the following versions:

  • Apache Kafka Clients 3.7.x

  • Spring Framework 6.1.x

  • Minimum Java version: 17

Getting Started

The simplest way to get started is to use (or the wizards in Spring Tool Suits and Intellij IDEA) and create a project, selecting 'Spring for Apache Kafka' as a dependency. Refer to the Spring Boot documentation for more information about its opinionated auto configuration of the infrastructure beans.

Here is a minimal consumer application.

Spring Boot Consumer App

public class Application {

    public static void main(String[] args) {, args);

    public NewTopic topic() {

    @KafkaListener(id = "myId", topics = "topic1")
    public void listen(String in) {

class Application {

    fun topic() = NewTopic("topic1", 10, 1)

    @KafkaListener(id = "myId", topics = ["topic1"])
    fun listen(value: String?) {


fun main(args: Array<String>) = runApplication<Application>(*args)

The NewTopic bean causes the topic to be created on the broker; it is not needed if the topic already exists.

Spring Boot Producer App

public class Application {

    public static void main(String[] args) {, args);

    public NewTopic topic() {

    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("topic1", "test");

class Application {

    fun topic() = NewTopic("topic1", 10, 1)

    fun runner(template: KafkaTemplate<String?, String?>) =
        ApplicationRunner { template.send("topic1", "test") }

    companion object {
        fun main(args: Array<String>) = runApplication<Application>(*args)


With Java Configuration (No Spring Boot)

Spring for Apache Kafka is designed to be used in a Spring Application Context. For example, if you create the listener container yourself outside of a Spring context, not all functions will work unless you satisfy all of the ...Aware interfaces that the container implements.

Here is an example of an application that does not use Spring Boot; it has both a Consumer and Producer.

Without Spring Boot
public class Sender {

    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
        context.getBean(Sender.class).send("test", 42);

    private final KafkaTemplate<Integer, String> template;

    public Sender(KafkaTemplate<Integer, String> template) {
        this.template = template;

    public void send(String toSend, int key) {
        this.template.send("topic1", key, toSend);


public class Listener {

    @KafkaListener(id = "listen1", topics = "topic1")
    public void listen1(String in) {


public class Config {

    ConcurrentKafkaListenerContainerFactory<Integer, String>
                        kafkaListenerContainerFactory(ConsumerFactory<Integer, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        return factory;

    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProps());

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // ...
        return props;

    public Sender sender(KafkaTemplate<Integer, String> template) {
        return new Sender(template);

    public Listener listener() {
        return new Listener();

    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(senderProps());

    private Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;

    public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);

class Sender(private val template: KafkaTemplate<Int, String>) {

    fun send(toSend: String, key: Int) {
        template.send("topic1", key, toSend)


class Listener {

    @KafkaListener(id = "listen1", topics = ["topic1"])
    fun listen1(`in`: String) {


class Config {

    fun kafkaListenerContainerFactory(consumerFactory: ConsumerFactory<Int, String>) =
        ConcurrentKafkaListenerContainerFactory<Int, String>().also { it.consumerFactory = consumerFactory }

    fun consumerFactory() = DefaultKafkaConsumerFactory<Int, String>(consumerProps)

    val consumerProps = mapOf(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
        ConsumerConfig.GROUP_ID_CONFIG to "group",
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to,
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest"

    fun sender(template: KafkaTemplate<Int, String>) = Sender(template)

    fun listener() = Listener()

    fun producerFactory() = DefaultKafkaProducerFactory<Int, String>(senderProps)

    val senderProps = mapOf(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
        ProducerConfig.LINGER_MS_CONFIG to 10,
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to,

    fun kafkaTemplate(producerFactory: ProducerFactory<Int, String>) = KafkaTemplate(producerFactory)


As you can see, you have to define several infrastructure beans when not using Spring Boot.