public final class IntegrationReactiveUtils extends Object
| Modifier and Type | Field and Description |
|---|---|
| static java.time.Duration | DEFAULT_DELAY_WHEN_EMPTY: A default delay before repeating an empty source Mono as 1 second Duration. |
| static String | DELAY_WHEN_EMPTY_KEY: The subscriber context entry for Flux.delayElements(java.time.Duration) from the Mono.repeatWhenEmpty(java.util.function.Function). |
| Modifier and Type | Method and Description |
|---|---|
| static <T> reactor.core.publisher.Flux<Message<T>> | messageChannelToFlux(MessageChannel messageChannel): Adapt a provided MessageChannel into a Flux source: a FluxMessageChannel is returned as is because it is already a Publisher; a SubscribableChannel is subscribed with a MessageHandler that emits via Sinks.Many#tryEmitNext(Object) into the sink whose Flux is returned from this method; a PollableChannel is wrapped into a MessageSource lambda and reuses messageSourceToFlux(MessageSource). |
| static <T> reactor.core.publisher.Flux<Message<T>> | messageSourceToFlux(MessageSource<T> messageSource): Wrap a provided MessageSource into a Flux for pulling messages on demand. |
public static final String DELAY_WHEN_EMPTY_KEY
The subscriber context entry for Flux.delayElements(java.time.Duration)
 from the Mono.repeatWhenEmpty(java.util.function.Function).

public static final java.time.Duration DEFAULT_DELAY_WHEN_EMPTY
A default delay before repeating an empty source Mono as 1 second Duration.

public static <T> reactor.core.publisher.Flux<Message<T>> messageSourceToFlux(MessageSource<T> messageSource)
Wrap a provided MessageSource into a Flux for pulling messages on demand.
 When MessageSource.receive() returns null, the source Mono
 goes to the Mono.repeatWhenEmpty(java.util.function.Function) state and performs a delay
 based on the DELAY_WHEN_EMPTY_KEY Duration entry in the subscriber context,
 or falls back to the 1 second DEFAULT_DELAY_WHEN_EMPTY duration.
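
The empty-poll rule above can be modelled without Reactor. The following is a minimal plain-Java sketch, not the library's implementation: the class name, the `resolveDelay` and `pull` helpers, the use of a `Map` in place of the subscriber context, and the string value chosen for the key are all assumptions made for illustration.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

class EmptyPollDelaySketch {

    // Hypothetical stand-ins for the documented constants. The key's real
    // string value is not given in the text, so this one is an assumption.
    static final String DELAY_WHEN_EMPTY_KEY = "DELAY_WHEN_EMPTY_KEY";
    static final Duration DEFAULT_DELAY_WHEN_EMPTY = Duration.ofSeconds(1);

    // Use the Duration stored under the key if present, else the 1 second default.
    static Duration resolveDelay(Map<String, ?> context) {
        Object value = context.get(DELAY_WHEN_EMPTY_KEY);
        return value instanceof Duration ? (Duration) value : DEFAULT_DELAY_WHEN_EMPTY;
    }

    // Pull up to `demand` messages. A null from `receive` means "empty":
    // wait for the resolved delay, then poll again.
    static <T> List<T> pull(Supplier<T> receive, int demand, Map<String, ?> context) {
        Duration delay = resolveDelay(context);
        List<T> received = new ArrayList<>();
        while (received.size() < demand) {
            T message = receive.get();
            if (message != null) {
                received.add(message);
                continue;
            }
            try {
                Thread.sleep(delay.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop polling early.
                Thread.currentThread().interrupt();
                break;
            }
        }
        return received;
    }
}
```

In this model the delay is looked up once per `pull` call. With the real library the entry would live in the Reactor subscriber context rather than a `Map`, so treat the lookup here only as an illustration of the "context entry or default" rule.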
 If a produced message has an
 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK header,
 it is ack'ed in the Mono.doOnSuccess(java.util.function.Consumer) and nack'ed in the Mono.doOnError(java.util.function.Consumer).
Type Parameters:
T - the expected payload type.
Parameters:
messageSource - the MessageSource to adapt.
Returns:
a Flux which pulls messages from the MessageSource on demand.

public static <T> reactor.core.publisher.Flux<Message<T>> messageChannelToFlux(MessageChannel messageChannel)
Adapt a provided MessageChannel into a Flux source:
 - a FluxMessageChannel is returned as is because it is already a Publisher;
 - a SubscribableChannel is subscribed with a MessageHandler that emits via
 Sinks.Many#tryEmitNext(Object) into the sink whose Flux is returned from this method;
 - a PollableChannel is wrapped into a MessageSource lambda and reuses
 messageSourceToFlux(MessageSource).
Type Parameters:
T - the expected payload type.
Parameters:
messageChannel - the MessageChannel to adapt.
Returns:
a Flux which uses a provided MessageChannel as a source for events to publish.
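
The three-way adaptation performed by messageChannelToFlux can be sketched as a type dispatch. This is a simplified plain-Java model under stated assumptions, not the library's code: the nested interfaces are hypothetical stand-ins for the real channel types, messages are plain `String`s, a `Supplier` stands in for the returned Flux, and a queue stands in for the `Sinks.Many`.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import java.util.function.Supplier;

class ChannelAdapterSketch {

    // Hypothetical, simplified stand-ins for the channel kinds named in the list.
    interface Channel { }
    interface PublisherChannel extends Channel, Supplier<String> { }   // already a source
    interface SubscribableChannel extends Channel { void subscribe(Consumer<String> handler); }
    interface PollableChannel extends Channel { String receive(); }    // null when empty

    // Return a pull-style source for any supported channel kind.
    static Supplier<String> toSource(Channel channel) {
        if (channel instanceof PublisherChannel) {
            // Case 1: already a source, so it is returned as is.
            return (PublisherChannel) channel;
        }
        if (channel instanceof SubscribableChannel) {
            // Case 2: subscribe a handler that pushes each message into a sink,
            // then hand back the consuming side of that sink.
            Queue<String> sink = new ConcurrentLinkedQueue<>();
            ((SubscribableChannel) channel).subscribe(sink::offer);
            return sink::poll;
        }
        if (channel instanceof PollableChannel) {
            // Case 3: wrap receive() as a source lambda.
            return ((PollableChannel) channel)::receive;
        }
        throw new IllegalArgumentException("Unsupported channel type: " + channel.getClass());
    }
}
```

The order of the checks mirrors the order of the list. In this model a subscribable channel yields `null` from the returned source until its handler has been invoked at least once.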