Class BatchMessagingMessageConverter

java.lang.Object
org.springframework.kafka.support.converter.BatchMessagingMessageConverter
All Implemented Interfaces:
BatchMessageConverter, MessageConverter

public class BatchMessagingMessageConverter extends Object implements BatchMessageConverter
A Messaging MessageConverter implementation used with a batch message listener; the consumer record values are extracted into a collection in the message payload.

Populates KafkaHeaders based on the ConsumerRecord onto the returned message. Each header is a collection where the position in the collection matches the payload position.

If a RecordMessageConverter is provided, and the batch type is a ParameterizedType with a single generic type parameter, each record will be passed to the converter, thus supporting a method signature List<Foo> foos.

Since:
1.1
Author:
Marius Bogoevici, Gary Russell, Dariusz Szablinski, Biju Kunjummen, Sanghyeok An, Hope Kim, Borahm Lee
  • Field Details

  • Constructor Details

    • BatchMessagingMessageConverter

      public BatchMessagingMessageConverter()
      Create an instance that does not convert the record values.
    • BatchMessagingMessageConverter

      public BatchMessagingMessageConverter(@Nullable RecordMessageConverter recordConverter)
      Create an instance that converts record values using the supplied converter.
      Parameters:
      recordConverter - the converter.
      Since:
      1.3.2
  • Method Details

    • setGenerateMessageId

      public void setGenerateMessageId(boolean generateMessageId)
      Generate Message ids for produced messages. If set to false, will try to use a default value. By default set to false.
      Parameters:
      generateMessageId - true if a message id should be generated
    • setGenerateTimestamp

      public void setGenerateTimestamp(boolean generateTimestamp)
      Generate timestamp for produced messages. If set to false, -1 is used instead. By default set to false.
      Parameters:
      generateTimestamp - true if a timestamp should be generated
    • setHeaderMapper

      public void setHeaderMapper(KafkaHeaderMapper headerMapper)
      Set the header mapper to map headers.
      Parameters:
      headerMapper - the mapper.
      Since:
      1.3
    • getRecordMessageConverter

      @Nullable public RecordMessageConverter getRecordMessageConverter()
      Description copied from interface: BatchMessageConverter
      Return the record converter used by this batch converter, if configured, or null.
      Specified by:
      getRecordMessageConverter in interface BatchMessageConverter
      Returns:
      the converter or null.
    • setRawRecordHeader

      public void setRawRecordHeader(boolean rawRecordHeader)
      Set to true to add the raw List<ConsumerRecord<?, ?>> as a header KafkaHeaders.RAW_DATA.
      Parameters:
      rawRecordHeader - true to add the header.
      Since:
      2.7
    • toMessage

      public Message<?> toMessage(List<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>> records, @Nullable Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?,?> consumer, Type type)
      Description copied from interface: BatchMessageConverter
      Convert a list of ConsumerRecord to a Message.
      Specified by:
      toMessage in interface BatchMessageConverter
      Parameters:
      records - the records.
      acknowledgment - the acknowledgment.
      consumer - the consumer.
      type - the required payload type.
      Returns:
      the message.
    • fromMessage

      public List<org.apache.kafka.clients.producer.ProducerRecord<?,?>> fromMessage(Message<?> message, String defaultTopic)
      Description copied from interface: BatchMessageConverter
      Convert a message to a producer record.
      Specified by:
      fromMessage in interface BatchMessageConverter
      Parameters:
      message - the message.
      defaultTopic - the default topic to use if no header found.
      Returns:
      the producer records.
    • extractAndConvertValue

      protected Object extractAndConvertValue(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, Type type)
      Subclasses can convert the value; by default, it's returned as provided by Kafka unless a RecordMessageConverter has been provided.
      Parameters:
      record - the record.
      type - the required type.
      Returns:
      the value.
    • convert

      protected Object convert(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, Type type, List<ConversionException> conversionFailures)
      Convert the record value.
      Parameters:
      record - the record.
      type - the type - must be a ParameterizedType with a single generic type parameter.
      conversionFailures - Conversion failures.
      Returns:
      the converted payload.