5. Riak Support

Riak is a Key/Value datastore that supports Internet-scale data replication for high performance and high availability. Spring Data Key/Value (SDKV) provides access to the Riak datastore over the HTTP REST API using a built-in driver based on Spring 3.0's RestTemplate. In addition to making Key/Value datastore access easier from Java, the RiakTemplate has been designed, from the ground up, to be used from alternative JVM languages like Groovy or JRuby.

Since the SDKV support for Riak uses the stateless REST API, there are no connection factories to manage or other stateful objects to keep tabs on. The helper you'll spend the most time working with is likely the thread-safe RiakTemplate or RiakKeyValueTemplate. Your choice between the two depends on how you want to manage buckets and keys. SDKV supports two ways to interact with Riak. If you want to use the convention you're likely already familiar with, namely storing an entry with a given key in a "bucket" by passing the bucket and key names separately, you'll want to use the RiakTemplate. If you want to use a single object to represent your bucket/key pair, you can use the RiakKeyValueTemplate, which supports a key object that can be encoded using one of several different methods.

5.1 Configuring the RiakTemplate

This is likely the easiest path to using SDKV for Riak, as the bucket and key are passed separately. The examples that follow assume you're using this version of the template.

There are only two properties you need to set to specify the Riak server your RiakTemplate object will use: "defaultUri" and "mapReduceUri". The default URI should contain placeholders for the bucket and the key, which will be filled in by the RestTemplate when the request is made.

Important

You can also turn off the internal, ETag-based object cache by setting useCache="false". It's generally recommended, however, to leave the internal cache on: the ETag matching will pick up any changes made to the entry on the Riak side, and your application will benefit from greatly increased performance for frequently requested objects.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="riakTemplate" class="org.springframework.data.keyvalue.riak.core.RiakTemplate" 
          p:defaultUri="http://localhost:8098/riak/{bucket}/{key}"
          p:mapReduceUri="http://localhost:8098/mapred"
          p:useCache="true"/>
    
</beans>

5.1.1 Advanced Template Configuration

There are a couple of additional properties on the RiakTemplate that can be changed from their defaults. If you want to specify your own ConversionService to use when converting objects for storage in Riak, set it on the "conversionService" property:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="conversionService" class="com.mycompany.convert.MyConversionService"/> 
    <bean id="riakTemplate" class="org.springframework.data.keyvalue.riak.core.RiakTemplate" 
          p:defaultUri="http://localhost:8098/riak/{bucket}/{key}"
          p:mapReduceUri="http://localhost:8098/mapred"
          p:conversionService-ref="conversionService"/>

</beans>

Depending on the application, it might be useful to set default Quality-of-Service parameters. In Riak parlance, these are the "dw", "w", and "r" parameters. They can be set to an integer representing the number of vnodes that must report having received the data before the operation is declared a success, or to the string "one", "all", or (the default) "quorum". These defaults can be overridden by passing a different set of QosParameters to the set/get operation you're performing.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="qos" class="org.springframework.data.keyvalue.riak.core.RiakQosParameters"
          p:durableWriteThreshold="all"
          p:writeThreshold="all"/>
    <bean id="riakTemplate" class="org.springframework.data.keyvalue.riak.core.RiakTemplate" 
          p:defaultUri="http://localhost:8098/riak/{bucket}/{key}"
          p:mapReduceUri="http://localhost:8098/mapred"
          p:defaultQosParameters-ref="qos"/>

</beans>
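
The defaults can also be overridden on a per-operation basis. The following is a minimal sketch, assuming a set overload that accepts the QosParameters as its final argument and that RiakQosParameters exposes plain setters mirroring the XML properties shown above; check the API of your SDKV version before relying on it:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.keyvalue.riak.core.RiakQosParameters;
import org.springframework.data.keyvalue.riak.core.RiakTemplate;

public class QosOverrideExample {

    @Autowired
    RiakTemplate riak;

    public void setDurably(String bucket, String key, String data) throws Exception {
        // Per-operation QoS: require every vnode to acknowledge this write
        RiakQosParameters qos = new RiakQosParameters();
        qos.setWriteThreshold("all");
        qos.setDurableWriteThreshold("all");
        riak.set(bucket, key, data, qos); // Overrides the template's defaultQosParameters
    }

}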

You can also set a specific ClassLoader to use when loading objects from Riak. Just set the classLoader property:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="riakTemplate" class="org.springframework.data.keyvalue.riak.core.RiakTemplate" 
          p:defaultUri="http://localhost:8098/riak/{bucket}/{key}"
          p:mapReduceUri="http://localhost:8098/mapred"
          p:classLoader-ref="customClassLoader"/>

</beans>

5.2 Working with Objects using the RiakTemplate

One of the primary goals of the SDKV project is to make accessing Key/Value stores easier for the developer by taking away the mundane tasks of basic IO, buffering, type conversion, exception handling, and sundry other logistical concerns so the developer can focus on creating great applications. SDKV for Riak works toward this goal by making basic persistence and data access as easy as using a Map.

5.2.1 Saving data into Riak

To store data in Riak, use one of the six different set methods:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.keyvalue.riak.core.RiakTemplate;

public class Example {

    @Autowired
    RiakTemplate riak;

    public void setData(String bucket, String key, String data) throws Exception {
        riak.set(bucket, key, data); // Set as Content-Type: text/plain
        riak.setAsBytes(bucket, key, data.getBytes()); // Set as Content-Type: application/octet-stream
    }
  
    public void setData(String bucket, String key, MyPojo data) throws Exception {
        riak.set(bucket, key, data); // Converted to JSON automatically, Content-Type: application/json
    }

}
        

Additionally, there is a setWithMetaData method that takes a Map of metadata entries to be sent as the outgoing HTTP headers. To set custom metadata, the key should be prefixed with X-Riak-Meta-, e.g. X-Riak-Meta-Custom-Header.
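
As a sketch, storing an entry along with a custom metadata header might look like the following (this assumes the metadata Map is passed as the final argument after the bucket, key, and value; consult the setWithMetaData signature in your SDKV version):

import java.util.HashMap;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.keyvalue.riak.core.RiakTemplate;

public class MetaDataExample {

    @Autowired
    RiakTemplate riak;

    public void setDataWithMetaData(String bucket, String key, String data) throws Exception {
        Map<String, String> metaData = new HashMap<String, String>();
        // Custom metadata keys must carry the X-Riak-Meta- prefix
        metaData.put("X-Riak-Meta-Custom-Header", "some value");
        riak.setWithMetaData(bucket, key, data, metaData); // Metadata is sent as outgoing HTTP headers
    }

}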

Letting Riak generate the key

Riak has the ability to generate random IDs for you when storing objects. The RiakTemplate exposes this capability via the put method. It will return the ID it generated for you as a String.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.keyvalue.riak.core.RiakTemplate;

public class Example {

    @Autowired
    RiakTemplate riak;

    public String setData(String bucket, String data) throws Exception {
        String id = riak.put(bucket, data); // Returns the generated ID
        return id;
    }

}
          

5.2.2 Retrieving data from Riak

Retrieving data from Riak is just as easy. There are actually 13 different get methods on RiakTemplate, giving the developer a wide range of options for accessing and converting data.

Assuming you've stored a POJO using an appropriate set method, you can retrieve that object from Riak using a get:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.keyvalue.riak.core.RiakTemplate;

public class Example {

    @Autowired
    RiakTemplate riak;

    public void getData(String bucket, String key) throws Exception {
        // What you get depends on Content-Type.
        // application/json=Map, text/plain=String, etc...
        Object o = riak.get(bucket, key);

        // If your entry is Content-Type: application/json...
        // It will automatically be converted when retrieved.
        MyPojo s = riak.getAsType(bucket, key, MyPojo.class);

        // If your entry is Content-Type: application/octet-stream,
        // you can access the raw bytes.
        byte[] b = riak.getAsBytes(bucket, key);
    }

}
        

5.3 Linking Entries

Riak has the ability to link entries together using an arbitrary tag. This relationship information is stored in the Link header. The RiakTemplate exposes a method for linking entries together called link. Its usage is quite simple:

Important

A link is uni-directional. Keep in mind that the bucket and key you pass first should be those of the child (or target) object, while the second bucket/key pair you pass to the link method identifies the source of the relationship. It is this second entry that receives an updated Link header pointing to the child (or target) entry.

@Autowired
RiakTemplate riak;

riak.link("childbucket", "childkey", "sourcebucket", "sourcekey", "tagname");
      

Now, querying the metadata on the entry at sourcebucket:sourcekey will result in a Link header that points to the child object: </riak/childbucket/childkey>; riaktag="tagname"

5.3.1 Link Walking

When entries are linked together in Riak, those relationships can be efficiently traversed on the server using a feature called Link Walking. Rather than requesting each object in a link's relationship individually, a link walk pulls all the related objects at once and sends that data back to the client as MIME-encoded multipart data. As such, it requires special processing to convert those multiple entries into a List of objects, just as if you had used a get method. If you don't specify a type to convert the objects to, the linkWalk method will try to infer it from the bucket name. If the bucket name is not a valid class name, it will default to using a java.util.Map.

To link walk a relationship and return a list of custom POJOs, you would do something like this:

@Autowired
RiakTemplate riak;

List<MyPojo> result = riak.linkWalk("sourcebucket", "sourcekey", "tagname", MyPojo.class);
        

5.4 Map/Reduce

Riak supports Map/Reduce functionality in a couple of different ways. You can specify the Javascript source to execute (termed "anonymous" Javascript), you can reference Javascript already stored in Riak at a specific bucket and key, or you can reference an Erlang module and function. The Map/Reduce support in SDKV covers all these bases by giving you meaningful abstractions over the Map/Reduce job that represent the various aspects of the Map/Reduce process.

At the highest level, every Map/Reduce request is represented by a MapReduceJob. The MapReduceJob represents the inputs, the phases, and the optional arg to send to Riak to execute the Map/Reduce job. The toJson method is responsible for serializing the entire job into the appropriate JSON data to send to Riak.

5.4.1 Specifying Inputs

Riak will accept either a string denoting the bucket in which to get the list of keys to operate on, or a List of Lists denoting the bucket/key pairs to operate on while executing this Map/Reduce job. If you call the addInputs method on the job passing a List with a single string entry, the job will assume you want to operate on an entire bucket. Otherwise, you'll need to pass a multi-dimensional List of bucket/key pairs.

To operate on an entire bucket:

@Autowired
RiakTemplate riak;

RiakMapReduceJob job = riak.createMapReduceJob();
List<String> bucket = new ArrayList<String>() {{
  add("mybucket");
}};
job.addInputs(bucket); // Will M/R entire bucket
        

To operate on a set of keys:

import org.springframework.data.keyvalue.riak.mapreduce.*;
          
@Autowired
RiakTemplate riak;

RiakMapReduceJob job = riak.createMapReduceJob();

List<String> pair = new ArrayList<String>() {{
  add("mybucket");
  add("mykey");
}};
List<List<String>> keys = new ArrayList<List<String>>() {{
  add(pair);
}};
job.addInputs(keys); // Will M/R only specified keys
        

5.4.2 Defining Phases

Map/Reduce operations in Riak are broken up into phases. Phases contain a MapReduceOperation. There are currently two implementations to handle Javascript or Erlang M/R operations: JavascriptMapReduceOperation and ErlangMapReduceOperation.

An example Map/Reduce job with a single "map" phase written in anonymous Javascript might look like this:

import org.springframework.data.keyvalue.riak.mapreduce.*;

@Autowired
RiakTemplate riak;

RiakMapReduceJob job = riak.createMapReduceJob();
List<String> bucket = new ArrayList<String>() {{
  add("mybucket");
}};

job.addInputs(bucket); // M/R the entire bucket

MapReduceOperation mapOper = new JavascriptMapReduceOperation("function(v){ ...M/R function body... }");
MapReducePhase mapPhase = new RiakMapReducePhase("map", "javascript", mapOper);

job.addPhase(mapPhase);
        

5.4.3 Executing and Working with the Result

To execute a configured job on your Riak server, use either the synchronous execute or the asynchronous submit method of your RiakTemplate:

Object o = riak.execute(job); // Results of last Map or Reduce phase. Should be a List<?>

...or...

List<MyPojo> o = riak.execute(job, MyPojo.class); // Coerce to given type

...or...

Future<List<?>> f = riak.submit(job); // Job runs in a separate thread
        

5.5 Managing Bucket Properties

It's sometimes useful to manage settings like the Quality-of-Service parameters w and dw (write and durable write thresholds) and the n_val setting at the bucket level. It's also possible to list the keys in a particular bucket by calling the getBucketSchema method, passing true as the second parameter, which tells the RiakTemplate to list the keys.

To list the keys in a bucket, you would do something like this:

@Autowired
RiakTemplate riak;

Map<String, Object> schema = riak.getBucketSchema("mybucket", true);
List<String> keys = (List<String>) schema.get("keys");
for (String key : keys) {
  ...do something with each key...
}
      

To update the bucket settings, pass a Map of properties:

@Autowired
RiakTemplate riak;

Map<String, Integer> props = new HashMap<String, Integer>();
props.put("n_val", 6);
props.put("dw", 3);

riak.updateBucketSchema("mybucket", props);
      

Only the properties specified in the passed-in Map will be updated. Properties that have already been set in previous operations and not specified in this operation will be unaffected.

5.6 Asynchronous Access

The RiakTemplate's methods are all synchronous. SDKV for Riak also includes an asynchronous version of most of those methods in a separate template called the AsyncRiakTemplate.

5.6.1 Template Configuration

The AsyncRiakTemplate has the same basic configuration properties as the synchronous RiakTemplate. The only other property specific to the AsyncRiakTemplate you might want to configure is the thread pool the template uses to execute tasks asynchronously (by default a cached ThreadPoolExecutor). Set your ExecutorService on the template's workerPool property.
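
As a configuration sketch, wiring a fixed-size pool into the template might look like the following (the bean names are illustrative, and this assumes the AsyncRiakTemplate lives in the same core package as the RiakTemplate):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="workerPool" class="java.util.concurrent.Executors" factory-method="newFixedThreadPool">
        <constructor-arg value="4"/>
    </bean>

    <bean id="asyncRiakTemplate" class="org.springframework.data.keyvalue.riak.core.AsyncRiakTemplate"
          p:defaultUri="http://localhost:8098/riak/{bucket}/{key}"
          p:mapReduceUri="http://localhost:8098/mapred"
          p:workerPool-ref="workerPool"/>

</beans>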

5.6.2 Callbacks

Using the asynchronous Riak support in SDKV means you'll be relying on callbacks to execute your business logic when the requested operation is completed. All asynchronous operations follow a similar pattern:

  • They are named similarly to their synchronous counterparts.
  • They take an AsyncKeyValueStoreOperation<?, ?> as a final parameter.
  • They return a Future<?>.

To perform an asynchronous get on a JSON-serialized Map entry and return a custom object from the callback, you'd do something like:

@Autowired
AsyncRiakTemplate riak;

Future<MyObject> future = riak.get("mybucket", "mykey", new AsyncKeyValueStoreOperation<Map, MyObject>() {

  MyObject obj = new MyObject();

  public MyObject completed(KeyValueStoreMetaData meta, Map result) {
    obj.setName((String) result.get("name"));
    return obj;
  }

  public MyObject failed(Throwable error) {
    obj.setError(error);
    return obj;
  }

});

// Maybe do other work while waiting...
MyObject obj = future.get();
        

5.7 Groovy Builder Support

If your application uses Groovy, either in a standalone context, or as part of a Grails application, then you could benefit from using the Groovy RiakBuilder that comes with SDKV for Riak. Underneath, it uses the AsyncRiakTemplate. To use the RiakBuilder, pass the constructor a configured AsyncRiakTemplate.

Important

Instances of RiakBuilder are NOT thread-safe and should not be shared across threads.

The RiakBuilder implements an easy-to-use DSL for interacting with Riak. It doesn't expose the full set of methods available on the underlying AsyncRiakTemplate, only a subset. The methods the RiakBuilder responds to are:

  • set
  • setAsBytes
  • put
  • get
  • getAsBytes
  • getAsType
  • containsKey
  • delete
  • foreach

5.7.1 Riak DSL Usage

The following example illustrates several uses of the Riak DSL, including batching requests into a logical group and using a default bucket name (the node directly beneath riak is treated as the default bucket for the contained operations unless a different one is specified on the operation itself):

def riak = new RiakBuilder(asyncRiakTemplate)
riak {
  test {
    put(value: [test: "value"]) { completed { v, meta -> meta.key }}
    put(value: [test: "value"]) { completed { v, meta -> meta.key }}
    put(value: [test: "value"]) { completed { v, meta -> meta.key }}
    put(value: [test: "value"]) { completed { v, meta -> meta.key }}

    mapreduce {
      query {
        map(arg: [test: "arg", alist: [1, 2, 3, 4]]) {
          source "function(v, keyInfo, arg){ return [1]; }"
        }
        reduce {
          source "function(v){ return Riak.reduceSum(v); }"
        }
      }
      failed { it.printStackTrace() }
    }
  }
}
def results = riak.results

riak.foreach(bucket: "test") {
  completed { v, meta ->
    riak.delete(bucket: "test", key: meta.key)
  }
}
        

Some important things to note from this example:

  • Each operation in the Riak DSL has two callbacks: completed and failed.
  • The completed closure is passed either the result object, or, if your closure is defined with two parameters, the result object and the metadata associated with that entry.
  • Operations can be enclosed in an arbitrarily-named closure which the builder interprets as a default bucket name (in this case, the node "test" tells the builder to use the bucket name "test" for a default, unless one is specified on one of the enclosed operations).
  • The result of each operation within a builder's execution is accumulated inside the special results property. Code that needs the output of individual operations within the batch can access it through this property. Note that this means RiakBuilder instances are NOT thread-safe.

Important

Even though the Riak DSL uses an asynchronous template underneath, all operations performed through the DSL will, by default, block until complete. To get a truly asynchronous operation, pass the parameter wait: 0 on the operation (or give a meaningful timeout in milliseconds to wait for the operation to complete).

QosParameters on Riak DSL Operations

You can pass QosParameters to Riak DSL operations by simply defining them as parameters to the operation:

def riak = new RiakBuilder(asyncRiakTemplate)

def myobj = riak.set(bucket: "mybucket", key: "mykey", value: "mydata", qos: ["dw": "all"])
          

Working with Riak DSL Output

The output of DSL operations will either be passed to the configured completed callback, or be returned to the caller if no callback is specified. In the example above, the mapreduce operation has no completed closure. Therefore, the return of the reduce phase is simply passed back to the builder, which makes that output available on the special results property.

To gain access to the operation's results immediately, simply assign it to a variable:

def riak = new RiakBuilder(asyncRiakTemplate)

def myobj = riak.get(bucket: "mybucket", key: "mykey")
          

If you add a non-zero wait value to the operation, "myobj" will contain a Future<?> rather than the result object itself.

5.8 Working with streams

SDKV for Riak includes a couple of useful helper objects to make reading and writing plain text or binary data in Riak really easy. If you want to store a file in Riak, then you can create a RiakOutputStream and simply write your data to it (making sure to call the "flush" method, which actually sends the data to Riak).

import java.io.OutputStream;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.keyvalue.riak.core.RiakTemplate;
import org.springframework.data.keyvalue.riak.core.io.RiakOutputStream;
        
public class Example {

    @Autowired
    RiakTemplate riak;

    public void writeToRiak(String bucket, String key, String data) throws Exception {
        OutputStream out = new RiakOutputStream(riak, bucket, key);
        try {
            out.write(data.getBytes());
        } finally {
            out.flush();
            out.close();
        }
    }
    
}
      

Reading data from Riak is similarly easy. SDKV provides a java.io.File subclass that represents a resource in Riak, a Spring IO Resource abstraction called RiakResource that can be used anywhere a Resource is required, and an InputStream implementation called RiakInputStream.

import java.io.InputStream;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.keyvalue.riak.core.RiakTemplate;
import org.springframework.data.keyvalue.riak.core.io.RiakInputStream;
        
public class Example {

    @Autowired
    RiakTemplate riak;

    public String readFromRiak(String bucket, String key) throws Exception {
        InputStream in = new RiakInputStream(riak, bucket, key);
        String data;
        ...read data and work with it...
        return data;
    }
    
}