Sending Messages

This section covers how to send messages.

Using KafkaTemplate

This section covers how to use KafkaTemplate to send messages.

Overview

The KafkaTemplate wraps a producer and provides convenience methods to send data to Kafka topics. The following listing shows the relevant methods from KafkaTemplate:

CompletableFuture<SendResult<K, V>> sendDefault(V data);

CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, V data);

CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

CompletableFuture<SendResult<K, V>> send(Message<?> message);

Map<MetricName, ? extends Metric> metrics();

List<PartitionInfo> partitionsFor(String topic);

<T> T execute(ProducerCallback<K, V, T> callback);

<T> T executeInTransaction(OperationsCallback<K, V, T> callback);

// Flush the producer.
void flush();

interface ProducerCallback<K, V, T> {

    T doInKafka(Producer<K, V> producer);

}

interface OperationsCallback<K, V, T> {

    T doInOperations(KafkaOperations<K, V> operations);

}

See the Javadoc for more detail.

In version 3.0, the methods that previously returned ListenableFuture have been changed to return CompletableFuture. To facilitate the migration, the 2.9 version added a method usingCompletableFuture() which provided the same methods with CompletableFuture return types; this method is no longer available.

The sendDefault API requires that a default topic has been provided to the template.

The API takes in a timestamp as a parameter and stores this timestamp in the record. How the user-provided timestamp is stored depends on the timestamp type configured on the Kafka topic. If the topic is configured to use CREATE_TIME, the user-specified timestamp is recorded (or generated if not specified). If the topic is configured to use LOG_APPEND_TIME, the user-specified timestamp is ignored and the broker adds in the local broker time.

The metrics and partitionsFor methods delegate to the same methods on the underlying Producer. The execute method provides direct access to the underlying Producer.

To use the template, you can configure a producer factory and provide it in the template’s constructor. The following example shows how to do so:

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory());
}

Starting with version 2.5, you can now override the factory’s ProducerConfig properties to create templates with different producer configurations from the same factory.

@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
    return new KafkaTemplate<>(pf);
}

@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
    return new KafkaTemplate<>(pf,
            Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}

Note that a bean of type ProducerFactory<?, ?> (such as the one auto-configured by Spring Boot) can be referenced with different narrowed generic types.

You can also configure the template by using standard <bean/> definitions.

Then, to use the template, you can invoke one of its methods.

When you use the methods with a Message<?> parameter, the topic, partition, key and timestamp information is provided in a message header that includes the following items:

  • KafkaHeaders.TOPIC

  • KafkaHeaders.PARTITION

  • KafkaHeaders.KEY

  • KafkaHeaders.TIMESTAMP

The message payload is the data.

Optionally, you can configure the KafkaTemplate with a ProducerListener to get an asynchronous callback with the results of the send (success or failure) instead of waiting for the Future to complete. The following listing shows the definition of the ProducerListener interface:

public interface ProducerListener<K, V> {

    void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);

    void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
            Exception exception);

}

By default, the template is configured with a LoggingProducerListener, which logs errors and does nothing when the send is successful.

For convenience, default method implementations are provided in case you want to implement only one of the methods.

Notice that the send methods return a CompletableFuture<SendResult>. You can register a callback with the listener to receive the result of the send asynchronously. The following example shows how to do so:

CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
    ...
});

SendResult has two properties, a ProducerRecord and RecordMetadata. See the Kafka API documentation for information about those objects.

The Throwable can be cast to a KafkaProducerException; its failedProducerRecord property contains the failed record.

If you wish to block the sending thread to await the result, you can invoke the future’s get() method; using the method with a timeout is recommended. If you have set a linger.ms, you may wish to invoke flush() before waiting or, for convenience, the template has a constructor with an autoFlush parameter that causes the template to flush() on each send. Flushing is only needed if you have set the linger.ms producer property and want to immediately send a partial batch.

Examples

This section shows examples of sending messages to Kafka:

