Class MessagingMessageConverter

java.lang.Object
org.springframework.kafka.support.converter.MessagingMessageConverter
All Implemented Interfaces:
MessageConverter, RecordMessageConverter
Direct Known Subclasses:
JsonMessageConverter, ProjectingMessageConverter

public class MessagingMessageConverter extends Object implements RecordMessageConverter
A Messaging MessageConverter implementation for a message listener that receives individual messages.

Populates KafkaHeaders on the returned message based on the ConsumerRecord.

Author:
Marius Bogoevici, Gary Russell, Dariusz Szablinski, Biju Kunjummen
  • Field Details

  • Constructor Details

    • MessagingMessageConverter

      public MessagingMessageConverter()
      Construct an instance that uses the KafkaHeaders.PARTITION header to determine the target partition.
    • MessagingMessageConverter

      public MessagingMessageConverter(Function<Message<?>,Integer> partitionProvider)
      Construct an instance that uses the supplied partition provider function. The function can return null to delegate the partition selection to the Kafka client.
      Parameters:
      partitionProvider - the provider.
      Since:
      3.0.8
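
As an illustration of the null-return contract described above, here is a minimal sketch of a partition provider. To stay runnable with only the JDK, it works on a plain header map instead of a Spring Message<?>; the header name tenantId, the partition count of 4, and the class name PartitionProviderSketch are all assumptions made for this example. With Spring for Apache Kafka on the classpath, an equivalent function would read from message.getHeaders() and be passed to this constructor.

```java
import java.util.Map;
import java.util.function.Function;

public class PartitionProviderSketch {

    // Hypothetical header name and partition count, chosen only for this example.
    static final String TENANT_HEADER = "tenantId";
    static final int PARTITION_COUNT = 4;

    // Stand-in for the Function<Message<?>, Integer> body: takes the headers as a plain map.
    static Integer partitionFor(Map<String, Object> headers) {
        Object tenant = headers.get(TENANT_HEADER);
        if (tenant == null) {
            // null means "no preference": partition selection is left to the Kafka client.
            return null;
        }
        // floorMod keeps the result in [0, PARTITION_COUNT) even for negative hash codes.
        return Math.floorMod(tenant.hashCode(), PARTITION_COUNT);
    }

    public static void main(String[] args) {
        Function<Map<String, Object>, Integer> provider = PartitionProviderSketch::partitionFor;
        // A message with the header gets a fixed partition; one without it gets null.
        System.out.println(provider.apply(Map.of(TENANT_HEADER, "acme")));
        System.out.println(provider.apply(Map.of()));
    }
}
```

Returning null only when no routing information is present keeps related messages together while leaving all other messages to the client's own partitioning.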
  • Method Details

    • setGenerateMessageId

      public void setGenerateMessageId(boolean generateMessageId)
      Set whether to generate Message ids for produced messages. If set to false, the converter tries to use a default value instead. The default is false.
      Parameters:
      generateMessageId - true if a message id should be generated
    • setGenerateTimestamp

      public void setGenerateTimestamp(boolean generateTimestamp)
      Set whether to generate a timestamp for produced messages. If set to false, -1 is used instead. The default is false.
      Parameters:
      generateTimestamp - true if a timestamp should be generated
    • setHeaderMapper

      public void setHeaderMapper(KafkaHeaderMapper headerMapper)
      Set the header mapper used to map headers between messages and Kafka records.
      Parameters:
      headerMapper - the mapper.
      Since:
      1.3
    • setRawRecordHeader

      public void setRawRecordHeader(boolean rawRecordHeader)
      Set to true to add the raw ConsumerRecord as the KafkaHeaders.RAW_DATA header.
      Parameters:
      rawRecordHeader - true to add the header.
      Since:
      2.7
    • getMessagingConverter

      protected MessageConverter getMessagingConverter()
    • setMessagingConverter

      public void setMessagingConverter(SmartMessageConverter messagingConverter)
      Set a spring-messaging SmartMessageConverter to convert the record value to the desired type. This will also cause the MessageHeaders.CONTENT_TYPE to be converted to String when mapped inbound.

      IMPORTANT: This converter's fromMessage(Message, String) method is called for outbound conversion to a ProducerRecord with the message payload in the ProducerRecord.value() property. toMessage(ConsumerRecord, Acknowledgment, Consumer, Type) is called for inbound conversion from ConsumerRecord with the payload being the ConsumerRecord.value() property.

      The MessageConverter.toMessage(Object, MessageHeaders) method is called to create a new outbound Message from the Message passed to fromMessage(Message, String). Similarly, in toMessage(ConsumerRecord, Acknowledgment, Consumer, Type), after this converter has created a new Message from the ConsumerRecord, the MessageConverter.fromMessage(Message, Class) method is called, and the final inbound message is then created with the newly converted payload.

      In either case, if the SmartMessageConverter returns null, the original message is used.

      Parameters:
      messagingConverter - the converter.
      Since:
      2.7.1
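
The fallback rule stated above (a null result from the SmartMessageConverter leaves the original message in use) can be sketched without Spring. This is a simplified model under stated assumptions, not the library's implementation: Msg is a hypothetical stand-in for Message<?>, a UnaryOperator<Msg> stands in for the converter call, and the names convertOrKeep and ConverterFallbackSketch are invented for this example.

```java
import java.util.Map;
import java.util.function.UnaryOperator;

public class ConverterFallbackSketch {

    // Hypothetical stand-in for a spring-messaging Message: a payload plus headers.
    static final class Msg {
        final Object payload;
        final Map<String, Object> headers;

        Msg(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Apply the converter; keep the original message when the converter returns null.
    static Msg convertOrKeep(Msg original, UnaryOperator<Msg> converter) {
        Msg converted = converter.apply(original);
        return converted != null ? converted : original;
    }

    public static void main(String[] args) {
        // A converter that handles only String payloads and returns null otherwise.
        UnaryOperator<Msg> toInteger = msg -> msg.payload instanceof String
                ? new Msg(Integer.valueOf((String) msg.payload), msg.headers)
                : null;

        Msg text = new Msg("42", Map.of());
        Msg binary = new Msg(new byte[0], Map.of());

        // The String payload is converted; the byte[] message is passed through unchanged.
        System.out.println(convertOrKeep(text, toInteger).payload);
        System.out.println(convertOrKeep(binary, toInteger) == binary);
    }
}
```

The same shape applies in both directions: the converter gets the first chance to produce a message, and the unconverted message is the fallback.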
    • toMessage

      public Message<?> toMessage(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?,?> consumer, Type type)
      Description copied from interface: RecordMessageConverter
      Convert a ConsumerRecord to a Message.
      Specified by:
      toMessage in interface RecordMessageConverter
      Parameters:
      record - the record.
      acknowledgment - the acknowledgment.
      consumer - the consumer.
      type - the required payload type.
      Returns:
      the message.
    • fromMessage

      public org.apache.kafka.clients.producer.ProducerRecord<?,?> fromMessage(Message<?> messageArg, String defaultTopic)
      Description copied from interface: RecordMessageConverter
      Convert a message to a producer record.
      Specified by:
      fromMessage in interface RecordMessageConverter
      Parameters:
      messageArg - the message.
      defaultTopic - the default topic to use if no header found.
      Returns:
      the producer record.
    • initialRecordHeaders

      protected org.apache.kafka.common.header.Headers initialRecordHeaders(Message<?> message)
      Subclasses can populate additional headers before they are mapped.
      Parameters:
      message - the message.
      Returns:
      the headers.
      Since:
      2.1
    • convertPayload

      protected Object convertPayload(Message<?> message)
      Subclasses can convert the payload; by default, it's sent unchanged to Kafka.
      Parameters:
      message - the message.
      Returns:
      the payload.
    • extractAndConvertValue

      protected Object extractAndConvertValue(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, Type type)
      Subclasses can convert the value; by default, it's returned as provided by Kafka unless there is a SmartMessageConverter that can convert it.
      Parameters:
      record - the record.
      type - the required type.
      Returns:
      the value.