Producer Interceptor Managed in Spring
Starting with version 3.0.0, you can let Spring manage a producer interceptor directly as a bean instead of providing the interceptor's class name in the Apache Kafka producer configuration.
If you take this approach, you need to set the producer interceptor on KafkaTemplate.
The following example uses the same MyProducerInterceptor from above, changed so that it does not use the internal config property.
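import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MyProducerInterceptor implements ProducerInterceptor<String, String> {

    // Collaborator supplied through the constructor instead of the producer configs
    private final SomeBean bean;

    public MyProducerInterceptor(SomeBean bean) {
        this.bean = bean;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        this.bean.someMethod("producer interceptor");
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    @Override
    public void close() {
    }

}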

@Bean
public MyProducerInterceptor myProducerInterceptor(SomeBean someBean) {
    return new MyProducerInterceptor(someBean);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf, MyProducerInterceptor myProducerInterceptor) {
    KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(pf);
    // Register the Spring-managed interceptor on the template
    kafkaTemplate.setProducerInterceptor(myProducerInterceptor);
    return kafkaTemplate;
}

Right before the records are sent, the onSend method of the producer interceptor is invoked.
Once the server acknowledges the published data, the onAcknowledgement method is invoked.
onAcknowledgement is called right before the producer invokes any user callbacks.
If you have multiple such producer interceptors managed through Spring that need to be applied on the KafkaTemplate, you need to use CompositeProducerInterceptor instead.
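
As a minimal sketch of that wiring, the configuration below wraps two interceptor beans in a CompositeProducerInterceptor and sets the composite on the template. AnotherProducerInterceptor is a hypothetical second ProducerInterceptor<String, String> implementation, and both interceptors are assumed to be registered as beans elsewhere; the class name InterceptorConfig is also illustrative.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.CompositeProducerInterceptor;

@Configuration
public class InterceptorConfig {

    // MyProducerInterceptor is the class shown earlier in this section;
    // AnotherProducerInterceptor is a hypothetical second implementation.
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf,
            MyProducerInterceptor first, AnotherProducerInterceptor second) {

        // Delegates are passed in the order in which they should be invoked
        CompositeProducerInterceptor<String, String> composite =
                new CompositeProducerInterceptor<>(first, second);

        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(pf);
        kafkaTemplate.setProducerInterceptor(composite);
        return kafkaTemplate;
    }

}
```

The template still receives a single interceptor; the composite is what fans each call out to the individual delegates.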
CompositeProducerInterceptor allows individual producer interceptors to be added in order.
The methods from the underlying ProducerInterceptor implementations are invoked in the order in which they were added to the CompositeProducerInterceptor.
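
The ordering rule can be illustrated with a small plain-Java model. This is a sketch of the idea only, not Spring's implementation: the Composite class below is a hypothetical stand-in that uses String values in place of producer records, and each delegate appends a marker so the invocation order is visible in the result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

class OrderedInterceptors {

    /** Hypothetical stand-in for a composite: applies delegates in insertion order. */
    static final class Composite implements UnaryOperator<String> {

        private final List<UnaryOperator<String>> delegates = new ArrayList<>();

        Composite add(UnaryOperator<String> delegate) {
            this.delegates.add(delegate);
            return this;
        }

        @Override
        public String apply(String record) {
            String current = record;
            // Each delegate sees the value returned by the previous one
            for (UnaryOperator<String> delegate : this.delegates) {
                current = delegate.apply(current);
            }
            return current;
        }
    }

    static String demo() {
        Composite composite = new Composite()
                .add(value -> value + "|first")
                .add(value -> value + "|second");
        return composite.apply("payload");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "payload|first|second"
    }
}
```

Because each delegate receives the output of the one before it, reordering the add calls changes the result, which is why the order of registration matters when interceptors modify the record.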