Riak is a Key/Value datastore that supports Internet-scale data replication for high performance and high availability. Spring Data Key/Value (SDKV) provides access to the Riak datastore over the HTTP REST API using a built-in driver based on Spring 3.0's RestTemplate. In addition to making Key/Value datastore access easier from Java, the RiakTemplate has been designed, from the ground up, to be used from alternative JVM languages like Groovy or JRuby.
Since the SDKV support for Riak uses the stateless REST API, there are no connection factories to manage or other stateful objects to keep tabs on. The helper you'll spend the most time working with is likely the thread-safe RiakTemplate or RiakKeyValueTemplate. Your choice of which to use will depend on how you want to manage buckets and keys. SDKV supports two ways to interact with Riak. If you want to use the convention you're likely already familiar with, namely storing an entry with a given key in a "bucket" by passing the bucket and key name separately, you'll want to use the RiakTemplate. If you want to use a single object to represent your bucket and key pair, you can use the RiakKeyValueTemplate. It supports a key object that is encoded using one of several different methods:
- String: concatenate the bucket and key, separated by a colon, e.g. "mybucket:mykey".
- BucketKeyPair: pass an instance of BucketKeyPair, like SimpleBucketKeyPair.
- Map: pass a Map with keys for "bucket" and "key".
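As a minimal, hypothetical sketch of the three encodings (assuming the RiakKeyValueTemplate exposes a set(key, value) operation analogous to RiakTemplate's set(bucket, key, value), and that SimpleBucketKeyPair takes the bucket and key as constructor arguments; the import package for SimpleBucketKeyPair is also an assumption):

import java.util.HashMap;
import java.util.Map;
import org.springframework.data.keyvalue.riak.core.RiakKeyValueTemplate;
import org.springframework.data.keyvalue.riak.core.SimpleBucketKeyPair;

public class KeyEncodingExample {

  @Autowired
  RiakKeyValueTemplate riakKv;

  public void storeWithEachEncoding(MyPojo data) {
    // String encoding: bucket and key concatenated with a colon.
    riakKv.set("mybucket:mykey", data);

    // BucketKeyPair encoding (constructor arguments are an assumption).
    riakKv.set(new SimpleBucketKeyPair("mybucket", "mykey"), data);

    // Map encoding: entries for "bucket" and "key".
    Map<String, String> bucketAndKey = new HashMap<String, String>();
    bucketAndKey.put("bucket", "mybucket");
    bucketAndKey.put("key", "mykey");
    riakKv.set(bucketAndKey, data);
  }
}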
Using the RiakTemplate, where the bucket and key are passed separately, is likely the easiest path to using SDKV for Riak. The examples that follow will assume you're using this version of the template.
There are only two options you need to set to specify the Riak server to use in your RiakTemplate object: "defaultUri" and "mapReduceUri". The default URI should include placeholders for the bucket and the key, which will be filled in by the RestTemplate when the request is made.
**Important:** You can also turn the internal, ETag-based object cache off by setting useCache="false" on your RiakTemplate.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">

  <bean id="riakTemplate" class="org.springframework.data.keyvalue.riak.core.RiakTemplate"
        p:defaultUri="http://localhost:8098/riak/{bucket}/{key}"
        p:mapReduceUri="http://localhost:8098/mapred"
        p:useCache="true"/>

</beans>
There are a couple of additional properties on the RiakTemplate that can be changed from their defaults. If you want to specify your own ConversionService to use when converting objects for storage inside Riak, set it on the "conversionService" property:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">

  <bean id="conversionService" class="com.mycompany.convert.MyConversionService"/>

  <bean id="riakTemplate" class="org.springframework.data.keyvalue.riak.core.RiakTemplate"
        p:defaultUri="http://localhost:8098/riak/{bucket}/{key}"
        p:mapReduceUri="http://localhost:8098/mapred"
        p:conversionService-ref="conversionService"/>

</beans>
Depending on the application, it might be useful to set default Quality-of-Service parameters. In Riak parlance, these are the "dw", "w", and "r" parameters. They can be set to an integer representing the number of vnodes that need to report having received the data before the operation is declared a success, or to the string "one", "all", or (the default) "quorum". These values can be overridden by passing a different set of QosParameters to the set/get operation you're performing.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">

  <bean id="qos" class="org.springframework.data.keyvalue.riak.core.RiakQosParameters"
        p:durableWriteThreshold="all"
        p:writeThreshold="all"/>

  <bean id="riakTemplate" class="org.springframework.data.keyvalue.riak.core.RiakTemplate"
        p:defaultUri="http://localhost:8098/riak/{bucket}/{key}"
        p:mapReduceUri="http://localhost:8098/mapred"
        p:defaultQosParameters-ref="qos"/>

</beans>
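To override these defaults on a single operation, a minimal sketch (assuming, per the description above, a set overload that accepts QosParameters as a trailing argument, and reusing the durableWriteThreshold property shown in the XML):

import org.springframework.data.keyvalue.riak.core.RiakQosParameters;
import org.springframework.data.keyvalue.riak.core.RiakTemplate;

public class QosOverrideExample {

  @Autowired
  RiakTemplate riak;

  public void setDurably(String bucket, String key, Object data) {
    // Require all vnodes to acknowledge a durable write, for this operation only.
    RiakQosParameters qos = new RiakQosParameters();
    qos.setDurableWriteThreshold("all");
    riak.set(bucket, key, data, qos);
  }
}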
You can also set a specific ClassLoader to use when loading objects from Riak. Just set the classLoader property:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">

  <bean id="riakTemplate" class="org.springframework.data.keyvalue.riak.core.RiakTemplate"
        p:defaultUri="http://localhost:8098/riak/{bucket}/{key}"
        p:mapReduceUri="http://localhost:8098/mapred"
        p:classLoader-ref="customClassLoader"/>

</beans>
One of the primary goals of the SDKV project is to make accessing Key/Value stores easier for the developer by taking away the mundane tasks of basic IO, buffering, type conversion, exception handling, and sundry other logistical concerns, so the developer can focus on creating great applications. SDKV for Riak works toward this goal by making basic persistence and data access as easy as using a Map.
To store data in Riak, use one of the six different set methods:
import org.springframework.data.keyvalue.riak.core.RiakTemplate;

public class Example {

  @Autowired
  RiakTemplate riak;

  public void setData(String bucket, String key, String data) throws Exception {
    riak.set(bucket, key, data); // Set as Content-Type: text/plain
    riak.setAsBytes(bucket, key, data.getBytes()); // Set as Content-Type: application/octet-stream
  }

  public void setData(String bucket, String key, MyPojo data) throws Exception {
    riak.set(bucket, key, data); // Converted to JSON automatically, Content-Type: application/json
  }
}
Additionally, there is a setWithMetaData method that takes a Map of metadata entries that will be set as the outgoing HTTP headers. To set custom metadata, your key should be prefixed with X-Riak-Meta-, e.g. X-Riak-Meta-Custom-Header.
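As a minimal sketch (the exact parameter order of setWithMetaData is an assumption; the method and the X-Riak-Meta- prefix are as described above):

import java.util.HashMap;
import java.util.Map;
import org.springframework.data.keyvalue.riak.core.RiakTemplate;

public class MetaDataExample {

  @Autowired
  RiakTemplate riak;

  public void setWithCustomHeader(String bucket, String key, Object data) {
    Map<String, String> metaData = new HashMap<String, String>();
    // Custom metadata keys must carry the X-Riak-Meta- prefix
    // to be sent as Riak metadata headers.
    metaData.put("X-Riak-Meta-Custom-Header", "my-custom-value");
    riak.setWithMetaData(bucket, key, data, metaData);
  }
}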
Riak has the ability to generate random IDs for you when storing objects. The RiakTemplate exposes this capability via the put method, which returns the ID it generated for you as a String.
import org.springframework.data.keyvalue.riak.core.RiakTemplate;

public class Example {

  @Autowired
  RiakTemplate riak;

  public String setData(String bucket, String data) throws Exception {
    String id = riak.put(bucket, data); // Returns the generated ID
    return id;
  }
}
Retrieving data from Riak is just as easy. There are actually 13 different get methods on RiakTemplate, giving the developer a wide range of options for accessing and converting data.
Assuming you've stored a POJO using an appropriate set method, you can retrieve that object from Riak using a get:
import org.springframework.data.keyvalue.riak.core.RiakTemplate;

public class Example {

  @Autowired
  RiakTemplate riak;

  public void getData(String bucket, String key) throws Exception {
    // What you get depends on Content-Type:
    // application/json=Map, text/plain=String, etc...
    Object o = riak.get(bucket, key);

    // If your entry is Content-Type: application/json,
    // it will automatically be converted when retrieved.
    MyPojo s = riak.getAsType(bucket, key, MyPojo.class);

    // If your entry is Content-Type: application/octet-stream,
    // you can access the raw bytes.
    byte[] b = riak.getAsBytes(bucket, key);
  }
}
Riak has the ability to link entries together using an arbitrary tag. This relationship information is stored in the Link header. The RiakTemplate exposes a method for linking entries together called link. Its usage is quite simple:
**Important:** A link is uni-directional, so keep in mind that the bucket and key you pass first should be that of the child (or target) object, and the second bucket/key pair you pass to the link method should be that of the source (or parent) object.
@Autowired
RiakTemplate riak;

riak.link("childbucket", "childkey", "sourcebucket", "sourcekey", "tagname");
Now, querying the metadata on the entry at sourcebucket:sourcekey will result in a Link header that points to the child object: </riak/childbucket/childkey>; riaktag="tagname"
When entries are linked together in Riak, those relationships can be efficiently traversed on the server using a feature called Link Walking. Rather than requesting each object in a link relationship individually, a link walk pulls all the related objects at once and sends that data back to the client as MIME-encoded multipart data. As such, it requires special processing to convert those multiple entries into a List of objects, just as if you had used a get method. If you don't specify a type to convert the objects to, the linkWalk method will try to infer it from the bucket name. If the bucket name is not a valid class name, it will default to using a java.util.Map.
To link walk a relationship and return a list of custom POJOs, you would do something like this:
@Autowired
RiakTemplate riak;

List<MyPojo> result = riak.linkWalk("sourcebucket", "sourcekey", "tagname", MyPojo.class);
Riak supports Map/Reduce functionality in a couple of different ways. You can specify the Javascript source to execute (termed "anonymous" Javascript), you can reference Javascript already stored in Riak at a specific bucket and key, or you can reference an Erlang module and function. The Map/Reduce support in SDKV covers all these bases by giving you meaningful abstractions over the Map/Reduce job that represent the various aspects of the Map/Reduce process.
At the highest level, every Map/Reduce request is represented by a MapReduceJob. The MapReduceJob represents the inputs, the phases, and the optional arg to send to Riak to execute the Map/Reduce job. The toJson method is responsible for serializing the entire job into the appropriate JSON data to send to Riak.
Riak will accept either a string denoting the bucket in which to get the list of keys to operate on, or a List of Lists denoting the bucket/key pairs to operate on while executing this Map/Reduce job. If you call the addInputs method on the job, passing a List with a single string entry, the job will assume you want to operate on an entire bucket. Otherwise, you'll need to pass a multi-dimensional List of bucket/key pairs.
To operate on an entire bucket:
@Autowired
RiakTemplate riak;

RiakMapReduceJob job = riak.createMapReduceJob();
List<String> bucket = new ArrayList<String>() {{
  add("mybucket");
}};
job.addInputs(bucket); // Will M/R entire bucket
To operate on a set of keys:
import org.springframework.data.keyvalue.riak.mapreduce.*;

@Autowired
RiakTemplate riak;

RiakMapReduceJob job = riak.createMapReduceJob();
final List<String> pair = new ArrayList<String>() {{
  add("mybucket");
  add("mykey");
}};
List<List<String>> keys = new ArrayList<List<String>>() {{
  add(pair);
}};
job.addInputs(keys); // Will M/R only specified keys
Map/Reduce operations in Riak are broken up into phases. Phases contain a MapReduceOperation. There are currently two implementations to handle Javascript or Erlang M/R operations: JavascriptMapReduceOperation and ErlangMapReduceOperation.
An example Map/Reduce job defining a single "map" phase defined in anonymous Javascript might look like this:
import org.springframework.data.keyvalue.riak.mapreduce.*;

@Autowired
RiakTemplate riak;

RiakMapReduceJob job = riak.createMapReduceJob();
List<String> bucket = new ArrayList<String>() {{
  add("mybucket");
}};
job.addInputs(bucket); // M/R the entire bucket

MapReduceOperation mapOper = new JavascriptMapReduceOperation(
    "function(v){ ...M/R function body... }");
MapReducePhase mapPhase = new RiakMapReducePhase("map", "javascript", mapOper);
job.addPhase(mapPhase);
To execute a configured job on your Riak server, use either the synchronous execute or asynchronous submit methods of your configured RiakTemplate:
Object o = riak.execute(job); // Results of last Map or Reduce phase. Should be a List<?>

...or...

List<MyPojo> o = riak.execute(job, MyPojo.class); // Coerce to given type

...or...

Future<List<?>> f = riak.submit(job); // Job runs in a separate thread
It's sometimes useful to manage settings like the Quality-of-Service parameters w and dw (write and durable write thresholds) and the n_val setting at the bucket level. It's also possible to list the keys in a particular bucket by calling the getBucketSchema method, passing true as the second parameter, which tells the RiakTemplate to list the keys.
To list the keys in a bucket, you would do something like this:
@Autowired
RiakTemplate riak;

Map<String, Object> schema = riak.getBucketSchema("mybucket", true);
List<String> keys = (List<String>) schema.get("keys");
for (String key : keys) {
  // ...do something with each key...
}
To update the bucket settings, pass a Map of properties:
@Autowired
RiakTemplate riak;

Map<String, Integer> props = new HashMap<String, Integer>();
props.put("n_val", 6);
props.put("dw", 3);
riak.updateBucketSchema("mybucket", props);
Only the properties specified in the passed-in Map will be updated. Properties that were set in previous operations and are not specified in this operation will be unaffected.
SDKV for Riak also includes an asynchronous version of most of the methods available on the RiakTemplate, whose method calls are all synchronous. The asynchronous version of the template is called AsyncRiakTemplate.
The AsyncRiakTemplate has the same basic configuration properties as the synchronous RiakTemplate. The only other property specific to the AsyncRiakTemplate you might want to configure is the thread pool the template uses to execute tasks asynchronously (by default, a cached ThreadPoolExecutor). Set your ExecutorService on the template's workerPool property.
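A minimal programmatic sketch (defaultUri and mapReduceUri are the shared configuration properties described earlier; standard JavaBean setters for the documented properties are assumed, and the pool size of 4 is an arbitrary example value):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.springframework.data.keyvalue.riak.core.AsyncRiakTemplate;

public class AsyncTemplateConfig {

  public AsyncRiakTemplate asyncRiakTemplate() {
    AsyncRiakTemplate riak = new AsyncRiakTemplate();
    riak.setDefaultUri("http://localhost:8098/riak/{bucket}/{key}");
    riak.setMapReduceUri("http://localhost:8098/mapred");
    // Replace the default cached ThreadPoolExecutor with a bounded pool.
    ExecutorService workerPool = Executors.newFixedThreadPool(4);
    riak.setWorkerPool(workerPool);
    return riak;
  }
}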
Using the asynchronous Riak support in SDKV means you'll be relying on callbacks to execute your business logic when the requested operation is completed. All asynchronous operations follow a similar pattern:

- They take an AsyncKeyValueStoreOperation<?, ?> as a final parameter.
- They return a Future<?>.
To perform an asynchronous get on a JSON-serialized Map object, returning a custom object from the callback, you'd do something like:
@Autowired
AsyncRiakTemplate riak;

Future<MyObject> future = riak.get("mybucket", "mykey",
    new AsyncKeyValueStoreOperation<Map, MyObject>() {

      MyObject obj = new MyObject();

      public MyObject completed(KeyValueStoreMetaData meta, Map result) {
        obj.setName((String) result.get("name"));
        return obj;
      }

      public MyObject failed(Throwable error) {
        obj.setError(error);
        return obj;
      }
    });

// Maybe do other work while waiting...
MyObject obj = future.get();
If your application uses Groovy, either in a standalone context or as part of a Grails application, you could benefit from using the Groovy RiakBuilder that comes with SDKV for Riak. Underneath, it uses the AsyncRiakTemplate. To use the RiakBuilder, pass the constructor a configured AsyncRiakTemplate.
**Important:** Instances of RiakBuilder are not thread-safe and should not be shared across threads.
The RiakBuilder implements an easy-to-use DSL for interacting with Riak. It doesn't implement the full set of methods available on the underlying AsyncRiakTemplate, only a subset; the operations it responds to include set, get, put, delete, foreach, and mapreduce, as illustrated below.
The following example illustrates the different uses of the Riak DSL, including batching requests together into a logical group and using a default bucket name (the node directly beneath riak is considered the default bucket for the contained operations, unless a different one is specified on the operation itself):
def riak = new RiakBuilder(asyncRiakTemplate)
riak {
  test {
    put(value: [test: "value"]) { completed { v, meta -> meta.key } }
    put(value: [test: "value"]) { completed { v, meta -> meta.key } }
    put(value: [test: "value"]) { completed { v, meta -> meta.key } }
    put(value: [test: "value"]) { completed { v, meta -> meta.key } }

    mapreduce {
      query {
        map(arg: [test: "arg", alist: [1, 2, 3, 4]]) {
          source "function(v, keyInfo, arg){ return [1]; }"
        }
        reduce {
          source "function(v){ return Riak.reduceSum(v); }"
        }
      }
      failed { it.printStackTrace() }
    }
  }
}
def results = riak.results

riak.foreach(bucket: "test") {
  completed { v, meta ->
    riak.delete(bucket: "test", key: meta.key)
  }
}
Some important things to note from this example:

- Each operation accepts a closure that defines the callbacks completed and failed.
- The completed closure is passed either the result object or, if your closure is defined with two parameters, the result object and the metadata associated with that entry.
- The output of each operation in the batch is made available on the builder's special results property. Code that needs to know the output of individual operations within the batch can get access to that object through this property. Note that this means that RiakBuilder instances are NOT thread-safe.
**Important:** Even though the Riak DSL uses an asynchronous template underneath, all operations performed through the DSL will, by default, block until complete. To get a truly asynchronous operation, pass a non-zero wait parameter, in which case the operation returns a Future<?>, as described below.
You can pass QosParameters to Riak DSL operations by simply defining them as parameters to the operation:
def riak = new RiakBuilder(asyncRiakTemplate)
def myobj = riak.set(bucket: "mybucket", key: "mykey", qos: ["dw": "all"])
The output of DSL operations will either be passed to the configured completed callback, or be returned to the caller if no callback is specified. In the example above, the mapreduce operation has no completed closure, so the return of the reduce phase is simply passed back to the builder, which makes that output available on the special results property.
To gain access to the operation's results immediately, simply assign it to a variable:
def riak = new RiakBuilder(asyncRiakTemplate)
def myobj = riak.get(bucket: "mybucket", key: "mykey")
If you add a non-zero wait value to the operation, "myobj" will contain a Future<?> rather than the result object itself.
SDKV for Riak includes a couple of useful helper objects to make reading and writing plain text or binary data in Riak really easy. If you want to store a file in Riak, you can create a RiakOutputStream and simply write your data to it (making sure to call the "flush" method, which actually sends the data to Riak).
import java.io.OutputStream;
import org.springframework.data.keyvalue.riak.core.RiakTemplate;
import org.springframework.data.keyvalue.riak.core.io.RiakOutputStream;

public class Example {

  @Autowired
  RiakTemplate riak;

  public void writeToRiak(String bucket, String key, String data) throws Exception {
    OutputStream out = new RiakOutputStream(riak, bucket, key);
    try {
      out.write(data.getBytes());
    } finally {
      out.flush(); // Actually sends the data to Riak
      out.close();
    }
  }
}
Reading data from Riak is similarly easy. SDKV provides a java.io.File subclass that represents a resource in Riak. There's also a Spring IO Resource abstraction called RiakResource that can be used anywhere a Resource is required, as well as an InputStream implementation called RiakInputStream.
import java.io.InputStream;
import org.springframework.data.keyvalue.riak.core.RiakTemplate;
import org.springframework.data.keyvalue.riak.core.io.RiakInputStream;

public class Example {

  @Autowired
  RiakTemplate riak;

  public String readFromRiak(String bucket, String key) throws Exception {
    InputStream in = new RiakInputStream(riak, bucket, key);
    // Read the entire entry into a String
    StringBuilder data = new StringBuilder();
    try {
      int b;
      while ((b = in.read()) != -1) {
        data.append((char) b);
      }
    } finally {
      in.close();
    }
    return data.toString();
  }
}