28. STOMP Support

28.1 Introduction

Spring Integration version 4.2 introduced STOMP Client support. It is based on the architecture, infrastructure and API from the Spring Framework’s messaging module, stomp package. Many of Spring STOMP components (e.g. StompSession or StompClientSupport) are used within Spring Integration. For more information, please, refer to the Spring Framework STOMP Support chapter in the Spring Framework reference manual.

28.2 Overview

To configure STOMP (Simple [or Streaming] Text Orientated Messaging Protocol) let’s start with the STOMP Client object. The Spring Framework provides these implementations:

  • WebSocketStompClient - built on the Spring WebSocket API with support for standard JSR-356 WebSocket, Jetty 9, as well as SockJS for HTTP-based WebSocket emulation with SockJS Client.
  • ReactorNettyTcpStompClient - built on ReactorNettyTcpClient from the reactor-netty project.

Any other StompClientSupport implementation can be provided. See the JavaDocs of those classes for more information.

The StompClientSupport class is designed as a factory to produce a StompSession for the provided StompSessionHandler and all the remaining work is done through the callbacks to that StompSessionHandler and StompSession abstraction. With the Spring Integration adapter abstraction, we need to provide some managed shared object to represent our application as a STOMP client with its unique session. For this purpose, Spring Integration provides the StompSessionManager abstraction to manage the single StompSession between any provided StompSessionHandler. This allows the use of inbound or outbound channel adapters (or both) for the particular STOMP Broker. See StompSessionManager (and its implementations) JavaDocs for more information.

28.3 STOMP Inbound Channel Adapter

The StompInboundChannelAdapter is a one-stop MessageProducer component to subscribe our Spring Integration application to the provided STOMP destinations and receive messages from them, converted from the STOMP frames using the provided MessageConverter on the connected StompSession. The destinations (and therefore STOMP subscriptions) can be changed at runtime using appropriate @ManagedOperation s on the StompInboundChannelAdapter.

For more configuration options see Section 28.8, “STOMP Namespace Support” and the StompInboundChannelAdapter JavaDocs.

28.4 STOMP Outbound Channel Adapter

The StompMessageHandler is the MessageHandler for the <int-stomp:outbound-channel-adapter> to send the outgoing Message<?> s to the STOMP destination (pre-configured or determined at runtime via a SpEL expression) STOMP through the StompSession, provided by the shared StompSessionManager.

For more configuration option see Section 28.8, “STOMP Namespace Support” and the StompMessageHandler JavaDocs.

28.5 STOMP Headers Mapping

The STOMP protocol provides headers as part of frame; the entire structure of the STOMP frame has this format:

COMMAND
header1:value1
header2:value2

Body^@

Spring Framework provides StompHeaders, to represent these headers. See the JavaDocs for more details. STOMP frames are converted to/from Message<?> and these headers are mapped to/from MessageHeaders. Spring Integration provides a default HeaderMapper implementation for the STOMP adapters. The implementation is StompHeaderMapper which provides fromHeaders() and toHeaders() operations for the inbound and outbound adapters respectively.

As with many other Spring Integration modules, the IntegrationStompHeaders class has been introduced to map standard STOMP headers to MessageHeaders with stomp_ as the header name prefix. In addition, all MessageHeaders with that prefix are mapped to the StompHeaders when sending to a destination.

For more information, see the JavaDocs of those classes and the mapped-headers attribute description in the Section 28.8, “STOMP Namespace Support”.

28.6 STOMP Integration Events

Many STOMP operations are asynchronous, including error handling. For example, STOMP has a RECEIPT server frame that is returned when a client frame has requested one by adding the RECEIPT header. To provide access to these asynchronous events, Spring Integration emits StompIntegrationEvent s which can be obtained by implementing an ApplicationListener or using an <int-event:inbound-channel-adapter> (see Section 12.1, “Receiving Spring Application Events”).

Specifically, a StompExceptionEvent is emitted from the AbstractStompSessionManager, when a stompSessionListenableFuture receives onFailure() in case of failure to connect to STOMP Broker. Another example is the StompMessageHandler which processes ERROR STOMP frames, which are server responses to improper, unaccepted, messages sent by this StompMessageHandler.

The StompReceiptEvent s are emitted from the StompMessageHandler as a part of StompSession.Receiptable callbacks in the asynchronous answers for the sent messages to the StompSession. The StompReceiptEvent can be positive and negative depending on whether or not the RECEIPT frame was received from the server within the receiptTimeLimit period, which can be configured on the StompClientSupport instance. Defaults to 15 * 1000.

[Note]Note

The StompSession.Receiptable callbacks are added only if the RECEIPT STOMP header of the message to send is not null. Automatic RECEIPT header generation can be enabled on the StompSession through its autoReceipt option and on the StompSessionManager respectively.

See the next paragraph for more information how to configure Spring Integration to accept those ApplicationEvent s.

28.7 STOMP Adapters Java Configuration

A comprehensive Java & Annotation Configuration for STOMP Adapters may look like this:

@Configuration
@EnableIntegration
public class StompConfiguration {

    @Bean
    public ReactorNettyTcpStompClient stompClient() {
        ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
        stompClient.setMessageConverter(new PassThruMessageConverter());
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.afterPropertiesSet();
        stompClient.setTaskScheduler(taskScheduler);
        stompClient.setReceiptTimeLimit(5000);
        return stompClient;
    }

