public class KafkaMessageBus extends MessageBusSupport implements org.springframework.beans.factory.DisposableBean
| Stream definition | Kafka topic | Kafka partitions | Notes |
|---|---|---|---|
| foo = "http \| log" | foo.0 | 1 partition | 1 producer, 1 consumer |
| foo = "http \| log", log.count=x | foo.0 | x partitions | 1 producer, x consumers with static group 'springXD', achieves queue semantics |
| foo = "http \| log", log.count=x + XD partitioning | still 1 topic 'foo.0' | x partitions + use key computed by XD | 1 producer, x consumers with static group 'springXD', achieves queue semantics |
| foo = "http \| log", log.count=x, concurrency=y | foo.0 | x*y partitions | 1 producer, x XD consumers, each with y threads |
| Modifier and Type | Class and Description |
|---|---|
static class |
KafkaMessageBus.Mode |
static class |
KafkaMessageBus.OffsetManagement |
Inherited nested classes/interfaces: MessageBusSupport.DirectHandler, MessageBusSupport.PartitioningMetadata, MessageBusSupport.SetBuilder, MessageBusSupport.SharedChannelProvider<T extends org.springframework.messaging.MessageChannel>, MessageBus.Capability

| Modifier and Type | Field and Description |
|---|---|
static java.lang.String |
AUTO_COMMIT_OFFSET_ENABLED |
static org.apache.kafka.common.serialization.ByteArraySerializer |
BYTE_ARRAY_SERIALIZER |
static java.lang.String |
COMPRESSION_CODEC |
static java.lang.String |
FETCH_SIZE |
static int |
METADATA_VERIFICATION_MAX_INTERVAL |
static int |
METADATA_VERIFICATION_RETRY_ATTEMPTS |
static double |
METADATA_VERIFICATION_RETRY_BACKOFF_MULTIPLIER |
static int |
METADATA_VERIFICATION_RETRY_INITIAL_INTERVAL |
protected static java.util.Set<java.lang.Object> |
PRODUCER_COMPRESSION_PROPERTIES |
static java.lang.String |
QUEUE_SIZE |
static java.lang.String |
REQUIRED_ACKS |
static java.lang.String |
SYNC_PRODUCER |
static java.lang.String |
SYNC_PRODUCER_TIMEOUT |
static org.I0Itec.zkclient.serialize.ZkSerializer |
utf8Serializer
Used when writing directly to ZK.
|
Inherited fields: CONSUMER_RETRY_PROPERTIES, CONSUMER_STANDARD_PROPERTIES, defaultBackOffInitialInterval, defaultBackOffMaxInterval, defaultBackOffMultiplier, defaultBatchBufferLimit, defaultBatchingEnabled, defaultBatchSize, defaultBatchTimeout, defaultCompress, defaultConcurrency, defaultMaxAttempts, directChannelProvider, evaluationContext, JOB_CHANNEL_TYPE_PREFIX, logger, MEDIATYPES_MEDIATYPE_ALL, P2P_NAMED_CHANNEL_TYPE_PREFIX, PARTITION_HEADER, PRODUCER_BATCHING_ADVANCED_PROPERTIES, PRODUCER_BATCHING_BASIC_PROPERTIES, PRODUCER_PARTITIONING_PROPERTIES, PRODUCER_STANDARD_PROPERTIES, PUBSUB_NAMED_CHANNEL_TYPE_PREFIX, TAP_TYPE_PREFIX

