Redis Support
Spring Integration 2.1 introduced support for Redis: “an open source advanced key-value store”.
This support comes in the form of a Redis-based MessageStore as well as publish-subscribe messaging adapters that are supported by Redis through its PUBLISH, SUBSCRIBE, and UNSUBSCRIBE commands.
You need to include this dependency in your project:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-redis</artifactId>
    <version>5.5.20</version>
</dependency>
compile "org.springframework.integration:spring-integration-redis:5.5.20"
You also need to include a Redis client dependency, such as Lettuce.
To download, install, and run Redis, see the Redis documentation.
Connecting to Redis
To begin interacting with Redis, you first need to connect to it.
Spring Integration uses support provided by another Spring project, Spring Data Redis, which provides typical Spring constructs: ConnectionFactory and Template.
Those abstractions simplify integration with several Redis client Java APIs.
Currently Spring Data Redis supports Jedis and Lettuce.
Using RedisConnectionFactory
To connect to Redis, you can use one of the implementations of the RedisConnectionFactory interface.
The following listing shows the interface definition:
public interface RedisConnectionFactory extends PersistenceExceptionTranslator {
    /**
     * Provides a suitable connection for interacting with Redis.
     * @return connection for interacting with Redis.
     */
    RedisConnection getConnection();
}
The following example shows how to create a LettuceConnectionFactory in Java:
LettuceConnectionFactory cf = new LettuceConnectionFactory();
cf.afterPropertiesSet();
The following example shows how to create a LettuceConnectionFactory in Spring’s XML configuration:
<bean id="redisConnectionFactory"
    class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
    <property name="port" value="7379" />
</bean>
The implementations of RedisConnectionFactory provide a set of properties, such as port and host, that you can set if needed.
Once you have an instance of RedisConnectionFactory, you can create an instance of RedisTemplate and inject it with the RedisConnectionFactory.
Using RedisTemplate
As with other template classes in Spring (such as JdbcTemplate and JmsTemplate) RedisTemplate is a helper class that simplifies Redis data access code.
For more information about RedisTemplate and its variations (such as StringRedisTemplate) see the Spring Data Redis documentation.
The following example shows how to create an instance of RedisTemplate in Java:
RedisTemplate<String, Object> rt = new RedisTemplate<>();
rt.setConnectionFactory(redisConnectionFactory);
// Required when the template is created outside a Spring container
rt.afterPropertiesSet();
The following example shows how to create an instance of RedisTemplate in Spring’s XML configuration:
<bean id="redisTemplate"
         class="org.springframework.data.redis.core.RedisTemplate">
    <property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
Messaging with Redis
As mentioned in the introduction, Redis provides support for publish-subscribe messaging through its PUBLISH, SUBSCRIBE, and UNSUBSCRIBE commands.
As with JMS and AMQP, Spring Integration provides message channels and adapters for sending and receiving messages through Redis.
Redis Publish/Subscribe channel
Similarly to JMS, there are cases where both the producer and consumer are intended to be part of the same application, running within the same process. You can accomplish this by using a pair of inbound and outbound channel adapters. However, as with Spring Integration’s JMS support, there is a simpler way to address this use case. You can create a publish-subscribe channel, as the following example shows:
<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>
A publish-subscribe-channel behaves much like a normal <publish-subscribe-channel/> element from the main Spring Integration namespace.
It can be referenced by both the input-channel and the output-channel attributes of any endpoint.
The difference is that this channel is backed by a Redis topic name: a String value specified by the topic-name attribute.
However, unlike JMS, this topic does not have to be created in advance or even auto-created by Redis.
In Redis, topics are simple String values that play the role of an address.
The producer and consumer can communicate by using the same String value as their topic name.
A simple subscription to this channel means that asynchronous publish-subscribe messaging is possible between the producing and consuming endpoints.
However, unlike the asynchronous message channels created by adding a <queue/> element within a simple Spring Integration <channel/> element, the messages are not stored in an in-memory queue.
Instead, those messages are passed through Redis, which lets you rely on its support for persistence and clustering as well as its interoperability with other non-Java platforms.
Redis Inbound Channel Adapter
The Redis inbound channel adapter (RedisInboundChannelAdapter) adapts incoming Redis messages into Spring messages in the same way as other inbound adapters.
It receives platform-specific messages (Redis in this case) and converts them to Spring messages by using a MessageConverter strategy.
The following example shows how to configure a Redis inbound channel adapter:
<int-redis:inbound-channel-adapter id="redisAdapter"
       topics="thing1, thing2"
       channel="receiveChannel"
       error-channel="testErrorChannel"
       message-converter="testConverter" />
<bean id="redisConnectionFactory"
    class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
    <property name="port" value="7379" />
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
The preceding example shows a simple but complete configuration of a Redis inbound channel adapter.
Note that the preceding configuration relies on the familiar Spring paradigm of auto-discovering certain beans.
In this case, the redisConnectionFactory is implicitly injected into the adapter.
You can specify it explicitly by using the connection-factory attribute instead.
Also, note that the preceding configuration injects the adapter with a custom MessageConverter.
The approach is similar to JMS, where MessageConverter instances are used to convert between Redis messages and the Spring Integration message payloads.
The default is a SimpleMessageConverter.
Inbound adapters can subscribe to multiple topic names, hence the comma-separated set of values in the topics attribute.
Since version 3.0, the inbound adapter, in addition to the existing topics attribute, now has the topic-patterns attribute.
This attribute contains a comma-separated set of Redis topic patterns.
For more information regarding Redis publish-subscribe, see Redis Pub/Sub.
Inbound adapters can use a RedisSerializer to deserialize the body of Redis messages.
The serializer attribute of the <int-redis:inbound-channel-adapter> can be set to an empty string, which results in a null value for the RedisSerializer property.
In this case, the raw byte[] bodies of Redis messages are provided as the message payloads.
Since version 5.0, you can provide an Executor instance to the inbound adapter by using the task-executor attribute of the <int-redis:inbound-channel-adapter>.
Also, the received Spring Integration messages now have the RedisHeaders.MESSAGE_SOURCE header to indicate the source of the published message: topic or pattern.
You can use this downstream for routing logic.
Redis Outbound Channel Adapter
The Redis outbound channel adapter adapts outgoing Spring Integration messages into Redis messages in the same way as other outbound adapters.
It receives Spring Integration messages and converts them to platform-specific messages (Redis in this case) by using a MessageConverter strategy.
The following example shows how to configure a Redis outbound channel adapter:
<int-redis:outbound-channel-adapter id="outboundAdapter"
    channel="sendChannel"
    topic="thing1"
    message-converter="testConverter"/>
<bean id="redisConnectionFactory"
    class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
    <property name="port" value="7379"/>
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
The configuration parallels the Redis inbound channel adapter.
The adapter is implicitly injected with a RedisConnectionFactory, which is defined with redisConnectionFactory as its bean name.
This example also includes the optional (and custom) MessageConverter (the testConverter bean).
Since Spring Integration 3.0, the <int-redis:outbound-channel-adapter> offers an alternative to the topic attribute: You can use the topic-expression attribute to determine the Redis topic for the message at runtime.
These attributes are mutually exclusive.
Redis Queue Inbound Channel Adapter
Spring Integration 3.0 introduced a queue inbound channel adapter to “pop” messages from a Redis list. By default, it uses “right pop”, but you can configure it to use “left pop” instead. The adapter is message-driven. It uses an internal listener thread and does not use a poller.
The following listing shows all the available attributes for queue-inbound-channel-adapter:
<int-redis:queue-inbound-channel-adapter id=""  (1)
                    channel=""  (2)
                    auto-startup=""  (3)
                    phase=""  (4)
                    connection-factory=""  (5)
                    queue=""  (6)
                    error-channel=""  (7)
                    serializer=""  (8)
                    receive-timeout=""  (9)
                    recovery-interval=""  (10)
                    expect-message=""  (11)
                    task-executor=""  (12)
                    right-pop=""/>  (13)
| 1 | The component bean name.
If you do not provide the channel attribute, a DirectChannel is created and registered in the application context with this id attribute as the bean name.
In this case, the endpoint itself is registered with the bean name id plus .adapter.
(If the bean name were thing1, the endpoint is registered as thing1.adapter.) | 
| 2 | The MessageChannel to which to send Message instances from this Endpoint. | 
| 3 | A SmartLifecycle attribute to specify whether this endpoint should start automatically after the application context start or not.
It defaults to true. | 
| 4 | A SmartLifecycle attribute to specify the phase in which this endpoint is started.
It defaults to 0. | 
| 5 | A reference to a RedisConnectionFactory bean.
It defaults to redisConnectionFactory. | 
| 6 | The name of the Redis list on which the queue-based 'pop' operation is performed to get Redis messages. | 
| 7 | The MessageChannel to which to send ErrorMessage instances when exceptions are received from the listening task of the endpoint.
By default, the underlying MessagePublishingErrorHandler uses the default errorChannel from the application context. | 
| 8 | The RedisSerializer bean reference.
It can be an empty string, which means 'no serializer'.
In this case, the raw byte[] from the inbound Redis message is sent to the channel as the Message payload.
By default it is a JdkSerializationRedisSerializer. | 
| 9 | The timeout in milliseconds for 'pop' operation to wait for a Redis message from the queue. The default is 1 second. | 
| 10 | The time in milliseconds for which the listener task should sleep after exceptions on the 'pop' operation, before restarting the listener task. | 
| 11 | Specifies whether this endpoint expects data from the Redis queue to contain entire Message instances.
If this attribute is set to true, the serializer cannot be an empty string, because messages require some form of deserialization (JDK serialization by default).
Its default is false. | 
| 12 | A reference to a Spring TaskExecutor (or standard JDK 1.5+ Executor) bean.
It is used for the underlying listening task.
It defaults to a SimpleAsyncTaskExecutor. | 
| 13 | Specifies whether this endpoint should use “right pop” (when true) or “left pop” (when false) to read messages from the Redis list.
If true, the Redis List acts as a FIFO queue when used with a default Redis queue outbound channel adapter.
Set it to false to use with software that writes to the list with “right push” or to achieve a stack-like message order.
Its default is true.
Since version 4.3. | 
| The task-executor has to be configured with more than one thread for processing; otherwise there is a possible deadlock when the RedisQueueMessageDrivenEndpoint tries to restart the listener task after an error.
The errorChannel can be used to process those errors, to avoid restarts, but it is preferable to not expose your application to the possible deadlock situation.
See Spring Framework Reference Manual for possible TaskExecutor implementations. | 
Redis Queue Outbound Channel Adapter
Spring Integration 3.0 introduced a queue outbound channel adapter to “push” to a Redis list from Spring Integration messages.
By default, it uses “left push”, but you can configure it to use “right push” instead.
The following listing shows all the available attributes for a Redis queue-outbound-channel-adapter:
<int-redis:queue-outbound-channel-adapter id=""  (1)
                    channel=""  (2)
                    connection-factory=""  (3)
                    queue=""  (4)
                    queue-expression=""  (5)
                    serializer=""  (6)
                    extract-payload=""  (7)
                    left-push=""/>  (8)
| 1 | The component bean name.
If you do not provide the channel attribute, a DirectChannel is created and registered in the application context with this id attribute as the bean name.
In this case, the endpoint is registered with a bean name of id plus .adapter.
(If the bean name were thing1, the endpoint is registered as thing1.adapter.) | 
| 2 | The MessageChannel from which this endpoint receives Message instances. | 
| 3 | A reference to a RedisConnectionFactory bean.
It defaults to redisConnectionFactory. | 
| 4 | The name of the Redis list on which the queue-based 'push' operation is performed to send Redis messages.
This attribute is mutually exclusive with queue-expression. | 
| 5 | A SpEL Expression to determine the name of the Redis list.
It uses the incoming Message at runtime as the #root variable.
This attribute is mutually exclusive with queue. | 
| 6 | A RedisSerializer bean reference.
It defaults to a JdkSerializationRedisSerializer.
However, for String payloads, a StringRedisSerializer is used, if a serializer reference is not provided. | 
| 7 | Specifies whether this endpoint should send only the payload or the entire Message to the Redis queue.
It defaults to true. | 
| 8 | Specifies whether this endpoint should use “left push” (when true) or “right push” (when false) to write messages to the Redis list.
If true, the Redis list acts as a FIFO queue when used with a default Redis queue inbound channel adapter.
Set it to false to use with software that reads from the list with “left pop” or to achieve a stack-like message order.
It defaults to true.
Since version 4.3. | 
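The interaction between left-push on the outbound side and right-pop on the inbound side can be illustrated with a small in-memory model. This is only a sketch: the RedisListOrderSketch class and its roundTrip method are hypothetical names, and a java.util.Deque stands in for the Redis list, so no Redis client API is involved.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** In-memory stand-in for a Redis list (hypothetical; not the Redis client API). */
final class RedisListOrderSketch {

    /** Pushes the items in order, pops them all, and returns the order in which they were popped. */
    static List<String> roundTrip(List<String> items, boolean leftPush, boolean rightPop) {
        Deque<String> list = new ArrayDeque<>();
        for (String item : items) {
            if (leftPush) {
                list.addFirst(item);  // models a "left push" at the head of the list
            } else {
                list.addLast(item);   // models a "right push" at the tail of the list
            }
        }
        List<String> received = new ArrayList<>();
        while (!list.isEmpty()) {
            // models a "right pop" from the tail or a "left pop" from the head
            received.add(rightPop ? list.removeLast() : list.removeFirst());
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> sent = List.of("a", "b", "c");
        System.out.println(roundTrip(sent, true, true));   // both defaults: prints [a, b, c]
        System.out.println(roundTrip(sent, true, false));  // right-pop="false": prints [c, b, a]
    }
}
```

With both defaults the oldest message is read first. Flipping only one side pushes and pops at the same end of the list, which gives the stack-like order mentioned in the attribute descriptions.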
Redis Application Events
Since Spring Integration 3.0, the Redis module provides an implementation of IntegrationEvent, which, in turn, is an org.springframework.context.ApplicationEvent.
The RedisExceptionEvent encapsulates exceptions from Redis operations (with the endpoint being the “source” of the event).
For example, the <int-redis:queue-inbound-channel-adapter/> emits those events after catching exceptions from the BoundListOperations.rightPop operation.
The exception may be any generic org.springframework.data.redis.RedisSystemException or an org.springframework.data.redis.RedisConnectionFailureException.
Handling these events with an <int-event:inbound-channel-adapter/> can be useful to determine problems with background Redis tasks and to take administrative actions.
Redis Message Store
As described in the Enterprise Integration Patterns (EIP) book, a message store lets you persist messages.
This can be useful when dealing with components that have a capability to buffer messages (aggregator, resequencer, and others) when reliability is a concern.
In Spring Integration, the MessageStore strategy also provides the foundation for the claim check pattern, which is described in EIP as well.
Spring Integration’s Redis module provides the RedisMessageStore.
The following example shows how to use it with an aggregator:
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisMessageStore">
    <constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
         message-store="redisMessageStore"/>
The preceding example is a bean configuration, and it expects a RedisConnectionFactory as a constructor argument.
By default, the RedisMessageStore uses Java serialization to serialize the message.
However, if you want to use a different serialization technique (such as JSON), you can provide your own serializer by setting the valueSerializer property of the RedisMessageStore.
Starting with version 4.3.10, the Framework provides Jackson serializer and deserializer implementations for Message instances and MessageHeaders instances — MessageJacksonDeserializer and MessageHeadersJacksonSerializer, respectively.
They have to be configured with the SimpleModule options for the ObjectMapper.
In addition, you should set enableDefaultTyping on the ObjectMapper to add type information for each serialized complex object (if you trust the source).
That type information is then used during deserialization.
The framework provides a utility method called JacksonJsonUtils.messagingAwareMapper(), which is already supplied with all the previously mentioned properties and serializers.
This utility method comes with the trustedPackages argument to limit Java packages for deserialization to avoid security vulnerabilities.
The default trusted packages: java.util, java.lang, org.springframework.messaging.support, org.springframework.integration.support, org.springframework.integration.message, org.springframework.integration.store.
To manage JSON serialization in the RedisMessageStore, you must configure it in a fashion similar to the following example:
RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
store.setValueSerializer(serializer);
Starting with version 4.3.12, RedisMessageStore supports the prefix option to allow distinguishing between instances of the store on the same Redis server.
Redis Channel Message Stores
The RedisMessageStore shown earlier maintains each group as a value under a single key (the group ID).
While you can use this to back a QueueChannel for persistence, a specialized RedisChannelMessageStore is provided for that purpose (since version 4.0).
This store uses a LIST for each channel, LPUSH when sending messages, and RPOP when receiving messages.
By default, this store also uses JDK serialization, but you can modify the value serializer, as described earlier.
We recommend using this store to back channels, instead of using the general RedisMessageStore.
The following example defines a Redis message store and uses it in a channel with a queue:
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
	<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="redisMessageStore"/>
</int:channel>
The keys used to store the data have the form: <storeBeanName>:<channelId> (in the preceding example, redisMessageStore:somePersistentQueueChannel).
In addition, a subclass RedisChannelPriorityMessageStore is also provided.
When you use this with a QueueChannel, the messages are received in (FIFO) priority order.
It uses the standard IntegrationMessageHeaderAccessor.PRIORITY header and supports priority values (0 - 9).
Messages with other priorities (and messages with no priority) are retrieved in FIFO order after any messages with priority.
| These stores implement only BasicMessageGroupStore and do not implement MessageGroupStore.
They can be used only for situations such as backing a QueueChannel. | 
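The retrieval order described for the RedisChannelPriorityMessageStore can be pictured with the following in-memory sketch. It is not the store's implementation: the PriorityOrderSketch class is a hypothetical name, plain Java queues stand in for Redis lists, and the sketch assumes that 9 is the most urgent priority value.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical in-memory model of priority ordering; it does not talk to Redis. */
final class PriorityOrderSketch {

    // One FIFO queue per supported priority (0-9), plus one for everything else.
    private final List<Deque<String>> byPriority = new ArrayList<>();
    private final Deque<String> noPriority = new ArrayDeque<>();

    PriorityOrderSketch() {
        for (int i = 0; i <= 9; i++) {
            byPriority.add(new ArrayDeque<>());
        }
    }

    /** Adds a payload; pass null when the message carries no priority header. */
    void add(String payload, Integer priority) {
        if (priority != null && priority >= 0 && priority <= 9) {
            byPriority.get(priority).addLast(payload);
        } else {
            noPriority.addLast(payload);  // unsupported or missing priority
        }
    }

    /** Returns the next payload, or null when nothing is stored. */
    String poll() {
        // Assumption: 9 is the most urgent value, so scan from 9 down to 0.
        for (int p = 9; p >= 0; p--) {
            String next = byPriority.get(p).pollFirst();
            if (next != null) {
                return next;
            }
        }
        return noPriority.pollFirst();  // served only after all prioritized messages
    }

    public static void main(String[] args) {
        PriorityOrderSketch store = new PriorityOrderSketch();
        store.add("plain", null);
        store.add("low", 1);
        store.add("high-1", 9);
        store.add("odd", 42);
        store.add("high-2", 9);
        for (String next = store.poll(); next != null; next = store.poll()) {
            System.out.println(next);
        }
    }
}
```

Messages that share a priority keep their arrival order, and the messages without a supported priority are drained last, also in arrival order.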
Redis Metadata Store
Spring Integration 3.0 introduced a new Redis-based MetadataStore (see Metadata Store) implementation.
You can use the RedisMetadataStore to maintain the state of a MetadataStore across application restarts.
You can use this new MetadataStore implementation with adapters such as the feed inbound channel adapter.
To instruct these adapters to use the new RedisMetadataStore, declare a Spring bean named metadataStore.
The feed inbound channel adapter automatically picks up and uses the declared RedisMetadataStore.
The following example shows how to declare such a bean:
<bean name="metadataStore" class="o.s.i.redis.store.metadata.RedisMetadataStore">
    <constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
The RedisMetadataStore is backed by RedisProperties.
Interaction with it uses BoundHashOperations, which, in turn, requires a key for the entire Properties store.
In the case of the MetadataStore, this key plays the role of a region, which is useful in a distributed environment, when several applications use the same Redis server.
By default, this key has a value of MetaData.
Starting with version 4.0, this store implements ConcurrentMetadataStore, letting it be reliably shared across multiple application instances where only one instance is allowed to store or modify a key’s value.
| You cannot use the RedisMetadataStore.replace() (for example, in the AbstractPersistentAcceptOnceFileListFilter) with a Redis cluster, since the WATCH command for atomicity is not currently supported. | 
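The "only one instance is allowed to store or modify a key’s value" guarantee follows from atomic put-if-absent and compare-and-replace operations. The sketch below models those semantics with a ConcurrentHashMap instead of Redis. The MetadataClaimSketch class and its claim and update methods are hypothetical names chosen for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical in-memory model of a concurrent metadata store; it does not talk to Redis. */
final class MetadataClaimSketch {

    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    /** Returns true only for the first caller that stores a value under the key. */
    boolean claim(String key, String value) {
        return store.putIfAbsent(key, value) == null;
    }

    /** Replaces the value only if the key still holds the expected old value. */
    boolean update(String key, String expectedOldValue, String newValue) {
        return store.replace(key, expectedOldValue, newValue);
    }

    String get(String key) {
        return store.get(key);
    }

    public static void main(String[] args) {
        MetadataClaimSketch metadata = new MetadataClaimSketch();
        System.out.println(metadata.claim("file.txt", "instance-A"));                 // first claim wins
        System.out.println(metadata.claim("file.txt", "instance-B"));                 // later claim is rejected
        System.out.println(metadata.update("file.txt", "instance-B", "instance-C"));  // stale old value is rejected
        System.out.println(metadata.get("file.txt"));
    }
}
```

A filter that accepts each file only once can rely on this pattern: whichever application instance claims the key first processes the file, and the others skip it.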
Redis Store Inbound Channel Adapter
The Redis store inbound channel adapter is a polling consumer that reads data from a Redis collection and sends it as a Message payload.
The following example shows how to configure a Redis store inbound channel adapter:
<int-redis:store-inbound-channel-adapter id="listAdapter"
    connection-factory="redisConnectionFactory"
    key="myCollection"
    channel="redisChannel"
    collection-type="LIST" >
    <int:poller fixed-rate="2000" max-messages-per-poll="10"/>
</int-redis:store-inbound-channel-adapter>
The preceding example shows how to configure a Redis store inbound channel adapter by using the store-inbound-channel-adapter element, providing values for various attributes, such as:
- key or key-expression: The name of the key for the collection being used.
- collection-type: An enumeration of the collection types supported by this adapter. The supported collections are LIST, SET, ZSET, PROPERTIES, and MAP.
- connection-factory: Reference to an instance of o.s.data.redis.connection.RedisConnectionFactory.
- redis-template: Reference to an instance of o.s.data.redis.core.RedisTemplate.
- Other attributes that are common across all other inbound adapters (such as 'channel').
| You cannot set both redis-template and connection-factory. | 
| By default, the adapter uses a  The  | 
Because it has a literal value for the key, the preceding example is relatively simple and static.
Sometimes, you may need to change the value of the key at runtime based on some condition.
To do so, use key-expression instead, where the provided expression can be any valid SpEL expression.
Also, you may wish to perform some post-processing on the successfully processed data that was read from the Redis collection.
For example, you may want to move or remove the value after it has been processed.
You can do so by using the transaction synchronization feature that was added with Spring Integration 2.2.
The following example uses key-expression and transaction synchronization:
<int-redis:store-inbound-channel-adapter id="zsetAdapterWithSingleScoreAndSynchronization"
        connection-factory="redisConnectionFactory"
        key-expression="'presidents'"
        channel="otherRedisChannel"
        auto-startup="false"
        collection-type="ZSET">
            <int:poller fixed-rate="1000" max-messages-per-poll="2">
                <int:transactional synchronization-factory="syncFactory"/>
            </int:poller>
</int-redis:store-inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
	<int:after-commit expression="payload.removeByScore(18, 18)"/>
</int:transaction-synchronization-factory>
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
You can declare your poller to be transactional by using a transactional element.
This element can reference a real transaction manager (for example, if some other part of your flow invokes JDBC).
If you do not have a “real” transaction, you can use an o.s.i.transaction.PseudoTransactionManager, which is an implementation of Spring’s PlatformTransactionManager and enables the use of the transaction synchronization features of the Redis adapter when there is no actual transaction.
| This does not make the Redis activities themselves transactional. It lets the synchronization of actions be taken before or after success (commit) or after failure (rollback). | 
Once your poller is transactional, you can set an instance of the o.s.i.transaction.TransactionSynchronizationFactory on the transactional element.
TransactionSynchronizationFactory creates an instance of the TransactionSynchronization.
For your convenience, we have exposed a default SpEL-based TransactionSynchronizationFactory, which lets you configure SpEL expressions, with their execution being coordinated (synchronized) with a transaction.
Expressions for before-commit, after-commit, and after-rollback are supported, together with channels (one for each kind of event) where the evaluation result (if any) is sent.
For each child element, you can specify expression and channel attributes.
If only the channel attribute is present, the received message is sent there as part of the particular synchronization scenario.
If only the expression attribute is present and the result of an expression is a non-null value, a message with the result as the payload is generated and sent to a default channel (NullChannel) and appears in the logs (at the DEBUG level).
If you want the evaluation result to go to a specific channel, add a channel attribute.
If the result of an expression is null or void, no message is generated.
For more information about transaction synchronization, see Transaction Synchronization.
RedisStore Outbound Channel Adapter
The RedisStore outbound channel adapter lets you write a message payload to a Redis collection, as the following example shows:
<int-redis:store-outbound-channel-adapter id="redisListAdapter"
          collection-type="LIST"
          channel="requestChannel"
          key="myCollection" />
The preceding configuration defines a Redis store outbound channel adapter by using the store-outbound-channel-adapter element.
It provides values for various attributes, such as:
- key or key-expression: The name of the key for the collection being used.
- extract-payload-elements: If set to true (the default) and the payload is an instance of a “multi-value” object (that is, a Collection or a Map), it is stored by using “addAll” and “putAll” semantics. Otherwise, if set to false, the payload is stored as a single entry regardless of its type. If the payload is not an instance of a “multi-value” object, the value of this attribute is ignored and the payload is always stored as a single entry.
- collection-type: An enumeration of the Collection types supported by this adapter. The supported collections are LIST, SET, ZSET, PROPERTIES, and MAP.
- map-key-expression: SpEL expression that returns the name of the key for the entry being stored. It applies only if the collection-type is MAP or PROPERTIES and 'extract-payload-elements' is false.
- connection-factory: Reference to an instance of o.s.data.redis.connection.RedisConnectionFactory.
- redis-template: Reference to an instance of o.s.data.redis.core.RedisTemplate.
- Other attributes that are common across all other outbound adapters (such as 'channel').
| You cannot set both redis-template and connection-factory. | 
| By default, the adapter uses a StringRedisTemplate.
This uses StringRedisSerializer instances for keys, values, hash keys, and hash values.
However, if extract-payload-elements is set to false, a RedisTemplate that has StringRedisSerializer instances for keys and hash keys and JdkSerializationRedisSerializer instances for values and hash values is used.
With the JDK serializer, it is important to understand that Java serialization is used for all values, regardless of whether the value is actually a collection or not.
If you need more control over the serialization of values, consider providing your own RedisTemplate rather than relying upon these defaults. | 
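The effect of extract-payload-elements described above can be sketched with ordinary Java collections. This is an illustration only: the ExtractPayloadElementsSketch class and its store method are hypothetical names, and a java.util.List stands in for a Redis LIST, so the adapter itself is not involved.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical model of the "addAll" versus single-entry decision; it does not talk to Redis. */
final class ExtractPayloadElementsSketch {

    /** Stores a payload into an in-memory list that stands in for a Redis LIST. */
    static void store(List<Object> target, Object payload, boolean extractPayloadElements) {
        if (extractPayloadElements && payload instanceof Collection<?>) {
            // "addAll" semantics: one list entry per element of the payload
            target.addAll((Collection<?>) payload);
        } else {
            // single-entry semantics: the payload is stored as one value,
            // which is also what happens for payloads that are not multi-value objects
            target.add(payload);
        }
    }

    public static void main(String[] args) {
        List<Object> extracted = new ArrayList<>();
        store(extracted, List.of("a", "b", "c"), true);
        System.out.println(extracted.size());  // prints 3

        List<Object> single = new ArrayList<>();
        store(single, List.of("a", "b", "c"), false);
        System.out.println(single.size());     // prints 1
    }
}
```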
Because it has literal values for the key and other attributes, the preceding example is relatively simple and static.
Sometimes, you may need to change the values dynamically at runtime based on some condition.
To do so, use their -expression equivalents (key-expression, map-key-expression, and so on), where the provided expression can be any valid SpEL expression.
Redis Outbound Command Gateway
Spring Integration 4.0 introduced the Redis command gateway to let you perform any standard Redis command by using the generic RedisConnection#execute method.
The following listing shows the available attributes for the Redis outbound gateway:
<int-redis:outbound-gateway
        request-channel=""  (1)
        reply-channel=""  (2)
        requires-reply=""  (3)
        reply-timeout=""  (4)
        connection-factory=""  (5)
        redis-template=""  (6)
        arguments-serializer=""  (7)
        command-expression=""  (8)
        argument-expressions=""  (9)
        use-command-variable=""  (10)
        arguments-strategy="" /> (11)
| 1 | The MessageChannel from which this endpoint receives Message instances. | 
| 2 | The MessageChannel where this endpoint sends reply Message instances. | 
| 3 | Specifies whether this outbound gateway must return a non-null value.
It defaults to true.
A ReplyRequiredException is thrown when Redis returns a null value. | 
| 4 | The timeout (in milliseconds) to wait until the reply message is sent. It is typically applied for queue-based limited reply-channels. | 
| 5 | A reference to a RedisConnectionFactory bean.
It defaults to redisConnectionFactory.
It is mutually exclusive with 'redis-template' attribute. | 
| 6 | A reference to a RedisTemplate bean.
It is mutually exclusive with 'connection-factory' attribute. | 
| 7 | A reference to an instance of org.springframework.data.redis.serializer.RedisSerializer.
It is used to serialize each command argument to byte[], if necessary. | 
| 8 | The SpEL expression that returns the command key.
It defaults to the redis_command message header.
It must not evaluate to null. | 
| 9 | Comma-separated SpEL expressions that are evaluated as command arguments.
Mutually exclusive with the arguments-strategy attribute.
If you provide neither attribute, the payload is used as the command arguments.
The argument expressions can evaluate to 'null' to support a variable number of arguments. | 
| 10 | A boolean flag to specify whether the evaluated Redis command string is made available as the #cmd variable in the expression evaluation context in the o.s.i.redis.outbound.ExpressionArgumentsStrategy when argument-expressions is configured.
Otherwise this attribute is ignored. | 
| 11 | Reference to an instance of o.s.i.redis.outbound.ArgumentsStrategy.
It is mutually exclusive with argument-expressions attribute.
If you provide neither attribute, the payload is used as the command arguments. | 
You can use the <int-redis:outbound-gateway> as a common component to perform any desired Redis operation.
The following example shows how to get incremented values from a Redis atomic number:
<int-redis:outbound-gateway request-channel="requestChannel"
    reply-channel="replyChannel"
    command-expression="'INCR'"/>
The Message payload should have a name of redisCounter, which may be provided by an org.springframework.data.redis.support.atomic.RedisAtomicInteger bean definition.
The RedisConnection#execute method has a generic Object as its return type.
The actual result depends on the command type.
For example, MGET returns a List<byte[]>.
For more information about commands, their arguments, and result types, see the Redis Specification.
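Because the reply payload type depends on the command, downstream code usually has to convert it. The following is a minimal sketch, assuming that the reply to MGET arrives as a List<byte[]> whose entries are UTF-8 encoded strings and that a missing key shows up as a null entry. MgetReplyDecoder is a hypothetical helper name and is not part of Spring Integration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Spring Integration): converts the raw
// List<byte[]> reply of an MGET command into a List<String>.
final class MgetReplyDecoder {

    private MgetReplyDecoder() {
    }

    // Assumption: the stored values are UTF-8 encoded strings.
    // A key that does not exist is represented by a null entry and is kept as null.
    static List<String> decode(List<byte[]> reply) {
        List<String> values = new ArrayList<>(reply.size());
        for (byte[] raw : reply) {
            values.add(raw == null ? null : new String(raw, StandardCharsets.UTF_8));
        }
        return values;
    }
}
```

Such a helper could be called from a transformer or service activator subscribed to the gateway's reply channel.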
Redis Queue Outbound Gateway
Spring Integration introduced the Redis queue outbound gateway to perform request and reply scenarios.
It pushes a conversation UUID to the provided queue, pushes the value with that UUID as its key to a Redis list, and waits for the reply from a Redis list with a key of UUID plus .reply.
A different UUID is used for each interaction.
The following listing shows the available attributes for a Redis queue outbound gateway:
<int-redis:queue-outbound-gateway
        request-channel=""  (1)
        reply-channel=""  (2)
        requires-reply=""  (3)
        reply-timeout=""  (4)
        connection-factory=""  (5)
        queue=""  (6)
        order=""  (7)
        serializer=""  (8)
        extract-payload=""/>  (9)
| 1 | The MessageChannel from which this endpoint receives Message instances. |
| 2 | The MessageChannel where this endpoint sends reply Message instances. |
| 3 | Specifies whether this outbound gateway must return a non-null value.
This value is false by default.
Otherwise, a ReplyRequiredException is thrown when Redis returns a null value. |
| 4 | The timeout (in milliseconds) to wait until the reply message is sent. It is typically applied for queue-based limited reply-channels. | 
| 5 | A reference to a RedisConnectionFactory bean.
It defaults to redisConnectionFactory.
It is mutually exclusive with the 'redis-template' attribute. |
| 6 | The name of the Redis list to which the outbound gateway sends a conversation UUID. | 
| 7 | The order of this outbound gateway when multiple gateways are registered. | 
| 8 | The RedisSerializer bean reference.
It can be an empty string, which means “no serializer”.
In this case, the raw byte[] from the inbound Redis message is sent to the channel as the Message payload.
By default, it is a JdkSerializationRedisSerializer. |
| 9 | Specifies whether this endpoint expects data from the Redis queue to contain entire Message instances.
If this attribute is set to true, the serializer cannot be an empty string, because messages require some form of deserialization (JDK serialization by default). |
Redis Queue Inbound Gateway
Spring Integration 4.1 introduced the Redis queue inbound gateway to perform request and reply scenarios.
It pops a conversation UUID from the provided queue, pops the value with that UUID as its key from the Redis list, and pushes the reply to the Redis list with a key of UUID plus .reply.
The following listing shows the available attributes for a Redis queue inbound gateway:
<int-redis:queue-inbound-gateway
        request-channel=""  (1)
        reply-channel=""  (2)
        executor=""  (3)
        reply-timeout=""  (4)
        connection-factory=""  (5)
        queue=""  (6)
        order=""  (7)
        serializer=""  (8)
        receive-timeout=""  (9)
        expect-message=""  (10)
        recovery-interval=""/>  (11)
| 1 | The MessageChannel where this endpoint sends Message instances created from the Redis data. |
| 2 | The MessageChannel from where this endpoint waits for reply Message instances.
Optional - the replyChannel header is still in use. |
| 3 | A reference to a Spring TaskExecutor (or a standard JDK Executor) bean.
It is used for the underlying listening task.
It defaults to a SimpleAsyncTaskExecutor. |
| 4 | The timeout (in milliseconds) to wait until the reply message is sent. It is typically applied for queue-based limited reply-channels. | 
| 5 | A reference to a RedisConnectionFactory bean.
It defaults to redisConnectionFactory.
It is mutually exclusive with the 'redis-template' attribute. |
| 6 | The name of the Redis list for the conversation UUID. | 
| 7 | The order of this inbound gateway when multiple gateways are registered. | 
| 8 | The RedisSerializer bean reference.
It can be an empty string, which means “no serializer”.
In this case, the raw byte[] from the inbound Redis message is sent to the channel as the Message payload.
It defaults to a JdkSerializationRedisSerializer.
(Note that, in releases before version 4.3, it was a StringRedisSerializer by default.
To restore that behavior, provide a reference to a StringRedisSerializer). |
| 9 | The timeout (in milliseconds) to wait until the receive message is fetched. It is typically applied for queue-based limited request-channels. | 
| 10 | Specifies whether this endpoint expects data from the Redis queue to contain entire Message instances.
If this attribute is set to true, the serializer cannot be an empty string, because messages require some form of deserialization (JDK serialization by default). |
| 11 | The time (in milliseconds) the listener task should sleep after exceptions on the “right pop” operation before restarting the listener task. | 
| The task-executor has to be configured with more than one thread for processing; otherwise there is a possible deadlock when the RedisQueueMessageDrivenEndpoint tries to restart the listener task after an error.
The errorChannel can be used to process those errors, to avoid restarts, but it is preferable not to expose your application to the possible deadlock situation.
See the Spring Framework Reference Manual for possible TaskExecutor implementations. |
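The request-reply exchange that the two queue gateways perform can be sketched with plain Java collections. The following is an illustration only: it uses in-memory deques as a stand-in for Redis lists, and QueueGatewayProtocolSketch and its method names are hypothetical and do not correspond to Spring Integration classes. It shows the order of operations described above: the outbound side pushes the conversation UUID and then the value, the inbound side pops both and pushes the reply to the list named UUID plus .reply.

```java
import java.util.UUID;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;

// In-memory stand-in for Redis lists, used only to illustrate the exchange.
// All names are hypothetical; none of them are Spring Integration classes.
final class QueueGatewayProtocolSketch {

    private final ConcurrentMap<String, BlockingDeque<String>> lists = new ConcurrentHashMap<>();

    private BlockingDeque<String> list(String key) {
        return lists.computeIfAbsent(key, k -> new LinkedBlockingDeque<>());
    }

    // Blocking pop with a timeout; returns null on timeout or interruption.
    private String pop(String key, long timeoutMillis) {
        try {
            return list(key).pollLast(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Outbound side, step 1: publish the conversation UUID, then the request value.
    String send(String queue, String request) {
        String uuid = UUID.randomUUID().toString(); // a different UUID for each interaction
        list(queue).addFirst(uuid);                 // push the UUID to the shared queue
        list(uuid).addFirst(request);               // push the value to the list keyed by the UUID
        return uuid;
    }

    // Outbound side, step 2: wait for the reply on the list keyed by UUID + ".reply".
    String receiveReply(String uuid, long timeoutMillis) {
        return pop(uuid + ".reply", timeoutMillis);
    }

    // Inbound side: pop a UUID, pop the request stored under it, push the reply.
    boolean handleOne(String queue, UnaryOperator<String> service, long timeoutMillis) {
        String uuid = pop(queue, timeoutMillis);
        if (uuid == null) {
            return false; // nothing arrived within the receive timeout
        }
        String request = pop(uuid, timeoutMillis);
        if (request == null) {
            return false; // the UUID was announced but no value followed in time
        }
        list(uuid + ".reply").addFirst(service.apply(request));
        return true;
    }
}
```

In this sketch, calling send("requests", "ping"), then handleOne("requests", String::toUpperCase, 100), then receiveReply(uuid, 100) completes one conversation. With real gateways the two sides normally run in different processes and the lists live in Redis.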
Redis Stream Outbound Channel Adapter
Spring Integration 5.4 introduced the Reactive Redis Stream outbound channel adapter to write a Message payload into a Redis stream.
The outbound channel adapter uses ReactiveStreamOperations.add(…) to add a Record to the stream.
The following example shows how to use Java configuration and a Service class for the Redis Stream outbound channel adapter.
@Bean
@ServiceActivator(inputChannel = "messageChannel")
public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
        ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
    ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
        new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey"); (1)
    reactiveStreamMessageHandler.setSerializationContext(serializationContext); (2)
    reactiveStreamMessageHandler.setHashMapper(hashMapper); (3)
    reactiveStreamMessageHandler.setExtractPayload(true); (4)
    return reactiveStreamMessageHandler;
}
| 1 | Construct an instance of ReactiveRedisStreamMessageHandler using a ReactiveRedisConnectionFactory and the name of the stream to which records are added.
Another constructor variant is based on a SpEL expression to evaluate a stream key against a request message. |
| 2 | Set a RedisSerializationContext used to serialize a record key and value before adding to the stream. |
| 3 | Set a HashMapper, which provides a contract between Java types and Redis hashes/maps. |
| 4 | If 'true', the channel adapter extracts the payload from a request message for the stream record to add.
Otherwise, the whole message is used as the value.
It defaults to true. |
Redis Stream Inbound Channel Adapter
Spring Integration 5.4 introduced the Reactive Stream inbound channel adapter for reading messages from a Redis Stream.
The inbound channel adapter uses StreamReceiver.receive(…) or StreamReceiver.receiveAutoAck(), depending on the auto-acknowledgement flag, to read records from the Redis stream.
The following example shows how to use Java configuration for the Redis Stream inbound channel adapter.
@Bean
public ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer(
       ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
    ReactiveRedisStreamMessageProducer messageProducer =
            new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey"); (1)
    messageProducer.setStreamReceiverOptions( (2)
                StreamReceiver.StreamReceiverOptions.builder()
                      .pollTimeout(Duration.ofMillis(100))
                      .build());
    messageProducer.setAutoStartup(true); (3)
    messageProducer.setAutoAck(false); (4)
    messageProducer.setCreateConsumerGroup(true); (5)
    messageProducer.setConsumerGroup("my-group"); (6)
    messageProducer.setConsumerName("my-consumer"); (7)
    messageProducer.setOutputChannel(fromRedisStreamChannel); (8)
    messageProducer.setReadOffset(ReadOffset.latest()); (9)
    messageProducer.setExtractPayload(true); (10)
    return messageProducer;
}
| 1 | Construct an instance of ReactiveRedisStreamMessageProducer using a ReactiveRedisConnectionFactory and the key of the stream from which records are read. |
| 2 | A StreamReceiver.StreamReceiverOptions to consume the Redis stream using reactive infrastructure. |
| 3 | A SmartLifecycle attribute to specify whether this endpoint should start automatically after the application context starts.
It defaults to true.
If false, the ReactiveRedisStreamMessageProducer should be started manually by calling messageProducer.start(). |
| 4 | If false, received messages are not automatically acknowledged.
The acknowledgement of the message is deferred to the client consuming the message.
It defaults to true. |
| 5 | If true, a consumer group is created.
During creation of the consumer group, the stream is created as well (if it does not exist yet).
A consumer group tracks message delivery and distinguishes between consumers.
It defaults to false. |
| 6 | Set the consumer group name. It defaults to the defined bean name. |
| 7 | Set the consumer name.
Reads messages as my-consumer from group my-group. |
| 8 | The message channel to which to send messages from this endpoint. | 
| 9 | Define the offset from which to read messages.
It defaults to ReadOffset.latest(). |
| 10 | If 'true', the channel adapter extracts the payload value from the Record.
Otherwise the whole Record is used as a payload.
It defaults to true. |
If autoAck is set to false, the Record in the Redis Stream is not acknowledged automatically by the Redis driver. Instead, an IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK header, with a SimpleAcknowledgment instance as its value, is added to the message to produce.
It is the responsibility of the target integration flow to call its acknowledge() callback once the business logic is done for the message based on such a record.
Similar logic is required even when an exception happens during deserialization and an errorChannel is configured.
So, the target error handler must decide whether to ack or nack such a failed message.
Alongside IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, the ReactiveRedisStreamMessageProducer also populates these headers into the message to produce: RedisHeaders.STREAM_KEY, RedisHeaders.STREAM_MESSAGE_ID, RedisHeaders.CONSUMER_GROUP and RedisHeaders.CONSUMER.
Starting with version 5.5, you can configure StreamReceiver.StreamReceiverOptionsBuilder options explicitly on the ReactiveRedisStreamMessageProducer, including the newly introduced onErrorResume function, which is required if the Redis Stream consumer should continue polling when deserialization errors occur.
The default function sends a message to the error channel (if provided) with a possible acknowledgement for the failed message, as described above.
All these StreamReceiver.StreamReceiverOptionsBuilder options are mutually exclusive with an externally provided StreamReceiver.StreamReceiverOptions.
Redis Lock Registry
Spring Integration 4.0 introduced the RedisLockRegistry.
Certain components (for example, aggregator and resequencer) use a lock obtained from a LockRegistry instance to ensure that only one thread manipulates a group at a time.
The DefaultLockRegistry performs this function within a single component.
You can now configure an external lock registry on these components.
When you use it with a shared MessageGroupStore, you can use the RedisLockRegistry to provide this functionality across multiple application instances, such that only one instance can manipulate the group at a time.
When a lock is released by a local thread, another local thread can generally acquire the lock immediately. If a lock is released by a thread using a different registry instance, it can take up to 100ms to acquire the lock.
To avoid “hung” locks (when a server fails), the locks in this registry expire after a default of 60 seconds, but you can configure this value on the registry. Locks are normally held for a much shorter time.
| Because the keys can expire, an attempt to unlock an expired lock results in an exception being thrown. However, the resources protected by such a lock may have been compromised, so such exceptions should be considered to be severe. You should set the expiry at a large enough value to prevent this condition, but set it low enough that the lock can be recovered after a server failure in a reasonable amount of time. | 
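The interaction between expiry and unlocking can be illustrated with a small in-memory model. This is a sketch under stated assumptions and not how RedisLockRegistry is implemented: ExpiringLockModel, its methods, and the injectable clock are hypothetical names introduced here. It shows why an expiry that is too short is dangerous: once the key has expired, another owner can take the lock, and the original owner's unlock fails.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.LongSupplier;

// In-memory model of a lock whose key expires, as a Redis key with a TTL would.
// Hypothetical illustration only; this is not the RedisLockRegistry implementation.
final class ExpiringLockModel {

    private static final class Entry {
        final String owner;
        final long expiresAt;

        Entry(String owner, long expiresAt) {
            this.owner = owner;
            this.expiresAt = expiresAt;
        }
    }

    private final ConcurrentMap<String, Entry> keys = new ConcurrentHashMap<>();
    private final long expireAfterMillis;
    private final LongSupplier clock; // injectable so expiry can be shown without sleeping

    ExpiringLockModel(long expireAfterMillis, LongSupplier clock) {
        this.expireAfterMillis = expireAfterMillis;
        this.clock = clock;
    }

    // Succeeds if the key is absent or the previous holder's entry has expired.
    boolean tryLock(String key, String owner) {
        long now = clock.getAsLong();
        Entry fresh = new Entry(owner, now + expireAfterMillis);
        Entry result = keys.compute(key, (k, current) ->
                current == null || current.expiresAt <= now ? fresh : current);
        return result == fresh;
    }

    // Throws if the lock expired (or was taken over) before the owner released it.
    void unlock(String key, String owner) {
        long now = clock.getAsLong();
        Entry current = keys.get(key);
        if (current == null || !current.owner.equals(owner) || current.expiresAt <= now) {
            throw new IllegalStateException("Lock '" + key + "' expired before it was released");
        }
        keys.remove(key, current);
    }
}
```

In this model, an owner that holds a lock for longer than the expiry sees an IllegalStateException on unlock, and by then a second owner may already have worked on the same protected resource.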
Starting with version 5.0, the RedisLockRegistry implements ExpirableLockRegistry, which removes locks last acquired more than age ago and that are not currently locked.
Starting with version 5.5.6, the RedisLockRegistry supports automatic cleanup of the cache for redisLocks in RedisLockRegistry.locks via RedisLockRegistry.setCacheCapacity().
See its JavaDocs for more information.
Starting with version 5.5.13, the RedisLockRegistry exposes a setRedisLockType(RedisLockType) option to determine in which mode a Redis lock acquisition should happen:
- RedisLockType.SPIN_LOCK - the lock is acquired by a periodic loop (100ms) that checks whether the lock can be acquired. This is the default.
- RedisLockType.PUB_SUB_LOCK - the lock is acquired by a Redis pub-sub subscription.
The pub-sub mode is preferred: it causes less network chatter between the client and the Redis server and performs better, because the lock is acquired immediately when the subscription is notified about unlocking in the other process. However, Redis does not support pub-sub in Master/Replica connections (for example, in an AWS ElastiCache environment), so the busy-spin mode is chosen as the default to make the registry work in any environment.
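The spin mode can be pictured as a simple polling loop. The following is a minimal sketch of that idea and not the RedisLockRegistry code: SpinAcquireSketch and its parameters are hypothetical, and the attempt callback stands in for whatever conditional "set if absent" operation is issued against Redis.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of a spin-style acquisition loop; not the RedisLockRegistry code.
final class SpinAcquireSketch {

    private SpinAcquireSketch() {
    }

    // Calls 'attempt' repeatedly, sleeping 'interval' between calls,
    // until it succeeds or 'timeout' has elapsed.
    static boolean acquire(BooleanSupplier attempt, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (attempt.getAsBoolean()) {
                return true; // the lock key was obtained
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up: the lock stayed busy for the whole timeout
            }
            try {
                Thread.sleep(interval.toMillis());
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

With an interval of 100ms, a waiting client can notice a released lock up to one interval late and issues one request per interval while it waits, which is the network chatter that the pub-sub mode avoids.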