Class ZeroMqMessageProducer

All Implemented Interfaces:
Aware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, SmartInitializingSingleton, ApplicationContextAware, Lifecycle, Phased, SmartLifecycle, ComponentSourceAware, ExpressionCapable, MessageProducer, IntegrationPattern, NamedComponent, IntegrationInboundManagement, IntegrationManagement, ManageableLifecycle, ManageableSmartLifecycle, TrackableComponent

@ManagedResource @IntegrationManagedResource public class ZeroMqMessageProducer extends MessageProducerSupport
A MessageProducerSupport implementation for consuming messages from a ZeroMQ socket. Only SocketType.PAIR, SocketType.SUB and SocketType.PULL are supported. This component can either bind or connect the socket.

When SocketType.SUB is used, the received topic is stored in the ZeroMqHeaders.TOPIC header.
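
A minimal configuration sketch, assuming a Spring `@Configuration` class with the Spring Integration ZeroMQ module and JeroMQ on the classpath. The class name `ZeroMqInboundConfig`, the bean names, the topic `orders`, the URL `tcp://localhost:5556`, and the choice of a `QueueChannel` are illustrative assumptions, not part of this API. `setOutputChannel` is inherited from MessageProducerSupport.

```java
import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.zeromq.inbound.ZeroMqMessageProducer;
import org.springframework.messaging.PollableChannel;
import org.zeromq.SocketType;
import org.zeromq.ZContext;

// Hypothetical configuration class; names and values are placeholders.
@Configuration
public class ZeroMqInboundConfig {

    @Bean
    public ZContext zContext() {
        return new ZContext();
    }

    // Channel that receives the messages produced from the socket.
    @Bean
    public PollableChannel zeroMqInput() {
        return new QueueChannel();
    }

    @Bean
    public ZeroMqMessageProducer zeroMqMessageProducer() {
        // SUB socket: the received topic ends up in the ZeroMqHeaders.TOPIC header.
        ZeroMqMessageProducer producer = new ZeroMqMessageProducer(zContext(), SocketType.SUB);
        producer.setOutputChannel(zeroMqInput());
        producer.setTopics("orders");
        // Connect to a remote publisher; setBindPort(int) must not be used as well.
        producer.setConnectUrl("tcp://localhost:5556");
        producer.setConsumeDelay(Duration.ofMillis(100));
        return producer;
    }
}
```

To bind instead of connect, the `setConnectUrl` call would be replaced with `setBindPort(int)`, and `getBoundPort()` could then be read once the producer has started.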

Since:
5.4
Author:
Artem Bilan, Alessio Matricardi
  • Field Details

    • DEFAULT_CONSUME_DELAY

      public static final Duration DEFAULT_CONSUME_DELAY
  • Constructor Details

    • ZeroMqMessageProducer

      public ZeroMqMessageProducer(org.zeromq.ZContext context)
    • ZeroMqMessageProducer

      public ZeroMqMessageProducer(org.zeromq.ZContext context, org.zeromq.SocketType socketType)
  • Method Details

    • setConsumeDelay

      public void setConsumeDelay(Duration consumeDelay)
      Specify a Duration to delay consumption when no data is received.
      Parameters:
      consumeDelay - the Duration to delay consumption when empty; defaults to DEFAULT_CONSUME_DELAY.
    • setMessageMapper

      public void setMessageMapper(InboundMessageMapper<byte[]> messageMapper)
      Provide an InboundMessageMapper to convert the consumed data into the message to produce. Ignored when setReceiveRaw(boolean) is set to true.
      Parameters:
      messageMapper - the InboundMessageMapper to use.
    • setMessageConverter

      public void setMessageConverter(MessageConverter messageConverter)
      Provide a MessageConverter (as an alternative to messageMapper) for converting the consumed data into the message to produce. Ignored when setReceiveRaw(boolean) is set to true.
      Parameters:
      messageConverter - the MessageConverter to use.
    • setReceiveRaw

      public void setReceiveRaw(boolean receiveRaw)
      Whether the raw ZMsg is used as the payload of the message to produce, or is fully converted to a Message, including the ZeroMqHeaders.TOPIC header (if any).
      Parameters:
      receiveRaw - true to pass the raw ZMsg as the payload, false to convert it; defaults to false (convert).
    • setSocketConfigurer

      public void setSocketConfigurer(Consumer<org.zeromq.ZMQ.Socket> socketConfigurer)
      Provide a Consumer to configure the socket with arbitrary options, such as security settings.
      Parameters:
      socketConfigurer - the configurer for socket options.
    • setTopics

      public void setTopics(String... topics)
      Specify the topics that the SocketType.SUB socket subscribes to. Ignored for all other supported SocketTypes.
      Parameters:
      topics - the topics to use.
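
ZeroMQ SUB sockets filter incoming messages by prefix: a subscription matches every message whose first frame begins with the subscribed bytes. The following is a plain-Java sketch of that matching rule only, with no ZeroMQ dependency. The class `TopicPrefixSketch` and its `matches` helper are hypothetical names introduced for illustration, and string prefixes stand in for the byte prefixes ZeroMQ compares.

```java
import java.util.List;

// Hypothetical model of SUB-socket topic filtering; not part of the library.
final class TopicPrefixSketch {

    // Returns true when the topic starts with at least one subscribed prefix.
    static boolean matches(List<String> subscriptions, String topic) {
        for (String prefix : subscriptions) {
            if (topic.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> subscriptions = List.of("orders", "audit.");
        System.out.println(matches(subscriptions, "orders.created")); // prefix "orders" matches
        System.out.println(matches(subscriptions, "audit.login"));    // prefix "audit." matches
        System.out.println(matches(subscriptions, "billing"));        // no subscribed prefix
    }
}
```

Under this rule, a topic passed to `setTopics` such as `orders` would also admit messages published under `orders.created`.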
    • setConnectUrl

      public void setConnectUrl(@Nullable String connectUrl)
      Configure a URL for ZMQ.Socket.connect(String). Mutually exclusive with setBindPort(int).
      Parameters:
      connectUrl - the URL to connect the ZeroMQ socket to.
    • setBindPort

      public void setBindPort(int port)
      Configure a port for TCP protocol binding via ZMQ.Socket.bind(String). Mutually exclusive with setConnectUrl(String).
      Parameters:
      port - the port to bind the ZeroMQ socket to over TCP.
    • getBoundPort

      public int getBoundPort()
      Return the port the socket is bound to, or 0 if this message producer has not been started yet or the socket is connected rather than bound.
      Returns:
      the port the socket is bound to, or 0.
    • unwrapTopic

      public void unwrapTopic(boolean unwrapTopic)
      Specify whether the topic that the SocketType.SUB socket receives is wrapped with an additional empty frame. Ignored for all other supported SocketTypes. Defaults to true.
      Parameters:
      unwrapTopic - true if the received topic is wrapped with an additional empty frame.
      Since:
      6.2.6
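
The frame layout this flag refers to can be modelled in plain Java. This is a sketch of the idea only, not the library's implementation: each `byte[]` stands for one ZeroMQ frame, and the class `TopicFrameSketch` with its `frames` and `popTopic` helpers are hypothetical names. A wrapped message is assumed to be laid out as topic frame, empty frame, then data.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for a multi-frame ZeroMQ message: each byte[] is one frame.
final class TopicFrameSketch {

    // Builds a frame queue from strings; "" becomes an empty frame.
    static Deque<byte[]> frames(String... parts) {
        Deque<byte[]> frames = new ArrayDeque<>();
        for (String part : parts) {
            frames.addLast(part.getBytes(StandardCharsets.UTF_8));
        }
        return frames;
    }

    // Removes the leading topic frame. When unwrapTopic is true, an empty
    // frame directly after the topic is treated as a delimiter and dropped too.
    static String popTopic(Deque<byte[]> frames, boolean unwrapTopic) {
        byte[] topic = frames.removeFirst();
        if (unwrapTopic && !frames.isEmpty() && frames.peekFirst().length == 0) {
            frames.removeFirst();
        }
        return new String(topic, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Wrapped layout: topic, empty delimiter, data.
        Deque<byte[]> wrapped = frames("orders", "", "payload");
        System.out.println(popTopic(wrapped, true) + " -> " + wrapped.size() + " frame(s) left");

        // Unwrapped layout: topic, data.
        Deque<byte[]> plain = frames("orders", "payload");
        System.out.println(popTopic(plain, false) + " -> " + plain.size() + " frame(s) left");
    }
}
```

In this model, reading a wrapped message with the flag set to false would leave the empty frame in front of the data, which is why the setting should match how the publisher sends the topic.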
    • getComponentType

      public String getComponentType()
      Description copied from class: IntegrationObjectSupport
      Subclasses may implement this method to provide component type information.
      Specified by:
      getComponentType in interface NamedComponent
      Overrides:
      getComponentType in class IntegrationObjectSupport
    • onInit

      protected void onInit()
      Description copied from class: IntegrationObjectSupport
      Subclasses may implement this for initialization logic.
      Overrides:
      onInit in class MessageProducerSupport
    • subscribeToTopics

      @ManagedOperation public void subscribeToTopics(String... topics)
    • unsubscribeFromTopics

      @ManagedOperation public void unsubscribeFromTopics(String... topics)
    • doStart

      protected void doStart()
      Description copied from class: MessageProducerSupport
      Take no action by default. Subclasses may override this if they need lifecycle-managed behavior. Protected by 'lifecycleLock'.
      Overrides:
      doStart in class MessageProducerSupport
    • doStop

      protected void doStop()
      Description copied from class: MessageProducerSupport
      Take no action by default. Subclasses may override this if they need lifecycle-managed behavior.
      Overrides:
      doStop in class MessageProducerSupport
    • destroy

      public void destroy()
      Specified by:
      destroy in interface DisposableBean
      Specified by:
      destroy in interface IntegrationManagement
      Overrides:
      destroy in class AbstractEndpoint