Class MessagingMessageConverter
- All Implemented Interfaces:
MessageConverter
,RecordMessageConverter
- Direct Known Subclasses:
JsonMessageConverter
,ProjectingMessageConverter
MessageConverter
implementation for a message listener that
receives individual messages.
Populates KafkaHeaders
based on the ConsumerRecord
onto the returned
message.
- Author:
- Marius Bogoevici, Gary Russell, Dariusz Szablinski, Biju Kunjummen
-
Field Summary
-
Constructor Summary
ConstructorDescriptionConstruct an instance that uses theKafkaHeaders.PARTITION
to determine the target partition.MessagingMessageConverter
(Function<Message<?>, Integer> partitionProvider) Construct an instance that uses the supplied partition provider function. -
Method Summary
Modifier and TypeMethodDescriptionprotected Object
convertPayload
(Message<?> message) Subclasses can convert the payload; by default, it's sent unchanged to Kafka.protected Object
extractAndConvertValue
(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> record, Type type) Subclasses can convert the value; by default, it's returned as provided by Kafka unless there is aSmartMessageConverter
that can convert it.org.apache.kafka.clients.producer.ProducerRecord<?,
?> fromMessage
(Message<?> messageArg, String defaultTopic) Convert a message to a producer record.protected MessageConverter
protected org.apache.kafka.common.header.Headers
initialRecordHeaders
(Message<?> message) Subclasses can populate additional headers before they are mapped.void
setGenerateMessageId
(boolean generateMessageId) GenerateMessage
ids
for produced messages.void
setGenerateTimestamp
(boolean generateTimestamp) Generatetimestamp
for produced messages.void
setHeaderMapper
(KafkaHeaderMapper headerMapper) Set the header mapper to map headers.void
setMessagingConverter
(SmartMessageConverter messagingConverter) Set a spring-messagingSmartMessageConverter
to convert the record value to the desired type.void
setRawRecordHeader
(boolean rawRecordHeader) Set to true to add the rawConsumerRecord
as a headerKafkaHeaders.RAW_DATA
.Message<?>
toMessage
(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> record, Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, Type type) Convert aConsumerRecord
to aMessage
.Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.springframework.kafka.support.converter.MessageConverter
commonHeaders
-
Field Details
-
logger
-
-
Constructor Details
-
MessagingMessageConverter
public MessagingMessageConverter()Construct an instance that uses theKafkaHeaders.PARTITION
to determine the target partition. -
MessagingMessageConverter
Construct an instance that uses the supplied partition provider function. The function can return null to delegate the partition selection to the kafka client.- Parameters:
partitionProvider
- the provider.- Since:
- 3.0.8
-
-
Method Details
-
setGenerateMessageId
public void setGenerateMessageId(boolean generateMessageId) GenerateMessage
ids
for produced messages. If set tofalse
, will try to use a default value. By default set tofalse
.- Parameters:
generateMessageId
- true if a message id should be generated
-
setGenerateTimestamp
public void setGenerateTimestamp(boolean generateTimestamp) Generatetimestamp
for produced messages. If set tofalse
, -1 is used instead. By default set tofalse
.- Parameters:
generateTimestamp
- true if a timestamp should be generated
-
setHeaderMapper
Set the header mapper to map headers.- Parameters:
headerMapper
- the mapper.- Since:
- 1.3
-
setRawRecordHeader
public void setRawRecordHeader(boolean rawRecordHeader) Set to true to add the rawConsumerRecord
as a headerKafkaHeaders.RAW_DATA
.- Parameters:
rawRecordHeader
- true to add the header.- Since:
- 2.7
-
getMessagingConverter
-
setMessagingConverter
Set a spring-messagingSmartMessageConverter
to convert the record value to the desired type. This will also cause theMessageHeaders.CONTENT_TYPE
to be converted to String when mapped inbound.IMPORTANT: This converter's
fromMessage(Message, String)
method is called for outbound conversion to aProducerRecord
with the message payload in theProducerRecord.value()
property.toMessage(ConsumerRecord, Acknowledgment, Consumer, Type)
is called for inbound conversion fromConsumerRecord
with the payload being theConsumerRecord.value()
property.The
MessageConverter.toMessage(Object, MessageHeaders)
method is called to create a new outboundMessage
from theMessage
passed tofromMessage(Message, String)
. Similarly, intoMessage(ConsumerRecord, Acknowledgment, Consumer, Type)
, after this converter has created a newMessage
from theConsumerRecord
theMessageConverter.fromMessage(Message, Class)
method is called and then the final inbound message is created with the newly converted payload.In either case, if the
SmartMessageConverter
returnsnull
, the original message is used.- Parameters:
messagingConverter
- the converter.- Since:
- 2.7.1
-
toMessage
public Message<?> toMessage(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> record, Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, Type type) Description copied from interface:RecordMessageConverter
Convert aConsumerRecord
to aMessage
.- Specified by:
toMessage
in interfaceRecordMessageConverter
- Parameters:
record
- the record.acknowledgment
- the acknowledgment.consumer
- the consumertype
- the required payload type.- Returns:
- the message.
-
fromMessage
public org.apache.kafka.clients.producer.ProducerRecord<?,?> fromMessage(Message<?> messageArg, String defaultTopic) Description copied from interface:RecordMessageConverter
Convert a message to a producer record.- Specified by:
fromMessage
in interfaceRecordMessageConverter
- Parameters:
messageArg
- the message.defaultTopic
- the default topic to use if no header found.- Returns:
- the producer record.
-
initialRecordHeaders
Subclasses can populate additional headers before they are mapped.- Parameters:
message
- the message.- Returns:
- the headers
- Since:
- 2.1
-
convertPayload
Subclasses can convert the payload; by default, it's sent unchanged to Kafka.- Parameters:
message
- the message.- Returns:
- the payload.
-
extractAndConvertValue
protected Object extractAndConvertValue(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> record, Type type) Subclasses can convert the value; by default, it's returned as provided by Kafka unless there is aSmartMessageConverter
that can convert it.- Parameters:
record
- the record.type
- the required type.- Returns:
- the value.
-