Quick Tour
Prerequisites: You must install and run Apache Kafka.
Then you must put the Spring for Apache Kafka (spring-kafka
) JAR and all of its dependencies on your classpath.
The easiest way to do that is to declare a dependency in your build tool.
If you are not using Spring Boot, declare the spring-kafka
jar as a dependency in your project.
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.1.1</version>
</dependency>
compile 'org.springframework.kafka:spring-kafka:3.1.1'
When using Spring Boot, (and you haven’t used start.spring.io to create your project), omit the version and Boot will automatically bring in the correct version that is compatible with your Boot version: |
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
implementation 'org.springframework.kafka:spring-kafka'
However, the quickest way to get started is to use start.spring.io (or the wizards in Spring Tool Suits and Intellij IDEA) and create a project, selecting 'Spring for Apache Kafka' as a dependency.
Compatibility
This quick tour works with the following versions:
-
Apache Kafka Clients 3.6.x
-
Spring Framework 6.1.x
-
Minimum Java version: 17
Getting Started
The simplest way to get started is to use start.spring.io (or the wizards in Spring Tool Suits and Intellij IDEA) and create a project, selecting 'Spring for Apache Kafka' as a dependency. Refer to the Spring Boot documentation for more information about its opinionated auto configuration of the infrastructure beans.
Here is a minimal consumer application.
Spring Boot Consumer App
-
Java
-
Kotlin
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build();
}
@KafkaListener(id = "myId", topics = "topic1")
public void listen(String in) {
System.out.println(in);
}
}
@SpringBootApplication
class Application {
@Bean
fun topic() = NewTopic("topic1", 10, 1)
@KafkaListener(id = "myId", topics = ["topic1"])
fun listen(value: String?) {
println(value)
}
}
fun main(args: Array<String>) = runApplication<Application>(*args)
spring.kafka.consumer.auto-offset-reset=earliest
The NewTopic
bean causes the topic to be created on the broker; it is not needed if the topic already exists.
Spring Boot Producer App
-
Java
-
Kotlin
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("topic1", "test");
};
}
}
@SpringBootApplication
class Application {
@Bean
fun topic() = NewTopic("topic1", 10, 1)
@Bean
fun runner(template: KafkaTemplate<String?, String?>) =
ApplicationRunner { template.send("topic1", "test") }
companion object {
@JvmStatic
fun main(args: Array<String>) = runApplication<Application>(*args)
}
}
With Java Configuration (No Spring Boot)
Spring for Apache Kafka is designed to be used in a Spring Application Context.
For example, if you create the listener container yourself outside of a Spring context, not all functions will work unless you satisfy all of the ...Aware interfaces that the container implements.
|
Here is an example of an application that does not use Spring Boot; it has both a Consumer
and Producer
.
-
Java
-
Kotlin
public class Sender {
public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
context.getBean(Sender.class).send("test", 42);
}
private final KafkaTemplate<Integer, String> template;
public Sender(KafkaTemplate<Integer, String> template) {
this.template = template;
}
public void send(String toSend, int key) {
this.template.send("topic1", key, toSend);
}
}
public class Listener {
@KafkaListener(id = "listen1", topics = "topic1")
public void listen1(String in) {
System.out.println(in);
}
}
@Configuration
@EnableKafka
public class Config {
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String>
kafkaListenerContainerFactory(ConsumerFactory<Integer, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProps());
}
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// ...
return props;
}
@Bean
public Sender sender(KafkaTemplate<Integer, String> template) {
return new Sender(template);
}
@Bean
public Listener listener() {
return new Listener();
}
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(senderProps());
}
private Map<String, Object> senderProps() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//...
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
}
class Sender(private val template: KafkaTemplate<Int, String>) {
fun send(toSend: String, key: Int) {
template.send("topic1", key, toSend)
}
}
class Listener {
@KafkaListener(id = "listen1", topics = ["topic1"])
fun listen1(`in`: String) {
println(`in`)
}
}
@Configuration
@EnableKafka
class Config {
@Bean
fun kafkaListenerContainerFactory(consumerFactory: ConsumerFactory<Int, String>) =
ConcurrentKafkaListenerContainerFactory<Int, String>().also { it.consumerFactory = consumerFactory }
@Bean
fun consumerFactory() = DefaultKafkaConsumerFactory<Int, String>(consumerProps)
val consumerProps = mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
ConsumerConfig.GROUP_ID_CONFIG to "group",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to IntegerDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest"
)
@Bean
fun sender(template: KafkaTemplate<Int, String>) = Sender(template)
@Bean
fun listener() = Listener()
@Bean
fun producerFactory() = DefaultKafkaProducerFactory<Int, String>(senderProps)
val senderProps = mapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
ProducerConfig.LINGER_MS_CONFIG to 10,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to IntegerSerializer::class.java,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java
)
@Bean
fun kafkaTemplate(producerFactory: ProducerFactory<Int, String>) = KafkaTemplate(producerFactory)
}
As you can see, you have to define several infrastructure beans when not using Spring Boot.