Class MessagingMessageConverter
- All Implemented Interfaces:
MessageConverter,RecordMessageConverter
- Direct Known Subclasses:
JsonMessageConverter,ProjectingMessageConverter
MessageConverter implementation for a message listener that
receives individual messages.
Populates KafkaHeaders based on the ConsumerRecord onto the returned
message.
- Author:
- Marius Bogoevici, Gary Russell, Dariusz Szablinski, Biju Kunjummen
-
Field Summary
Fields -
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionprotected ObjectconvertPayload(Message<?> message) Subclasses can convert the payload; by default, it's sent unchanged to Kafka.protected ObjectextractAndConvertValue(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> record, Type type) Subclasses can convert the value; by default, it's returned as provided by Kafka unless there is aSmartMessageConverterthat can convert it.org.apache.kafka.clients.producer.ProducerRecord<?,?> fromMessage(Message<?> messageArg, String defaultTopic) Convert a message to a producer record.protected MessageConverterprotected org.apache.kafka.common.header.HeadersinitialRecordHeaders(Message<?> message) Subclasses can populate additional headers before they are mapped.voidsetGenerateMessageId(boolean generateMessageId) GenerateMessageidsfor produced messages.voidsetGenerateTimestamp(boolean generateTimestamp) Generatetimestampfor produced messages.voidsetHeaderMapper(KafkaHeaderMapper headerMapper) Set the header mapper to map headers.voidsetMessagingConverter(SmartMessageConverter messagingConverter) Set a spring-messagingSmartMessageConverterto convert the record value to the desired type.voidsetRawRecordHeader(boolean rawRecordHeader) Set to true to add the rawConsumerRecordas a headerKafkaHeaders.RAW_DATA.Message<?>toMessage(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> record, Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, Type type) Convert aConsumerRecordto aMessage.Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitMethods inherited from interface org.springframework.kafka.support.converter.MessageConverter
commonHeaders
-
Field Details
-
logger
-
-
Constructor Details
-
MessagingMessageConverter
public MessagingMessageConverter()
-
-
Method Details
-
setGenerateMessageId
public void setGenerateMessageId(boolean generateMessageId) GenerateMessageidsfor produced messages. If set tofalse, will try to use a default value. By default set tofalse.- Parameters:
generateMessageId- true if a message id should be generated
-
setGenerateTimestamp
public void setGenerateTimestamp(boolean generateTimestamp) Generatetimestampfor produced messages. If set tofalse, -1 is used instead. By default set tofalse.- Parameters:
generateTimestamp- true if a timestamp should be generated
-
setHeaderMapper
Set the header mapper to map headers.- Parameters:
headerMapper- the mapper.- Since:
- 1.3
-
setRawRecordHeader
public void setRawRecordHeader(boolean rawRecordHeader) Set to true to add the rawConsumerRecordas a headerKafkaHeaders.RAW_DATA.- Parameters:
rawRecordHeader- true to add the header.- Since:
- 2.7
-
getMessagingConverter
-
setMessagingConverter
Set a spring-messagingSmartMessageConverterto convert the record value to the desired type. This will also cause theMessageHeaders.CONTENT_TYPEto be converted to String when mapped inbound.IMPORTANT: This converter's
fromMessage(Message, String)method is called for outbound conversion to aProducerRecordwith the message payload in theProducerRecord.value()property.toMessage(ConsumerRecord, Acknowledgment, Consumer, Type)is called for inbound conversion fromConsumerRecordwith the payload being theConsumerRecord.value()property.The
MessageConverter.toMessage(Object, MessageHeaders)method is called to create a new outboundMessagefrom theMessagepassed tofromMessage(Message, String). Similarly, intoMessage(ConsumerRecord, Acknowledgment, Consumer, Type), after this converter has created a newMessagefrom theConsumerRecordtheMessageConverter.fromMessage(Message, Class)method is called and then the final inbound message is created with the newly converted payload.In either case, if the
SmartMessageConverterreturnsnull, the original message is used.- Parameters:
messagingConverter- the converter.- Since:
- 2.7.1
-
toMessage
public Message<?> toMessage(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> record, Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, Type type) Description copied from interface:RecordMessageConverterConvert aConsumerRecordto aMessage.- Specified by:
toMessagein interfaceRecordMessageConverter- Parameters:
record- the record.acknowledgment- the acknowledgment.consumer- the consumertype- the required payload type.- Returns:
- the message.
-
fromMessage
public org.apache.kafka.clients.producer.ProducerRecord<?,?> fromMessage(Message<?> messageArg, String defaultTopic) Description copied from interface:RecordMessageConverterConvert a message to a producer record.- Specified by:
fromMessagein interfaceRecordMessageConverter- Parameters:
messageArg- the message.defaultTopic- the default topic to use if no header found.- Returns:
- the producer record.
-
initialRecordHeaders
Subclasses can populate additional headers before they are mapped.- Parameters:
message- the message.- Returns:
- the headers
- Since:
- 2.1
-
convertPayload
Subclasses can convert the payload; by default, it's sent unchanged to Kafka.- Parameters:
message- the message.- Returns:
- the payload.
-
extractAndConvertValue
protected Object extractAndConvertValue(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> record, Type type) Subclasses can convert the value; by default, it's returned as provided by Kafka unless there is aSmartMessageConverterthat can convert it.- Parameters:
record- the record.type- the required type.- Returns:
- the value.
-