public class FluxAggregatorMessageHandler extends AbstractMessageProducingHandler implements ManageableLifecycle
AbstractMessageProducingHandler
implementation for aggregation logic based
on Reactor's Flux.groupBy(java.util.function.Function<? super T, ? extends K>)
and Flux.window(int)
operators.
The incoming messages are emitted into a FluxSink
provided by the
Flux.create(java.util.function.Consumer<? super reactor.core.publisher.FluxSink<T>>)
initialized in the constructor.
The resulting windows for groups are wrapped into Message
s for downstream
consumption.
If the AbstractMessageProducingHandler.getOutputChannel()
is not a ReactiveStreamsSubscribableChannel
instance, a subscription for the whole aggregating Flux
is performed in the
start()
method.
IntegrationManagement.ManagementOverrides
messagingTemplate
EXPRESSION_PARSER, logger
HIGHEST_PRECEDENCE, LOWEST_PRECEDENCE
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
Constructor and Description |
---|
FluxAggregatorMessageHandler()
Create an instance with a
Flux.create(java.util.function.Consumer<? super reactor.core.publisher.FluxSink<T>>) and apply Flux.groupBy(java.util.function.Function<? super T, ? extends K>) and Flux.window(int)
transformation into it. |
Modifier and Type | Method and Description |
---|---|
String |
getComponentType()
Subclasses may implement this method to provide component type information.
|
IntegrationPatternType |
getIntegrationPatternType()
Return a pattern type this component implements.
|
protected void |
handleMessageInternal(Message<?> message) |
boolean |
isRunning() |
void |
setBoundaryTrigger(java.util.function.Predicate<Message<?>> boundaryTrigger)
Configure a
Predicate for messages to determine a window boundary in the
Flux.windowUntil(java.util.function.Predicate<T>) operator. |
void |
setCombineFunction(java.util.function.Function<reactor.core.publisher.Flux<Message<?>>,reactor.core.publisher.Mono<Message<?>>> combineFunction)
Configure a transformation
Function to apply for a Flux window to emit. |
void |
setCorrelationStrategy(CorrelationStrategy correlationStrategy)
Configure a
CorrelationStrategy to determine a group key from the incoming messages. |
void |
setWindowConfigurer(java.util.function.Function<reactor.core.publisher.Flux<Message<?>>,reactor.core.publisher.Flux<reactor.core.publisher.Flux<Message<?>>>> windowConfigurer)
Configure a
Function to apply a transformation into the grouping Flux
for any arbitrary Flux.window(int) options not covered by the simple options. |
void |
setWindowSize(int windowSize)
Specify a size for windows to close.
|
void |
setWindowSizeFunction(java.util.function.Function<Message<?>,Integer> windowSizeFunction)
Specify a
Function to determine a size for windows to close against the first message in group. |
void |
setWindowTimespan(java.time.Duration windowTimespan)
Configure a
Duration for closing windows periodically. |
protected boolean |
shouldCopyRequestHeaders()
Subclasses may override this.
|
void |
start() |
void |
stop() |
addNotPropagatedHeaders, createOutputMessage, getNotPropagatedHeaders, getOutputChannel, isAsync, messageBuilderForReply, onInit, produceOutput, resolveErrorChannel, sendErrorMessage, sendOutput, sendOutputs, setAsync, setNotPropagatedHeaders, setOutputChannel, setOutputChannelName, setSendTimeout, shouldSplitOutput, updateNotPropagatedHeaders
handleMessage, onComplete, onError, onNext, onSubscribe
buildSendTimer, destroy, getManagedName, getManagedType, getMetricsCaptor, getOrder, getOverrides, isLoggingEnabled, registerMetricsCaptor, sendTimer, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, shouldTrack
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
getThisAs
getBeanName, getComponentName
public FluxAggregatorMessageHandler()
Flux.create(java.util.function.Consumer<? super reactor.core.publisher.FluxSink<T>>)
and apply Flux.groupBy(java.util.function.Function<? super T, ? extends K>)
and Flux.window(int)
transformation into it.public void setCorrelationStrategy(CorrelationStrategy correlationStrategy)
CorrelationStrategy
to determine a group key from the incoming messages.
By default a HeaderAttributeCorrelationStrategy
is used against a
IntegrationMessageHeaderAccessor.CORRELATION_ID
header value.correlationStrategy
- the CorrelationStrategy
to use.public void setCombineFunction(java.util.function.Function<reactor.core.publisher.Flux<Message<?>>,reactor.core.publisher.Mono<Message<?>>> combineFunction)
Function
to apply for a Flux
window to emit.
Requires a Mono
result with a Message
as value as a combination result
of the incoming Flux
for window.
By default a Flux
for window is fully wrapped into a message with headers copied
from the first message in window. Such a Flux
in the payload has to be subscribed
and consumed downstream.combineFunction
- the Function
to use for result windows transformation.public void setBoundaryTrigger(java.util.function.Predicate<Message<?>> boundaryTrigger)
Predicate
for messages to determine a window boundary in the
Flux.windowUntil(java.util.function.Predicate<T>)
operator.
Has a precedence over any other window configuration options.boundaryTrigger
- the Predicate
to use for window boundary.Flux.windowUntil(Predicate)
public void setWindowSize(int windowSize)
setWindowTimespan(Duration)
.windowSize
- the size for window to use.Flux.window(int)
,
Flux.windowTimeout(int, Duration)
public void setWindowSizeFunction(java.util.function.Function<Message<?>,Integer> windowSizeFunction)
Function
to determine a size for windows to close against the first message in group.
Tne result of the function can be combined with the setWindowTimespan(Duration)
.
By default an IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
header is consulted.windowSizeFunction
- the Function
to use to determine a window size
against a first message in the group.Flux.window(int)
,
Flux.windowTimeout(int, Duration)
public void setWindowTimespan(java.time.Duration windowTimespan)
Duration
for closing windows periodically.
Can be combined with the setWindowSize(int)
or setWindowSizeFunction(Function)
.windowTimespan
- the Duration
to use for windows to close periodically.Flux.window(Duration)
,
Flux.windowTimeout(int, Duration)
public void setWindowConfigurer(java.util.function.Function<reactor.core.publisher.Flux<Message<?>>,reactor.core.publisher.Flux<reactor.core.publisher.Flux<Message<?>>>> windowConfigurer)
Function
to apply a transformation into the grouping Flux
for any arbitrary Flux.window(int)
options not covered by the simple options.
Has a precedence over any other window configuration options.windowConfigurer
- the Function
to apply any custom window transformation.public String getComponentType()
IntegrationObjectSupport
getComponentType
in interface NamedComponent
getComponentType
in class MessageHandlerSupport
public IntegrationPatternType getIntegrationPatternType()
IntegrationPattern
getIntegrationPatternType
in interface IntegrationPattern
getIntegrationPatternType
in class MessageHandlerSupport
IntegrationPatternType
this component implements.public void start()
start
in interface Lifecycle
start
in interface ManageableLifecycle
public void stop()
stop
in interface Lifecycle
stop
in interface ManageableLifecycle
public boolean isRunning()
isRunning
in interface Lifecycle
isRunning
in interface ManageableLifecycle
protected void handleMessageInternal(Message<?> message)
handleMessageInternal
in class AbstractMessageHandler
protected boolean shouldCopyRequestHeaders()
AbstractMessageProducingHandler
shouldCopyRequestHeaders
in class AbstractMessageProducingHandler