Class BatchMessagingMessageConverter

  • All Implemented Interfaces:
    BatchMessageConverter, MessageConverter

    public class BatchMessagingMessageConverter
    extends java.lang.Object
    implements BatchMessageConverter
    A Messaging MessageConverter implementation used with a batch message listener; the consumer record values are extracted into a collection in the message payload.

    Populates the returned message with KafkaHeaders derived from the ConsumerRecords. Each header is a collection in which an element's position matches the position of the corresponding record value in the payload.
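
The positional alignment between payload and headers can be illustrated with a minimal, library-free sketch. Everything in it is an assumption made for illustration: `Rec` and `Batch` are hypothetical stand-ins for ConsumerRecord and Message, and the header keys `topic`, `partition` and `offset` are placeholders, not the actual KafkaHeaders constants. It is a simplified model of the described behavior, not the converter's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BatchHeaderAlignmentSketch {

    /** Hypothetical stand-in for a Kafka ConsumerRecord. */
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;
        final Object value;

        Rec(String topic, int partition, long offset, Object value) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.value = value;
        }
    }

    /** Hypothetical stand-in for a Message: a payload list plus list-valued headers. */
    static final class Batch {
        final List<Object> payload = new ArrayList<>();
        final Map<String, List<Object>> headers = new LinkedHashMap<>();
    }

    /** Builds the payload and the header lists in a single pass so that index i always refers to the same record. */
    static Batch toBatch(List<Rec> records) {
        Batch batch = new Batch();
        List<Object> topics = new ArrayList<>();
        List<Object> partitions = new ArrayList<>();
        List<Object> offsets = new ArrayList<>();
        for (Rec r : records) {
            batch.payload.add(r.value);
            topics.add(r.topic);
            partitions.add(r.partition);
            offsets.add(r.offset);
        }
        // Placeholder header names (assumption), one list per header.
        batch.headers.put("topic", topics);
        batch.headers.put("partition", partitions);
        batch.headers.put("offset", offsets);
        return batch;
    }

    public static void main(String[] args) {
        Batch batch = toBatch(Arrays.asList(
                new Rec("orders", 0, 10L, "a"),
                new Rec("orders", 1, 7L, "b")));
        for (int i = 0; i < batch.payload.size(); i++) {
            System.out.println(batch.payload.get(i)
                    + " came from partition " + batch.headers.get("partition").get(i)
                    + " at offset " + batch.headers.get("offset").get(i));
        }
    }
}
```

A listener that receives such a batch can look up the metadata for the i-th payload element at index i of each header list.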

    If a RecordMessageConverter is provided, and the batch type is a ParameterizedType with a single generic type parameter, each record is passed to that converter. This supports a listener method parameter such as List<Foo> foos.
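
The condition "a ParameterizedType with a single generic type parameter" can be checked with plain java.lang.reflect. The sketch below is illustrative only: `Foo`, `Listener`, `singleTypeArgument` and `parameterTypeOf` are hypothetical names, and the code shows the reflection check itself rather than how the converter performs it internally.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

class BatchTypeInspectionSketch {

    /** Hypothetical payload element type. */
    static class Foo { }

    /** Hypothetical listener with differently typed batch parameters. */
    static class Listener {
        void listen(List<Foo> foos) { }
        void listenMap(Map<String, Foo> foos) { }
        void listenUntyped(Object batch) { }
    }

    /** Returns the element type if batchType has exactly one type argument, otherwise null. */
    static Type singleTypeArgument(Type batchType) {
        if (batchType instanceof ParameterizedType) {
            Type[] args = ((ParameterizedType) batchType).getActualTypeArguments();
            if (args.length == 1) {
                return args[0];
            }
        }
        return null;
    }

    /** Looks up the generic type of the first parameter of a Listener method. */
    static Type parameterTypeOf(String methodName, Class<?> rawParameterType) {
        try {
            Method method = Listener.class.getDeclaredMethod(methodName, rawParameterType);
            return method.getGenericParameterTypes()[0];
        } catch (NoSuchMethodException ex) {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        // List<Foo> qualifies; Map<String, Foo> (two arguments) and Object (not parameterized) do not.
        System.out.println(singleTypeArgument(parameterTypeOf("listen", List.class)));
        System.out.println(singleTypeArgument(parameterTypeOf("listenMap", Map.class)));
        System.out.println(singleTypeArgument(parameterTypeOf("listenUntyped", Object.class)));
    }
}
```

Only the `List<Foo>` parameter yields an element type here, which is the type a per-record converter would need as its target.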

    Since:
    1.1
    Author:
    Marius Bogoevici, Gary Russell, Dariusz Szablinski, Biju Kunjummen
    • Field Summary

      Fields 
      Modifier and Type Field Description
      protected org.springframework.core.log.LogAccessor logger  
    • Method Summary

      Modifier and Type Method Description
      protected java.lang.Object convert​(org.apache.kafka.clients.consumer.ConsumerRecord<?,​?> record, java.lang.reflect.Type type, java.util.List<ConversionException> conversionFailures)
      Convert the record value.
      protected java.lang.Object extractAndConvertValue​(org.apache.kafka.clients.consumer.ConsumerRecord<?,​?> record, java.lang.reflect.Type type)
      Subclasses can convert the value; by default, it's returned as provided by Kafka unless a RecordMessageConverter has been provided.
      java.util.List<org.apache.kafka.clients.producer.ProducerRecord<?,​?>> fromMessage​(org.springframework.messaging.Message<?> message, java.lang.String defaultTopic)
      Convert a message to a producer record.
      RecordMessageConverter getRecordMessageConverter()
      Return the record converter used by this batch converter, if configured, or null.
      void setGenerateMessageId​(boolean generateMessageId)
      Generate Message ids for produced messages.
      void setGenerateTimestamp​(boolean generateTimestamp)
      Generate a timestamp for produced messages.
      void setHeaderMapper​(KafkaHeaderMapper headerMapper)
      Set the header mapper to map headers.
      void setRawRecordHeader​(boolean rawRecordHeader)
      Set to true to add the raw List<ConsumerRecord<?, ?>> as a header KafkaHeaders.RAW_DATA.
      org.springframework.messaging.Message<?> toMessage​(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<?,​?>> records, Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?,​?> consumer, java.lang.reflect.Type type)
      Convert a list of ConsumerRecord to a Message.
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
    • Field Detail

      • logger

        protected final org.springframework.core.log.LogAccessor logger
    • Constructor Detail

      • BatchMessagingMessageConverter

        public BatchMessagingMessageConverter()
        Create an instance that does not convert the record values.
      • BatchMessagingMessageConverter

        public BatchMessagingMessageConverter​(RecordMessageConverter recordConverter)
        Create an instance that converts record values using the supplied converter.
        Parameters:
        recordConverter - the converter.
        Since:
        1.3.2
    • Method Detail

      • setGenerateMessageId

        public void setGenerateMessageId​(boolean generateMessageId)
        Generate Message ids for produced messages. If set to false, the converter tries to use a default value instead. Defaults to false.
        Parameters:
        generateMessageId - true if a message id should be generated
      • setGenerateTimestamp

        public void setGenerateTimestamp​(boolean generateTimestamp)
        Generate a timestamp for produced messages. If set to false, -1 is used instead. Defaults to false.
        Parameters:
        generateTimestamp - true if a timestamp should be generated
      • setHeaderMapper

        public void setHeaderMapper​(KafkaHeaderMapper headerMapper)
        Set the header mapper to map headers.
        Parameters:
        headerMapper - the mapper.
        Since:
        1.3
      • setRawRecordHeader

        public void setRawRecordHeader​(boolean rawRecordHeader)
        Set to true to add the raw List<ConsumerRecord<?, ?>> as the header KafkaHeaders.RAW_DATA.
        Parameters:
        rawRecordHeader - true to add the header.
        Since:
        2.7
      • toMessage

        public org.springframework.messaging.Message<?> toMessage​(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<?,​?>> records,
                                                                  @Nullable
                                                                  Acknowledgment acknowledgment,
                                                                  org.apache.kafka.clients.consumer.Consumer<?,​?> consumer,
                                                                  java.lang.reflect.Type type)
        Description copied from interface: BatchMessageConverter
        Convert a list of ConsumerRecord to a Message.
        Specified by:
        toMessage in interface BatchMessageConverter
        Parameters:
        records - the records.
        acknowledgment - the acknowledgment.
        consumer - the consumer.
        type - the required payload type.
        Returns:
        the message.
      • fromMessage

        public java.util.List<org.apache.kafka.clients.producer.ProducerRecord<?,​?>> fromMessage​(org.springframework.messaging.Message<?> message,
                                                                                                       java.lang.String defaultTopic)
        Description copied from interface: BatchMessageConverter
        Convert a message to a producer record.
        Specified by:
        fromMessage in interface BatchMessageConverter
        Parameters:
        message - the message.
        defaultTopic - the default topic to use if no header found.
        Returns:
        the producer records.
      • extractAndConvertValue

        protected java.lang.Object extractAndConvertValue​(org.apache.kafka.clients.consumer.ConsumerRecord<?,​?> record,
                                                          java.lang.reflect.Type type)
        Subclasses can convert the value; by default, it's returned as provided by Kafka unless a RecordMessageConverter has been provided.
        Parameters:
        record - the record.
        type - the required type.
        Returns:
        the value.
      • convert

        protected java.lang.Object convert​(org.apache.kafka.clients.consumer.ConsumerRecord<?,​?> record,
                                           java.lang.reflect.Type type,
                                           java.util.List<ConversionException> conversionFailures)
        Convert the record value.
        Parameters:
        record - the record.
        type - the type; must be a ParameterizedType with a single generic type parameter.
        conversionFailures - the list to which conversion failures are added.
        Returns:
        the converted payload.
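
One way a per-record conversion step can report failures through a caller-supplied list is sketched below. This is an assumption-laden model, not the library's code: `convertAll` is a hypothetical helper, raw values are plain Strings, and the choice to add a null entry for each success (so that the failure list stays index-aligned with the payload) is an illustrative design decision.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class ConversionFailureSketch {

    /**
     * Converts each raw value. On failure, a null payload element is emitted and the
     * exception is recorded at the same index in the failures list (assumed convention).
     */
    static <T> List<T> convertAll(List<String> rawValues,
                                  Function<String, T> converter,
                                  List<RuntimeException> failures) {
        List<T> payload = new ArrayList<>();
        for (String raw : rawValues) {
            try {
                payload.add(converter.apply(raw));
                failures.add(null); // no failure for this position
            } catch (RuntimeException ex) {
                payload.add(null);
                failures.add(ex);
            }
        }
        return payload;
    }

    public static void main(String[] args) {
        List<RuntimeException> failures = new ArrayList<>();
        List<Integer> payload = convertAll(Arrays.asList("1", "x", "3"), Integer::valueOf, failures);
        for (int i = 0; i < payload.size(); i++) {
            System.out.println(i + ": value=" + payload.get(i) + ", failure=" + failures.get(i));
        }
    }
}
```

Keeping the failure list the same length as the payload lets a caller decide per element whether to skip, retry, or route the original record elsewhere.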