WebSockets Support
Starting with version 4.1, Spring Integration has WebSocket support.
It is based on the architecture, infrastructure, and API from the Spring Framework’s web-socket
module.
Therefore, many of Spring WebSocket’s components (such as SubProtocolHandler or WebSocketClient) and configuration options (such as @EnableWebSocketMessageBroker) can be reused within Spring Integration.
For more information, see the Spring Framework WebSocket Support chapter in the Spring Framework reference manual.
You need to include this dependency in your project:
Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-websocket</artifactId>
    <version>6.4.1</version>
</dependency>
Gradle
implementation "org.springframework.integration:spring-integration-websocket:6.4.1"
For the server side, the org.springframework:spring-webmvc dependency must be included explicitly.
The Spring Framework WebSocket infrastructure is based on the Spring messaging foundation and provides a basic messaging framework based on the same MessageChannel and MessageHandler implementations that Spring Integration uses (and some POJO-method annotation mappings).
Consequently, Spring Integration can be directly involved in a WebSocket flow, even without WebSocket adapters.
For this purpose, you can configure a Spring Integration @MessagingGateway with appropriate annotations, as the following example shows:
@MessagingGateway
@Controller
public interface WebSocketGateway {

    @MessageMapping("/greeting")
    @SendToUser("/queue/answer")
    @Gateway(requestChannel = "greetingChannel")
    String greeting(String payload);

}
Overview
Because the WebSocket protocol is streaming by definition, and messages can be sent to and received from a WebSocket at the same time, we work with a WebSocketSession on both the client side and the server side.
To encapsulate the connection management and the WebSocketSession registry, the IntegrationWebSocketContainer is provided with ClientWebSocketContainer and ServerWebSocketContainer implementations.
Thanks to the WebSocket API and its implementation in the Spring Framework (with many extensions), the same classes are used on the server side as well as the client side (from a Java perspective, of course).
Consequently, most connection and WebSocketSession registry options are the same on both sides.
That lets us reuse many configuration items and infrastructure hooks to build WebSocket applications on the server side as well as on the client side.
The following example shows how components can serve both purposes:
// Client side
@Bean
public WebSocketClient webSocketClient() {
    return new SockJsClient(Collections.singletonList(new WebSocketTransport(new JettyWebSocketClient())));
}

@Bean
public IntegrationWebSocketContainer clientWebSocketContainer() {
    return new ClientWebSocketContainer(webSocketClient(), "ws://my.server.com/endpoint");
}

// Server side
@Bean
public IntegrationWebSocketContainer serverWebSocketContainer() {
    return new ServerWebSocketContainer("/endpoint").withSockJs();
}
The IntegrationWebSocketContainer is designed for bidirectional messaging and can be shared between inbound and outbound channel adapters (see below).
For one-way (sending or receiving) WebSocket messaging, it can be referenced from only one of them.
It can also be used without any channel adapter, but, in this case, the IntegrationWebSocketContainer acts only as the WebSocketSession registry.
The ServerWebSocketContainer implements WebSocketConfigurer to register an internal IntegrationWebSocketContainer.IntegrationWebSocketHandler as an Endpoint.
It does so under the provided paths and with other server WebSocket options (such as HandshakeHandler or SockJS fallback) within the ServletWebSocketHandlerRegistry for the target vendor WebSocket container.
This registration is achieved with an infrastructural WebSocketIntegrationConfigurationInitializer component, which does the same as the @EnableWebSocket annotation.
This means that, by using @EnableIntegration (or any Spring Integration namespace in the application context), you can omit the @EnableWebSocket declaration, because the Spring Integration infrastructure detects all WebSocket endpoints.
Starting with version 6.1, the ClientWebSocketContainer can be configured with a provided URI instead of a uriTemplate and uriVariables combination.
This is useful when custom encoding is required for some parts of the URI.
See the UriComponentsBuilder API for convenience.
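As a minimal sketch of the URI-based configuration described above, the following Java configuration builds the target URI with UriComponentsBuilder, so that a query parameter value is encoded before the container receives it. The endpoint address, the token parameter and its value, the class name, and the choice of StandardWebSocketClient are assumptions made for illustration only.

```java
import java.net.URI;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.websocket.ClientWebSocketContainer;
import org.springframework.integration.websocket.IntegrationWebSocketContainer;
import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.util.UriComponentsBuilder;

@Configuration
public class ClientContainerUriConfig {

    @Bean
    public WebSocketClient webSocketClient() {
        // Assumption: a Jakarta WebSocket client implementation is on the classpath.
        return new StandardWebSocketClient();
    }

    @Bean
    public IntegrationWebSocketContainer clientWebSocketContainer(WebSocketClient webSocketClient) {
        // Hypothetical endpoint and query parameter. encode() asks the builder to
        // encode the expanded variable value, so the container gets a ready-made URI.
        URI uri = UriComponentsBuilder.fromUriString("ws://my.server.com/endpoint")
                .queryParam("token", "{token}")
                .encode()
                .buildAndExpand("a b+c")
                .toUri();
        return new ClientWebSocketContainer(webSocketClient, uri);
    }

}
```

With this approach, the encoding decisions stay in application code instead of being implied by a template and its variables.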
WebSocket Inbound Channel Adapter
The WebSocketInboundChannelAdapter implements the receiving part of WebSocketSession interaction.
You must supply it with an IntegrationWebSocketContainer, and the adapter registers itself as a WebSocketListener to handle incoming messages and WebSocketSession events.
Only one WebSocketListener can be registered in the IntegrationWebSocketContainer.
For WebSocket sub-protocols, the WebSocketInboundChannelAdapter can be configured with a SubProtocolHandlerRegistry as the second constructor argument.
The adapter delegates to the SubProtocolHandlerRegistry to determine the appropriate SubProtocolHandler for the accepted WebSocketSession and to convert a WebSocketMessage to a Message according to the sub-protocol implementation.
By default, the WebSocketInboundChannelAdapter relies only on the raw PassThruSubProtocolHandler implementation, which converts the WebSocketMessage to a Message.
The WebSocketInboundChannelAdapter accepts and sends to the underlying integration flow only Message instances that have SimpMessageType.MESSAGE or an empty simpMessageType header.
All other Message types are handled through the ApplicationEvent instances emitted from a SubProtocolHandler implementation (such as StompSubProtocolHandler).
On the server side, if the @EnableWebSocketMessageBroker configuration is present, you can configure the WebSocketInboundChannelAdapter with the useBroker = true option.
In this case, all non-MESSAGE Message types are delegated to the provided AbstractBrokerMessageHandler.
In addition, if the broker relay is configured with destination prefixes, those messages that match the broker destinations are routed to the AbstractBrokerMessageHandler instead of to the outputChannel of the WebSocketInboundChannelAdapter.
If useBroker = false and the received message is of the SimpMessageType.CONNECT type, the WebSocketInboundChannelAdapter immediately sends a SimpMessageType.CONNECT_ACK message to the WebSocketSession without sending it to the channel.
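The routing rules above can be summarized as a small decision function. This is a plain-JDK paraphrase of the rules as stated in the text, not the adapter's actual code; the InboundRoutingSketch class, its two enums, and the route method are hypothetical names introduced for illustration.

```java
final class InboundRoutingSketch {

    /** Illustrative subset of message types; Spring's SimpMessageType has more values. */
    enum FrameType { CONNECT, MESSAGE, SUBSCRIBE, DISCONNECT }

    /** Where a received message ends up according to the rules in the text. */
    enum Target { OUTPUT_CHANNEL, BROKER_HANDLER, CONNECT_ACK_REPLY, APPLICATION_EVENT }

    /**
     * @param type the message type, or null for an empty simpMessageType header
     * @param useBroker the adapter's useBroker option
     * @param matchesBrokerDestination whether the destination matches a broker prefix
     */
    static Target route(FrameType type, boolean useBroker, boolean matchesBrokerDestination) {
        boolean plainMessage = (type == null || type == FrameType.MESSAGE);
        if (plainMessage) {
            // MESSAGE (or untyped) goes to the flow unless it targets a broker destination.
            return (useBroker && matchesBrokerDestination) ? Target.BROKER_HANDLER : Target.OUTPUT_CHANNEL;
        }
        if (useBroker) {
            // All non-MESSAGE types are delegated to the broker handler.
            return Target.BROKER_HANDLER;
        }
        if (type == FrameType.CONNECT) {
            // Without a broker, CONNECT is answered immediately with CONNECT_ACK.
            return Target.CONNECT_ACK_REPLY;
        }
        // Everything else is visible only through application events.
        return Target.APPLICATION_EVENT;
    }

}
```

For example, a SUBSCRIBE message never reaches the outputChannel: it goes to the broker handler when useBroker = true and is otherwise reported only through events.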
Spring’s WebSocket support allows the configuration of only one broker relay.
Consequently, we do not require an AbstractBrokerMessageHandler reference.
It is detected in the application context.
For more configuration options, see WebSockets Namespace Support.
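In Java configuration, the inbound adapter described in this section might be declared as in the following sketch. It assumes a server-side container with SockJS fallback and the STOMP sub-protocol; the bean names, the /endpoint path, the webSocketInputChannel channel, and the class name are hypothetical.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.websocket.IntegrationWebSocketContainer;
import org.springframework.integration.websocket.ServerWebSocketContainer;
import org.springframework.integration.websocket.inbound.WebSocketInboundChannelAdapter;
import org.springframework.integration.websocket.support.SubProtocolHandlerRegistry;
import org.springframework.messaging.MessageChannel;
import org.springframework.web.socket.messaging.StompSubProtocolHandler;

@Configuration
@EnableIntegration
public class WebSocketInboundConfig {

    @Bean
    public IntegrationWebSocketContainer serverWebSocketContainer() {
        // Hypothetical path; the container also acts as the WebSocketSession registry.
        return new ServerWebSocketContainer("/endpoint").withSockJs();
    }

    @Bean
    public MessageChannel webSocketInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public WebSocketInboundChannelAdapter webSocketInboundChannelAdapter() {
        // The registry is the second constructor argument; without it,
        // the adapter falls back to the PassThruSubProtocolHandler.
        WebSocketInboundChannelAdapter adapter = new WebSocketInboundChannelAdapter(
                serverWebSocketContainer(),
                new SubProtocolHandlerRegistry(new StompSubProtocolHandler()));
        adapter.setOutputChannel(webSocketInputChannel());
        // useBroker is left at its default (false) in this sketch.
        return adapter;
    }

}
```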
WebSocket Outbound Channel Adapter
The WebSocketOutboundChannelAdapter:
- Accepts Spring Integration messages from its MessageChannel
- Determines the WebSocketSession id from the MessageHeaders
- Retrieves the WebSocketSession from the provided IntegrationWebSocketContainer
- Delegates the conversion and sending of WebSocketMessage work to the appropriate SubProtocolHandler from the provided SubProtocolHandlerRegistry.
On the client side, the WebSocketSession id message header is not required, because the ClientWebSocketContainer deals only with a single connection and its respective WebSocketSession.
To use the STOMP sub-protocol, you should configure this adapter with a StompSubProtocolHandler.
Then you can send any STOMP message type to this adapter, using StompHeaderAccessor.create(StompCommand…) and a MessageBuilder, or just using a HeaderEnricher (see Header Enricher).
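The following sketch shows one way to wire the outbound side for STOMP in Java configuration and to build a STOMP SEND message for it. In Java, the outbound adapter is backed by the WebSocketOutboundMessageHandler class. The webSocketOutputChannel name, the class name, and the stompSend helper are hypothetical, and the container bean is assumed to be defined elsewhere.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.websocket.IntegrationWebSocketContainer;
import org.springframework.integration.websocket.outbound.WebSocketOutboundMessageHandler;
import org.springframework.integration.websocket.support.SubProtocolHandlerRegistry;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.socket.messaging.StompSubProtocolHandler;

@Configuration
public class WebSocketOutboundConfig {

    @Bean
    @ServiceActivator(inputChannel = "webSocketOutputChannel")
    public MessageHandler webSocketOutboundMessageHandler(IntegrationWebSocketContainer clientWebSocketContainer) {
        // Messages sent to the hypothetical webSocketOutputChannel are written to the WebSocketSession.
        return new WebSocketOutboundMessageHandler(clientWebSocketContainer,
                new SubProtocolHandlerRegistry(new StompSubProtocolHandler()));
    }

    /** Hypothetical helper: builds a STOMP SEND message for the given destination. */
    public static Message<String> stompSend(String destination, String payload) {
        StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
        headers.setDestination(destination);
        // No session id header is set: on the client side there is only one session.
        return MessageBuilder.createMessage(payload, headers.getMessageHeaders());
    }

}
```

On the server side, the same message would additionally need the WebSocketSession id header so that the adapter can pick the target session.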
The rest of this chapter largely covers additional configuration options.
WebSockets Namespace Support
The Spring Integration WebSocket namespace includes several components described in the remainder of this chapter. To include it in your configuration, use the following namespace declaration in your application context configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-websocket="http://www.springframework.org/schema/integration/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/websocket
https://www.springframework.org/schema/integration/websocket/spring-integration-websocket.xsd">
...
</beans>
<int-websocket:client-container> Attributes
The following listing shows the attributes available for the <int-websocket:client-container> element:
<int-websocket:client-container
id="" (1)
client="" (2)
uri="" (3)
uri-variables="" (4)
origin="" (5)
send-time-limit="" (6)
send-buffer-size-limit="" (7)
send-buffer-overflow-strategy="" (8)
auto-startup="" (9)
phase=""> (10)
<int-websocket:http-headers>
<entry key="" value=""/>
</int-websocket:http-headers> (11)
</int-websocket:client-container>
1 | The component bean name. |
2 | The WebSocketClient bean reference. |
3 | The uri or uriTemplate to the target WebSocket service.
If you use it as a uriTemplate with URI variable placeholders, the uri-variables attribute is required. |
4 | Comma-separated values for the URI variable placeholders within the uri attribute value.
The values are replaced into the placeholders according to their order in the uri .
See UriComponents.expand(Object…uriVariableValues) . |
5 | The Origin Handshake HTTP header value. |
6 | The WebSocket session 'send' timeout limit, in milliseconds.
Defaults to 10000. |
7 | The WebSocket session 'send' buffer size limit, in bytes.
Defaults to 524288. |
8 | The WebSocket session send buffer overflow strategy
determines the behavior when a session’s outbound message buffer has reached the send-buffer-size-limit .
See ConcurrentWebSocketSessionDecorator.OverflowStrategy for possible values and more details. |
9 | Boolean value indicating whether this endpoint should start automatically.
Defaults to false , assuming that this container is started from the WebSocket inbound adapter. |
10 | The lifecycle phase within which this endpoint should start and stop.
The lower the value, the earlier this endpoint starts and the later it stops.
The default is Integer.MAX_VALUE.
Values can be negative.
See SmartLifecycle. |
11 | A Map of HttpHeaders to be used with the Handshake request. |
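To illustrate how the comma-separated uri-variables values map onto the placeholders of the uri attribute by position, here is a plain-JDK sketch. It only mimics the ordering rule described above; it is not Spring's UriComponents implementation and performs no encoding. The UriTemplateSketch class and its expand method are hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class UriTemplateSketch {

    // Matches placeholders such as {host} or {path}.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{[^/{}]+\\}");

    /** Replaces each placeholder, in order of appearance, with the next comma-separated value. */
    static String expand(String uriTemplate, String commaSeparatedValues) {
        String[] values = commaSeparatedValues.split(",");
        Matcher matcher = PLACEHOLDER.matcher(uriTemplate);
        StringBuilder result = new StringBuilder();
        int index = 0;
        while (matcher.find()) {
            if (index >= values.length) {
                throw new IllegalArgumentException("Not enough variable values for " + uriTemplate);
            }
            // The placeholder name is ignored; only its position matters.
            matcher.appendReplacement(result, Matcher.quoteReplacement(values[index++].trim()));
        }
        matcher.appendTail(result);
        return result.toString();
    }

}
```

Under these assumptions, a uri of ws://{host}/{path} with uri-variables set to my.server.com,endpoint resolves to ws://my.server.com/endpoint.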
<int-websocket:server-container> Attributes
The following listing shows the attributes available for the <int-websocket:server-container> element:
<int-websocket:server-container
id="" (1)
path="" (2)
handshake-handler="" (3)
handshake-interceptors="" (4)
decorator-factories="" (5)
send-time-limit="" (6)
send-buffer-size-limit="" (7)
send-buffer-overflow-strategy="" (8)
allowed-origins=""> (9)
<int-websocket:sockjs
client-library-url="" (10)
stream-bytes-limit="" (11)
session-cookie-needed="" (12)
heartbeat-time="" (13)
disconnect-delay="" (14)
message-cache-size="" (15)
websocket-enabled="" (16)
scheduler="" (17)
message-codec="" (18)
transport-handlers="" (19)
suppress-cors="true" /> (20)
</int-websocket:server-container>
1 | The component bean name. |
2 | A path (or comma-separated paths) that maps a particular request to a WebSocketHandler .
Supports exact path mapping URIs (such as /myPath ) and ant-style path patterns (such as /myPath/** ). |
3 | The HandshakeHandler bean reference.
Defaults to DefaultHandshakeHandler . |
4 | List of HandshakeInterceptor bean references. |
5 | List of one or more factories (WebSocketHandlerDecoratorFactory ) that decorate the handler used to process WebSocket messages.
This may be useful for some advanced use cases (for example, to allow Spring Security to forcibly close
the WebSocket session when the corresponding HTTP session expires).
See the Spring Session Project for more information. |
6 | See the same option on the <int-websocket:client-container> . |
7 | See the same option on the <int-websocket:client-container> . |
8 | The WebSocket session send buffer overflow strategy
determines the behavior when a session’s outbound message buffer has reached the send-buffer-size-limit .
See ConcurrentWebSocketSessionDecorator.OverflowStrategy for possible values and more details. |
9 | The allowed origin header values.
You can specify multiple origins as a comma-separated list.
This check is mostly designed for browser clients.
There is nothing preventing other types of client from modifying the origin header value.
When SockJS is enabled and allowed origins are restricted, transport types that do not use origin headers for cross-origin requests (jsonp-polling , iframe-xhr-polling , iframe-eventsource , and iframe-htmlfile ) are disabled.
As a consequence, IE6 and IE7 are not supported, and IE8 and IE9 are supported only without cookies.
By default, all origins are allowed. |
10 | Transports with no native cross-domain communication (such as eventsource and htmlfile ) must get a simple page from the “foreign” domain in an invisible iframe so that code in the iframe can run from a domain local to the SockJS server.
Since the iframe needs to load the SockJS javascript client library, this property lets you specify the location from which to load it.
By default, it points to d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js .
However, you can also set it to point to a URL served by the application.
Note that it is possible to specify a relative URL, in which case the URL must be relative to the iframe URL.
For example, assuming a SockJS endpoint mapped to /sockjs and the resulting iframe URL is /sockjs/iframe.html , the relative URL must start with "../../" to traverse up to the location above the SockJS mapping.
For prefix-based servlet mapping, you may need one more traversal. |
11 | Minimum number of bytes that can be sent over a single HTTP streaming request before it is closed.
Defaults to 128K (that is, 128*1024 or 131072 bytes). |
12 | The cookie_needed value in the response from the SockJs /info endpoint.
This property indicates whether a JSESSIONID cookie is required for the application to function correctly (for example, for load balancing or in Java Servlet containers for the use of an HTTP session). |
13 | The amount of time (in milliseconds) after which, if the server has not sent any messages, it sends a heartbeat frame to the client to keep the connection from breaking.
The default value is 25,000 (25 seconds). |
14 | The amount of time (in milliseconds) before a client is considered disconnected after not having a receiving connection (that is, an active connection over which the server can send data to the client).
The default value is 5000 . |
15 | The number of server-to-client messages that a session can cache while waiting for the next HTTP polling request from the client.
The default size is 100 . |
16 | Some load balancers do not support WebSockets.
Set this option to false to disable the WebSocket transport on the server side.
The default value is true . |
17 | The TaskScheduler bean reference.
A new ThreadPoolTaskScheduler instance is created if no value is provided.
This scheduler instance is used for scheduling heart-beat messages. |
18 | The SockJsMessageCodec bean reference to use for encoding and decoding SockJS messages.
By default, Jackson2SockJsMessageCodec is used, which requires the Jackson library to be present on the classpath. |
19 | List of TransportHandler bean references. |
20 | Whether to disable automatic addition of CORS headers for SockJS requests.
The default value is false . |
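For comparison, a rough Java configuration counterpart of a few of these attributes might look like the following sketch. It assumes that the ServerWebSocketContainer and its nested SockJsServiceOptions class expose setters mirroring the XML attributes; the path, the origin, the class name, and the chosen values are hypothetical, and the values simply repeat the defaults listed above.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.websocket.ServerWebSocketContainer;

@Configuration
public class ServerContainerConfig {

    @Bean
    public ServerWebSocketContainer serverWebSocketContainer() {
        // SockJS options, corresponding to the <int-websocket:sockjs> sub-element.
        ServerWebSocketContainer.SockJsServiceOptions sockJs =
                new ServerWebSocketContainer.SockJsServiceOptions();
        sockJs.setHeartbeatTime(25_000);   // heartbeat-time, in milliseconds
        sockJs.setDisconnectDelay(5_000);  // disconnect-delay, in milliseconds
        sockJs.setWebSocketEnabled(true);  // websocket-enabled

        ServerWebSocketContainer container = new ServerWebSocketContainer("/endpoint"); // path
        container.setAllowedOrigins("https://app.example.com"); // allowed-origins (hypothetical origin)
        container.setSendTimeLimit(10_000);                      // send-time-limit
        container.setSendBufferSizeLimit(512 * 1024);            // send-buffer-size-limit
        container.withSockJs(sockJs);
        return container;
    }

}
```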
<int-websocket:outbound-channel-adapter> Attributes
The following listing shows the attributes available for the <int-websocket:outbound-channel-adapter> element:
<int-websocket:outbound-channel-adapter
id="" (1)
channel="" (2)
container="" (3)
default-protocol-handler="" (4)
protocol-handlers="" (5)
message-converters="" (6)
merge-with-default-converters="" (7)
auto-startup="" (8)
phase=""/> (9)
1 | The component bean name.
If you do not provide the channel attribute, a DirectChannel is created and registered in the application context with this id attribute as the bean name.
In this case, the endpoint is registered with the bean name id plus .adapter .
The MessageHandler is registered with the bean alias id plus .handler. |
2 | Identifies the channel attached to this adapter. |
3 | The reference to the IntegrationWebSocketContainer bean, which encapsulates the low-level connection and WebSocketSession handling operations.
Required. |
4 | Optional reference to a SubProtocolHandler instance.
It is used when the client did not request a sub-protocol or when it is the only protocol handler.
If neither this reference nor a protocol-handlers list is provided, the PassThruSubProtocolHandler is used by default. |
5 | List of SubProtocolHandler bean references for this channel adapter.
If you provide only a single bean reference and do not provide a default-protocol-handler , that single SubProtocolHandler is used as the default-protocol-handler .
If you do not set this attribute or default-protocol-handler , the PassThruSubProtocolHandler is used by default. |
6 | List of MessageConverter bean references for this channel adapter. |
7 | Boolean value indicating whether the default converters should be registered after any custom converters.
This flag is used only if message-converters is provided.
Otherwise, all default converters are registered.
Defaults to false .
The default converters are (in order): StringMessageConverter , ByteArrayMessageConverter , and MappingJackson2MessageConverter (if the Jackson library is present on the classpath). |
8 | Boolean value indicating whether this endpoint should start automatically.
Defaults to true . |
9 | The lifecycle phase within which this endpoint should start and stop.
The lower the value, the earlier this endpoint starts and the later it stops.
The default is Integer.MIN_VALUE.
Values can be negative.
See SmartLifecycle. |
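The interplay of message-converters and merge-with-default-converters can be read as a simple ordering rule. The following plain-JDK sketch restates that rule with converter names as strings; it does not use the real converter classes, and the ConverterOrderSketch class and effectiveConverters method are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class ConverterOrderSketch {

    // Default converters, in the order given in the attribute description.
    static final List<String> DEFAULTS = List.of(
            "StringMessageConverter", "ByteArrayMessageConverter", "MappingJackson2MessageConverter");

    /** Returns the converter names in the order they would be consulted. */
    static List<String> effectiveConverters(List<String> custom, boolean mergeWithDefaults) {
        if (custom == null || custom.isEmpty()) {
            // Without message-converters, the flag is ignored and all defaults apply.
            return DEFAULTS;
        }
        List<String> result = new ArrayList<>(custom);
        if (mergeWithDefaults) {
            // Defaults are registered after the custom converters.
            result.addAll(DEFAULTS);
        }
        return result;
    }

}
```

In other words, with a single custom converter and the flag left at false, only that converter is used.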
<int-websocket:inbound-channel-adapter> Attributes
The following listing shows the attributes available for the <int-websocket:inbound-channel-adapter> element:
<int-websocket:inbound-channel-adapter
id="" (1)
channel="" (2)
error-channel="" (3)
container="" (4)
default-protocol-handler="" (5)
protocol-handlers="" (6)
message-converters="" (7)
merge-with-default-converters="" (8)
send-timeout="" (9)
payload-type="" (10)
use-broker="" (11)
auto-startup="" (12)
phase=""/> (13)
1 | The component bean name.
If you do not set the channel attribute, a DirectChannel is created and registered in the application context with this id attribute as the bean name.
In this case, the endpoint is registered with the bean name id plus .adapter . |
2 | Identifies the channel attached to this adapter. |
3 | The MessageChannel bean reference to which the ErrorMessage instances should be sent. |
4 | See the same option on the <int-websocket:outbound-channel-adapter> . |
5 | See the same option on the <int-websocket:outbound-channel-adapter> . |
6 | See the same option on the <int-websocket:outbound-channel-adapter> . |
7 | See the same option on the <int-websocket:outbound-channel-adapter> . |
8 | See the same option on the <int-websocket:outbound-channel-adapter> . |
9 | Maximum amount of time (in milliseconds) to wait when sending a message to the channel if the channel can block.
For example, a QueueChannel can block until space is available if its maximum capacity has been reached. |
10 | Fully qualified name of the Java type for the target payload to convert from the incoming WebSocketMessage .
Defaults to java.lang.String . |
11 | Indicates whether this adapter sends non-MESSAGE WebSocketMessage instances and messages with broker destinations to the AbstractBrokerMessageHandler from the application context.
When this attribute is true , the Broker Relay configuration is required.
This attribute is used only on the server side.
On the client side, it is ignored.
Defaults to false . |
12 | See the same option on the <int-websocket:outbound-channel-adapter> . |
13 | See the same option on the <int-websocket:outbound-channel-adapter> . |
Using ClientStompEncoder
Starting with version 4.3.13, Spring Integration provides ClientStompEncoder (as an extension of the standard StompEncoder) for use on the client side of WebSocket channel adapters.
For proper client-side message preparation, you must inject an instance of the ClientStompEncoder into the StompSubProtocolHandler.
One problem with the default StompSubProtocolHandler is that it was designed for the server side, so it updates the SEND stompCommand header into MESSAGE (as required by the STOMP protocol for the server side).
If the client does not send its messages in the proper SEND WebSocket frame, some STOMP brokers do not accept them.
The purpose of the ClientStompEncoder, in this case, is to override the stompCommand header and set it to the SEND value before encoding the message to the byte[].
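A minimal sketch of the injection described above, for a client-side configuration, could look like this. The bean names and the class name are hypothetical; the resulting registry would then be passed to the client-side channel adapters.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.websocket.support.ClientStompEncoder;
import org.springframework.integration.websocket.support.SubProtocolHandlerRegistry;
import org.springframework.web.socket.messaging.StompSubProtocolHandler;

@Configuration
public class ClientStompConfig {

    @Bean
    public StompSubProtocolHandler clientStompSubProtocolHandler() {
        StompSubProtocolHandler handler = new StompSubProtocolHandler();
        // Replace the default encoder so that outgoing frames keep the SEND command.
        handler.setEncoder(new ClientStompEncoder());
        return handler;
    }

    @Bean
    public SubProtocolHandlerRegistry clientSubProtocolHandlerRegistry() {
        // Used as the sub-protocol registry for client-side WebSocket channel adapters.
        return new SubProtocolHandlerRegistry(clientStompSubProtocolHandler());
    }

}
```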
Dynamic WebSocket Endpoints Registration
Starting with version 5.5, the WebSocket server endpoints (channel adapters based on a ServerWebSocketContainer) can be registered (and removed) at runtime.
The paths to which a ServerWebSocketContainer is mapped are exposed through a HandlerMapping to the DispatcherServlet and become accessible to WebSocket clients.
The Dynamic and Runtime Integration Flows support helps to register these endpoints in a transparent manner:
@Autowired
IntegrationFlowContext integrationFlowContext;

@Autowired
HandshakeHandler handshakeHandler;
...
ServerWebSocketContainer serverWebSocketContainer =
        new ServerWebSocketContainer("/dynamic")
                .setHandshakeHandler(this.handshakeHandler);

WebSocketInboundChannelAdapter webSocketInboundChannelAdapter =
        new WebSocketInboundChannelAdapter(serverWebSocketContainer);

QueueChannel dynamicRequestsChannel = new QueueChannel();

IntegrationFlow serverFlow =
        IntegrationFlow.from(webSocketInboundChannelAdapter)
                .channel(dynamicRequestsChannel)
                .get();

IntegrationFlowContext.IntegrationFlowRegistration dynamicServerFlow =
        this.integrationFlowContext.registration(serverFlow)
                .addBean(serverWebSocketContainer)
                .register();
...
dynamicServerFlow.destroy();
It is important to call .addBean(serverWebSocketContainer) on the dynamic flow registration to add the instance of ServerWebSocketContainer into an ApplicationContext for endpoint registration.
When a dynamic flow registration is destroyed, the associated ServerWebSocketContainer instance is destroyed, too, as well as the respective endpoint registration, including URL path mappings.
The dynamic WebSocket endpoints can be registered only through the Spring Integration mechanism: when the regular Spring @EnableWebSocket is used, the Spring Integration configuration backs off and no infrastructure for dynamic endpoints is registered.