Class AbstractMqttMessageHandler<T,C>

Type Parameters:
T - MQTT Client type
C - MQTT connection options type (v5 or v3)
All Implemented Interfaces:
org.reactivestreams.Subscriber<Message<?>>, Aware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, ApplicationContextAware, ApplicationEventPublisherAware, Lifecycle, Ordered, ComponentSourceAware, ExpressionCapable, Orderable, IntegrationPattern, NamedComponent, IntegrationManagement, ManageableLifecycle, TrackableComponent, MessageHandler, reactor.core.CoreSubscriber<Message<?>>
Direct Known Subclasses:
MqttPahoMessageHandler, Mqttv5PahoMessageHandler

public abstract class AbstractMqttMessageHandler<T,C> extends AbstractMessageHandler implements ManageableLifecycle, ApplicationEventPublisherAware
Abstract class for MQTT outbound channel adapters.
Since:
4.0
Author:
Gary Russell, Artem Bilan, Artem Vozhdayenko
  • Field Details

    • DISCONNECT_COMPLETION_TIMEOUT

      public static final long DISCONNECT_COMPLETION_TIMEOUT
      The default disconnect completion timeout in milliseconds.
    • DEFAULT_COMPLETION_TIMEOUT

      public static final long DEFAULT_COMPLETION_TIMEOUT
      The default completion timeout in milliseconds.
    • lock

      protected final Lock lock
  • Constructor Details

    • AbstractMqttMessageHandler

      public AbstractMqttMessageHandler(@Nullable String url, String clientId)
    • AbstractMqttMessageHandler

      public AbstractMqttMessageHandler(ClientManager<T,C> clientManager)
  • Method Details

    • setApplicationEventPublisher

      public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher)
      Specified by:
      setApplicationEventPublisher in interface ApplicationEventPublisherAware
    • getApplicationEventPublisher

      protected ApplicationEventPublisher getApplicationEventPublisher()
    • setDefaultTopic

      public void setDefaultTopic(String defaultTopic)
      Set the topic to which the message will be published if the topicExpression evaluates to `null`.
      Parameters:
      defaultTopic - the default topic.
    • getDefaultTopic

      protected String getDefaultTopic()
    • setTopicExpression

      public void setTopicExpression(Expression topicExpression)
      Set the topic expression; default "headers['mqtt_topic']".
      Parameters:
      topicExpression - the expression.
      Since:
      5.0
    • setTopicExpressionString

      public void setTopicExpressionString(String topicExpression)
      Set the topic expression as a String to be parsed; default "headers['mqtt_topic']".
      Parameters:
      topicExpression - the expression.
      Since:
      5.0
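      The fallback between the topic expression and the default topic can be modelled in plain Java. This is a minimal sketch with no Spring dependency: the class TopicFallbackSketch and the helper resolveTopic are hypothetical names, and a plain header lookup stands in for the default expression "headers['mqtt_topic']".

```java
import java.util.Map;
import java.util.function.Function;

public class TopicFallbackSketch {

    // Hypothetical stand-in for the topic expression: reads the "mqtt_topic" header.
    static final Function<Map<String, Object>, String> TOPIC_FROM_HEADER =
            headers -> (String) headers.get("mqtt_topic");

    // Returns the expression result, or the default topic when the expression yields null.
    static String resolveTopic(Map<String, Object> headers, String defaultTopic) {
        String topic = TOPIC_FROM_HEADER.apply(headers);
        return topic != null ? topic : defaultTopic;
    }

    public static void main(String[] args) {
        // Header present: the expression result wins.
        System.out.println(resolveTopic(Map.of("mqtt_topic", "sensors/temp"), "fallback/topic"));
        // Header absent: the default topic is used.
        System.out.println(resolveTopic(Map.of(), "fallback/topic"));
    }
}
```

      The sketch does not model the case where both the expression result and the default topic are null.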
    • getTopicProcessor

      protected MessageProcessor<String> getTopicProcessor()
    • setDefaultQos

      public void setDefaultQos(int defaultQos)
      Set the qos for messages if the qosExpression evaluates to null. Only applies if a message converter is not provided.
      Parameters:
      defaultQos - the default qos.
    • getDefaultQos

      protected int getDefaultQos()
    • setQosExpression

      public void setQosExpression(Expression qosExpression)
      Set the qos expression; default "headers['mqtt_qos']". Only applies if a message converter is not provided.
      Parameters:
      qosExpression - the expression.
      Since:
      5.0
    • setQosExpressionString

      public void setQosExpressionString(String qosExpression)
      Set the qos expression as a String to be parsed; default "headers['mqtt_qos']". Only applies if a message converter is not provided.
      Parameters:
      qosExpression - the expression.
      Since:
      5.0
    • getQosProcessor

      protected MessageProcessor<Integer> getQosProcessor()
    • setDefaultRetained

      public void setDefaultRetained(boolean defaultRetained)
      Set the retained flag for messages if the retainedExpression evaluates to null. Only applies if a message converter is not provided.
      Parameters:
      defaultRetained - the default retained flag.
    • getDefaultRetained

      protected boolean getDefaultRetained()
    • setRetainedExpression

      public void setRetainedExpression(Expression retainedExpression)
      Set the retained expression; default "headers['mqtt_retained']". Only applies if a message converter is not provided.
      Parameters:
      retainedExpression - the expression.
      Since:
      5.0
    • setRetainedExpressionString

      public void setRetainedExpressionString(String retainedExpression)
      Set the retained expression as a String to be parsed; default "headers['mqtt_retained']". Only applies if a message converter is not provided.
      Parameters:
      retainedExpression - the expression.
      Since:
      5.0
    • getRetainedProcessor

      protected MessageProcessor<Boolean> getRetainedProcessor()
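      The qos and retained settings follow the same pattern: the expression result is used when present, otherwise the configured default applies. A minimal sketch of that rule in plain Java follows; QosRetainedFallbackSketch, resolveQos and resolveRetained are hypothetical names, and header lookups stand in for the default expressions "headers['mqtt_qos']" and "headers['mqtt_retained']". It does not model the case where a message converter is provided.

```java
import java.util.Map;

public class QosRetainedFallbackSketch {

    // Uses the "mqtt_qos" header when it holds an Integer, otherwise the default qos.
    static int resolveQos(Map<String, Object> headers, int defaultQos) {
        Object qos = headers.get("mqtt_qos");
        return qos instanceof Integer ? (Integer) qos : defaultQos;
    }

    // Uses the "mqtt_retained" header when it holds a Boolean, otherwise the default flag.
    static boolean resolveRetained(Map<String, Object> headers, boolean defaultRetained) {
        Object retained = headers.get("mqtt_retained");
        return retained instanceof Boolean ? (Boolean) retained : defaultRetained;
    }

    public static void main(String[] args) {
        Map<String, Object> withHeaders = Map.of("mqtt_qos", 2, "mqtt_retained", true);
        Map<String, Object> withoutHeaders = Map.of();

        System.out.println(resolveQos(withHeaders, 1));             // prints 2
        System.out.println(resolveRetained(withHeaders, false));    // prints true
        System.out.println(resolveQos(withoutHeaders, 1));          // prints 1
        System.out.println(resolveRetained(withoutHeaders, false)); // prints false
    }
}
```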
    • setConverter

      public void setConverter(MessageConverter converter)
      Set the message converter to use; if this is provided, the adapter qos and retained settings are ignored.
      Parameters:
      converter - the converter.
    • getConverter

      protected MessageConverter getConverter()
    • getUrl

      @Nullable protected String getUrl()
    • getClientId

      @Nullable public String getClientId()
    • getClientInstance

      public int getClientInstance()
      Returns the client instance number, which is incremented each time the client is connected.
      Returns:
      the instance.
      Since:
      4.1
    • getComponentType

      public String getComponentType()
      Description copied from class: IntegrationObjectSupport
      Subclasses may implement this method to provide component type information.
      Specified by:
      getComponentType in interface NamedComponent
      Overrides:
      getComponentType in class MessageHandlerSupport
    • incrementClientInstance

      protected void incrementClientInstance()
    • setCompletionTimeout

      public void setCompletionTimeout(long completionTimeout)
      Set the completion timeout for async operations. Not settable using the namespace. The default is 30000 milliseconds (DEFAULT_COMPLETION_TIMEOUT).
      Parameters:
      completionTimeout - the timeout in milliseconds.
      Since:
      4.1
    • getCompletionTimeout

      protected long getCompletionTimeout()
    • setDisconnectCompletionTimeout

      public void setDisconnectCompletionTimeout(long completionTimeout)
      Set the completion timeout when disconnecting. Not settable using the namespace. The default is 5000 milliseconds (DISCONNECT_COMPLETION_TIMEOUT).
      Parameters:
      completionTimeout - the timeout in milliseconds.
      Since:
      5.1.10
    • getDisconnectCompletionTimeout

      protected long getDisconnectCompletionTimeout()
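      As a general illustration of what a completion timeout means, the sketch below waits on an asynchronous operation for a bounded time. It is not the handler's actual code: CompletionTimeoutSketch and awaitCompletion are hypothetical names, and a CompletableFuture stands in for whatever token the MQTT client returns.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CompletionTimeoutSketch {

    // Waits up to timeoutMillis for the operation; returns true only if it completed normally in time.
    static boolean awaitCompletion(CompletableFuture<Void> operation, long timeoutMillis) {
        try {
            operation.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        }
        catch (TimeoutException | ExecutionException ex) {
            // Timed out, or the operation itself failed.
            return false;
        }
        catch (InterruptedException ex) {
            // Restore the interrupt flag for callers further up the stack.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // An operation that has already finished completes within any timeout.
        System.out.println(awaitCompletion(CompletableFuture.completedFuture(null), 30_000L));
        // An operation that never finishes hits the (short, illustrative) timeout.
        System.out.println(awaitCompletion(new CompletableFuture<>(), 50L));
    }
}
```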
    • getClientManager

      @Nullable protected ClientManager<T,C> getClientManager()
    • setAsync

      public void setAsync(boolean async)
      Set to true if you don't want to block when sending messages. Default false. When true, and setAsyncEvents(boolean) is also true, message sent/delivered events will be published for reception by a suitably configured ApplicationListener or an event inbound-channel-adapter.
      Parameters:
      async - true for async.
    • isAsync

      protected boolean isAsync()
    • setAsyncEvents

      public void setAsyncEvents(boolean asyncEvents)
      When setAsync(boolean) is true, setting this to true enables publication of MqttMessageSentEvent and MqttMessageDeliveredEvent. Default false.
      Parameters:
      asyncEvents - true to publish the events.
    • onInit

      protected void onInit()
      Description copied from class: IntegrationObjectSupport
      Subclasses may implement this for initialization logic.
      Overrides:
      onInit in class IntegrationObjectSupport
    • start

      public final void start()
      Specified by:
      start in interface Lifecycle
      Specified by:
      start in interface ManageableLifecycle
    • doStart

      protected abstract void doStart()
    • stop

      public final void stop()
      Specified by:
      stop in interface Lifecycle
      Specified by:
      stop in interface ManageableLifecycle
    • doStop

      protected abstract void doStop()
    • isRunning

      public boolean isRunning()
      Specified by:
      isRunning in interface Lifecycle
      Specified by:
      isRunning in interface ManageableLifecycle
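      The final start() and stop() methods paired with the abstract doStart() and doStop() hooks suggest a template-method arrangement. The sketch below shows that pattern in plain Java. BaseHandler and RecordingHandler are hypothetical classes, and the running flag that guards stop() is an assumption of this sketch, not a statement about the actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class LifecycleTemplateSketch {

    // Hypothetical base class: final lifecycle methods delegate to abstract hooks.
    abstract static class BaseHandler {

        private final AtomicBoolean running = new AtomicBoolean();

        public final void start() {
            doStart();
            this.running.set(true);
        }

        public final void stop() {
            // Only invoke the hook if the handler was actually running (assumed guard).
            if (this.running.getAndSet(false)) {
                doStop();
            }
        }

        public boolean isRunning() {
            return this.running.get();
        }

        protected abstract void doStart();

        protected abstract void doStop();
    }

    // Hypothetical subclass that records which hooks were invoked.
    static class RecordingHandler extends BaseHandler {

        final List<String> calls = new ArrayList<>();

        @Override
        protected void doStart() {
            this.calls.add("doStart");
        }

        @Override
        protected void doStop() {
            this.calls.add("doStop");
        }
    }

    public static void main(String[] args) {
        RecordingHandler handler = new RecordingHandler();
        handler.start();
        System.out.println(handler.isRunning()); // prints true
        handler.stop();
        handler.stop(); // second stop is a no-op in this sketch
        System.out.println(handler.calls);       // prints [doStart, doStop]
    }
}
```

      Because start() and stop() are final, subclasses place their connect and disconnect logic in the hooks and cannot bypass the shared lifecycle bookkeeping.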
    • handleMessageInternal

      protected void handleMessageInternal(Message<?> message)
      Specified by:
      handleMessageInternal in class AbstractMessageHandler
    • messageSentEvent

      protected void messageSentEvent(Message<?> message, String topic, int messageId)
    • sendDeliveryCompleteEvent

      protected void sendDeliveryCompleteEvent(int messageId)
    • sendFailedDeliveryEvent

      protected void sendFailedDeliveryEvent(int messageId, Throwable exception)
    • publish

      protected abstract void publish(String topic, Object mqttMessage, Message<?> message)