public class KafkaMessageBus extends MessageBusSupport
Stream definition | Kafka topic | Kafka partitions | Notes |
---|---|---|---|
foo = "http | log" | foo.0 | 1 partition | 1 producer, 1 consumer |
foo = "http | log", log.count=x | foo.0 | x partitions | 1 producer, x consumers with static group 'springXD', achieves queue semantics |
foo = "http | log", log.count=x + XD partitioning | still 1 topic 'foo.0' | x partitions + use key computed by XD | 1 producer, x consumers with static group 'springXD', achieves queue semantics |
foo = "http | log", log.count=x, concurrency=y | foo.0 | x*y partitions | 1 producer, x XD consumers, each with y threads |
foo = "http | log", log.count=0, x actual log containers | foo.0 | 10(configurable) partitions | 1 producer, x XD consumers. Can't know the number of partitions beforehand, so decide a number that better be greater than number of containers |
MessageBusSupport.DirectHandler, MessageBusSupport.PartitioningMetadata, MessageBusSupport.SetBuilder, MessageBusSupport.SharedChannelProvider<T extends org.springframework.messaging.MessageChannel>
Modifier and Type | Field and Description |
---|---|
static java.lang.String |
BATCH_SIZE |
static java.lang.String |
BATCH_TIMEOUT |
static java.lang.String |
BATCHING_ENABLED |
static java.lang.String |
COMPRESSION_CODEC |
static java.lang.String |
CONCURRENCY |
static int |
METADATA_VERIFICATION_MAX_INTERVAL |
static int |
METADATA_VERIFICATION_RETRY_ATTEMPTS |
static double |
METADATA_VERIFICATION_RETRY_BACKOFF_MULTIPLIER |
static int |
METADATA_VERIFICATION_RETRY_INITIAL_INTERVAL |
protected static java.util.Set<java.lang.Object> |
PRODUCER_COMPRESSION_PROPERTIES |
static java.lang.String |
REPLICATION_FACTOR |
static java.lang.String |
REQUIRED_ACKS |
static org.I0Itec.zkclient.serialize.ZkSerializer |
utf8Serializer
Used when writing directly to ZK.
|
CONSUMER_RETRY_PROPERTIES, CONSUMER_STANDARD_PROPERTIES, defaultBackOffInitialInterval, defaultBackOffMaxInterval, defaultBackOffMultiplier, defaultBatchBufferLimit, defaultBatchingEnabled, defaultBatchSize, defaultBatchTimeout, defaultCompress, defaultConcurrency, defaultMaxAttempts, directChannelProvider, evaluationContext, JOB_CHANNEL_TYPE_PREFIX, logger, MEDIATYPES_MEDIATYPE_ALL, P2P_NAMED_CHANNEL_TYPE_PREFIX, PARTITION_HEADER, PRODUCER_BATCHING_ADVANCED_PROPERTIES, PRODUCER_BATCHING_BASIC_PROPERTIES, PRODUCER_PARTITIONING_PROPERTIES, PRODUCER_STANDARD_PROPERTIES, PUBSUB_NAMED_CHANNEL_TYPE_PREFIX
Constructor and Description |
---|
KafkaMessageBus(org.springframework.integration.kafka.support.ZookeeperConnect zookeeperConnect,
java.lang.String brokers,
java.lang.String zkAddress,
MultiTypeCodec<java.lang.Object> codec,
java.lang.String... headersToMap) |
Modifier and Type | Method and Description |
---|---|
void |
afterPropertiesSet() |
void |
bindConsumer(java.lang.String name,
org.springframework.messaging.MessageChannel moduleInputChannel,
java.util.Properties properties)
Bind a message consumer on a p2p channel
|
void |
bindProducer(java.lang.String name,
org.springframework.messaging.MessageChannel moduleOutputChannel,
java.util.Properties properties)
Bind a message producer on a p2p channel.
|
void |
bindPubSubConsumer(java.lang.String name,
org.springframework.messaging.MessageChannel inputChannel,
java.util.Properties properties)
Bind a message consumer on a pub/sub channel
|
void |
bindPubSubProducer(java.lang.String name,
org.springframework.messaging.MessageChannel outputChannel,
java.util.Properties properties)
Bind a message producer on a pub/sub channel.
|
void |
bindReplier(java.lang.String name,
org.springframework.messaging.MessageChannel requests,
org.springframework.messaging.MessageChannel replies,
java.util.Properties properties)
Bind a consumer that handles requests from a requestor and asynchronously sends replies.
|
void |
bindRequestor(java.lang.String name,
org.springframework.messaging.MessageChannel requests,
org.springframework.messaging.MessageChannel replies,
java.util.Properties properties)
Bind a producer that expects async replies.
|
org.springframework.integration.kafka.listener.KafkaMessageListenerContainer |
createMessageListenerContainer(java.lang.String group,
int numThreads,
java.util.Collection<org.springframework.integration.kafka.core.Partition> listenedPartitions,
long referencePoint) |
org.springframework.integration.kafka.listener.KafkaMessageListenerContainer |
createMessageListenerContainer(java.lang.String group,
int numThreads,
java.lang.String topic,
long referencePoint) |
static java.lang.String |
escapeTopicName(java.lang.String original)
Allowed chars are ASCII alphanumerics, '.', '_' and '-'.
|
java.lang.String[] |
getMessageBusSpecificProperties() |
void |
setDefaultCompressionCodec(java.lang.String defaultCompressionCodec) |
void |
setDefaultReplicationFactor(int defaultReplicationFactor) |
void |
setDefaultRequiredAcks(int defaultRequiredAcks) |
void |
setOffsetStoreTopic(java.lang.String offsetStoreTopic) |
void |
setRetryOperations(org.springframework.retry.RetryOperations retryOperations)
Retry configuration for operations such as validating topic creation
|
addBinding, bindDynamicProducer, bindDynamicPubSubProducer, bindExistingProducerDirectlyIfPossible, bindNewProducerDirectlyIfPossible, buildPartitionRoutingExpression, buildRetryTemplateIfRetryEnabled, deleteBinding, deleteBindings, deserializePayloadIfNecessary, determinePartition, doBindDynamicProducer, doBindDynamicPubSubProducer, getApplicationContext, getBeanFactory, getIdGenerator, isNamedChannel, onInit, serializePayloadIfNecessary, setApplicationContext, setCodec, setDefaultBackOffInitialInterval, setDefaultBackOffMaxInterval, setDefaultBackOffMultiplier, setDefaultBatchBufferLimit, setDefaultBatchingEnabled, setDefaultBatchSize, setDefaultBatchTimeout, setDefaultCompress, setDefaultConcurrency, setDefaultMaxAttempts, setIntegrationEvaluationContext, setPartitionSelector, stopBindings, unbindConsumer, unbindConsumers, unbindProducer, unbindProducers, validateConsumerProperties, validateProducerProperties
public static final int METADATA_VERIFICATION_RETRY_ATTEMPTS
public static final double METADATA_VERIFICATION_RETRY_BACKOFF_MULTIPLIER
public static final int METADATA_VERIFICATION_RETRY_INITIAL_INTERVAL
public static final int METADATA_VERIFICATION_MAX_INTERVAL
public static final java.lang.String BATCHING_ENABLED
public static final java.lang.String BATCH_SIZE
public static final java.lang.String BATCH_TIMEOUT
public static final java.lang.String REPLICATION_FACTOR
public static final java.lang.String CONCURRENCY
public static final java.lang.String REQUIRED_ACKS
public static final java.lang.String COMPRESSION_CODEC
public static final org.I0Itec.zkclient.serialize.ZkSerializer utf8Serializer
protected static final java.util.Set<java.lang.Object> PRODUCER_COMPRESSION_PROPERTIES
public KafkaMessageBus(org.springframework.integration.kafka.support.ZookeeperConnect zookeeperConnect, java.lang.String brokers, java.lang.String zkAddress, MultiTypeCodec<java.lang.Object> codec, java.lang.String... headersToMap)
public void setOffsetStoreTopic(java.lang.String offsetStoreTopic)
public void setRetryOperations(org.springframework.retry.RetryOperations retryOperations)
retryOperations
- public void afterPropertiesSet() throws java.lang.Exception
afterPropertiesSet
in interface org.springframework.beans.factory.InitializingBean
afterPropertiesSet
in class MessageBusSupport
java.lang.Exception
public static java.lang.String escapeTopicName(java.lang.String original)
public void setDefaultReplicationFactor(int defaultReplicationFactor)
public void setDefaultCompressionCodec(java.lang.String defaultCompressionCodec)
public void setDefaultRequiredAcks(int defaultRequiredAcks)
public void bindConsumer(java.lang.String name, org.springframework.messaging.MessageChannel moduleInputChannel, java.util.Properties properties)
MessageBus
name
- the logical identity of the message sourcemoduleInputChannel
- the channel bound as a consumerproperties
- arbitrary String key/value pairs that will be used in the bindingpublic void bindPubSubConsumer(java.lang.String name, org.springframework.messaging.MessageChannel inputChannel, java.util.Properties properties)
MessageBus
name
- the logical identity of the message sourceinputChannel
- the channel bound as a pub/sub consumerproperties
- arbitrary String key/value pairs that will be used in the bindingpublic void bindProducer(java.lang.String name, org.springframework.messaging.MessageChannel moduleOutputChannel, java.util.Properties properties)
MessageBus
name
- the logical identity of the message targetmoduleOutputChannel
- the channel bound as a producerproperties
- arbitrary String key/value pairs that will be used in the bindingpublic void bindPubSubProducer(java.lang.String name, org.springframework.messaging.MessageChannel outputChannel, java.util.Properties properties)
MessageBus
name
- the logical identity of the message targetoutputChannel
- the channel bound as a producerproperties
- arbitrary String key/value pairs that will be used in the bindingpublic void bindRequestor(java.lang.String name, org.springframework.messaging.MessageChannel requests, org.springframework.messaging.MessageChannel replies, java.util.Properties properties)
MessageBus
name
- The name of the requestor.requests
- The request channel - sends requests.replies
- The reply channel - receives replies.properties
- arbitrary String key/value pairs that will be used in the binding.public void bindReplier(java.lang.String name, org.springframework.messaging.MessageChannel requests, org.springframework.messaging.MessageChannel replies, java.util.Properties properties)
MessageBus
name
- The name of the requestor for which this replier will handle requests.requests
- The request channel - receives requests.replies
- The reply channel - sends replies.properties
- arbitrary String key/value pairs that will be used in the binding.public org.springframework.integration.kafka.listener.KafkaMessageListenerContainer createMessageListenerContainer(java.lang.String group, int numThreads, java.lang.String topic, long referencePoint)
public org.springframework.integration.kafka.listener.KafkaMessageListenerContainer createMessageListenerContainer(java.lang.String group, int numThreads, java.util.Collection<org.springframework.integration.kafka.core.Partition> listenedPartitions, long referencePoint)
public java.lang.String[] getMessageBusSpecificProperties()
getMessageBusSpecificProperties
in class MessageBusSupport