public class MultipleBroadcasterMessageHandler extends AbstractReactorMessageHandler
MessageHandler
by delegating processing to a Stream based on a partitionExpression.
The specific Stream that the message is delegated to is determined by the partitionExpression value.
Unless you change the scheduling of the inputStream in your processor, you should ensure that the
partitionExpression does not map messages delivered on different message bus dispatcher threads to the same
stream. This is due to the underlying use of a Broadcaster
.
For example, using the expression T(java.lang.Thread).currentThread().getId()
would map the current
dispatcher thread id to an instance of a Stream. If you wanted to have a Stream per
Kafka partition, you can use the expression header['kafka_partition_id']
since the MessageBus
dispatcher thread will be the same for each partition.
If the Stream mapped to the partitionExpression value has an error or completes, it will be recreated when the
next message consumed maps to the same partitionExpression value.
All error handling is the responsibility of the processor implementation.AbstractReactorMessageHandler.ChannelForwardingSubscriber
logger, processor
Constructor and Description |
---|
MultipleBroadcasterMessageHandler(Processor processor,
java.lang.String partitionExpression)
Construct a new MessageHandler given the reactor based Processor to delegate
processing to and a partition expression.
|
Modifier and Type | Method and Description |
---|---|
void |
destroy() |
protected void |
handleMessageInternal(org.springframework.messaging.Message<?> message) |
protected void |
onInit() |
void |
setIntegrationEvaluationContext(org.springframework.expression.EvaluationContext evaluationContext) |
getEnvironment, getInputType, getRingBufferSize, getStopTimeout, invokeProcessor, setRingBufferSize, setStopTimeout
getOutputChannel, produceOutput, sendOutputs, setOutputChannel, setOutputChannelName, setSendTimeout, shouldCopyRequestHeaders, shouldSplitOutput
configureMetrics, getActiveCount, getActiveCountLong, getComponentType, getDuration, getErrorCount, getErrorCountLong, getHandleCount, getHandleCountLong, getManagedName, getManagedType, getMaxDuration, getMeanDuration, getMinDuration, getOrder, getStandardDeviationDuration, handleMessage, isCountsEnabled, isLoggingEnabled, isStatsEnabled, reset, setCountsEnabled, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, setStatsEnabled
afterPropertiesSet, extractTypeIfPossible, getApplicationContext, getApplicationContextId, getBeanFactory, getChannelResolver, getComponentName, getConversionService, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setTaskScheduler, toString
public MultipleBroadcasterMessageHandler(Processor processor, java.lang.String partitionExpression)
processor
- The stream based reactor processorpublic void setIntegrationEvaluationContext(org.springframework.expression.EvaluationContext evaluationContext)
protected void onInit() throws java.lang.Exception
onInit
in class org.springframework.integration.handler.AbstractMessageProducingHandler
java.lang.Exception
protected void handleMessageInternal(org.springframework.messaging.Message<?> message)
handleMessageInternal
in class org.springframework.integration.handler.AbstractMessageHandler
public void destroy() throws java.lang.Exception
java.lang.Exception