Since version 2.1 Spring Integration introduces support for Redis: _"an open source advanced key-value store". _ This support comes in the form of a Redis-based MessageStore as well as Publish-Subscribe Messaging adapters that are supported by Redis via its PUBLISH, SUBSCRIBE and UNSUBSCRIBE commands.
To download, install and run Redis please refer to the Redis documentation.
To begin interacting with Redis you first need to connect to it.
Spring Integration uses support provided by another Spring project, Spring Data Redis, which provides typical Spring constructs: ConnectionFactory
and Template
.
Those abstractions simplify integration with several Redis-client Java APIs.
Currently Spring-Data-Redis supportshttps://github.com/xetorthio/jedis[jedis], jredis and rjc
RedisConnectionFactory
To connect to Redis you would use one of the implementations of the RedisConnectionFactory
interface:
public interface RedisConnectionFactory extends PersistenceExceptionTranslator { /** * Provides a suitable connection for interacting with Redis. * * @return connection for interacting with Redis. */ RedisConnection getConnection(); }
The example below shows how to create a JedisConnectionFactory
.
In Java:
JedisConnectionFactory jcf = new JedisConnectionFactory();
jcf.afterPropertiesSet();
Or in Spring’s XML configuration:
<bean id="redisConnectionFactory" class="o.s.data.redis.connection.jedis.JedisConnectionFactory"> <property name="port" value="7379" /> </bean>
The implementations of RedisConnectionFactory provide a set of properties such as port and host that can be set if needed. Once an instance of RedisConnectionFactory is created, you can create an instance of RedisTemplate and inject it with the RedisConnectionFactory.
RedisTemplate
As with other template classes in Spring (e.g., JdbcTemplate
, JmsTemplate
) RedisTemplate
is a helper class that simplifies Redis data access code.
For more information about RedisTemplate
and its variations (e.g., StringRedisTemplate
) please refer to the Spring-Data-Redis documentation
The code below shows how to create an instance of RedisTemplate
:
In Java:
RedisTemplate rt = new RedisTemplate<String, Object>();
rt.setConnectionFactory(redisConnectionFactory);
<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"> <property name="connectionFactory" ref="redisConnectionFactory"/> </bean>
As mentioned in the introduction Redis provides support for Publish-Subscribe messaging via its PUBLISH, SUBSCRIBE and UNSUBSCRIBE commands. As with JMS and AMQP, Spring Integration provides Message Channels and adapters for sending and receiving messages via Redis.
Similar to the JMS there are cases where both the producer and consumer are intended to be part of the same application, running within the same process. This could be accomplished by using a pair of inbound and outbound Channel Adapters, however just like with Spring Integration’s JMS support, there is a simpler approach to address this use case.
<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>
The publish-subscribe-channel (above) will behave much like a normal <publish-subscribe-channel/>
element from the main Spring Integration namespace.
It can be referenced by both input-channel
and output-channel
attributes of any endpoint.
The difference is that this channel is backed by a Redis topic name - a String value specified by the topic-name
attribute.
However unlike JMS this topic doesn’t have to be created in advance or even auto-created by Redis.
In Redis topics are simple String values that play the role of an address, and all the producer and consumer need to do to communicate is use the same String value as their topic name.
A simple subscription to this channel means that asynchronous pub-sub messaging is possible between the producing and consuming endpoints, but unlike the asynchronous Message Channels created by adding a <queue/>
sub-element within a simple Spring Integration <channel/>
element, the Messages are not just stored in an in-memory queue.
Instead those Messages are passed through Redis allowing you to rely on its support for persistence and clustering as well as its interoperability with other non-java platforms.
The Redis-based Inbound Channel Adapter (RedisInboundChannelAdapter
) adapts incoming Redis messages into Spring Messages in the same way as other inbound adapters.
It receives platform-specific messages (Redis in this case) and converts them to Spring Messages using a MessageConverter
strategy.
<int-redis:inbound-channel-adapter id="redisAdapter" topics="foo, bar" channel="receiveChannel" error-channel="testErrorChannel" message-converter="testConverter" /> <bean id="redisConnectionFactory" class="o.s.data.redis.connection.jedis.JedisConnectionFactory"> <property name="port" value="7379" /> </bean> <bean id="testConverter" class="foo.bar.SampleMessageConverter" />
Above is a simple but complete configuration of a Redis Inbound Channel Adapter.
Note that the above configuration relies on the familiar Spring paradigm of auto-discovering certain beans.
In this case the redisConnectionFactory
is implicitly injected into the adapter.
You can of course specify it explicitly using the connection-factory
attribute instead.
Also, note that the above configuration injects the adapter with a custom MessageConverter
.
The approach is similar to JMS where MessageConverters
are used to convert between Redis Messages and the Spring Integration Message payloads.
The default is a SimpleMessageConverter
.
Inbound adapters can subscribe to multiple topic names hence the comma-delimited set of values in the topics
attribute.
Since version 3.0, the Inbound Adapter, in addition to the existing topics
attribute, now has the topic-patterns
attribute.
This attribute contains a comma-delimited set of Redis topic patterns.
For more information regarding Redis publish/subscribe, see Redis Pub/Sub.
Inbound adapters can use a RedisSerializer
to deserialize the body of Redis Messages.
The serializer
attribute of the <int-redis:inbound-channel-adapter>
can be set to an empty string, which results in a null
value for the RedisSerializer
property.
In this case the raw byte[]
bodies of Redis Messages are provided as the message payloads.
Since version 5.0, an Executor
instance can be provided to the Inbound Adapter via the task-executor
attribute of the <int-redis:inbound-channel-adapter>
.
Also the received Spring Integration Messages have now RedisHeaders.MESSAGE_SOURCE
header to indicate the source of the published message - topic or pattern.
This can be used downstream for routing logic.
The Redis-based Outbound Channel Adapter adapts outgoing Spring Integration messages into Redis messages in the same way as other outbound adapters.
It receives Spring Integration messages and converts them to platform-specific messages (Redis in this case) using a MessageConverter
strategy.
<int-redis:outbound-channel-adapter id="outboundAdapter" channel="sendChannel" topic="foo" message-converter="testConverter"/> <bean id="redisConnectionFactory" class="o.s.data.redis.connection.jedis.JedisConnectionFactory"> <property name="port" value="7379"/> </bean> <bean id="testConverter" class="foo.bar.SampleMessageConverter" />
As you can see the configuration is similar to the Redis Inbound Channel Adapter.
The adapter is implicitly injected with a RedisConnectionFactory
which was defined with redisConnectionFactory
as its bean name.
This example also includes the optional, custom MessageConverter
(the testConverter
bean).
Since Spring Integration 3.0, the <int-redis:outbound-channel-adapter>
, as an alternative to the topic
attribute, has the topic-expression
attribute to determine the Redis topic against the Message at runtime.
These attributes are mutually exclusive.
Since Spring Integration 3.0, a Queue Inbound Channel Adapter is available to pop messages from a Redis List. By default it uses right pop, but it can be configured to use left pop instead. The adapter is message-driven using an internal listener thread and does not use a poller.
<int-redis:queue-inbound-channel-adapter id="" channel="" auto-startup="" phase="" connection-factory="" queue="" error-channel="" serializer="" receive-timeout="" recovery-interval="" expect-message="" task-executor="" right-pop=""/>
The component bean name.
If the | |
The | |
A | |
A | |
A reference to a | |
The name of the Redis List on which the queue-based pop operation is performed to get Redis messages. | |
The | |
The | |
The timeout in milliseconds for pop operation to wait for a Redis message from the queue. Default is 1 second. | |
The time in milliseconds for which the listener task should sleep after exceptions on the pop operation, before restarting the listener task. | |
Specify if this Endpoint expects data from the Redis queue to contain entire | |
A reference to a Spring | |
Specify whether this Endpoint should use right pop (when |
Since Spring Integration 3.0, a Queue Outbound Channel Adapter is available to push to a Redis List from Spring Integration messages. By default, it uses left push, but it can be configured to use right push instead.
<int-redis:queue-outbound-channel-adapter id="" channel="" connection-factory="" queue="" queue-expression="" serializer="" extract-payload="" left-push=""/>
The component bean name.
If the | |
The | |
A reference to a | |
The name of the Redis List on which the queue-based push operation is performed to send Redis messages.
This attribute is mutually exclusive with | |
A SpEL | |
A | |
Specify if this Endpoint should send just the payload to the Redis queue, or the entire | |
Specify whether this Endpoint should use left push (when |
Since Spring Integration 3.0, the Redis module provides an implementation of IntegrationEvent
- which, in turn, is a org.springframework.context.ApplicationEvent
.
The RedisExceptionEvent
encapsulates an Exception
s from Redis operations (with the Endpoint being the source
of the event).
For example, the <int-redis:queue-inbound-channel-adapter/>
emits those events after catching Exception
s from the BoundListOperations.rightPop
operation.
The exception may be any generic org.springframework.data.redis.RedisSystemException
or a org.springframework.data.redis.RedisConnectionFailureException
.
Handling these events using an <int-event:inbound-channel-adapter/>
can be useful to determine problems with background Redis tasks and to take administrative actions.
As described in EIP, a Message Store allows you to persist Messages. This can be very useful when dealing with components that have a capability to buffer messages (Aggregator, Resequencer, etc.) if reliability is a concern. In Spring Integration, the MessageStore strategy also provides the foundation for the ClaimCheck pattern, which is described in EIP as well.
Spring Integration’s Redis module provides the RedisMessageStore
.
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisMessageStore"> <constructor-arg ref="redisConnectionFactory"/> </bean> <int:aggregator input-channel="inputChannel" output-channel="outputChannel" message-store="redisMessageStore"/>
Above is a sample RedisMessageStore
configuration that shows its usage by an Aggregator.
As you can see it is a simple bean configuration, and it expects a RedisConnectionFactory
as a constructor argument.
By default the RedisMessageStore
will use Java serialization to serialize the Message.
However if you want to use a different serialization technique (e.g., JSON), you can provide your own serializer via the valueSerializer
property of the RedisMessageStore
.
Starting with version 4.3.10, the Framework provides Jackson Serializer and Deserializer implementations for Message
s and MessageHeaders
- MessageHeadersJacksonSerializer
and MessageJacksonDeserializer
, respectively.
They have to be configured via the SimpleModule
options for the ObjectMapper
.
In addition, enableDefaultTyping
should be configured on the ObjectMapper
to add type information for each serialized complex object.
That type information is then used during deserialization.
The Framework provides a utility method JacksonJsonUtils.messagingAwareMapper()
, which is already supplied with all the above-mentioned properties and serializers.
To manage JSON serialization in the RedisMessageStore
, it must be configured like so:
RedisMessageStore store = new RedisMessageStore(jedisConnectionFactory); ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper(); RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper); store.setValueSerializer(serializer);
Starting with version 4.3.12, the RedisMessageStore
supports the key prefix
option to allow distinguishing between instances of the store on the same Redis server.
The RedisMessageStore
above maintains each group as a value under a single key (the group id).
While this can be used to back a QueueChannel
for persistence, a specialized RedisChannelMessageStore
is provided for that purpose (since version 4.0).
This store uses a LIST
for each channel and LPUSH
when sending and RPOP
when receiving messages.
This store also uses JDK serialization by default, but the value serializer can be modified as described above.
It is recommended that this store is used for backing channels, instead of the general RedisMessageStore
.
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore"> <constructor-arg ref="redisConnectionFactory"/> </bean> <int:channel id="somePersistentQueueChannel"> <int:queue message-store="redisMessageStore"/> <int:channel>
The keys that are used to store the data have the form <storeBeanName>:<channelId>
(in the above example, redisMessageStore:somePersistentQueueChannel
).
In addition, a subclass RedisChannelPriorityMessageStore
is also provided.
When this is used with a QueueChannel
, the messages are received in (FIFO within) priority order.
It uses the standard IntegrationMessageHeaderAccessor.PRIORITY
header and supports priority values 0 - 9
; messages with other priorities (and messages with no priority) are retrieved in FIFO order after any messages with priority.
Important | |
---|---|
These stores implement only |
As of Spring Integration 3.0 a new Redis-based MetadataStore (Section 10.5, “Metadata Store”) implementation is available.
The RedisMetadataStore
can be used to maintain state of a MetadataStore
across application restarts.
This new MetadataStore
implementation can be used with adapters such as:
In order to instruct these adapters to use the new RedisMetadataStore
simply declare a Spring bean using the bean name metadataStore.
The Twitter Inbound Channel Adapter and the Feed Inbound Channel Adapter will both automatically pick up and use the declared RedisMetadataStore
.
<bean name="metadataStore" class="o.s.i.redis.store.metadata.RedisMetadataStore"> <constructor-arg name="connectionFactory" ref="redisConnectionFactory"/> </bean>
The RedisMetadataStore
is backed by RedisProperties
and interaction with it uses BoundHashOperations
, which, in turn, requires a key
for the entire Properties
store.
In the case of the MetadataStore
, this key
plays the role of a region, which is useful in distributed environment, when several applications use the same Redis server.
By default this key
has the value MetaData
.
Starting with version 4.0, this store now implements ConcurrentMetadataStore
, allowing it to be reliably shared across multiple application instances where only one instance will be allowed to store or modify a key’s value.
Important | |
---|---|
The |
The RedisStore Inbound Channel Adapter is a polling consumer that reads data from a Redis collection and sends it as a Message payload.
<int-redis:store-inbound-channel-adapter id="listAdapter" connection-factory="redisConnectionFactory" key="myCollection" channel="redisChannel" collection-type="LIST" > <int:poller fixed-rate="2000" max-messages-per-poll="10"/> </int-redis:store-inbound-channel-adapter>
As you can see from the configuration above you configure a Redis Store Inbound Channel Adapter using the store-inbound-channel-adapter
element, providing values for various attributes such as:
key
or key-expression
- The name of the key for the collection being used.
collection-type
- enumeration of the Collection types supported by this adapter.
Supported Collections are: LIST, SET, ZSET, PROPERTIES, MAP
connection-factory
- reference to an instance of o.s.data.redis.connection.RedisConnectionFactory
redis-template
- reference to an instance of o.s.data.redis.core.RedisTemplate
and other attributes that are common across all other inbound adapters (e.g., channel).
Note | |
---|---|
You cannot set both |
Important | |
---|---|
By default, the adapter uses a <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"> <property name="connectionFactory" ref="redisConnectionFactory"/> <property name="keySerializer"> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> </property> <property name="hashKeySerializer"> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> </property> </bean> This uses String serializers for keys and hash keys and the default JDK Serialization serializers for values and hash values. |
The example above is relatively simple and static since it has a literal value for the key
.
Sometimes, you may need to change the value of the key at runtime based on some condition.
To do that, simply use key-expression
instead, where the provided expression can be any valid SpEL expression.
Also, you may wish to perform some post-processing to the successfully processed data that was read from the Redis collection. For example; you may want to move or remove the value after its been processed. You can do this using the Transaction Synchronization feature that was added with Spring Integration 2.2.
<int-redis:store-inbound-channel-adapter id="zsetAdapterWithSingleScoreAndSynchronization" connection-factory="redisConnectionFactory" key-expression="'presidents'" channel="otherRedisChannel" auto-startup="false" collection-type="ZSET"> <int:poller fixed-rate="1000" max-messages-per-poll="2"> <int:transactional synchronization-factory="syncFactory"/> </int:poller> </int-redis:store-inbound-channel-adapter> <int:transaction-synchronization-factory id="syncFactory"> <int:after-commit expression="payload.removeByScore(18, 18)"/> </int:transaction-synchronization-factory> <bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
As you can see from the above all, you need to do is declare your poller to be transactional with a transactional
element.
This element can reference a real transaction manager (for example if some other part of your flow invokes JDBC).
If you don’t have a real transaction, you can use a o.s.i.transaction.PseudoTransactionManager
which is an implementation of Spring’s PlatformTransactionManager
and enables the use of the transaction synchronization features of the redis adapter when there is no actual transaction.
Important | |
---|---|
This does NOT make the Redis activities themselves transactional, it simply allows the synchronization of actions to be taken before/after success (commit) or after failure (rollback). |
Once your poller is transactional all you need to do is set an instance of the o.s.i.transaction.TransactionSynchronizationFactory
on the transactional
element.
TransactionSynchronizationFactory
will create an instance of the TransactionSynchronization
.
For your convenience we’ve exposed a default SpEL-based TransactionSynchronizationFactory
which allows you to configure SpEL expressions, with their execution being coordinated (synchronized) with a transaction.
Expressions for before-commit, after-commit, and after-rollback are supported, together with a channel for each where the evaluation result (if any) will be sent.
For each sub-element you can specify expression
and/or channel
attributes.
If only the channel
attribute is present the received Message will be sent there as part of the particular synchronization scenario.
If only the expression
attribute is present and the result of an expression is a non-Null value, a Message with the result as the payload will be generated and sent to a default channel (NullChannel) and will appear in the logs (DEBUG).
If you want the evaluation result to go to a specific channel add a channel
attribute.
If the result of an expression is null or void, no Message will be generated.
For more information about transaction synchronization, see Section C.3, “Transaction Synchronization”.
The RedisStore Outbound Channel Adapter allows you to write a Message payload to a Redis collection
<int-redis:store-outbound-channel-adapter id="redisListAdapter" collection-type="LIST" channel="requestChannel" key="myCollection" />
As you can see from the configuration above, you configure a Redis Store Outbound Channel Adapter using the store-inbound-channel-adapter
element, providing values for various attributes such as:
key
or key-expression
- The name of the key for the collection being used.
extract-payload-elements
- If set to true
(Default) and the payload is an instance of a "multi- value" object (i.e., Collection or Map) it will be stored using addAll/ putAll semantics.
Otherwise, if set to false
the payload will be stored as a single entry regardless of its type.
If the payload is not an instance of a "multi-value" object, the value of this attribute is ignored and the payload will always be stored as a single entry.
collection-type
- enumeration of the Collection types supported by this adapter.
Supported Collections are: LIST, SET, ZSET, PROPERTIES, MAP
map-key-expression
- SpEL expression that returns the name of the key for entry being stored.
Only applies if the collection-type
is MAP or PROPERTIES and extract-payload-elements is false.
connection-factory
- reference to an instance of o.s.data.redis.connection.RedisConnectionFactory
redis-template
- reference to an instance of o.s.data.redis.core.RedisTemplate
and other attributes that are common across all other inbound adapters (e.g., channel).
Note | |
---|---|
You cannot set both |
Important | |
---|---|
By default, the adapter uses a |
The example above is relatively simple and static since it has a literal values for the key
and other attributes.
Sometimes you may need to change the values dynamically at runtime based on some condition.
To do that simply use their -expression
equivalents (key-expression
, map-key-expression
etc.) where the provided expression can be any valid SpEL expression.
Since Spring Integration 4.0, the Redis Command Gateway is available to perform any standard Redis command using generic RedisConnection#execute
method:
<int-redis:outbound-gateway request-channel="" reply-channel="" requires-reply="" reply-timeout="" connection-factory="" redis-template="" arguments-serializer="" command-expression="" argument-expressions="" use-command-variable="" arguments-strategy="" />
The | |
The | |
Specify whether this outbound gateway must return a non-null value.
This value is | |
The timeout in milliseconds to wait until the reply message will be sent or not. Typically is applied for queue-based limited reply-channels. | |
A reference to a | |
A reference to a | |
Reference to an instance of | |
The SpEL expression that returns the command key.
Default is the | |
Comma-separate SpEL expressions that will be evaluated as command arguments.
Mutually exclusive with the | |
A | |
Reference to an instance of |
The <int-redis:outbound-gateway>
can be used as a common component to perform any desired Redis operation.
For example to get incremented value from Redis Atomic Number:
<int-redis:outbound-gateway request-channel="requestChannel" reply-channel="replyChannel" command-expression="'INCR'"/>
where the Message payload
should be a name of redisCounter
, which may be provided by org.springframework.data.redis.support.atomic.RedisAtomicInteger
bean definition.
The RedisConnection#execute
has a generic Object
as return type and real result depends on command type, for example MGET
returns a List<byte[]>
.
For more information about commands, their arguments and result type seehttp://redis.io/commands[Redis Specification].
Since Spring Integration 4.1, the Redis Queue Outbound Gateway is available to perform request and reply scenarios.
It pushes a conversation`UUID` to the provided queue
, then pushes the value to a Redis List with that UUID
as its key and waits for the reply from a Redis List with a key of UUID + '.reply'
.
A different UUID is used for each interaction.
<int-redis:queue-outbound-gateway request-channel="" reply-channel="" requires-reply="" reply-timeout="" connection-factory="" queue="" order="" serializer="" extract-payload=""/>
The | |
The | |
Specify whether this outbound gateway must return a non-null value.
This value is | |
The timeout in milliseconds to wait until the reply message will be sent or not. Typically is applied for queue-based limited reply-channels. | |
A reference to a | |
The name of the Redis List to which outbound gateway will send a conversation`UUID`. | |
The order for this outbound gateway when multiple gateway are registered thereby | |
The | |
Specify if this Endpoint expects data from the Redis queue to contain entire |
Since Spring Integration 4.1, the Redis Queue Inbound Gateway is available to perform request and reply scenarios.
It pops a conversation UUID
from the provided queue
, then pops the value from the Redis List with that UUID
as its key and pushes the reply to the Redis List with a key of UUID + '.reply'
:
<int-redis:queue-inbound-gateway request-channel="" reply-channel="" executor="" reply-timeout="" connection-factory="" queue="" order="" serializer="" receive-timeout="" expect-message="" recovery-interval=""/>
The | |
The | |
A reference to a Spring | |
The timeout in milliseconds to wait until the reply message will be sent or not. Typically is applied for queue-based limited reply-channels. | |
A reference to a | |
The name of the Redis List for the conversation | |
The order for this inbound gateway when multiple gateway are registered thereby | |
The | |
The timeout in milliseconds to wait until the receive message will be get or not. Typically is applied for queue-based limited request-channels. | |
Specify if this Endpoint expects data from the Redis queue to contain entire | |
The time in milliseconds for which the listener task should sleep after exceptions on the right pop operation, before restarting the listener task. |
Starting with version 4.0, the RedisLockRegistry
is available.
Certain components (for example aggregator and resequencer) use a lock obtained from a LockRegistry
instance to ensure that only one thread is manipulating a group at a time.
The DefaultLockRegistry
performs this function within a single component; you can now configure an external lock registry on these components.
When used with a shared MessageGroupStore
, the RedisLockRegistry
can be use to provide this functionality across multiple application instances, such that only one instance can manipulate the group at a time.
When a lock is released by a local thread, another local thread will generally be able to acquire the lock immediately. If a lock is released by a thread using a different registry instance, it can take up to 100ms to acquire the lock.
To avoid "hung" locks (when a server fails), the locks in this registry are expired after a default 60 seconds, but this can be configured on the registry. Locks are normally held for a much smaller time.
Important | |
---|---|
Because the keys can expire, an attempt to unlock an expired lock will result in an exception being thrown. However, be aware that the resources protected by such a lock may have been compromised so such exceptions should be considered severe. The expiry should be set at a large enough value to prevent this condition, while small enough that the lock can be recovered after a server failure in a reasonable amount of time. |
Starting with version 5.0, the RedisLockRegistry
implements ExpirableLockRegistry
providing functionality to remove locks last acquired more than age
ago that are not currently locked.