Zookeeper Support

Version 4.2 added Zookeeper support to the framework, which consists of a metadata store, a lock registry, and leadership event handling. Each is described in its own section below.

You need to include this dependency in your project:

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-zookeeper</artifactId>
    <version>5.5.9</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-zookeeper:5.5.9"

Zookeeper Metadata Store

You can use the ZookeeperMetadataStore wherever a MetadataStore is needed, such as for persistent file list filters. See Metadata Store for more information. The following example configures a Zookeeper metadata store with XML:

<bean id="client" class="org.springframework.integration.zookeeper.config.CuratorFrameworkFactoryBean">
    <constructor-arg value="${connect.string}" />
</bean>

<bean id="meta" class="org.springframework.integration.zookeeper.metadata.ZookeeperMetadataStore">
    <constructor-arg ref="client" />
</bean>

The following example shows how to configure a Zookeeper metadata store with Java:

@Bean
public MetadataStore zkStore(CuratorFramework client) {
    return new ZookeeperMetadataStore(client);
}
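As a fuller sketch, the configuration below mirrors the XML example in Java and wires the store into a persistent file list filter. It assumes that spring-integration-file is also on the classpath and that a connect.string property is defined. The class name, the bean names, and the "inbound-" key prefix are illustrative choices, not something the framework prescribes.

```java
import org.apache.curator.framework.CuratorFramework;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
import org.springframework.integration.zookeeper.config.CuratorFrameworkFactoryBean;
import org.springframework.integration.zookeeper.metadata.ZookeeperMetadataStore;

@Configuration
public class ZkMetadataConfiguration {

    // Java counterpart of the "client" bean from the XML example.
    @Bean
    public CuratorFrameworkFactoryBean client(@Value("${connect.string}") String connectString) {
        return new CuratorFrameworkFactoryBean(connectString);
    }

    // Exposed as ConcurrentMetadataStore so that it can back a persistent filter.
    @Bean
    public ConcurrentMetadataStore zkStore(CuratorFramework client) {
        return new ZookeeperMetadataStore(client);
    }

    // Assumption: files are tracked under keys that start with "inbound-".
    @Bean
    public FileSystemPersistentAcceptOnceFileListFilter acceptOnceFilter(ConcurrentMetadataStore zkStore) {
        return new FileSystemPersistentAcceptOnceFileListFilter(zkStore, "inbound-");
    }

}
```

Because the filter state lives in Zookeeper rather than in memory, several application instances that poll the same directory can share one view of which files have already been accepted.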

Zookeeper Lock Registry

The ZookeeperLockRegistry can be used where any LockRegistry is needed, such as when using an aggregator in a clustered environment with a shared MessageStore.

A LockRegistry is used to “look up” a lock based on a key (the aggregator uses correlationId). By default, locks in the ZookeeperLockRegistry are maintained in Zookeeper under the following path: /SpringIntegration-LockRegistry/. You can customize the path by providing an implementation of ZookeeperLockRegistry.KeyToPathStrategy, which is defined as follows:

public interface KeyToPathStrategy {

    String pathFor(String key);

    boolean bounded();

}

If the strategy returns true from bounded(), unused locks do not need to be harvested. For unbounded strategies (such as the default), you need to periodically invoke expireUnusedOlderThan(long age) to remove old unused locks from memory.
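To illustrate what a bounded strategy might look like, the sketch below hashes every key into a fixed number of paths. BucketedKeyToPathStrategy, its constructor arguments, and the "bucket-" naming are hypothetical. The interface is repeated locally only so that the example compiles without Spring Integration on the classpath; a real strategy would implement ZookeeperLockRegistry.KeyToPathStrategy instead.

```java
// Local copy of the interface shown above, so that this sketch is self-contained.
interface KeyToPathStrategy {

    String pathFor(String key);

    boolean bounded();

}

// Hypothetical bounded strategy: every key maps to one of a fixed number of paths.
final class BucketedKeyToPathStrategy implements KeyToPathStrategy {

    private final String root;

    private final int buckets;

    BucketedKeyToPathStrategy(String root, int buckets) {
        if (buckets <= 0) {
            throw new IllegalArgumentException("buckets must be positive");
        }
        // Normalize the root so that it never ends with a slash.
        this.root = root.endsWith("/") ? root.substring(0, root.length() - 1) : root;
        this.buckets = buckets;
    }

    @Override
    public String pathFor(String key) {
        // floorMod keeps the index in [0, buckets) even for negative hash codes.
        int bucket = Math.floorMod(key.hashCode(), this.buckets);
        return this.root + "/bucket-" + bucket;
    }

    @Override
    public boolean bounded() {
        // The set of possible paths is finite, so unused locks need no harvesting.
        return true;
    }

    public static void main(String[] args) {
        KeyToPathStrategy strategy = new BucketedKeyToPathStrategy("/locks", 16);
        System.out.println(strategy.pathFor("order-42"));
    }

}
```

The trade-off of this design is contention: two unrelated keys that land in the same bucket share one lock, so the bucket count should be sized against the expected level of concurrency.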

Starting with version 5.5.6, the ZookeeperLockRegistry supports automatic cleanup of the ZkLock cache held in ZookeeperLockRegistry.locks. You enable it by calling ZookeeperLockRegistry.setCacheCapacity(). See its JavaDocs for more information.
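A minimal sketch of a registry bean and a caller follows. It assumes version 5.5.6 or later and a CuratorFramework bean in the context. The capacity of 1000, the five-second timeout, and the OrderProcessor class are illustrative assumptions rather than recommended values.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

import org.apache.curator.framework.CuratorFramework;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.integration.zookeeper.lock.ZookeeperLockRegistry;

@Configuration
public class ZkLockConfiguration {

    @Bean
    public ZookeeperLockRegistry zkLockRegistry(CuratorFramework client) {
        // Uses the default (unbounded) key-to-path strategy.
        ZookeeperLockRegistry registry = new ZookeeperLockRegistry(client);
        // Assumed value: limit the number of ZkLock instances cached in memory.
        registry.setCacheCapacity(1_000);
        return registry;
    }

}

// Hypothetical caller that guards a unit of work with a cluster-wide lock.
class OrderProcessor {

    private final LockRegistry lockRegistry;

    OrderProcessor(LockRegistry lockRegistry) {
        this.lockRegistry = lockRegistry;
    }

    boolean process(String orderId) throws InterruptedException {
        // The registry looks up the lock by key; the key must be a String.
        Lock lock = this.lockRegistry.obtain(orderId);
        if (!lock.tryLock(5, TimeUnit.SECONDS)) {
            return false; // another instance currently holds the lock
        }
        try {
            // Critical section: at most one holder per key across the cluster.
            return true;
        }
        finally {
            lock.unlock();
        }
    }

}
```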

Zookeeper Leadership Event Handling

The following example uses XML to configure an application for leader election in Zookeeper:

<int-zk:leader-listener client="client" path="/siNamespace" role="cluster" />

client is a reference to a CuratorFramework bean. A CuratorFrameworkFactoryBean is available. When a leader is elected, an OnGrantedEvent is published for the role cluster. Any endpoints in that role are started. When leadership is revoked, an OnRevokedEvent is published for the role cluster. Any endpoints in that role are stopped. See Endpoint Roles for more information.
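Application code can also react to these events directly. The listener below is a sketch that only logs leadership changes; the LeadershipLogger class name and the log messages are illustrative.

```java
import org.springframework.context.event.EventListener;
import org.springframework.integration.leader.event.OnGrantedEvent;
import org.springframework.integration.leader.event.OnRevokedEvent;
import org.springframework.stereotype.Component;

@Component
public class LeadershipLogger {

    // Invoked when this instance is elected leader for a role.
    @EventListener
    public void onGranted(OnGrantedEvent event) {
        System.out.println("Leadership granted for role: " + event.getRole());
    }

    // Invoked when this instance loses leadership for a role.
    @EventListener
    public void onRevoked(OnRevokedEvent event) {
        System.out.println("Leadership revoked for role: " + event.getRole());
    }

}
```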

You can use Java configuration to create an instance of the leader initiator, as the following example shows:

@Bean
public LeaderInitiatorFactoryBean leaderInitiator(CuratorFramework client) {
    return new LeaderInitiatorFactoryBean()
                .setClient(client)
                .setPath("/siTest/")
                .setRole("cluster");
}

Starting with version 5.3, a candidate option is exposed on the LeaderInitiatorFactoryBean for more configuration control over an externally provided Candidate instance. Provide either the candidate option or the role option, but not both. When only the role option is provided, a DefaultCandidate instance is created internally with a UUID as its id.
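As a sketch of the candidate option, the configuration below supplies a DefaultCandidate with an explicit id instead of calling setRole(). The "node-1" id and the class name are hypothetical. setCandidate() is invoked as a separate statement so that the example does not rely on it being chainable.

```java
import org.apache.curator.framework.CuratorFramework;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.leader.DefaultCandidate;
import org.springframework.integration.zookeeper.config.LeaderInitiatorFactoryBean;

@Configuration
public class ZkLeaderConfiguration {

    @Bean
    public LeaderInitiatorFactoryBean leaderInitiator(CuratorFramework client) {
        LeaderInitiatorFactoryBean factoryBean = new LeaderInitiatorFactoryBean()
                .setClient(client)
                .setPath("/siTest/");
        // Explicit candidate: id "node-1" (hypothetical) competing for the "cluster" role.
        // setRole() is deliberately not called, because only one of the two options may be set.
        factoryBean.setCandidate(new DefaultCandidate("node-1", "cluster"));
        return factoryBean;
    }

}
```

A stable, human-readable id can make it easier to tell from logs and events which instance currently holds leadership.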