public class RSocketOutboundGateway extends AbstractReplyProducingMessageHandler
An outbound messaging gateway for RSocket requests. The request logic is based on the RSocketRequester, which can be obtained from the ClientRSocketConnector on the client side or from the RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER request message header on the server side.
The RSocket operation is determined by the configured RSocketOutboundGateway.Command or by a SpEL expression evaluated at runtime against the request message. By default, the RSocketOutboundGateway.Command.requestResponse operation is used.
For Publisher-based requests, the Publisher must be present in the request message payload. Flattening via an upstream FluxMessageChannel also works, but the scope of the particular request is then lost, and every Publisher event is sent in its own plain request.
If the reply is a Flux, it is wrapped in a Mono to retain the request scope. The downstream flow is responsible for obtaining this Flux from the message payload and subscribing to it. The Mono reply from this component is subscribed to by a downstream FluxMessageChannel; otherwise it is adapted to a ListenableFuture.
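The wrapping described above can be illustrated with plain Project Reactor types. This is a minimal sketch, not the gateway's implementation: it assumes reactor-core is on the classpath, and the class and method names (FluxInMonoSketch, wrappedReply, consume) are hypothetical. In the real flow the Mono would carry a reply message whose payload is the Flux.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class FluxInMonoSketch {

    // Stand-in for the gateway reply: the whole stream travels as a single
    // Mono value, so it stays tied to the one request that produced it.
    static Mono<Flux<String>> wrappedReply() {
        return Mono.just(Flux.just("a", "b", "c"));
    }

    // Stand-in for the downstream flow: unwrap the Flux from the Mono and
    // subscribe to it (block() is used here only to keep the sketch short).
    static List<String> consume(Mono<Flux<String>> reply) {
        return reply.flatMapMany(flux -> flux).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(consume(wrappedReply())); // prints [a, b, c]
    }
}
```

Nothing is emitted until the downstream side subscribes to the inner Flux, which is why the text leaves that step to the downstream flow.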
See Also: RSocketOutboundGateway.Command, RSocketRequester
Modifier and Type | Class and Description |
---|---|
static class | RSocketOutboundGateway.Command: Enumeration of commands supported by the gateways. |
AbstractReplyProducingMessageHandler.RequestHandler
IntegrationManagement.ManagementOverrides
messagingTemplate
EXPRESSION_PARSER, logger
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
HIGHEST_PRECEDENCE, LOWEST_PRECEDENCE
Constructor and Description |
---|
RSocketOutboundGateway(Expression routeExpression): Instantiate based on the provided SpEL expression to evaluate an RSocket endpoint route at runtime against a request message. |
RSocketOutboundGateway(String route): Instantiate based on the provided RSocket endpoint route. |
Modifier and Type | Method and Description |
---|---|
protected void | doInit() |
protected Object | handleRequestMessage(Message<?> requestMessage): Subclasses must implement this method to handle the request Message. |
void | setClientRSocketConnector(ClientRSocketConnector clientRSocketConnector): Configure a ClientRSocketConnector for client side requests based on the connection provided by the ClientRSocketConnector.getRSocketRequester(). |
void | setCommand(RSocketOutboundGateway.Command command): Configure an RSocketOutboundGateway.Command for the RSocket request type. |
void | setCommandExpression(Expression commandExpression): Configure a SpEL expression to evaluate an RSocketOutboundGateway.Command for the RSocket request type at runtime against a request message. |
void | setExpectedResponseType(Class<?> expectedResponseType): Specify the expected response type for the RSocket response. |
void | setExpectedResponseTypeExpression(Expression expectedResponseTypeExpression): Specify the Expression to determine the type for the RSocket response. |
void | setPublisherElementType(Class<?> publisherElementType): Configure a type for the request Publisher elements. |
void | setPublisherElementTypeExpression(Expression publisherElementTypeExpression): Configure a SpEL expression to evaluate the request Publisher element type at runtime against a request message. |
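The constructor and setters summarized above might be combined on the client side roughly as follows. This is a configuration sketch under stated assumptions, not a reference setup: the host, port, route ("/uppercase") and channel names are hypothetical, the spring-integration-rsocket module is assumed to be on the classpath, and integration support (for example via @EnableIntegration) is assumed to be enabled elsewhere in the application.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.rsocket.ClientRSocketConnector;
import org.springframework.integration.rsocket.outbound.RSocketOutboundGateway;

@Configuration
class RSocketClientConfig {

    // Hypothetical server address; replace with the real RSocket endpoint.
    @Bean
    ClientRSocketConnector clientRSocketConnector() {
        return new ClientRSocketConnector("localhost", 7000);
    }

    // Messages arriving on the (hypothetical) "rsocketRequests" channel are
    // sent to the "/uppercase" route; replies go to "rsocketReplies".
    @Bean
    @ServiceActivator(inputChannel = "rsocketRequests")
    RSocketOutboundGateway rsocketOutboundGateway(ClientRSocketConnector connector) {
        RSocketOutboundGateway gateway = new RSocketOutboundGateway("/uppercase");
        gateway.setClientRSocketConnector(connector);
        // requestResponse is already the default; set explicitly for clarity.
        gateway.setCommand(RSocketOutboundGateway.Command.requestResponse);
        gateway.setExpectedResponseType(String.class);
        gateway.setOutputChannelName("rsocketReplies");
        return gateway;
    }
}
```

On the server side no connector would be set; the RSocketRequester is instead expected in the RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER header of each request message.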
doInvokeAdvisedRequestHandler, getBeanClassLoader, getRequiresReply, handleMessageInternal, hasAdviceChain, onInit, setAdviceChain, setBeanClassLoader, setRequiresReply
addNotPropagatedHeaders, createOutputMessage, getNotPropagatedHeaders, getOutputChannel, isAsync, produceOutput, resolveErrorChannel, sendErrorMessage, sendOutput, sendOutputs, setAsync, setNotPropagatedHeaders, setOutputChannel, setOutputChannelName, setSendTimeout, shouldCopyRequestHeaders, shouldSplitOutput, updateNotPropagatedHeaders
configureMetrics, destroy, getActiveCount, getActiveCountLong, getComponentType, getDuration, getErrorCount, getErrorCountLong, getHandleCount, getHandleCountLong, getManagedName, getManagedType, getMaxDuration, getMeanDuration, getMinDuration, getOrder, getOverrides, getStandardDeviationDuration, handleMessage, isCountsEnabled, isLoggingEnabled, isStatsEnabled, onComplete, onError, onNext, onSubscribe, registerMetricsCaptor, reset, setCountsEnabled, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, setStatsEnabled
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
errorCount, handleCount
getBeanName, getComponentName
public RSocketOutboundGateway(String route)
Instantiate based on the provided RSocket endpoint route.
Parameters:
route - the RSocket endpoint route to use.
public RSocketOutboundGateway(Expression routeExpression)
Instantiate based on the provided SpEL expression to evaluate an RSocket endpoint route at runtime against a request message.
Parameters:
routeExpression - the SpEL expression to use.
public void setClientRSocketConnector(ClientRSocketConnector clientRSocketConnector)
Configure a ClientRSocketConnector for client side requests based on the connection provided by the ClientRSocketConnector.getRSocketRequester(). On the server side, an RSocketRequester must be provided in the RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER header of the request message.
Parameters:
clientRSocketConnector - the ClientRSocketConnector to use.
public void setCommand(RSocketOutboundGateway.Command command)
Configure an RSocketOutboundGateway.Command for the RSocket request type.
Parameters:
command - the RSocketOutboundGateway.Command to use.
public void setCommandExpression(Expression commandExpression)
Configure a SpEL expression to evaluate an RSocketOutboundGateway.Command for the RSocket request type at runtime against a request message.
Parameters:
commandExpression - the SpEL expression to use.
public void setPublisherElementType(Class<?> publisherElementType)
Configure a type for the request Publisher elements.
Parameters:
publisherElementType - the type of the request Publisher elements.
See Also:
RSocketRequester.RequestSpec#data(Publisher, Class)
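A Publisher-based request carries the whole stream as the payload of one message. The following is a minimal sketch of building such a request message, assuming spring-messaging and reactor-core are available; the class name PublisherPayloadSketch and the sample values are hypothetical.

```java
import java.util.List;

import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import reactor.core.publisher.Flux;

class PublisherPayloadSketch {

    // The entire Flux is the payload of ONE request message, so all of its
    // elements belong to the same request instead of one request per element.
    static Message<Flux<String>> streamingRequest() {
        return MessageBuilder.withPayload(Flux.just("a", "b", "c")).build();
    }

    public static void main(String[] args) {
        List<String> elements = streamingRequest().getPayload().collectList().block();
        System.out.println(elements); // prints [a, b, c]
    }
}
```

With a payload like this, calling setPublisherElementType(String.class) on the gateway would declare the element type of the request Publisher.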
public void setPublisherElementTypeExpression(Expression publisherElementTypeExpression)
Configure a SpEL expression to evaluate the request Publisher element type at runtime against a request message.
Parameters:
publisherElementTypeExpression - the expression to evaluate a type for the request Publisher elements.
See Also:
RSocketRequester.RequestSpec#data
public void setExpectedResponseType(Class<?> expectedResponseType)
Specify the expected response type for the RSocket response.
Parameters:
expectedResponseType - The expected type.
See Also:
setExpectedResponseTypeExpression(Expression), RSocketRequester.ResponseSpec#retrieveMono, RSocketRequester.ResponseSpec#retrieveFlux
public void setExpectedResponseTypeExpression(Expression expectedResponseTypeExpression)
Specify the Expression to determine the type for the RSocket response.
Parameters:
expectedResponseTypeExpression - The expected response type expression.
See Also:
RSocketRequester.ResponseSpec#retrieveMono, RSocketRequester.ResponseSpec#retrieveFlux
protected void doInit()
Overrides:
doInit in class AbstractReplyProducingMessageHandler
protected Object handleRequestMessage(Message<?> requestMessage)
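Description copied from class: AbstractReplyProducingMessageHandler
Subclasses must implement this method to handle the request Message.
Specified by:
handleRequestMessage in class AbstractReplyProducingMessageHandler
Parameters:
requestMessage - The request message.
Returns:
The result of handling the message, or null.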