public class FluxAggregatorMessageHandler extends AbstractMessageProducingHandler implements Lifecycle

An AbstractMessageProducingHandler implementation for aggregation logic based
on Reactor's Flux.groupBy(java.util.function.Function<? super T, ? extends K>) and Flux.window(int) operators.
The incoming messages are emitted into a FluxSink provided by the
Flux.create(java.util.function.Consumer<? super reactor.core.publisher.FluxSink<T>>) initialized in the constructor.
The resulting windows for groups are wrapped into Messages for downstream
consumption.
If the AbstractMessageProducingHandler.getOutputChannel() is not a ReactiveStreamsSubscribableChannel
instance, a subscription for the whole aggregating Flux is performed in the
start() method.
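
The grouping and size-based windowing described above can be illustrated with a minimal plain-Java sketch. It does not use Reactor or Spring Integration: the class `GroupWindowSketch`, the method `windowByKey`, and the `key:payload` string encoding are hypothetical names chosen only for illustration, and every item is assumed to contain a `:`. The sketch processes a finite list eagerly and collects a window when it closes, whereas the reactive operators emit each window Flux as soon as it opens.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class GroupWindowSketch {

    /**
     * Groups items by the text before ':' and closes a group's window as soon
     * as it holds windowSize items. Windows still open at the end are flushed.
     */
    static List<List<String>> windowByKey(List<String> items, int windowSize) {
        Map<String, List<String>> open = new LinkedHashMap<>();
        List<List<String>> closed = new ArrayList<>();
        for (String item : items) {
            // Assumption: the correlation key is the prefix before ':'.
            String key = item.substring(0, item.indexOf(':'));
            List<String> window = open.computeIfAbsent(key, k -> new ArrayList<>());
            window.add(item);
            if (window.size() == windowSize) {
                closed.add(window);
                open.remove(key); // the next item for this key starts a new window
            }
        }
        closed.addAll(open.values()); // partial windows complete with the source
        return closed;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("a:1", "b:1", "a:2", "b:2", "a:3");
        // prints [[a:1, a:2], [b:1, b:2], [a:3]]
        System.out.println(windowByKey(items, 2));
    }
}
```

In the handler itself the key would come from the configured CorrelationStrategy and the size from setWindowSize(int) or setWindowSizeFunction(Function); the sketch only mirrors the resulting grouping.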

Inherited nested classes: IntegrationManagement.ManagementOverrides

Inherited fields: messagingTemplate, EXPRESSION_PARSER, logger, METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME, HIGHEST_PRECEDENCE, LOWEST_PRECEDENCE

| Constructor and Description |
|---|
| FluxAggregatorMessageHandler(): Create an instance with a Flux.create(java.util.function.Consumer<? super reactor.core.publisher.FluxSink<T>>) and apply Flux.groupBy(java.util.function.Function<? super T, ? extends K>) and Flux.window(int) transformation into it. |

| Modifier and Type | Method and Description |
|---|---|
| protected void | handleMessageInternal(Message<?> message) |
| boolean | isRunning() |
| void | setBoundaryTrigger(java.util.function.Predicate<Message<?>> boundaryTrigger): Configure a Predicate for messages to determine a window boundary in the Flux.windowUntil(java.util.function.Predicate<T>) operator. |
| void | setCombineFunction(java.util.function.Function<reactor.core.publisher.Flux<Message<?>>,reactor.core.publisher.Mono<Message<?>>> combineFunction): Configure a transformation Function to apply for a Flux window to emit. |
| void | setCorrelationStrategy(CorrelationStrategy correlationStrategy): Configure a CorrelationStrategy to determine a group key from the incoming messages. |
| void | setWindowConfigurer(java.util.function.Function<reactor.core.publisher.Flux<Message<?>>,reactor.core.publisher.Flux<reactor.core.publisher.Flux<Message<?>>>> windowConfigurer): Configure a Function to apply a transformation into the grouping Flux for any arbitrary Flux.window(int) options not covered by the simple options. |
| void | setWindowSize(int windowSize): Specify a size for windows to close. |
| void | setWindowSizeFunction(java.util.function.Function<Message<?>,Integer> windowSizeFunction): Specify a Function to determine a size for windows to close against the first message in group. |
| void | setWindowTimespan(java.time.Duration windowTimespan): Configure a Duration for closing windows periodically. |
| protected boolean | shouldCopyRequestHeaders(): Subclasses may override this. |
| void | start() |
| void | stop() |
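
Among the options summarized in the table above, setWindowSizeFunction(Function) derives the window size from the first message in a group. The plain-Java sketch below illustrates that idea under stated assumptions: it does not use Reactor or Spring Integration, the class `FirstItemWindowSizeSketch` and its `window` method are hypothetical, and header maps stand in for messages. It assumes the function returns a positive, non-null size and that the size is computed once per group.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class FirstItemWindowSizeSketch {

    /** Windows a single group, sizing every window from the group's first item. */
    static <T> List<List<T>> window(List<T> group, Function<T, Integer> windowSizeFunction) {
        List<List<T>> windows = new ArrayList<>();
        if (group.isEmpty()) {
            return windows;
        }
        // The size is looked up once, against the first item only.
        int size = windowSizeFunction.apply(group.get(0));
        if (size <= 0) {
            throw new IllegalArgumentException("window size must be positive: " + size);
        }
        for (int from = 0; from < group.size(); from += size) {
            int to = Math.min(from + size, group.size());
            windows.add(new ArrayList<>(group.subList(from, to)));
        }
        return windows;
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for messages: header maps carrying "sequenceSize".
        List<Map<String, Integer>> group = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            Map<String, Integer> headers = new HashMap<>();
            headers.put("sequenceSize", 2);
            headers.put("sequenceNumber", i + 1);
            group.add(headers);
        }
        // Five items with a size of 2 give three windows; prints 3
        System.out.println(window(group, first -> first.get("sequenceSize")).size());
    }
}
```
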

Inherited methods, grouped by declaring type:

addNotPropagatedHeaders, createOutputMessage, getNotPropagatedHeaders, getOutputChannel, isAsync, messageBuilderForReply, onInit, produceOutput, resolveErrorChannel, sendErrorMessage, sendOutput, sendOutputs, setAsync, setNotPropagatedHeaders, setOutputChannel, setOutputChannelName, setSendTimeout, shouldSplitOutput, updateNotPropagatedHeaders

configureMetrics, destroy, getActiveCount, getActiveCountLong, getComponentType, getDuration, getErrorCount, getErrorCountLong, getHandleCount, getHandleCountLong, getManagedName, getManagedType, getMaxDuration, getMeanDuration, getMetricsCaptor, getMinDuration, getOrder, getOverrides, getStandardDeviationDuration, handleMessage, isCountsEnabled, isLoggingEnabled, isStatsEnabled, onComplete, onError, onNext, onSubscribe, registerMetricsCaptor, reset, setCountsEnabled, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, setStatsEnabled

afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString

clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait

errorCount, handleCount

getBeanName, getComponentName

public FluxAggregatorMessageHandler()
Create an instance with a Flux.create(java.util.function.Consumer<? super reactor.core.publisher.FluxSink<T>>) and apply Flux.groupBy(java.util.function.Function<? super T, ? extends K>) and Flux.window(int)
transformation into it.

public void setCorrelationStrategy(CorrelationStrategy correlationStrategy)
Configure a CorrelationStrategy to determine a group key from the incoming messages.
By default, a HeaderAttributeCorrelationStrategy is used against the
IntegrationMessageHeaderAccessor.CORRELATION_ID header value.

Parameters: correlationStrategy - the CorrelationStrategy to use.

public void setCombineFunction(java.util.function.Function<reactor.core.publisher.Flux<Message<?>>,reactor.core.publisher.Mono<Message<?>>> combineFunction)
Configure a transformation Function to apply to a Flux window before it is emitted.
The function must return a Mono whose value is a Message combining
the incoming Flux for the window.
By default, the Flux for a window is wrapped whole into a message with headers copied
from the first message in the window. Such a Flux in the payload has to be subscribed to
and consumed downstream.

Parameters: combineFunction - the Function to use for transforming the result windows.

public void setBoundaryTrigger(java.util.function.Predicate<Message<?>> boundaryTrigger)
Configure a Predicate for messages to determine a window boundary in the
Flux.windowUntil(java.util.function.Predicate<T>) operator.
Takes precedence over any other window configuration options.

Parameters: boundaryTrigger - the Predicate to use for the window boundary.

See Also: Flux.windowUntil(Predicate)

public void setWindowSize(int windowSize)
Specify a size for windows to close.
Can be combined with setWindowTimespan(Duration).

Parameters: windowSize - the size for the window to use.

See Also: Flux.window(int), Flux.windowTimeout(int, Duration)

public void setWindowSizeFunction(java.util.function.Function<Message<?>,Integer> windowSizeFunction)
Specify a Function to determine a size for windows to close against the first message in the group.
The result of the function can be combined with setWindowTimespan(Duration).
By default, the IntegrationMessageHeaderAccessor.SEQUENCE_SIZE header is consulted.

Parameters: windowSizeFunction - the Function to use to determine a window size
against the first message in the group.

See Also: Flux.window(int), Flux.windowTimeout(int, Duration)

public void setWindowTimespan(java.time.Duration windowTimespan)
Configure a Duration for closing windows periodically.
Can be combined with setWindowSize(int) or setWindowSizeFunction(Function).

Parameters: windowTimespan - the Duration to use for windows to close periodically.

See Also: Flux.window(Duration), Flux.windowTimeout(int, Duration)

public void setWindowConfigurer(java.util.function.Function<reactor.core.publisher.Flux<Message<?>>,reactor.core.publisher.Flux<reactor.core.publisher.Flux<Message<?>>>> windowConfigurer)
Configure a Function to apply a transformation to the grouping Flux
for any arbitrary Flux.window(int) options not covered by the simple options.
Takes precedence over any other window configuration options.

Parameters: windowConfigurer - the Function to apply any custom window transformation.

protected void handleMessageInternal(Message<?> message)
Specified by: handleMessageInternal in class AbstractMessageHandler

protected boolean shouldCopyRequestHeaders()
Description copied from class: AbstractMessageProducingHandler
Subclasses may override this.

Overrides: shouldCopyRequestHeaders in class AbstractMessageProducingHandler
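
The boundary-based windowing configured through setBoundaryTrigger(Predicate) follows the Flux.windowUntil(Predicate) semantics, where the element that matches the predicate is delivered to the window it closes. The plain-Java sketch below emulates that behavior on a finite list. It does not use Reactor or Spring Integration, and the class `BoundaryWindowSketch` and its `windowUntil` method are hypothetical names for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class BoundaryWindowSketch {

    /**
     * Splits items into windows. An item matching boundaryTrigger is added
     * to the current window and then closes it.
     */
    static <T> List<List<T>> windowUntil(List<T> items, Predicate<T> boundaryTrigger) {
        List<List<T>> windows = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : items) {
            current.add(item); // the triggering item belongs to the closing window
            if (boundaryTrigger.test(item)) {
                windows.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            windows.add(current); // trailing items form a final, partial window
        }
        return windows;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("a", "b", "END", "c", "END", "d");
        // prints [[a, b, END], [c, END], [d]]
        System.out.println(windowUntil(items, "END"::equals));
    }
}
```

With the real handler the predicate receives Message instances, so a boundary would typically be detected from a header or payload value rather than from a marker string as assumed here.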