WebSockets

This part of the reference documentation covers support for reactive-stack WebSocket messaging.

Introduction to WebSocket

The WebSocket protocol, RFC 6455, provides a standardized way to establish a full-duplex, two-way communication channel between client and server over a single TCP connection. It is a different TCP protocol from HTTP but is designed to work over HTTP, using ports 80 and 443 and allowing re-use of existing firewall rules.

A WebSocket interaction begins with an HTTP request that uses the HTTP Upgrade header to upgrade or, in this case, to switch to the WebSocket protocol. The following example shows such an interaction:

GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket (1)
Connection: Upgrade (2)
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080
1 The Upgrade header.
2 Using the Upgrade connection.

Instead of the usual 200 status code, a server with WebSocket support returns output similar to the following:

HTTP/1.1 101 Switching Protocols (1)
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
1 Protocol switch

After a successful handshake, the TCP socket underlying the HTTP upgrade request remains open for both the client and the server to continue to send and receive messages.

A complete introduction of how WebSockets work is beyond the scope of this document. See RFC 6455, the WebSocket chapter of HTML5, or any of the many introductions and tutorials on the Web.

Note that, if a WebSocket server is running behind a web server (e.g. nginx), you likely need to configure it to pass WebSocket upgrade requests on to the WebSocket server. Likewise, if the application runs in a cloud environment, check the instructions of the cloud provider related to WebSocket support.

HTTP Versus WebSocket

Even though WebSocket is designed to be HTTP-compatible and starts with an HTTP request, it is important to understand that the two protocols lead to very different architectures and application programming models.

In HTTP and REST, an application is modeled as many URLs. To interact with the application, clients access those URLs, request-response style. Servers route requests to the appropriate handler based on the HTTP URL, method, and headers.

By contrast, in WebSockets, there is usually only one URL for the initial connect. Subsequently, all application messages flow on that same TCP connection. This points to an entirely different asynchronous, event-driven, messaging architecture.

WebSocket is also a low-level transport protocol, which, unlike HTTP, does not prescribe any semantics to the content of messages. That means that there is no way to route or process a message unless the client and the server agree on message semantics.

WebSocket clients and servers can negotiate the use of a higher-level, messaging protocol (for example, STOMP), through the Sec-WebSocket-Protocol header on the HTTP handshake request. In the absence of that, they need to come up with their own conventions.

When to Use WebSockets

WebSockets can make a web page be dynamic and interactive. However, in many cases, a combination of AJAX and HTTP streaming or long polling can provide a simple and effective solution.

For example, news, mail, and social feeds need to update dynamically, but it may be perfectly okay to do so every few minutes. Collaboration, games, and financial apps, on the other hand, need to be much closer to real-time.

Latency alone is not a deciding factor. If the volume of messages is relatively low (for example, monitoring network failures) HTTP streaming or polling can provide an effective solution. It is the combination of low latency, high frequency, and high volume that make the best case for the use of WebSocket.

Keep in mind also that over the Internet, restrictive proxies that are outside of your control may preclude WebSocket interactions, either because they are not configured to pass on the Upgrade header or because they close long-lived connections that appear idle. This means that the use of WebSocket for internal applications within the firewall is a more straightforward decision than it is for public facing applications.

WebSocket API

The Spring Framework provides a WebSocket API that you can use to write client- and server-side applications that handle WebSocket messages.

Server

To create a WebSocket server, you can first create a WebSocketHandler. The following example shows how to do so:

  • Java

  • Kotlin

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;

public class MyWebSocketHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		// ...
	}
}
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession

class MyWebSocketHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {
		// ...
	}
}

Then you can map it to a URL:

  • Java

  • Kotlin

@Configuration
class WebConfig {

	@Bean
	public HandlerMapping handlerMapping() {
		Map<String, WebSocketHandler> map = new HashMap<>();
		map.put("/path", new MyWebSocketHandler());
		int order = -1; // before annotated controllers

		return new SimpleUrlHandlerMapping(map, order);
	}
}
@Configuration
class WebConfig {

