public class BatchMessagingMessageConverter extends java.lang.Object implements BatchMessageConverter

A MessageConverter implementation used with a batch message listener; the consumer record values are extracted into a collection in the message payload.

Populates the returned message with KafkaHeaders derived from each ConsumerRecord. Each header is a collection where the position in the collection matches the payload position.
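The position-matched layout can be illustrated without any Kafka dependency. The following is a minimal sketch, not the converter's actual implementation: `SimpleRecord`, `BatchHeaderSketch`, and the map keys are hypothetical stand-ins for `ConsumerRecord` and the `KafkaHeaders` constants. It only shows that index `i` of every header list describes the same record as index `i` of the payload.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Kafka ConsumerRecord; not the real class.
final class SimpleRecord {
    final String topic;
    final int partition;
    final long offset;
    final String value;

    SimpleRecord(String topic, int partition, long offset, String value) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.value = value;
    }
}

final class BatchHeaderSketch {

    // Collects the record values into a payload list and each record attribute
    // into its own header list, all filled in the same iteration order.
    static Map<String, List<Object>> toBatch(List<SimpleRecord> records) {
        List<Object> payload = new ArrayList<>();
        List<Object> topics = new ArrayList<>();
        List<Object> partitions = new ArrayList<>();
        List<Object> offsets = new ArrayList<>();
        for (SimpleRecord record : records) {
            payload.add(record.value);
            topics.add(record.topic);
            partitions.add(record.partition);
            offsets.add(record.offset);
        }
        // The keys below are illustrative names, not KafkaHeaders constants.
        Map<String, List<Object>> batch = new LinkedHashMap<>();
        batch.put("payload", payload);
        batch.put("topics", topics);
        batch.put("partitions", partitions);
        batch.put("offsets", offsets);
        return batch;
    }

    public static void main(String[] args) {
        Map<String, List<Object>> batch = toBatch(Arrays.asList(
                new SimpleRecord("orders", 0, 41L, "first"),
                new SimpleRecord("orders", 1, 7L, "second")));
        // Look up everything known about the second record via index 1.
        int i = 1;
        System.out.println(batch.get("payload").get(i)
                + " came from partition " + batch.get("partitions").get(i)
                + " at offset " + batch.get("offsets").get(i));
    }
}
```

A listener that receives the payload list together with a header list can therefore correlate the two by index alone.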
If a RecordMessageConverter is provided, and the batch type is a ParameterizedType with a single generic type parameter, each record will be passed to the converter, thus supporting a method signature List<Foo> foos.
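The condition "a ParameterizedType with a single generic type parameter" can be checked with plain `java.lang.reflect`. The sketch below is an illustration of that condition only, under the assumption that the element type is what would be handed to the record converter; `BatchTypeSketch`, `Foo`, and the listener-style methods are hypothetical and are not part of the library.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

final class BatchTypeSketch {

    // Hypothetical payload element type.
    static final class Foo { }

    // Hypothetical listener-style methods, used only to obtain generic parameter types.
    static void typed(List<Foo> foos) { }

    @SuppressWarnings("rawtypes")
    static void raw(List foos) { }

    static void twoArgs(Map<String, Foo> foos) { }

    // Returns the single type argument (for example Foo for List<Foo>),
    // or null when the type is raw or has a different number of type arguments.
    static Type singleTypeArgument(Type type) {
        if (type instanceof ParameterizedType) {
            Type[] args = ((ParameterizedType) type).getActualTypeArguments();
            if (args.length == 1) {
                return args[0];
            }
        }
        return null;
    }

    // Looks up the generic type of the first parameter of one of the methods above.
    static Type parameterTypeOf(String methodName, Class<?> parameterClass) {
        try {
            Method method = BatchTypeSketch.class.getDeclaredMethod(methodName, parameterClass);
            return method.getGenericParameterTypes()[0];
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(singleTypeArgument(parameterTypeOf("typed", List.class)));
        System.out.println(singleTypeArgument(parameterTypeOf("raw", List.class)));
        System.out.println(singleTypeArgument(parameterTypeOf("twoArgs", Map.class)));
    }
}
```

Only the `List<Foo>` parameter yields an element type here; the raw `List` and the two-argument `Map<String, Foo>` do not satisfy the condition.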
Modifier and Type | Field and Description |
---|---|
protected org.apache.commons.logging.Log | logger |
Constructor and Description |
---|
BatchMessagingMessageConverter() Create an instance that does not convert the record values. |
BatchMessagingMessageConverter(RecordMessageConverter recordConverter) Create an instance that converts record values using the supplied converter. |
Modifier and Type | Method and Description |
---|---|
protected java.lang.Object | convert(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, java.lang.reflect.Type type) Convert the record value. |
protected java.lang.Object | extractAndConvertValue(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, java.lang.reflect.Type type) Subclasses can convert the value; by default, it's returned as provided by Kafka unless a RecordMessageConverter has been provided. |
java.util.List<org.apache.kafka.clients.producer.ProducerRecord<?,?>> | fromMessage(org.springframework.messaging.Message<?> message, java.lang.String defaultTopic) Convert a message to a producer record. |
void | setGenerateMessageId(boolean generateMessageId) Generate Message ids for produced messages. |
void | setGenerateTimestamp(boolean generateTimestamp) Generate timestamp for produced messages. |
void | setHeaderMapper(KafkaHeaderMapper headerMapper) Set the header mapper to map headers. |
org.springframework.messaging.Message<?> | toMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>> records, Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?,?> consumer, java.lang.reflect.Type type) Convert a list of ConsumerRecord to a Message. |
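public BatchMessagingMessageConverter()
Create an instance that does not convert the record values.

public BatchMessagingMessageConverter(RecordMessageConverter recordConverter)
Create an instance that converts record values using the supplied converter.
recordConverter - the converter.

public void setGenerateMessageId(boolean generateMessageId)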
Generate Message ids for produced messages. If set to false, will try to use a default value. By default set to false.
generateMessageId - true if a message id should be generated

public void setGenerateTimestamp(boolean generateTimestamp)
Generate timestamp for produced messages. If set to false, -1 is used instead. By default set to false.
generateTimestamp - true if a timestamp should be generated

public void setHeaderMapper(KafkaHeaderMapper headerMapper)
Set the header mapper to map headers.
headerMapper - the mapper.

public org.springframework.messaging.Message<?> toMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>> records, Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?,?> consumer, java.lang.reflect.Type type)
Description copied from interface: BatchMessageConverter
Convert a list of ConsumerRecord to a Message.
Specified by: toMessage in interface BatchMessageConverter
records - the records.
acknowledgment - the acknowledgment.
consumer - the consumer.
type - the required payload type.

public java.util.List<org.apache.kafka.clients.producer.ProducerRecord<?,?>> fromMessage(org.springframework.messaging.Message<?> message, java.lang.String defaultTopic)
Description copied from interface: BatchMessageConverter
Convert a message to a producer record.
Specified by: fromMessage in interface BatchMessageConverter
message - the message.
defaultTopic - the default topic to use if no header found.

protected java.lang.Object extractAndConvertValue(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, java.lang.reflect.Type type)
Subclasses can convert the value; by default, it's returned as provided by Kafka unless a RecordMessageConverter has been provided.
record - the record.
type - the required type.

protected java.lang.Object convert(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, java.lang.reflect.Type type)
Convert the record value.
record - the record.
type - the type - must be a ParameterizedType with a single generic type parameter.