public class MessagingMessageConverter extends java.lang.Object implements RecordMessageConverter
MessageConverter implementation for a message listener that
receives individual messages.
Populates KafkaHeaders based on the ConsumerRecord onto the returned
message.
| Modifier and Type | Field and Description |
|---|---|
protected org.springframework.core.log.LogAccessor |
logger |
| Constructor and Description |
|---|
MessagingMessageConverter() |
| Modifier and Type | Method and Description |
|---|---|
protected java.lang.Object |
convertPayload(org.springframework.messaging.Message<?> message)
Subclasses can convert the payload; by default, it's sent unchanged to Kafka.
|
protected java.lang.Object |
extractAndConvertValue(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record,
java.lang.reflect.Type type)
Subclasses can convert the value; by default, it's returned as provided by Kafka.
|
org.apache.kafka.clients.producer.ProducerRecord<?,?> |
fromMessage(org.springframework.messaging.Message<?> message,
java.lang.String defaultTopic)
Convert a message to a producer record.
|
protected org.apache.kafka.common.header.Headers |
initialRecordHeaders(org.springframework.messaging.Message<?> message)
Subclasses can populate additional headers before they are mapped.
|
void |
setGenerateMessageId(boolean generateMessageId)
Generate
Message ids for produced messages. |
void |
setGenerateTimestamp(boolean generateTimestamp)
Generate
timestamp for produced messages. |
void |
setHeaderMapper(KafkaHeaderMapper headerMapper)
Set the header mapper to map headers.
|
org.springframework.messaging.Message<?> |
toMessage(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record,
Acknowledgment acknowledgment,
org.apache.kafka.clients.consumer.Consumer<?,?> consumer,
java.lang.reflect.Type type)
Convert a
ConsumerRecord to a Message. |
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitcommonHeaders, getGroupIdpublic void setGenerateMessageId(boolean generateMessageId)
Message ids for produced messages. If set to false,
will try to use a default value. By default set to false.generateMessageId - true if a message id should be generatedpublic void setGenerateTimestamp(boolean generateTimestamp)
timestamp for produced messages. If set to false, -1 is
used instead. By default set to false.generateTimestamp - true if a timestamp should be generatedpublic void setHeaderMapper(KafkaHeaderMapper headerMapper)
headerMapper - the mapper.public org.springframework.messaging.Message<?> toMessage(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record,
Acknowledgment acknowledgment,
org.apache.kafka.clients.consumer.Consumer<?,?> consumer,
java.lang.reflect.Type type)
RecordMessageConverterConsumerRecord to a Message.toMessage in interface RecordMessageConverterrecord - the record.acknowledgment - the acknowledgment.consumer - the consumertype - the required payload type.public org.apache.kafka.clients.producer.ProducerRecord<?,?> fromMessage(org.springframework.messaging.Message<?> message,
java.lang.String defaultTopic)
RecordMessageConverterfromMessage in interface RecordMessageConvertermessage - the message.defaultTopic - the default topic to use if no header found.protected org.apache.kafka.common.header.Headers initialRecordHeaders(org.springframework.messaging.Message<?> message)
message - the message.protected java.lang.Object convertPayload(org.springframework.messaging.Message<?> message)
message - the message.protected java.lang.Object extractAndConvertValue(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record,
java.lang.reflect.Type type)
record - the record.type - the required type.