public final class IntegrationReactiveUtils extends Object
Modifier and Type | Field and Description |
---|---|
static java.time.Duration |
DEFAULT_DELAY_WHEN_EMPTY
A default delay before repeating an empty source
Mono as 1 second Duration . |
static String |
DELAY_WHEN_EMPTY_KEY
The subscriber context entry for
Flux.delayElements(java.time.Duration)
from the Mono.repeatWhenEmpty(java.util.function.Function) . |
Modifier and Type | Method and Description |
---|---|
static <T> reactor.core.publisher.Flux<Message<T>> |
messageChannelToFlux(MessageChannel messageChannel)
Adapt a provided
MessageChannel into a Flux source:
- a FluxMessageChannel
is returned as is because it is already a Publisher ;
- a SubscribableChannel is subscribed with a MessageHandler
for the Sinks.Many#tryEmitNext(Object) which is returned from this method;
- a PollableChannel is wrapped into a MessageSource lambda and reuses
messageSourceToFlux(MessageSource) . |
static <T> reactor.core.publisher.Flux<Message<T>> |
messageSourceToFlux(MessageSource<T> messageSource)
Wrap a provided
MessageSource into a Flux for pulling the on demand. |
public static final String DELAY_WHEN_EMPTY_KEY
Flux.delayElements(java.time.Duration)
from the Mono.repeatWhenEmpty(java.util.function.Function)
.public static final java.time.Duration DEFAULT_DELAY_WHEN_EMPTY
Mono
as 1 second Duration
.public static <T> reactor.core.publisher.Flux<Message<T>> messageSourceToFlux(MessageSource<T> messageSource)
MessageSource
into a Flux
for pulling the on demand.
When MessageSource.receive()
returns null
, the source Mono
goes to the Mono.repeatWhenEmpty(java.util.function.Function<reactor.core.publisher.Flux<java.lang.Long>, ? extends org.reactivestreams.Publisher<?>>)
state and performs a delay
based on the DELAY_WHEN_EMPTY_KEY
Duration
entry in the subscriber context
or falls back to 1 second duration.
If a produced message has an
IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
header
it is ack'ed in the Mono.doOnSuccess(java.util.function.Consumer<? super T>)
and nack'ed in the Mono.doOnError(java.util.function.Consumer<? super java.lang.Throwable>)
.T
- the expected payload type.messageSource
- the MessageSource
to adapt.Flux
which pulls messages from the MessageSource
on demand.public static <T> reactor.core.publisher.Flux<Message<T>> messageChannelToFlux(MessageChannel messageChannel)
MessageChannel
into a Flux
source:
- a FluxMessageChannel
is returned as is because it is already a Publisher
;
- a SubscribableChannel
is subscribed with a MessageHandler
for the Sinks.Many#tryEmitNext(Object)
which is returned from this method;
- a PollableChannel
is wrapped into a MessageSource
lambda and reuses
messageSourceToFlux(MessageSource)
.T
- the expected payload type.messageChannel
- the MessageChannel
to adapt.Flux
which uses a provided MessageChannel
as a source for events to publish.