Class ZeroMqMessageHandler

All Implemented Interfaces:
Aware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, ApplicationContextAware, Lifecycle, Ordered, ComponentSourceAware, ExpressionCapable, Orderable, IntegrationPattern, NamedComponent, IntegrationManagement, ManageableLifecycle, TrackableComponent, ReactiveMessageHandler

public class ZeroMqMessageHandler extends AbstractReactiveMessageHandler implements ManageableLifecycle
The AbstractReactiveMessageHandler implementation for publishing messages over ZeroMq socket. Only SocketType.PAIR, SocketType.PUB and SocketType.PUSH are supported. This component can bind or connect the socket.

When the SocketType.PUB is used, the topicExpression is evaluated against a request message to inject a topic frame into a ZeroMq message if it is not null. The subscriber side must receive the topic frame first before parsing the actual data.

When the payload of the request message is a ZMsg, no any conversion and topic extraction happen: the ZMsg is sent into a socket as is, and it is not destroyed for possible further reusing.

Since:
5.4
Author:
Artem Bilan, Alessio Matricardi
  • Constructor Details Link icon

    • ZeroMqMessageHandler Link icon

      public ZeroMqMessageHandler(org.zeromq.ZContext context)
      Create an instance based on the provided ZContext.
      Parameters:
      context - the ZContext to use for creating sockets.
      Since:
      6.4
    • ZeroMqMessageHandler Link icon

      public ZeroMqMessageHandler(org.zeromq.ZContext context, org.zeromq.SocketType socketType)
      Create an instance based on the provided ZContext and SocketType.
      Parameters:
      context - the ZContext to use for creating sockets.
      socketType - the SocketType to use; only SocketType.PAIR, SocketType.PUB and SocketType.PUSH are supported.
    • ZeroMqMessageHandler Link icon

      public ZeroMqMessageHandler(org.zeromq.ZContext context, String connectUrl)
      Create an instance based on the provided ZContext and connection string.
      Parameters:
      context - the ZContext to use for creating sockets.
      connectUrl - the URL to connect the socket to.
    • ZeroMqMessageHandler Link icon

      public ZeroMqMessageHandler(org.zeromq.ZContext context, int port)
      Create an instance based on the provided ZContext and binding port.
      Parameters:
      context - the ZContext to use for creating sockets.
      port - the port to bind ZeroMq socket to over TCP.
      Since:
      6.4
    • ZeroMqMessageHandler Link icon

      public ZeroMqMessageHandler(org.zeromq.ZContext context, Supplier<String> connectUrl)
      Create an instance based on the provided ZContext and connection string supplier.
      Parameters:
      context - the ZContext to use for creating sockets.
      connectUrl - the supplier for URL to connect the socket to.
      Since:
      5.5.9
    • ZeroMqMessageHandler Link icon

      public ZeroMqMessageHandler(org.zeromq.ZContext context, String connectUrl, org.zeromq.SocketType socketType)
      Create an instance based on the provided ZContext, connection string and SocketType.
      Parameters:
      context - the ZContext to use for creating sockets.
      connectUrl - the URL to connect the socket to.
      socketType - the SocketType to use; only SocketType.PAIR, SocketType.PUB and SocketType.PUSH are supported.
    • ZeroMqMessageHandler Link icon

      public ZeroMqMessageHandler(org.zeromq.ZContext context, int port, org.zeromq.SocketType socketType)
      Create an instance based on the provided ZContext, binding port and SocketType.
      Parameters:
      context - the ZContext to use for creating sockets.
      port - the port to bind ZeroMq socket to over TCP.
      socketType - the SocketType to use; only SocketType.PAIR, SocketType.PUB and SocketType.PUSH are supported.
      Since:
      6.4
    • ZeroMqMessageHandler Link icon

      public ZeroMqMessageHandler(org.zeromq.ZContext context, Supplier<String> connectUrl, org.zeromq.SocketType socketType)
      Create an instance based on the provided ZContext, connection string supplier and SocketType.
      Parameters:
      context - the ZContext to use for creating sockets.
      connectUrl - the supplier for URL to connect the socket to.
      socketType - the SocketType to use; only SocketType.PAIR, SocketType.PUB and SocketType.PUSH are supported.
      Since:
      5.5.9
  • Method Details Link icon

    • setMessageMapper Link icon

      public void setMessageMapper(OutboundMessageMapper<byte[]> messageMapper)
      Provide an OutboundMessageMapper to convert a request message into byte[] for sending into ZeroMq socket. Ignored when Message.getPayload() is an instance of ZMsg.
      Parameters:
      messageMapper - the OutboundMessageMapper to use.
    • setMessageConverter Link icon

      public void setMessageConverter(MessageConverter messageConverter)
      Provide a MessageConverter (as an alternative to messageMapper) for converting a request message into byte[] for sending into ZeroMq socket. Ignored when Message.getPayload() is an instance of ZMsg.
      Parameters:
      messageConverter - the MessageConverter to use.
    • setSocketConfigurer Link icon

      public void setSocketConfigurer(Consumer<org.zeromq.ZMQ.Socket> socketConfigurer)
      Provide a Consumer to configure a socket with arbitrary options, like security.
      Parameters:
      socketConfigurer - the configurer for socket options.
    • setTopic Link icon

      public void setTopic(String topic)
      Specify a topic the SocketType.PUB socket is going to use for distributing messages into the subscriptions. It is ignored for all other SocketTypes supported.
      Parameters:
      topic - the topic to use.
    • setTopicExpression Link icon

      public void setTopicExpression(Expression topicExpression)
      Specify a SpEL expression to evaluate a topic a SocketType.PUB is going to use for distributing messages into the subscriptions.It is ignored for all other SocketTypes supported.
      Parameters:
      topicExpression - the expression to evaluate topic for publishing.
    • wrapTopic Link icon

      public void wrapTopic(boolean wrapTopic)
      Specify if the topic that SocketType.PUB socket is going to use for distributing messages into the subscriptions must be wrapped with an additional empty frame. It is ignored for all other SocketTypes supported. This attribute is set to true by default.
      Parameters:
      wrapTopic - true if the topic must be wrapped with an additional empty frame.
      Since:
      6.2.6
    • getBoundPort Link icon

      public int getBoundPort()
      Return the port a socket is bound or 0 if this message producer has not been started yet or the socket is connected - not bound.
      Returns:
      the port for a socket or 0.
      Since:
      6.4
    • getComponentType Link icon

      public String getComponentType()
      Description copied from class: IntegrationObjectSupport
      Subclasses may implement this method to provide component type information.
      Specified by:
      getComponentType in interface NamedComponent
      Overrides:
      getComponentType in class MessageHandlerSupport
    • onInit Link icon

      protected void onInit()
      Description copied from class: IntegrationObjectSupport
      Subclasses may implement this for initialization logic.
      Overrides:
      onInit in class IntegrationObjectSupport
    • start Link icon

      public void start()
      Specified by:
      start in interface Lifecycle
      Specified by:
      start in interface ManageableLifecycle
    • stop Link icon

      public void stop()
      Specified by:
      stop in interface Lifecycle
      Specified by:
      stop in interface ManageableLifecycle
    • isRunning Link icon

      public boolean isRunning()
      Specified by:
      isRunning in interface Lifecycle
      Specified by:
      isRunning in interface ManageableLifecycle
    • handleMessageInternal Link icon

      protected reactor.core.publisher.Mono<Void> handleMessageInternal(Message<?> message)
      Specified by:
      handleMessageInternal in class AbstractReactiveMessageHandler
    • destroy Link icon

      public void destroy()
      Specified by:
      destroy in interface DisposableBean
      Specified by:
      destroy in interface IntegrationManagement
      Overrides:
      destroy in class MessageHandlerSupport