Zookeeper support was added to the framework in version 4.2. It comprises a metadata store, a lock registry, and leader election support.
The ZookeeperMetadataStore can be used wherever a MetadataStore is needed, for example with persistent file list filters.
See Section 10.5, “Metadata Store” for more information.
<bean id="client" class="org.springframework.integration.zookeeper.config.CuratorFrameworkFactoryBean">
    <constructor-arg value="${connect.string}" />
</bean>

<bean id="meta" class="org.springframework.integration.zookeeper.metadata.ZookeeperMetadataStore">
    <constructor-arg ref="client" />
</bean>
@Bean
public MetadataStore zkStore(CuratorFramework client) {
    return new ZookeeperMetadataStore(client);
}
The ZookeeperLockRegistry can be used wherever a LockRegistry is needed, for example when using an Aggregator in a clustered environment with a shared MessageStore.
A LockRegistry is used to "look up" a lock based on a key (the aggregator uses the correlationId).
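The key-based lookup can be illustrated with a minimal sketch. It does not use the framework classes: KeyedLockRegistry is a stand-in that mirrors the obtain(Object) method of LockRegistry, and InMemoryLockRegistry and CorrelatedWork are hypothetical names introduced only for this example. An in-memory registry like this one coordinates threads in a single JVM only; the ZookeeperLockRegistry is what provides coordination across a cluster.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Stand-in that mirrors the shape of LockRegistry (assumption: illustration only),
// so the sketch compiles without the framework on the classpath.
interface KeyedLockRegistry {
    Lock obtain(Object lockKey);
}

// Hypothetical in-memory registry: one lock per key, created on first use.
class InMemoryLockRegistry implements KeyedLockRegistry {
    private final ConcurrentMap<Object, Lock> locks = new ConcurrentHashMap<>();

    @Override
    public Lock obtain(Object lockKey) {
        // The same key always maps to the same Lock instance.
        return locks.computeIfAbsent(lockKey, k -> new ReentrantLock());
    }
}

// Hypothetical caller that serializes work per key, much as an aggregator
// would serialize work per correlationId.
class CorrelatedWork {
    static <T> T underLock(KeyedLockRegistry registry, Object key, Supplier<T> work) {
        Lock lock = registry.obtain(key);
        lock.lock();
        try {
            return work.get();
        } finally {
            // Always release, even if the work throws.
            lock.unlock();
        }
    }
}
```

Two calls with the same key contend for the same lock, while calls with different keys proceed independently.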
By default, locks in the ZookeeperLockRegistry are maintained in Zookeeper under the path /SpringIntegration-LockRegistry/. You can customize the path by providing an implementation of ZookeeperLockRegistry.KeyToPathStrategy.
public interface KeyToPathStrategy {

    String pathFor(String key);

    boolean bounded();

}
If the strategy returns true from bounded(), unused locks do not need to be harvested. For unbounded strategies (such as the default), you need to invoke expireUnusedOlderThan(long age) from time to time to remove old unused locks from memory.
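As an illustration of a bounded strategy, the following sketch maps every key onto one of a fixed number of paths, so the set of locks cannot grow without limit. It is an assumption-laden example rather than anything shipped with the framework: the interface is copied from the listing above so the code compiles on its own, and BucketedKeyToPathStrategy, the /myApp/locks root, and the bucket- prefix are hypothetical names chosen for the example.

```java
// Copy of the interface shown above, repeated so the sketch compiles alone.
interface KeyToPathStrategy {
    String pathFor(String key);
    boolean bounded();
}

// Hypothetical strategy: hashes each key into one of a fixed number of buckets.
class BucketedKeyToPathStrategy implements KeyToPathStrategy {
    private final String root;
    private final int buckets;

    BucketedKeyToPathStrategy(String root, int buckets) {
        if (buckets <= 0) {
            throw new IllegalArgumentException("buckets must be positive");
        }
        // Normalize the root so it always ends with exactly one separator.
        this.root = root.endsWith("/") ? root : root + "/";
        this.buckets = buckets;
    }

    @Override
    public String pathFor(String key) {
        // floorMod keeps the bucket index non-negative for negative hash codes.
        int bucket = Math.floorMod(key.hashCode(), buckets);
        return root + "bucket-" + bucket;
    }

    @Override
    public boolean bounded() {
        // At most `buckets` distinct paths can ever be produced.
        return true;
    }
}
```

For example, new BucketedKeyToPathStrategy("/myApp/locks", 16) never produces more than 16 distinct paths. The trade-off is that unrelated keys that land in the same bucket share a lock, which reduces concurrency in exchange for a fixed upper bound on the number of locks.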
To configure an application for leader election using Zookeeper in XML:
<int-zk:leader-listener client="client" path="/siNamespace" role="cluster" />
The client attribute is a reference to a CuratorFramework bean; a CuratorFrameworkFactoryBean is available to create one.
When a leader is elected, an OnGrantedEvent is published for the role cluster, and any endpoints in that role are started. When leadership is revoked, an OnRevokedEvent is published for the role cluster, and any endpoints in that role are stopped.
See Section 8.2, “Endpoint Roles” for more information.
In Java configuration you can create an instance of the leader initiator like this:
@Bean
public LeaderInitiatorFactoryBean leaderInitiator(CuratorFramework client) {
    return new LeaderInitiatorFactoryBean()
            .setClient(client)
            .setPath("/siTest/")
            .setRole("cluster");
}