public class ReactiveRedisStreamMessageProducer extends MessageProducerSupport
MessageProducerSupport
for reading messages from a Redis Stream and publishing them into the provided
output channel.
By default this adapter reads message as a standalone client XREAD
(Redis command) but can be switched to a
Consumer Group feature XREADGROUP
by setting consumerName
field.
By default the Consumer Group name is the id of this bean IntegrationObjectSupport.getBeanName()
.lifecycleCondition, lifecycleLock
EXPRESSION_PARSER, logger
DEFAULT_PHASE
Constructor and Description |
---|
ReactiveRedisStreamMessageProducer(ReactiveRedisConnectionFactory reactiveConnectionFactory,
String streamKey) |
Modifier and Type | Method and Description |
---|---|
protected void |
doStart()
Take no action by default.
|
String |
getComponentType()
Subclasses may implement this method to provide component type information.
|
protected void |
onInit()
Subclasses may implement this for initialization logic.
|
void |
setAutoAck(boolean autoAck)
Set whether or not acknowledge message read in the Consumer Group.
|
void |
setConsumerGroup(String consumerGroup)
Set the name of the Consumer Group.
|
void |
setConsumerName(String consumerName)
Set the name of the consumer.
|
void |
setCreateConsumerGroup(boolean createConsumerGroup)
Create the Consumer Group if and only if it does not exist.
|
void |
setExtractPayload(boolean extractPayload)
Configure this channel adapter to extract or not value from the
Record . |
void |
setReadOffset(ReadOffset readOffset)
Define the offset from which we want to read message.
|
void |
setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions<String,?> streamReceiverOptions)
Set
ReactiveStreamOperations used to customize the StreamReceiver . |
afterSingletonsInstantiated, buildErrorMessage, doStop, getErrorChannel, getErrorMessageAttributes, getIntegrationPatternType, getMessagingTemplate, getOutputChannel, sendErrorMessageIfNecessary, sendMessage, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setOutputChannel, setOutputChannelName, setSendTimeout, setShouldTrack, subscribeToPublisher
destroy, doStop, getPhase, getRole, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stop
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
getBeanName, getComponentName
public ReactiveRedisStreamMessageProducer(ReactiveRedisConnectionFactory reactiveConnectionFactory, String streamKey)
public void setReadOffset(ReadOffset readOffset)
ReadOffset.latest()
is used.
ReadOffset.latest()
is equal to '$', which is the Id used with XREAD
to get new data added to
the stream. Note that when switching to the Consumer Group feature, we set it to
ReadOffset.lastConsumed()
if it is still equal to ReadOffset.latest()
.readOffset
- the desired offsetpublic void setExtractPayload(boolean extractPayload)
Record
.extractPayload
- default truepublic void setAutoAck(boolean autoAck)
true
by default.autoAck
- the acknowledge option.public void setConsumerGroup(@Nullable String consumerGroup)
createConsumerGroup
. If not set, the defined bean name IntegrationObjectSupport.getBeanName()
is used.consumerGroup
- the Consumer Group on which this adapter should register to listen messages.public void setConsumerName(@Nullable String consumerName)
consumerName
- the consumer name in the Consumer Grouppublic void setCreateConsumerGroup(boolean createConsumerGroup)
MKSTREAM
.createConsumerGroup
- specify if we should create the Consumer Group, false
by defaultpublic void setStreamReceiverOptions(@Nullable StreamReceiver.StreamReceiverOptions<String,?> streamReceiverOptions)
ReactiveStreamOperations
used to customize the StreamReceiver
.
It provides a way to set the polling timeout and the serialization context.
By default the polling timeout is set to infinite and
StringRedisSerializer
is used.streamReceiverOptions
- the desired receiver optionspublic String getComponentType()
IntegrationObjectSupport
getComponentType
in interface NamedComponent
getComponentType
in class IntegrationObjectSupport
protected void onInit()
IntegrationObjectSupport
onInit
in class MessageProducerSupport
protected void doStart()
MessageProducerSupport
doStart
in class MessageProducerSupport