RSocket Support
The RSocket Spring Integration module (spring-integration-rsocket) supports the RSocket application protocol.
You need to include this dependency in your project:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
<version>6.0.1</version>
</dependency>
compile "org.springframework.integration:spring-integration-rsocket:6.0.1"
This module is available starting with version 5.2 and is based on the Spring Messaging foundation with its RSocket component implementations, such as RSocketRequester, RSocketMessageHandler and RSocketStrategies.
See Spring Framework RSocket Support for more information about the RSocket protocol, terminology and components.
Before starting integration flow processing via channel adapters, we need to establish an RSocket connection between server and client.
For this purpose, Spring Integration RSocket support provides the ServerRSocketConnector and ClientRSocketConnector implementations of the AbstractRSocketConnector.
The ServerRSocketConnector exposes a listener on the host and port according to the provided io.rsocket.transport.ServerTransport for accepting connections from clients.
The internal RSocketServer instance can be customized with setServerConfigurer(); other options, such as the RSocketStrategies and the MimeType for payload data and header metadata, can be configured as well.
When a setupRoute is provided from the client requester (see ClientRSocketConnector below), the connected client is stored as an RSocketRequester under the key determined by the clientRSocketKeyStrategy, a BiFunction<Map<String, Object>, DataBuffer, Object>.
By default, the connection data, converted to a string with the UTF-8 charset, is used as the key.
Such an RSocketRequester registry can be used in the application logic to select a particular client connection for interaction, or to publish the same message to all connected clients.
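The key strategy and registry described above can be modeled in plain Java. This is only a sketch under stated assumptions, not the connector's implementation: ByteBuffer stands in for DataBuffer, a String stands in for the stored RSocketRequester, and the class name, the "route" header name and the method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

public class ClientKeyRegistrySketch {

    // Stand-in for the documented default: the setup data decoded as a UTF-8 string.
    public static final BiFunction<Map<String, Object>, ByteBuffer, Object> DATA_AS_STRING =
            (headers, data) -> StandardCharsets.UTF_8.decode(data.duplicate()).toString();

    // Alternative strategy: use a (hypothetical) "route" header as the key instead of the data.
    public static final BiFunction<Map<String, Object>, ByteBuffer, Object> ROUTE_AS_KEY =
            (headers, data) -> String.valueOf(headers.get("route"));

    // Stand-in registry; in the real connector the values would be RSocketRequester instances.
    private final Map<Object, String> clients = new ConcurrentHashMap<>();

    private final BiFunction<Map<String, Object>, ByteBuffer, Object> keyStrategy;

    public ClientKeyRegistrySketch(BiFunction<Map<String, Object>, ByteBuffer, Object> keyStrategy) {
        this.keyStrategy = keyStrategy;
    }

    // Computes the key for a newly connected client and stores the client under it.
    public Object register(Map<String, Object> headers, ByteBuffer setupData, String requester) {
        Object key = this.keyStrategy.apply(headers, setupData);
        this.clients.put(key, requester);
        return key;
    }

    // Selects one particular client connection.
    public String lookup(Object key) {
        return this.clients.get(key);
    }

    // All connected clients, e.g. for publishing the same message to each of them.
    public Collection<String> all() {
        return this.clients.values();
    }

    public static void main(String[] args) {
        ClientKeyRegistrySketch registry = new ClientKeyRegistrySketch(DATA_AS_STRING);
        ByteBuffer setupData = ByteBuffer.wrap("client-1".getBytes(StandardCharsets.UTF_8));
        Object key = registry.register(Map.of("route", "clientConnect/myUser"), setupData, "requester-1");
        System.out.println(key + " -> " + registry.lookup(key));
    }
}
```

Swapping DATA_AS_STRING for ROUTE_AS_KEY corresponds to the kind of customization that setClientRSocketKeyStrategy() allows on the real connector.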
When a connection is established from the client, an RSocketConnectedEvent is emitted from the ServerRSocketConnector.
This is similar to what is provided by the @ConnectMapping annotation in the Spring Messaging module.
The mapping pattern * means that all client routes are accepted.
The RSocketConnectedEvent can be used to distinguish different routes via the DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER header.
A typical server configuration might look like this:
@Bean
public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
        .decoder(StringDecoder.textPlainOnly())
        .encoder(CharSequenceEncoder.allMimeTypes())
        .dataBufferFactory(new DefaultDataBufferFactory(true))
        .build();
}

@Bean
public ServerRSocketConnector serverRSocketConnector() {
    ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
    serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
    serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
    serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
    serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
        + headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
    return serverRSocketConnector;
}

@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
    ...
}
All the options, including the RSocketStrategies bean and the @EventListener for RSocketConnectedEvent, are optional.
See the ServerRSocketConnector JavaDocs for more information.
Starting with version 5.2.1, the ServerRSocketMessageHandler is extracted to a public, top-level class for possible connection with an existing RSocket server.
When a ServerRSocketConnector is supplied with an external instance of ServerRSocketMessageHandler, it doesn’t create an RSocket server internally and just delegates all the handling logic to the provided instance.
In addition, the ServerRSocketMessageHandler can be configured with a messageMappingCompatible flag to also handle @MessageMapping methods of an RSocket controller, fully replacing the functionality provided by the standard RSocketMessageHandler.
This can be useful in mixed configurations, when classic @MessageMapping methods are present in the same application along with RSocket channel adapters and an externally configured RSocket server.
The ClientRSocketConnector serves as a holder for an RSocketRequester based on the RSocket connected via the provided ClientTransport.
The RSocketConnector can be customized with the provided RSocketConnectorConfigurer.
The setupRoute (with optional template variables) and setupData with metadata can also be configured on this component.
A typical client configuration might look like this:
@Bean
public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
        .decoder(StringDecoder.textPlainOnly())
        .encoder(CharSequenceEncoder.allMimeTypes())
        .dataBufferFactory(new DefaultDataBufferFactory(true))
        .build();
}

@Bean
public ClientRSocketConnector clientRSocketConnector() {
    ClientRSocketConnector clientRSocketConnector =
        new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
    clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
    clientRSocketConnector.setSetupRoute("clientConnect/{user}");
    clientRSocketConnector.setSetupRouteVariables("myUser");
    return clientRSocketConnector;
}
Most of these options (including the RSocketStrategies bean) are optional.
Note how we connect to the locally started RSocket server on an arbitrary port.
See ServerRSocketConnector.clientRSocketKeyStrategy for setupData use cases.
Also see the JavaDocs of ClientRSocketConnector and its AbstractRSocketConnector superclass for more information.
Both ClientRSocketConnector and ServerRSocketConnector are responsible for mapping inbound channel adapters to their path configuration for routing incoming RSocket requests.
See the next section for more information.
RSocket Inbound Gateway
The RSocketInboundGateway is responsible for receiving RSocket requests and producing responses (if any).
It requires an array of path mappings, which can be patterns similar to MVC request mapping or @MessageMapping semantics.
In addition, since version 5.2.2, a set of interaction models (see RSocketInteractionModel) can be configured on the RSocketInboundGateway to restrict RSocket requests to this endpoint by a particular frame type.
By default, all the interaction models are supported.
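The restriction by interaction model can be pictured as a simple membership check. The following is a minimal sketch, not the gateway's code: the nested Model enum is a local stand-in for RSocketInteractionModel, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Set;

public class InteractionModelFilterSketch {

    // Local stand-in for RSocketInteractionModel with the four RSocket interaction models.
    public enum Model { fireAndForget, requestResponse, requestStream, requestChannel }

    private final Set<Model> accepted;

    // With no models given, every model is accepted, matching the documented default.
    public InteractionModelFilterSketch(Model... models) {
        this.accepted = models.length == 0
                ? EnumSet.allOf(Model.class)
                : EnumSet.copyOf(Arrays.asList(models));
    }

    // Returns true when a request with the given interaction model may reach the endpoint.
    public boolean accepts(Model incoming) {
        return this.accepted.contains(incoming);
    }

    public static void main(String[] args) {
        InteractionModelFilterSketch streamingOnly =
                new InteractionModelFilterSketch(Model.requestStream, Model.requestChannel);
        System.out.println(streamingOnly.accepts(Model.requestStream));
        System.out.println(streamingOnly.accepts(Model.fireAndForget));
    }
}
```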
Such a bean, according to its IntegrationRSocketEndpoint implementation (an extension of ReactiveMessageHandler), is auto-detected by either the ServerRSocketConnector or the ClientRSocketConnector for routing logic in the internal IntegrationRSocketMessageHandler for incoming requests.
An AbstractRSocketConnector can be provided to the RSocketInboundGateway for explicit endpoint registration.
In that case, the auto-detection option is disabled on that AbstractRSocketConnector.
The RSocketStrategies can also be injected into the RSocketInboundGateway, or they are obtained from the provided AbstractRSocketConnector, overriding any explicit injection.
Decoders from those RSocketStrategies are used to decode a request payload according to the provided requestElementType.
If an RSocketPayloadReturnValueHandler.RESPONSE_HEADER header is not provided in the incoming Message, the RSocketInboundGateway treats the request as a fireAndForget RSocket interaction model.
In this case, the RSocketInboundGateway performs a plain send operation into the outputChannel.
Otherwise, a MonoProcessor value from the RSocketPayloadReturnValueHandler.RESPONSE_HEADER header is used for sending a reply to the RSocket.
For this purpose, the RSocketInboundGateway performs a sendAndReceiveMessageReactive operation on the outputChannel.
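The choice between a plain send and a send-and-receive can be modeled in plain Java as follows. This is a sketch under stated assumptions rather than the gateway's actual logic: a CompletableFuture stands in for the MonoProcessor reply holder, a Function stands in for the downstream flow, and the header name and class name are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class InboundDispatchSketch {

    // Hypothetical header name standing in for RSocketPayloadReturnValueHandler.RESPONSE_HEADER.
    public static final String RESPONSE_HEADER = "responseHolder";

    // Dispatches a request downstream and returns the name of the operation that was chosen.
    @SuppressWarnings("unchecked")
    public static String handle(Map<String, Object> headers, String payload,
            Function<String, String> downstream) {

        Object holder = headers.get(RESPONSE_HEADER);
        if (holder == null) {
            // No reply holder in the headers: fire-and-forget, so the result is ignored.
            downstream.apply(payload);
            return "send";
        }
        // A reply holder is present: the downstream result completes it as the reply.
        ((CompletableFuture<String>) holder).complete(downstream.apply(payload));
        return "sendAndReceive";
    }

    public static void main(String[] args) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        System.out.println(handle(Map.of(), "ping", String::toUpperCase));
        System.out.println(handle(Map.of(RESPONSE_HEADER, reply), "ping", String::toUpperCase));
        System.out.println(reply.join());
    }
}
```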
The payload of the message to send downstream is always a Flux according to the MessagingRSocket logic.
When in a fireAndForget RSocket interaction model, the message has a plain converted payload.
The reply payload can be a plain object or a Publisher; the RSocketInboundGateway converts either of them properly into an RSocket response according to the encoders provided in the RSocketStrategies.
Starting with version 5.3, a decodeFluxAsUnit option (default false) is added to the RSocketInboundGateway.
By default, an incoming Flux is transformed in such a way that each of its events is decoded separately.
This is exactly the behavior currently present with @MessageMapping semantics.
To restore the previous behavior, or to decode the whole Flux as a single unit according to application requirements, decodeFluxAsUnit has to be set to true.
However, the target decoding logic depends on the Decoder selected; for example, a StringDecoder requires a new line separator (by default) to be present in the stream to indicate a byte buffer end.
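The difference between the two modes can be illustrated without any reactive types. The sketch below is an assumption-laden model, not the StringDecoder implementation: a list of byte arrays stands in for the incoming Flux of buffers, decoding splits on new lines only, and the class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class DecodeAsUnitSketch {

    // Splits decoded text on new lines and drops empty segments.
    private static List<String> splitLines(String text) {
        List<String> lines = new ArrayList<>();
        for (String line : text.split("\n")) {
            if (!line.isEmpty()) {
                lines.add(line);
            }
        }
        return lines;
    }

    // Models decodeFluxAsUnit = false: every chunk is decoded on its own.
    public static List<String> decodePerElement(List<byte[]> chunks) {
        List<String> result = new ArrayList<>();
        for (byte[] chunk : chunks) {
            result.addAll(splitLines(new String(chunk, StandardCharsets.UTF_8)));
        }
        return result;
    }

    // Models decodeFluxAsUnit = true: chunks are joined first, so only new lines delimit values.
    public static List<String> decodeAsUnit(List<byte[]> chunks) {
        ByteArrayOutputStream whole = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            whole.write(chunk, 0, chunk.length);
        }
        return splitLines(new String(whole.toByteArray(), StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        List<byte[]> chunks = List.of(
                "he".getBytes(StandardCharsets.UTF_8),
                "llo\nwor".getBytes(StandardCharsets.UTF_8),
                "ld\n".getBytes(StandardCharsets.UTF_8));
        System.out.println(decodePerElement(chunks)); // prints [he, llo, wor, ld]
        System.out.println(decodeAsUnit(chunks));     // prints [hello, world]
    }
}
```

In this model, per-element decoding yields a value for every chunk boundary, while decoding as a unit yields values only where a new line appears, which is why the separator matters for a string-based decoder.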
See Configuring RSocket Endpoints with Java for samples of how to configure an RSocketInboundGateway endpoint and deal with payloads downstream.
RSocket Outbound Gateway
The RSocketOutboundGateway is an AbstractReplyProducingMessageHandler that performs requests into RSocket and produces replies based on the RSocket replies (if any).
The low-level RSocket protocol interaction is delegated to an RSocketRequester resolved from the provided ClientRSocketConnector or from the RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER header in the request message on the server side.
A target RSocketRequester on the server side can be resolved from an RSocketConnectedEvent or by using the ServerRSocketConnector.getClientRSocketRequester() API according to some business key selected for connect request mappings via ServerRSocketConnector.setClientRSocketKeyStrategy().
See the ServerRSocketConnector JavaDocs for more information.
The route for sending a request has to be configured explicitly (together with path variables) or via a SpEL expression that is evaluated against the request message.
The RSocket interaction model can be provided via the RSocketInteractionModel option or a respective expression setting.
By default, requestResponse is used for common gateway use cases.
When the request message payload is a Publisher, a publisherElementType option can be provided to encode its elements according to the RSocketStrategies supplied in the target RSocketRequester.
An expression for this option can evaluate to a ParameterizedTypeReference.
See the RSocketRequester.RequestSpec.data() JavaDocs for more information about data and its type.
An RSocket request can also be enhanced with metadata.
For this purpose, a metadataExpression against the request message can be configured on the RSocketOutboundGateway.
Such an expression must evaluate to a Map<Object, MimeType>.
When the interactionModel is not fireAndForget, an expectedResponseType must be supplied.
It is String.class by default.
An expression for this option can evaluate to a ParameterizedTypeReference.
See the RSocketRequester.RetrieveSpec.retrieveMono() and RSocketRequester.RetrieveSpec.retrieveFlux() JavaDocs for more information about reply data and its type.
A reply payload from the RSocketOutboundGateway is always a Mono (even for a fireAndForget interaction model, where it is Mono<Void>), which makes this component asynchronous.
Such a Mono is subscribed to before producing into the outputChannel for regular channels, or processed on demand by the FluxMessageChannel.
A Flux response for the requestStream or requestChannel interaction model is also wrapped into a reply Mono.
It can be flattened downstream by the FluxMessageChannel with a passthrough service activator:
@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel = "fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
    return payload;
}
Alternatively, the Flux can be subscribed to explicitly in the target application logic.
The expected response type can also be configured (or evaluated via an expression) as void, treating this gateway as an outbound channel adapter.
However, the outputChannel still has to be configured (even if it is just a NullChannel) to initiate a subscription to the returned Mono.
See Configuring RSocket Endpoints with Java for samples of how to configure an RSocketOutboundGateway endpoint and deal with payloads downstream.
RSocket Namespace Support
Spring Integration provides an rsocket namespace and the corresponding schema definition.
To include it in your configuration, add the following namespace declaration in your application context configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration
        https://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/rsocket
        https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
    ...
</beans>
Inbound
To configure Spring Integration RSocket inbound channel adapters with XML, you need to use the appropriate inbound-gateway component from the int-rsocket namespace.
The following example shows how to configure it:
<int-rsocket:inbound-gateway id="inboundGateway"
    path="testPath"
    interaction-models="requestStream,requestChannel"
    rsocket-connector="clientRSocketConnector"
    request-channel="requestChannel"
    rsocket-strategies="rsocketStrategies"
    request-element-type="byte[]"/>
A ClientRSocketConnector and ServerRSocketConnector should be configured as generic <bean> definitions.
Outbound
<int-rsocket:outbound-gateway id="outboundGateway"
    client-rsocket-connector="clientRSocketConnector"
    auto-startup="false"
    interaction-model="fireAndForget"
    route-expression="'testRoute'"
    request-channel="requestChannel"
    publisher-element-type="byte[]"
    expected-response-type="java.util.Date"
    metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>
See spring-integration-rsocket.xsd for a description of all those XML attributes.
Configuring RSocket Endpoints with Java
The following example shows how to configure an RSocket inbound endpoint with Java:
@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
    RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
    rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
    return rsocketInboundGateway;
}

@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
    return payload.next().map(String::toUpperCase);
}
A ClientRSocketConnector or ServerRSocketConnector is assumed in this configuration, which auto-detects such an endpoint on the “echo” path.
Pay attention to the @Transformer signature, with its fully reactive processing of the RSocket requests and production of reactive replies.
The following example shows how to configure an RSocket inbound gateway with the Java DSL:
@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
    return IntegrationFlow
        .from(RSockets.inboundGateway("/uppercase")
            .interactionModels(RSocketInteractionModel.requestChannel))
        .<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
        .get();
}
A ClientRSocketConnector or ServerRSocketConnector is assumed in this configuration, which auto-detects such an endpoint on the “/uppercase” path with “request channel” as the expected interaction model.
The following example shows how to configure an RSocket outbound gateway with Java:
@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
    RSocketOutboundGateway rsocketOutboundGateway =
        new RSocketOutboundGateway(
            new FunctionExpression<Message<?>>((m) ->
                m.getHeaders().get("route_header")));
    rsocketOutboundGateway.setInteractionModelExpression(
        new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
    rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
    return rsocketOutboundGateway;
}
The setClientRSocketConnector() call is required only for the client side.
On the server side, the RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER header with an RSocketRequester value must be supplied in the request message.
The following example shows how to configure an RSocket outbound gateway with the Java DSL:
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
    return IntegrationFlow
        .from(Function.class)
        .handle(RSockets.outboundGateway("/uppercase")
            .interactionModel(RSocketInteractionModel.requestResponse)
            .expectedResponseType(String.class)
            .clientRSocketConnector(clientRSocketConnector))
        .get();
}
See IntegrationFlow as a Gateway for more information on how to use the Function interface mentioned at the beginning of the flow above.