Class MessagingMessageConverter
- java.lang.Object
-
- org.springframework.kafka.support.converter.MessagingMessageConverter
-
- All Implemented Interfaces:
MessageConverter, RecordMessageConverter
- Direct Known Subclasses:
JsonMessageConverter, ProjectingMessageConverter
public class MessagingMessageConverter extends java.lang.Object implements RecordMessageConverter
A Messaging MessageConverter implementation for a message listener that receives individual messages.
Populates KafkaHeaders on the returned message based on the ConsumerRecord.
- Author:
- Marius Bogoevici, Gary Russell, Dariusz Szablinski, Biju Kunjummen
-
-
Field Summary
Fields (Modifier and Type, Field, Description):
protected org.springframework.core.log.LogAccessor logger
-
Constructor Summary
Constructors (Constructor, Description):
MessagingMessageConverter()
-
Method Summary
Methods (Modifier and Type, Method, Description):
protected java.lang.Object convertPayload(org.springframework.messaging.Message<?> message)
    Subclasses can convert the payload; by default, it's sent unchanged to Kafka.
protected java.lang.Object extractAndConvertValue(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, java.lang.reflect.Type type)
    Subclasses can convert the value; by default, it's returned as provided by Kafka unless there is a SmartMessageConverter that can convert it.
org.apache.kafka.clients.producer.ProducerRecord<?,?> fromMessage(org.springframework.messaging.Message<?> messageArg, java.lang.String defaultTopic)
    Convert a message to a producer record.
protected org.springframework.messaging.converter.MessageConverter getMessagingConverter()
protected org.apache.kafka.common.header.Headers initialRecordHeaders(org.springframework.messaging.Message<?> message)
    Subclasses can populate additional headers before they are mapped.
void setGenerateMessageId(boolean generateMessageId)
    Generate Message ids for produced messages.
void setGenerateTimestamp(boolean generateTimestamp)
    Generate timestamp for produced messages.
void setHeaderMapper(KafkaHeaderMapper headerMapper)
    Set the header mapper to map headers.
void setMessagingConverter(org.springframework.messaging.converter.SmartMessageConverter messagingConverter)
    Set a spring-messaging SmartMessageConverter to convert the record value to the desired type.
void setRawRecordHeader(boolean rawRecordHeader)
    Set to true to add the raw ConsumerRecord as a header KafkaHeaders.RAW_DATA.
org.springframework.messaging.Message<?> toMessage(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?,?> consumer, java.lang.reflect.Type type)
    Convert a ConsumerRecord to a Message.
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.springframework.kafka.support.converter.MessageConverter
commonHeaders
-
-
-
-
Method Detail
-
setGenerateMessageId
public void setGenerateMessageId(boolean generateMessageId)
Generate Message ids for produced messages. If set to false, will try to use a default value. By default set to false.
- Parameters:
generateMessageId - true if a message id should be generated
-
setGenerateTimestamp
public void setGenerateTimestamp(boolean generateTimestamp)
Generate timestamp for produced messages. If set to false, -1 is used instead. By default set to false.
- Parameters:
generateTimestamp - true if a timestamp should be generated
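The two flags above can be illustrated with a minimal sketch. It assumes that "produced messages" refers to the Message returned by toMessage, and that spring-kafka and kafka-clients are on the classpath. The class name, topic, key and value are hypothetical.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.messaging.Message;

// Hypothetical example class; not part of spring-kafka.
final class GeneratedHeadersExample {

    // Returns true when the converted message carries both an id and a timestamp header.
    static boolean hasIdAndTimestamp() {
        MessagingMessageConverter converter = new MessagingMessageConverter();
        converter.setGenerateMessageId(true);
        converter.setGenerateTimestamp(true);

        // Illustrative record: topic, partition, offset, key, value.
        ConsumerRecord<String, String> record =
                new ConsumerRecord<>("demo-topic", 0, 0L, "key", "value");

        // Acknowledgment and Consumer are not needed for this illustration.
        Message<?> message = converter.toMessage(record, null, null, String.class);
        return message.getHeaders().getId() != null
                && message.getHeaders().getTimestamp() != null;
    }

    public static void main(String[] args) {
        System.out.println(hasIdAndTimestamp());
    }
}
```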
-
setHeaderMapper
public void setHeaderMapper(KafkaHeaderMapper headerMapper)
Set the header mapper to map headers.
- Parameters:
headerMapper - the mapper.
- Since:
- 1.3
-
setRawRecordHeader
public void setRawRecordHeader(boolean rawRecordHeader)
Set to true to add the raw ConsumerRecord as a header KafkaHeaders.RAW_DATA.
- Parameters:
rawRecordHeader - true to add the header.
- Since:
- 2.7
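As a sketch of how the raw record might be read back, the following enables the flag and looks up the KafkaHeaders.RAW_DATA header on the converted message. The class name and the record contents are hypothetical; spring-kafka 2.7 or later is assumed.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.messaging.Message;

// Hypothetical example class; not part of spring-kafka.
final class RawRecordHeaderExample {

    // Returns true when the header holds the very same ConsumerRecord instance.
    static boolean rawRecordIsAttached() {
        MessagingMessageConverter converter = new MessagingMessageConverter();
        converter.setRawRecordHeader(true);

        ConsumerRecord<String, String> record =
                new ConsumerRecord<>("demo-topic", 0, 5L, "key", "value");

        Message<?> message = converter.toMessage(record, null, null, String.class);
        return message.getHeaders().get(KafkaHeaders.RAW_DATA) == record;
    }

    public static void main(String[] args) {
        System.out.println(rawRecordIsAttached());
    }
}
```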
-
getMessagingConverter
protected org.springframework.messaging.converter.MessageConverter getMessagingConverter()
-
setMessagingConverter
public void setMessagingConverter(org.springframework.messaging.converter.SmartMessageConverter messagingConverter)
Set a spring-messaging SmartMessageConverter to convert the record value to the desired type. This will also cause the MessageHeaders.CONTENT_TYPE to be converted to String when mapped inbound.
IMPORTANT: This converter's fromMessage(Message, String) method is called for outbound conversion to a ProducerRecord with the message payload in the ProducerRecord.value() property. toMessage(ConsumerRecord, Acknowledgment, Consumer, Type) is called for inbound conversion from ConsumerRecord with the payload being the ConsumerRecord.value() property.
The MessageConverter.toMessage(Object, MessageHeaders) method is called to create a new outbound Message from the Message passed to fromMessage(Message, String). Similarly, in toMessage(ConsumerRecord, Acknowledgment, Consumer, Type), after this converter has created a new Message from the ConsumerRecord, the MessageConverter.fromMessage(Message, Class) method is called and then the final inbound message is created with the newly converted payload.
In either case, if the SmartMessageConverter returns null, the original message is used.
- Parameters:
messagingConverter - the converter.
- Since:
- 2.7.1
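The inbound half of this flow can be sketched with spring-messaging's StringMessageConverter, chosen here only because it needs no extra dependencies. The sketch assumes the record carries no content-type header, so the converter falls back to its default charset. The class name and record contents are hypothetical.

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.StringMessageConverter;

// Hypothetical example class; not part of spring-kafka.
final class MessagingConverterExample {

    // Converts a byte[] record value and returns the payload of the final inbound message.
    static Object inboundPayload() {
        MessagingMessageConverter converter = new MessagingMessageConverter();
        converter.setMessagingConverter(new StringMessageConverter());

        ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(
                "demo-topic", 0, 0L, "key", "hello".getBytes(StandardCharsets.UTF_8));

        // The requested type (String) is handed to the SmartMessageConverter.
        Message<?> message = converter.toMessage(record, null, null, String.class);
        return message.getPayload();
    }

    public static void main(String[] args) {
        System.out.println(inboundPayload());
    }
}
```

If the configured converter cannot handle the requested type it returns null, and the message built from the raw record value is used unchanged.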
-
toMessage
public org.springframework.messaging.Message<?> toMessage(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?,?> consumer, java.lang.reflect.Type type)
Description copied from interface: RecordMessageConverter
Convert a ConsumerRecord to a Message.
- Specified by:
toMessage in interface RecordMessageConverter
- Parameters:
record - the record.
acknowledgment - the acknowledgment.
consumer - the consumer.
type - the required payload type.
- Returns:
- the message.
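A minimal sketch of an inbound conversion follows. It passes null for the acknowledgment and consumer, which is an assumption that suits a standalone illustration rather than a listener container. The class name and record contents are hypothetical.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.messaging.Message;

// Hypothetical example class; not part of spring-kafka.
final class ToMessageExample {

    // Summarises the payload and two of the Kafka headers populated from the record.
    static String describe() {
        MessagingMessageConverter converter = new MessagingMessageConverter();

        // Illustrative record: topic, partition, offset, key, value.
        ConsumerRecord<String, String> record =
                new ConsumerRecord<>("demo-topic", 0, 42L, "key", "value");

        Message<?> message = converter.toMessage(record, null, null, String.class);
        return message.getPayload()
                + " from " + message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC)
                + " at offset " + message.getHeaders().get(KafkaHeaders.OFFSET);
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```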
-
fromMessage
public org.apache.kafka.clients.producer.ProducerRecord<?,?> fromMessage(org.springframework.messaging.Message<?> messageArg, java.lang.String defaultTopic)
Description copied from interface: RecordMessageConverter
Convert a message to a producer record.
- Specified by:
fromMessage in interface RecordMessageConverter
- Parameters:
messageArg - the message.
defaultTopic - the default topic to use if no header found.
- Returns:
- the producer record.
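The topic resolution described by the defaultTopic parameter might look like the sketch below: the KafkaHeaders.TOPIC header wins when present, otherwise the default topic is used. The class name, topics and payloads are hypothetical.

```java
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

// Hypothetical example class; not part of spring-kafka.
final class FromMessageExample {

    private static final MessagingMessageConverter CONVERTER = new MessagingMessageConverter();

    // The message names its topic in a header, so the default is ignored.
    static String topicWithHeader() {
        Message<String> message = MessageBuilder.withPayload("order-1")
                .setHeader(KafkaHeaders.TOPIC, "orders")
                .build();
        ProducerRecord<?, ?> record = CONVERTER.fromMessage(message, "fallback-topic");
        return record.topic();
    }

    // No topic header is present, so the default topic is used.
    static String topicWithoutHeader() {
        Message<String> message = MessageBuilder.withPayload("order-2").build();
        ProducerRecord<?, ?> record = CONVERTER.fromMessage(message, "fallback-topic");
        return record.topic();
    }

    public static void main(String[] args) {
        System.out.println(topicWithHeader());
        System.out.println(topicWithoutHeader());
    }
}
```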
-
initialRecordHeaders
protected org.apache.kafka.common.header.Headers initialRecordHeaders(org.springframework.messaging.Message<?> message)
Subclasses can populate additional headers before they are mapped.
- Parameters:
message - the message.
- Returns:
- the headers
- Since:
- 2.1
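One way a subclass could use this hook is sketched below. The subclass name, the "source" header and its value are hypothetical and chosen purely for illustration.

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

// Hypothetical subclass that tags every outbound record with a fixed header.
class SourceTaggingConverter extends MessagingMessageConverter {

    @Override
    protected Headers initialRecordHeaders(Message<?> message) {
        Headers headers = super.initialRecordHeaders(message);
        // Added before the header mapper copies the message headers.
        headers.add("source", "billing".getBytes(StandardCharsets.UTF_8));
        return headers;
    }

    // Converts a sample message and reads the added header back as text.
    static String sourceHeader() {
        Message<String> message = MessageBuilder.withPayload("invoice-1").build();
        ProducerRecord<?, ?> record =
                new SourceTaggingConverter().fromMessage(message, "demo-topic");
        return new String(record.headers().lastHeader("source").value(),
                StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(sourceHeader());
    }
}
```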
-
convertPayload
protected java.lang.Object convertPayload(org.springframework.messaging.Message<?> message)
Subclasses can convert the payload; by default, it's sent unchanged to Kafka.
- Parameters:
message - the message.
- Returns:
- the payload.
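A subclass override could be as small as the following sketch, which trims String payloads before they are placed in the ProducerRecord. The subclass name and the trimming rule are hypothetical.

```java
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

// Hypothetical subclass that normalises String payloads on the way out.
class TrimmingConverter extends MessagingMessageConverter {

    @Override
    protected Object convertPayload(Message<?> message) {
        Object payload = super.convertPayload(message);
        // Only Strings are touched; every other payload passes through unchanged.
        return payload instanceof String ? ((String) payload).trim() : payload;
    }

    // Converts a padded sample payload and returns the resulting record value.
    static Object trimmedValue() {
        Message<String> message = MessageBuilder.withPayload("  hi  ").build();
        ProducerRecord<?, ?> record =
                new TrimmingConverter().fromMessage(message, "demo-topic");
        return record.value();
    }

    public static void main(String[] args) {
        System.out.println(trimmedValue());
    }
}
```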
-
extractAndConvertValue
protected java.lang.Object extractAndConvertValue(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, java.lang.reflect.Type type)
Subclasses can convert the value; by default, it's returned as provided by Kafka unless there is a SmartMessageConverter that can convert it.
- Parameters:
record - the record.
type - the required type.
- Returns:
- the value.
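The inbound counterpart can be sketched the same way: a subclass that decodes byte[] values when a String is requested and otherwise defers to the default behaviour. The subclass name and the UTF-8 assumption are hypothetical.

```java
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.messaging.Message;

// Hypothetical subclass that decodes raw bytes into text on the way in.
class Utf8DecodingConverter extends MessagingMessageConverter {

    @Override
    protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type) {
        Object value = record.value();
        if (value instanceof byte[] && String.class.equals(type)) {
            // Assumes the producer wrote UTF-8 encoded text.
            return new String((byte[]) value, StandardCharsets.UTF_8);
        }
        return super.extractAndConvertValue(record, type);
    }

    // Converts a sample byte[] record and returns the message payload.
    static Object decodedPayload() {
        ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(
                "demo-topic", 0, 0L, "key", "hello".getBytes(StandardCharsets.UTF_8));
        Message<?> message =
                new Utf8DecodingConverter().toMessage(record, null, null, String.class);
        return message.getPayload();
    }

    public static void main(String[] args) {
        System.out.println(decodedPayload());
    }
}
```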
-
-