Class ReactiveRedisStreamMessageProducer
java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.endpoint.AbstractEndpoint
org.springframework.integration.endpoint.MessageProducerSupport
org.springframework.integration.redis.inbound.ReactiveRedisStreamMessageProducer
- All Implemented Interfaces:
Aware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, SmartInitializingSingleton, ApplicationContextAware, Lifecycle, Phased, SmartLifecycle, ExpressionCapable, MessageProducer, IntegrationPattern, NamedComponent, ManageableLifecycle, ManageableSmartLifecycle, TrackableComponent
public class ReactiveRedisStreamMessageProducer extends MessageProducerSupport
A MessageProducerSupport for reading messages from a Redis Stream and publishing them into the provided output channel.
By default this adapter reads messages as a standalone client using XREAD (Redis command), but it can be switched to the Consumer Group feature (XREADGROUP) by setting the consumerName field.
By default the Consumer Group name is the id of this bean, IntegrationObjectSupport.getBeanName().
- Since:
- 5.4
- Author:
- Attoumane Ahamadi, Artem Bilan, Rohan Mukesh
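As an illustration of the Consumer Group mode described above, the following configuration fragment is a minimal sketch rather than a recommended setup. It assumes Spring Integration Redis and Spring Data Redis are on the classpath and that a ReactiveRedisConnectionFactory bean already exists in the application context. The class name, bean names, stream key, group name and consumer name are all hypothetical.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.redis.inbound.ReactiveRedisStreamMessageProducer;
import org.springframework.messaging.MessageChannel;

// Hypothetical configuration class; every name below is made up for illustration.
@Configuration
public class OrdersStreamConfig {

    // Output channel that receives the messages read from the stream.
    @Bean
    public MessageChannel ordersChannel() {
        return new FluxMessageChannel();
    }

    @Bean
    public ReactiveRedisStreamMessageProducer ordersStreamProducer(
            ReactiveRedisConnectionFactory connectionFactory) {

        // "orders" is an assumed stream key.
        ReactiveRedisStreamMessageProducer producer =
                new ReactiveRedisStreamMessageProducer(connectionFactory, "orders");
        producer.setOutputChannel(ordersChannel());

        // Providing a consumer name switches the adapter from XREAD to XREADGROUP.
        producer.setConsumerName("orders-consumer-1");
        // Optional: when omitted, the bean name is used as the Consumer Group name.
        producer.setConsumerGroup("orders-group");
        // Create the group (and the stream, via MKSTREAM) if it does not exist yet.
        producer.setCreateConsumerGroup(true);
        return producer;
    }
}
```

Leaving out the setConsumerName call keeps the adapter in its default standalone XREAD mode.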
-
Field Summary
Fields inherited from class org.springframework.integration.endpoint.AbstractEndpoint
lifecycleCondition, lifecycleLock
Fields inherited from class org.springframework.integration.context.IntegrationObjectSupport
EXPRESSION_PARSER, logger
-
Constructor Summary
Constructors
ReactiveRedisStreamMessageProducer(ReactiveRedisConnectionFactory reactiveConnectionFactory, String streamKey)
-
Method Summary
Modifier and Type Method - Description
protected void doStart() - Take no action by default.
String getComponentType() - Subclasses may implement this method to provide component type information.
protected void onInit() - Subclasses may implement this for initialization logic.
void setAutoAck(boolean autoAck) - Set whether to acknowledge messages read in the Consumer Group.
void setBatchSize(int recordsPerPoll) - Configure a batch size for the COUNT option during reading.
void setConsumerGroup(String consumerGroup) - Set the name of the Consumer Group.
void setConsumerName(String consumerName) - Set the name of the consumer.
void setCreateConsumerGroup(boolean createConsumerGroup) - Create the Consumer Group if and only if it does not exist.
void setExtractPayload(boolean extractPayload) - Configure whether this channel adapter extracts the value from the Record.
void setObjectMapper(HashMapper<?,?,?> hashMapper) - Configure a hash mapper.
void setOnErrorResume(Function<? super Throwable,? extends org.reactivestreams.Publisher<Void>> resumeFunction) - Configure a resume Function to resume the main sequence when polling the stream fails.
void setPollTimeout(Duration pollTimeout) - Configure a poll timeout for the BLOCK option during reading.
void setReadOffset(ReadOffset readOffset) - Define the offset from which to read messages.
void setSerializer(RedisSerializationContext.SerializationPair<?> pair) - Configure a key, hash key and hash value serializer.
void setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions<String,?> streamReceiverOptions) - Set the StreamReceiver.StreamReceiverOptions used to customize the StreamReceiver.
void setTargetType(Class<?> targetType) - Configure a hash target type.
Methods inherited from class org.springframework.integration.endpoint.MessageProducerSupport
afterSingletonsInstantiated, buildErrorMessage, doStop, getErrorChannel, getErrorMessageAttributes, getIntegrationPatternType, getMessagingTemplate, getOutputChannel, sendErrorMessageIfNecessary, sendMessage, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setOutputChannel, setOutputChannelName, setSendTimeout, setShouldTrack, subscribeToPublisher
Methods inherited from class org.springframework.integration.endpoint.AbstractEndpoint
destroy, doStop, getPhase, getRole, isActive, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stop
Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface org.springframework.integration.support.context.NamedComponent
getBeanName, getComponentName
-
Constructor Details
-
ReactiveRedisStreamMessageProducer
public ReactiveRedisStreamMessageProducer(ReactiveRedisConnectionFactory reactiveConnectionFactory, String streamKey)
-
-
Method Details
-
setReadOffset
Define the offset from which to read messages. By default ReadOffset.latest() is used. ReadOffset.latest() is equal to '$', which is the ID used with XREAD to get new data added to the stream. Note that when switching to the Consumer Group feature, the offset is set to ReadOffset.lastConsumed() if it is still equal to ReadOffset.latest().
- Parameters:
readOffset
- the desired offset
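The defaulting rule above can be modelled in plain Java. This is a sketch of the documented behaviour only, not the adapter's actual code: offsets are represented as raw Redis ID strings ('$' for ReadOffset.latest(), '>' for ReadOffset.lastConsumed()), and the class name, method name and the "non-empty consumer name means Consumer Group mode" check are assumptions made for illustration.

```java
// Plain-Java model of the documented read-offset defaulting rule.
// The class and method names are hypothetical, not part of Spring Integration.
final class ReadOffsetDefaulting {

    static final String LATEST = "$";         // raw ID behind ReadOffset.latest()
    static final String LAST_CONSUMED = ">";  // raw ID behind ReadOffset.lastConsumed()

    // Returns the offset that would be used for reading, given the configured
    // offset and the (possibly null) consumer name.
    static String effectiveOffset(String configuredOffset, String consumerName) {
        // Assumption: a non-empty consumer name means Consumer Group mode.
        boolean consumerGroupMode = consumerName != null && !consumerName.isEmpty();
        if (consumerGroupMode && LATEST.equals(configuredOffset)) {
            // An untouched default is replaced when reading through a Consumer Group.
            return LAST_CONSUMED;
        }
        // An explicitly chosen offset, or standalone mode, is left as configured.
        return configuredOffset;
    }

    public static void main(String[] args) {
        System.out.println(effectiveOffset("$", null));           // prints "$"
        System.out.println(effectiveOffset("$", "consumer-1"));   // prints ">"
        System.out.println(effectiveOffset("0-0", "consumer-1")); // prints "0-0"
    }
}
```

Only the untouched default is replaced; an offset set explicitly to something other than ReadOffset.latest() is kept in both modes.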
-
setExtractPayload
public void setExtractPayload(boolean extractPayload)
Configure whether this channel adapter extracts the value from the Record.
- Parameters:
extractPayload
- default true
-
setAutoAck
public void setAutoAck(boolean autoAck)
Set whether to acknowledge messages read in the Consumer Group. true by default.
- Parameters:
autoAck
- the acknowledge option.
-
setConsumerGroup
Set the name of the Consumer Group. The Consumer Group can be created if desired; see createConsumerGroup. If not set, the bean name, IntegrationObjectSupport.getBeanName(), is used.
- Parameters:
consumerGroup
- the Consumer Group on which this adapter should register to listen for messages.
-
setConsumerName
Set the name of the consumer. When a consumer name is provided, this adapter is switched to the Consumer Group feature. Note that this value should be unique in the group.
- Parameters:
consumerName
- the consumer name in the Consumer Group
-
setCreateConsumerGroup
public void setCreateConsumerGroup(boolean createConsumerGroup)
Create the Consumer Group if and only if it does not exist. During creation the stream is also created; see MKSTREAM.
- Parameters:
createConsumerGroup
- specify whether the Consumer Group should be created; false by default
-
setStreamReceiverOptions
public void setStreamReceiverOptions(@Nullable StreamReceiver.StreamReceiverOptions<String,?> streamReceiverOptions)
Set the StreamReceiver.StreamReceiverOptions used to customize the StreamReceiver. It provides a way to set the polling timeout and the serialization context. By default the polling timeout is set to infinite and StringRedisSerializer is used. Mutually exclusive with 'pollTimeout', 'batchSize', 'onErrorResume', 'serializer', 'targetType', 'objectMapper'.
- Parameters:
streamReceiverOptions
- the desired receiver options
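Because this setter is mutually exclusive with the individual option setters, a given producer is configured in one of two styles. The fragment below sketches both side by side. It assumes Spring Integration Redis and Spring Data Redis are on the classpath. The helper class, method names, stream key and option values are hypothetical.

```java
import java.time.Duration;

import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamReceiver;
import org.springframework.integration.redis.inbound.ReactiveRedisStreamMessageProducer;

// Hypothetical helper showing the two alternative configuration styles.
final class ReceiverOptionsStyles {

    // Style 1: build the complete StreamReceiverOptions and hand them over.
    static ReactiveRedisStreamMessageProducer viaOptions(ReactiveRedisConnectionFactory cf) {
        StreamReceiver.StreamReceiverOptions<String, MapRecord<String, String, String>> options =
                StreamReceiver.StreamReceiverOptions.builder()
                        .pollTimeout(Duration.ofMillis(100))
                        .batchSize(10)
                        .build();
        ReactiveRedisStreamMessageProducer producer =
                new ReactiveRedisStreamMessageProducer(cf, "orders"); // assumed stream key
        producer.setStreamReceiverOptions(options);
        return producer;
    }

    // Style 2: use the individual setters and leave streamReceiverOptions unset.
    static ReactiveRedisStreamMessageProducer viaSetters(ReactiveRedisConnectionFactory cf) {
        ReactiveRedisStreamMessageProducer producer =
                new ReactiveRedisStreamMessageProducer(cf, "orders"); // assumed stream key
        producer.setPollTimeout(Duration.ofMillis(100));
        producer.setBatchSize(10);
        return producer;
    }
}
```

The individual setters (available since 5.5) are convenient when only one or two options differ from the defaults; a prebuilt StreamReceiverOptions instance may suit cases where the same options are shared with other StreamReceiver usages.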
-
setPollTimeout
Configure a poll timeout for the BLOCK option during reading. Mutually exclusive with setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions).
- Parameters:
pollTimeout
- the timeout for polling.
- Since:
- 5.5
- See Also:
StreamReceiver.StreamReceiverOptionsBuilder.pollTimeout(Duration)
-
setBatchSize
public void setBatchSize(int recordsPerPoll)
Configure a batch size for the COUNT option during reading. Mutually exclusive with setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions).
- Parameters:
recordsPerPoll
- must be greater than zero.
- Since:
- 5.5
- See Also:
StreamReceiver.StreamReceiverOptionsBuilder.batchSize(int)
-
setOnErrorResume
public void setOnErrorResume(Function<? super Throwable,? extends org.reactivestreams.Publisher<Void>> resumeFunction)
Configure a resume Function to resume the main sequence when polling the stream fails. Mutually exclusive with setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions). By default this function extracts the failed Record and sends an ErrorMessage to the channel provided via MessageProducerSupport.setErrorChannel(org.springframework.messaging.MessageChannel). The failed message for this record may have an IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK header when manual acknowledgment is configured for this message producer.
- Parameters:
resumeFunction
- must not be null.
- Since:
- 5.5
- See Also:
StreamReceiver.StreamReceiverOptionsBuilder.onErrorResume(Function)
-
setSerializer
Configure a key, hash key and hash value serializer. Mutually exclusive with setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions).
- Parameters:
pair
- must not be null.
- Since:
- 5.5
- See Also:
StreamReceiver.StreamReceiverOptionsBuilder.serializer(RedisSerializationContext)
-
setTargetType
Configure a hash target type. Changes the emitted Record type to ObjectRecord. Mutually exclusive with setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions).
- Parameters:
targetType
- must not be null.
- Since:
- 5.5
- See Also:
StreamReceiver.StreamReceiverOptionsBuilder.targetType(Class)
-
setObjectMapper
Configure a hash mapper. Mutually exclusive with setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions).
- Parameters:
hashMapper
- must not be null.
- Since:
- 5.5
- See Also:
StreamReceiver.StreamReceiverOptionsBuilder.objectMapper(HashMapper)
-
getComponentType
Description copied from class: IntegrationObjectSupport
Subclasses may implement this method to provide component type information.
- Specified by:
getComponentType in interface NamedComponent
- Overrides:
getComponentType in class IntegrationObjectSupport
-
onInit
protected void onInit()
Description copied from class: IntegrationObjectSupport
Subclasses may implement this for initialization logic.
- Overrides:
onInit in class MessageProducerSupport
-
doStart
protected void doStart()
Description copied from class: MessageProducerSupport
Take no action by default. Subclasses may override this if they need lifecycle-managed behavior. Protected by 'lifecycleLock'.
- Overrides:
doStart in class MessageProducerSupport
-