Class IntegrationReactiveUtils
java.lang.Object
org.springframework.integration.util.IntegrationReactiveUtils
Utilities for adapting integration components to/from reactive types.
- Since:
- 5.3
- Author:
- Artem Bilan
-
Field Summary
Modifier and TypeFieldDescriptionstatic final Duration
A default delay before repeating an empty sourceMono
as 1 secondDuration
.static final String
The subscriber context entry forFlux.delayElements(java.time.Duration)
from theMono.repeatWhenEmpty(java.util.function.Function)
.static final boolean
The indicator thatio.micrometer:context-propagation
library is on classpath. -
Method Summary
Modifier and TypeMethodDescriptionstatic reactor.util.context.ContextView
Capture a ReactorContextView
from the current thread local state according to theContextSnapshotFactory
logic.static <T> reactor.core.publisher.Flux<Message<T>>
messageChannelToFlux
(MessageChannel messageChannel) Adapt a providedMessageChannel
into aFlux
source: - aFluxMessageChannel
is returned as is because it is already aPublisher
; - aSubscribableChannel
is subscribed with aMessageHandler
for theSinks.Many.tryEmitNext(Object)
which is returned from this method; - aPollableChannel
is wrapped into aMessageSource
lambda and reusesmessageSourceToFlux(MessageSource)
.static <T> reactor.core.publisher.Flux<Message<T>>
messageSourceToFlux
(MessageSource<T> messageSource) Wrap a providedMessageSource
into aFlux
for pulling the on demand.static AutoCloseable
setThreadLocalsFromReactorContext
(reactor.util.context.ContextView context) Populate thread local variables from the provided ReactorContextView
according to theContextSnapshotFactory
logic.
-
Field Details
-
DELAY_WHEN_EMPTY_KEY
The subscriber context entry forFlux.delayElements(java.time.Duration)
from theMono.repeatWhenEmpty(java.util.function.Function)
.- See Also:
-
DEFAULT_DELAY_WHEN_EMPTY
A default delay before repeating an empty sourceMono
as 1 secondDuration
. -
isContextPropagationPresent
public static final boolean isContextPropagationPresentThe indicator thatio.micrometer:context-propagation
library is on classpath.- Since:
- 6.2.5
-
-
Method Details
-
captureReactorContext
public static reactor.util.context.ContextView captureReactorContext()Capture a ReactorContextView
from the current thread local state according to theContextSnapshotFactory
logic. If noio.micrometer:context-propagation
library is on classpath, theContext.empty()
is returned.- Returns:
- the Reactor
ContextView
from the current thread local state orContext.empty()
. - Since:
- 6.2.5
-
setThreadLocalsFromReactorContext
@Nullable public static AutoCloseable setThreadLocalsFromReactorContext(reactor.util.context.ContextView context) Populate thread local variables from the provided ReactorContextView
according to theContextSnapshotFactory
logic.- Parameters:
context
- the ReactorContextView
to populate from.- Returns:
- the
ContextSnapshot.Scope
as aAutoCloseable
to not pollute the target classpath. Can be cast if necessary. Or null if there is noio.micrometer:context-propagation
library is on classpath. - Since:
- 6.2.5
-
messageSourceToFlux
public static <T> reactor.core.publisher.Flux<Message<T>> messageSourceToFlux(MessageSource<T> messageSource) Wrap a providedMessageSource
into aFlux
for pulling the on demand. WhenMessageSource.receive()
returnsnull
, the sourceMono
goes to theMono.repeatWhenEmpty(java.util.function.Function<reactor.core.publisher.Flux<java.lang.Long>, ? extends org.reactivestreams.Publisher<?>>)
state and performs adelay
based on theDELAY_WHEN_EMPTY_KEY
Duration
entry in the subscriber context or falls back to 1 second duration. If a produced message has anIntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
header it is ack'ed in theMono.doOnSuccess(java.util.function.Consumer<? super T>)
and nack'ed in theMono.doOnError(java.util.function.Consumer<? super java.lang.Throwable>)
.- Type Parameters:
T
- the expected payload type.- Parameters:
messageSource
- theMessageSource
to adapt.- Returns:
- a
Flux
which pulls messages from theMessageSource
on demand.
-
messageChannelToFlux
public static <T> reactor.core.publisher.Flux<Message<T>> messageChannelToFlux(MessageChannel messageChannel) Adapt a providedMessageChannel
into aFlux
source: - aFluxMessageChannel
is returned as is because it is already aPublisher
; - aSubscribableChannel
is subscribed with aMessageHandler
for theSinks.Many.tryEmitNext(Object)
which is returned from this method; - aPollableChannel
is wrapped into aMessageSource
lambda and reusesmessageSourceToFlux(MessageSource)
.- Type Parameters:
T
- the expected payload type.- Parameters:
messageChannel
- theMessageChannel
to adapt.- Returns:
- a
Flux
which uses a providedMessageChannel
as a source for events to publish.
-