ZeroMQ Support

Spring Integration provides components to support ZeroMQ communication in the application. The implementation is based on the well-supported Java API of the JeroMQ library. All components encapsulate ZeroMQ socket lifecycles and manage the threads for them internally, which makes interactions with these components lock-free and thread-safe.

You need to include this dependency in your project:

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-zeromq</artifactId>
    <version>5.4.0-M3</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-zeromq:5.4.0-M3"

ZeroMQ Proxy

The ZeroMqProxy is a Spring-friendly wrapper for the built-in ZMQ.proxy() function. It encapsulates socket lifecycles and thread management. Clients of this proxy can still use the standard ZeroMQ socket connection and interaction API. Alongside the standard ZContext, it requires one of the well-known ZeroMQ proxy modes: SUB/PUB, PULL/PUSH, or ROUTER/DEALER. The selected mode determines the pair of ZeroMQ socket types used for the frontend and backend of the proxy. See ZeroMqProxy.Type for details.

The ZeroMqProxy implements SmartLifecycle to create, bind, and configure the sockets and to start ZMQ.proxy() in a dedicated thread from an Executor (if any). The frontend and backend sockets are bound over the tcp:// protocol on all available network interfaces. If ports are provided, the sockets are bound to them; otherwise, they are bound to random ports, which can be obtained later via the respective getFrontendPort() and getBackendPort() API methods.

The control socket is exposed as a SocketType.PAIR with an inter-thread transport on the "inproc://" + beanName + ".control" address; the address can be obtained via getControlAddress(). It should be used from within the same application, through another SocketType.PAIR socket, to send ZMQ.PROXY_TERMINATE, ZMQ.PROXY_PAUSE, and/or ZMQ.PROXY_RESUME commands. When stop() is called on its lifecycle, the ZeroMqProxy sends a ZMQ.PROXY_TERMINATE command to terminate the ZMQ.proxy() loop and close all the bound sockets gracefully.

The setExposeCaptureSocket(boolean) option causes this component to bind an additional inter-thread socket with SocketType.PUB to capture and publish all the communication between the frontend and backend sockets, as described for the ZMQ.proxy() implementation. This socket is bound to the "inproc://" + beanName + ".capture" address and does not expect any specific subscription for filtering.
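The control and capture sockets follow the same naming pattern: an inproc:// address derived from the bean name. The following minimal sketch only reproduces that string pattern as stated above. The ProxyAddresses class and its methods are hypothetical helpers, not part of Spring Integration; in an application, prefer getControlAddress() on the proxy itself.

```java
// Hypothetical helper: mirrors the address pattern described in the text.
final class ProxyAddresses {

    private ProxyAddresses() {
    }

    // Address of the PAIR control socket: "inproc://" + beanName + ".control"
    static String controlAddress(String beanName) {
        return "inproc://" + beanName + ".control";
    }

    // Address of the PUB capture socket: "inproc://" + beanName + ".capture"
    static String captureAddress(String beanName) {
        return "inproc://" + beanName + ".capture";
    }

    public static void main(String[] args) {
        // "zeroMqProxy" is the bean name used in the configuration example of this chapter
        System.out.println(controlAddress("zeroMqProxy"));
        System.out.println(captureAddress("zeroMqProxy"));
    }
}
```

Because both addresses use the inter-thread transport, they are reachable only from sockets created on the same ZContext within the same JVM.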

The frontend and backend sockets can be customized with additional properties, such as read/write timeout or security. This customization is available through setFrontendSocketConfigurer(Consumer<ZMQ.Socket>) and setBackendSocketConfigurer(Consumer<ZMQ.Socket>) callbacks, respectively.

The ZeroMqProxy can be declared as a simple bean like this:

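@Bean
ZeroMqProxy zeroMqProxy(ZContext context) {
    // SUB_PUB selects the SUB/PUB proxy mode for the frontend and backend sockets
    ZeroMqProxy proxy = new ZeroMqProxy(context, ZeroMqProxy.Type.SUB_PUB);
    // Also bind the inter-thread capture socket
    proxy.setExposeCaptureSocket(true);
    // Fixed ports; when they are not set, random ports are used
    proxy.setFrontendPort(6001);
    proxy.setBackendPort(6002);
    return proxy;
}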

All client nodes should connect to the host of this proxy via tcp://, using the frontend or backend port, depending on their role.

ZeroMQ Message Channel

The ZeroMqChannel is a SubscribableChannel which uses a pair of ZeroMQ sockets to connect publishers and subscribers for messaging interaction. It works in PUSH/PULL mode by default and can be switched to PUB/SUB mode. It can also be used as a local inter-thread channel (using PAIR sockets); in this case, the connectUrl is not provided. In distributed mode, it has to be connected to an externally managed ZeroMQ proxy, where it can exchange messages with other similar channels connected to the same proxy. The connectUrl option is a standard ZeroMQ connection string with the protocol and host, followed by a pair of colon-separated ports for the frontend and backend sockets of the ZeroMQ proxy (for example, tcp://localhost:6001:6002). For convenience, the channel can be supplied with a ZeroMqProxy instance instead of a connection string if it is configured in the same application as the proxy.
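To make the connection string format concrete, the following sketch splits a value such as tcp://localhost:6001:6002 into one address for the proxy frontend and one for the proxy backend. The ConnectUrlSplitter class is a hypothetical illustration of the format described above, not the channel's actual parsing code.

```java
// Hypothetical helper: illustrates the "protocol://host:frontendPort:backendPort" format.
final class ConnectUrlSplitter {

    private ConnectUrlSplitter() {
    }

    /**
     * Returns a two-element array:
     * [0] = "protocol://host:frontendPort", [1] = "protocol://host:backendPort".
     */
    static String[] split(String connectUrl) {
        // The last colon precedes the backend port, the one before it the frontend port
        int backendColon = connectUrl.lastIndexOf(':');
        int frontendColon = connectUrl.lastIndexOf(':', backendColon - 1);
        if (frontendColon < 0 || !connectUrl.substring(0, frontendColon).contains("://")) {
            throw new IllegalArgumentException(
                    "Expected protocol://host:frontendPort:backendPort but got: " + connectUrl);
        }
        String protocolAndHost = connectUrl.substring(0, frontendColon);
        String frontend = protocolAndHost + connectUrl.substring(frontendColon, backendColon);
        String backend = protocolAndHost + connectUrl.substring(backendColon);
        return new String[] { frontend, backend };
    }

    public static void main(String[] args) {
        String[] urls = split("tcp://localhost:6001:6002");
        System.out.println("frontend: " + urls[0]);
        System.out.println("backend:  " + urls[1]);
    }
}
```

A string with a single port, such as tcp://localhost:6001, is rejected by this sketch because the format requires both proxy ports.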

Both the sending and receiving sockets are managed in their own dedicated threads, which makes this channel concurrency-friendly. This way, we can publish to and consume from a ZeroMqChannel from different threads without synchronization.

By default the ZeroMqChannel uses an EmbeddedJsonHeadersMessageMapper to (de)serialize the Message (including headers) from/to byte[] using a Jackson JSON processor. This logic can be configured via setMessageMapper(BytesMessageMapper).
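The idea of carrying headers and payload together in a single byte[] can be sketched with the JDK alone. The example below is an assumption-laden illustration: it uses a hypothetical length-prefixed layout with key=value header lines instead of Jackson JSON, so it is not the wire format of EmbeddedJsonHeadersMessageMapper. The EmbeddedHeadersSketch class and its methods are invented for this example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical layout: [4-byte header length][headers as "key=value\n" in UTF-8][payload bytes].
// Keys and values must not contain '\n', and keys must not contain '=', in this simplified sketch.
final class EmbeddedHeadersSketch {

    private EmbeddedHeadersSketch() {
    }

    // Packs headers and payload into one byte array
    static byte[] fromMessage(Map<String, String> headers, byte[] payload) {
        StringBuilder lines = new StringBuilder();
        for (Map.Entry<String, String> header : headers.entrySet()) {
            lines.append(header.getKey()).append('=').append(header.getValue()).append('\n');
        }
        byte[] headerBytes = lines.toString().getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Integer.BYTES + headerBytes.length + payload.length)
                .putInt(headerBytes.length)
                .put(headerBytes)
                .put(payload)
                .array();
    }

    // Reads the headers back from a packed frame
    static Map<String, String> headersOf(byte[] frame) {
        ByteBuffer buffer = ByteBuffer.wrap(frame);
        byte[] headerBytes = new byte[buffer.getInt()];
        buffer.get(headerBytes);
        Map<String, String> headers = new LinkedHashMap<>();
        for (String line : new String(headerBytes, StandardCharsets.UTF_8).split("\n")) {
            if (!line.isEmpty()) {
                int separator = line.indexOf('=');
                headers.put(line.substring(0, separator), line.substring(separator + 1));
            }
        }
        return headers;
    }

    // Reads the payload back from a packed frame
    static byte[] payloadOf(byte[] frame) {
        ByteBuffer buffer = ByteBuffer.wrap(frame);
        int headerLength = buffer.getInt();
        buffer.position(Integer.BYTES + headerLength);
        byte[] payload = new byte[buffer.remaining()];
        buffer.get(payload);
        return payload;
    }
}
```

The point of such a mapper is that a single ZeroMQ frame can carry the whole Message, so the receiving side can restore the headers without a separate metadata channel.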

The sending and receiving sockets can be customized with any options (read/write timeout, security, and so on) via the respective setSendSocketConfigurer(Consumer<ZMQ.Socket>) and setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>) callbacks.

The internal logic of the ZeroMqChannel is based on Reactive Streams via Project Reactor Flux and Mono operators. This provides easier threading control and allows lock-free concurrent publication to and consumption from the channel. Local PUB/SUB logic is implemented with the Flux.publish() operator, so that all local subscribers to this channel receive the same published message, just as distributed subscribers to the PUB socket do.

The following is a simple example of a ZeroMqChannel configuration:

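@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
    // true switches the channel from the default PUSH/PULL mode to PUB/SUB
    ZeroMqChannel channel = new ZeroMqChannel(context, true);
    // Protocol and host, then the proxy frontend and backend ports
    channel.setConnectUrl("tcp://localhost:6001:6002");
    channel.setConsumeDelay(Duration.ofMillis(100));
    return channel;
}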