public class KafkaMessageBus extends MessageBusSupport
Stream definition | Kafka topic | Kafka partitions | Notes |
---|---|---|---|
foo = "http | log" | foo.0 | 1 partition | 1 producer, 1 consumer |
foo = "http | log", log.count=x | foo.0 | x partitions | 1 producer, x consumers with static group 'springXD', achieves queue semantics |
foo = "http | log", log.count=x + XD partitioning | still 1 topic 'foo.0' | x partitions + use key computed by XD | 1 producer, x consumers with static group 'springXD', achieves queue semantics |
foo = "http | log", log.count=x, concurrency=y | foo.0 | x*y partitions | 1 producer, x XD consumers, each with y threads |
foo = "http | log", log.count=0, x actual log containers | foo.0 | 10 (configurable) partitions | 1 producer, x XD consumers. The required number of partitions can't be known beforehand, so pick a number that should be greater than the number of containers |
MessageBusSupport.DirectHandler, MessageBusSupport.PartitioningMetadata, MessageBusSupport.SetBuilder, MessageBusSupport.SharedChannelProvider<T extends org.springframework.messaging.MessageChannel>
Modifier and Type | Field and Description |
---|---|
static java.lang.String |
AUTO_COMMIT_ENABLED |
static java.lang.String |
BATCH_SIZE |
static java.lang.String |
BATCH_TIMEOUT |
static java.lang.String |
BATCHING_ENABLED |
static java.lang.String |
COMPRESSION_CODEC |
static java.lang.String |
CONCURRENCY |
static java.lang.String |
FETCH_SIZE |
static int |
METADATA_VERIFICATION_MAX_INTERVAL |
static int |
METADATA_VERIFICATION_RETRY_ATTEMPTS |
static double |
METADATA_VERIFICATION_RETRY_BACKOFF_MULTIPLIER |
static int |
METADATA_VERIFICATION_RETRY_INITIAL_INTERVAL |
protected static java.util.Set<java.lang.Object> |
PRODUCER_COMPRESSION_PROPERTIES |
static java.lang.String |
QUEUE_SIZE |
static java.lang.String |
REPLICATION_FACTOR |
static java.lang.String |
REQUIRED_ACKS |
static org.I0Itec.zkclient.serialize.ZkSerializer |
utf8Serializer
Used when writing directly to ZK.
|
CONSUMER_RETRY_PROPERTIES, CONSUMER_STANDARD_PROPERTIES, defaultBackOffInitialInterval, defaultBackOffMaxInterval, defaultBackOffMultiplier, defaultBatchBufferLimit, defaultBatchingEnabled, defaultBatchSize, defaultBatchTimeout, defaultCompress, defaultConcurrency, defaultMaxAttempts, directChannelProvider, evaluationContext, JOB_CHANNEL_TYPE_PREFIX, logger, MEDIATYPES_MEDIATYPE_ALL, P2P_NAMED_CHANNEL_TYPE_PREFIX, PARTITION_HEADER, PRODUCER_BATCHING_ADVANCED_PROPERTIES, PRODUCER_BATCHING_BASIC_PROPERTIES, PRODUCER_PARTITIONING_PROPERTIES, PRODUCER_STANDARD_PROPERTIES, PUBSUB_NAMED_CHANNEL_TYPE_PREFIX
Constructor and Description |
---|
KafkaMessageBus(org.springframework.integration.kafka.support.ZookeeperConnect zookeeperConnect,
java.lang.String brokers,
java.lang.String zkAddress,
MultiTypeCodec<java.lang.Object> codec,
java.lang.String... headersToMap) |
Modifier and Type | Method and Description |
---|---|
void |
afterPropertiesSet() |
void |
bindConsumer(java.lang.String name,
org.springframework.messaging.MessageChannel moduleInputChannel,
java.util.Properties properties)
Bind a message consumer on a p2p channel.
|
void |
bindProducer(java.lang.String name,
org.springframework.messaging.MessageChannel moduleOutputChannel,
java.util.Properties properties)
Bind a message producer on a p2p channel.
|
void |
bindPubSubConsumer(java.lang.String name,
org.springframework.messaging.MessageChannel inputChannel,
java.util.Properties properties)
Bind a message consumer on a pub/sub channel.
|
void |
bindPubSubProducer(java.lang.String name,
org.springframework.messaging.MessageChannel outputChannel,
java.util.Properties properties)
Bind a message producer on a pub/sub channel.
|
void |
bindReplier(java.lang.String name,
org.springframework.messaging.MessageChannel requests,
org.springframework.messaging.MessageChannel replies,
java.util.Properties properties)
Bind a consumer that handles requests from a requestor and asynchronously sends replies.
|
void |
bindRequestor(java.lang.String name,
org.springframework.messaging.MessageChannel requests,
org.springframework.messaging.MessageChannel replies,
java.util.Properties properties)
Bind a producer that expects async replies.
|
org.springframework.integration.kafka.listener.KafkaMessageListenerContainer |
createMessageListenerContainer(java.util.Properties properties,
java.lang.String group,
int numThreads,
java.lang.String topic,
long referencePoint) |
void |
doManualAck(java.util.LinkedList<org.springframework.messaging.MessageHeaders> messageHeadersList)
Perform manual acknowledgement based on the metadata stored in message bus.
|
static java.lang.String |
escapeTopicName(java.lang.String original)
Allowed chars are ASCII alphanumerics, '.', '_' and '-'.
|
void |
setDefaultAutoCommitEnabled(boolean defaultAutoCommitEnabled)
Set the default auto commit enabled property; this determines whether
the offset is committed automatically or manually.
|
void |
setDefaultCompressionCodec(java.lang.String defaultCompressionCodec) |
void |
setDefaultFetchSize(int defaultFetchSize) |
void |
setDefaultQueueSize(int defaultQueueSize) |
void |
setDefaultReplicationFactor(int defaultReplicationFactor) |
void |
setDefaultRequiredAcks(int defaultRequiredAcks) |
void |
setOffsetStoreBatchEnabled(boolean offsetStoreBatchEnabled) |
void |
setOffsetStoreBatchSize(int offsetStoreBatchSize) |
void |
setOffsetStoreBatchTime(int offsetStoreBatchTime) |
void |
setOffsetStoreMaxFetchSize(int offsetStoreMaxFetchSize) |
void |
setOffsetStoreRequiredAcks(int offsetStoreRequiredAcks) |
void |
setOffsetStoreRetentionTime(int offsetStoreRetentionTime) |
void |
setOffsetStoreSegmentSize(int offsetStoreSegmentSize) |
void |
setOffsetStoreTopic(java.lang.String offsetStoreTopic) |
void |
setOffsetUpdateCount(int offsetUpdateCount) |
void |
setOffsetUpdateShutdownTimeout(int offsetUpdateShutdownTimeout) |
void |
setOffsetUpdateTimeWindow(int offsetUpdateTimeWindow) |
void |
setRetryOperations(org.springframework.retry.RetryOperations retryOperations)
Retry configuration for operations such as validating topic creation.
|
void |
setSocketBufferSize(int socketBufferSize) |
addBinding, bindDynamicProducer, bindDynamicPubSubProducer, bindExistingProducerDirectlyIfPossible, bindNewProducerDirectlyIfPossible, buildPartitionRoutingExpression, buildRetryTemplateIfRetryEnabled, deleteBinding, deleteBindings, deserializePayloadIfNecessary, determinePartition, doBindDynamicProducer, doBindDynamicPubSubProducer, getApplicationContext, getBeanFactory, getIdGenerator, isNamedChannel, onInit, serializePayloadIfNecessary, setApplicationContext, setCodec, setDefaultBackOffInitialInterval, setDefaultBackOffMaxInterval, setDefaultBackOffMultiplier, setDefaultBatchBufferLimit, setDefaultBatchingEnabled, setDefaultBatchSize, setDefaultBatchTimeout, setDefaultCompress, setDefaultConcurrency, setDefaultMaxAttempts, setIntegrationEvaluationContext, setPartitionSelector, stopBindings, unbindConsumer, unbindConsumers, unbindProducer, unbindProducers, validateConsumerProperties, validateProducerProperties
public static final int METADATA_VERIFICATION_RETRY_ATTEMPTS
public static final double METADATA_VERIFICATION_RETRY_BACKOFF_MULTIPLIER
public static final int METADATA_VERIFICATION_RETRY_INITIAL_INTERVAL
public static final int METADATA_VERIFICATION_MAX_INTERVAL
public static final java.lang.String BATCHING_ENABLED
public static final java.lang.String BATCH_SIZE
public static final java.lang.String BATCH_TIMEOUT
public static final java.lang.String REPLICATION_FACTOR
public static final java.lang.String CONCURRENCY
public static final java.lang.String FETCH_SIZE
public static final java.lang.String QUEUE_SIZE
public static final java.lang.String REQUIRED_ACKS
public static final java.lang.String COMPRESSION_CODEC
public static final java.lang.String AUTO_COMMIT_ENABLED
public static final org.I0Itec.zkclient.serialize.ZkSerializer utf8Serializer
protected static final java.util.Set<java.lang.Object> PRODUCER_COMPRESSION_PROPERTIES
public KafkaMessageBus(org.springframework.integration.kafka.support.ZookeeperConnect zookeeperConnect, java.lang.String brokers, java.lang.String zkAddress, MultiTypeCodec<java.lang.Object> codec, java.lang.String... headersToMap)
public void setOffsetStoreTopic(java.lang.String offsetStoreTopic)
public void setOffsetStoreSegmentSize(int offsetStoreSegmentSize)
public void setOffsetStoreRetentionTime(int offsetStoreRetentionTime)
public void setSocketBufferSize(int socketBufferSize)
public void setOffsetStoreRequiredAcks(int offsetStoreRequiredAcks)
public void setOffsetStoreMaxFetchSize(int offsetStoreMaxFetchSize)
public void setOffsetUpdateTimeWindow(int offsetUpdateTimeWindow)
public void setOffsetUpdateCount(int offsetUpdateCount)
public void setOffsetUpdateShutdownTimeout(int offsetUpdateShutdownTimeout)
public void setOffsetStoreBatchEnabled(boolean offsetStoreBatchEnabled)
public void setOffsetStoreBatchSize(int offsetStoreBatchSize)
public void setOffsetStoreBatchTime(int offsetStoreBatchTime)
public void setRetryOperations(org.springframework.retry.RetryOperations retryOperations)
retryOperations - retry configuration for operations such as validating topic creation
public void afterPropertiesSet() throws java.lang.Exception
afterPropertiesSet
in interface org.springframework.beans.factory.InitializingBean
afterPropertiesSet
in class MessageBusSupport
java.lang.Exception
public static java.lang.String escapeTopicName(java.lang.String original)
public void setDefaultReplicationFactor(int defaultReplicationFactor)
public void setDefaultCompressionCodec(java.lang.String defaultCompressionCodec)
public void setDefaultRequiredAcks(int defaultRequiredAcks)
public void setDefaultAutoCommitEnabled(boolean defaultAutoCommitEnabled)
defaultAutoCommitEnabled - the default value of the auto commit enabled property
public void setDefaultQueueSize(int defaultQueueSize)
public void setDefaultFetchSize(int defaultFetchSize)
public void bindConsumer(java.lang.String name, org.springframework.messaging.MessageChannel moduleInputChannel, java.util.Properties properties)
MessageBus
name - the logical identity of the message source
moduleInputChannel - the channel bound as a consumer
properties - arbitrary String key/value pairs that will be used in the binding
public void bindPubSubConsumer(java.lang.String name, org.springframework.messaging.MessageChannel inputChannel, java.util.Properties properties)
MessageBus
name - the logical identity of the message source
inputChannel - the channel bound as a pub/sub consumer
properties - arbitrary String key/value pairs that will be used in the binding
public void bindProducer(java.lang.String name, org.springframework.messaging.MessageChannel moduleOutputChannel, java.util.Properties properties)
MessageBus
name - the logical identity of the message target
moduleOutputChannel - the channel bound as a producer
properties - arbitrary String key/value pairs that will be used in the binding
public void bindPubSubProducer(java.lang.String name, org.springframework.messaging.MessageChannel outputChannel, java.util.Properties properties)
MessageBus
name - the logical identity of the message target
outputChannel - the channel bound as a producer
properties - arbitrary String key/value pairs that will be used in the binding
public void bindRequestor(java.lang.String name, org.springframework.messaging.MessageChannel requests, org.springframework.messaging.MessageChannel replies, java.util.Properties properties)
MessageBus
name - The name of the requestor.
requests - The request channel - sends requests.
replies - The reply channel - receives replies.
properties - arbitrary String key/value pairs that will be used in the binding.
public void bindReplier(java.lang.String name, org.springframework.messaging.MessageChannel requests, org.springframework.messaging.MessageChannel replies, java.util.Properties properties)
MessageBus
name - The name of the requestor for which this replier will handle requests.
requests - The request channel - receives requests.
replies - The reply channel - sends replies.
properties - arbitrary String key/value pairs that will be used in the binding.
public org.springframework.integration.kafka.listener.KafkaMessageListenerContainer createMessageListenerContainer(java.util.Properties properties, java.lang.String group, int numThreads, java.lang.String topic, long referencePoint)
public void doManualAck(java.util.LinkedList<org.springframework.messaging.MessageHeaders> messageHeadersList)
MessageBusSupport
doManualAck
in class MessageBusSupport