public class MultipleSubjectMessageHandler
extends org.springframework.integration.handler.AbstractMessageProducingHandler
implements org.springframework.beans.factory.DisposableBean, org.springframework.integration.expression.IntegrationEvaluationContextAware
MessageHandler
by delegating processing to an Observable based on a partitionExpression.
The specific Observable that the message is delegated is determined by the partitionExpression value.
Unless you change the scheduling of the inputStream in your processor, you should ensure that the
partitionExpression does not map messages delivered on different message bus dispatcher threads to the same
observable. This is due to the underlying use of a PublishSubject
.
For example, using the expression T(java.lang.Thread).currentThread().getId()
would map the current
dispatcher thread id to an instance of a RxJava Observable. If you wanted to have an Observable per
Kafka partition, the expression header['kafka_partition_id'] since the MessageBus dispatcher thread will be
the same for each partition.
If the Observable mapped to the partitionExpression value has an error or completes, it will be recreated when the
next message consumed maps to the same partitionExpression value.
All error handling is the responsibility of the processor implementation.Modifier and Type | Field and Description |
---|---|
protected org.slf4j.Logger |
logger |
Constructor and Description |
---|
MultipleSubjectMessageHandler(Processor processor,
java.lang.String partitionExpression) |
Modifier and Type | Method and Description |
---|---|
void |
destroy() |
protected void |
handleMessageInternal(org.springframework.messaging.Message<?> message) |
void |
setIntegrationEvaluationContext(org.springframework.expression.EvaluationContext evaluationContext) |
getOutputChannel, onInit, produceOutput, sendOutputs, setOutputChannel, setOutputChannelName, setSendTimeout, shouldCopyRequestHeaders, shouldSplitOutput
getComponentType, getOrder, handleMessage, setOrder, setShouldTrack
afterPropertiesSet, extractTypeIfPossible, getApplicationContext, getApplicationContextId, getBeanFactory, getChannelResolver, getComponentName, getConversionService, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setTaskScheduler, toString
public MultipleSubjectMessageHandler(Processor processor, java.lang.String partitionExpression)
protected void handleMessageInternal(org.springframework.messaging.Message<?> message) throws java.lang.Exception
handleMessageInternal
in class org.springframework.integration.handler.AbstractMessageHandler
java.lang.Exception
public void destroy() throws java.lang.Exception
destroy
in interface org.springframework.beans.factory.DisposableBean
java.lang.Exception
public void setIntegrationEvaluationContext(org.springframework.expression.EvaluationContext evaluationContext)
setIntegrationEvaluationContext
in interface org.springframework.integration.expression.IntegrationEvaluationContextAware