Class AggregatingReplyingKafkaTemplate<K,​V,​R>

  • Type Parameters:
    K - the key type.
    V - the outbound data type.
    R - the reply data type.
    All Implemented Interfaces:
    java.util.EventListener, org.springframework.beans.factory.Aware, org.springframework.beans.factory.BeanNameAware, org.springframework.beans.factory.DisposableBean, org.springframework.beans.factory.InitializingBean, org.springframework.context.ApplicationContextAware, org.springframework.context.ApplicationListener<org.springframework.context.event.ContextStoppedEvent>, org.springframework.context.Lifecycle, org.springframework.context.Phased, org.springframework.context.SmartLifecycle, KafkaOperations<K,​V>, BatchConsumerAwareMessageListener<K,​java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,​R>>>, BatchMessageListener<K,​java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,​R>>>, GenericMessageListener<java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,​java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,​R>>>>>, ReplyingKafkaOperations<K,​V,​java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,​R>>>

    public class AggregatingReplyingKafkaTemplate<K,​V,​R>
    extends ReplyingKafkaTemplate<K,​V,​java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,​R>>>
    implements BatchConsumerAwareMessageListener<K,​java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,​R>>>
    A replying template that aggregates multiple replies with the same correlation id.
    Since:
    2.3
    Author:
    Gary Russell
    • Field Detail

      • AGGREGATED_RESULTS_TOPIC

        public static final java.lang.String AGGREGATED_RESULTS_TOPIC
        Pseudo topic name for the "outer" ConsumerRecord that has the aggregated results in its value after a normal release by the release strategy.
        See Also:
        Constant Field Values
      • PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC

        public static final java.lang.String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC
        Pseudo topic name for the "outer" ConsumerRecord that has the aggregated results in its value after a timeout.
        See Also:
        Constant Field Values
    • Constructor Detail

      • AggregatingReplyingKafkaTemplate

        public AggregatingReplyingKafkaTemplate​(ProducerFactory<K,​V> producerFactory,
                                                GenericMessageListenerContainer<K,​java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,​R>>> replyContainer,
                                                java.util.function.BiPredicate<java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,​R>>,​java.lang.Boolean> releaseStrategy)
        Construct an instance using the provided parameter arguments. The releaseStrategy is consulted to determine when a collection is "complete".
        Parameters:
        producerFactory - the producer factory.
        replyContainer - the reply container.
        releaseStrategy - the release strategy which is a BiPredicate which is passed the current list and a boolean to indicate if this is for a normal delivery or a timeout (when setReturnPartialOnTimeout(boolean) is true. The predicate may modify the list of records.
        Since:
        2.3.5
    • Method Detail

      • setCommitTimeout

        public void setCommitTimeout​(java.time.Duration commitTimeout)
        Set the timeout to use when committing offsets.
        Parameters:
        commitTimeout - the timeout.
      • setReturnPartialOnTimeout

        public void setReturnPartialOnTimeout​(boolean returnPartialOnTimeout)
        Set to true to return a partial result when a request times out.
        Parameters:
        returnPartialOnTimeout - true to return a partial result.
      • onMessage

        public void onMessage​(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,​java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,​R>>>> data,
                              org.apache.kafka.clients.consumer.Consumer<?,​?> consumer)
        Description copied from interface: GenericMessageListener
        Invoked with data from kafka and provides access to the Consumer. The default implementation throws UnsupportedOperationException.
        Specified by:
        onMessage in interface BatchConsumerAwareMessageListener<K,​V>
        Specified by:
        onMessage in interface GenericMessageListener<K>
        Parameters:
        data - the data to be processed.
        consumer - the consumer.
      • handleTimeout

        protected boolean handleTimeout​(CorrelationKey correlationId,
                                        RequestReplyFuture<K,​V,​java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,​R>>> future)
        Description copied from class: ReplyingKafkaTemplate
        Used to inform subclasses that a request has timed out so they can clean up state and, optionally, complete the future.
        Overrides:
        handleTimeout in class ReplyingKafkaTemplate<K,​V,​java.util.Collection<org.apache.kafka.clients.consumer.ConsumerRecord<K,​R>>>
        Parameters:
        correlationId - the correlation id.
        future - the future.
        Returns:
        true to indicate the future has been completed.