public class BroadcasterMessageHandler extends AbstractReactorMessageHandler
Adapts a MessageHandler by delegating processing to a Stream.
The outputStream of the processor is used to create a message and send it to the output channel. If the
input channel and output channel are connected to the MessageBus, then onNext, which delivers data to the
input stream, is invoked on a dispatcher thread of the message bus, and sending a message to the output
channel involves IO operations on the message bus.
The implementation uses a RingBufferProcessor with asynchronous dispatch.
This has the advantage that the state of the Stream can be shared across all the incoming dispatcher threads that
are invoking onNext. It has the disadvantage that processing and sending to the output channel will execute serially
on one of the dispatcher threads.
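The trade-off described above can be illustrated with a minimal plain-Java sketch. It does not use the Reactor or Spring XD API: the class `SerializedHandoffSketch` and its methods are hypothetical names, and a single worker thread stands in for the serialized processing (in the handler described above, that work runs on one of the dispatcher threads instead). The point it shows is that when many threads call onNext but the processing itself is serialized, the stream state needs no locking of its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java stand-in, NOT the Reactor or Spring XD API. All names are hypothetical.
final class SerializedHandoffSketch {

    // Unsynchronized state: only ever touched by the single processing thread.
    private int count = 0;

    // One worker thread, so every submitted task runs serially.
    private final ExecutorService processing = Executors.newSingleThreadExecutor();

    // Called concurrently by many "dispatcher" threads, like onNext.
    void onNext(String item) {
        processing.execute(() -> count++);
    }

    // Reads the state on the processing thread itself, after all queued work.
    int drainAndGetCount() {
        try {
            return processing.submit(() -> count).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            processing.shutdown();
        }
    }

    // Starts the given number of dispatcher threads, each delivering the same number of items.
    static int run(int dispatcherThreads, int itemsPerThread) {
        SerializedHandoffSketch handler = new SerializedHandoffSketch();
        List<Thread> dispatchers = new ArrayList<>();
        for (int t = 0; t < dispatcherThreads; t++) {
            Thread dispatcher = new Thread(() -> {
                for (int i = 0; i < itemsPerThread; i++) {
                    handler.onNext("item");
                }
            });
            dispatchers.add(dispatcher);
            dispatcher.start();
        }
        try {
            for (Thread dispatcher : dispatchers) {
                dispatcher.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return handler.drainAndGetCount();
    }
}
```

Calling `SerializedHandoffSketch.run(4, 250)` counts every item exactly once even though four threads deliver concurrently. The cost is the one named above: all processing is funneled through a single thread.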
The use of this handler makes for a very natural first experience when processing data. For example, given
the stream http | reactor-processor | log, where the reactor-processor does a buffer(5) and then produces
a single value, sending 10 messages to the http source will result in 2 messages in the log, no matter how
many dispatcher threads are used.
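The arithmetic of the buffer(5) example can be checked with a small plain-Java sketch. It is not the Reactor Stream API: `BufferFiveSketch` is a hypothetical class, the `Consumer` stands in for the send to the output channel, and the "single value" produced per buffer is assumed here to be the number of buffered items.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java stand-in for a processor that buffers N items and emits one value per full buffer.
// NOT the Reactor API. All names are hypothetical.
final class BufferFiveSketch {

    private final int size;
    private final Consumer<Integer> output; // stands in for the send to the output channel
    private final List<String> buffer = new ArrayList<>();

    BufferFiveSketch(int size, Consumer<Integer> output) {
        this.size = size;
        this.output = output;
    }

    // synchronized so that any number of calling threads share one buffer safely
    synchronized void onNext(String payload) {
        buffer.add(payload);
        if (buffer.size() == size) {
            output.accept(buffer.size()); // one value per full buffer (assumed: the item count)
            buffer.clear();
        }
    }

    // Sends the given number of messages and returns what reached the "log".
    static List<Integer> send(int messages) {
        List<Integer> log = new ArrayList<>();
        BufferFiveSketch processor = new BufferFiveSketch(5, log::add);
        for (int i = 0; i < messages; i++) {
            processor.onNext("msg-" + i);
        }
        return log;
    }
}
```

`BufferFiveSketch.send(10)` returns a list of two entries, matching the two log messages in the example. Because one buffer is shared, the result does not depend on which thread delivers each item.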
You can change the thread that the outputStream subscriber, which does the send to the output channel,
uses by explicitly calling dispatchOn or another thread-switching operator
(see http://projectreactor.io/docs/reference/#streams-multithreading)
before returning the outputStream from your processor.
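As a rough illustration of moving the send onto a different thread, the plain-Java sketch below hands the processed value to a dedicated executor. It does not call dispatchOn or any other Reactor operator: `OutputThreadSketch`, the thread name `output-sender`, and the upper-casing step are all hypothetical stand-ins.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Plain-Java stand-in for switching the thread that performs the output send.
// NOT the Reactor dispatchOn operator. All names are hypothetical.
final class OutputThreadSketch {

    // Processes on the calling thread, then "sends" on a dedicated thread.
    // Returns a description of which thread sent which value.
    static String sendOnDedicatedThread(String payload) {
        ExecutorService outputThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "output-sender"));
        try {
            String processed = payload.toUpperCase(); // processing stays on the caller's thread
            Future<String> sent = outputThread.submit(
                    () -> Thread.currentThread().getName() + " sent " + processed);
            return sent.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            outputThread.shutdown();
        }
    }
}
```

`OutputThreadSketch.sendOnDedicatedThread("hello")` returns `output-sender sent HELLO`, showing that the send ran on the dedicated thread rather than on the caller.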
Use MultipleBroadcasterMessageHandler for concurrent execution on dispatcher threads spread across
multiple Streams.
All error handling is the responsibility of the processor implementation.
Inherited nested classes: AbstractReactorMessageHandler.ChannelForwardingSubscriber
Inherited fields: logger, processor
Constructor and Description |
---|
BroadcasterMessageHandler(Processor processor) Construct a new BroadcasterMessageHandler given the reactor based Processor to delegate processing to. |
Modifier and Type | Method and Description |
---|---|
void | destroy() |
protected void | handleMessageInternal(org.springframework.messaging.Message<?> message) |
protected void | onInit() |
Methods inherited from class AbstractReactorMessageHandler: getEnvironment, getInputType, getRingBufferSize, getStopTimeout, invokeProcessor, setRingBufferSize, setStopTimeout
Methods inherited from class org.springframework.integration.handler.AbstractMessageProducingHandler: getOutputChannel, produceOutput, sendOutputs, setOutputChannel, setOutputChannelName, setSendTimeout, shouldCopyRequestHeaders, shouldSplitOutput
Methods inherited from class org.springframework.integration.handler.AbstractMessageHandler: configureMetrics, getActiveCount, getActiveCountLong, getComponentType, getDuration, getErrorCount, getErrorCountLong, getHandleCount, getHandleCountLong, getManagedName, getManagedType, getMaxDuration, getMeanDuration, getMinDuration, getOrder, getStandardDeviationDuration, handleMessage, isCountsEnabled, isLoggingEnabled, isStatsEnabled, reset, setCountsEnabled, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, setStatsEnabled
Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport: afterPropertiesSet, extractTypeIfPossible, getApplicationContext, getApplicationContextId, getBeanFactory, getChannelResolver, getComponentName, getConversionService, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setTaskScheduler, toString
public BroadcasterMessageHandler(Processor processor)
Parameters: processor - The stream based reactor processor

protected void handleMessageInternal(org.springframework.messaging.Message<?> message) throws java.lang.Exception
Specified by: handleMessageInternal in class org.springframework.integration.handler.AbstractMessageHandler
Throws: java.lang.Exception

public void destroy() throws java.lang.Exception
Throws: java.lang.Exception

protected void onInit() throws java.lang.Exception
Overrides: onInit in class org.springframework.integration.handler.AbstractMessageProducingHandler
Throws: java.lang.Exception