public class RSocketInboundGateway extends MessagingGatewaySupport implements IntegrationRSocketEndpoint
MessagingGatewaySupport
implementation for the IntegrationRSocketEndpoint
.
Represents an inbound endpoint for RSocket requests.
May be configured with the AbstractRSocketConnector
for mapping registration.
Or existing AbstractRSocketConnector
bean(s) will perform detection automatically.
An inbound DataBuffer
(either single or as a Publisher
element) is
converted to the target expected type which can be configured by the
setRequestElementClass(java.lang.Class<?>)
or setRequestElementType(ResolvableType)
.
If it is not configured, then target type is determined by the contentType
header:
If it is a text
, then target type is String
, otherwise - byte[]
.
An inbound Publisher
is used as is in the message to send payload.
It is a target application responsibility to process that payload any possible way.
A reply payload is encoded to the Flux
according a type of the payload or a
Publisher
element type.
IntegrationManagement.ManagementOverrides
messagingTemplate
lifecycleCondition, lifecycleLock
EXPRESSION_PARSER, logger
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
DEFAULT_PHASE
Constructor and Description |
---|
RSocketInboundGateway(String... pathArg)
Instantiate based on the provided path patterns to map this endpoint for incoming RSocket requests.
|
Modifier and Type | Method and Description |
---|---|
protected void |
doStart()
Subclasses must implement this method with the start behavior.
|
RSocketInteractionModel[] |
getInteractionModels()
Obtain
RSocketInteractionModel s
this ReactiveMessageHandler is going to be mapped onto. |
String[] |
getPath()
Get an array of the path patterns this endpoint is mapped onto.
|
reactor.core.publisher.Mono<Void> |
handleMessage(Message<?> requestMessage) |
protected void |
onInit()
Subclasses may implement this for initialization logic.
|
void |
setDecodeFluxAsUnit(boolean decodeFluxAsUnit)
Configure an option to decode an incoming
Flux as a single unit or each its event separately. |
void |
setInteractionModels(RSocketInteractionModel... interactionModelsArg)
Configure a set of
RSocketInteractionModel this endpoint is mapped onto. |
void |
setRequestElementClass(Class<?> requestElementClass)
Specify a type of payload to be generated when the inbound RSocket request
content is read by the encoders.
|
void |
setRequestElementType(ResolvableType requestElementType)
Specify the type of payload to be generated when the inbound RSocket request
content is read by the converters/encoders.
|
void |
setRSocketConnector(AbstractRSocketConnector rsocketConnector)
Provide an
AbstractRSocketConnector reference for an explicit endpoint mapping. |
void |
setRSocketStrategies(RSocketStrategies rsocketStrategies)
Configure an
RSocketStrategies instead of a default one. |
buildErrorMessage, buildSendTimer, destroy, doStop, getComponentType, getErrorChannel, getErrorMessageAttributes, getIntegrationPatternType, getManagedName, getManagedType, getOverrides, getReplyChannel, getRequestChannel, isLoggingEnabled, receive, receive, receiveMessage, receiveMessage, registerMetricsCaptor, registerReplyMessageCorrelatorIfNecessary, send, sendAndReceive, sendAndReceiveMessage, sendAndReceiveMessageReactive, sendTimer, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setErrorOnTimeout, setLoggingEnabled, setManagedName, setManagedType, setReplyChannel, setReplyChannelName, setReplyMapper, setReplyTimeout, setRequestChannel, setRequestChannelName, setRequestMapper, setRequestTimeout, setShouldTrack
doStop, getPhase, getRole, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stop
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
getThisAs
getBeanName, getComponentName
public RSocketInboundGateway(String... pathArg)
pathArg
- the mapping patterns to use.public void setRSocketStrategies(RSocketStrategies rsocketStrategies)
RSocketStrategies
instead of a default one.
Note: if AbstractRSocketConnector
is provided, then its
RSocketStrategies
have a precedence.rsocketStrategies
- the RSocketStrategies
to use.RSocketStrategies.builder()
public void setRSocketConnector(AbstractRSocketConnector rsocketConnector)
AbstractRSocketConnector
reference for an explicit endpoint mapping.rsocketConnector
- the AbstractRSocketConnector
to use.public void setInteractionModels(RSocketInteractionModel... interactionModelsArg)
RSocketInteractionModel
this endpoint is mapped onto.interactionModelsArg
- the RSocketInteractionModel
s for mapping.public RSocketInteractionModel[] getInteractionModels()
IntegrationRSocketEndpoint
RSocketInteractionModel
s
this ReactiveMessageHandler
is going to be mapped onto.
Defaults to all the RSocketInteractionModel
s.getInteractionModels
in interface IntegrationRSocketEndpoint
public String[] getPath()
getPath
in interface IntegrationRSocketEndpoint
public void setRequestElementClass(Class<?> requestElementClass)
byte[].class
.requestElementClass
- The payload type.public void setRequestElementType(ResolvableType requestElementType)
byte[].class
.requestElementType
- The payload type.public void setDecodeFluxAsUnit(boolean decodeFluxAsUnit)
Flux
as a single unit or each its event separately.
Defaults to false
for consistency with Spring Messaging @MessageMapping
.
The target Flux
decoding logic depends on the Decoder
selected.
For example a StringDecoder
requires a new line separator to
be present in the stream to indicate a byte buffer end.decodeFluxAsUnit
- decode incoming Flux
as a single unit or each event separately.Decoder.decode(Publisher, ResolvableType, MimeType, java.util.Map)
protected void onInit()
IntegrationObjectSupport
onInit
in class MessagingGatewaySupport
protected void doStart()
AbstractEndpoint
AbstractEndpoint.lifecycleLock
.doStart
in class MessagingGatewaySupport
public reactor.core.publisher.Mono<Void> handleMessage(Message<?> requestMessage)
handleMessage
in interface ReactiveMessageHandler