Class BatchMessagingMessageConverter
- java.lang.Object
- org.springframework.kafka.support.converter.BatchMessagingMessageConverter
- All Implemented Interfaces: BatchMessageConverter, MessageConverter
public class BatchMessagingMessageConverter extends java.lang.Object implements BatchMessageConverter
A MessagingMessageConverter implementation used with a batch message listener; the consumer record values are extracted into a collection in the message payload.
Populates KafkaHeaders on the returned message from each ConsumerRecord. Each header is a collection where the position in the collection matches the payload position.
If a RecordMessageConverter is provided, and the batch type is a ParameterizedType with a single generic type parameter, each record will be passed to the converter, thus supporting a method signature such as List<Foo> foos.
- Since: 1.1
- Author: Marius Bogoevici, Gary Russell, Dariusz Szablinski, Biju Kunjummen
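The position matching between payload and headers can be pictured with a small JDK-only model. This is a sketch under stated assumptions, not the converter's implementation: Rec is a hypothetical stand-in for ConsumerRecord, and the header keys "topic", "partition" and "offset" are placeholder names rather than actual KafkaHeaders constants.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchShapeSketch {

    /** Hypothetical stand-in for a consumer record; not the Kafka ConsumerRecord class. */
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;
        final Object value;

        Rec(String topic, int partition, long offset, Object value) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.value = value;
        }
    }

    /** Collect the record values into the payload list, in batch order. */
    static List<Object> payload(List<Rec> records) {
        List<Object> values = new ArrayList<>();
        for (Rec r : records) {
            values.add(r.value);
        }
        return values;
    }

    /** Build one list per header; index i in each list describes payload element i. */
    static Map<String, List<Object>> headers(List<Rec> records) {
        Map<String, List<Object>> headers = new LinkedHashMap<>();
        headers.put("topic", new ArrayList<>());      // placeholder key (assumption)
        headers.put("partition", new ArrayList<>());  // placeholder key (assumption)
        headers.put("offset", new ArrayList<>());     // placeholder key (assumption)
        for (Rec r : records) {
            headers.get("topic").add(r.topic);
            headers.get("partition").add(r.partition);
            headers.get("offset").add(r.offset);
        }
        return headers;
    }

    public static void main(String[] args) {
        List<Rec> batch = Arrays.asList(
                new Rec("orders", 0, 41L, "a"),
                new Rec("orders", 1, 7L, "b"));
        System.out.println(payload(batch));
        System.out.println(headers(batch));
    }
}
```

In this model, the second payload element and the second entry of every header list all come from the same record, which is the alignment the class description refers to.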
-
-
Field Summary
- protected org.springframework.core.log.LogAccessor logger
-
Constructor Summary
- BatchMessagingMessageConverter(): Create an instance that does not convert the record values.
- BatchMessagingMessageConverter(RecordMessageConverter recordConverter): Create an instance that converts record values using the supplied converter.
-
Method Summary
- protected java.lang.Object convert(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, java.lang.reflect.Type type, java.util.List<ConversionException> conversionFailures): Convert the record value.
- protected java.lang.Object extractAndConvertValue(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, java.lang.reflect.Type type): Subclasses can convert the value; by default, it's returned as provided by Kafka unless a RecordMessageConverter has been provided.
- java.util.List<org.apache.kafka.clients.producer.ProducerRecord<?,?>> fromMessage(org.springframework.messaging.Message<?> message, java.lang.String defaultTopic): Convert a message to a producer record.
- RecordMessageConverter getRecordMessageConverter(): Return the record converter used by this batch converter, if configured, or null.
- void setGenerateMessageId(boolean generateMessageId): Generate Message ids for produced messages.
- void setGenerateTimestamp(boolean generateTimestamp): Generate timestamp for produced messages.
- void setHeaderMapper(KafkaHeaderMapper headerMapper): Set the header mapper to map headers.
- void setRawRecordHeader(boolean rawRecordHeader): Set to true to add the raw List<ConsumerRecord<?, ?>> as a header KafkaHeaders.RAW_DATA.
- org.springframework.messaging.Message<?> toMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>> records, Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?,?> consumer, java.lang.reflect.Type type): Convert a list of ConsumerRecord to a Message.
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.springframework.kafka.support.converter.MessageConverter
commonHeaders
-
Constructor Detail
-
BatchMessagingMessageConverter
public BatchMessagingMessageConverter()
Create an instance that does not convert the record values.
-
BatchMessagingMessageConverter
public BatchMessagingMessageConverter(RecordMessageConverter recordConverter)
Create an instance that converts record values using the supplied converter.
- Parameters:
recordConverter - the converter.
- Since: 1.3.2
-
-
Method Detail
-
setGenerateMessageId
public void setGenerateMessageId(boolean generateMessageId)
Generate Message ids for produced messages. If set to false, will try to use a default value. By default set to false.
- Parameters:
generateMessageId - true if a message id should be generated
-
setGenerateTimestamp
public void setGenerateTimestamp(boolean generateTimestamp)
Generate timestamp for produced messages. If set to false, -1 is used instead. By default set to false.
- Parameters:
generateTimestamp - true if a timestamp should be generated
-
setHeaderMapper
public void setHeaderMapper(KafkaHeaderMapper headerMapper)
Set the header mapper to map headers.
- Parameters:
headerMapper - the mapper.
- Since: 1.3
-
getRecordMessageConverter
public RecordMessageConverter getRecordMessageConverter()
Description copied from interface: BatchMessageConverter
Return the record converter used by this batch converter, if configured, or null.
- Specified by: getRecordMessageConverter in interface BatchMessageConverter
- Returns: the converter or null.
-
setRawRecordHeader
public void setRawRecordHeader(boolean rawRecordHeader)
Set to true to add the raw List<ConsumerRecord<?, ?>> as a header KafkaHeaders.RAW_DATA.
- Parameters:
rawRecordHeader - true to add the header.
- Since: 2.7
-
toMessage
public org.springframework.messaging.Message<?> toMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>> records, @Nullable Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?,?> consumer, java.lang.reflect.Type type)
Description copied from interface: BatchMessageConverter
Convert a list of ConsumerRecord to a Message.
- Specified by: toMessage in interface BatchMessageConverter
- Parameters:
records - the records.
acknowledgment - the acknowledgment.
consumer - the consumer.
type - the required payload type.
- Returns: the message.
-
fromMessage
public java.util.List<org.apache.kafka.clients.producer.ProducerRecord<?,?>> fromMessage(org.springframework.messaging.Message<?> message, java.lang.String defaultTopic)
Description copied from interface: BatchMessageConverter
Convert a message to a producer record.
- Specified by: fromMessage in interface BatchMessageConverter
- Parameters:
message - the message.
defaultTopic - the default topic to use if no header found.
- Returns: the producer records.
-
extractAndConvertValue
protected java.lang.Object extractAndConvertValue(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, java.lang.reflect.Type type)
Subclasses can convert the value; by default, it's returned as provided by Kafka unless a RecordMessageConverter has been provided.
- Parameters:
record - the record.
type - the required type.
- Returns: the value.
-
convert
protected java.lang.Object convert(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, java.lang.reflect.Type type, java.util.List<ConversionException> conversionFailures)
Convert the record value.
- Parameters:
record - the record.
type - the type - must be a ParameterizedType with a single generic type parameter.
conversionFailures - Conversion failures.
- Returns: the converted payload.
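The requirement on the type parameter (a ParameterizedType with a single generic type parameter) can be illustrated with plain java.lang.reflect. The sketch below is an assumption about what such a check might look like, not the converter's actual code; Foo, typed, raw and elementType are hypothetical names introduced only for this example.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class BatchTypeSketch {

    /** Hypothetical payload type, echoing the List<Foo> signature in the class description. */
    static final class Foo { }

    /** Hypothetical listener-style methods, used only to obtain generic parameter types. */
    static void typed(List<Foo> foos) { }

    @SuppressWarnings("rawtypes")
    static void raw(List foos) { }

    /**
     * Return the single type argument, or null when the type is not a
     * ParameterizedType with exactly one type argument.
     */
    static Type elementType(Type type) {
        if (type instanceof ParameterizedType) {
            Type[] typeArgs = ((ParameterizedType) type).getActualTypeArguments();
            if (typeArgs.length == 1) {
                return typeArgs[0];
            }
        }
        return null;
    }

    /** Look up the generic type of the first parameter of one of the methods above. */
    static Type parameterTypeOf(String methodName) {
        try {
            Method m = BatchTypeSketch.class.getDeclaredMethod(methodName, List.class);
            return m.getGenericParameterTypes()[0];
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(elementType(parameterTypeOf("typed"))); // the Foo class
        System.out.println(elementType(parameterTypeOf("raw")));   // null: raw List has no type argument
    }
}
```

A parameter declared as List<Foo> yields Foo as the element type, so each record could be converted to Foo; a raw List carries no element type, so there is nothing to convert to.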
-
-