| Constructor and Description |
|---|
KafkaMessageBus(org.springframework.integration.kafka.support.ZookeeperConnect zookeeperConnect,
java.lang.String brokers,
java.lang.String zkAddress,
org.springframework.integration.codec.Codec codec,
java.lang.String... headersToMap) |
| Modifier and Type | Method and Description |
|---|---|
void |
afterPropertiesSet() |
void |
bindConsumer(java.lang.String name,
org.springframework.messaging.MessageChannel moduleInputChannel,
java.util.Properties properties)
Bind a message consumer on a p2p channel
|
void |
bindProducer(java.lang.String name,
org.springframework.messaging.MessageChannel moduleOutputChannel,
java.util.Properties properties)
Bind a message producer on a p2p channel.
|
void |
bindPubSubConsumer(java.lang.String name,
org.springframework.messaging.MessageChannel inputChannel,
java.util.Properties properties)
Bind a message consumer on a pub/sub channel
|
void |
bindPubSubProducer(java.lang.String name,
org.springframework.messaging.MessageChannel outputChannel,
java.util.Properties properties)
Bind a message producer on a pub/sub channel.
|
void |
bindReplier(java.lang.String name,
org.springframework.messaging.MessageChannel requests,
org.springframework.messaging.MessageChannel replies,
java.util.Properties properties)
Bind a consumer that handles requests from a requestor and asynchronously sends replies.
|
void |
bindRequestor(java.lang.String name,
org.springframework.messaging.MessageChannel requests,
org.springframework.messaging.MessageChannel replies,
java.util.Properties properties)
Bind a producer that expects async replies.
|
org.springframework.integration.kafka.listener.KafkaMessageListenerContainer |
createMessageListenerContainer(java.util.Properties properties,
java.lang.String group,
int maxConcurrency,
java.lang.String topic,
long referencePoint,
boolean resetOffsets) |
void |
destroy() |
void |
doManualAck(java.util.LinkedList<org.springframework.messaging.MessageHeaders> messageHeadersList)
Perform manual acknowledgement based on the metadata stored in message bus.
|
static java.lang.String |
escapeTopicName(java.lang.String original)
Allowed chars are ASCII alphanumerics, '.', '_' and '-'.
|
org.springframework.integration.kafka.core.ConnectionFactory |
getConnectionFactory() |
boolean |
isCapable(MessageBus.Capability capability)
Return true if the bus supports the capability.
|
void |
setDefaultAutoCommitOffsetEnabled(boolean defaultAutoCommitOffsetEnabled)
Set the default auto commit enabled property, which controls whether offsets are committed automatically or
manually.
|
void |
setDefaultCompressionCodec(java.lang.String defaultCompressionCodec) |
void |
setDefaultFetchSize(int defaultFetchSize) |
void |
setDefaultMaxWait(int defaultMaxWait) |
void |
setDefaultMinPartitionCount(int defaultMinPartitionCount) |
void |
setDefaultQueueSize(int defaultQueueSize) |
void |
setDefaultReplicationFactor(int defaultReplicationFactor) |
void |
setDefaultRequiredAcks(int defaultRequiredAcks) |
void |
setDefaultSyncProducer(boolean syncProducer) |
void |
setDefaultSyncProducerTimeout(int timeout) |
void |
setMode(KafkaMessageBus.Mode mode) |
void |
setOffsetManagement(KafkaMessageBus.OffsetManagement offsetManagement)
Set the
KafkaMessageBus.OffsetManagement to use. |
void |
setOffsetStoreBatchBytes(int offsetStoreBatchBytes) |
void |
setOffsetStoreBatchTime(int offsetStoreBatchTime) |
void |
setOffsetStoreMaxFetchSize(int offsetStoreMaxFetchSize) |
void |
setOffsetStoreRequiredAcks(int offsetStoreRequiredAcks) |
void |
setOffsetStoreRetentionTime(int offsetStoreRetentionTime) |
void |
setOffsetStoreSegmentSize(int offsetStoreSegmentSize) |
void |
setOffsetStoreTopic(java.lang.String offsetStoreTopic) |
void |
setOffsetUpdateCount(int offsetUpdateCount) |
void |
setOffsetUpdateShutdownTimeout(int offsetUpdateShutdownTimeout) |
void |
setOffsetUpdateTimeWindow(int offsetUpdateTimeWindow) |
void |
setRetryOperations(org.springframework.retry.RetryOperations retryOperations)
Retry configuration for operations such as validating topic creation
|
void |
setSocketBufferSize(int socketBufferSize) |
Inherited methods: addBinding, applyPrefix, applyPubSub, applyRequests, bindDynamicProducer, bindDynamicPubSubProducer, bindExistingProducerDirectlyIfPossible, bindNewProducerDirectlyIfPossible, buildPartitionRoutingExpression, buildRetryTemplateIfRetryEnabled, constructDLQName, deleteBinding, deleteBindings, deserializePayloadIfNecessary, deserializePayloadIfNecessary, determinePartition, doBindDynamicProducer, doBindDynamicPubSubProducer, getApplicationContext, getBeanFactory, getIdGenerator, isNamedChannel, onInit, serializePayloadIfNecessary, setApplicationContext, setCodec, setDefaultBackOffInitialInterval, setDefaultBackOffMaxInterval, setDefaultBackOffMultiplier, setDefaultBatchBufferLimit, setDefaultBatchingEnabled, setDefaultBatchSize, setDefaultBatchTimeout, setDefaultCompress, setDefaultConcurrency, setDefaultDurableSubscription, setDefaultMaxAttempts, setIntegrationEvaluationContext, setPartitionSelector, stopBindings, unbindConsumer, unbindConsumers, unbindProducer, unbindProducers, validateConsumerProperties, validateProducerProperties

