Class FluxAggregatorMessageHandler
java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.handler.MessageHandlerSupport
org.springframework.integration.handler.AbstractMessageHandler
org.springframework.integration.handler.AbstractMessageProducingHandler
org.springframework.integration.aggregator.FluxAggregatorMessageHandler
- All Implemented Interfaces:
- org.reactivestreams.Subscriber<Message<?>>,- Aware,- BeanFactoryAware,- BeanNameAware,- DisposableBean,- InitializingBean,- ApplicationContextAware,- Lifecycle,- Ordered,- ComponentSourceAware,- ExpressionCapable,- Orderable,- MessageProducer,- HeaderPropagationAware,- IntegrationPattern,- NamedComponent,- IntegrationManagement,- ManageableLifecycle,- TrackableComponent,- MessageHandler,- reactor.core.CoreSubscriber<Message<?>>
public class FluxAggregatorMessageHandler
extends AbstractMessageProducingHandler
implements ManageableLifecycle
The 
AbstractMessageProducingHandler implementation for aggregation logic based
 on Reactor's Flux.groupBy(java.util.function.Function<? super T, ? extends K>) and Flux.window(int) operators.
 
 The incoming messages are emitted into a FluxSink provided by the
 Flux.create(java.util.function.Consumer<? super reactor.core.publisher.FluxSink<T>>) initialized in the constructor.
 
 The resulting windows for groups are wrapped into Messages for downstream
 consumption.
 
 If the AbstractMessageProducingHandler.getOutputChannel() is not a ReactiveStreamsSubscribableChannel
 instance, a subscription for the whole aggregating Flux is performed in the
 start() method.
- Since:
- 5.2
- Author:
- Artem Bilan
- 
Nested Class SummaryNested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagementIntegrationManagement.ManagementOverrides
- 
Field SummaryFields inherited from class org.springframework.integration.handler.AbstractMessageProducingHandlermessagingTemplateFields inherited from class org.springframework.integration.context.IntegrationObjectSupportEXPRESSION_PARSER, loggerFields inherited from interface org.springframework.integration.support.management.IntegrationManagementMETER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAMEFields inherited from interface org.springframework.core.OrderedHIGHEST_PRECEDENCE, LOWEST_PRECEDENCE
- 
Constructor SummaryConstructorsConstructorDescriptionCreate an instance with aFlux.create(java.util.function.Consumer<? super reactor.core.publisher.FluxSink<T>>)and applyFlux.groupBy(java.util.function.Function<? super T, ? extends K>)andFlux.window(int)transformation into it.
- 
Method SummaryModifier and TypeMethodDescriptionSubclasses may implement this method to provide component type information.Return a pattern type this component implements.protected voidhandleMessageInternal(Message<?> message) booleanvoidsetBoundaryTrigger(Predicate<Message<?>> boundaryTrigger) Configure aPredicatefor messages to determine a window boundary in theFlux.windowUntil(java.util.function.Predicate<T>)operator.voidsetCombineFunction(Function<reactor.core.publisher.Flux<Message<?>>, reactor.core.publisher.Mono<Message<?>>> combineFunction) Configure a transformationFunctionto apply for aFluxwindow to emit.voidsetCorrelationStrategy(CorrelationStrategy correlationStrategy) Configure aCorrelationStrategyto determine a group key from the incoming messages.voidsetWindowConfigurer(Function<reactor.core.publisher.Flux<Message<?>>, reactor.core.publisher.Flux<reactor.core.publisher.Flux<Message<?>>>> windowConfigurer) Configure aFunctionto apply a transformation into the groupingFluxfor any arbitraryFlux.window(int)options not covered by the simple options.voidsetWindowSize(int windowSize) Specify a size for windows to close.voidsetWindowSizeFunction(Function<Message<?>, Integer> windowSizeFunction) Specify aFunctionto determine a size for windows to close against the first message in group.voidsetWindowTimespan(Duration windowTimespan) Configure aDurationfor closing windows periodically.protected booleanSubclasses may override this.voidstart()voidstop()Methods inherited from class org.springframework.integration.handler.AbstractMessageProducingHandleraddNotPropagatedHeaders, createOutputMessage, getNotPropagatedHeaders, getOutputChannel, isAsync, messageBuilderForReply, onInit, produceOutput, resolveErrorChannel, sendErrorMessage, sendOutput, sendOutputs, setAsync, setNotPropagatedHeaders, setOutputChannel, setOutputChannelName, setSendTimeout, setupMessageProcessor, shouldSplitOutput, updateNotPropagatedHeadersMethods inherited from class org.springframework.integration.handler.AbstractMessageHandlerhandleMessage, onComplete, onError, onNext, onSubscribe, setObservationConventionMethods inherited from class org.springframework.integration.handler.MessageHandlerSupportbuildSendTimer, destroy, getManagedName, getManagedType, getMetricsCaptor, getObservationRegistry, getOrder, getOverrides, isLoggingEnabled, isObserved, registerMetricsCaptor, registerObservationRegistry, sendTimer, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, shouldTrackMethods inherited from class org.springframework.integration.context.IntegrationObjectSupportafterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentDescription, getComponentName, getComponentSource, getConversionService, getExpression, getIntegrationProperties, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentDescription, setComponentName, setComponentSource, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toStringMethods inherited from class java.lang.Objectclone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, waitMethods inherited from interface reactor.core.CoreSubscribercurrentContextMethods inherited from interface org.springframework.integration.support.management.IntegrationManagementgetThisAsMethods inherited from interface org.springframework.integration.support.context.NamedComponentgetBeanName, getComponentName
- 
Constructor Details- 
FluxAggregatorMessageHandlerpublic FluxAggregatorMessageHandler()Create an instance with aFlux.create(java.util.function.Consumer<? super reactor.core.publisher.FluxSink<T>>)and applyFlux.groupBy(java.util.function.Function<? super T, ? extends K>)andFlux.window(int)transformation into it.
 
