public class SubjectMessageHandler
extends org.springframework.integration.handler.AbstractMessageProducingHandler
implements org.springframework.context.SmartLifecycle
A MessageHandler that delegates processing to an Observable.
The outputStream of the processor is used to create a message and send it to the output channel. If the
input channel and output channel are connected to the Binder, then data is delivered to the input stream
through calls to onNext made on a dispatcher thread of the binder, and sending a message to the output
channel involves IO operations on the binder.
The implementation uses a SerializedSubject. This has the advantage that the state of the Observable
can be shared across all the incoming dispatcher threads that are invoking onNext. It has the disadvantage
that processing and sending to the output channel will execute serially on one of the dispatcher threads.
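The trade-off described above can be illustrated without RxJava. The following is a minimal, library-free sketch of a queue-drain style of serialization; it is not the actual SerializedSubject implementation, and the class `SerializedEmitterSketch` and its `demo` helper are hypothetical names used only for illustration. Whichever thread finds no emission in progress drains the queue on behalf of all the others, so the downstream consumer can keep plain, unsynchronized state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical illustration only; not part of RxJava or of this handler.
final class SerializedEmitterSketch<T> {
    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger pending = new AtomicInteger();
    private final Consumer<T> downstream;

    SerializedEmitterSketch(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    // May be called concurrently; downstream.accept never runs on two threads at once.
    void onNext(T item) {
        queue.offer(item);
        if (pending.getAndIncrement() != 0) {
            return; // another thread is already draining and will deliver this item
        }
        do {
            downstream.accept(queue.poll()); // runs serially on the draining thread
        } while (pending.decrementAndGet() != 0);
    }

    // Pushes items from several threads into an unsynchronized list and returns its size.
    static int demo(int threads, int perThread) {
        List<Integer> seen = new ArrayList<>(); // deliberately not thread-safe
        SerializedEmitterSketch<Integer> emitter = new SerializedEmitterSketch<>(seen::add);
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    emitter.onNext(i);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 1000));
    }
}
```

The sketch shows both sides of the trade-off: the list needs no locking of its own, but every item is processed by a single thread at a time, so a slow downstream step holds up all callers' items.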
The use of this handler makes for a very natural first experience when processing data. For example, given
the stream http | rxjava-processor | log, where the rxjava-processor does a buffer(5) and then produces a
single value, sending 10 messages to the http source will result in 2 messages in the log, no matter how
many dispatcher threads are used.
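To make the arithmetic of the buffer(5) example concrete, here is a plain-Java analogue that does not use RxJava. It assumes a single buffer shared by all dispatcher threads, with a lock standing in for the serialized subject; the class `SharedBufferSketch` and its methods are hypothetical names for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; mimics buffer(5) over state shared by all threads.
final class SharedBufferSketch {
    private final int size;
    private final List<String> current = new ArrayList<>();
    private final List<List<String>> emitted = new ArrayList<>();

    SharedBufferSketch(int size) {
        this.size = size;
    }

    // Called by any dispatcher thread; the lock plays the role of the serialized subject.
    synchronized void onNext(String payload) {
        current.add(payload);
        if (current.size() == size) {
            emitted.add(new ArrayList<>(current)); // stands in for the send to the output channel
            current.clear();
        }
    }

    synchronized int emittedCount() {
        return emitted.size();
    }

    // Sends `messages` payloads through `threads` dispatcher threads and counts the outputs.
    static int run(int threads, int messages) {
        SharedBufferSketch buffer = new SharedBufferSketch(5);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < messages; i++) {
            final String payload = "msg-" + i;
            pool.execute(() -> buffer.onNext(payload));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for dispatcher threads");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return buffer.emittedCount();
    }

    public static void main(String[] args) {
        System.out.println(run(1, 10)); // prints 2
        System.out.println(run(4, 10)); // prints 2
    }
}
```

Because the buffer is shared, the output count depends only on the number of messages. If each thread kept its own buffer instead, 10 messages spread over 4 threads could leave every buffer short of 5 items and produce no output at all.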
You can change which thread the outputStream subscriber, which does the send to the output channel,
uses by explicitly calling observeOn before returning the outputStream from your processor.
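In RxJava, observeOn takes a Scheduler and moves downstream delivery onto it. The sketch below shows only the threading effect, using a plain java.util.concurrent executor as a stand-in; it does not call RxJava, and `HandOffSketch`, `sendThreadName`, and the thread name "sender-thread" are hypothetical names chosen for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration only; an executor stands in for an RxJava Scheduler.
final class HandOffSketch {
    // Returns the name of the thread that performed the simulated send.
    static String sendThreadName() {
        ExecutorService sender =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sender-thread"));
        AtomicReference<String> name = new AtomicReference<>();
        // The calling thread plays the dispatcher: it only schedules the send and returns.
        sender.execute(() -> name.set(Thread.currentThread().getName()));
        sender.shutdown();
        try {
            if (!sender.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the send");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name.get();
    }

    public static void main(String[] args) {
        System.out.println(sendThreadName()); // prints sender-thread
    }
}
```

The design consequence is the same as with observeOn: the dispatcher thread is released as soon as the work is handed off, and any IO done by the send happens on the other thread.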
All error handling is the responsibility of the processor implementation.
Constructor and Description |
---|
SubjectMessageHandler(RxJavaProcessor processor) |
Modifier and Type | Method and Description |
---|---|
int | getPhase() |
protected void | handleMessageInternal(org.springframework.messaging.Message<?> message) |
boolean | isAutoStartup() |
boolean | isRunning() |
void | start() |
void | stop() |
void | stop(Runnable callback) |
Methods inherited from class org.springframework.integration.handler.AbstractMessageProducingHandler
getOutputChannel, onInit, produceOutput, sendOutputs, setOutputChannel, setOutputChannelName, setSendTimeout, shouldCopyRequestHeaders, shouldSplitOutput
Methods inherited from class org.springframework.integration.handler.AbstractMessageHandler
configureMetrics, getActiveCount, getActiveCountLong, getComponentType, getDuration, getErrorCount, getErrorCountLong, getHandleCount, getHandleCountLong, getManagedName, getManagedType, getMaxDuration, getMeanDuration, getMinDuration, getOrder, getStandardDeviationDuration, handleMessage, isCountsEnabled, isLoggingEnabled, isStatsEnabled, reset, setCountsEnabled, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, setStatsEnabled
Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, extractTypeIfPossible, getApplicationContext, getApplicationContextId, getBeanFactory, getChannelResolver, getComponentName, getConversionService, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setTaskScheduler, toString
public SubjectMessageHandler(RxJavaProcessor processor)

public void start()
Specified by: start in interface org.springframework.context.Lifecycle

public boolean isRunning()
Specified by: isRunning in interface org.springframework.context.Lifecycle

public boolean isAutoStartup()
Specified by: isAutoStartup in interface org.springframework.context.SmartLifecycle

public void stop(Runnable callback)
Specified by: stop in interface org.springframework.context.SmartLifecycle

public int getPhase()
Specified by: getPhase in interface org.springframework.context.Phased

protected void handleMessageInternal(org.springframework.messaging.Message<?> message) throws Exception
Specified by: handleMessageInternal in class org.springframework.integration.handler.AbstractMessageHandler
Throws: Exception

public void stop()
Specified by: stop in interface org.springframework.context.Lifecycle
Copyright © 2016 Pivotal Software, Inc. All rights reserved.