	@Bean
	fun handlerMapping(): HandlerMapping {
		val map = mapOf("/path" to MyWebSocketHandler())
		val order = -1 // before annotated controllers

		return SimpleUrlHandlerMapping(map, order)
	}
}

If using the WebFlux Config there is nothing further to do, or otherwise if not using the WebFlux config you’ll need to declare a WebSocketHandlerAdapter as shown below:

  • Java

  • Kotlin

@Configuration
class WebConfig {

	// ...

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter();
	}
}
@Configuration
class WebConfig {

	// ...

	@Bean
	fun handlerAdapter() =  WebSocketHandlerAdapter()
}

WebSocketHandler

The handle method of WebSocketHandler takes WebSocketSession and returns Mono<Void> to indicate when application handling of the session is complete. The session is handled through two streams, one for inbound and one for outbound messages. The following table describes the two methods that handle the streams:

WebSocketSession method Description

Flux<WebSocketMessage> receive()

Provides access to the inbound message stream and completes when the connection is closed.

Mono<Void> send(Publisher<WebSocketMessage>)

Takes a source for outgoing messages, writes the messages, and returns a Mono<Void> that completes when the source completes and writing is done.

A WebSocketHandler must compose the inbound and outbound streams into a unified flow and return a Mono<Void> that reflects the completion of that flow. Depending on application requirements, the unified flow completes when:

  • Either the inbound or the outbound message stream completes.

  • The inbound stream completes (that is, the connection closed), while the outbound stream is infinite.

  • At a chosen point, through the close method of WebSocketSession.

When inbound and outbound message streams are composed together, there is no need to check if the connection is open, since Reactive Streams signals end activity. The inbound stream receives a completion or error signal, and the outbound stream receives a cancellation signal.

The most basic implementation of a handler is one that handles the inbound stream. The following example shows such an implementation:

  • Java

  • Kotlin

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		return session.receive()			(1)
				.doOnNext(message -> {
					// ...					(2)
				})
				.concatMap(message -> {
					// ...					(3)
				})
				.then();					(4)
	}
}
1 Access the stream of inbound messages.
2 Do something with each message.
3 Perform nested asynchronous operations that use the message content.
4 Return a Mono<Void> that completes when receiving completes.
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {
		return session.receive()            (1)
				.doOnNext {
					// ...					(2)
				}
				.concatMap {
					// ...					(3)
				}
				.then()                     (4)
	}
}
1 Access the stream of inbound messages.
2 Do something with each message.
3 Perform nested asynchronous operations that use the message content.
4 Return a Mono<Void> that completes when receiving completes.
For nested, asynchronous operations, you may need to call message.retain() on underlying servers that use pooled data buffers (for example, Netty). Otherwise, the data buffer may be released before you have had a chance to read the data. For more background, see Data Buffers and Codecs.

The following implementation combines the inbound and outbound streams:

  • Java

  • Kotlin

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Flux<WebSocketMessage> output = session.receive()				(1)
				.doOnNext(message -> {
					// ...
				})
				.concatMap(message -> {
					// ...
				})
				.map(value -> session.textMessage("Echo " + value));	(2)

		return session.send(output);									(3)
	}
}
1 Handle the inbound message stream.
2 Create the outbound message, producing a combined flow.
3 Return a Mono<Void> that does not complete while we continue to receive.
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {

		val output = session.receive()                      (1)
				.doOnNext {
					// ...
				}
				.concatMap {
					// ...
				}
				.map { session.textMessage("Echo $it") }    (2)

		return session.send(output)                         (3)
	}
}
1 Handle the inbound message stream.
2 Create the outbound message, producing a combined flow.
3 Return a Mono<Void> that does not complete while we continue to receive.

