public class KafkaPartitionAllocator
extends java.lang.Object
implements org.springframework.beans.factory.InitializingBean, org.springframework.beans.factory.FactoryBean<org.springframework.integration.kafka.core.Partition[]>, org.springframework.context.ApplicationListener<org.springframework.context.event.ContextClosedEvent>
FactoryBean
it will return the partitions that the current module instance should listen to.
The list of partitions is stored in ZooKeeper, and is created when the module is started for the first time.
If multiple module instances are started at the same time, they synchronize via ZooKeeper, and only the first
will end up creating the module list, while the other instances will read it.
Restarted modules will read the list of partitions that corresponds to their own sequence. Zero-count Kafka source
modules are not supported.
As an ApplicationListener
, it checks whether the current Stream state cleans up the partition table in
ZooKeeper when the stream is undeploying by validating that against the stream data path in ZooKeeper.Modifier and Type | Field and Description |
---|---|
static java.lang.String |
STREAM_PATH_PATTERN |
static java.lang.String |
STREAM_STATUS_UNDEPLOYED |
static java.lang.String |
STREAM_STATUS_UNDEPLOYING |
Constructor and Description |
---|
KafkaPartitionAllocator(org.apache.curator.framework.CuratorFramework client,
org.springframework.integration.kafka.core.ConnectionFactory connectionFactory,
java.lang.String moduleName,
java.lang.String streamName,
java.lang.String topic,
java.lang.String partitionList,
int sequence,
int count,
org.springframework.integration.kafka.listener.OffsetManager offsetManager) |
Modifier and Type | Method and Description |
---|---|
void |
afterPropertiesSet() |
org.springframework.integration.kafka.core.Partition[] |
getObject() |
java.lang.Class<?> |
getObjectType() |
boolean |
isSingleton() |
void |
onApplicationEvent(org.springframework.context.event.ContextClosedEvent event) |
static java.lang.Iterable<java.lang.Integer> |
parseNumberList(java.lang.String numberList)
Expects a String containing a list of numbers or ranges, e.g.
|
public static final java.lang.String STREAM_STATUS_UNDEPLOYED
public static final java.lang.String STREAM_STATUS_UNDEPLOYING
public static final java.lang.String STREAM_PATH_PATTERN
public KafkaPartitionAllocator(org.apache.curator.framework.CuratorFramework client, org.springframework.integration.kafka.core.ConnectionFactory connectionFactory, java.lang.String moduleName, java.lang.String streamName, java.lang.String topic, java.lang.String partitionList, int sequence, int count, org.springframework.integration.kafka.listener.OffsetManager offsetManager)
public void afterPropertiesSet() throws java.lang.Exception
afterPropertiesSet
in interface org.springframework.beans.factory.InitializingBean
java.lang.Exception
public org.springframework.integration.kafka.core.Partition[] getObject() throws java.lang.Exception
getObject
in interface org.springframework.beans.factory.FactoryBean<org.springframework.integration.kafka.core.Partition[]>
java.lang.Exception
public java.lang.Class<?> getObjectType()
getObjectType
in interface org.springframework.beans.factory.FactoryBean<org.springframework.integration.kafka.core.Partition[]>
public boolean isSingleton()
isSingleton
in interface org.springframework.beans.factory.FactoryBean<org.springframework.integration.kafka.core.Partition[]>
public void onApplicationEvent(org.springframework.context.event.ContextClosedEvent event)
onApplicationEvent
in interface org.springframework.context.ApplicationListener<org.springframework.context.event.ContextClosedEvent>
public static java.lang.Iterable<java.lang.Integer> parseNumberList(java.lang.String numberList) throws java.lang.IllegalArgumentException
"1-10"
, "1,3,5"
,
"1,5,10-20,26,100-110,145"
. One-sized ranges or ranges where the start is after the end
are not permitted.
Returns an array of Integers containing the actual numbers.numberList
- a string containing numbers, or rangesjava.lang.IllegalArgumentException
- if the format of the list is incorrect