Example 1. Non Blocking (Async)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    CompletableFuture<SendResult<Integer, String>> future = template.send(record);
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            handleSuccess(data);
        }
        else {
            handleFailure(data, record, ex);
        }
    });
}
Blocking (Sync)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

Note that the cause of the ExecutionException is KafkaProducerException with the failedProducerRecord property.

Using RoutingKafkaTemplate

Starting with version 2.5, you can use a RoutingKafkaTemplate to select the producer at runtime, based on the destination topic name.

The routing template does not support transactions, execute, flush, or metrics operations because the topic is not known for those operations.

The template requires a map of java.util.regex.Pattern to ProducerFactory<Object, Object> instances. This map should be ordered (e.g. a LinkedHashMap) because it is traversed in order; you should add more specific patterns at the beginning.

The following simple Spring Boot application provides an example of how to use the same template to send to different topics, each using a different value serializer.

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
            ProducerFactory<Object, Object> pf) {

        // Clone the PF with a different Serializer, register with Spring for shutdown
        Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
        context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);

        Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
        map.put(Pattern.compile("two"), bytesPF);
        map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
        return new RoutingKafkaTemplate(map);
    }

    @Bean
    public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
        return args -> {
            routingTemplate.send("one", "thing1");
            routingTemplate.send("two", "thing2".getBytes());
        };
    }

}

The corresponding @KafkaListeners for this example are shown in Annotation Properties.

For another technique to achieve similar results, but with the additional capability of sending different types to the same topic, see Delegating Serializer and Deserializer.

Using DefaultKafkaProducerFactory

As seen in Using KafkaTemplate, a ProducerFactory is used to create the producer.

When not using Transactions, by default, the DefaultKafkaProducerFactory creates a singleton producer used by all clients, as recommended in the KafkaProducer JavaDocs. However, if you call flush() on the template, this can cause delays for other threads using the same producer. Starting with version 2.3, the DefaultKafkaProducerFactory has a new property producerPerThread. When set to true, the factory will create (and cache) a separate producer for each thread, to avoid this issue.

When producerPerThread is true, user code must call closeThreadBoundProducer() on the factory when the producer is no longer needed. This will physically close the producer and remove it from the ThreadLocal. Calling reset() or destroy() will not clean up these producers.

When creating a DefaultKafkaProducerFactory, key and/or value Serializer classes can be picked up from configuration by calling the constructor that only takes in a Map of properties (see example in Using KafkaTemplate), or Serializer instances may be passed to the DefaultKafkaProducerFactory constructor (in which case all Producer s share the same instances). Alternatively you can provide Supplier<Serializer>s (starting with version 2.3) that will be used to obtain separate Serializer instances for each Producer:

@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}

@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
    return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}

Starting with version 2.5.10, you can now update the producer properties after the factory is created. This might be useful, for example, if you have to update SSL key/trust store locations after a credentials change. The changes will not affect existing producer instances; call reset() to close any existing producers so that new producers will be created using the new properties. NOTE: You cannot change a transactional producer factory to non-transactional, and vice-versa.

Two new methods are now provided:

void updateConfigs(Map<String, Object> updates);

void removeConfig(String configKey);

Starting with version 2.8, if you provide serializers as objects (in the constructor or via the setters), the factory will invoke the configure() method to configure them with the configuration properties.

Using ReplyingKafkaTemplate

Version 2.1.3 introduced a subclass of KafkaTemplate to provide request/reply semantics. The class is named ReplyingKafkaTemplate and has two additional methods; the following shows the method signatures:

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
    Duration replyTimeout);

The result is a CompletableFuture that is asynchronously populated with the result (or an exception, for a timeout). The result also has a sendFuture property, which is the result of calling KafkaTemplate.send(). You can use this future to determine the result of the send operation.

In version 3.0, the futures returned by these methods (and their sendFuture properties) have been changed to CompletableFutures instead of ListenableFutures.

