This version is still in development and is not considered stable yet. For the latest stable version, please use Spring for Apache Kafka 3.3.3!
Wiring Spring Beans into Producer/Consumer Interceptors
Apache Kafka provides a mechanism to add interceptors to producers and consumers.
These objects are managed by Kafka, not Spring, and so normal Spring dependency injection won’t work for wiring in dependent Spring Beans.
However, you can manually wire in those dependencies using the interceptor's configure() method.
The following Spring Boot application shows how to do this by overriding Spring Boot’s default factories to add some dependent bean into the configuration properties.
public class Application {
public static void main(String[] args) {, args);
public ConsumerFactory<?, ?> kafkaConsumerFactory(SomeBean someBean) {
Map<String, Object> consumerProperties = new HashMap<>();
// consumerProperties.put(..., ...)
// ...
consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
consumerProperties.put("some.bean", someBean);
return new DefaultKafkaConsumerFactory<>(consumerProperties);
public ProducerFactory<?, ?> kafkaProducerFactory(SomeBean someBean) {
Map<String, Object> producerProperties = new HashMap<>();
// producerProperties.put(..., ...)
// ...
Map<String, Object> producerProperties = properties.buildProducerProperties();
producerProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
producerProperties.put("some.bean", someBean);
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(producerProperties);
return factory;
public SomeBean someBean() {
return new SomeBean();
@KafkaListener(id = "kgk897", topics = "kgh897")
public void listen(String in) {
System.out.println("Received " + in);
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> template.send("kgh897", "test");
public NewTopic kRequests() {
/**
 * Simple collaborator that the interceptors call to show that the wiring worked.
 */
public class SomeBean {

    /**
     * Prints {@code what} followed by {@code " in my foo bean"} to standard output.
     *
     * @param what label identifying the caller
     */
    public void someMethod(String what) {
        System.out.println(what + " in my foo bean");
    }

}
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {
private SomeBean bean;
public void configure(Map<String, ?> configs) {
this.bean = (SomeBean) configs.get("some.bean");
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
this.bean.someMethod("producer interceptor");
return record;
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
public void close() {
public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {
private SomeBean bean;
public void configure(Map<String, ?> configs) {
this.bean = (SomeBean) configs.get("some.bean");
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
this.bean.someMethod("consumer interceptor");
return records;
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
public void close() {
Running the application prints the following output:
producer interceptor in my foo bean
consumer interceptor in my foo bean
Received test