Spring Integration provides inbound and outbound channel adapters supporting the MQ Telemetry Transport (MQTT) protocol. The current implementation uses the Eclipse Paho MQTT Client library.
Both adapters are configured using the DefaultMqttPahoClientFactory. Refer to the Paho documentation for more information about configuration options.
The inbound channel adapter is implemented by the MqttPahoMessageDrivenChannelAdapter. For convenience, it can be configured using the namespace. A minimal configuration might be:
<bean id="clientFactory"
      class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
    <property name="userName" value="${mqtt.username}"/>
    <property name="password" value="${mqtt.password}"/>
</bean>

<int-mqtt:message-driven-channel-adapter id="mqttInbound"
    client-id="${mqtt.default.client.id}.src"
    url="${mqtt.url}"
    topics="sometopic"
    client-factory="clientFactory"
    channel="output"/>
Attributes:
<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
    client-id="foo"
    url="tcp://localhost:1883"
    topics="bar,baz"
    qos="1,2"
    converter="myConverter"
    client-factory="clientFactory"
    send-timeout="123"
    error-channel="errors"
    channel="out" />
client-id: The client id.
url: The broker URL.
topics: A comma-delimited list of topics from which this adapter will receive messages.
qos: A comma-delimited list of QoS values. Can be a single value that is applied to all topics, or a value for each topic (in which case the two lists must be the same length).
converter: An MqttMessageConverter (optional). The default DefaultPahoMessageConverter produces a message with a String payload (by default) and MQTT-specific headers. DefaultPahoMessageConverter can be configured to return the raw byte[] in the payload by declaring it as a <bean/> and setting the payloadAsBytes property.
client-factory: The client factory.
send-timeout: The send timeout. It only applies if the channel might block (such as a bounded QueueChannel that is currently full).
error-channel: The error channel. If supplied, downstream exceptions are sent to this channel in an ErrorMessage; the payload is a MessagingException containing the failed message and the cause.
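To illustrate the converter and qos attributes described above, the following fragment declares a DefaultPahoMessageConverter that returns the raw byte[] payload and wires it into an inbound adapter that applies a single QoS value to both topics. This is a minimal sketch rather than a definitive configuration: the ids rawConverter and rawInbound and the channel name rawOut are placeholder names invented for this example, the converter is assumed to live in the org.springframework.integration.mqtt.support package, and clientFactory refers to the factory bean from the minimal configuration.

```xml
<!-- Converter bean: payloadAsBytes=true keeps the payload as byte[] instead of a String -->
<bean id="rawConverter"
      class="org.springframework.integration.mqtt.support.DefaultPahoMessageConverter">
    <property name="payloadAsBytes" value="true"/>
</bean>

<!-- Hypothetical adapter: the single qos value "1" is applied to both "bar" and "baz" -->
<int-mqtt:message-driven-channel-adapter id="rawInbound"
    client-id="foo"
    url="tcp://localhost:1883"
    topics="bar,baz"
    qos="1"
    converter="rawConverter"
    client-factory="clientFactory"
    channel="rawOut"/>
```

With this configuration, consumers subscribed to rawOut would receive messages whose payload is the undecoded byte[] published to the broker.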
Starting with version 4.1, it is possible to programmatically change the topics to which the adapter is subscribed. Methods addTopic() and removeTopic() are provided. When adding topics, you can optionally specify the QoS (default: 1). You can also modify the topics by sending an appropriate message to a <control-bus/> with an appropriate payload: "myMqttAdapter.addTopic('foo', 1)".
Stopping/starting the adapter has no effect on the topic list (it does not revert to the original settings in the configuration). The changes are not retained beyond the life cycle of the application context; a new application context will revert to the configured settings.
Changing the topics while the adapter is stopped (or disconnected from the broker) will take effect the next time a connection is established.
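The control bus approach mentioned above could be wired up roughly as follows. This is a sketch under stated assumptions: the int prefix is assumed to be bound to the core Spring Integration namespace, controlChannel is a placeholder channel name invented for this example, and myMqttAdapter is assumed to be the id of an inbound adapter defined elsewhere in the same context.

```xml
<!-- Hypothetical channel on which control messages are sent -->
<int:channel id="controlChannel"/>

<!-- The control bus evaluates the String payload of each message as an expression
     against beans in the application context -->
<int:control-bus input-channel="controlChannel"/>
```

A message sent to controlChannel with the payload shown earlier, "myMqttAdapter.addTopic('foo', 1)", would then ask the adapter to subscribe to the topic foo with QoS 1. Depending on the Spring Integration version, the control bus may expect the bean reference to be prefixed with @ (for example "@myMqttAdapter.addTopic('foo', 1)"), so check the control bus documentation for the version in use.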
The outbound channel adapter is implemented by the MqttPahoMessageHandler, which is wrapped in a ConsumerEndpoint. For convenience, it can be configured using the namespace.
Starting with version 4.1, the adapter supports asynchronous sends, avoiding blocking until the delivery is confirmed; application events can be emitted to enable applications to confirm delivery if desired.
Attributes:
<int-mqtt:outbound-channel-adapter id="withConverter"
    client-id="foo"
    url="tcp://localhost:1883"
    converter="myConverter"
    client-factory="clientFactory"
    default-qos="1"
    default-retained="true"
    default-topic="bar"
    async="false"
    async-events="false"
    channel="target" />
client-id: The client id.
url: The broker URL.
converter: An MqttMessageConverter (optional). The default DefaultPahoMessageConverter recognizes the following headers: mqtt_topic, mqtt_retained and mqtt_qos.
client-factory: The client factory.
default-qos: The default quality of service (used if no mqtt_qos header is found). Not allowed if a custom converter is supplied.
default-retained: The default value of the retained flag (used if no mqtt_retained header is found). Not allowed if a custom converter is supplied.
default-topic: The default topic to which the message will be sent (used if no mqtt_topic header is found).
async: When true, the caller will not block waiting for delivery confirmation when a message is sent. Default: false (the send blocks until delivery is confirmed).
async-events: When async and async-events are both true, an MqttMessageSentEvent is emitted, containing the message, the topic, the messageId generated by the client library, the clientId and the clientInstance (incremented each time the client is connected). When the delivery is confirmed by the client library, an MqttMessageDeliveredEvent is emitted, containing the messageId, clientId and clientInstance, enabling delivery to be correlated with the send. These events can be received by any ApplicationListener, or by an event inbound channel adapter. Note that the MqttMessageDeliveredEvent might be received before the MqttMessageSentEvent. Default: false.
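The asynchronous options described above might be combined with an event inbound channel adapter along these lines. This is a hedged sketch, not a definitive configuration: the ids asyncOutbound and mqttEvents are placeholder names invented for this example, the int and int-event prefixes are assumed to be bound to the core and event namespaces, and the event classes are assumed to live in the org.springframework.integration.mqtt.event package.

```xml
<!-- Hypothetical outbound adapter: sends do not block, and sent/delivered events are published -->
<int-mqtt:outbound-channel-adapter id="asyncOutbound"
    client-id="foo"
    url="tcp://localhost:1883"
    client-factory="clientFactory"
    default-qos="1"
    default-topic="bar"
    async="true"
    async-events="true"
    channel="target"/>

<!-- Routes the two MQTT application events to a channel as messages;
     a subscriber must be attached to mqttEvents to consume them -->
<int-event:inbound-channel-adapter channel="mqttEvents"
    event-types="org.springframework.integration.mqtt.event.MqttMessageSentEvent,org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent"/>

<int:channel id="mqttEvents"/>
```

A consumer of mqttEvents can pair each delivered event with its sent event using the messageId, clientId and clientInstance values. Because the delivered event may arrive first, the consumer should tolerate either order.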