If the first method is used, or the replyTimeout argument is null, the template’s defaultReplyTimeout property is used (5 seconds by default).

Starting with version 2.8.8, the template has a new method waitForAssignment. This is useful if the reply container is configured with auto.offset.reset=latest to avoid sending a request and a reply sent before the container is initialized.

When using manual partition assignment (no group management), the duration for the wait must be greater than the container’s pollTimeout property because the notification will not be sent until after the first poll is completed.

The following Spring Boot application shows an example of how to use the feature:

@SpringBootApplication
public class KRequestingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KRequestingApplication.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
        return args -> {
            if (!template.waitForAssignment(Duration.ofSeconds(10))) {
                throw new IllegalStateException("Reply container did not initialize");
            }
            ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
            RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
            SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
            System.out.println("Sent ok: " + sendResult.getRecordMetadata());
            ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
            System.out.println("Return value: " + consumerRecord.value());
        };
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> pf,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {

        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> repliesContainer =
                containerFactory.createContainer("kReplies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean
    public NewTopic kReplies() {
        return TopicBuilder.name("kReplies")
            .partitions(10)
            .replicas(2)
            .build();
    }

}

Note that we can use Boot’s auto-configured container factory to create the reply container.

If a non-trivial deserializer is being used for replies, consider using an ErrorHandlingDeserializer that delegates to your configured deserializer. When so configured, the RequestReplyFuture will be completed exceptionally and you can catch the ExecutionException, with the DeserializationException in its cause property.

Starting with version 2.6.7, in addition to detecting DeserializationExceptions, the template will call the replyErrorChecker function, if provided. If it returns an exception, the future will be completed exceptionally.

Here is an example:

template.setReplyErrorChecker(record -> {
    Header error = record.headers().lastHeader("serverSentAnError");
    if (error != null) {
        return new MyException(new String(error.value()));
    }
    else {
        return null;
    }
});

...

RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
    future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
    ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
    ...
}
catch (InterruptedException e) {
    ...
}
catch (ExecutionException e) {
    if (e.getCause instanceof MyException) {
        ...
    }
}
catch (TimeoutException e) {
    ...
}

The template sets a header (named KafkaHeaders.CORRELATION_ID by default), which must be echoed back by the server side.

In this case, the following @KafkaListener application responds:

@SpringBootApplication
public class KReplyingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KReplyingApplication.class, args);
    }

    @KafkaListener(id="server", topics = "kRequests")
    @SendTo // use default replyTo expression
    public String listen(String in) {
        System.out.println("Server received: " + in);
        return in.toUpperCase();
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean // not required if Jackson is on the classpath
    public MessagingMessageConverter simpleMapperConverter() {
        MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
        messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
        return messagingMessageConverter;
    }

}

The @KafkaListener infrastructure echoes the correlation ID and determines the reply topic.

See Forwarding Listener Results using @SendTo for more information about sending replies. The template uses the default header KafKaHeaders.REPLY_TOPIC to indicate the topic to which the reply goes.

Starting with version 2.2, the template tries to detect the reply topic or partition from the configured reply container. If the container is configured to listen to a single topic or a single TopicPartitionOffset, it is used to set the reply headers. If the container is configured otherwise, the user must set up the reply headers. In this case, an INFO log message is written during initialization. The following example uses KafkaHeaders.REPLY_TOPIC:

record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));

When you configure with a single reply TopicPartitionOffset, you can use the same reply topic for multiple templates, as long as each instance listens on a different partition. When configuring with a single reply topic, each instance must use a different group.id. In this case, all instances receive each reply, but only the instance that sent the request finds the correlation ID. This may be useful for auto-scaling, but with the overhead of additional network traffic and the small cost of discarding each unwanted reply. When you use this setting, we recommend that you set the template’s sharedReplyTopic to true, which reduces the logging level of unexpected replies to DEBUG instead of the default ERROR.

The following is an example of configuring the reply container to use the same shared reply topic:

