Spring Integration provides support for VMWare vFabric GemFire
VMWare vFabric GemFire (GemFire) is a distributed data management platform providing a key-value data grid along with advanced distributed system features such as event processing, continuous querying, and remote function execution. This guide assumes some familiarity with GemFire and its API.
Spring integration provides support for GemFire by providing inbound adapters for entry and continuous query events,
an outbound adapter to write entries to the cache, and MessageStore
and MessageGroupStore
implementations.
Spring integration leverages the
Spring Gemfire project, providing a thin wrapper over its components.
To configure the 'int-gfe' namespace, include the following elements within the headers of your XML configuration file:
xmlns:int-gfe="http://www.springframework.org/schema/integration/gemfire" xsi:schemaLocation="http://www.springframework.org/schema/integration/gemfire http://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire.xsd"
The inbound-channel-adapter produces messages on a channel triggered by a GemFire EntryEvent
. GemFire
generates events whenever an entry is CREATED, UPDATED, DESTROYED, or INVALIDATED in the associated region. The inbound channel adapter allows you to filter on a subset of these
events. For example, you may want to only produce messages in response to an entry being CREATED. In addition, the inbound channel adapter can evaluate a SpEL
expression if, for example, you want your message payload to contain an event property such as the new entry value.
<gfe:cache/> <gfe:replicated-region id="region"/> <int-gfe:inbound-channel-adapter id="inputChannel" region="region" cache-events="CREATED" expression="newValue"/>
In the above configuration, we are creating a GemFire Cache
and Region
using Spring GemFire's 'gfe' namespace.
The inbound-channel-adapter requires a reference to the GemFire region for which the adapter will be listening for events. Optional attributes include cache-events
which can contain a comma separated list of event types for which a message will be produced on the input channel. By default CREATED and UPDATED are enabled.
Note that this adapter conforms to Spring integration conventions.
If no channel
attribute is provided, the channel will be created from the id
attribute. This adapter also supports an error-channel
.
The GemFire EntryEvent
is the #root
object of the expression
evaluation.
Example:
expression="new foo.MyEvent(key, oldValue, newValue)"
If the expression
attribute is not provided, the message payload will be the GemFire
EntryEvent
itself.
The cq-inbound-channel-adapter produces messages a channel triggered by a GemFire continuous query or CqEvent
event. Spring GemFire introduced
continuous query support in release 1.1, including a ContinuousQueryListenerContainer
which provides a nice abstraction over the GemFire native API. This adapter requires a
reference to a ContinuousQueryListenerContainer, and creates a listener for a given query
and executes the query. The continuous query acts as an event source that will fire whenever its
result set changes state.
Note | |
---|---|
GemFire queries are written in OQL and are scoped to the entire cache (not just one region). Additionally, continuous queries require a remote (i.e., running in a separate process or remote host) cache server. Please consult the GemFire documentation for more information on implementing continuous queries. |
<gfe:client-cache id="client-cache" pool-name="client-pool"/> <gfe:pool id="client-pool" subscription-enabled="true" > <!--configure server or locator here required to address the cache server --> </gfe:pool> <gfe:client-region id="test" cache-ref="client-cache" pool-name="client-pool"/> <gfe:cq-listener-container id="queryListenerContainer" cache="client-cache" pool-name="client-pool"/> <int-gfe:cq-inbound-channel-adapter id="inputChannel" cq-listener-container="queryListenerContainer" query="select * from /test"/>
In the above configuration, we are creating a GemFire client cache
(recall a remote cache server is required for this implementation and its address is configured as a sub-element of the pool), a client region and a ContinuousQueryListenerContainer
using Spring GemFire. The continuous query inbound channel adapter requires a cq-listener-container
attribute which contains a reference to the ContinuousQueryListenerContainer
. Optionally,
it accepts an expression
attribute which uses SpEL to transform the CqEvent
or extract an individual property as needed. The cq-inbound-channel-adapter provides a
query-events
attribute, containing a comma separated list of event types for which a message will be produced on the input channel. Available event types are CREATED, UPDATED, DESTROYED,
REGION_DESTROYED, REGION_INVALIDATED. CREATED and UPDATED are enabled by default. Additional optional attributes include, query-name
which provides an optional query name, and
expression
which works as described in the above section, and durable
- a boolean value indicating if the query is durable (false by default).
Note that this adapter conforms to Spring integration conventions.
If no channel
attribute is provided, the channel will be created from the id
attribute. This adapter also supports an error-channel
The outbound-channel-adapter writes cache entries mapped from the message payload. In its simplest form, it expects a
payload of type java.util.Map
and puts the map entries into its configured region.
<int-gfe:outbound-channel-adapter id="cacheChannel" region="region"/>
Given the above configuration, an exception will be thrown if the payload is not a Map. Additionally, the outbound channel adapter can be configured to create a map of cache entries using SpEL of course.
<int-gfe:outbound-channel-adapter id="cacheChannel" region="region"> <int-gfe:cache-entries> <entry key="payload.toUpperCase()" value="payload.toLowerCase()"/> <entry key="'foo'" value="'bar'"/> </int-gfe:cache-entries> </int-gfe:outbound-channel-adapter>
In the above configuration, the inner element cache-entries
is semantically equivalent to Spring 'map' element. The adapter interprets the key
and
value
attributes as SpEL expressions with the message as the evaluation context. Note that this contain
arbitrary cache entries (not only those derived from the message) and that literal values must be enclosed in single quotes. In the above example, if the message sent to
cacheChannel
has a String payload with a value "Hello", two entries [HELLO:hello, foo:bar]
will be written (created or updated) in the cache region.
This adapter also supports the order
attribute which may be useful if it is bound to a PublishSubscribeChannel.
As described in EIP, a Message Store allows you to persist Messages. This can be very useful when dealing with components that have a capability to buffer messages (QueueChannel, Aggregator, Resequencer, etc.) if reliability is a concern. In Spring Integration, the MessageStore strategy also provides the foundation for the ClaimCheck pattern, which is described in EIP as well.
Spring Integration's Gemfire module provides the GemfireMessageStore
which is an implementation of both the
the MessageStore
strategy (mainly used by the QueueChannel and ClaimCheck
patterns) and the MessageGroupStore
strategy (mainly used by the Aggregator and
Resequencer patterns).
<bean id="gemfireMessageStore" class="o.s.i.gemfire.store.GemfireMessageStore"> <constructor-arg ref="myCache"/> </bean> <bean id="myCache" class="org.springframework.data.gemfire.CacheFactoryBean"/> <int:channel id="somePersistentQueueChannel"> <int:queue message-store="gemfireMessageStore"/> <int:channel> <int:aggregator input-channel="inputChannel" output-channel="outputChannel" message-store="gemfireMessageStore"/>
Above is a sample GemfireMessageStore
configuration that shows its usage by a QueueChannel and an Aggregator. As you can see it is a normal Spring bean configuration. The simplest configuration requires a reference to a GemFireCache
(created by CacheFactoryBean
) as a constructor argument. If the cache is standalone, i.e., embedded in the same JVM, the MessageStore will create a message store region named "messageStoreRegion". If your application requires customization of the messageStore region, for example, multiple Gemfire message stores each with its own region, you can configure a region for each message store instance and use the Region
as the constructor argument:
<bean id="gemfireMessageStore" class="o.s.i.gemfire.store.GemfireMessageStore"> <constructor-arg ref="myRegion"/> </bean> <gfe:cache/> <gfe:replicated-region id="myRegion"/>
In the above examle, the cache and region are configured using the spring-gemfire namespace (not to be confused with the spring-integration-gemfire namespace). Often it is desirable for the message store to be maintained in one or more remote cache servers in a client-server configuration (See the GemFire product documentation for more details). In this case, you configure a client cache, client region, and client pool and inject the region into the MessageStore. Here is an example:
<bean id="gemfireMessageStore" class="org.springframework.integration.gemfire.store.GemfireMessageStore"> <constructor-arg ref="myRegion"/> </bean> <gfe:client-cache/> <gfe:client-region id="myRegion" shortcut="PROXY" pool-name="messageStorePool"/> <gfe:pool id="messageStorePool"> <gfe:server host="localhost" port="40404" /> </gfe:pool>
Note the pool element is configured with the address of a cache server (a locator may be substituted here). The region is configured as a 'PROXY' so that no data will be stored locally. The region's id corresponds to a region with the same name configured in the cache server.
Starting with version 4.0, the GemfireLockRegistry
is
available. Certain components (for example aggregator and resequencer) use a lock obtained from
a LockRegistry
instance to ensure
that only one thread is manipulating a group at a time. The DefaultLockRegistry
performs this function within a single component; you can now configure an external lock registry
on these components. When used with a shared MessageGroupStore
,
the GemfireLockRegistry
can be use to provide this functionality across
multiple application instances, such that only one instance can manipulate the group at a time.
Note | |
---|---|
One of the GemfireLockRegistry constructors requires a Region
as an argument; it is used to obtain a Lock via the
getDistributedLock() method. This operation requires GLOBAL
scope for the Region .
Another constructor requires Cache and the Region
will be created with GLOBAL scope and with the name LockRegistry .
|
As of Spring Integration 4.0, a new Gemfire-based
MetadataStore
(Section 8.4, “Metadata Store”) implementation is available. The GemfireMetadataStore
can be used to maintain metadata state
across application restarts. This new MetadataStore
implementation can be used with adapters such as:
In order to instruct these adapters to use the new GemfireMetadataStore
,
simply declare a Spring bean using the bean name metadataStore.
The Twitter Inbound Channel Adapter and the
Feed Inbound Channel Adapter will both automatically
pick up and use the declared GemfireMetadataStore
.
Note | |
---|---|
The |