Inbound and outbound streams can be independent and be joined only for completion, as the following example shows:

  • Java

  • Kotlin

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Mono<Void> input = session.receive()								(1)
				.doOnNext(message -> {
					// ...
				})
				.concatMap(message -> {
					// ...
				})
				.then();

		Flux<String> source = ... ;
		Mono<Void> output = session.send(source.map(session::textMessage));	(2)

		return Mono.zip(input, output).then();								(3)
	}
}
1 Handle inbound message stream.
2 Send outgoing messages.
3 Join the streams and return a Mono<Void> that completes when either stream ends.
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {

		val input = session.receive()									(1)
				.doOnNext {
					// ...
				}
				.concatMap {
					// ...
				}
				.then()

		val source: Flux<String> = ...
		val output = session.send(source.map(session::textMessage))		(2)

		return Mono.zip(input, output).then()							(3)
	}
}
1 Handle inbound message stream.
2 Send outgoing messages.
3 Join the streams and return a Mono<Void> that completes when either stream ends.

DataBuffer

DataBuffer is the representation for a byte buffer in WebFlux. The Spring Core part of the reference has more on that in the section on Data Buffers and Codecs. The key point to understand is that on some servers like Netty, byte buffers are pooled and reference counted, and must be released when consumed to avoid memory leaks.

When running on Netty, applications must use DataBufferUtils.retain(dataBuffer) if they wish to hold on input data buffers in order to ensure they are not released, and subsequently use DataBufferUtils.release(dataBuffer) when the buffers are consumed.

Handshake

WebSocketHandlerAdapter delegates to a WebSocketService. By default, that is an instance of HandshakeWebSocketService, which performs basic checks on the WebSocket request and then uses RequestUpgradeStrategy for the server in use. Currently, there is built-in support for Reactor Netty, Tomcat, Jetty, and Undertow.

HandshakeWebSocketService exposes a sessionAttributePredicate property that allows setting a Predicate<String> to extract attributes from the WebSession and insert them into the attributes of the WebSocketSession.

Server Configuration

The RequestUpgradeStrategy for each server exposes configuration specific to the underlying WebSocket server engine. When using the WebFlux Java config you can customize such properties as shown in the corresponding section of the WebFlux Config, or otherwise if not using the WebFlux config, use the below:

  • Java

  • Kotlin

@Configuration
class WebConfig {

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter(webSocketService());
	}

	@Bean
	public WebSocketService webSocketService() {
		TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
		strategy.setMaxSessionIdleTimeout(0L);
		return new HandshakeWebSocketService(strategy);
	}
}
@Configuration
class WebConfig {

	@Bean
	fun handlerAdapter() =
			WebSocketHandlerAdapter(webSocketService())

	@Bean
	fun webSocketService(): WebSocketService {
		val strategy = TomcatRequestUpgradeStrategy().apply {
			setMaxSessionIdleTimeout(0L)
		}
		return HandshakeWebSocketService(strategy)
	}
}

Check the upgrade strategy for your server to see what options are available. Currently, only Tomcat and Jetty expose such options.

CORS

The easiest way to configure CORS and restrict access to a WebSocket endpoint is to have your WebSocketHandler implement CorsConfigurationSource and return a CorsConfiguration with allowed origins, headers, and other details. If you cannot do that, you can also set the corsConfigurations property on the SimpleUrlHandler to specify CORS settings by URL pattern. If both are specified, they are combined by using the combine method on CorsConfiguration.

Client

Spring WebFlux provides a WebSocketClient abstraction with implementations for Reactor Netty, Tomcat, Jetty, Undertow, and standard Java (that is, JSR-356).

The Tomcat client is effectively an extension of the standard Java one with some extra functionality in the WebSocketSession handling to take advantage of the Tomcat-specific API to suspend receiving messages for back pressure.

To start a WebSocket session, you can create an instance of the client and use its execute methods:

  • Java

  • Kotlin

WebSocketClient client = new ReactorNettyWebSocketClient();

URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
		session.receive()
				.doOnNext(System.out::println)
				.then());
val client = ReactorNettyWebSocketClient()

		val url = URI("ws://localhost:8080/path")
		client.execute(url) { session ->
			session.receive()
					.doOnNext(::println)
			.then()
		}

Some clients, such as Jetty, implement Lifecycle and need to be stopped and started before you can use them. All clients have constructor options related to configuration of the underlying WebSocket client.