    @Bean
    public StompSessionManager stompSessionManager() {
        ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
        stompSessionManager.setAutoReceipt(true);
        return stompSessionManager;
    }

    @Bean
    public PollableChannel stompInputChannel() {
        return new QueueChannel();
    }

    @Bean
    public StompInboundChannelAdapter stompInboundChannelAdapter() {
        StompInboundChannelAdapter adapter =
        		new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
        adapter.setOutputChannel(stompInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "stompOutputChannel")
    public MessageHandler stompMessageHandler() {
        StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
        handler.setDestination("/topic/myTopic");
        return handler;
    }

    @Bean
    public PollableChannel stompEvents() {
        return new QueueChannel();
    }

    @Bean
    public ApplicationListener<ApplicationEvent> stompEventListener() {
        ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
        producer.setEventTypes(StompIntegrationEvent.class);
        producer.setOutputChannel(stompEvents());
        return producer;
    }

}

28.8 STOMP Namespace Support

Spring Integration STOMP namespace implements the inbound and outbound channel adapter components described below. To include it in your configuration, simply provide the following namespace declaration in your application context configuration file:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/stomp
    http://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
    ...
</beans>

<int-stomp:outbound-channel-adapter>

<int-stomp:outbound-channel-adapter
                           id=""  1
                           channel=""  2
                           stomp-session-manager=""  3
                           header-mapper=""  4
                           mapped-headers=""  5
                           destination=""  6
                           destination-expression=""  7
                           auto-startup=""  8
                           phase=""/>  9

1

The component bean name. The MessageHandler is registered with the bean alias id + '.handler'. If the channel attribute isn’t provided, a DirectChannel is created and registered with the application context with this id attribute as the bean name. In this case, the endpoint is registered with the bean name id + '.adapter'.

2

Identifies the channel attached to this adapter. Optional - if id is present - see id.

3

Reference to a StompSessionManager bean, which encapsulates the low-level connection and StompSession handling operations. Required.

4

Reference to a bean implementing HeaderMapper<StompHeaders> that maps Spring Integration MessageHeaders to/from STOMP frame headers. This is mutually exclusive with mapped-headers. Defaults to StompHeaderMapper.

5

Comma-separated list of names of STOMP Headers to be mapped to the STOMP frame headers. This can only be provided if the header-mapper reference is not set. The values in this list can also be simple patterns to be matched against the header names (e.g. "foo*" or "*foo"). A special token STOMP_OUTBOUND_HEADERS represents all the standard STOMP headers (content-length, receipt, heart-beat etc); they are included by default. If you wish to add your own headers, you must also include this token if you wish the standard headers to also be mapped or provide your own HeaderMapper implementation using header-mapper.

6

Name of the destination to which STOMP Messages will be sent. Mutually exclusive with the destination-expression.

7

A SpEL expression to be evaluated at runtime against each Spring Integration Message as the root object. Mutually exclusive with the destination.

8

Boolean value indicating whether this endpoint should start automatically. Default to true.

9

The lifecycle phase within which this endpoint should start and stop. The lower the value the earlier this endpoint will start and the later it will stop. The default is Integer.MIN_VALUE. Values can be negative. See SmartLifeCycle.

<int-stomp:inbound-channel-adapter>

<int-stomp:inbound-channel-adapter
                           id=""  1
                           channel=""  2
                           error-channel=""  3
                           stomp-session-manager=""  4
                           header-mapper=""  5
                           mapped-headers=""  6
                           destinations=""  7
                           send-timeout=""  8
                           payload-type=""  9
                           auto-startup=""  10
                           phase=""/>  11

1

The component bean name. If the channel attribute isn’t provided, a DirectChannel is created and registered with the application context with this id attribute as the bean name. In this case, the endpoint is registered with the bean name id + '.adapter'.

2

Identifies the channel attached to this adapter.

3

The MessageChannel bean reference to which the ErrorMessages should be sent.

4

See the same option on the <int-stomp:outbound-channel-adapter>.

5

Comma-separated list of names of STOMP Headers to be mapped from the STOMP frame headers. This can only be provided if the header-mapper reference is not set. The values in this list can also be simple patterns to be matched against the header names (e.g. "foo*" or "*foo"). A special token STOMP_INBOUND_HEADERS represents all the standard STOMP headers (content-length, receipt, heart-beat etc); they are included by default. If you wish to add your own headers, you must also include this token if you wish the standard headers to also be mapped or provide your own HeaderMapper implementation using header-mapper.

6

See the same option on the <int-stomp:outbound-channel-adapter>.

7

Comma-separated list of STOMP destination names to subscribe. The list of destinations (and therefore subscriptions) can be modified at runtime through the addDestination() and removeDestination() @ManagedOperation s.

8

Maximum amount of time in milliseconds to wait when sending a message to the channel if the channel may block. For example, a QueueChannel can block until space is available if its maximum capacity has been reached.

9

Fully qualified name of the java type for the target payload to convert from the incoming STOMP Frame. Default to String.class.

10

See the same option on the <int-stomp:outbound-channel-adapter>.

11

See the same option on the <int-stomp:outbound-channel-adapter>.