Spring Integration version 4.2 introduced STOMP Client support.
It is based on the architecture, infrastructure and API from the Spring Framework’s messaging module, stomp package.
Many Spring STOMP components (for example, StompSession and StompClientSupport) are used within Spring Integration.
For more information, refer to the Spring Framework STOMP Support chapter in the Spring Framework reference manual.
To configure STOMP (Simple [or Streaming] Text Orientated Messaging Protocol), start with the STOMP client object. The Spring Framework provides these implementations:
WebSocketStompClient - built on the Spring WebSocket API, with support for standard JSR-356 WebSocket and Jetty 9, as well as SockJS for HTTP-based WebSocket emulation with the SockJS client.
ReactorNettyTcpStompClient - built on ReactorNettyTcpClient from the reactor-netty project.
Any other StompClientSupport implementation can be provided.
See the JavaDocs of those classes for more information.
The StompClientSupport class is designed as a factory to produce a StompSession for the provided StompSessionHandler, and all the remaining work is done through the callbacks to that StompSessionHandler and the StompSession abstraction.
With the Spring Integration adapter abstraction, we need to provide a managed shared object to represent our application as a STOMP client with its unique session.
For this purpose, Spring Integration provides the StompSessionManager abstraction to manage a single StompSession shared among all provided StompSessionHandler instances.
This allows the use of inbound or outbound channel adapters (or both) for a particular STOMP broker.
See the JavaDocs of StompSessionManager (and its implementations) for more information.
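The idea of one session shared by several handlers can be pictured with a small plain-Java sketch. This is not the StompSessionManager implementation: SharedSessionSketch, its Session type, and the sessionEstablished method are hypothetical names used only for illustration, and the real manager also deals with reconnection, receipts, and errors.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SharedSessionSketch {

    /** Hypothetical stand-in for a connected STOMP session. */
    public static final class Session {
        public final String id;

        Session(String id) {
            this.id = id;
        }
    }

    private final List<Consumer<Session>> handlers = new ArrayList<>();
    private Session session;

    /** Registers a handler; it is called at once if the session already exists. */
    public synchronized void connect(Consumer<Session> handler) {
        handlers.add(handler);
        if (session != null) {
            handler.accept(session);
        }
    }

    /** Simulates the single broker connection being established. */
    public synchronized void sessionEstablished(String sessionId) {
        session = new Session(sessionId);
        // Every handler registered so far receives the same session instance.
        handlers.forEach(handler -> handler.accept(session));
    }

    public static void main(String[] args) {
        SharedSessionSketch manager = new SharedSessionSketch();
        manager.connect(s -> System.out.println("inbound adapter uses session " + s.id));
        manager.sessionEstablished("session-1");
        manager.connect(s -> System.out.println("outbound adapter uses session " + s.id));
    }
}
```

Both the handler registered before the connection and the one registered afterwards end up with the same session object, which is the property the adapters rely on.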
The StompInboundChannelAdapter is a one-stop MessageProducer component that subscribes our Spring Integration application to the provided STOMP destinations and receives messages from them, converted from STOMP frames using the provided MessageConverter on the connected StompSession.
The destinations (and therefore STOMP subscriptions) can be changed at runtime using the appropriate @ManagedOperations on the StompInboundChannelAdapter.
For more configuration options, see Section 28.8, “STOMP Namespace Support” and the StompInboundChannelAdapter JavaDocs.
The StompMessageHandler is the MessageHandler for the <int-stomp:outbound-channel-adapter>. It sends outgoing Message<?>s to the STOMP destination (pre-configured or determined at runtime via a SpEL expression) through the StompSession provided by the shared StompSessionManager.
For more configuration options, see Section 28.8, “STOMP Namespace Support” and the StompMessageHandler JavaDocs.
The STOMP protocol provides headers as part of its frame. The entire structure of a STOMP frame has this format:
COMMAND
header1:value1
header2:value2

Body^@
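To make the frame layout concrete, the following plain-Java sketch assembles a frame by hand: the command line, one line per header, a blank line, the body, and a terminating NUL octet (shown as ^@ above). StompFrameSketch and serialize are hypothetical names, not part of Spring; the sketch also skips header escaping and the content-length header, which a real client has to handle.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class StompFrameSketch {

    /** Builds COMMAND, header lines, a blank line, the body, and a NUL terminator. */
    public static String serialize(String command, Map<String, String> headers, String body) {
        StringBuilder frame = new StringBuilder(command).append('\n');
        for (Map.Entry<String, String> header : headers.entrySet()) {
            frame.append(header.getKey()).append(':').append(header.getValue()).append('\n');
        }
        frame.append('\n');   // blank line separates the headers from the body
        frame.append(body);
        frame.append('\0');   // NUL octet that ends every frame
        return frame.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the headers in insertion order.
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("destination", "/topic/myTopic");
        headers.put("receipt", "message-1");
        String frame = serialize("SEND", headers, "hello");
        // Replace the NUL octet so the terminator is visible when printed.
        System.out.println(frame.replace("\0", "^@"));
    }
}
```

In an application you would not build frames yourself; the StompSession and the channel adapters do this on your behalf.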
Spring Framework provides StompHeaders to represent these headers.
See the JavaDocs for more details.
STOMP frames are converted to and from Message<?>, and these headers are mapped to and from MessageHeaders.
Spring Integration provides a default HeaderMapper implementation for the STOMP adapters.
The implementation is StompHeaderMapper, which provides the fromHeaders() operation (MessageHeaders to StompHeaders) for the outbound adapter and the toHeaders() operation (StompHeaders to MessageHeaders) for the inbound adapter.
As with many other Spring Integration modules, the IntegrationStompHeaders class has been introduced to map standard STOMP headers to MessageHeaders with stomp_ as the header name prefix.
In addition, all MessageHeaders with that prefix are mapped to the StompHeaders when sending to a destination.
For more information, see the JavaDocs of those classes and the mapped-headers attribute description in Section 28.8, “STOMP Namespace Support”.
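The stomp_ naming convention can be illustrated with plain maps. The sketch below is not the StompHeaderMapper implementation: StompHeaderPrefixSketch and its two methods are hypothetical, it assumes the prefix is simply added and removed, and it ignores mapped-headers patterns and any special handling of individual headers.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class StompHeaderPrefixSketch {

    static final String PREFIX = "stomp_";

    /** Inbound direction: STOMP frame headers become prefixed message headers. */
    public static Map<String, Object> toMessageHeaders(Map<String, String> stompHeaders) {
        Map<String, Object> messageHeaders = new LinkedHashMap<>();
        stompHeaders.forEach((name, value) -> messageHeaders.put(PREFIX + name, value));
        return messageHeaders;
    }

    /** Outbound direction: prefixed message headers are copied without the prefix. */
    public static Map<String, String> toStompHeaders(Map<String, Object> messageHeaders) {
        Map<String, String> stompHeaders = new LinkedHashMap<>();
        messageHeaders.forEach((name, value) -> {
            // Headers without the prefix are skipped in this simplified sketch.
            if (name.startsWith(PREFIX) && value != null) {
                stompHeaders.put(name.substring(PREFIX.length()), value.toString());
            }
        });
        return stompHeaders;
    }

    public static void main(String[] args) {
        Map<String, String> frameHeaders = new LinkedHashMap<>();
        frameHeaders.put("destination", "/topic/myTopic");
        frameHeaders.put("receipt", "message-1");

        Map<String, Object> messageHeaders = toMessageHeaders(frameHeaders);
        System.out.println(messageHeaders.keySet());
        System.out.println(toStompHeaders(messageHeaders));
    }
}
```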
Many STOMP operations are asynchronous, including error handling.
For example, STOMP has a RECEIPT server frame that is returned when a client frame has requested one by adding the receipt header.
To provide access to these asynchronous events, Spring Integration emits StompIntegrationEvents, which can be obtained by implementing an ApplicationListener or by using an <int-event:inbound-channel-adapter> (see Section 12.1, “Receiving Spring Application Events”).
Specifically, a StompExceptionEvent is emitted from the AbstractStompSessionManager when a stompSessionListenableFuture receives onFailure() because the connection to the STOMP broker failed.
Another example is the StompMessageHandler, which processes ERROR STOMP frames; these are server responses to improper or unaccepted messages sent by this StompMessageHandler.
StompReceiptEvents are emitted from the StompMessageHandler as part of the StompSession.Receiptable callbacks, in the asynchronous answers for messages sent to the StompSession.
A StompReceiptEvent can be positive or negative, depending on whether or not the RECEIPT frame was received from the server within the receiptTimeLimit period, which can be configured on the StompClientSupport instance.
It defaults to 15 * 1000 (in milliseconds).
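The positive and negative receipt outcomes can be sketched with a CompletableFuture that either completes when the RECEIPT frame arrives or falls back to a negative result once the time limit passes. This is only a model of the semantics described above, not how Spring implements StompSession.Receiptable; ReceiptTimeoutSketch, expectReceipt, and onReceiptFrame are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class ReceiptTimeoutSketch {

    private final Map<String, CompletableFuture<Boolean>> pending = new ConcurrentHashMap<>();

    /** Registers a receipt id; the future yields false if no RECEIPT arrives in time. */
    public CompletableFuture<Boolean> expectReceipt(String receiptId, long timeLimitMillis) {
        CompletableFuture<Boolean> outcome = new CompletableFuture<Boolean>()
                .completeOnTimeout(false, timeLimitMillis, TimeUnit.MILLISECONDS);
        pending.put(receiptId, outcome);
        // Forget the receipt id once the outcome is known either way.
        outcome.whenComplete((received, error) -> pending.remove(receiptId));
        return outcome;
    }

    /** Called when a RECEIPT frame for the given receipt id arrives from the broker. */
    public void onReceiptFrame(String receiptId) {
        CompletableFuture<Boolean> outcome = pending.get(receiptId);
        if (outcome != null) {
            outcome.complete(true);
        }
    }

    public static void main(String[] args) {
        ReceiptTimeoutSketch tracker = new ReceiptTimeoutSketch();

        CompletableFuture<Boolean> acknowledged = tracker.expectReceipt("message-1", 5000);
        tracker.onReceiptFrame("message-1");   // the broker answered in time

        CompletableFuture<Boolean> lost = tracker.expectReceipt("message-2", 100);

        System.out.println("message-1 received: " + acknowledged.join());
        System.out.println("message-2 received: " + lost.join());
    }
}
```

In Spring Integration the equivalent outcome is delivered as a StompReceiptEvent, so application code listens for events instead of holding futures.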
See the next paragraph for more information on how to configure Spring Integration to accept those ApplicationEvents.
A comprehensive Java & Annotation Configuration for STOMP Adapters may look like this:
@Configuration
@EnableIntegration
public class StompConfiguration {

    @Bean
    public ReactorNettyTcpStompClient stompClient() {
        ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
        stompClient.setMessageConverter(new PassThruMessageConverter());
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.afterPropertiesSet();
        stompClient.setTaskScheduler(taskScheduler);
        stompClient.setReceiptTimeLimit(5000);
        return stompClient;
    }

    @Bean
    public StompSessionManager stompSessionManager() {
        ReactorNettyTcpStompSessionManager stompSessionManager =
                new ReactorNettyTcpStompSessionManager(stompClient());
        stompSessionManager.setAutoReceipt(true);
        return stompSessionManager;
    }

    @Bean
    public PollableChannel stompInputChannel() {
        return new QueueChannel();
    }

    @Bean
    public StompInboundChannelAdapter stompInboundChannelAdapter() {
        StompInboundChannelAdapter adapter =
                new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
        adapter.setOutputChannel(stompInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "stompOutputChannel")
    public MessageHandler stompMessageHandler() {
        StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
        handler.setDestination("/topic/myTopic");
        return handler;
    }

    @Bean
    public PollableChannel stompEvents() {
        return new QueueChannel();
    }

    @Bean
    public ApplicationListener<ApplicationEvent> stompEventListener() {
        ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
        producer.setEventTypes(StompIntegrationEvent.class);
        producer.setOutputChannel(stompEvents());
        return producer;
    }

}
The Spring Integration STOMP namespace implements the inbound and outbound channel adapter components described below. To include it in your configuration, provide the following namespace declaration in your application context configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/stomp
        http://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
    ...
</beans>
<int-stomp:outbound-channel-adapter>
<int-stomp:outbound-channel-adapter
    id=""
    channel=""
    stomp-session-manager=""
    header-mapper=""
    mapped-headers=""
    destination=""
    destination-expression=""
    auto-startup=""
    phase=""/>
The component bean name.
The | |
Identifies the channel attached to this adapter.
Optional - if | |
Reference to a | |
Reference to a bean implementing | |
Comma-separated list of names of STOMP Headers to be mapped to the STOMP frame headers.
This can only be provided if the | |
Name of the destination to which STOMP Messages will be sent.
Mutually exclusive with the | |
A SpEL expression to be evaluated at runtime against each Spring Integration | |
Boolean value indicating whether this endpoint should start automatically.
Default to | |
The lifecycle phase within which this endpoint should start and stop.
The lower the value the earlier this endpoint will start and the later it will stop.
The default is |
<int-stomp:inbound-channel-adapter>
<int-stomp:inbound-channel-adapter
    id=""
    channel=""
    error-channel=""
    stomp-session-manager=""
    header-mapper=""
    mapped-headers=""
    destinations=""
    send-timeout=""
    payload-type=""
    auto-startup=""
    phase=""/>
The component bean name.
If the | |
Identifies the channel attached to this adapter. | |
The | |
See the same option on the | |
Comma-separated list of names of STOMP Headers to be mapped from the STOMP frame headers.
This can only be provided if the | |
See the same option on the | |
Comma-separated list of STOMP destination names to subscribe.
The list of destinations (and therefore subscriptions) can be modified at runtime
through the | |
Maximum amount of time in milliseconds to wait when sending a message to the channel if the channel may block.
For example, a | |
Fully qualified name of the java type for the target | |
See the same option on the | |
See the same option on the |