- 
- 
Method Details- 
setCorrelationStrategyConfigure aCorrelationStrategyto determine a group key from the incoming messages. By default aHeaderAttributeCorrelationStrategyis used against aIntegrationMessageHeaderAccessor.CORRELATION_IDheader value.- Parameters:
- correlationStrategy- the- CorrelationStrategyto use.
 
- 
setCombineFunctionpublic void setCombineFunction(Function<reactor.core.publisher.Flux<Message<?>>, reactor.core.publisher.Mono<Message<?>>> combineFunction) Configure a transformationFunctionto apply for aFluxwindow to emit. Requires aMonoresult with aMessageas value as a combination result of the incomingFluxfor window. By default aFluxfor window is fully wrapped into a message with headers copied from the first message in window. Such aFluxin the payload has to be subscribed and consumed downstream.- Parameters:
- combineFunction- the- Functionto use for result windows transformation.
 
- 
setBoundaryTrigger
- 
setWindowSizepublic void setWindowSize(int windowSize) Specify a size for windows to close. Can be combined with thesetWindowTimespan(Duration).- Parameters:
- windowSize- the size for window to use.
- See Also:
 
- 
setWindowSizeFunctionSpecify aFunctionto determine a size for windows to close against the first message in group. Tne result of the function can be combined with thesetWindowTimespan(Duration). By default anIntegrationMessageHeaderAccessor.SEQUENCE_SIZEheader is consulted.- Parameters:
- windowSizeFunction- the- Functionto use to determine a window size against a first message in the group.
- See Also:
 
- 
setWindowTimespanConfigure aDurationfor closing windows periodically. Can be combined with thesetWindowSize(int)orsetWindowSizeFunction(Function).- Parameters:
- windowTimespan- the- Durationto use for windows to close periodically.
- See Also:
 
- 
setWindowConfigurerpublic void setWindowConfigurer(Function<reactor.core.publisher.Flux<Message<?>>, reactor.core.publisher.Flux<reactor.core.publisher.Flux<Message<?>>>> windowConfigurer) Configure aFunctionto apply a transformation into the groupingFluxfor any arbitraryFlux.window(int)options not covered by the simple options. Has a precedence over any other window configuration options.- Parameters:
- windowConfigurer- the- Functionto apply any custom window transformation.
 
- 
getComponentTypeDescription copied from class:IntegrationObjectSupportSubclasses may implement this method to provide component type information.- Specified by:
- getComponentTypein interface- NamedComponent
- Overrides:
- getComponentTypein class- MessageHandlerSupport
 
- 
getIntegrationPatternTypeDescription copied from interface:IntegrationPatternReturn a pattern type this component implements.- Specified by:
- getIntegrationPatternTypein interface- IntegrationPattern
- Overrides:
- getIntegrationPatternTypein class- MessageHandlerSupport
- Returns:
- the IntegrationPatternTypethis component implements.
 
- 
startpublic void start()- Specified by:
- startin interface- Lifecycle
- Specified by:
- startin interface- ManageableLifecycle
 
- 
stoppublic void stop()- Specified by:
- stopin interface- Lifecycle
- Specified by:
- stopin interface- ManageableLifecycle
 
- 
isRunningpublic boolean isRunning()- Specified by:
- isRunningin interface- Lifecycle
- Specified by:
- isRunningin interface- ManageableLifecycle
 
- 
handleMessageInternal- Specified by:
- handleMessageInternalin class- AbstractMessageHandler
 
- 
shouldCopyRequestHeadersprotected boolean shouldCopyRequestHeaders()Description copied from class:AbstractMessageProducingHandlerSubclasses may override this. True by default.- Overrides:
- shouldCopyRequestHeadersin class- AbstractMessageProducingHandler
- Returns:
- true if the request headers should be copied.
 
 
-