public class RSocketOutboundGateway extends AbstractReplyProducingMessageHandler
An outbound messaging gateway for RSocket requests. The request logic is fully based on the RSocketRequester, which can be obtained from the ClientRSocketConnector on the client side or from the RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER request message header on the server side.
An RSocket operation is determined by the configured RSocketInteractionModel or by a respective SpEL expression evaluated at runtime against the request message. By default, the RSocketInteractionModel.requestResponse operation is used.

For Publisher-based requests, the Publisher must be present in the request message payload. Flattening via an upstream FluxMessageChannel works too, but then the scope of the particular request is lost and every Publisher event is sent in its own plain request.

If the reply is a Flux, it is wrapped in a Mono to retain the request scope. The downstream flow is responsible for obtaining this Flux from the message payload and subscribing to it. The Mono reply from this component is subscribed to by the downstream FluxMessageChannel; otherwise it is adapted to a ListenableFuture.
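A client-side configuration along the lines described above might look like the following sketch. It is not taken from the original page: the host, port, route `/uppercase`, and the channel names `rsocketRequests` and `rsocketReplies` are illustrative assumptions, and the example presumes that Spring Integration is enabled in the application context (for example via `@EnableIntegration` or Spring Boot) with the RSocket module on the classpath.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.rsocket.ClientRSocketConnector;
import org.springframework.integration.rsocket.RSocketInteractionModel;
import org.springframework.integration.rsocket.outbound.RSocketOutboundGateway;

@Configuration
public class RSocketGatewayConfig {

    // Assumption: an RSocket server is listening on localhost:7000.
    @Bean
    public ClientRSocketConnector clientRSocketConnector() {
        return new ClientRSocketConnector("localhost", 7000);
    }

    // Messages sent to the (hypothetical) "rsocketRequests" channel are forwarded
    // to the "/uppercase" route; replies go to the "rsocketReplies" channel.
    @Bean
    @ServiceActivator(inputChannel = "rsocketRequests")
    public RSocketOutboundGateway rsocketOutboundGateway() {
        RSocketOutboundGateway gateway = new RSocketOutboundGateway("/uppercase");
        gateway.setClientRSocketConnector(clientRSocketConnector());
        // requestResponse is already the default; it is set here only for clarity.
        gateway.setInteractionModel(RSocketInteractionModel.requestResponse);
        gateway.setExpectedResponseType(String.class);
        gateway.setOutputChannelName("rsocketReplies");
        return gateway;
    }
}
```

On the server side no connector would be set; the RSocketRequester is then expected in the RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER header of each request message.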
See Also: RSocketInteractionModel, RSocketRequester
Inherited nested classes and interfaces: AbstractReplyProducingMessageHandler.RequestHandler, IntegrationManagement.ManagementOverrides
Inherited fields: messagingTemplate, EXPRESSION_PARSER, logger, HIGHEST_PRECEDENCE, LOWEST_PRECEDENCE, METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
Constructor | Description |
---|---|
RSocketOutboundGateway(Expression routeExpression) | Instantiate based on the provided SpEL expression to evaluate an RSocket endpoint route at runtime against a request message. |
RSocketOutboundGateway(String route, Object... routeVariables) | Instantiate based on the provided RSocket endpoint route and optional variables to expand the route template. |
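To make the idea of a route template with variables concrete, here is a small standalone sketch. It is not the gateway's or Spring's implementation: the class name `RouteTemplateSketch`, the `{name}` placeholder syntax handled by the regular expression, and the strictly positional matching of variables to placeholders are assumptions made for illustration only.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative only: models positional expansion of "{name}" placeholders in a route. */
final class RouteTemplateSketch {

    // Matches one placeholder such as "{id}".
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{[^{}]*\\}");

    private RouteTemplateSketch() {
    }

    /** Replaces the n-th placeholder in the route with the n-th variable. */
    static String expand(String route, Object... routeVariables) {
        Matcher matcher = PLACEHOLDER.matcher(route);
        StringBuilder expanded = new StringBuilder();
        int index = 0;
        while (matcher.find()) {
            if (index >= routeVariables.length) {
                throw new IllegalArgumentException("Not enough variables for route: " + route);
            }
            String value = String.valueOf(routeVariables[index++]);
            // quoteReplacement keeps '$' and '\' in the value literal.
            matcher.appendReplacement(expanded, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(expanded);
        return expanded.toString();
    }
}
```

With this sketch, `RouteTemplateSketch.expand("orders.{id}.items.{item}", 42, "pen")` yields `orders.42.items.pen`, while a route without placeholders is returned unchanged.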
Modifier and Type | Method | Description |
---|---|---|
protected void | doInit() | |
protected Object | handleRequestMessage(Message<?> requestMessage) | Subclasses must implement this method to handle the request Message. |
void | setClientRSocketConnector(ClientRSocketConnector clientRSocketConnector) | Configure a ClientRSocketConnector for client side requests based on the connection provided by ClientRSocketConnector.getRequester(). |
void | setExpectedResponseType(Class<?> expectedResponseType) | Specify the expected response type for the RSocket response. |
void | setExpectedResponseTypeExpression(Expression expectedResponseTypeExpression) | Specify an Expression to determine the type for the RSocket response. |
void | setInteractionModel(RSocketInteractionModel interactionModel) | Configure an RSocketInteractionModel for the RSocket request type. |
void | setInteractionModelExpression(Expression interactionModelExpression) | Configure a SpEL expression to evaluate an RSocketInteractionModel for the RSocket request type at runtime against a request message. |
void | setMetadataExpression(Expression metadataExpression) | Specify a SpEL expression to evaluate metadata for the RSocket request as Map<Object, MimeType> against a request message. |
void | setPublisherElementType(Class<?> publisherElementType) | Configure a type for the request Publisher elements. |
void | setPublisherElementTypeExpression(Expression publisherElementTypeExpression) | Configure a SpEL expression to evaluate the request Publisher elements type at runtime against a request message. |
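The relationship between a fixed interaction model, an expression evaluated per request, and the requestResponse default can be pictured with the standalone model below. It does not use the Spring Integration or SpEL APIs: the enum `InteractionModelSketch` only mirrors the names of the four RSocket interaction models, a `Function` over a header map stands in for the expression, and rejecting a `null` result is a choice of this sketch rather than documented gateway behavior.

```java
import java.util.Map;
import java.util.function.Function;

/** Illustrative stand-in; the constants mirror the four RSocket interaction models. */
enum InteractionModelSketch {
    fireAndForget, requestResponse, requestStream, requestChannel
}

/** Illustrative only: picks an interaction model per request, defaulting to requestResponse. */
final class InteractionModelResolverSketch {

    // Stand-in for an expression evaluated against the request message.
    private final Function<Map<String, Object>, Object> expression;

    /** No explicit configuration: every request uses requestResponse. */
    InteractionModelResolverSketch() {
        this(headers -> InteractionModelSketch.requestResponse);
    }

    InteractionModelResolverSketch(Function<Map<String, Object>, Object> expression) {
        this.expression = expression;
    }

    /** Evaluates the expression against the request headers and converts the result. */
    InteractionModelSketch resolve(Map<String, Object> requestHeaders) {
        Object value = this.expression.apply(requestHeaders);
        if (value == null) {
            // Assumption of this sketch: a missing value is treated as an error.
            throw new IllegalStateException("The expression must not evaluate to null");
        }
        if (value instanceof InteractionModelSketch) {
            return (InteractionModelSketch) value;
        }
        // Accept the model name as a string, for example from a message header.
        return InteractionModelSketch.valueOf(value.toString());
    }
}
```

A resolver created with `headers -> headers.get("interaction_model")` would, for instance, return `requestStream` for a request whose hypothetical `interaction_model` header holds the string `"requestStream"`.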
Methods inherited from superclasses and interfaces, one line per declaring type:
doInvokeAdvisedRequestHandler, getBeanClassLoader, getIntegrationPatternType, getRequiresReply, handleMessageInternal, hasAdviceChain, onInit, setAdviceChain, setBeanClassLoader, setRequiresReply
addNotPropagatedHeaders, createOutputMessage, getNotPropagatedHeaders, getOutputChannel, isAsync, messageBuilderForReply, produceOutput, resolveErrorChannel, sendErrorMessage, sendOutput, sendOutputs, setAsync, setNotPropagatedHeaders, setOutputChannel, setOutputChannelName, setSendTimeout, shouldCopyRequestHeaders, shouldSplitOutput, updateNotPropagatedHeaders
handleMessage, onComplete, onError, onNext, onSubscribe
buildSendTimer, destroy, getComponentType, getManagedName, getManagedType, getMetricsCaptor, getOrder, getOverrides, isLoggingEnabled, registerMetricsCaptor, sendTimer, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, shouldTrack
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
getThisAs
getBeanName, getComponentName
public RSocketOutboundGateway(String route, @Nullable Object... routeVariables)
Instantiate based on the provided RSocket endpoint route and optional variables to expand the route template.
Parameters:
route - the RSocket endpoint route to use.
routeVariables - the variables to expand the route template.

public RSocketOutboundGateway(Expression routeExpression)
Instantiate based on the provided SpEL expression to evaluate an RSocket endpoint route at runtime against a request message. If the route is a template and variable expansion is required, it is recommended to do that in this expression evaluation, for example using a bean with the appropriate logic.
Parameters:
routeExpression - the SpEL expression to use.

public void setClientRSocketConnector(ClientRSocketConnector clientRSocketConnector)
Configure a ClientRSocketConnector for client side requests based on the connection provided by ClientRSocketConnector.getRequester(). On the server side, an RSocketRequester must be provided in the RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER header of the request message.
Parameters:
clientRSocketConnector - the ClientRSocketConnector to use.

public void setInteractionModel(RSocketInteractionModel interactionModel)
Configure an RSocketInteractionModel for the RSocket request type.
Parameters:
interactionModel - the RSocketInteractionModel to use.

public void setInteractionModelExpression(Expression interactionModelExpression)
Configure a SpEL expression to evaluate an RSocketInteractionModel for the RSocket request type at runtime against a request message.
Parameters:
interactionModelExpression - the SpEL expression to use.

public void setPublisherElementType(Class<?> publisherElementType)
Configure a type for the request Publisher elements.
Parameters:
publisherElementType - the type of the request Publisher elements.
See Also: RSocketRequester.RequestSpec#data(Object, Class)

public void setPublisherElementTypeExpression(Expression publisherElementTypeExpression)
Configure a SpEL expression to evaluate the request Publisher elements type at runtime against a request message.
Parameters:
publisherElementTypeExpression - the expression to evaluate a type for the request Publisher elements.
See Also: RSocketRequester.RequestSpec#data

public void setExpectedResponseType(Class<?> expectedResponseType)
Specify the expected response type for the RSocket response.
Parameters:
expectedResponseType - the expected type.
See Also: setExpectedResponseTypeExpression(Expression), RSocketRequester.RequestSpec#retrieveMono, RSocketRequester.RequestSpec#retrieveFlux

public void setExpectedResponseTypeExpression(Expression expectedResponseTypeExpression)
Specify an Expression to determine the type for the RSocket response.
Parameters:
expectedResponseTypeExpression - the expected response type expression.
See Also: RSocketRequester.RequestSpec#retrieveMono, RSocketRequester.RequestSpec#retrieveFlux

public void setMetadataExpression(Expression metadataExpression)
Specify a SpEL expression to evaluate metadata for the RSocket request as Map<Object, MimeType> against a request message.
Parameters:
metadataExpression - the expression for metadata.

protected void doInit()
Overrides: doInit in class AbstractReplyProducingMessageHandler

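protected Object handleRequestMessage(Message<?> requestMessage)
Description copied from class: AbstractReplyProducingMessageHandler
Subclasses must implement this method to handle the request Message.
Specified by: handleRequestMessage in class AbstractReplyProducingMessageHandler
Parameters:
requestMessage - the request message.
Returns: the result of handling the message, or null.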