23. MQTT Support

23.1 Introduction

Spring Integration provides inbound and outbound channel adapters supporting the MQ Telemetry Transport (MQTT) protocol. The current implementation uses the Eclipse Paho MQTT Client library.

Configuration of both adapters is achieved using the DefaultMqttPahoClientFactory. Refer to the Paho documentation for more information about configuration options.

23.2 Inbound (message-driven) Channel Adapter

The inbound channel adapter is implemented by the MqttPahoMessageDrivenChannelAdapter. For convenience, it can be configured using the namespace. A minimal configuration might be:

<bean id="clientFactory"
        class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
    <property name="userName" value="${mqtt.username}"/>
    <property name="password" value="${mqtt.password}"/>
</bean>

<int-mqtt:message-driven-channel-adapter id="mqttInbound"
    client-id="${mqtt.default.client.id}.src"
    url="${mqtt.url}"
    topics="sometopic"
    client-factory="clientFactory"
    channel="output"/>

Attributes:

<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
    client-id="foo"  1
    url="tcp://localhost:1883"  2
    topics="bar,baz"  3
    qos="1,2"  4
    converter="myConverter"  5
    client-factory="clientFactory"  6
    send-timeout="123"  7
    error-channel="errors"  8
    recovery-interval="10000"  9
    channel="out" />

1

The client id.

2

The broker URL.

3

A comma delimited list of topics from which this adapter will receive messages.

4

A comma delimited list of QoS values. Can be a single value that is applied to all topics, or a value for each topic (in which case the lists must be the same length).

5

An MqttMessageConverter (optional). The default DefaultPahoMessageConverter produces a message with a String payload (by default) with the following headers:

mqtt_topic - the topic from which the message was received
mqtt_duplicate - true if the message is a duplicate
mqtt_qos - the quality of service

The DefaultPahoMessageConverter can be configured to return the raw byte[] in the payload by declaring it as a <bean/> and setting the payloadAsBytes property.

6

The client factory.

7

The send timeout - only applies if the channel might block (such as a bounded QueueChannel that is currently full).

8

The error channel - downstream exceptions will be sent to this channel, if supplied, in an ErrorMessage; the payload is a MessagingException containing the failed message and cause.

9

The recovery interval - controls the interval at which the adapter will attempt to reconnect after a failure; it defaults to 10000ms (ten seconds).
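The pairing rule for the topics and qos attributes can be sketched in plain Java. This is an illustrative model only, not the adapter's implementation; the QosResolver class and its resolve method are hypothetical names introduced for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that models how a topic list and a QoS list are paired.
class QosResolver {

    // Returns topic -> QoS, applying a single QoS value to every topic,
    // or one QoS value per topic when the lists have the same length.
    static Map<String, Integer> resolve(String topics, String qos) {
        String[] topicNames = topics.split(",");
        String[] qosValues = qos.split(",");
        if (qosValues.length != 1 && qosValues.length != topicNames.length) {
            throw new IllegalArgumentException(
                    "qos must be a single value or have one value per topic");
        }
        Map<String, Integer> result = new LinkedHashMap<>();
        for (int i = 0; i < topicNames.length; i++) {
            String value = qosValues.length == 1 ? qosValues[0] : qosValues[i];
            result.put(topicNames[i].trim(), Integer.parseInt(value.trim()));
        }
        return result;
    }
}
```

With the attribute values shown above, resolve("bar,baz", "1,2") pairs bar with QoS 1 and baz with QoS 2, while resolve("bar,baz", "1") applies QoS 1 to both topics.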

Note

Starting with version 4.1 the url can be omitted and, instead, the server URIs can be provided in the serverURIs property of the DefaultMqttPahoClientFactory. This enables, for example, connection to a highly available (HA) cluster.

Starting with version 4.2.2, an MqttSubscribedEvent is published when the adapter successfully subscribes to the topic(s). An MqttConnectionFailedEvent is published when the connection or subscription fails. These events can be received by a bean that implements ApplicationListener.

Also, a new property recoveryInterval controls the interval at which the adapter will attempt to reconnect after a failure; it defaults to 10000ms (ten seconds).
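A listener for the events described in this note might look like the following sketch. It assumes the event classes are found in the org.springframework.integration.mqtt.event package and that the listener is registered as a bean in the application context; the MqttEventLogger class name is hypothetical.

```java
import org.springframework.context.ApplicationListener;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttIntegrationEvent;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;

// Hypothetical listener; declare it as a bean so that it receives the events.
public class MqttEventLogger implements ApplicationListener<MqttIntegrationEvent> {

    @Override
    public void onApplicationEvent(MqttIntegrationEvent event) {
        if (event instanceof MqttSubscribedEvent) {
            // Published after the adapter has subscribed to its topic(s).
            System.out.println("Subscribed: " + event);
        }
        else if (event instanceof MqttConnectionFailedEvent) {
            // Published when the connection or subscription fails.
            System.out.println("Connection failed: " + event.getCause());
        }
    }

}
```

Listening on the common MqttIntegrationEvent supertype keeps both cases in one class; a separate ApplicationListener per event type would work equally well.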

Note

Prior to version 4.2.3, the client always unsubscribed when the adapter was stopped. This was incorrect: if the client QoS is greater than 0, the subscription must remain active so that messages arriving while the adapter is stopped are delivered on the next start. This also requires setting the cleanSession property on the client factory to false (it defaults to true).

Starting with version 4.2.3, the adapter will not unsubscribe (by default) if the cleanSession property is false.

This behavior can be overridden by setting the consumerCloseAction property on the factory. It can have values: UNSUBSCRIBE_ALWAYS, UNSUBSCRIBE_NEVER, and UNSUBSCRIBE_CLEAN. The last of these (the default) will unsubscribe only if the cleanSession property is true.

To revert to the pre-4.2.3 behavior, use UNSUBSCRIBE_ALWAYS.
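The three values can be summarized as a small decision function. The sketch below is a plain Java model of the rule described in this note; the StopActionModel class, its StopAction enum and the unsubscribesOnStop method are hypothetical and merely mirror the documented value names. They are not the framework's own types.

```java
// Hypothetical model of the unsubscribe decision made when the adapter stops.
class StopActionModel {

    // Mirrors the documented values; not the framework's enum.
    enum StopAction { UNSUBSCRIBE_ALWAYS, UNSUBSCRIBE_NEVER, UNSUBSCRIBE_CLEAN }

    // Returns true if the adapter would unsubscribe when it is stopped.
    static boolean unsubscribesOnStop(StopAction action, boolean cleanSession) {
        switch (action) {
            case UNSUBSCRIBE_ALWAYS:
                return true;          // pre-4.2.3 behavior
            case UNSUBSCRIBE_NEVER:
                return false;
            case UNSUBSCRIBE_CLEAN:
                return cleanSession;  // default: unsubscribe only for clean sessions
            default:
                throw new IllegalStateException("Unknown action: " + action);
        }
    }
}
```

Under this model, the default UNSUBSCRIBE_CLEAN keeps the subscription when cleanSession is false, which is what allows messages with a QoS greater than 0 to be delivered after a restart.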

Important

Starting with version 5.0, the topic, qos and retained properties are mapped to .RECEIVED_... headers (MqttHeaders.RECEIVED_TOPIC, MqttHeaders.RECEIVED_QOS, and MqttHeaders.RECEIVED_RETAINED), to avoid inadvertent propagation to an outbound message which (by default) uses the MqttHeaders.TOPIC, MqttHeaders.QOS, and MqttHeaders.RETAINED headers.
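As an illustration, a downstream endpoint could read the received headers as in the following sketch. It assumes version 5.0 or later, an input channel named mqttInputChannel, and that the class is registered as a bean; the ReceivedHeaderLogger class name is hypothetical.

```java
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;

// Hypothetical endpoint; the channel name is an assumption for this example.
public class ReceivedHeaderLogger {

    @ServiceActivator(inputChannel = "mqttInputChannel")
    public void log(Message<?> message) {
        // Inbound messages carry the RECEIVED_... headers, not TOPIC/QOS/RETAINED.
        String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC, String.class);
        Integer qos = message.getHeaders().get(MqttHeaders.RECEIVED_QOS, Integer.class);
        Boolean retained = message.getHeaders().get(MqttHeaders.RECEIVED_RETAINED, Boolean.class);
        System.out.println(topic + " (qos=" + qos + ", retained=" + retained + "): "
                + message.getPayload());
    }

}
```

Because the outbound adapter looks at MqttHeaders.TOPIC by default, forwarding such a message to an outbound adapter does not republish it to the topic it was received from unless the application copies the header explicitly.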

23.2.1 Adding/Removing Topics at Runtime

Starting with version 4.1, it is possible to programmatically change the topics to which the adapter is subscribed. Methods addTopic() and removeTopic() are provided. When adding topics, you can optionally specify the QoS (default: 1). You can also modify the topics by sending a message to a <control-bus/> with a suitable payload, for example: "myMqttAdapter.addTopic('foo', 1)".

Stopping/starting the adapter has no effect on the topic list (it does not revert to the original settings in the configuration). The changes are not retained beyond the life cycle of the application context; a new application context will revert to the configured settings.

Changing the topics while the adapter is stopped (or disconnected from the broker) will take effect the next time a connection is established.
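A minimal sketch of changing the subscriptions from application code follows. It assumes the adapter bean is injected into a collaborating class; the TopicManager class and its method names are hypothetical, while addTopic() and removeTopic() are the adapter methods described above.

```java
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;

// Hypothetical helper that wraps the adapter's runtime topic operations.
public class TopicManager {

    private final MqttPahoMessageDrivenChannelAdapter adapter;

    public TopicManager(MqttPahoMessageDrivenChannelAdapter adapter) {
        this.adapter = adapter;
    }

    // Subscribes to an additional topic with an explicit QoS.
    public void watch(String topic, int qos) {
        this.adapter.addTopic(topic, qos);
    }

    // Subscribes to an additional topic with the default QoS (1).
    public void watch(String topic) {
        this.adapter.addTopic(topic);
    }

    // Removes a topic from the adapter's subscription list.
    public void unwatch(String topic) {
        this.adapter.removeTopic(topic);
    }

}
```

As noted above, these changes live only as long as the application context; they are not written back to the configuration.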

23.2.2 Configuring with Java Configuration

The following Spring Boot application provides an example of configuring the inbound adapter using Java configuration:

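import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

@SpringBootApplication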
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
                .web(false)
                .run(args);
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",
                                                 "topic1", "topic2");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

23.3 Outbound Channel Adapter

The outbound channel adapter is implemented by the MqttPahoMessageHandler, which is wrapped in a ConsumerEndpoint. For convenience, it can be configured using the namespace.

Starting with version 4.1, the adapter supports asynchronous sends, avoiding blocking until the delivery is confirmed; application events can be emitted to enable applications to confirm delivery if desired.

Attributes:

<int-mqtt:outbound-channel-adapter id="withConverter"
    client-id="foo"  1
    url="tcp://localhost:1883"  2
    converter="myConverter"  3
    client-factory="clientFactory"  4
    default-qos="1"  5
    qos-expression="" 6
    default-retained="true"  7
    retained-expression="" 8
    default-topic="bar"  9
    topic-expression="" 10
    async="false"  11
    async-events="false"  12
    channel="target" />

1

The client id.

2

The broker URL.

3

An MqttMessageConverter (optional). The default DefaultPahoMessageConverter recognizes the following headers:

mqtt_topic - the topic to which the message will be sent
mqtt_retained - true if the message is to be retained
mqtt_qos - the quality of service

4

The client factory.

5

The default quality of service (used if no mqtt_qos header is found or the qos-expression returns null). Not used if a custom converter is supplied.

6

An expression to evaluate to determine the qos; default headers[mqtt_qos].

7

The default value of the retained flag (used if no mqtt_retained header is found). Not used if a custom converter is supplied.

8

An expression to evaluate to determine the retained boolean; default headers[mqtt_retained].

9

The default topic to which the message will be sent (used if no mqtt_topic header is found).

10

An expression to evaluate to determine the destination topic; default headers['mqtt_topic'].

11

When true, the caller will not block waiting for delivery confirmation when a message is sent. Default: false (the send blocks until delivery is confirmed).

12

When async and async-events are both true, an MqttMessageSentEvent is emitted, containing the message, the topic, the messageId generated by the client library, the clientId and the clientInstance (incremented each time the client is connected). When the delivery is confirmed by the client library, an MqttMessageDeliveredEvent is emitted, containing the messageId, clientId and the clientInstance, enabling delivery to be correlated with the send. These events can be received by any ApplicationListener, or by an event inbound channel adapter. Note that it is possible that the MqttMessageDeliveredEvent might be received before the MqttMessageSentEvent. Default: false.
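Because the delivered event can arrive before the sent event, any correlation logic has to tolerate both orders. The following plain Java sketch models one way to do that, keyed on the clientId, clientInstance and messageId values carried by both events. The DeliveryTracker class and its methods are hypothetical; in an application, they would be called from listeners for the two event types.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical tracker that correlates "sent" and "delivered" notifications.
class DeliveryTracker {

    private final Map<String, String> pendingSends = new HashMap<>(); // key -> topic
    private final Set<String> earlyDeliveries = new HashSet<>();
    private int confirmed;

    private static String key(String clientId, int clientInstance, int messageId) {
        return clientId + "/" + clientInstance + "/" + messageId;
    }

    // Records a send; returns true if its delivery had already been reported.
    synchronized boolean onSent(String clientId, int clientInstance, int messageId, String topic) {
        String key = key(clientId, clientInstance, messageId);
        if (this.earlyDeliveries.remove(key)) {
            this.confirmed++;
            return true;
        }
        this.pendingSends.put(key, topic);
        return false;
    }

    // Records a delivery; returns true if the matching send was already known.
    synchronized boolean onDelivered(String clientId, int clientInstance, int messageId) {
        String key = key(clientId, clientInstance, messageId);
        if (this.pendingSends.remove(key) != null) {
            this.confirmed++;
            return true;
        }
        this.earlyDeliveries.add(key);
        return false;
    }

    synchronized int confirmedCount() {
        return this.confirmed;
    }

    synchronized int pendingCount() {
        return this.pendingSends.size();
    }
}
```

Including clientInstance in the key keeps a delivery reported for one connection from being matched against a send made on another connection of the same client.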

Note

Starting with version 4.1 the url can be omitted and, instead, the server URIs can be provided in the serverURIs property of the DefaultMqttPahoClientFactory. This enables, for example, connection to a highly available (HA) cluster.

23.3.1 Configuring with Java Configuration

The following Spring Boot application provides an example of configuring the outbound adapter using Java configuration:

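import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@SpringBootApplication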
@IntegrationComponentScan
public class MqttJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(MqttJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToMqtt("foo");
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs("tcp://host1:1883", "tcp://host2:1883");
        factory.setUserName("username");
        factory.setPassword("password");
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                       new MqttPahoMessageHandler("testClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        void sendToMqtt(String data);

    }

}