public class KafkaMessageBus extends MessageBusSupport implements org.springframework.beans.factory.DisposableBean
Stream definition | Kafka topic | Kafka partitions | Notes |
---|---|---|---|
foo = "http | log" | foo.0 | 1 partition | 1 producer, 1 consumer |
foo = "http | log", log.count=x | foo.0 | x partitions | 1 producer, x consumers with static group 'springXD', achieves queue semantics |
foo = "http | log", log.count=x + XD partitioning | still 1 topic 'foo.0' | x partitions + use key computed by XD | 1 producer, x consumers with static group 'springXD', achieves queue semantics |
foo = "http | log", log.count=x, concurrency=y | foo.0 | x*y partitions | 1 producer, x XD consumers, each with y threads |
Modifier and Type | Class and Description |
---|---|
static class |
KafkaMessageBus.Mode |
static class |
KafkaMessageBus.OffsetManagement |
MessageBusSupport.DirectHandler, MessageBusSupport.PartitioningMetadata, MessageBusSupport.SetBuilder, MessageBusSupport.SharedChannelProvider<T extends org.springframework.messaging.MessageChannel>
MessageBus.Capability
Modifier and Type | Field and Description |
---|---|
static java.lang.String |
AUTO_COMMIT_OFFSET_ENABLED |
static org.apache.kafka.common.serialization.ByteArraySerializer |
BYTE_ARRAY_SERIALIZER |
static java.lang.String |
COMPRESSION_CODEC |
static java.lang.String |
FETCH_SIZE |
static int |
METADATA_VERIFICATION_MAX_INTERVAL |
static int |
METADATA_VERIFICATION_RETRY_ATTEMPTS |
static double |
METADATA_VERIFICATION_RETRY_BACKOFF_MULTIPLIER |
static int |
METADATA_VERIFICATION_RETRY_INITIAL_INTERVAL |
protected static java.util.Set<java.lang.Object> |
PRODUCER_COMPRESSION_PROPERTIES |
static java.lang.String |
QUEUE_SIZE |
static java.lang.String |
REQUIRED_ACKS |
static java.lang.String |
SYNC_PRODUCER |
static java.lang.String |
SYNC_PRODUCER_TIMEOUT |
static org.I0Itec.zkclient.serialize.ZkSerializer |
utf8Serializer
Used when writing directly to ZK.
|
CONSUMER_RETRY_PROPERTIES, CONSUMER_STANDARD_PROPERTIES, defaultBackOffInitialInterval, defaultBackOffMaxInterval, defaultBackOffMultiplier, defaultBatchBufferLimit, defaultBatchingEnabled, defaultBatchSize, defaultBatchTimeout, defaultCompress, defaultConcurrency, defaultMaxAttempts, directChannelProvider, evaluationContext, JOB_CHANNEL_TYPE_PREFIX, logger, MEDIATYPES_MEDIATYPE_ALL, P2P_NAMED_CHANNEL_TYPE_PREFIX, PARTITION_HEADER, PRODUCER_BATCHING_ADVANCED_PROPERTIES, PRODUCER_BATCHING_BASIC_PROPERTIES, PRODUCER_PARTITIONING_PROPERTIES, PRODUCER_STANDARD_PROPERTIES, PUBSUB_NAMED_CHANNEL_TYPE_PREFIX, TAP_TYPE_PREFIX
Constructor and Description |
---|
KafkaMessageBus(org.springframework.integration.kafka.support.ZookeeperConnect zookeeperConnect,
java.lang.String brokers,
java.lang.String zkAddress,
org.springframework.integration.codec.Codec codec,
java.lang.String... headersToMap) |
Modifier and Type | Method and Description |
---|---|
void |
afterPropertiesSet() |
void |
bindConsumer(java.lang.String name,
org.springframework.messaging.MessageChannel moduleInputChannel,
java.util.Properties properties)
Bind a message consumer on a p2p channel
|
void |
bindProducer(java.lang.String name,
org.springframework.messaging.MessageChannel moduleOutputChannel,
java.util.Properties properties)
Bind a message producer on a p2p channel.
|
void |
bindPubSubConsumer(java.lang.String name,
org.springframework.messaging.MessageChannel inputChannel,
java.util.Properties properties)
Bind a message consumer on a pub/sub channel
|
void |
bindPubSubProducer(java.lang.String name,
org.springframework.messaging.MessageChannel outputChannel,
java.util.Properties properties)
Bind a message producer on a pub/sub channel.
|
void |
bindReplier(java.lang.String name,
org.springframework.messaging.MessageChannel requests,
org.springframework.messaging.MessageChannel replies,
java.util.Properties properties)
Bind a consumer that handles requests from a requestor and asynchronously sends replies.
|
void |
bindRequestor(java.lang.String name,
org.springframework.messaging.MessageChannel requests,
org.springframework.messaging.MessageChannel replies,
java.util.Properties properties)
Bind a producer that expects async replies.
|
org.springframework.integration.kafka.listener.KafkaMessageListenerContainer |
createMessageListenerContainer(java.util.Properties properties,
java.lang.String group,
int maxConcurrency,
java.lang.String topic,
long referencePoint,
boolean resetOffsets) |
void |
destroy() |
void |
doManualAck(java.util.LinkedList<org.springframework.messaging.MessageHeaders> messageHeadersList)
Perform manual acknowledgement based on the metadata stored in message bus.
|
static java.lang.String |
escapeTopicName(java.lang.String original)
Allowed chars are ASCII alphanumerics, '.', '_' and '-'.
|
org.springframework.integration.kafka.core.ConnectionFactory |
getConnectionFactory() |
boolean |
isCapable(MessageBus.Capability capability)
Return true if the bus supports the capability.
|
void |
setDefaultAutoCommitOffsetEnabled(boolean defaultAutoCommitOffsetEnabled)
Set the default auto commit enabled property; This is used to commit the offset either automatically or
manually.
|
void |
setDefaultCompressionCodec(java.lang.String defaultCompressionCodec) |
void |
setDefaultFetchSize(int defaultFetchSize) |
void |
setDefaultMaxWait(int defaultMaxWait) |
void |
setDefaultMinPartitionCount(int defaultMinPartitionCount) |
void |
setDefaultQueueSize(int defaultQueueSize) |
void |
setDefaultReplicationFactor(int defaultReplicationFactor) |
void |
setDefaultRequiredAcks(int defaultRequiredAcks) |
void |
setDefaultSyncProducer(boolean syncProducer) |
void |
setDefaultSyncProducerTimeout(int timeout) |
void |
setMode(KafkaMessageBus.Mode mode) |
void |
setOffsetManagement(KafkaMessageBus.OffsetManagement offsetManagement)
Set the
KafkaMessageBus.OffsetManagement to use. |
void |
setOffsetStoreBatchBytes(int offsetStoreBatchBytes) |
void |
setOffsetStoreBatchTime(int offsetStoreBatchTime) |
void |
setOffsetStoreMaxFetchSize(int offsetStoreMaxFetchSize) |
void |
setOffsetStoreRequiredAcks(int offsetStoreRequiredAcks) |
void |
setOffsetStoreRetentionTime(int offsetStoreRetentionTime) |
void |
setOffsetStoreSegmentSize(int offsetStoreSegmentSize) |
void |
setOffsetStoreTopic(java.lang.String offsetStoreTopic) |
void |
setOffsetUpdateCount(int offsetUpdateCount) |
void |
setOffsetUpdateShutdownTimeout(int offsetUpdateShutdownTimeout) |
void |
setOffsetUpdateTimeWindow(int offsetUpdateTimeWindow) |
void |
setRetryOperations(org.springframework.retry.RetryOperations retryOperations)
Retry configuration for operations such as validating topic creation
|
void |
setSocketBufferSize(int socketBufferSize) |
addBinding, applyPrefix, applyPubSub, applyRequests, bindDynamicProducer, bindDynamicPubSubProducer, bindExistingProducerDirectlyIfPossible, bindNewProducerDirectlyIfPossible, buildPartitionRoutingExpression, buildRetryTemplateIfRetryEnabled, constructDLQName, deleteBinding, deleteBindings, deserializePayloadIfNecessary, deserializePayloadIfNecessary, determinePartition, doBindDynamicProducer, doBindDynamicPubSubProducer, getApplicationContext, getBeanFactory, getIdGenerator, isNamedChannel, onInit, serializePayloadIfNecessary, setApplicationContext, setCodec, setDefaultBackOffInitialInterval, setDefaultBackOffMaxInterval, setDefaultBackOffMultiplier, setDefaultBatchBufferLimit, setDefaultBatchingEnabled, setDefaultBatchSize, setDefaultBatchTimeout, setDefaultCompress, setDefaultConcurrency, setDefaultDurableSubscription, setDefaultMaxAttempts, setIntegrationEvaluationContext, setPartitionSelector, stopBindings, unbindConsumer, unbindConsumers, unbindProducer, unbindProducers, validateConsumerProperties, validateProducerProperties
public static final org.apache.kafka.common.serialization.ByteArraySerializer BYTE_ARRAY_SERIALIZER
public static final int METADATA_VERIFICATION_RETRY_ATTEMPTS
public static final double METADATA_VERIFICATION_RETRY_BACKOFF_MULTIPLIER
public static final int METADATA_VERIFICATION_RETRY_INITIAL_INTERVAL
public static final int METADATA_VERIFICATION_MAX_INTERVAL
public static final java.lang.String FETCH_SIZE
public static final java.lang.String QUEUE_SIZE
public static final java.lang.String REQUIRED_ACKS
public static final java.lang.String COMPRESSION_CODEC
public static final java.lang.String AUTO_COMMIT_OFFSET_ENABLED
public static final java.lang.String SYNC_PRODUCER
public static final java.lang.String SYNC_PRODUCER_TIMEOUT
public static final org.I0Itec.zkclient.serialize.ZkSerializer utf8Serializer
protected static final java.util.Set<java.lang.Object> PRODUCER_COMPRESSION_PROPERTIES
public KafkaMessageBus(org.springframework.integration.kafka.support.ZookeeperConnect zookeeperConnect, java.lang.String brokers, java.lang.String zkAddress, org.springframework.integration.codec.Codec codec, java.lang.String... headersToMap)
public void setOffsetStoreTopic(java.lang.String offsetStoreTopic)
public void setOffsetStoreSegmentSize(int offsetStoreSegmentSize)
public void setOffsetStoreRetentionTime(int offsetStoreRetentionTime)
public void setSocketBufferSize(int socketBufferSize)
public void setOffsetStoreRequiredAcks(int offsetStoreRequiredAcks)
public void setOffsetStoreMaxFetchSize(int offsetStoreMaxFetchSize)
public void setOffsetUpdateTimeWindow(int offsetUpdateTimeWindow)
public void setOffsetUpdateCount(int offsetUpdateCount)
public void setOffsetUpdateShutdownTimeout(int offsetUpdateShutdownTimeout)
public void setOffsetStoreBatchBytes(int offsetStoreBatchBytes)
public void setOffsetStoreBatchTime(int offsetStoreBatchTime)
public org.springframework.integration.kafka.core.ConnectionFactory getConnectionFactory()
public void setRetryOperations(org.springframework.retry.RetryOperations retryOperations)
retryOperations
- the retry configurationpublic void afterPropertiesSet() throws java.lang.Exception
afterPropertiesSet
in interface org.springframework.beans.factory.InitializingBean
afterPropertiesSet
in class MessageBusSupport
java.lang.Exception
public void destroy() throws java.lang.Exception
destroy
in interface org.springframework.beans.factory.DisposableBean
java.lang.Exception
public boolean isCapable(MessageBus.Capability capability)
MessageBus
isCapable
in interface MessageBus
isCapable
in class MessageBusSupport
capability
- the capability.public static java.lang.String escapeTopicName(java.lang.String original)
public void setDefaultReplicationFactor(int defaultReplicationFactor)
public void setDefaultCompressionCodec(java.lang.String defaultCompressionCodec)
public void setDefaultRequiredAcks(int defaultRequiredAcks)
public void setDefaultAutoCommitOffsetEnabled(boolean defaultAutoCommitOffsetEnabled)
defaultAutoCommitOffsetEnabled
- public void setDefaultQueueSize(int defaultQueueSize)
public void setDefaultFetchSize(int defaultFetchSize)
public void setDefaultMinPartitionCount(int defaultMinPartitionCount)
public void setDefaultMaxWait(int defaultMaxWait)
public void setDefaultSyncProducer(boolean syncProducer)
public void setDefaultSyncProducerTimeout(int timeout)
public void setMode(KafkaMessageBus.Mode mode)
public void setOffsetManagement(KafkaMessageBus.OffsetManagement offsetManagement)
KafkaMessageBus.OffsetManagement
to use. Default:
KafkaMessageBus.OffsetManagement.kafkaTopic
.offsetManagement
- the offsetManagement.public void bindConsumer(java.lang.String name, org.springframework.messaging.MessageChannel moduleInputChannel, java.util.Properties properties)
MessageBus
bindConsumer
in interface MessageBus
name
- the logical identity of the message sourcemoduleInputChannel
- the channel bound as a consumerproperties
- arbitrary String key/value pairs that will be used in the bindingpublic void bindPubSubConsumer(java.lang.String name, org.springframework.messaging.MessageChannel inputChannel, java.util.Properties properties)
MessageBus
bindPubSubConsumer
in interface MessageBus
name
- the logical identity of the message sourceinputChannel
- the channel bound as a pub/sub consumerproperties
- arbitrary String key/value pairs that will be used in the bindingpublic void bindProducer(java.lang.String name, org.springframework.messaging.MessageChannel moduleOutputChannel, java.util.Properties properties)
MessageBus
bindProducer
in interface MessageBus
name
- the logical identity of the message targetmoduleOutputChannel
- the channel bound as a producerproperties
- arbitrary String key/value pairs that will be used in the bindingpublic void bindPubSubProducer(java.lang.String name, org.springframework.messaging.MessageChannel outputChannel, java.util.Properties properties)
MessageBus
bindPubSubProducer
in interface MessageBus
name
- the logical identity of the message targetoutputChannel
- the channel bound as a producerproperties
- arbitrary String key/value pairs that will be used in the bindingpublic void bindRequestor(java.lang.String name, org.springframework.messaging.MessageChannel requests, org.springframework.messaging.MessageChannel replies, java.util.Properties properties)
MessageBus
bindRequestor
in interface MessageBus
name
- The name of the requestor.requests
- The request channel - sends requests.replies
- The reply channel - receives replies.properties
- arbitrary String key/value pairs that will be used in the binding.public void bindReplier(java.lang.String name, org.springframework.messaging.MessageChannel requests, org.springframework.messaging.MessageChannel replies, java.util.Properties properties)
MessageBus
bindReplier
in interface MessageBus
name
- The name of the requestor for which this replier will handle requests.requests
- The request channel - receives requests.replies
- The reply channel - sends replies.properties
- arbitrary String key/value pairs that will be used in the binding.public org.springframework.integration.kafka.listener.KafkaMessageListenerContainer createMessageListenerContainer(java.util.Properties properties, java.lang.String group, int maxConcurrency, java.lang.String topic, long referencePoint, boolean resetOffsets)
public void doManualAck(java.util.LinkedList<org.springframework.messaging.MessageHeaders> messageHeadersList)
MessageBusSupport
doManualAck
in class MessageBusSupport