Class MessagingMessageConverter
- java.lang.Object
-
- org.springframework.kafka.support.converter.MessagingMessageConverter
-
- All Implemented Interfaces:
MessageConverter,RecordMessageConverter
- Direct Known Subclasses:
JsonMessageConverter,ProjectingMessageConverter
public class MessagingMessageConverter extends java.lang.Object implements RecordMessageConverter
A MessagingMessageConverterimplementation for a message listener that receives individual messages.Populates
KafkaHeadersbased on theConsumerRecordonto the returned message.- Author:
- Marius Bogoevici, Gary Russell, Dariusz Szablinski, Biju Kunjummen
-
-
Field Summary
Fields Modifier and Type Field Description protected org.springframework.core.log.LogAccessorlogger
-
Constructor Summary
Constructors Constructor Description MessagingMessageConverter()
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description protected java.lang.ObjectconvertPayload(org.springframework.messaging.Message<?> message)Subclasses can convert the payload; by default, it's sent unchanged to Kafka.protected java.lang.ObjectextractAndConvertValue(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, java.lang.reflect.Type type)Subclasses can convert the value; by default, it's returned as provided by Kafka unless there is aSmartMessageConverterthat can convert it.org.apache.kafka.clients.producer.ProducerRecord<?,?>fromMessage(org.springframework.messaging.Message<?> messageArg, java.lang.String defaultTopic)Convert a message to a producer record.protected org.springframework.messaging.converter.MessageConvertergetMessagingConverter()protected org.apache.kafka.common.header.HeadersinitialRecordHeaders(org.springframework.messaging.Message<?> message)Subclasses can populate additional headers before they are mapped.voidsetGenerateMessageId(boolean generateMessageId)GenerateMessageidsfor produced messages.voidsetGenerateTimestamp(boolean generateTimestamp)Generatetimestampfor produced messages.voidsetHeaderMapper(KafkaHeaderMapper headerMapper)Set the header mapper to map headers.voidsetMessagingConverter(org.springframework.messaging.converter.SmartMessageConverter messagingConverter)Set a spring-messagingSmartMessageConverterto convert the record value to the desired type.voidsetRawRecordHeader(boolean rawRecordHeader)Set to true to add the rawConsumerRecordas a headerKafkaHeaders.RAW_DATA.org.springframework.messaging.Message<?>toMessage(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?,?> consumer, java.lang.reflect.Type type)Convert aConsumerRecordto aMessage.-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.springframework.kafka.support.converter.MessageConverter
commonHeaders
-
-
-
-
Method Detail
-
setGenerateMessageId
public void setGenerateMessageId(boolean generateMessageId)
GenerateMessageidsfor produced messages. If set tofalse, will try to use a default value. By default set tofalse.- Parameters:
generateMessageId- true if a message id should be generated
-
setGenerateTimestamp
public void setGenerateTimestamp(boolean generateTimestamp)
Generatetimestampfor produced messages. If set tofalse, -1 is used instead. By default set tofalse.- Parameters:
generateTimestamp- true if a timestamp should be generated
-
setHeaderMapper
public void setHeaderMapper(KafkaHeaderMapper headerMapper)
Set the header mapper to map headers.- Parameters:
headerMapper- the mapper.- Since:
- 1.3
-
setRawRecordHeader
public void setRawRecordHeader(boolean rawRecordHeader)
Set to true to add the rawConsumerRecordas a headerKafkaHeaders.RAW_DATA.- Parameters:
rawRecordHeader- true to add the header.- Since:
- 2.7
-
getMessagingConverter
protected org.springframework.messaging.converter.MessageConverter getMessagingConverter()
-
setMessagingConverter
public void setMessagingConverter(org.springframework.messaging.converter.SmartMessageConverter messagingConverter)
Set a spring-messagingSmartMessageConverterto convert the record value to the desired type. This will also cause theMessageHeaders.CONTENT_TYPEto be converted to String when mapped inbound.IMPORTANT: This converter's
fromMessage(Message, String)method is called for outbound conversion to aProducerRecordwith the message payload in theProducerRecord.value()property.toMessage(ConsumerRecord, Acknowledgment, Consumer, Type)is called for inbound conversion fromConsumerRecordwith the payload being theConsumerRecord.value()property.The
MessageConverter.toMessage(Object, MessageHeaders)method is called to create a new outboundMessagefrom theMessagepassed tofromMessage(Message, String). Similarly, intoMessage(ConsumerRecord, Acknowledgment, Consumer, Type), after this converter has created a newMessagefrom theConsumerRecordtheMessageConverter.fromMessage(Message, Class)method is called and then the final inbound message is created with the newly converted payload.In either case, if the
SmartMessageConverterreturnsnull, the original message is used.- Parameters:
messagingConverter- the converter.- Since:
- 2.7.1
-
toMessage
public org.springframework.messaging.Message<?> toMessage(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?,?> consumer, java.lang.reflect.Type type)Description copied from interface:RecordMessageConverterConvert aConsumerRecordto aMessage.- Specified by:
toMessagein interfaceRecordMessageConverter- Parameters:
record- the record.acknowledgment- the acknowledgment.consumer- the consumertype- the required payload type.- Returns:
- the message.
-
fromMessage
public org.apache.kafka.clients.producer.ProducerRecord<?,?> fromMessage(org.springframework.messaging.Message<?> messageArg, java.lang.String defaultTopic)Description copied from interface:RecordMessageConverterConvert a message to a producer record.- Specified by:
fromMessagein interfaceRecordMessageConverter- Parameters:
messageArg- the message.defaultTopic- the default topic to use if no header found.- Returns:
- the producer record.
-
initialRecordHeaders
protected org.apache.kafka.common.header.Headers initialRecordHeaders(org.springframework.messaging.Message<?> message)
Subclasses can populate additional headers before they are mapped.- Parameters:
message- the message.- Returns:
- the headers
- Since:
- 2.1
-
convertPayload
protected java.lang.Object convertPayload(org.springframework.messaging.Message<?> message)
Subclasses can convert the payload; by default, it's sent unchanged to Kafka.- Parameters:
message- the message.- Returns:
- the payload.
-
extractAndConvertValue
protected java.lang.Object extractAndConvertValue(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, java.lang.reflect.Type type)Subclasses can convert the value; by default, it's returned as provided by Kafka unless there is aSmartMessageConverterthat can convert it.- Parameters:
record- the record.type- the required type.- Returns:
- the value.
-
-