WebSockets Support

Starting with version 4.1, Spring Integration has WebSocket support. It is based on the architecture, infrastructure, and API from the Spring Framework’s web-socket module. Therefore, many of Spring WebSocket’s components (such as SubProtocolHandler or WebSocketClient) and configuration options (such as @EnableWebSocketMessageBroker) can be reused within Spring Integration. For more information, see the Spring Framework WebSocket Support chapter in the Spring Framework reference manual.

You need to include this dependency in your project:

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-websocket</artifactId>
    <version>5.5.11</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-websocket:5.5.11"

For the server side, the org.springframework:spring-webmvc dependency must be included explicitly.

The Spring Framework WebSocket infrastructure is based on the Spring messaging foundation and provides a basic messaging framework based on the same MessageChannel implementations and MessageHandler implementations that Spring Integration uses (and some POJO-method annotation mappings). Consequently, Spring Integration can be directly involved in a WebSocket flow, even without WebSocket adapters. For this purpose, you can configure a Spring Integration @MessagingGateway with appropriate annotations, as the following example shows:

@MessagingGateway
@Controller
public interface WebSocketGateway {

    @MessageMapping("/greeting")
    @SendToUser("/queue/answer")
    @Gateway(requestChannel = "greetingChannel")
    String greeting(String payload);

}
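
As a rough illustration of what could sit behind the gateway above, the following sketch handles the greetingChannel with a service activator. The class name GreetingConfig, the greeting text, and the use of @IntegrationComponentScan to pick up the gateway interface are assumptions for this example. The @MessageMapping handling itself relies on Spring's WebSocket messaging configuration, which is assumed to be set up elsewhere.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageChannel;

// Hypothetical configuration class; names are illustrative only.
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class GreetingConfig {

    // The channel referenced by @Gateway(requestChannel = "greetingChannel").
    @Bean
    public MessageChannel greetingChannel() {
        return new DirectChannel();
    }

    // The return value becomes the gateway reply, which @SendToUser
    // then routes to the "/queue/answer" user destination.
    @ServiceActivator(inputChannel = "greetingChannel")
    public String greet(String name) {
        return "Hello, " + name + "!";
    }

}
```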

Overview

Since the WebSocket protocol is streaming by definition and we can send and receive messages to and from a WebSocket at the same time, we can deal with an appropriate WebSocketSession, regardless of being on the client or server side. To encapsulate the connection management and WebSocketSession registry, the IntegrationWebSocketContainer is provided with ClientWebSocketContainer and ServerWebSocketContainer implementations. Thanks to the WebSocket API and its implementation in the Spring Framework (with many extensions), the same classes are used on the server side as well as the client side (from a Java perspective, of course). Consequently, most connection and WebSocketSession registry options are the same on both sides. That lets us reuse many configuration items and infrastructure hooks to build WebSocket applications on the server side as well as on the client side. The following example shows how components can serve both purposes:

//Client side
@Bean
public WebSocketClient webSocketClient() {
    return new SockJsClient(Collections.singletonList(new WebSocketTransport(new JettyWebSocketClient())));
}

@Bean
public IntegrationWebSocketContainer clientWebSocketContainer() {
    return new ClientWebSocketContainer(webSocketClient(), "ws://my.server.com/endpoint");
}

//Server side
@Bean
public IntegrationWebSocketContainer serverWebSocketContainer() {
    return new ServerWebSocketContainer("/endpoint").withSockJs();
}

The IntegrationWebSocketContainer is designed to achieve bidirectional messaging and can be shared between inbound and outbound channel adapters (see below). It can also be referenced from only one of them when using one-way (sending or receiving) WebSocket messaging. It can be used without any channel adapter, but, in this case, IntegrationWebSocketContainer only plays a role as the WebSocketSession registry.
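
To illustrate the registry role, the following minimal sketch inspects and closes the sessions held by a container. The SessionRegistrySketch class and its method names are hypothetical. It assumes the container's getSessions() and closeSession(...) methods are available in the version you use.

```java
import java.util.ArrayList;
import java.util.List;

import org.springframework.integration.websocket.IntegrationWebSocketContainer;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketSession;

// Hypothetical helper that uses the container purely as a session registry.
public class SessionRegistrySketch {

    private final IntegrationWebSocketContainer container;

    public SessionRegistrySketch(IntegrationWebSocketContainer container) {
        this.container = container;
    }

    // Number of WebSocketSession instances currently tracked by the container.
    public int openSessionCount() {
        return this.container.getSessions().size();
    }

    // Closes every tracked session; iterates over a copy because closing
    // a session removes it from the container's registry.
    public void closeAll() throws Exception {
        List<WebSocketSession> sessions = new ArrayList<>(this.container.getSessions().values());
        for (WebSocketSession session : sessions) {
            this.container.closeSession(session, CloseStatus.GOING_AWAY);
        }
    }

}
```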

The ServerWebSocketContainer implements WebSocketConfigurer to register an internal IntegrationWebSocketContainer.IntegrationWebSocketHandler as an Endpoint. It does so under the provided paths and other server WebSocket options (such as HandshakeHandler or SockJS fallback) within the ServletWebSocketHandlerRegistry for the target vendor WebSocket Container. This registration is achieved with an infrastructural WebSocketIntegrationConfigurationInitializer component, which does the same as the @EnableWebSocket annotation. This means that, by using @EnableIntegration (or any Spring Integration namespace in the application context), you can omit the @EnableWebSocket declaration, because the Spring Integration infrastructure detects all WebSocket endpoints.

WebSocket Inbound Channel Adapter

The WebSocketInboundChannelAdapter implements the receiving part of WebSocketSession interaction. You must supply it with an IntegrationWebSocketContainer, and the adapter registers itself as a WebSocketListener to handle incoming messages and WebSocketSession events.

Only one WebSocketListener can be registered in the IntegrationWebSocketContainer.

For WebSocket subprotocols, the WebSocketInboundChannelAdapter can be configured with SubProtocolHandlerRegistry as the second constructor argument. The adapter delegates to the SubProtocolHandlerRegistry to determine the appropriate SubProtocolHandler for the accepted WebSocketSession and to convert a WebSocketMessage to a Message according to the sub-protocol implementation.

By default, the WebSocketInboundChannelAdapter relies only on the raw PassThruSubProtocolHandler implementation, which converts the WebSocketMessage to a Message.
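
As a sketch of the sub-protocol configuration described above, the following Java configuration passes a SubProtocolHandlerRegistry with a StompSubProtocolHandler as the second constructor argument. The class name, the "/endpoint" path, and the webSocketInputChannel bean are assumptions for this example.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.websocket.IntegrationWebSocketContainer;
import org.springframework.integration.websocket.ServerWebSocketContainer;
import org.springframework.integration.websocket.inbound.WebSocketInboundChannelAdapter;
import org.springframework.integration.websocket.support.SubProtocolHandlerRegistry;
import org.springframework.messaging.MessageChannel;
import org.springframework.web.socket.messaging.StompSubProtocolHandler;

// Hypothetical server-side configuration; bean names are illustrative.
@Configuration
@EnableIntegration
public class StompInboundConfig {

    @Bean
    public IntegrationWebSocketContainer serverWebSocketContainer() {
        return new ServerWebSocketContainer("/endpoint");
    }

    @Bean
    public MessageChannel webSocketInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public WebSocketInboundChannelAdapter webSocketInboundChannelAdapter() {
        // The registry replaces the default PassThruSubProtocolHandler with STOMP.
        WebSocketInboundChannelAdapter adapter =
                new WebSocketInboundChannelAdapter(serverWebSocketContainer(),
                        new SubProtocolHandlerRegistry(new StompSubProtocolHandler()));
        adapter.setOutputChannel(webSocketInputChannel());
        return adapter;
    }

}
```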

The WebSocketInboundChannelAdapter accepts and sends to the underlying integration flow only Message instances that have SimpMessageType.MESSAGE or an empty simpMessageType header. All other Message types are handled through the ApplicationEvent instances emitted from a SubProtocolHandler implementation (such as StompSubProtocolHandler).

On the server side, if the @EnableWebSocketMessageBroker configuration is present, you can configure WebSocketInboundChannelAdapter with the useBroker = true option. In this case, all non-MESSAGE Message types are delegated to the provided AbstractBrokerMessageHandler. In addition, if the broker relay is configured with destination prefixes, those messages that match the Broker destinations are routed to the AbstractBrokerMessageHandler instead of to the outputChannel of the WebSocketInboundChannelAdapter.

If useBroker = false and the received message is of the SimpMessageType.CONNECT type, the WebSocketInboundChannelAdapter immediately sends a SimpMessageType.CONNECT_ACK message to the WebSocketSession without sending it to the channel.
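
The routing rules from the preceding paragraphs can be summarized in a small, framework-free model. This is not the adapter's actual code: the InboundRoutingSketch class, its enums, and the route method are hypothetical and only restate the rules as described here.

```java
// Simplified model of where an inbound message ends up, per the rules above.
public class InboundRoutingSketch {

    // Reduced stand-in for SimpMessageType; null models an empty header.
    public enum MessageType { CONNECT, MESSAGE, SUBSCRIBE, DISCONNECT, OTHER }

    public enum Target { OUTPUT_CHANNEL, BROKER, CONNECT_ACK_REPLY, NOT_FORWARDED }

    public static Target route(MessageType type, boolean useBroker, boolean matchesBrokerDestination) {
        boolean plainMessage = (type == null || type == MessageType.MESSAGE);
        if (plainMessage) {
            // MESSAGE (or untyped) goes to the flow unless the broker owns the destination.
            return (useBroker && matchesBrokerDestination) ? Target.BROKER : Target.OUTPUT_CHANNEL;
        }
        if (useBroker) {
            // All non-MESSAGE types are delegated to the broker handler.
            return Target.BROKER;
        }
        if (type == MessageType.CONNECT) {
            // Without a broker, the adapter answers CONNECT itself with CONNECT_ACK.
            return Target.CONNECT_ACK_REPLY;
        }
        // Other types never reach the output channel.
        return Target.NOT_FORWARDED;
    }

}
```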

Spring’s WebSocket Support allows the configuration of only one broker relay. Consequently, we do not require an AbstractBrokerMessageHandler reference. It is detected in the Application Context.

For more configuration options, see WebSockets Namespace Support.

WebSocket Outbound Channel Adapter

The WebSocketOutboundChannelAdapter:

  1. Accepts Spring Integration messages from its MessageChannel

  2. Determines the WebSocketSession id from the MessageHeaders

  3. Retrieves the WebSocketSession from the provided IntegrationWebSocketContainer

  4. Delegates the conversion and sending of WebSocketMessage work to the appropriate SubProtocolHandler from the provided SubProtocolHandlerRegistry.

On the client side, the WebSocketSession id message header is not required, because ClientWebSocketContainer deals with only a single connection and its single WebSocketSession.

To use the STOMP sub-protocol, you should configure this adapter with a StompSubProtocolHandler. Then you can send any STOMP message type to this adapter, using StompHeaderAccessor.create(StompCommand…​) and a MessageBuilder, or just using a HeaderEnricher (see Header Enricher).
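
The following sketch shows one way to wire the outbound adapter for STOMP and to build a STOMP SEND message with StompHeaderAccessor and MessageBuilder. The class name, the webSocketOutputChannel channel name, and the stompSend helper are assumptions for this example.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.websocket.IntegrationWebSocketContainer;
import org.springframework.integration.websocket.outbound.WebSocketOutboundChannelAdapter;
import org.springframework.integration.websocket.support.SubProtocolHandlerRegistry;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.socket.messaging.StompSubProtocolHandler;

// Hypothetical configuration; the container bean is assumed to be defined elsewhere.
@Configuration
public class StompOutboundConfig {

    // Messages sent to "webSocketOutputChannel" (illustrative name) reach the WebSocket.
    @Bean
    @ServiceActivator(inputChannel = "webSocketOutputChannel")
    public MessageHandler webSocketOutboundAdapter(IntegrationWebSocketContainer container) {
        return new WebSocketOutboundChannelAdapter(container,
                new SubProtocolHandlerRegistry(new StompSubProtocolHandler()));
    }

    // Builds a STOMP SEND message for the given destination.
    // On the server side, headers.setSessionId(...) would also be needed
    // so that the adapter can resolve the target WebSocketSession.
    public static Message<String> stompSend(String destination, String body) {
        StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
        headers.setDestination(destination);
        return MessageBuilder.createMessage(body, headers.getMessageHeaders());
    }

}
```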

The rest of this chapter largely covers additional configuration options.

WebSockets Namespace Support

The Spring Integration WebSocket namespace includes several components described in the remainder of this chapter. To include it in your configuration, use the following namespace declaration in your application context configuration file:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-websocket="http://www.springframework.org/schema/integration/websocket"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/websocket
    https://www.springframework.org/schema/integration/websocket/spring-integration-websocket.xsd">
    ...
</beans>

<int-websocket:client-container> Attributes

The following listing shows the attributes available for the <int-websocket:client-container> element:

<int-websocket:client-container
                  id=""                        (1)
                  client=""                    (2)
                  uri=""                       (3)
                  uri-variables=""             (4)
                  origin=""                    (5)
                  send-time-limit=""           (6)
                  send-buffer-size-limit=""    (7)
                  auto-startup=""              (8)
                  phase="">                    (9)
                <int-websocket:http-headers>
                  <entry key="" value=""/>
                </int-websocket:http-headers>  (10)
</int-websocket:client-container>
1 The component bean name.
2 The WebSocketClient bean reference.
3 The uri or uriTemplate to the target WebSocket service. If you use it as a uriTemplate with URI variable placeholders, the uri-variables attribute is required.
4 Comma-separated values for the URI variable placeholders within the uri attribute value. The values are replaced into the placeholders according to their order in the uri. See UriComponents.expand(Object…​uriVariableValues).
5 The Origin Handshake HTTP header value.
6 The WebSocket session 'send' timeout limit. Defaults to 10000.
7 The WebSocket session 'send' message size limit. Defaults to 524288.
8 Boolean value indicating whether this endpoint should start automatically. Defaults to false, assuming that this container is started from the WebSocket inbound adapter.
9 The lifecycle phase within which this endpoint should start and stop. The lower the value, the earlier this endpoint starts and the later it stops. The default is Integer.MAX_VALUE. Values can be negative. See SmartLifecycle.
10 A Map of HttpHeaders to be used with the Handshake request.
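
To make the positional replacement of uri-variables concrete, the following framework-free sketch expands placeholders in the order in which they appear. It is a simplified stand-in written for illustration, not the UriComponents.expand implementation. The UriTemplateSketch class and the sample host and port values are hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified model of order-based URI template expansion.
public class UriTemplateSketch {

    // Matches placeholders such as {host} or {port}.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{[^/{}]+\\}");

    public static String expand(String template, Object... values) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuffer result = new StringBuffer();
        int index = 0;
        while (matcher.find()) {
            if (index >= values.length) {
                throw new IllegalArgumentException("No value for placeholder " + matcher.group());
            }
            // The n-th value replaces the n-th placeholder, regardless of its name.
            matcher.appendReplacement(result, Matcher.quoteReplacement(String.valueOf(values[index++])));
        }
        matcher.appendTail(result);
        return result.toString();
    }

}
```

With this model, the template ws://{host}:{port}/endpoint and the comma-separated values my.server.com,8080 would resolve to ws://my.server.com:8080/endpoint.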

<int-websocket:server-container> Attributes

The following listing shows the attributes available for the <int-websocket:server-container> element:

<int-websocket:server-container
          id=""                         (1)
          path=""                       (2)
          handshake-handler=""          (3)
          handshake-interceptors=""     (4)
          decorator-factories=""        (5)
          send-time-limit=""            (6)
          send-buffer-size-limit=""     (7)
          allowed-origins="">           (8)
          <int-websocket:sockjs
            client-library-url=""       (9)
            stream-bytes-limit=""       (10)
            session-cookie-needed=""    (11)
            heartbeat-time=""           (12)
            disconnect-delay=""         (13)
            message-cache-size=""       (14)
            websocket-enabled=""        (15)
            scheduler=""                (16)
            message-codec=""            (17)
            transport-handlers=""       (18)
            suppress-cors="" />         (19)
</int-websocket:server-container>
1 The component bean name.
2 A path (or comma-separated paths) that maps a particular request to a WebSocketHandler. Supports exact path mapping URIs (such as /myPath) and ant-style path patterns (such as /myPath/**).
3 The HandshakeHandler bean reference. Defaults to DefaultHandshakeHandler.
4 List of HandshakeInterceptor bean references.
5 List of one or more factories (WebSocketHandlerDecoratorFactory) that decorate the handler used to process WebSocket messages. This may be useful for some advanced use cases (for example, to allow Spring Security to forcibly close the WebSocket session when the corresponding HTTP session expires). See the Spring Session Project for more information.
6 See the same option on the <int-websocket:client-container>.
7 See the same option on the <int-websocket:client-container>.
8 The allowed origin header values. You can specify multiple origins as a comma-separated list. This check is mostly designed for browser clients. There is nothing preventing other types of client from modifying the origin header value. When SockJS is enabled and allowed origins are restricted, transport types that do not use origin headers for cross-origin requests (jsonp-polling, iframe-xhr-polling, iframe-eventsource, and iframe-htmlfile) are disabled. As a consequence, IE6 and IE7 are not supported, and IE8 and IE9 are supported only without cookies. By default, all origins are allowed.
9 Transports with no native cross-domain communication (such as eventsource and htmlfile) must get a simple page from the “foreign” domain in an invisible iframe so that code in the iframe can run from a domain local to the SockJS server. Since the iframe needs to load the SockJS javascript client library, this property lets you specify the location from which to load it. By default, it points to https://d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js. However, you can also set it to point to a URL served by the application. Note that it is possible to specify a relative URL, in which case the URL must be relative to the iframe URL. For example, assuming a SockJS endpoint mapped to /sockjs and the resulting iframe URL is /sockjs/iframe.html, the relative URL must start with "../../" to traverse up to the location above the SockJS mapping. For prefix-based servlet mapping, you may need one more traversal.
10 Minimum number of bytes that can be sent over a single HTTP streaming request before it is closed. Defaults to 128K (that is, 128*1024 or 131072 bytes).
11 The cookie_needed value in the response from the SockJS /info endpoint. This property indicates whether a JSESSIONID cookie is required for the application to function correctly (for example, for load balancing or in Java Servlet containers for the use of an HTTP session).
12 The amount of time (in milliseconds) when the server has not sent any messages and after which the server should send a heartbeat frame to the client in order to keep the connection from breaking. The default value is 25,000 (25 seconds).
13 The amount of time (in milliseconds) before a client is considered disconnected after not having a receiving connection (that is, an active connection over which the server can send data to the client). The default value is 5000.
14 The number of server-to-client messages that a session can cache while waiting for the next HTTP polling request from the client. The default size is 100.
15 Some load balancers do not support WebSockets. Set this option to false to disable the WebSocket transport on the server side. The default value is true.
16 The TaskScheduler bean reference. A new ThreadPoolTaskScheduler instance is created if no value is provided. This scheduler instance is used for scheduling heart-beat messages.
17 The SockJsMessageCodec bean reference to use for encoding and decoding SockJS messages. By default, Jackson2SockJsMessageCodec is used, which requires the Jackson library to be present on the classpath.
18 List of TransportHandler bean references.
19 Whether to disable automatic addition of CORS headers for SockJS requests. The default value is false.
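
For comparison, a Java configuration along the following lines should express a subset of the same options. This is a sketch: the class name, the path, and the origin are assumptions, and the SockJsServiceOptions setter names are given as I understand them, so check them against the ServerWebSocketContainer API of your version.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.websocket.IntegrationWebSocketContainer;
import org.springframework.integration.websocket.ServerWebSocketContainer;

// Hypothetical Java equivalent of a <int-websocket:server-container> definition.
@Configuration
public class ServerContainerConfig {

    @Bean
    public IntegrationWebSocketContainer serverWebSocketContainer() {
        return new ServerWebSocketContainer("/endpoint")           // path
                .setAllowedOrigins("https://example.com")           // allowed-origins (illustrative value)
                .withSockJs(new ServerWebSocketContainer.SockJsServiceOptions()
                        .setHeartbeatTime(25000)                    // heartbeat-time, in milliseconds
                        .setDisconnectDelay(5000)                   // disconnect-delay, in milliseconds
                        .setWebSocketEnabled(true));                // websocket-enabled
    }

}
```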

<int-websocket:outbound-channel-adapter> Attributes

The following listing shows the attributes available for the <int-websocket:outbound-channel-adapter> element:

<int-websocket:outbound-channel-adapter
                          id=""                             (1)
                          channel=""                        (2)
                          container=""                      (3)
                          default-protocol-handler=""       (4)
                          protocol-handlers=""              (5)
                          message-converters=""             (6)
                          merge-with-default-converters=""  (7)
                          auto-startup=""                   (8)
                          phase=""/>                        (9)
1 The component bean name. If you do not provide the channel attribute, a DirectChannel is created and registered in the application context with this id attribute as the bean name. In this case, the endpoint is registered with the bean name id plus .adapter, and the MessageHandler is registered with the bean alias id plus .handler.
2 Identifies the channel attached to this adapter.
3 The reference to the IntegrationWebSocketContainer bean, which encapsulates the low-level connection and WebSocketSession handling operations. Required.
4 Optional reference to a SubProtocolHandler instance. It is used when the client did not request a sub-protocol or it is a single protocol-handler. If this reference or a protocol-handlers list is not provided, the PassThruSubProtocolHandler is used by default.
5 List of SubProtocolHandler bean references for this channel adapter. If you provide only a single bean reference and do not provide a default-protocol-handler, that single SubProtocolHandler is used as the default-protocol-handler. If you do not set this attribute or default-protocol-handler, the PassThruSubProtocolHandler is used by default.
6 List of MessageConverter bean references for this channel adapter.
7 Boolean value indicating whether the default converters should be registered after any custom converters. This flag is used only if message-converters is provided. Otherwise, all default converters are registered. Defaults to false. The default converters are (in order): StringMessageConverter, ByteArrayMessageConverter, and MappingJackson2MessageConverter (if the Jackson library is present on the classpath).
8 Boolean value indicating whether this endpoint should start automatically. Defaults to true.
9 The lifecycle phase within which this endpoint should start and stop. The lower the value, the earlier this endpoint starts and the later it stops. The default is Integer.MIN_VALUE. Values can be negative. See SmartLifecycle.

<int-websocket:inbound-channel-adapter> Attributes

The following listing shows the attributes available for the <int-websocket:inbound-channel-adapter> element:

<int-websocket:inbound-channel-adapter
                            id=""  (1)
                            channel=""  (2)
                            error-channel=""  (3)
                            container=""  (4)
                            default-protocol-handler=""  (5)
                            protocol-handlers=""  (6)
                            message-converters=""  (7)
                            merge-with-default-converters=""  (8)
                            send-timeout=""  (9)
                            payload-type=""  (10)
                            use-broker=""  (11)
                            auto-startup=""  (12)
                            phase=""/>  (13)
1 The component bean name. If you do not set the channel attribute, a DirectChannel is created and registered in the application context with this id attribute as the bean name. In this case, the endpoint is registered with the bean name id plus .adapter.
2 Identifies the channel attached to this adapter.
3 The MessageChannel bean reference to which the ErrorMessage instances should be sent.
4 See the same option on the <int-websocket:outbound-channel-adapter>.
5 See the same option on the <int-websocket:outbound-channel-adapter>.
6 See the same option on the <int-websocket:outbound-channel-adapter>.
7 See the same option on the <int-websocket:outbound-channel-adapter>.
8 See the same option on the <int-websocket:outbound-channel-adapter>.
9 Maximum amount of time (in milliseconds) to wait when sending a message to the channel if the channel can block. For example, a QueueChannel can block until space is available if its maximum capacity has been reached.
10 Fully qualified name of the Java type for the target payload to convert from the incoming WebSocketMessage. Defaults to java.lang.String.
11 Indicates whether this adapter sends non-MESSAGE WebSocketMessage instances and messages with broker destinations to the AbstractBrokerMessageHandler from the application context. When this attribute is true, the Broker Relay configuration is required. This attribute is used only on the server side. On the client side, it is ignored. Defaults to false.
12 See the same option on the <int-websocket:outbound-channel-adapter>.
13 See the same option on the <int-websocket:outbound-channel-adapter>.
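
Several of these attributes have direct counterparts when the adapter is declared in Java. The following sketch sets a few of them. The class name, the channel names, and the chosen values are assumptions for illustration, and the container bean is assumed to be defined elsewhere.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.websocket.IntegrationWebSocketContainer;
import org.springframework.integration.websocket.inbound.WebSocketInboundChannelAdapter;

// Hypothetical Java equivalent of a <int-websocket:inbound-channel-adapter> definition.
@Configuration
public class InboundAdapterOptionsConfig {

    @Bean
    public WebSocketInboundChannelAdapter webSocketInboundAdapter(IntegrationWebSocketContainer container) {
        WebSocketInboundChannelAdapter adapter = new WebSocketInboundChannelAdapter(container);
        adapter.setOutputChannelName("webSocketInputChannel");  // channel (illustrative name)
        adapter.setErrorChannelName("webSocketErrorChannel");   // error-channel (illustrative name)
        adapter.setSendTimeout(5000);                           // send-timeout, in milliseconds
        adapter.setPayloadType(byte[].class);                   // payload-type; String is the default
        adapter.setUseBroker(false);                            // use-broker
        return adapter;
    }

}
```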

Using ClientStompEncoder

Starting with version 4.3.13, Spring Integration provides ClientStompEncoder (as an extension of the standard StompEncoder) for use on the client side of WebSocket channel adapters. For proper client-side message preparation, you must inject an instance of the ClientStompEncoder into the StompSubProtocolHandler. One problem with the default StompSubProtocolHandler is that it was designed for the server side, so it updates the SEND stompCommand header into MESSAGE (as required by the STOMP protocol for the server side). If the client does not send its messages in the proper SEND WebSocket frame, some STOMP brokers do not accept them. The purpose of the ClientStompEncoder, in this case, is to override the stompCommand header and set it to the SEND value before encoding the message to the byte[].
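
A minimal sketch of that injection, assuming Java configuration, could look as follows. The class and bean names are illustrative.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.websocket.support.ClientStompEncoder;
import org.springframework.web.socket.messaging.StompSubProtocolHandler;

// Hypothetical client-side configuration.
@Configuration
public class ClientStompConfig {

    @Bean
    public StompSubProtocolHandler stompSubProtocolHandler() {
        StompSubProtocolHandler handler = new StompSubProtocolHandler();
        // Restores the SEND command before the frame is encoded for the broker.
        handler.setEncoder(new ClientStompEncoder());
        return handler;
    }

}
```

The resulting handler can then be supplied to the client-side channel adapters through a SubProtocolHandlerRegistry.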

Dynamic WebSocket Endpoints Registration

Starting with version 5.5, the WebSocket server endpoints (channel adapters based on a ServerWebSocketContainer) can be registered (and removed) at runtime. The paths to which a ServerWebSocketContainer is mapped are exposed through a HandlerMapping to the DispatcherServlet and become accessible to WebSocket clients. The Dynamic and Runtime Integration Flows support helps to register these endpoints in a transparent manner:

@Autowired
IntegrationFlowContext integrationFlowContext;

@Autowired
HandshakeHandler handshakeHandler;
...
ServerWebSocketContainer serverWebSocketContainer =
       new ServerWebSocketContainer("/dynamic")
               .setHandshakeHandler(this.handshakeHandler);

WebSocketInboundChannelAdapter webSocketInboundChannelAdapter =
       new WebSocketInboundChannelAdapter(serverWebSocketContainer);

QueueChannel dynamicRequestsChannel = new QueueChannel();

IntegrationFlow serverFlow =
       IntegrationFlows.from(webSocketInboundChannelAdapter)
               .channel(dynamicRequestsChannel)
               .get();

IntegrationFlowContext.IntegrationFlowRegistration dynamicServerFlow =
       this.integrationFlowContext.registration(serverFlow)
               .addBean(serverWebSocketContainer)
               .register();
...
dynamicServerFlow.destroy();
It is important to call .addBean(serverWebSocketContainer) on the dynamic flow registration to add the instance of ServerWebSocketContainer into an ApplicationContext for endpoint registration. When a dynamic flow registration is destroyed, the associated ServerWebSocketContainer instance is destroyed, too, as well as the respective endpoint registration, including URL path mappings.
The dynamic WebSocket endpoints can be registered only through the Spring Integration mechanism: when the regular Spring @EnableWebSocket is used, the Spring Integration configuration backs off and no infrastructure for dynamic endpoints is registered.