ZeroMQ Support
Spring Integration provides components to support ZeroMQ communication in the application. The implementation is based on the well-supported Java API of the JeroMQ library. All components encapsulate ZeroMQ socket lifecycles and manage threads for them internally, which makes interactions with these components lock-free and thread-safe.
You need to include this dependency in your project:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-zeromq</artifactId>
    <version>5.5.17</version>
</dependency>
compile "org.springframework.integration:spring-integration-zeromq:5.5.17"
ZeroMQ Proxy
The ZeroMqProxy is a Spring-friendly wrapper for the built-in ZMQ.proxy() function.
It encapsulates socket lifecycles and thread management.
Clients of this proxy can still use the standard ZeroMQ socket connection and interaction API.
Alongside the standard ZContext, it requires one of the well-known ZeroMQ proxy modes: SUB/PUB, PULL/PUSH or ROUTER/DEALER.
This way, an appropriate pair of ZeroMQ socket types is used for the frontend and backend of the proxy.
See ZeroMqProxy.Type for details.
The ZeroMqProxy implements SmartLifecycle to create, bind and configure the sockets and to start ZMQ.proxy() in a dedicated thread from an Executor (if any).
The frontend and backend sockets are bound over the tcp:// protocol on all available network interfaces, using the provided ports.
If no ports are provided, the sockets are bound to random ports, which can be obtained later via the respective getFrontendPort() and getBackendPort() API methods.
The control socket is exposed as a SocketType.PAIR with an inter-thread transport on the "inproc://" + beanName + ".control" address; the address can be obtained via getControlAddress().
It should be used within the same application from another SocketType.PAIR socket to send ZMQ.PROXY_TERMINATE, ZMQ.PROXY_PAUSE and/or ZMQ.PROXY_RESUME commands.
The ZeroMqProxy performs a ZMQ.PROXY_TERMINATE command when stop() is called for its lifecycle, to terminate the ZMQ.proxy() loop and close all the bound sockets gracefully.
The setExposeCaptureSocket(boolean) option causes this component to bind an additional inter-thread socket with SocketType.PUB to capture and publish all the communication between the frontend and backend sockets, as described for the ZMQ.proxy() implementation.
This socket is bound to the "inproc://" + beanName + ".capture" address and doesn’t expect any specific subscription for filtering.
The frontend and backend sockets can be customized with additional properties, such as read/write timeout or security.
This customization is available through the setFrontendSocketConfigurer(Consumer<ZMQ.Socket>) and setBackendSocketConfigurer(Consumer<ZMQ.Socket>) callbacks, respectively.
The ZeroMqProxy can be provided as a simple bean like this:
@Bean
ZeroMqProxy zeroMqProxy() {
    ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
    proxy.setExposeCaptureSocket(true);
    proxy.setFrontendPort(6001);
    proxy.setBackendPort(6002);
    return proxy;
}
All client nodes should connect to the host of this proxy via tcp:// and use the port they are interested in.
ZeroMQ Message Channel
The ZeroMqChannel is a SubscribableChannel which uses a pair of ZeroMQ sockets to connect publishers and subscribers for messaging interaction.
It can work in PUB/SUB mode (the default is PUSH/PULL).
It can also be used as a local inter-thread channel (using PAIR sockets), in which case the connectUrl is not provided.
In distributed mode it has to be connected to an externally managed ZeroMQ proxy, where it can exchange messages with other similar channels connected to the same proxy.
The connectUrl option is a standard ZeroMQ connection string containing the protocol, the host, and a colon-separated pair of ports for the frontend and backend sockets of the ZeroMQ proxy.
For convenience, the channel can be supplied with the ZeroMqProxy instance instead of a connection string, if it is configured in the same application as the proxy.
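As a rough sketch of that same-application setup, the configuration below wires a PUB/SUB channel to a proxy bean rather than to a URL. It assumes the setZeroMqProxy(ZeroMqProxy) setter on ZeroMqChannel and a proxy of type SUB_PUB to match the PUB/SUB channel. The class and bean names are hypothetical, and the ports are left unset so the proxy picks random ones.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.zeromq.ZeroMqProxy;
import org.springframework.integration.zeromq.channel.ZeroMqChannel;
import org.zeromq.ZContext;

// Hypothetical configuration class; names are illustrative only.
@Configuration
class SameApplicationZeroMqConfig {

    // One shared ZContext for all ZeroMQ components in this application.
    @Bean
    ZContext zeroMqContext() {
        return new ZContext();
    }

    // No ports are set here, so the proxy binds to random ones.
    @Bean
    ZeroMqProxy zeroMqProxy(ZContext zeroMqContext) {
        return new ZeroMqProxy(zeroMqContext, ZeroMqProxy.Type.SUB_PUB);
    }

    // The channel receives the proxy instance instead of a connect URL
    // (assumption: the setZeroMqProxy(...) setter is available in your version).
    @Bean
    ZeroMqChannel zeroMqPubSubChannel(ZContext zeroMqContext, ZeroMqProxy zeroMqProxy) {
        ZeroMqChannel channel = new ZeroMqChannel(zeroMqContext, true);
        channel.setZeroMqProxy(zeroMqProxy);
        return channel;
    }
}
```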
Both sending and receiving sockets are managed in their own dedicated threads, which makes this channel concurrency-friendly.
This way we can publish to and consume from a ZeroMqChannel from different threads without synchronization.
By default the ZeroMqChannel uses an EmbeddedJsonHeadersMessageMapper to serialize the Message (including headers) to byte[] and deserialize it back, using a Jackson JSON processor.
This logic can be configured via setMessageMapper(BytesMessageMapper).
Sending and receiving sockets can be customized for any options (read/write timeout, security etc.) via the respective setSendSocketConfigurer(Consumer<ZMQ.Socket>) and setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>) callbacks.
The internal logic of the ZeroMqChannel is based on reactive streams via Project Reactor Flux and Mono operators.
This provides easier threading control and allows lock-free concurrent publication to and consumption from the channel.
Local PUB/SUB logic is implemented as a Flux.publish() operator to allow all of the local subscribers to this channel to receive the same published message, just as distributed subscribers to the PUB socket do.
The following is a simple example of a ZeroMqChannel configuration:
@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
    ZeroMqChannel channel = new ZeroMqChannel(context, true);
    channel.setConnectUrl("tcp://localhost:6001:6002");
    channel.setConsumeDelay(Duration.ofMillis(100));
    return channel;
}
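The connect URL in this example, tcp://localhost:6001:6002, carries two ports: the first for the proxy frontend and the second for the proxy backend. The following sketch only illustrates how such a string decomposes into two ordinary ZeroMQ endpoints. ProxyConnectUrl is a hypothetical helper written for this explanation, not the parsing code used by ZeroMqChannel.

```java
// Hypothetical helper for illustration; not part of Spring Integration.
final class ProxyConnectUrl {

    final String frontendUrl;
    final String backendUrl;

    private ProxyConnectUrl(String frontendUrl, String backendUrl) {
        this.frontendUrl = frontendUrl;
        this.backendUrl = backendUrl;
    }

    // Splits "protocol://host:frontendPort:backendPort" into two endpoints.
    static ProxyConnectUrl parse(String connectUrl) {
        int schemeEnd = connectUrl.indexOf("://");
        int backendColon = connectUrl.lastIndexOf(':');
        int frontendColon = connectUrl.lastIndexOf(':', backendColon - 1);
        // The colon found for the frontend port must not be the one in "://".
        if (schemeEnd < 0 || frontendColon <= schemeEnd) {
            throw new IllegalArgumentException(
                    "Expected protocol://host:frontendPort:backendPort but got: " + connectUrl);
        }
        String base = connectUrl.substring(0, frontendColon);
        // Integer.parseInt rejects non-numeric ports with a NumberFormatException.
        int frontendPort = Integer.parseInt(connectUrl.substring(frontendColon + 1, backendColon));
        int backendPort = Integer.parseInt(connectUrl.substring(backendColon + 1));
        return new ProxyConnectUrl(base + ":" + frontendPort, base + ":" + backendPort);
    }
}
```

For the URL above, ProxyConnectUrl.parse(...) yields tcp://localhost:6001 as the frontend endpoint and tcp://localhost:6002 as the backend endpoint, while a URL with a single port is rejected.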
ZeroMQ Inbound Channel Adapter
The ZeroMqMessageProducer is a MessageProducerSupport implementation with reactive semantics.
It constantly reads data from a ZeroMQ socket in a non-blocking manner and publishes the messages to an infinite Flux, which is subscribed to by a FluxMessageChannel or, if the output channel is not reactive, explicitly in the start() method.
When no data is received on the socket, a consumeDelay (defaults to 1 second) is applied before the next read attempt.
Only SocketType.PAIR, SocketType.PULL and SocketType.SUB are supported by the ZeroMqMessageProducer.
This component can either connect to a remote socket or bind over the TCP protocol to a provided or random port.
The actual port can be obtained via getBoundPort() after this component is started and the ZeroMQ socket is bound.
The socket options (e.g. security or write timeout) can be configured via the setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) callback.
If the receiveRaw option is set to true, a ZMsg consumed from the socket is sent as is in the payload of the produced Message: it’s up to the downstream flow to parse and convert the ZMsg.
Otherwise an InboundMessageMapper is used to convert the consumed data into a Message.
If the received ZMsg is multi-frame, the first frame is treated as the topic this ZeroMQ message was published to and is exposed as the ZeroMqHeaders.TOPIC header.
With SocketType.SUB, the ZeroMqMessageProducer uses the provided topics option for subscriptions; by default, it subscribes to all topics.
Subscriptions can be adjusted at runtime using the subscribeToTopics() and unsubscribeFromTopics() @ManagedOperations.
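ZeroMQ itself matches a SUB subscription as a byte prefix of the first frame of a message, and an empty subscription matches everything. The sketch below models only that matching rule in plain Java, to show which topics a given subscription list would accept. TopicFilterSketch and its methods are hypothetical names and do not call into JeroMQ or Spring Integration.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical model of ZeroMQ SUB prefix filtering; for illustration only.
final class TopicFilterSketch {

    private TopicFilterSketch() {
    }

    // True when the subscription bytes are a prefix of the first (topic) frame.
    static boolean matches(byte[] subscription, byte[] firstFrame) {
        if (subscription.length > firstFrame.length) {
            return false;
        }
        for (int i = 0; i < subscription.length; i++) {
            if (subscription[i] != firstFrame[i]) {
                return false;
            }
        }
        return true;
    }

    // True when at least one subscribed topic is a prefix of the published topic.
    static boolean accepts(List<String> subscriptions, String publishedTopic) {
        byte[] firstFrame = publishedTopic.getBytes(StandardCharsets.UTF_8);
        for (String subscription : subscriptions) {
            if (matches(subscription.getBytes(StandardCharsets.UTF_8), firstFrame)) {
                return true;
            }
        }
        return false;
    }
}
```

Under this rule a subscription to "some" accepts a message published to "someTopic" but not one published to "other", and a single empty subscription accepts any topic.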
Here is a sample of a ZeroMqMessageProducer configuration:
@Bean
ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {
    ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);
    messageProducer.setOutputChannel(outputChannel);
    messageProducer.setTopics("some");
    messageProducer.setReceiveRaw(true);
    messageProducer.setBindPort(7070);
    messageProducer.setConsumeDelay(Duration.ofMillis(100));
    return messageProducer;
}
ZeroMQ Outbound Channel Adapter
The ZeroMqMessageHandler is a ReactiveMessageHandler implementation that publishes messages into a ZeroMQ socket.
Only SocketType.PAIR, SocketType.PUSH and SocketType.PUB are supported.
The ZeroMqMessageHandler only supports connecting the ZeroMQ socket; binding is not supported.
When SocketType.PUB is used, the topicExpression is evaluated against a request message, and a non-null result is injected as a topic frame into the ZeroMQ message.
The subscriber side (SocketType.SUB) must receive the topic frame first before parsing the actual data.
When the payload of the request message is a ZMsg, no conversion or topic extraction is performed: the ZMsg is sent into the socket as is and is not destroyed, so that it can be reused.
Otherwise an OutboundMessageMapper<byte[]> is used to convert a request message (or just its payload) into a ZeroMQ frame to publish.
By default, a ConvertingBytesMessageMapper supplied with a ConfigurableCompositeMessageConverter is used.
The socket options (e.g. security or write timeout) can be configured via the setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) callback.
Here is a sample of a ZeroMqMessageHandler configuration:
@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
    ZeroMqMessageHandler messageHandler =
            new ZeroMqMessageHandler(context, "tcp://localhost:6060", SocketType.PUB);
    messageHandler.setTopicExpression(
            new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
    messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
    return messageHandler;
}
ZeroMQ Java DSL Support
The spring-integration-zeromq module provides a convenient Java DSL fluent API via the ZeroMq factory and IntegrationComponentSpec implementations for the components mentioned above.
This is a sample of the Java DSL for ZeroMqChannel:
.channel(ZeroMq.zeroMqChannel(this.context)
        .connectUrl("tcp://localhost:6001:6002")
        .consumeDelay(Duration.ofMillis(100)))
The Inbound Channel Adapter for ZeroMQ Java DSL is:
IntegrationFlows.from(
        ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
                .connectUrl("tcp://localhost:9000")
                .topics("someTopic")
                .receiveRaw(true)
                .consumeDelay(Duration.ofMillis(100)))
The Outbound Channel Adapter for ZeroMQ Java DSL is:
.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://localhost:9001", SocketType.PUB)
        .topicFunction(message -> message.getHeaders().get("myTopic")))