Class ZeroMqChannel

All Implemented Interfaces:
Aware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, ApplicationContextAware, ExpressionCapable, IntegrationPattern, NamedComponent, IntegrationManagement, TrackableComponent, MessageChannel, SubscribableChannel, InterceptableChannel

public class ZeroMqChannel extends AbstractMessageChannel implements SubscribableChannel
The SubscribableChannel implementation over ZeroMQ sockets. It can work in two messaging models: - push-pull, where sent messages are distributed to subscribers in a round-robin manner according a respective ZeroMQ SocketType.PUSH and SocketType.PULL socket types logic; - pub-sub, where sent messages are distributed to all subscribers.

This message channel can work in local mode, when a pair of ZeroMQ sockets of SocketType.PAIR type are connected between publisher (send operation) and subscriber using inter-thread transport binding.

In distributed mode this channel has to be connected to an externally managed ZeroMQ proxy. The setConnectUrl(String) has to be as a standard ZeroMQ connect string, but with an extra port over the colon - representing a frontend and backend sockets pair on ZeroMQ proxy. For example: tcp://localhost:6001:6002. Another option is to provide a reference to the ZeroMqProxy instance managed in the same application: frontend and backend ports are evaluated from this proxy and the respective connection string is built from them.

This way sending and receiving operations on this channel are similar to interaction over a messaging broker.

An internal logic of this message channel implementation is based on the project Reactor using its Mono, Flux and Scheduler API for better thread model and flow control to avoid concurrency primitives for multi-publisher(subscriber) communication within the same application.

Since:
5.4
Author:
Artem Bilan
  • Field Details

    • DEFAULT_CONSUME_DELAY

      public static final Duration DEFAULT_CONSUME_DELAY
  • Constructor Details

    • ZeroMqChannel

      public ZeroMqChannel(org.zeromq.ZContext context)
      Create a channel instance based on the provided ZContext with push/pull communication model.
      Parameters:
      context - the ZContext to use.
    • ZeroMqChannel

      public ZeroMqChannel(org.zeromq.ZContext context, boolean pubSub)
      Create a channel instance based on the provided ZContext and provided communication model.
      Parameters:
      context - the ZContext to use.
      pubSub - the communication model: push/pull or pub/sub.
  • Method Details

    • setConnectUrl

      public void setConnectUrl(@Nullable String connectUrl)
      Configure a connection to the ZeroMQ proxy with the pair of ports over colon for proxy frontend and backend sockets. Mutually exclusive with the setZeroMqProxy(ZeroMqProxy).
      Parameters:
      connectUrl - the connection string in format PROTOCOL://HOST:FRONTEND_PORT:BACKEND_PORT, e.g. tcp://localhost:6001:6002
    • setZeroMqProxy

      public void setZeroMqProxy(@Nullable ZeroMqProxy zeroMqProxy)
      Specify a reference to a ZeroMqProxy instance in the same application to rely on its ports configuration and make a natural lifecycle dependency without guessing when the proxy is started. Mutually exclusive with the setConnectUrl(String).
      Parameters:
      zeroMqProxy - the ZeroMqProxy instance to use
    • setConsumeDelay

      public void setConsumeDelay(Duration consumeDelay)
      Specify a Duration to delay consumption when no data received.
      Parameters:
      consumeDelay - the Duration to delay consumption when empty; defaults to DEFAULT_CONSUME_DELAY.
    • setMessageMapper

      public void setMessageMapper(BytesMessageMapper messageMapper)
      Provide a BytesMessageMapper to convert to/from messages when send or receive happens on the sockets.
      Parameters:
      messageMapper - the BytesMessageMapper to use; defaults to EmbeddedJsonHeadersMessageMapper.
    • setSendSocketConfigurer

      public void setSendSocketConfigurer(Consumer<org.zeromq.ZMQ.Socket> sendSocketConfigurer)
      The Consumer callback to configure a publishing socket. The send socket is connected to the frontend socket of ZeroMQ proxy (if any).
      Parameters:
      sendSocketConfigurer - the Consumer to use.
    • setSubscribeSocketConfigurer

      public void setSubscribeSocketConfigurer(Consumer<org.zeromq.ZMQ.Socket> subscribeSocketConfigurer)
      The Consumer callback to configure a consuming socket. The subscribe socket is connected to the backend socket of ZeroMQ proxy (if any).
      Parameters:
      subscribeSocketConfigurer - the Consumer to use.
    • onInit

      protected void onInit()
      Description copied from class: IntegrationObjectSupport
      Subclasses may implement this for initialization logic.
      Overrides:
      onInit in class AbstractMessageChannel
    • doSend

      protected boolean doSend(Message<?> message, long timeout)
      Description copied from class: AbstractMessageChannel
      Subclasses must implement this method. A non-negative timeout indicates how long to wait if the channel is at capacity (if the value is 0, it must return immediately with or without success). A negative timeout value indicates that the method should block until either the message is accepted or the blocking thread is interrupted.
      Specified by:
      doSend in class AbstractMessageChannel
      Parameters:
      message - The message.
      timeout - The timeout.
      Returns:
      true if the send was successful.
    • subscribe

      public boolean subscribe(MessageHandler handler)
      Specified by:
      subscribe in interface SubscribableChannel
    • unsubscribe

      public boolean unsubscribe(MessageHandler handler)
      Specified by:
      unsubscribe in interface SubscribableChannel
    • destroy

      public void destroy()
      Specified by:
      destroy in interface DisposableBean
      Specified by:
      destroy in interface IntegrationManagement
      Overrides:
      destroy in class AbstractMessageChannel