22. Redis Support

Since version 2.1, Spring Integration has supported Redis: "an open source advanced key-value store". This support comes in the form of a Redis-based MessageStore as well as Publish-Subscribe Messaging adapters, which rely on the Redis PUBLISH, SUBSCRIBE and UNSUBSCRIBE commands.

22.1 Introduction

To download, install and run Redis please refer to the Redis documentation.

22.2 Connecting to Redis

To begin interacting with Redis, you first need to connect to it. Spring Integration uses support provided by another Spring project, Spring Data Redis, which provides typical Spring constructs: ConnectionFactory and Template. Those abstractions simplify integration with several Redis-client Java APIs. Currently Spring Data Redis supports jedis, jredis and rjc.

RedisConnectionFactory

To connect to Redis you would use one of the implementations of the RedisConnectionFactory interface:

public interface RedisConnectionFactory extends PersistenceExceptionTranslator {

	/**
	 * Provides a suitable connection for interacting with Redis.
	 *
	 * @return connection for interacting with Redis.
	 */
	RedisConnection getConnection();
}

The example below shows how to create a JedisConnectionFactory.

In Java:

JedisConnectionFactory jcf = new JedisConnectionFactory();
jcf.afterPropertiesSet();

Or in Spring's XML configuration:

<bean id="redisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
	<property name="port" value="7379" />
</bean>

The implementations of RedisConnectionFactory provide a set of properties such as port and host that can be set if needed. Once an instance of RedisConnectionFactory is created, you can create an instance of RedisTemplate and inject it with the RedisConnectionFactory.

RedisTemplate

As with other template classes in Spring (e.g., JdbcTemplate, JmsTemplate), RedisTemplate is a helper class that simplifies Redis data access code. For more information about RedisTemplate and its variations (e.g., StringRedisTemplate), please refer to the Spring Data Redis documentation.

The code below shows how to create an instance of RedisTemplate:

In Java:

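RedisTemplate<String, Object> rt = new RedisTemplate<String, Object>();
rt.setConnectionFactory(redisConnectionFactory);
// Initializes the default serializers when the template is created outside a Spring container
rt.afterPropertiesSet();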

Or in Spring's XML configuration:

<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
	<property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>

22.3 Messaging with Redis

As mentioned in the introduction, Redis provides support for Publish-Subscribe messaging via its PUBLISH, SUBSCRIBE and UNSUBSCRIBE commands. As with JMS and AMQP, Spring Integration provides Message Channels and adapters for sending and receiving messages via Redis.

22.3.1 Redis Publish/Subscribe channel

As with JMS, there are cases where both the producer and consumer are intended to be part of the same application, running within the same process. This could be accomplished by using a pair of inbound and outbound Channel Adapters. However, just as with Spring Integration's JMS support, there is a simpler approach to address this use case.

<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>

The publish-subscribe-channel (above) behaves much like a normal <publish-subscribe-channel/> element from the main Spring Integration namespace. It can be referenced by both the input-channel and output-channel attributes of any endpoint. The difference is that this channel is backed by a Redis topic name: a String value specified by the topic-name attribute. However, unlike JMS, this topic doesn't have to be created in advance or even auto-created by Redis. In Redis, topics are simple String values that play the role of an address, and all the producer and consumer need to do to communicate is use the same String value as their topic name. A simple subscription to this channel means that asynchronous pub-sub messaging is possible between the producing and consuming endpoints. But unlike the asynchronous Message Channels created by adding a <queue/> sub-element within a simple Spring Integration <channel/> element, the Messages are not just stored in an in-memory queue. Instead, those Messages are passed through Redis, allowing you to rely on its support for persistence and clustering as well as its interoperability with other non-Java platforms.
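The idea that a topic is nothing more than a shared String address can be sketched in plain Java. The class below is a hypothetical in-memory model (TopicBusSketch and its methods are illustrative names, not part of Redis, Spring Data Redis or Spring Integration); it only shows that no topic needs to be declared before a producer and a consumer use the same name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java model of topic-addressed publish/subscribe.
// Assumption: this is an illustration only, not the Redis client or the channel implementation.
final class TopicBusSketch {

    // Subscribers keyed by topic name; a topic exists only as a String key.
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    // Registers a subscriber under a topic name; the entry is created on first use.
    void subscribe(String topic, Consumer<String> subscriber) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(subscriber);
    }

    // Delivers the payload to every subscriber of the topic and returns how many received it.
    int publish(String topic, String payload) {
        List<Consumer<String>> targets =
                subscribers.getOrDefault(topic, Collections.<Consumer<String>>emptyList());
        for (Consumer<String> target : targets) {
            target.accept(payload);
        }
        return targets.size();
    }
}
```

In this model, a producer and a consumer that both use "si.test.topic" communicate, while a publish to any other name simply reaches nobody.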

22.3.2 Redis Inbound Channel Adapter

The Redis-based Inbound Channel Adapter adapts incoming Redis messages into Spring Integration Messages in the same way as other inbound adapters. It receives platform-specific messages (Redis in this case) and converts them to Spring Integration Messages using a MessageConverter strategy.

<int-redis:inbound-channel-adapter id="redisAdapter"
       topics="foo, bar"
       channel="receiveChannel"
       error-channel="testErrorChannel"
       message-converter="testConverter" />

<bean id="redisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
    <property name="port" value="7379" />
</bean>

<bean id="testConverter" class="foo.bar.SampleMessageConverter" />

Above is a simple but complete configuration of a Redis Inbound Channel Adapter. Note that the above configuration relies on the familiar Spring paradigm of auto-discovering certain beans. In this case the redisConnectionFactory is implicitly injected into the adapter. You can of course specify it explicitly using the connection-factory attribute instead.

Also, note that the above configuration injects the adapter with a custom MessageConverter. The approach is similar to JMS where MessageConverters are used to convert between Redis Messages and the Spring Integration Message payloads. The default is a SimpleMessageConverter.

Inbound adapters can subscribe to multiple topic names, hence the comma-delimited set of values in the topics attribute.

22.3.3 Redis Outbound Channel Adapter

The Redis-based Outbound Channel Adapter adapts outgoing Spring Integration messages into Redis messages in the same way as other outbound adapters. It receives Spring Integration messages and converts them to platform-specific messages (Redis in this case) using a MessageConverter strategy.

<int-redis:outbound-channel-adapter id="outboundAdapter"
			channel="sendChannel"
			topic="foo"
			message-converter="testConverter"/>

<bean id="redisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
	<property name="port" value="7379"/>
</bean>

<bean id="testConverter" class="foo.bar.SampleMessageConverter" />

As you can see the configuration is similar to the Redis Inbound Channel Adapter. The adapter is implicitly injected with a RedisConnectionFactory which was defined with 'redisConnectionFactory' as its bean name. This example also includes the optional, custom MessageConverter (the 'testConverter' bean).

22.4 Redis Message Store

As described in EIP, a Message Store allows you to persist Messages. This can be very useful when dealing with components that have a capability to buffer messages (QueueChannel, Aggregator, Resequencer, etc.) if reliability is a concern. In Spring Integration, the MessageStore strategy also provides the foundation for the ClaimCheck pattern, which is described in EIP as well.

Spring Integration's Redis module provides the RedisMessageStore, which is an implementation of both the MessageStore strategy (mainly used by the QueueChannel and ClaimCheck patterns) and the MessageGroupStore strategy (mainly used by the Aggregator and Resequencer patterns).

<bean id="redisMessageStore" class="org.springframework.integration.redis.store.RedisMessageStore">
    <constructor-arg ref="redisConnectionFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="redisMessageStore"/>
</int:channel>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
         message-store="redisMessageStore"/>

Above is a sample RedisMessageStore configuration that shows its usage by a QueueChannel and an Aggregator. As you can see it is a simple bean configuration, and it expects a RedisConnectionFactory as a constructor argument.

By default, the RedisMessageStore will use Java serialization to serialize the Message. However, if you want to use a different serialization technique (e.g., JSON), you can provide your own serializer via the valueSerializer property of the RedisMessageStore.
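To make the default concrete, here is a minimal plain-Java sketch of what a Java serialization round trip involves. It uses only the JDK; the Order class and the helper names are hypothetical, and the code is not the RedisMessageStore's own implementation. The practical consequence is that anything stored this way has to implement Serializable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Illustration of a JDK serialization round trip (assumption: not the store's actual code).
final class JdkRoundTripSketch {

    // Hypothetical payload type; it must implement Serializable to be accepted.
    static final class Order implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;
        final int quantity;

        Order(String id, int quantity) {
            this.id = id;
            this.quantity = quantity;
        }
    }

    // Writes the object graph to a byte array using Java serialization.
    static byte[] serialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            // Includes NotSerializableException for types that are not Serializable.
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads an object back from bytes produced by serialize(...).
    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A custom valueSerializer, as mentioned above, would replace this pair of operations with a different encoding such as JSON.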

22.5 RedisStore Inbound Channel Adapter

The RedisStore Inbound Channel Adapter is a polling consumer that reads data from a Redis collection and sends it as a Message payload.

<int-redis:store-inbound-channel-adapter id="listAdapter"
		connection-factory="redisConnectionFactory"
		key="myCollection"
		channel="redisChannel"
		collection-type="LIST" >
			<int:poller fixed-rate="2000" max-messages-per-poll="10"/>
</int-redis:store-inbound-channel-adapter>

As you can see from the configuration above, you configure a Redis Store Inbound Channel Adapter using the store-inbound-channel-adapter element, providing values for various attributes such as:

  • key or key-expression - The name of the key for the collection being used.

  • collection-type - enumeration of the Collection types supported by this adapter. Supported Collections are: LIST, SET, ZSET, PROPERTIES, MAP

  • connection-factory - reference to an instance of org.springframework.data.redis.connection.RedisConnectionFactory

  • redis-template - reference to an instance of org.springframework.data.redis.core.RedisTemplate

and other attributes that are common across all other inbound adapters (e.g., 'channel').

Note
You cannot set both redis-template and connection-factory.

Important
By default, the adapter uses a StringRedisTemplate; this uses StringRedisSerializers for keys, values, hash keys and hash values. If your Redis store contains objects that are serialized with other techniques, you must supply a RedisTemplate configured with appropriate serializers. For example, if the store is written to using a RedisStore Outbound Adapter that has its extract-payload-elements set to false, you must provide a RedisTemplate configured thus:
<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
	<property name="connectionFactory" ref="redisConnectionFactory"/>
	<property name="keySerializer">
		<bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
	</property>
	<property name="hashKeySerializer">
		<bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
	</property>
</bean>

This uses String serializers for keys and hash keys and the default JDK Serialization serializers for values and hash values.
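Why the serializers on the reading side must match those used for writing can be seen with a small JDK-only comparison. The sketch below assumes that a String-based serializer writes the UTF-8 bytes of the text and that the JDK serializer writes a standard Java serialization stream; the class and method names are hypothetical and do not come from Spring Data Redis.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Compares the bytes produced by the two encodings (illustration only).
final class SerializerMismatchSketch {

    // Assumed String-style encoding: the raw UTF-8 bytes of the text.
    static byte[] asUtf8(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // JDK serialization: a stream header and type information precede the data.
    static byte[] asJdkSerialized(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Decodes bytes as UTF-8 text, as a String-based reader would.
    static String readAsUtf8(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }
}
```

Reading the JDK-serialized form of "foo" as UTF-8 text does not yield "foo", because the stream begins with the serialization header bytes 0xAC 0xED rather than the characters themselves.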

The example above is relatively simple and static since it has a literal value for the key. Sometimes, you may need to change the value of the key at runtime based on some condition. To do that, simply use key-expression instead, where the provided expression can be any valid SpEL expression.

Also, you may wish to perform some post-processing on the successfully processed data that was read from the Redis collection. For example, you may want to move or remove the value after it has been processed. You can do this using the Transaction Synchronization feature that was added with Spring Integration 2.2.

<int-redis:store-inbound-channel-adapter id="zsetAdapterWithSingleScoreAndSynchronization"
        connection-factory="redisConnectionFactory"
        key-expression="'presidents'"
        channel="otherRedisChannel"
        auto-startup="false"
        collection-type="ZSET">
            <int:poller fixed-rate="1000" max-messages-per-poll="2">
                <int:transactional synchronization-factory="syncFactory"/>
            </int:poller>
</int-redis:store-inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
	<int:after-commit expression="payload.removeByScore(18, 18)"/>
</int:transaction-synchronization-factory>

<bean id="transactionManager" class="org.springframework.integration.transaction.PseudoTransactionManager"/>

As you can see from the above, all you need to do is declare your poller to be transactional with a transactional element. This element can reference a real transaction manager (for example, if some other part of your flow invokes JDBC). If you don't have a 'real' transaction, you can use an org.springframework.integration.transaction.PseudoTransactionManager, which is an implementation of Spring's PlatformTransactionManager and enables the use of the transaction synchronization features of the Redis adapter when there is no actual transaction.

Important
This does NOT make the Redis activities themselves transactional; it simply allows the synchronization of actions to be taken before/after success (commit) or after failure (rollback).
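The after-commit expression in the configuration above calls removeByScore(18, 18) on the payload. As a rough illustration of what such a call does to a scored set, here is a plain-Java model; ScoredSetSketch is a hypothetical stand-in and not the Spring Data Redis collection type, and it assumes both bounds of the score range are inclusive.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// In-memory model of a set whose members each carry a score (illustration only).
final class ScoredSetSketch {

    private final Map<String, Double> scores = new LinkedHashMap<>();

    // Adds a member or updates its score.
    void add(String member, double score) {
        scores.put(member, score);
    }

    // Removes every member whose score lies in [min, max]; returns the number removed.
    int removeByScore(double min, double max) {
        int before = scores.size();
        scores.values().removeIf(score -> score >= min && score <= max);
        return before - scores.size();
    }

    boolean contains(String member) {
        return scores.containsKey(member);
    }
}
```

With equal bounds, as in the configuration, only the members holding exactly that score are removed once the poll completes successfully.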

Once your poller is transactional, all you need to do is set an instance of org.springframework.integration.transaction.TransactionSynchronizationFactory on the transactional element. The TransactionSynchronizationFactory creates an instance of TransactionSynchronization. For your convenience, we've exposed a default SpEL-based TransactionSynchronizationFactory which allows you to configure SpEL expressions whose execution is coordinated (synchronized) with a transaction. Expressions for before-commit, after-commit, and after-rollback are supported, together with a channel for each where the evaluation result (if any) will be sent. For each sub-element you can specify expression and/or channel attributes. If only the channel attribute is present, the received Message will be sent there as part of the particular synchronization scenario. If only the expression attribute is present and the result of the expression is a non-null value, a Message with the result as the payload will be generated and sent to a default channel (NullChannel) and will appear in the logs (DEBUG). If you want the evaluation result to go to a specific channel, add a channel attribute. If the result of an expression is null or void, no Message will be generated.
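The combinations of expression and channel described above can be summarized as a small decision function. This is a plain-Java restatement of the rules in the text, not the framework's code; the enum constants and method name are hypothetical, and the case where neither attribute is set is an assumption added only to make the function total.

```java
// Restates the expression/channel rules for a synchronization sub-element (illustration only).
final class SyncOutcomeSketch {

    enum Outcome {
        NOTHING_CONFIGURED,          // assumption: neither attribute is set
        RECEIVED_MESSAGE_TO_CHANNEL, // channel only
        NO_MESSAGE,                  // expression evaluated to null (or void)
        RESULT_TO_NULL_CHANNEL,      // expression only, non-null result, logged at DEBUG
        RESULT_TO_CHANNEL            // expression and channel, non-null result
    }

    // hasExpression / hasChannel: which attributes are present on the sub-element.
    // expressionResult: the evaluation result, or null when there is none.
    static Outcome decide(boolean hasExpression, boolean hasChannel, Object expressionResult) {
        if (!hasExpression) {
            return hasChannel ? Outcome.RECEIVED_MESSAGE_TO_CHANNEL : Outcome.NOTHING_CONFIGURED;
        }
        if (expressionResult == null) {
            return Outcome.NO_MESSAGE;
        }
        return hasChannel ? Outcome.RESULT_TO_CHANNEL : Outcome.RESULT_TO_NULL_CHANNEL;
    }
}
```

For the zsetAdapterWithSingleScoreAndSynchronization example, only an expression is configured, so a non-null result would fall into the RESULT_TO_NULL_CHANNEL case.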

For more information about transaction synchronization, see Section B.3, “Transaction Synchronization”.

22.6 RedisStore Outbound Channel Adapter

The RedisStore Outbound Channel Adapter allows you to write a Message payload to a Redis collection.

<int-redis:store-outbound-channel-adapter id="redisListAdapter"
          collection-type="LIST"
          channel="requestChannel"
          key="myCollection" />

As you can see from the configuration above, you configure a Redis Store Outbound Channel Adapter using the store-outbound-channel-adapter element, providing values for various attributes such as:

  • key or key-expression - The name of the key for the collection being used.

  • extract-payload-elements - If set to true (Default) and the payload is an instance of a "multi-value" object (i.e., Collection or Map) it will be stored using addAll/putAll semantics. Otherwise, if set to false the payload will be stored as a single entry regardless of its type. If the payload is not an instance of a "multi-value" object, the value of this attribute is ignored and the payload will always be stored as a single entry.

  • collection-type - enumeration of the Collection types supported by this adapter. Supported Collections are: LIST, SET, ZSET, PROPERTIES, MAP

  • map-key-expression - SpEL expression that returns the name of the key for the entry being stored. Only applies if the collection-type is MAP or PROPERTIES and 'extract-payload-elements' is false.

  • connection-factory - reference to an instance of org.springframework.data.redis.connection.RedisConnectionFactory

  • redis-template - reference to an instance of org.springframework.data.redis.core.RedisTemplate

and other attributes that are common across all other outbound adapters (e.g., 'channel').

Note
You cannot set both redis-template and connection-factory.

Important
By default, the adapter uses a StringRedisTemplate; this uses StringRedisSerializers for keys, values, hash keys and hash values. However, if extract-payload-elements is set to false, a RedisTemplate using StringRedisSerializers for keys and hash keys, and JdkSerializationRedisSerializers for values and hash values will be used. With the JDK serializer, it is important to understand that Java serialization is used for all values, regardless of whether the value is actually a collection or not. If you need more control over the serialization of values, you may want to consider providing your own RedisTemplate rather than relying upon these defaults.

The example above is relatively simple and static since it has literal values for the key and other attributes. Sometimes you may need to change the values dynamically at runtime based on some condition. To do that, simply use their -expression equivalents (key-expression, map-key-expression, etc.), where the provided expression can be any valid SpEL expression.
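The effect of extract-payload-elements described above can be modeled for a LIST-style target in a few lines of plain Java. This is only a sketch of the documented semantics using an in-memory list; ExtractPayloadSketch and its store method are hypothetical names and are not part of the adapter.

```java
import java.util.Collection;
import java.util.List;

// Models how a payload lands in a list depending on extract-payload-elements (illustration only).
final class ExtractPayloadSketch {

    // target: stands in for the Redis list; payload: the Message payload being written.
    static void store(List<Object> target, Object payload, boolean extractPayloadElements) {
        if (extractPayloadElements && payload instanceof Collection) {
            // "multi-value" payload: each element becomes its own entry (addAll semantics).
            target.addAll((Collection<?>) payload);
        } else {
            // Flag is false, or the payload is not multi-value: stored as one entry.
            target.add(payload);
        }
    }
}
```

Storing a two-element list with the flag set to true adds two entries, while the same payload with the flag set to false adds a single entry holding the whole list.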