Class FluxAggregatorMessageHandler
java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.handler.MessageHandlerSupport
org.springframework.integration.handler.AbstractMessageHandler
org.springframework.integration.handler.AbstractMessageProducingHandler
org.springframework.integration.aggregator.FluxAggregatorMessageHandler
- All Implemented Interfaces:
org.reactivestreams.Subscriber<Message<?>>, Aware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, ApplicationContextAware, Lifecycle, Ordered, ExpressionCapable, Orderable, MessageProducer, HeaderPropagationAware, IntegrationPattern, NamedComponent, IntegrationManagement, ManageableLifecycle, TrackableComponent, MessageHandler, reactor.core.CoreSubscriber<Message<?>>
public class FluxAggregatorMessageHandler extends AbstractMessageProducingHandler implements ManageableLifecycle
The AbstractMessageProducingHandler implementation for aggregation logic based on Reactor's Flux.groupBy(java.util.function.Function<? super T, ? extends K>) and Flux.window(int) operators.
The incoming messages are emitted into a FluxSink provided by the Flux.create(java.util.function.Consumer<? super reactor.core.publisher.FluxSink<T>>) initialized in the constructor.
The resulting windows for groups are wrapped into Messages for downstream consumption.
If the AbstractMessageProducingHandler.getOutputChannel() is not a ReactiveStreamsSubscribableChannel instance, a subscription for the whole aggregating Flux is performed in the start() method.
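As an illustration of the description above, a minimal configuration sketch might look like the following. It assumes spring-integration-core and reactor-core are on the classpath. The class name FluxAggregatorSketch, the method createHandler and the chosen values (10 messages, 1 second) are hypothetical and only serve the example.

```java
import java.time.Duration;

import org.springframework.integration.aggregator.FluxAggregatorMessageHandler;
import org.springframework.integration.channel.QueueChannel;

public class FluxAggregatorSketch {

    // Hypothetical helper: builds a handler that sends its windows to the given channel.
    public static FluxAggregatorMessageHandler createHandler(QueueChannel resultChannel) {
        FluxAggregatorMessageHandler handler = new FluxAggregatorMessageHandler();
        // Each window is wrapped into a Message and sent to this channel.
        handler.setOutputChannel(resultChannel);
        // Assumed values: close a window after 10 messages or after 1 second.
        handler.setWindowSize(10);
        handler.setWindowTimespan(Duration.ofSeconds(1));
        return handler;
    }
}
```

Because a QueueChannel is not a ReactiveStreamsSubscribableChannel, a handler configured this way would subscribe to its aggregating Flux in start(), as described above.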
- Since:
- 5.2
- Author:
- Artem Bilan
-
Nested Class Summary
Nested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagement
IntegrationManagement.ManagementOverrides
-
Field Summary
Fields inherited from class org.springframework.integration.handler.AbstractMessageProducingHandler
messagingTemplate
Fields inherited from class org.springframework.integration.context.IntegrationObjectSupport
EXPRESSION_PARSER, logger
Fields inherited from interface org.springframework.integration.support.management.IntegrationManagement
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
Fields inherited from interface org.springframework.core.Ordered
HIGHEST_PRECEDENCE, LOWEST_PRECEDENCE
-
Constructor Summary
Constructor Description
FluxAggregatorMessageHandler()
Create an instance with a Flux.create(java.util.function.Consumer<? super reactor.core.publisher.FluxSink<T>>) and apply Flux.groupBy(java.util.function.Function<? super T, ? extends K>) and Flux.window(int) transformations to it.
-
Method Summary
Modifier and Type Method Description
String getComponentType()
Subclasses may implement this method to provide component type information.
IntegrationPatternType getIntegrationPatternType()
Return a pattern type this component implements.
protected void handleMessageInternal(Message<?> message)
boolean isRunning()
void setBoundaryTrigger(Predicate<Message<?>> boundaryTrigger)
Configure a Predicate for messages to determine a window boundary in the Flux.windowUntil(java.util.function.Predicate<T>) operator.
void setCombineFunction(Function<reactor.core.publisher.Flux<Message<?>>,reactor.core.publisher.Mono<Message<?>>> combineFunction)
Configure a transformation Function to apply for a Flux window to emit.
void setCorrelationStrategy(CorrelationStrategy correlationStrategy)
Configure a CorrelationStrategy to determine a group key from the incoming messages.
void setWindowConfigurer(Function<reactor.core.publisher.Flux<Message<?>>,reactor.core.publisher.Flux<reactor.core.publisher.Flux<Message<?>>>> windowConfigurer)
Configure a Function to apply a transformation into the grouping Flux for any arbitrary Flux.window(int) options not covered by the simple options.
void setWindowSize(int windowSize)
Specify a size for windows to close.
void setWindowSizeFunction(Function<Message<?>,Integer> windowSizeFunction)
Specify a Function to determine a size for windows to close against the first message in group.
void setWindowTimespan(Duration windowTimespan)
Configure a Duration for closing windows periodically.
protected boolean shouldCopyRequestHeaders()
Subclasses may override this.
void start()
void stop()
Methods inherited from class org.springframework.integration.handler.AbstractMessageProducingHandler
addNotPropagatedHeaders, createOutputMessage, getNotPropagatedHeaders, getOutputChannel, isAsync, messageBuilderForReply, onInit, produceOutput, resolveErrorChannel, sendErrorMessage, sendOutput, sendOutputs, setAsync, setNotPropagatedHeaders, setOutputChannel, setOutputChannelName, setSendTimeout, shouldSplitOutput, updateNotPropagatedHeaders
Methods inherited from class org.springframework.integration.handler.AbstractMessageHandler
handleMessage, onComplete, onError, onNext, onSubscribe
Methods inherited from class org.springframework.integration.handler.MessageHandlerSupport
buildSendTimer, destroy, getManagedName, getManagedType, getMetricsCaptor, getOrder, getOverrides, isLoggingEnabled, registerMetricsCaptor, sendTimer, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, shouldTrack
Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface org.springframework.integration.support.management.IntegrationManagement
getThisAs
Methods inherited from interface org.springframework.integration.support.context.NamedComponent
getBeanName, getComponentName
-
Constructor Details
-
FluxAggregatorMessageHandler
public FluxAggregatorMessageHandler()
Create an instance with a Flux.create(java.util.function.Consumer<? super reactor.core.publisher.FluxSink<T>>) and apply Flux.groupBy(java.util.function.Function<? super T, ? extends K>) and Flux.window(int) transformations to it.
-
-
Method Details
-
setCorrelationStrategy
Configure a CorrelationStrategy to determine a group key from the incoming messages. By default, a HeaderAttributeCorrelationStrategy is used against the IntegrationMessageHeaderAccessor.CORRELATION_ID header value.
- Parameters:
correlationStrategy - the CorrelationStrategy to use.
-
setCombineFunction
public void setCombineFunction(Function<reactor.core.publisher.Flux<Message<?>>,reactor.core.publisher.Mono<Message<?>>> combineFunction)
Configure a transformation Function to apply to each Flux window before it is emitted. The function must return a Mono whose value is a Message: the result of combining the incoming Flux for the window. By default, the Flux for a window is wrapped whole into a message with headers copied from the first message in the window. Such a Flux in the payload has to be subscribed to and consumed downstream.
- Parameters:
combineFunction - the Function to use for transforming result windows.
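A combine function that turns each window into a single message carrying a list of payloads could be sketched as follows. This assumes spring-integration-core, spring-messaging and reactor-core are on the classpath; the class name CombineFunctionSketch and the method configure are hypothetical.

```java
import org.springframework.integration.aggregator.FluxAggregatorMessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;

public class CombineFunctionSketch {

    // Hypothetical helper: replaces the default "wrap the Flux" behavior.
    public static void configure(FluxAggregatorMessageHandler handler) {
        handler.setCombineFunction(window ->
                window
                        .map(Message::getPayload)   // keep only the payloads
                        .collectList()              // completes when the window closes
                        .map(GenericMessage::new)); // wrap the list into one Message
    }
}
```

With a function like this, downstream consumers receive a ready-made list instead of a Flux they have to subscribe to. Note that this sketch does not copy any headers from the original messages.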
-
setBoundaryTrigger
Configure a Predicate for messages to determine a window boundary in the Flux.windowUntil(java.util.function.Predicate<T>) operator. Takes precedence over any other window configuration options.
- Parameters:
boundaryTrigger - the Predicate to use for window boundary.
- See Also:
Flux.windowUntil(Predicate)
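The effect of a boundary predicate can be modeled in plain Java, without Reactor. The sketch below assumes the documented default of Flux.windowUntil(Predicate), where the element matching the predicate is included in the window it closes. The class BoundaryWindows and its method are hypothetical and operate on a finished list rather than on an asynchronous stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class BoundaryWindows {

    /** Splits items into windows; an item matching the boundary closes the window it belongs to. */
    public static <T> List<List<T>> windowUntil(List<T> items, Predicate<T> boundary) {
        List<List<T>> windows = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : items) {
            current.add(item);
            if (boundary.test(item)) {
                windows.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            windows.add(current); // trailing items form a final window when the input ends
        }
        return windows;
    }

    public static void main(String[] args) {
        List<String> payloads = List.of("a", "b", "END", "c", "END", "d");
        // prints [[a, b, END], [c, END], [d]]
        System.out.println(windowUntil(payloads, "END"::equals));
    }
}
```

In the handler itself the predicate receives whole messages, so a boundary would typically be derived from a payload or header value.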
-
setWindowSize
public void setWindowSize(int windowSize)
Specify a size for windows to close. Can be combined with setWindowTimespan(Duration).
- Parameters:
windowSize - the window size to use.
- See Also:
Flux.window(int), Flux.windowTimeout(int, Duration)
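How a correlation key and a fixed window size interact can be sketched with a plain-Java model. This is a simplification that processes a finished list eagerly; it does not reproduce the asynchronous Flux.groupBy and Flux.window operators. The class GroupedSizeWindows and its method are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class GroupedSizeWindows {

    /** Groups items by key and closes a group's window once it holds windowSize items. */
    public static <T, K> List<List<T>> aggregate(List<T> items, Function<T, K> keyFn, int windowSize) {
        Map<K, List<T>> open = new LinkedHashMap<>();
        List<List<T>> closed = new ArrayList<>();
        for (T item : items) {
            K key = keyFn.apply(item);
            List<T> window = open.computeIfAbsent(key, k -> new ArrayList<>());
            window.add(item);
            if (window.size() == windowSize) {
                closed.add(window);
                open.remove(key); // the next item with this key starts a new window
            }
        }
        closed.addAll(open.values()); // incomplete windows close when the input ends
        return closed;
    }

    public static void main(String[] args) {
        List<String> items = List.of("A1", "B1", "A2", "B2", "A3");
        // Correlate by first character and close each window after 2 items.
        // prints [[A1, A2], [B1, B2], [A3]]
        System.out.println(aggregate(items, s -> s.charAt(0), 2));
    }
}
```

The key function plays the role of the CorrelationStrategy, and windowSize corresponds to the value passed to setWindowSize(int).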
-
setWindowSizeFunction
Specify a Function to determine a size for windows to close against the first message in the group. The result of the function can be combined with setWindowTimespan(Duration). By default, the IntegrationMessageHeaderAccessor.SEQUENCE_SIZE header is consulted.
- Parameters:
windowSizeFunction - the Function to use to determine a window size against the first message in the group.
- See Also:
Flux.window(int), Flux.windowTimeout(int, Duration)
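A window size function that reads the size from a custom header instead of the default one might be sketched as follows. This assumes spring-integration-core and spring-messaging are on the classpath; the header name "batchSize", the class WindowSizeFunctionSketch and the method configure are hypothetical.

```java
import org.springframework.integration.aggregator.FluxAggregatorMessageHandler;

public class WindowSizeFunctionSketch {

    // Hypothetical helper: derives the window size from the first message of each group.
    public static void configure(FluxAggregatorMessageHandler handler) {
        // Assumption: "batchSize" is an Integer header set by the sender.
        handler.setWindowSizeFunction(message ->
                message.getHeaders().get("batchSize", Integer.class));
    }
}
```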
-
setWindowTimespan
Configure a Duration for closing windows periodically. Can be combined with setWindowSize(int) or setWindowSizeFunction(Function).
- Parameters:
windowTimespan - the Duration to use for windows to close periodically.
- See Also:
Flux.window(Duration), Flux.windowTimeout(int, Duration)
-
setWindowConfigurer
public void setWindowConfigurer(Function<reactor.core.publisher.Flux<Message<?>>,reactor.core.publisher.Flux<reactor.core.publisher.Flux<Message<?>>>> windowConfigurer)
Configure a Function to apply a transformation to the grouping Flux for any arbitrary Flux.window(int) options not covered by the simple options. Takes precedence over any other window configuration options.
- Parameters:
windowConfigurer - the Function to apply any custom window transformation.
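As one example of a windowing option that the simple setters do not cover, overlapping windows could be configured through Reactor's Flux.window(int maxSize, int skip). The sketch assumes spring-integration-core and reactor-core are on the classpath; the class WindowConfigurerSketch, the method configure and the values 4 and 2 are hypothetical.

```java
import org.springframework.integration.aggregator.FluxAggregatorMessageHandler;

public class WindowConfigurerSketch {

    // Hypothetical helper: applies a custom window operator to each group.
    public static void configure(FluxAggregatorMessageHandler handler) {
        // Overlapping windows: up to 4 messages each, a new window opened every 2 messages.
        handler.setWindowConfigurer(group -> group.window(4, 2));
    }
}
```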
-
getComponentType
Description copied from class: IntegrationObjectSupport
Subclasses may implement this method to provide component type information.
- Specified by:
getComponentType in interface NamedComponent
- Overrides:
getComponentType in class MessageHandlerSupport
-
getIntegrationPatternType
Description copied from interface: IntegrationPattern
Return a pattern type this component implements.
- Specified by:
getIntegrationPatternType in interface IntegrationPattern
- Overrides:
getIntegrationPatternType in class MessageHandlerSupport
- Returns:
the IntegrationPatternType this component implements.
-
start
public void start()
- Specified by:
start in interface Lifecycle
- Specified by:
start in interface ManageableLifecycle
-
stop
public void stop()
- Specified by:
stop in interface Lifecycle
- Specified by:
stop in interface ManageableLifecycle
-
isRunning
public boolean isRunning()
- Specified by:
isRunning in interface Lifecycle
- Specified by:
isRunning in interface ManageableLifecycle
-
handleMessageInternal
- Specified by:
handleMessageInternal in class AbstractMessageHandler
-
shouldCopyRequestHeaders
protected boolean shouldCopyRequestHeaders()
Description copied from class: AbstractMessageProducingHandler
Subclasses may override this. True by default.
- Overrides:
shouldCopyRequestHeaders in class AbstractMessageProducingHandler
- Returns:
true if the request headers should be copied.
-