public static final org.apache.kafka.common.serialization.ByteArraySerializer BYTE_ARRAY_SERIALIZER
public static final int METADATA_VERIFICATION_RETRY_ATTEMPTS
public static final double METADATA_VERIFICATION_RETRY_BACKOFF_MULTIPLIER
public static final int METADATA_VERIFICATION_RETRY_INITIAL_INTERVAL
public static final int METADATA_VERIFICATION_MAX_INTERVAL
public static final java.lang.String FETCH_SIZE
public static final java.lang.String QUEUE_SIZE
public static final java.lang.String REQUIRED_ACKS
public static final java.lang.String COMPRESSION_CODEC
public static final java.lang.String AUTO_COMMIT_OFFSET_ENABLED
public static final java.lang.String SYNC_PRODUCER
public static final java.lang.String SYNC_PRODUCER_TIMEOUT
public static final org.I0Itec.zkclient.serialize.ZkSerializer utf8Serializer
protected static final java.util.Set<java.lang.Object> PRODUCER_COMPRESSION_PROPERTIES
public KafkaMessageBus(org.springframework.integration.kafka.support.ZookeeperConnect zookeeperConnect,
java.lang.String brokers,
java.lang.String zkAddress,
org.springframework.integration.codec.Codec codec,
java.lang.String... headersToMap)
public void setOffsetStoreTopic(java.lang.String offsetStoreTopic)
public void setOffsetStoreSegmentSize(int offsetStoreSegmentSize)
public void setOffsetStoreRetentionTime(int offsetStoreRetentionTime)
public void setSocketBufferSize(int socketBufferSize)
public void setOffsetStoreRequiredAcks(int offsetStoreRequiredAcks)
public void setOffsetStoreMaxFetchSize(int offsetStoreMaxFetchSize)
public void setOffsetUpdateTimeWindow(int offsetUpdateTimeWindow)
public void setOffsetUpdateCount(int offsetUpdateCount)
public void setOffsetUpdateShutdownTimeout(int offsetUpdateShutdownTimeout)
public void setOffsetStoreBatchBytes(int offsetStoreBatchBytes)
public void setOffsetStoreBatchTime(int offsetStoreBatchTime)
public org.springframework.integration.kafka.core.ConnectionFactory getConnectionFactory()
public void setRetryOperations(org.springframework.retry.RetryOperations retryOperations)
Parameters: retryOperations - the retry configuration
public void afterPropertiesSet()
throws java.lang.Exception
Specified by: afterPropertiesSet in interface org.springframework.beans.factory.InitializingBean
Overrides: afterPropertiesSet in class MessageBusSupport
Throws: java.lang.Exception
public void destroy()
throws java.lang.Exception
Specified by: destroy in interface org.springframework.beans.factory.DisposableBean
Throws: java.lang.Exception
public boolean isCapable(MessageBus.Capability capability)
Description copied from interface: MessageBus
Specified by: isCapable in interface MessageBus
Overrides: isCapable in class MessageBusSupport
Parameters: capability - the capability.
public static java.lang.String escapeTopicName(java.lang.String original)
public void setDefaultReplicationFactor(int defaultReplicationFactor)
public void setDefaultCompressionCodec(java.lang.String defaultCompressionCodec)
public void setDefaultRequiredAcks(int defaultRequiredAcks)
public void setDefaultAutoCommitOffsetEnabled(boolean defaultAutoCommitOffsetEnabled)
Parameters: defaultAutoCommitOffsetEnabled - the default auto commit enabled property
public void setDefaultQueueSize(int defaultQueueSize)
public void setDefaultFetchSize(int defaultFetchSize)
public void setDefaultMinPartitionCount(int defaultMinPartitionCount)
public void setDefaultMaxWait(int defaultMaxWait)
public void setDefaultSyncProducer(boolean syncProducer)
public void setDefaultSyncProducerTimeout(int timeout)
public void setMode(KafkaMessageBus.Mode mode)
public void setOffsetManagement(KafkaMessageBus.OffsetManagement offsetManagement)
Set the KafkaMessageBus.OffsetManagement to use. Default: KafkaMessageBus.OffsetManagement.kafkaTopic.
Parameters: offsetManagement - the offsetManagement.
public void bindConsumer(java.lang.String name,
org.springframework.messaging.MessageChannel moduleInputChannel,
java.util.Properties properties)
Description copied from interface: MessageBus
Specified by: bindConsumer in interface MessageBus
Parameters:
name - the logical identity of the message source
moduleInputChannel - the channel bound as a consumer
properties - arbitrary String key/value pairs that will be used in the binding
public void bindPubSubConsumer(java.lang.String name,
org.springframework.messaging.MessageChannel inputChannel,
java.util.Properties properties)
Description copied from interface: MessageBus
Specified by: bindPubSubConsumer in interface MessageBus
Parameters:
name - the logical identity of the message source
inputChannel - the channel bound as a pub/sub consumer
properties - arbitrary String key/value pairs that will be used in the binding
public void bindProducer(java.lang.String name,
org.springframework.messaging.MessageChannel moduleOutputChannel,
java.util.Properties properties)
Description copied from interface: MessageBus
Specified by: bindProducer in interface MessageBus
Parameters:
name - the logical identity of the message target
moduleOutputChannel - the channel bound as a producer
properties - arbitrary String key/value pairs that will be used in the binding
public void bindPubSubProducer(java.lang.String name,
org.springframework.messaging.MessageChannel outputChannel,
java.util.Properties properties)
Description copied from interface: MessageBus
Specified by: bindPubSubProducer in interface MessageBus
Parameters:
name - the logical identity of the message target
outputChannel - the channel bound as a producer
properties - arbitrary String key/value pairs that will be used in the binding
public void bindRequestor(java.lang.String name,
org.springframework.messaging.MessageChannel requests,
org.springframework.messaging.MessageChannel replies,
java.util.Properties properties)
Description copied from interface: MessageBus
Specified by: bindRequestor in interface MessageBus
Parameters:
name - The name of the requestor.
requests - The request channel - sends requests.
replies - The reply channel - receives replies.
properties - arbitrary String key/value pairs that will be used in the binding.
public void bindReplier(java.lang.String name,
org.springframework.messaging.MessageChannel requests,
org.springframework.messaging.MessageChannel replies,
java.util.Properties properties)
Description copied from interface: MessageBus
Specified by: bindReplier in interface MessageBus
Parameters:
name - The name of the requestor for which this replier will handle requests.
requests - The request channel - receives requests.
replies - The reply channel - sends replies.
properties - arbitrary String key/value pairs that will be used in the binding.
public org.springframework.integration.kafka.listener.KafkaMessageListenerContainer createMessageListenerContainer(java.util.Properties properties,
java.lang.String group,
int maxConcurrency,
java.lang.String topic,
long referencePoint,
boolean resetOffsets)
public void doManualAck(java.util.LinkedList<org.springframework.messaging.MessageHeaders> messageHeadersList)
Description copied from class: MessageBusSupport
Overrides: doManualAck in class MessageBusSupport