Class AggregatingReplyingKafkaTemplate<K,V,R>

java.lang.Object
org.springframework.kafka.core.KafkaTemplate<K,V>
org.springframework.kafka.requestreply.ReplyingKafkaTemplate<K,V,Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>
org.springframework.kafka.requestreply.AggregatingReplyingKafkaTemplate<K,V,R>
Type Parameters:
K - the key type.
V - the outbound data type.
R - the reply data type.
All Implemented Interfaces:
EventListener, Aware, BeanNameAware, DisposableBean, InitializingBean, ApplicationContextAware, ApplicationListener<ContextStoppedEvent>, Lifecycle, Phased, SmartLifecycle, KafkaOperations<K,V>, BatchConsumerAwareMessageListener<K,Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>, BatchMessageListener<K,Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>, GenericMessageListener<List<org.apache.kafka.clients.consumer.ConsumerRecord<K,Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>>>, ReplyingKafkaOperations<K,V,Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>

public class AggregatingReplyingKafkaTemplate<K,V,R> extends ReplyingKafkaTemplate<K,V,Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>> implements BatchConsumerAwareMessageListener<K,Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>
A replying template that aggregates multiple replies with the same correlation id.
Since:
2.3
Author:
Gary Russell
  • Field Details

    • AGGREGATED_RESULTS_TOPIC

      public static final String AGGREGATED_RESULTS_TOPIC
      Pseudo topic name for the "outer" ConsumerRecord that has the aggregated results in its value after a normal release by the release strategy.
      See Also:
    • PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC

      public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC
      Pseudo topic name for the "outer" ConsumerRecord that has the aggregated results in its value after a timeout.
      See Also:
  • Constructor Details

    • AggregatingReplyingKafkaTemplate

      public AggregatingReplyingKafkaTemplate(ProducerFactory<K,V> producerFactory, GenericMessageListenerContainer<K,Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>> replyContainer, BiPredicate<List<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>,Boolean> releaseStrategy)
      Construct an instance using the provided parameter arguments. The releaseStrategy is consulted to determine when a collection is "complete".
      Parameters:
      producerFactory - the producer factory.
      replyContainer - the reply container.
      releaseStrategy - the release strategy which is a BiPredicate which is passed the current list and a boolean to indicate if this is for a normal delivery or a timeout (when setReturnPartialOnTimeout(boolean) is true. The predicate may modify the list of records.
      Since:
      2.3.5
  • Method Details

    • setCommitTimeout

      public void setCommitTimeout(Duration commitTimeout)
      Set the timeout to use when committing offsets.
      Parameters:
      commitTimeout - the timeout.
    • setReturnPartialOnTimeout

      public void setReturnPartialOnTimeout(boolean returnPartialOnTimeout)
      Set to true to return a partial result when a request times out.
      Parameters:
      returnPartialOnTimeout - true to return a partial result.
    • onMessage

      public void onMessage(List<org.apache.kafka.clients.consumer.ConsumerRecord<K,Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>> data, org.apache.kafka.clients.consumer.Consumer<?,?> consumer)
      Description copied from interface: GenericMessageListener
      Invoked with data from kafka and provides access to the Consumer. The default implementation throws UnsupportedOperationException.
      Specified by:
      onMessage in interface BatchConsumerAwareMessageListener<K,V>
      Specified by:
      onMessage in interface GenericMessageListener<K>
      Parameters:
      data - the data to be processed.
      consumer - the consumer.
    • handleTimeout

      protected boolean handleTimeout(CorrelationKey correlationId, RequestReplyFuture<K,V,Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>> future)
      Description copied from class: ReplyingKafkaTemplate
      Used to inform subclasses that a request has timed out so they can clean up state and, optionally, complete the future.
      Overrides:
      handleTimeout in class ReplyingKafkaTemplate<K,V,Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>
      Parameters:
      correlationId - the correlation id.
      future - the future.
      Returns:
      true to indicate the future has been completed.