@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

    ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
    container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
    container.getContainerProperties().setKafkaConsumerProperties(props);
    return container;
}
If you have multiple client instances and you do not configure them as discussed in the preceding paragraph, each instance needs a dedicated reply topic. An alternative is to set the KafkaHeaders.REPLY_PARTITION and use a dedicated partition for each instance. The Header contains a four-byte int (big-endian). The server must use this header to route the reply to the correct partition (@KafkaListener does this). In this case, though, the reply container must not use Kafka’s group management feature and must be configured to listen on a fixed partition (by using a TopicPartitionOffset in its ContainerProperties constructor).
The DefaultKafkaHeaderMapper requires Jackson to be on the classpath (for the @KafkaListener). If it is not available, the message converter has no header mapper, so you must configure a MessagingMessageConverter with a SimpleKafkaHeaderMapper, as shown earlier.

By default, 3 headers are used:

  • KafkaHeaders.CORRELATION_ID - used to correlate the reply to a request

  • KafkaHeaders.REPLY_TOPIC - used to tell the server where to reply

  • KafkaHeaders.REPLY_PARTITION - (optional) used to tell the server which partition to reply to

These header names are used by the @KafkaListener infrastructure to route the reply.

Starting with version 2.3, you can customize the header names - the template has 3 properties correlationHeaderName, replyTopicHeaderName, and replyPartitionHeaderName. This is useful if your server is not a Spring application (or does not use the @KafkaListener).

Conversely, if the requesting application is not a spring application and puts correlation information in a different header, starting with version 3.0, you can configure a custom correlationHeaderName on the listener container factory and that header will be echoed back. Previously, the listener had to echo custom correlation headers.

Request/Reply with Message<?>s

Version 2.7 added methods to the ReplyingKafkaTemplate to send and receive spring-messaging 's Message<?> abstraction:

RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);

<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
        ParameterizedTypeReference<P> returnType);

These will use the template’s default replyTimeout, there are also overloaded versions that can take a timeout in the method call.

In version 3.0, the futures returned by these methods (and their sendFuture properties) have been changed to CompletableFutures instead of ListenableFutures.

Use the first method if the consumer’s Deserializer or the template’s MessageConverter can convert the payload without any additional information, either via configuration or type metadata in the reply message.

Use the second method if you need to provide type information for the return type, to assist the message converter. This also allows the same template to receive different types, even if there is no type metadata in the replies, such as when the server side is not a Spring application. The following is an example of the latter:

Template Bean
  • Java

  • Kotlin

@Bean
ReplyingKafkaTemplate<String, String, String> template(
        ProducerFactory<String, String> pf,
        ConcurrentKafkaListenerContainerFactory<String, String> factory) {

    ConcurrentMessageListenerContainer<String, String> replyContainer =
            factory.createContainer("replies");
    replyContainer.getContainerProperties().setGroupId("request.replies");
    ReplyingKafkaTemplate<String, String, String> template =
            new ReplyingKafkaTemplate<>(pf, replyContainer);
    template.setMessageConverter(new ByteArrayJsonMessageConverter());
    template.setDefaultTopic("requests");
    return template;
}
@Bean
fun template(
    pf: ProducerFactory<String?, String>?,
    factory: ConcurrentKafkaListenerContainerFactory<String?, String?>
): ReplyingKafkaTemplate<String?, String, String?> {
    val replyContainer = factory.createContainer("replies")
    replyContainer.containerProperties.groupId = "request.replies"
    val template = ReplyingKafkaTemplate(pf, replyContainer)
    template.messageConverter = ByteArrayJsonMessageConverter()
    template.defaultTopic = "requests"
    return template
}
Using the template
  • Java

  • Kotlin

RequestReplyTypedMessageFuture<String, String, Thing> future1 =
        template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
                new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());

RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
        template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
                new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
val future1: RequestReplyTypedMessageFuture<String?, String?, Thing?>? =
    template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
        object : ParameterizedTypeReference<Thing?>() {})
