public class KafkaMessageBus extends MessageBusSupport
Stream definition | Kafka topic | Kafka partitions | Notes |
---|---|---|---|
foo = "http | log" | foo.0 | 1 partition | 1 producer, 1 consumer |
foo = "http | log", log.count=x | foo.0 | x partitions | 1 producer, x consumers with static group 'springXD', achieves queue semantics |
foo = "http | log", log.count=x + XD partitioning | still 1 topic 'foo.0' | x partitions + use key computed by XD | 1 producer, x consumers with static group 'springXD', achieves queue semantics |
foo = "http | log", log.count=x, concurrency=y | foo.0 | x*y partitions | 1 producer, x XD consumers, each with y threads |
foo = "http | log", log.count=0, x actual log containers | foo.0 | 10 (configurable) partitions | 1 producer, x XD consumers. The required number of partitions cannot be known beforehand, so choose a number that is greater than the expected number of containers |
Modifier and Type | Class and Description |
---|---|
static class |
KafkaMessageBus.Mode |
MessageBusSupport.DirectHandler, MessageBusSupport.PartitioningMetadata, MessageBusSupport.SetBuilder, MessageBusSupport.SharedChannelProvider<T extends org.springframework.messaging.MessageChannel>
MessageBus.Capability
Modifier and Type | Field and Description |
---|---|
static java.lang.String |
AUTO_COMMIT_ENABLED |
static org.apache.kafka.common.serialization.ByteArraySerializer |
BYTE_ARRAY_SERIALIZER |
static java.lang.String |
COMPRESSION_CODEC |
static java.lang.String |
FETCH_SIZE |
static int |
METADATA_VERIFICATION_MAX_INTERVAL |
static int |
METADATA_VERIFICATION_RETRY_ATTEMPTS |
static double |
METADATA_VERIFICATION_RETRY_BACKOFF_MULTIPLIER |
static int |
METADATA_VERIFICATION_RETRY_INITIAL_INTERVAL |
protected static java.util.Set<java.lang.Object> |
PRODUCER_COMPRESSION_PROPERTIES |
static java.lang.String |
QUEUE_SIZE |
static java.lang.String |
REQUIRED_ACKS |
static org.I0Itec.zkclient.serialize.ZkSerializer |
utf8Serializer
Used when writing directly to ZK.
|
CONSUMER_RETRY_PROPERTIES, CONSUMER_STANDARD_PROPERTIES, defaultBackOffInitialInterval, defaultBackOffMaxInterval, defaultBackOffMultiplier, defaultBatchBufferLimit, defaultBatchingEnabled, defaultBatchSize, defaultBatchTimeout, defaultCompress, defaultConcurrency, defaultDurableSubscription, defaultMaxAttempts, directChannelProvider, evaluationContext, JOB_CHANNEL_TYPE_PREFIX, logger, MEDIATYPES_MEDIATYPE_ALL, P2P_NAMED_CHANNEL_TYPE_PREFIX, PARTITION_HEADER, PRODUCER_BATCHING_ADVANCED_PROPERTIES, PRODUCER_BATCHING_BASIC_PROPERTIES, PRODUCER_PARTITIONING_PROPERTIES, PRODUCER_STANDARD_PROPERTIES, PUBSUB_NAMED_CHANNEL_TYPE_PREFIX
Constructor and Description |
---|
KafkaMessageBus(org.springframework.integration.kafka.support.ZookeeperConnect zookeeperConnect,
java.lang.String brokers,
java.lang.String zkAddress,
MultiTypeCodec<java.lang.Object> codec,
java.lang.String... headersToMap) |
Modifier and Type | Method and Description |
---|---|
void |
afterPropertiesSet() |
void |
bindConsumer(java.lang.String name,
org.springframework.messaging.MessageChannel moduleInputChannel,
java.util.Properties properties)
Bind a message consumer on a p2p channel.
|
void |
bindProducer(java.lang.String name,
org.springframework.messaging.MessageChannel moduleOutputChannel,
java.util.Properties properties)
Bind a message producer on a p2p channel.
|
void |
bindPubSubConsumer(java.lang.String name,
org.springframework.messaging.MessageChannel inputChannel,
java.util.Properties properties)
Bind a message consumer on a pub/sub channel.
|
void |
bindPubSubProducer(java.lang.String name,
org.springframework.messaging.MessageChannel outputChannel,
java.util.Properties properties)
Bind a message producer on a pub/sub channel.
|
void |
bindReplier(java.lang.String name,
org.springframework.messaging.MessageChannel requests,
org.springframework.messaging.MessageChannel replies,
java.util.Properties properties)
Bind a consumer that handles requests from a requestor and asynchronously sends replies.
|
void |
bindRequestor(java.lang.String name,
org.springframework.messaging.MessageChannel requests,
org.springframework.messaging.MessageChannel replies,
java.util.Properties properties)
Bind a producer that expects async replies.
|
org.springframework.integration.kafka.listener.KafkaMessageListenerContainer |
createMessageListenerContainer(java.util.Properties properties,
java.lang.String group,
int maxConcurrency,
java.lang.String topic,
long referencePoint) |
void |
doManualAck(java.util.LinkedList<org.springframework.messaging.MessageHeaders> messageHeadersList)
Perform manual acknowledgement based on the metadata stored in message bus.
|
static java.lang.String |
escapeTopicName(java.lang.String original)
Allowed chars are ASCII alphanumerics, '.', '_' and '-'.
|
org.springframework.integration.kafka.core.ConnectionFactory |
getConnectionFactory() |
void |
setDefaultAutoCommitEnabled(boolean defaultAutoCommitEnabled)
Set the default auto commit enabled property. This determines whether offsets are committed automatically or
manually.
|
void |
setDefaultCompressionCodec(java.lang.String defaultCompressionCodec) |
void |
setDefaultFetchSize(int defaultFetchSize) |
void |
setDefaultMaxWait(int defaultMaxWait) |
void |
setDefaultMinPartitionCount(int defaultMinPartitionCount) |
void |
setDefaultQueueSize(int defaultQueueSize) |
void |
setDefaultReplicationFactor(int defaultReplicationFactor) |
void |
setDefaultRequiredAcks(int defaultRequiredAcks) |
void |
setMode(KafkaMessageBus.Mode mode) |
void |
setOffsetStoreBatchBytes(int offsetStoreBatchBytes) |
void |
setOffsetStoreBatchTime(int offsetStoreBatchTime) |
void |
setOffsetStoreMaxFetchSize(int offsetStoreMaxFetchSize) |
void |
setOffsetStoreRequiredAcks(int offsetStoreRequiredAcks) |
void |
setOffsetStoreRetentionTime(int offsetStoreRetentionTime) |
void |
setOffsetStoreSegmentSize(int offsetStoreSegmentSize) |
void |
setOffsetStoreTopic(java.lang.String offsetStoreTopic) |
void |
setOffsetUpdateCount(int offsetUpdateCount) |
void |
setOffsetUpdateShutdownTimeout(int offsetUpdateShutdownTimeout) |
void |
setOffsetUpdateTimeWindow(int offsetUpdateTimeWindow) |
void |
setRetryOperations(org.springframework.retry.RetryOperations retryOperations)
Retry configuration for operations such as validating topic creation
|
void |
setSocketBufferSize(int socketBufferSize) |
addBinding, applyPrefix, applyPubSub, applyRequests, bindDynamicProducer, bindDynamicPubSubProducer, bindExistingProducerDirectlyIfPossible, bindNewProducerDirectlyIfPossible, buildPartitionRoutingExpression, buildRetryTemplateIfRetryEnabled, constructDLQName, deleteBinding, deleteBindings, deserializePayloadIfNecessary, deserializePayloadIfNecessary, determinePartition, doBindDynamicProducer, doBindDynamicPubSubProducer, getApplicationContext, getBeanFactory, getIdGenerator, isCapable, isNamedChannel, onInit, serializePayloadIfNecessary, setApplicationContext, setCodec, setDefaultBackOffInitialInterval, setDefaultBackOffMaxInterval, setDefaultBackOffMultiplier, setDefaultBatchBufferLimit, setDefaultBatchingEnabled, setDefaultBatchSize, setDefaultBatchTimeout, setDefaultCompress, setDefaultConcurrency, setDefaultDurableSubscription, setDefaultMaxAttempts, setIntegrationEvaluationContext, setPartitionSelector, stopBindings, unbindConsumer, unbindConsumers, unbindProducer, unbindProducers, validateConsumerProperties, validateProducerProperties
public static final org.apache.kafka.common.serialization.ByteArraySerializer BYTE_ARRAY_SERIALIZER
public static final int METADATA_VERIFICATION_RETRY_ATTEMPTS
public static final double METADATA_VERIFICATION_RETRY_BACKOFF_MULTIPLIER
public static final int METADATA_VERIFICATION_RETRY_INITIAL_INTERVAL
public static final int METADATA_VERIFICATION_MAX_INTERVAL
public static final java.lang.String FETCH_SIZE
public static final java.lang.String QUEUE_SIZE
public static final java.lang.String REQUIRED_ACKS
public static final java.lang.String COMPRESSION_CODEC
public static final java.lang.String AUTO_COMMIT_ENABLED
public static final org.I0Itec.zkclient.serialize.ZkSerializer utf8Serializer
protected static final java.util.Set<java.lang.Object> PRODUCER_COMPRESSION_PROPERTIES
public KafkaMessageBus(org.springframework.integration.kafka.support.ZookeeperConnect zookeeperConnect, java.lang.String brokers, java.lang.String zkAddress, MultiTypeCodec<java.lang.Object> codec, java.lang.String... headersToMap)
public void setOffsetStoreTopic(java.lang.String offsetStoreTopic)
public void setOffsetStoreSegmentSize(int offsetStoreSegmentSize)
public void setOffsetStoreRetentionTime(int offsetStoreRetentionTime)
public void setSocketBufferSize(int socketBufferSize)
public void setOffsetStoreRequiredAcks(int offsetStoreRequiredAcks)
public void setOffsetStoreMaxFetchSize(int offsetStoreMaxFetchSize)
public void setOffsetUpdateTimeWindow(int offsetUpdateTimeWindow)
public void setOffsetUpdateCount(int offsetUpdateCount)
public void setOffsetUpdateShutdownTimeout(int offsetUpdateShutdownTimeout)
public void setOffsetStoreBatchBytes(int offsetStoreBatchBytes)
public void setOffsetStoreBatchTime(int offsetStoreBatchTime)
public org.springframework.integration.kafka.core.ConnectionFactory getConnectionFactory()
public void setRetryOperations(org.springframework.retry.RetryOperations retryOperations)
retryOperations
- the retry configuration
public void afterPropertiesSet() throws java.lang.Exception
afterPropertiesSet
in interface org.springframework.beans.factory.InitializingBean
afterPropertiesSet
in class MessageBusSupport
java.lang.Exception
public static java.lang.String escapeTopicName(java.lang.String original)
public void setDefaultReplicationFactor(int defaultReplicationFactor)
public void setDefaultCompressionCodec(java.lang.String defaultCompressionCodec)
public void setDefaultRequiredAcks(int defaultRequiredAcks)
public void setDefaultAutoCommitEnabled(boolean defaultAutoCommitEnabled)
defaultAutoCommitEnabled
- whether offsets are committed automatically (as opposed to manually)
public void setDefaultQueueSize(int defaultQueueSize)
public void setDefaultFetchSize(int defaultFetchSize)
public void setDefaultMinPartitionCount(int defaultMinPartitionCount)
public void setDefaultMaxWait(int defaultMaxWait)
public void setMode(KafkaMessageBus.Mode mode)
public void bindConsumer(java.lang.String name, org.springframework.messaging.MessageChannel moduleInputChannel, java.util.Properties properties)
MessageBus
name
- the logical identity of the message source
moduleInputChannel
- the channel bound as a consumer
properties
- arbitrary String key/value pairs that will be used in the binding
public void bindPubSubConsumer(java.lang.String name, org.springframework.messaging.MessageChannel inputChannel, java.util.Properties properties)
MessageBus
name
- the logical identity of the message source
inputChannel
- the channel bound as a pub/sub consumer
properties
- arbitrary String key/value pairs that will be used in the binding
public void bindProducer(java.lang.String name, org.springframework.messaging.MessageChannel moduleOutputChannel, java.util.Properties properties)
MessageBus
name
- the logical identity of the message target
moduleOutputChannel
- the channel bound as a producer
properties
- arbitrary String key/value pairs that will be used in the binding
public void bindPubSubProducer(java.lang.String name, org.springframework.messaging.MessageChannel outputChannel, java.util.Properties properties)
MessageBus
name
- the logical identity of the message target
outputChannel
- the channel bound as a producer
properties
- arbitrary String key/value pairs that will be used in the binding
public void bindRequestor(java.lang.String name, org.springframework.messaging.MessageChannel requests, org.springframework.messaging.MessageChannel replies, java.util.Properties properties)
MessageBus
name
- The name of the requestor.
requests
- The request channel - sends requests.
replies
- The reply channel - receives replies.
properties
- arbitrary String key/value pairs that will be used in the binding.
public void bindReplier(java.lang.String name, org.springframework.messaging.MessageChannel requests, org.springframework.messaging.MessageChannel replies, java.util.Properties properties)
MessageBus
name
- The name of the requestor for which this replier will handle requests.
requests
- The request channel - receives requests.
replies
- The reply channel - sends replies.
properties
- arbitrary String key/value pairs that will be used in the binding.
public org.springframework.integration.kafka.listener.KafkaMessageListenerContainer createMessageListenerContainer(java.util.Properties properties, java.lang.String group, int maxConcurrency, java.lang.String topic, long referencePoint)
public void doManualAck(java.util.LinkedList<org.springframework.messaging.MessageHeaders> messageHeadersList)
MessageBusSupport
doManualAck
in class MessageBusSupport