Forwarding Listener Results using @SendTo
Starting with version 2.0, if you also annotate a @KafkaListener with a @SendTo annotation and the method invocation returns a result, the result is forwarded to the topic specified by the @SendTo.
The @SendTo value can have several forms:
- @SendTo("someTopic") routes to the literal topic.
- @SendTo("#{someExpression}") routes to the topic determined by evaluating the expression once during application context initialization.
- @SendTo("!{someExpression}") routes to the topic determined by evaluating the expression at runtime. The #root object for the evaluation has three properties:
  - request: The inbound ConsumerRecord (or ConsumerRecords object for a batch listener).
  - source: The org.springframework.messaging.Message<?> converted from the request.
  - result: The method return result.
- @SendTo (no properties): This is treated as !{source.headers['kafka_replyTopic']} (since version 2.1.3).
Starting with versions 2.1.11 and 2.2.1, property placeholders are resolved within @SendTo values.
The result of the expression evaluation must be a String that represents the topic name.
The following examples show the various ways to use @SendTo:
@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
    ...
}

@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
    ...
}

@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
    ...
}

...

@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {

    @KafkaHandler
    public String foo(String in) {
        ...
    }

    @KafkaHandler
    @SendTo("!{'annotated25reply2'}")
    public String bar(@Payload(required = false) KafkaNull nul,
            @Header(KafkaHeaders.RECEIVED_KEY) int key) {
        ...
    }

}
To support @SendTo, the listener container factory must be provided with a KafkaTemplate (in its replyTemplate property), which is used to send the reply.
This should be a KafkaTemplate and not a ReplyingKafkaTemplate, which is used on the client side for request/reply processing.
When using Spring Boot, the template is auto-configured into the factory; when configuring your own factory, it must be set as shown in the examples below.
Starting with version 2.2, you can add a ReplyHeadersConfigurer to the listener container factory.
This is consulted to determine which headers you want to set in the reply message.
The following example shows how to add a ReplyHeadersConfigurer:
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf());
    factory.setReplyTemplate(template());
    factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
    return factory;
}
You can also add more headers if you wish. The following example shows how to do so:
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf());
    factory.setReplyTemplate(template());
    factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {

        @Override
        public boolean shouldCopy(String headerName, Object headerValue) {
            return false;
        }

        @Override
        public Map<String, Object> additionalHeaders() {
            return Collections.singletonMap("qux", "fiz");
        }

    });
    return factory;
}
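To make the roles of the two callbacks concrete, the following plain-Java sketch models how a reply's headers could be assembled: inbound headers are kept only when shouldCopy accepts them, and the entries from additionalHeaders are then added. HeaderPolicySketch and ReplyHeadersSketch are hypothetical names invented for this illustration and are not Spring Kafka types. The rule that an additional header replaces a copied header with the same name is an assumption of the sketch, and any headers the framework itself may exclude are not modeled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in that mirrors the two methods used in the examples above.
// It is NOT Spring Kafka's ReplyHeadersConfigurer interface.
interface HeaderPolicySketch {

    boolean shouldCopy(String headerName, Object headerValue);

    default Map<String, Object> additionalHeaders() {
        return Map.of();
    }
}

final class ReplyHeadersSketch {

    private ReplyHeadersSketch() {
    }

    // Builds the reply headers: first the inbound headers the policy accepts,
    // then the policy's additional headers.
    // Assumption: on a name clash, the additional header replaces the copied one.
    static Map<String, Object> replyHeaders(Map<String, Object> inbound, HeaderPolicySketch policy) {
        Map<String, Object> reply = new LinkedHashMap<>();
        inbound.forEach((name, value) -> {
            if (policy.shouldCopy(name, value)) {
                reply.put(name, value);
            }
        });
        reply.putAll(policy.additionalHeaders());
        return reply;
    }
}
```

In this model, the policy `(k, v) -> k.equals("cat")` keeps only an inbound header named cat, while a policy whose shouldCopy always returns false and whose additionalHeaders returns a single qux entry yields a reply carrying only that entry.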
When you use @SendTo, you must configure the ConcurrentKafkaListenerContainerFactory with a KafkaTemplate in its replyTemplate property to perform the send.
Spring Boot automatically wires in its auto-configured template (or any template, if a single instance is present).
Unless you use request/reply semantics, only the simple send(topic, value) method is used, so you may wish to create a subclass to generate the partition or key.
The following example shows how to do so:
@Bean
public KafkaTemplate<String, String> myReplyingTemplate() {
    return new KafkaTemplate<String, String>(producerFactory()) {

        @Override
        public CompletableFuture<SendResult<String, String>> send(String topic, String data) {
            // Delegate to the four-argument send so that the partition and key are set explicitly.
            return super.send(topic, partitionForData(data), keyForData(data), data);
        }

        ...

    };
}
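The subclass above calls partitionForData and keyForData, whose bodies the example leaves out. Purely as an illustration, they might look like the following sketch. It assumes, hypothetically, that reply values have the form key:payload and that the number of partitions in the reply topic is known to the caller, so partitionForData takes an extra partitionCount argument here. The class name ReplyRoutingSketch and the hashing scheme are inventions for this sketch, and the scheme is not how Kafka's built-in partitioner chooses a partition.

```java
final class ReplyRoutingSketch {

    private ReplyRoutingSketch() {
    }

    // Hypothetical convention: reply values look like "<key>:<payload>".
    // A value without a colon is used as its own key.
    static String keyForData(String data) {
        int colon = data.indexOf(':');
        return colon < 0 ? data : data.substring(0, colon);
    }

    // Maps the derived key onto one of partitionCount partitions.
    // Math.floorMod keeps the result non-negative even for negative hash codes.
    static int partitionForData(String data, int partitionCount) {
        return Math.floorMod(keyForData(data).hashCode(), partitionCount);
    }
}
```

Deriving the partition from the key, rather than from the whole value, keeps all replies that share a key on the same partition, which preserves their relative order for consumers of the reply topic.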
If the listener method returns
When using request/reply semantics, the target partition can be requested by the sender.
You can annotate a
See Handling Exceptions for more information.
If a listener method returns an Iterable, by default a separate record is sent for each element, with the element as the record value.
Starting with version 2.3.5, you can set the splitIterables property on @KafkaListener to false, in which case the entire result is sent as the value of a single ProducerRecord.
This requires a suitable serializer in the reply template's producer configuration.
However, if the reply is Iterable<Message<?>>, the property is ignored and each message is sent separately.
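The effect of splitIterables on the number of reply records can be modeled in a few lines of plain Java. This is a sketch of the rule as described above, not Spring Kafka's implementation: SplitIterablesSketch and replyValues are hypothetical names, and the Iterable<Message<?>> case, which is always sent message by message, is deliberately not modeled.

```java
import java.util.ArrayList;
import java.util.List;

final class SplitIterablesSketch {

    private SplitIterablesSketch() {
    }

    // Returns the values that would each become the value of one reply record.
    // Not modeled: Iterable<Message<?>> results, which ignore the flag.
    static List<Object> replyValues(Object result, boolean splitIterables) {
        List<Object> values = new ArrayList<>();
        if (splitIterables && result instanceof Iterable) {
            // Default behavior: one record per element.
            for (Object element : (Iterable<?>) result) {
                values.add(element);
            }
        }
        else {
            // splitIterables = false, or a non-Iterable result: one record for the whole result.
            values.add(result);
        }
        return values;
    }
}
```

With a two-element list as the result, the model yields two values when splitIterables is true and a single value (the list itself) when it is false, which is why the second case needs a serializer that can handle the whole collection.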