log.info(future1?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata?.toString())
val thing = future1?.get(10, TimeUnit.SECONDS)?.payload
log.info(thing.toString())

val future2: RequestReplyTypedMessageFuture<String?, String?, List<Thing?>?>? =
    template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
        object : ParameterizedTypeReference<List<Thing?>?>() {})
log.info(future2?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())
val things = future2?.get(10, TimeUnit.SECONDS)?.payload
things?.forEach(Consumer { thing1: Thing? -> log.info(thing1.toString()) })

Reply Type Message<?>

When the @KafkaListener returns a Message<?>, with versions before 2.5, it was necessary to populate the reply topic and correlation id headers. In this example, we use the reply topic header from the request:

@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .build();
}

This also shows how to set a key on the reply record.

Starting with version 2.5, the framework will detect if these headers are missing and populate them with the topic - either the topic determined from the @SendTo value or the incoming KafkaHeaders.REPLY_TOPIC header (if present). It will also echo the incoming KafkaHeaders.CORRELATION_ID and KafkaHeaders.REPLY_PARTITION, if present.

@KafkaListener(id = "requestor", topics = "request")
@SendTo  // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.KEY, 42)
            .build();
}

Aggregating Multiple Replies

The template in Using ReplyingKafkaTemplate is strictly for a single request/reply scenario. For cases where multiple receivers of a single message return a reply, you can use the AggregatingReplyingKafkaTemplate. This is an implementation of the client-side of the Scatter-Gather Enterprise Integration Pattern.

Like the ReplyingKafkaTemplate, the AggregatingReplyingKafkaTemplate constructor takes a producer factory and a listener container to receive the replies; it has a third parameter BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy which is consulted each time a reply is received; when the predicate returns true, the collection of ConsumerRecords is used to complete the Future returned by the sendAndReceive method.

There is an additional property returnPartialOnTimeout (default false). When this is set to true, instead of completing the future with a KafkaReplyTimeoutException, a partial result completes the future normally (as long as at least one reply record has been received).

Starting with version 2.3.5, the predicate is also called after a timeout (if returnPartialOnTimeout is true). The first argument is the current list of records; the second is true if this call is due to a timeout. The predicate can modify the list of records.

AggregatingReplyingKafkaTemplate<Integer, String, String> template =
        new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
                        coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
        template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
        future.get(30, TimeUnit.SECONDS);

Notice that the return type is a ConsumerRecord with a value that is a collection of ConsumerRecords. The "outer" ConsumerRecord is not a "real" record, it is synthesized by the template, as a holder for the actual reply records received for the request. When a normal release occurs (release strategy returns true), the topic is set to aggregatedResults; if returnPartialOnTimeout is true, and timeout occurs (and at least one reply record has been received), the topic is set to partialResultsAfterTimeout. The template provides constant static variables for these "topic" names:

/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a normal release by the release strategy.
 */
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";

/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a timeout.
 */
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";

The real ConsumerRecords in the Collection contain the actual topic(s) from which the replies are received.

The listener container for the replies must be configured with AckMode.MANUAL or AckMode.MANUAL_IMMEDIATE; the consumer property enable.auto.commit must be false (the default since version 2.3). To avoid any possibility of losing messages, the template only commits offsets when there are zero requests outstanding, i.e. when the last outstanding request is released by the release strategy. After a rebalance, it is possible for duplicate reply deliveries; these will be ignored for any in-flight requests; you may see error log messages when duplicate replies are received for already released replies.
If you use an ErrorHandlingDeserializer with this aggregating template, the framework will not automatically detect DeserializationExceptions. Instead, the record (with a null value) will be returned intact, with the deserialization exception(s) in headers. It is recommended that applications call the utility method ReplyingKafkaTemplate.checkDeserialization() method to determine if a deserialization exception occurred. See its JavaDocs for more information. The replyErrorChecker is also not called for this aggregating template; you should perform the checks on each element of the reply.