K - the key type.
V - the value type.
S - the KafkaProducerMessageHandlerSpec extension type.

public class KafkaProducerMessageHandlerSpec<K,V,S extends KafkaProducerMessageHandlerSpec<K,V,S>> extends MessageHandlerSpec<S,KafkaProducerMessageHandler<K,V>>

A MessageHandlerSpec implementation for the KafkaProducerMessageHandler.

Modifier and Type | Class and Description |
---|---|
static class | KafkaProducerMessageHandlerSpec.KafkaProducerMessageHandlerTemplateSpec<K,V> A KafkaTemplate-based KafkaProducerMessageHandlerSpec extension. |
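
The self-referential bound `S extends KafkaProducerMessageHandlerSpec<K,V,S>` in the class declaration is what lets every configuration method return the concrete spec type, so options added by an extension stay reachable in a fluent chain. The sketch below illustrates that pattern only. `ProducerSpec`, `TemplateSpec`, `self()` and `clientId` are hypothetical names invented for the illustration and are not Spring Integration classes or methods; the inherited `_this` method listed on this page presumably plays the role of `self()`, but that is an assumption.

```java
class SelfTypeDemo {

    // Hypothetical base spec mirroring the shape Spec<K, V, S extends Spec<K, V, S>>.
    abstract static class ProducerSpec<K, V, S extends ProducerSpec<K, V, S>> {
        protected String topic;
        protected boolean sync;

        // Returns this instance typed as the concrete subclass S.
        @SuppressWarnings("unchecked")
        protected final S self() {
            return (S) this;
        }

        public S topic(String topic) {
            this.topic = topic;
            return self();
        }

        public S sync(boolean sync) {
            this.sync = sync;
            return self();
        }
    }

    // Hypothetical extension: binds S to itself and adds one option of its own.
    static final class TemplateSpec<K, V> extends ProducerSpec<K, V, TemplateSpec<K, V>> {
        private String clientId;

        public TemplateSpec<K, V> clientId(String clientId) {
            this.clientId = clientId;
            return this;
        }

        String describe() {
            return topic + "/" + sync + "/" + clientId;
        }
    }

    static String demo() {
        return new TemplateSpec<String, String>()
                .topic("orders")   // returns TemplateSpec, not the base type
                .sync(true)
                .clientId("demo")  // subclass option still reachable after base-class calls
                .describe();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "orders/true/demo"
    }
}
```

Without the `S` parameter, `topic(...)` would have to return the base type, and the chained `clientId(...)` call would not compile.
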
Inherited fields: PARSER, target, logger, DEFAULT_PHASE, OBJECT_TYPE_ATTRIBUTE

Modifier and Type | Method and Description |
---|---|
<P> S | flush(java.util.function.Function<Message<P>,Boolean> flushFunction) Configure a Function that will be invoked at runtime to determine whether or not to flush the producer after a send. |
S | flushExpression(Expression flushExpression) Configure an Expression to determine whether or not to flush the producer after a send. |
S | flushExpression(String flushExpression) Configure a SpEL expression to determine whether or not to flush the producer after a send. |
S | futuresChannel(MessageChannel futuresChannel) Set the channel to which send futures are sent. |
S | futuresChannel(String futuresChannel) Set the channel to which send futures are sent. |
S | headerMapper(org.springframework.kafka.support.KafkaHeaderMapper mapper) Specify a header mapper to map Spring Messaging headers to Kafka headers. |
<P> S | messageKey(java.util.function.Function<Message<P>,?> messageKeyFunction) Configure a Function that will be invoked at runtime to determine the message key under which a message will be stored in the topic. |
S | messageKey(String messageKey) Configure the message key under which a message is stored in the Kafka topic. |
S | messageKeyExpression(Expression messageKeyExpression) Configure an Expression to determine the Kafka message key at runtime, with the request Message as the root object of the evaluation context. |
S | messageKeyExpression(String messageKeyExpression) Configure a SpEL expression to determine the Kafka message key at runtime, with the request Message as the root object of the evaluation context. |
<P> S | partitionId(java.util.function.Function<Message<P>,Integer> partitionIdFunction) Configure a Function that will be invoked at runtime to determine the partition id under which a message will be stored in the topic. |
S | partitionId(Integer partitionId) Configure the partitionId of the Kafka topic. |
S | partitionIdExpression(Expression partitionIdExpression) Configure an Expression to determine the topic partitionId at runtime, with the request Message as the root object of the evaluation context. |
S | partitionIdExpression(String partitionIdExpression) Configure a SpEL expression to determine the topic partitionId at runtime, with the request Message as the root object of the evaluation context. |
S | sendFailureChannel(MessageChannel sendFailureChannel) Set the channel to which failed send results are sent. |
S | sendFailureChannel(String sendFailureChannel) Set the channel to which failed send results are sent. |
S | sendSuccessChannel(MessageChannel sendSuccessChannel) Set the channel to which successful send results are sent. |
S | sendSuccessChannel(String sendSuccessChannel) Set the channel to which successful send results are sent. |
S | sendTimeout(long sendTimeout) Specify how long, in milliseconds, the KafkaProducerMessageHandler should wait for send operation results. |
S | sync(boolean sync) A boolean indicating whether the KafkaProducerMessageHandler should wait for the send operation results. |
<P> S | timestamp(java.util.function.Function<Message<P>,Long> timestampFunction) Configure a Function that will be invoked at runtime to determine the Kafka record timestamp that will be stored in the topic. |
S | timestampExpression(Expression timestampExpression) Configure an Expression to determine the timestamp at runtime, with the request Message as the root object of the evaluation context. |
S | timestampExpression(String timestampExpression) Configure a SpEL expression to determine the timestamp at runtime, with the request Message as the root object of the evaluation context. |
<P> S | topic(java.util.function.Function<Message<P>,String> topicFunction) Configure a Function that will be invoked at runtime to determine the topic to which a message will be sent. |
S | topic(String topic) Configure the Kafka topic to which messages are sent. |
S | topicExpression(Expression topicExpression) Configure an Expression to determine the Kafka topic at runtime, with the request Message as the root object of the evaluation context. |
S | topicExpression(String topicExpression) Configure a SpEL expression to determine the Kafka topic at runtime, with the request Message as the root object of the evaluation context. |

Inherited methods: _this, createInstance, destroyInstance, doGet, get, getId, getObjectType, getPhase, id, isAutoStartup, isRunning, start, stop, stop, afterPropertiesSet, destroy, getBeanFactory, getBeanTypeConverter, getEarlySingletonInterfaces, getObject, isSingleton, setBeanClassLoader, setBeanFactory, setSingleton

public S topic(String topic)
Configure the Kafka topic to which messages are sent.
topic - the Kafka topic name.

public S topicExpression(String topicExpression)
Configure a SpEL expression to determine the Kafka topic at runtime, with the request Message as the root object of the evaluation context.
topicExpression - the topic SpEL expression.

public S topicExpression(Expression topicExpression)
Configure an Expression to determine the Kafka topic at runtime, with the request Message as the root object of the evaluation context.
topicExpression - the topic expression.

public <P> S topic(java.util.function.Function<Message<P>,String> topicFunction)
Configure a Function that will be invoked at runtime to determine the topic to which a message will be sent. Typically used with a Java 8 Lambda expression:
    .<Foo>topic(m -> m.getPayload().getTopic())
P - the expected payload type.
topicFunction - the topic function.
Returns: the KafkaProducerMessageHandlerSpec.
See also: FunctionExpression
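
The explicit type argument in `.<Foo>topic(...)` matters because `P` appears only in the lambda's parameter type, so the compiler cannot infer it from an implicitly typed lambda and falls back to `Object`. The sketch below reproduces that situation with hypothetical stand-ins: `Msg`, `Order` and `Spec` are invented for illustration and are not Spring types, and `Spec.topic` only imitates the shape of `topic(Function<Message<P>,String>)`, not the real implementation.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

class TopicFunctionDemo {

    // Hypothetical stand-in for Message<P>.
    static final class Msg<P> {
        private final P payload;
        private final Map<String, Object> headers;

        Msg(P payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }

        P getPayload() { return payload; }

        Map<String, Object> getHeaders() { return headers; }
    }

    // Hypothetical payload type.
    static final class Order {
        private final String region;

        Order(String region) { this.region = region; }

        String getRegion() { return region; }
    }

    // Hypothetical spec with a method shaped like topic(Function<Message<P>,String>).
    static final class Spec {
        private Function<Msg<?>, String> topicResolver = m -> "default-topic";

        @SuppressWarnings("unchecked")
        <P> Spec topic(Function<Msg<P>, String> topicFunction) {
            // The payload type is trusted at runtime, as with any unchecked cast.
            this.topicResolver = m -> topicFunction.apply((Msg<P>) m);
            return this;
        }

        // Evaluates the configured function against one message.
        String resolveTopic(Msg<?> message) {
            return topicResolver.apply(message);
        }
    }

    static String demo() {
        Spec spec = new Spec()
                // Without <Order>, P is inferred as Object and getRegion() does not compile.
                .<Order>topic(m -> "orders-" + m.getPayload().getRegion());
        Msg<Order> message = new Msg<>(new Order("eu"), Collections.<String, Object>emptyMap());
        return spec.resolveTopic(message);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "orders-eu"
    }
}
```

A lambda that reads only headers, such as the `partitionId` example on this page, needs no type argument because it never touches the payload type.
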
public S messageKeyExpression(String messageKeyExpression)
Configure a SpEL expression to determine the Kafka message key at runtime, with the request Message as the root object of the evaluation context.
messageKeyExpression - the message key SpEL expression.

public S messageKey(String messageKey)
Configure the message key under which a message is stored in the Kafka topic.
messageKey - the message key to use.

public S messageKeyExpression(Expression messageKeyExpression)
Configure an Expression to determine the Kafka message key at runtime, with the request Message as the root object of the evaluation context.
messageKeyExpression - the message key expression.

public <P> S messageKey(java.util.function.Function<Message<P>,?> messageKeyFunction)
Configure a Function that will be invoked at runtime to determine the message key under which a message will be stored in the topic. Typically used with a Java 8 Lambda expression:
    .<Foo>messageKey(m -> m.getPayload().getKey())
P - the expected payload type.
messageKeyFunction - the message key function.
Returns: the KafkaProducerMessageHandlerSpec.
See also: FunctionExpression

public S partitionId(Integer partitionId)
Configure the partitionId of the Kafka topic.
partitionId - the partitionId to use.

public S partitionIdExpression(String partitionIdExpression)
Configure a SpEL expression to determine the topic partitionId at runtime, with the request Message as the root object of the evaluation context.
partitionIdExpression - the partitionId expression to use.

public <P> S partitionId(java.util.function.Function<Message<P>,Integer> partitionIdFunction)
Configure a Function that will be invoked at runtime to determine the partition id under which a message will be stored in the topic. Typically used with a Java 8 Lambda expression:
    .partitionId(m -> m.getHeaders().get("partitionId", Integer.class))
P - the expected payload type.
partitionIdFunction - the partitionId function.

public S partitionIdExpression(Expression partitionIdExpression)
Configure an Expression to determine the topic partitionId at runtime, with the request Message as the root object of the evaluation context.
partitionIdExpression - the partitionId expression to use.

public S timestampExpression(String timestampExpression)
Configure a SpEL expression to determine the timestamp at runtime, with the request Message as the root object of the evaluation context.
timestampExpression - the timestamp expression to use.

public <P> S timestamp(java.util.function.Function<Message<P>,Long> timestampFunction)
Configure a Function that will be invoked at runtime to determine the Kafka record timestamp that will be stored in the topic. Typically used with a Java 8 Lambda expression:
    .timestamp(m -> m.getHeaders().get("mytimestamp_header", Long.class))
P - the expected payload type.
timestampFunction - the timestamp function.

public S timestampExpression(Expression timestampExpression)
Configure an Expression to determine the timestamp at runtime, with the request Message as the root object of the evaluation context.
timestampExpression - the timestamp expression to use.

public S flushExpression(String flushExpression)
Configure a SpEL expression to determine whether or not to flush the producer after a send. By default the producer is flushed if a header kafka_flush has a value Boolean.TRUE.
flushExpression - the flush expression to use.

public <P> S flush(java.util.function.Function<Message<P>,Boolean> flushFunction)
Configure a Function that will be invoked at runtime to determine whether or not to flush the producer after a send. By default the producer is flushed if a header kafka_flush has a value Boolean.TRUE. Typically used with a Java 8 Lambda expression:
    .<Foo>flush(m -> m.getPayload().shouldFlush())
P - the expected payload type.
flushFunction - the flush function.

public S flushExpression(Expression flushExpression)
Configure an Expression to determine whether or not to flush the producer after a send. By default the producer is flushed if a header kafka_flush has a value Boolean.TRUE.
flushExpression - the flush expression to use.

public S sync(boolean sync)
A boolean indicating whether the KafkaProducerMessageHandler should wait for the send operation results. Defaults to false. In sync mode a downstream send operation exception will be re-thrown.
sync - the send mode; async by default.

public S sendTimeout(long sendTimeout)
Specify how long, in milliseconds, the KafkaProducerMessageHandler should wait for send operation results. Defaults to 10 seconds.
sendTimeout - the timeout to wait for the result of the send operation.

public S headerMapper(org.springframework.kafka.support.KafkaHeaderMapper mapper)
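Specify a header mapper to map Spring Messaging headers to Kafka headers.
mapper - the mapper.

public S sendSuccessChannel(MessageChannel sendSuccessChannel)
Set the channel to which successful send results are sent.
sendSuccessChannel - the channel.

public S sendSuccessChannel(String sendSuccessChannel)
Set the channel to which successful send results are sent.
sendSuccessChannel - the channel name.

public S sendFailureChannel(MessageChannel sendFailureChannel)
Set the channel to which failed send results are sent.
sendFailureChannel - the channel.

public S sendFailureChannel(String sendFailureChannel)
Set the channel to which failed send results are sent.
sendFailureChannel - the channel name.

public S futuresChannel(MessageChannel futuresChannel)
Set the channel to which send futures are sent.
futuresChannel - the channel.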
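
The interaction between `sync`, `sendTimeout` and the success and failure channels can be easier to follow in a small model. The sketch below is a simplified illustration under stated assumptions, not the KafkaProducerMessageHandler implementation: `handleSend` is a hypothetical helper, a `CompletableFuture` stands in for the send future, and plain `Consumer` callbacks stand in for the message channels. It shows only the documented behaviour: async by default, and in sync mode the caller waits up to the timeout and a failed send is re-thrown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

class SendModeDemo {

    // Hypothetical model of one send; the consumers stand in for the success and failure channels.
    static <R> void handleSend(CompletableFuture<R> sendFuture, boolean sync, long sendTimeoutMillis,
            Consumer<R> successChannel, Consumer<Throwable> failureChannel) {
        // Route the outcome to the matching consumer once it is known.
        sendFuture.whenComplete((result, error) -> {
            if (error != null) {
                failureChannel.accept(error);
            } else {
                successChannel.accept(result);
            }
        });
        if (sync) {
            try {
                // Sync mode: block for at most the timeout and surface a failed send to the caller.
                sendFuture.get(sendTimeoutMillis, TimeUnit.MILLISECONDS);
            } catch (ExecutionException e) {
                throw new IllegalStateException("send failed", e.getCause());
            } catch (TimeoutException e) {
                throw new IllegalStateException("no send result within " + sendTimeoutMillis + " ms", e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for the send result", e);
            }
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Consumer<String> success = r -> log.add("success:" + r);
        Consumer<Throwable> failure = e -> log.add("failure:" + e.getMessage());

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("broker down"));

        // Async (the documented default): the failure is only reported to the failure consumer.
        handleSend(failed, false, 10_000L, success, failure);

        // Sync: the same failure is also re-thrown to the caller.
        try {
            handleSend(failed, true, 10_000L, success, failure);
        } catch (IllegalStateException e) {
            log.add("rethrown:" + e.getCause().getMessage());
        }

        // Sync with a successful send: the call returns normally after the result arrives.
        handleSend(CompletableFuture.completedFuture("offset-42"), true, 10_000L, success, failure);
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The 10,000 ms value mirrors the documented 10-second default for `sendTimeout`; whether the real handler also notifies the failure channel in sync mode is an assumption of this model.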