This version is still in development and is not considered stable yet. For the latest stable version, please use Spring for Apache Kafka 3.3.0!

Producer Interceptor Managed in Spring

Starting with version 3.0.0, when it comes to a producer interceptor, you can let Spring manage it directly as a bean instead of providing the class name of the interceptor to the Apache Kafka producer configuration. If you go with this approach, then you need to set this producer interceptor on KafkaTemplate. Following is an example using the same MyProducerInterceptor from above, but changed to not use the internal config property.

public class MyProducerInterceptor implements ProducerInterceptor<String, String> {

    private final SomeBean bean;

    public MyProducerInterceptor(SomeBean bean) {
        this.bean = bean;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        this.bean.someMethod("producer interceptor");
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    @Override
    public void close() {
    }

}
@Bean
public MyProducerInterceptor myProducerInterceptor(SomeBean someBean) {
  return new MyProducerInterceptor(someBean);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf, MyProducerInterceptor myProducerInterceptor) {
   KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(pf);
   kafkaTemplate.setProducerInterceptor(myProducerInterceptor);
}

Right before the records are sent, the onSend method of the producer interceptor is invoked. Once the server sends an acknowledgement on publishing the data, then the onAcknowledgement method is invoked. The onAcknowledgement is called right before the producer invokes any user callbacks.

If you have multiple such producer interceptors managed through Spring that need to be applied on the KafkaTemplate, you need to use CompositeProducerInterceptor instead. CompositeProducerInterceptor allows individual producer interceptors to be added in order. The methods from the underlying ProducerInterceptor implementations are invoked in the order as they were added to the CompositeProducerInterceptor.