public class MultipleSubjectMessageHandler
extends org.springframework.integration.handler.AbstractMessageProducingHandler
implements org.springframework.beans.factory.DisposableBean
MessageHandler
by delegating processing to an Observable based on a partitionExpression.
The specific Observable that the message is delegated is determined by the partitionExpression value.
Unless you change the scheduling of the inputStream in your processor, you should ensure that the
partitionExpression does not map messages delivered on different message bus dispatcher threads to the same
observable. This is due to the underlying use of a PublishSubject
.
For example, using the expression T(java.lang.Thread).currentThread().getId()
would map the current
dispatcher thread id to an instance of a RxJava Observable. If you wanted to have an Observable per
Kafka partition, the expression header['kafka_partition_id'] since the MessageBus dispatcher thread will be
the same for each partition.
If the Observable mapped to the partitionExpression value has an error or completes, it will be recreated when the
next message consumed maps to the same partitionExpression value.
All error handling is the responsibility of the processor implementation.Modifier and Type | Field and Description |
---|---|
protected org.slf4j.Logger |
logger |
Constructor and Description |
---|
MultipleSubjectMessageHandler(Processor processor,
java.lang.String partitionExpression) |
Modifier and Type | Method and Description |
---|---|
void |
destroy() |
protected void |
handleMessageInternal(org.springframework.messaging.Message<?> message) |
protected void |
onInit() |
void |
setIntegrationEvaluationContext(org.springframework.expression.EvaluationContext evaluationContext) |
getOutputChannel, produceOutput, sendOutputs, setOutputChannel, setOutputChannelName, setSendTimeout, shouldCopyRequestHeaders, shouldSplitOutput
configureMetrics, getActiveCount, getActiveCountLong, getComponentType, getDuration, getErrorCount, getErrorCountLong, getHandleCount, getHandleCountLong, getManagedName, getManagedType, getMaxDuration, getMeanDuration, getMinDuration, getOrder, getStandardDeviationDuration, handleMessage, isCountsEnabled, isLoggingEnabled, isStatsEnabled, reset, setCountsEnabled, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, setStatsEnabled
afterPropertiesSet, extractTypeIfPossible, getApplicationContext, getApplicationContextId, getBeanFactory, getChannelResolver, getComponentName, getConversionService, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setTaskScheduler, toString
public MultipleSubjectMessageHandler(Processor processor, java.lang.String partitionExpression)
public void setIntegrationEvaluationContext(org.springframework.expression.EvaluationContext evaluationContext)
protected void onInit() throws java.lang.Exception
onInit
in class org.springframework.integration.handler.AbstractMessageProducingHandler
java.lang.Exception
protected void handleMessageInternal(org.springframework.messaging.Message<?> message) throws java.lang.Exception
handleMessageInternal
in class org.springframework.integration.handler.AbstractMessageHandler
java.lang.Exception
public void destroy() throws java.lang.Exception
destroy
in interface org.springframework.beans.factory.DisposableBean
java.lang.Exception