Class IntegrationReactiveUtils

java.lang.Object
org.springframework.integration.util.IntegrationReactiveUtils

public final class IntegrationReactiveUtils extends Object
Utilities for adapting integration components to/from reactive types.
Since:
5.3
Author:
Artem Bilan
  • Field Details

    • DELAY_WHEN_EMPTY_KEY

      public static final String DELAY_WHEN_EMPTY_KEY
      The subscriber context key whose Duration value is used for the Flux.delayElements(java.time.Duration) applied inside Mono.repeatWhenEmpty(java.util.function.Function).
    • DEFAULT_DELAY_WHEN_EMPTY

      public static final Duration DEFAULT_DELAY_WHEN_EMPTY
      The default delay before repeating an empty source Mono: a Duration of 1 second.
    • isContextPropagationPresent

      public static final boolean isContextPropagationPresent
      Indicates whether the io.micrometer:context-propagation library is on the classpath.
      Since:
      6.2.5
  • Method Details

    • captureReactorContext

      public static reactor.util.context.ContextView captureReactorContext()
      Capture a Reactor ContextView from the current thread local state according to the ContextSnapshotFactory logic. If the io.micrometer:context-propagation library is not on the classpath, Context.empty() is returned.
      Returns:
      the Reactor ContextView from the current thread local state or Context.empty().
      Since:
      6.2.5
    • setThreadLocalsFromReactorContext

      @Nullable public static AutoCloseable setThreadLocalsFromReactorContext(reactor.util.context.ContextView context)
      Populate thread local variables from the provided Reactor ContextView according to the ContextSnapshotFactory logic.
      Parameters:
      context - the Reactor ContextView to populate from.
      Returns:
      the ContextSnapshot.Scope, typed as an AutoCloseable so that the target classpath is not polluted; it can be cast if necessary. Returns null if the io.micrometer:context-propagation library is not on the classpath.
      Since:
      6.2.5
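
Because the return value is @Nullable, callers have to handle both a real scope and null. The sketch below is a plain-JDK model of that calling pattern, not Spring Integration code: `openScope` and `REQUEST_ID` are hypothetical stand-ins for this method and for a thread local restored from a ContextView. It relies on the Java language rule that try-with-resources skips close() when the resource is null.

```java
final class NullableScopeSketch {

    // Hypothetical thread local standing in for state restored from a ContextView.
    static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();

    // Hypothetical stand-in for setThreadLocalsFromReactorContext(...):
    // returns a scope that restores the previous value on close(),
    // or null to model the "library not on the classpath" case.
    static AutoCloseable openScope(String value, boolean libraryPresent) {
        if (!libraryPresent) {
            return null;
        }
        String previous = REQUEST_ID.get();
        REQUEST_ID.set(value);
        return () -> {
            if (previous == null) {
                REQUEST_ID.remove();
            }
            else {
                REQUEST_ID.set(previous);
            }
        };
    }

    // Returns the value that is visible while the scope is open.
    static String valueInsideScope(String value, boolean libraryPresent) {
        // A null resource is allowed here: close() is simply not called.
        try (AutoCloseable scope = openScope(value, libraryPresent)) {
            return REQUEST_ID.get();
        }
        catch (Exception ex) {
            throw new IllegalStateException("failed to close the scope", ex);
        }
    }
}
```

With the real method the same try-with-resources shape should apply, so no explicit null check is needed before closing.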
    • messageSourceToFlux

      public static <T> reactor.core.publisher.Flux<Message<T>> messageSourceToFlux(MessageSource<T> messageSource)
      Wrap a provided MessageSource into a Flux for pulling the messages on demand. When MessageSource.receive() returns null, the source Mono goes to the Mono.repeatWhenEmpty(java.util.function.Function<reactor.core.publisher.Flux<java.lang.Long>, ? extends org.reactivestreams.Publisher<?>>) state and performs a delay based on the DELAY_WHEN_EMPTY_KEY Duration entry in the subscriber context, or falls back to the DEFAULT_DELAY_WHEN_EMPTY of 1 second. If a produced message has an IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK header, it is ack'ed in the Mono.doOnSuccess(java.util.function.Consumer<? super T>) and nack'ed in the Mono.doOnError(java.util.function.Consumer<? super java.lang.Throwable>).
      Type Parameters:
      T - the expected payload type.
      Parameters:
      messageSource - the MessageSource to adapt.
      Returns:
      a Flux which pulls messages from the MessageSource on demand.
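
The pull, delay and acknowledge flow described above can be modeled with the JDK alone. The following is a blocking, simplified sketch for illustration only: the real method is non-blocking and built on Reactor operators, and `pullOne`, `Ack` and `demo` are hypothetical names that are not part of Spring Integration.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class PullOnDemandSketch {

    /** Hypothetical stand-in for an acknowledgment callback carried by a message. */
    interface Ack {
        void acknowledge(boolean success);
    }

    // Polls the source until it yields a non-null item, waiting delayWhenEmpty
    // after every empty poll, then hands the item downstream and acks or nacks.
    static <T> T pullOne(Supplier<T> source, Duration delayWhenEmpty,
            Consumer<T> downstream, Ack ack) {
        T item;
        while ((item = source.get()) == null) {
            sleep(delayWhenEmpty);
        }
        try {
            downstream.accept(item);
        }
        catch (RuntimeException ex) {
            ack.acknowledge(false); // nack on a downstream error
            throw ex;
        }
        ack.acknowledge(true); // ack on success
        return item;
    }

    private static void sleep(Duration delay) {
        try {
            Thread.sleep(delay.toMillis());
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for data", ex);
        }
    }

    // Two empty polls, then one payload; returns a log of what happened.
    static List<String> demo(boolean failDownstream) {
        List<String> log = new ArrayList<>();
        Iterator<String> polls = Arrays.asList(null, null, "payload").iterator();
        try {
            pullOne(polls::next, Duration.ofMillis(10), item -> {
                if (failDownstream) {
                    throw new IllegalStateException("downstream failure");
                }
                log.add("handled " + item);
            }, ok -> log.add(ok ? "ack" : "nack"));
        }
        catch (IllegalStateException ex) {
            log.add("failed");
        }
        return log;
    }
}
```

The sketch uses a short 10 ms delay so that the demo finishes quickly; the documented fallback for the real method is 1 second.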
    • messageChannelToFlux

      public static <T> reactor.core.publisher.Flux<Message<T>> messageChannelToFlux(MessageChannel messageChannel)
      Adapt a provided MessageChannel into a Flux source:
      - a FluxMessageChannel is returned as is because it is already a Publisher;
      - a SubscribableChannel is subscribed with a MessageHandler that emits each message via Sinks.Many.tryEmitNext(Object); the Flux view of that sink is returned from this method;
      - a PollableChannel is wrapped into a MessageSource lambda and reuses messageSourceToFlux(MessageSource).
      Type Parameters:
      T - the expected payload type.
      Parameters:
      messageChannel - the MessageChannel to adapt.
      Returns:
      a Flux which uses a provided MessageChannel as a source for events to publish.