public class KafkaMessageChannelBinder extends AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner> implements ExtendedPropertiesBinder<org.springframework.messaging.MessageChannel,org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties,org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>
Binder
that uses Kafka as the underlying middleware.AbstractMessageChannelBinder.ErrorInfrastructure, AbstractMessageChannelBinder.PolledConsumerResources
Modifier and Type | Field and Description |
---|---|
static String |
X_EXCEPTION_FQCN |
static String |
X_EXCEPTION_MESSAGE |
static String |
X_EXCEPTION_STACKTRACE |
static String |
X_ORIGINAL_OFFSET |
static String |
X_ORIGINAL_PARTITION |
static String |
X_ORIGINAL_TIMESTAMP |
static String |
X_ORIGINAL_TIMESTAMP_TYPE |
static String |
X_ORIGINAL_TOPIC |
provisioningProvider
logger
Constructor and Description |
---|
KafkaMessageChannelBinder(org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties configurationProperties,
org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner provisioningProvider) |
KafkaMessageChannelBinder(org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties configurationProperties,
org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner provisioningProvider,
ListenerContainerCustomizer<org.springframework.kafka.listener.AbstractMessageListenerContainer<?,?>> containerCustomizer) |
Modifier and Type | Method and Description |
---|---|
protected org.springframework.integration.core.MessageProducer |
createConsumerEndpoint(ConsumerDestination destination,
String group,
ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> extendedConsumerProperties)
Creates
MessageProducer that receives data from the consumer destination. |
protected org.springframework.kafka.core.ConsumerFactory<?,?> |
createKafkaConsumerFactory(boolean anonymous,
String consumerGroup,
ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> consumerProperties) |
protected AbstractMessageChannelBinder.PolledConsumerResources |
createPolledConsumerResources(String name,
String group,
ConsumerDestination destination,
ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> consumerProperties) |
protected org.springframework.messaging.MessageHandler |
createProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties> producerProperties,
org.springframework.messaging.MessageChannel errorChannel)
Create a
MessageHandler with the ability to send data to the target
middleware. |
protected org.springframework.messaging.MessageHandler |
createProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties> producerProperties,
org.springframework.messaging.MessageChannel channel,
org.springframework.messaging.MessageChannel errorChannel)
Create a
MessageHandler with the ability to send data to the target
middleware. |
String |
getDefaultsPrefix()
Extended binding properties can define a default prefix to place all the extended
common producer and consumer properties.
|
protected org.springframework.messaging.MessageHandler |
getErrorMessageHandler(ConsumerDestination destination,
String group,
ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> properties)
Binders can return a message handler to be subscribed to the error channel.
|
protected org.springframework.integration.support.ErrorMessageStrategy |
getErrorMessageStrategy()
Binders can return an
ErrorMessageStrategy for building error messages; binder
implementations typically might add extra headers to the error message. |
org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties |
getExtendedConsumerProperties(String channelName) |
org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties |
getExtendedProducerProperties(String channelName) |
Class<? extends BinderSpecificPropertiesProvider> |
getExtendedPropertiesEntryClass()
Extended properties class which should be a subclass of
BinderSpecificPropertiesProvider
against which default extended producer and consumer properties
are resolved. |
protected org.springframework.messaging.MessageHandler |
getPolledConsumerErrorMessageHandler(ConsumerDestination destination,
String group,
ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> properties)
Binders can return a message handler to be subscribed to the error channel.
|
protected org.springframework.kafka.core.DefaultKafkaProducerFactory<byte[],byte[]> |
getProducerFactory(String transactionIdPrefix,
ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties> producerProperties) |
protected void |
postProcessPollableSource(DefaultPollableMessageSource bindingTarget) |
Collection<org.apache.kafka.common.PartitionInfo> |
processTopic(String group,
ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> extendedConsumerProperties,
org.springframework.kafka.core.ConsumerFactory<?,?> consumerFactory,
int partitionCount,
boolean usingPatterns,
boolean groupManagement,
String topic) |
void |
setExtendedBindingProperties(org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties extendedBindingProperties) |
void |
setProducerListener(org.springframework.kafka.support.ProducerListener<byte[],byte[]> producerListener) |
protected boolean |
useNativeEncoding(ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties> producerProperties)
Whether the producer for the destination being created should be configured to use
native encoding which may, or may not, be determined from the properties.
|
afterUnbindConsumer, afterUnbindProducer, bindPollableConsumer, doBindConsumer, doBindProducer, errorsBaseName, errorsBaseName, getApplicationEventPublisher, getContainerCustomizer, getDefaultErrorMessageHandler, getErrorBridgeName, getErrorBridgeName, getErrorMessageHandlerName, getErrorRecovererName, getPolledConsumerRecoveryCallback, postProcessOutputChannel, registerErrorInfrastructure, registerErrorInfrastructure, setApplicationEventPublisher
afterPropertiesSet, applyPrefix, bindConsumer, bindProducer, buildPartitionRoutingExpression, buildRetryTemplate, constructDLQName, getApplicationContext, getBeanFactory, getEvaluationContext, groupedName, onInit, serializePayloadIfNecessary, setApplicationContext
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
bindConsumer, bindProducer
public static final String X_EXCEPTION_FQCN
public static final String X_EXCEPTION_STACKTRACE
public static final String X_EXCEPTION_MESSAGE
public static final String X_ORIGINAL_TOPIC
public static final String X_ORIGINAL_PARTITION
public static final String X_ORIGINAL_OFFSET
public static final String X_ORIGINAL_TIMESTAMP
public static final String X_ORIGINAL_TIMESTAMP_TYPE
public KafkaMessageChannelBinder(org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties configurationProperties, org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner provisioningProvider)
public KafkaMessageChannelBinder(org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties configurationProperties, org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner provisioningProvider, ListenerContainerCustomizer<org.springframework.kafka.listener.AbstractMessageListenerContainer<?,?>> containerCustomizer)
public void setExtendedBindingProperties(org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties extendedBindingProperties)
public void setProducerListener(org.springframework.kafka.support.ProducerListener<byte[],byte[]> producerListener)
public org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties getExtendedConsumerProperties(String channelName)
getExtendedConsumerProperties
in interface ExtendedBindingProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties,org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>
public org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties getExtendedProducerProperties(String channelName)
getExtendedProducerProperties
in interface ExtendedBindingProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties,org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>
public String getDefaultsPrefix()
ExtendedBindingProperties
getDefaultsPrefix
in interface ExtendedBindingProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties,org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass()
ExtendedBindingProperties
BinderSpecificPropertiesProvider
against which default extended producer and consumer properties
are resolved.getExtendedPropertiesEntryClass
in interface ExtendedBindingProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties,org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>
protected org.springframework.messaging.MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties> producerProperties, org.springframework.messaging.MessageChannel errorChannel) throws Exception
AbstractMessageChannelBinder
MessageHandler
with the ability to send data to the target
middleware. If the returned instance is also a Lifecycle
, it will be
stopped automatically by the binder.
In order to be fully compliant, the MessageHandler
of the binder must
observe the following headers:
BinderHeaders.PARTITION_HEADER
- indicates the target partition where
the message must be sentcreateProducerMessageHandler
in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>
destination
- the name of the target destinationproducerProperties
- the producer propertieserrorChannel
- the error channel (if enabled, otherwise null). If not null,
the binder must wire this channel into the producer endpoint so that errors
are forwarded to it.Exception
protected org.springframework.messaging.MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties> producerProperties, org.springframework.messaging.MessageChannel channel, org.springframework.messaging.MessageChannel errorChannel) throws Exception
AbstractMessageChannelBinder
MessageHandler
with the ability to send data to the target
middleware. If the returned instance is also a Lifecycle
, it will be
stopped automatically by the binder.
In order to be fully compliant, the MessageHandler
of the binder must
observe the following headers:
BinderHeaders.PARTITION_HEADER
- indicates the target partition where
the message must be sentcreateProducerMessageHandler
in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>
destination
- the name of the target destination.producerProperties
- the producer properties.channel
- the channel to bind.errorChannel
- the error channel (if enabled, otherwise null). If not null,
the binder must wire this channel into the producer endpoint so that errors
are forwarded to it.Exception
protected org.springframework.kafka.core.DefaultKafkaProducerFactory<byte[],byte[]> getProducerFactory(String transactionIdPrefix, ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties> producerProperties)
protected boolean useNativeEncoding(ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties> producerProperties)
AbstractMessageChannelBinder
P#isUseNativeEncoding()
.useNativeEncoding
in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>
producerProperties
- the properties.protected org.springframework.integration.core.MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> extendedConsumerProperties)
AbstractMessageChannelBinder
MessageProducer
that receives data from the consumer destination.
will be started and stopped by the binder.createConsumerEndpoint
in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>
destination
- reference to the consumer destinationgroup
- the consumer groupextendedConsumerProperties
- the consumer propertiespublic Collection<org.apache.kafka.common.PartitionInfo> processTopic(String group, ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> extendedConsumerProperties, org.springframework.kafka.core.ConsumerFactory<?,?> consumerFactory, int partitionCount, boolean usingPatterns, boolean groupManagement, String topic)
protected AbstractMessageChannelBinder.PolledConsumerResources createPolledConsumerResources(String name, String group, ConsumerDestination destination, ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> consumerProperties)
createPolledConsumerResources
in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>
protected void postProcessPollableSource(DefaultPollableMessageSource bindingTarget)
postProcessPollableSource
in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>
protected org.springframework.integration.support.ErrorMessageStrategy getErrorMessageStrategy()
AbstractMessageChannelBinder
ErrorMessageStrategy
for building error messages; binder
implementations typically might add extra headers to the error message.getErrorMessageStrategy
in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>
protected org.springframework.messaging.MessageHandler getErrorMessageHandler(ConsumerDestination destination, String group, ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> properties)
AbstractMessageChannelBinder
getErrorMessageHandler
in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>
destination
- the destination.group
- the group.properties
- the properties.protected org.springframework.messaging.MessageHandler getPolledConsumerErrorMessageHandler(ConsumerDestination destination, String group, ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> properties)
AbstractMessageChannelBinder
getPolledConsumerErrorMessageHandler
in class AbstractMessageChannelBinder<ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties>,ExtendedProducerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties>,org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner>
destination
- the destination.group
- the group.properties
- the properties.protected org.springframework.kafka.core.ConsumerFactory<?,?> createKafkaConsumerFactory(boolean anonymous, String consumerGroup, ExtendedConsumerProperties<org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties> consumerProperties)
Copyright © 2018 Pivotal Software, Inc.. All rights reserved.