The Store sub-project of Spring for Apache Hadoop provides abstractions for writing and reading various types of data residing in HDFS. We currently support different file types either via our own store accessors or by using the Dataset support in Kite SDK.
Currently, the Store sub-project doesn’t have an XML namespace or javaconfig based configuration classes as it’s considered to be a foundational library. However, this may change in future releases.
Native store abstractions provide various writer and reader interfaces so that the end user don’t have to worry about the underlying implementation actually doing the work on files in HDFS. Implementations are usually strongly typed and provides constructors and setters for additional setup to work with naming, compression codecs and everything else defining the behaviour. Interfaces are meant to be used from integration components which don’t need to know the internal workings of writers and readers.
Main interface writing into a store is a DataWriter which have one method write which simply writes an entity and the backing implementation will handle the rest.
public interface DataWriter<T> { void write(T entity) throws IOException; }
The DataStoreWriter interface adds methods to close and flush a writer. Some of the writers have a property to close a stream after an idle time or a close time has been reached but generally this interface is meant for programmatic control of these operations.
public interface DataStoreWriter<T> extends DataWriter<T>, Flushable, Closeable { }
Different file naming strategies are used to automatically determine the name of a file to be used. Writers without additional naming configuration will usually use a given base path as is. As soon as any type of a strategy is configured, given base path is considered to be a base directory and the name of the file is resolved by file naming strategies.
For example, if defined base path is “/tmp/path” and the
StaticFileNamingStrategy with “data” parameter is used then the actual
file path resolved would be “/tmp/path/data”.
Path path = new Path("/tmp/path"); Configuration config = new Configuration(); TextFileWriter writer = new TextFileWriter(config, path, null); StaticFileNamingStrategy fileNamingStrategy = new StaticFileNamingStrategy("data") writer.setFileNamingStrategy(fileNamingStrategy);
At first look this may feel a little complicated, but it will make sense after more file naming strategies are added. These will also provide facilities for using writers in parallel, or for a re-launched writer to be able to create a new file based on already existing files in the directry. For example, RollingFileNamingStrategy will add a simple increasing value to a file name and will try to initialize itself with the correct position.
Built-in strategies currently supported are StaticFileNamingStrategy, RollingFileNamingStrategy, UuidFileNamingStrategy and CodecFileNamingStrategy. ChainedFileNamingStrategy can be used to chain multiple strategies together where each individual strategy will provide its own part.
File rolling strategy is used to determine a condition in a writer when a current stream should be automatically closed and the next file should be opened. This is usually done together with RollingFileNamingStrategy to rollover when a certain file size limit has been reached.
Currently, only one strategy SizeRolloverStrategy is supported.
Partitioning is a concept of choosing a target file on demand either based on content to be written or any other information available to a writer at the time of the write operation. While it would be perfectly alright to use multiple writers manually, the framework already does all the heavy lifting around partitioning. We work through interfaces and provide a generic default implementation still allowing to plug a customized version if there’s a need for it.
PartitionStrategy is a strategy interface defining PartitionResolver and PartitionKeyResolver.
public interface PartitionStrategy<T,K> { PartitionResolver<K> getPartitionResolver(); PartitionKeyResolver<T, K> getPartitionKeyResolver(); }
PartitionResolver is an interface used to resolve arbitrary partition keys into a path. We don’t force any specific partition key type in the interface level itself but usually the implementation needs to be aware of its type.
public interface PartitionResolver<K> { Path resolvePath(K partitionKey); }
PartitionKeyResolver is an interface which is responsible for creating a partition key from an entity. This is needed because writer interfaces allow us to write entities without an explicit partition key.
public interface PartitionKeyResolver<T, K> { K resolvePartitionKey(T entity); }
PartitionDataStoreWriter is an extension of DataStoreWriter adding a method to write an entity with a partition key. In this context the partition key is something what the partition strategy is able to use.
public interface PartitionDataStoreWriter<T,K> extends DataStoreWriter<T> { void write(T entity, K partitionKey) throws IOException; }
DefaultPartitionStrategy is a generic default implementation meant to be used together with an expression using Spring’s SpEL expression language. PartitionResolver used in DefaultPartitionStrategy expects partition key to be a type of Map<String,Object> and partition key created by PartitionKeyResolver is a DefaultPartitionKey which itself is a Map<String,Object>.
In order to make it easy to work with SpEL and partitioning, map values can be directly accessed with keys and additional partitioning methods has been registered.
SpEL expression is evaluated against a partition key passed into a HDFS writer.
If partition key is a type of Map any property given to a SpEL expression is automatically resolved from a map.
In addition to normal SpEL functionality, a few custom methods have been added to make it easier to build partition paths. These custom methods can be used to work with normal partition concepts like date formatting, lists, ranges and hashes.
path(String... paths)
You can concatenate paths together with a / delimiter. This method can
be used to make the expression less verbose than using a native SpEL
functionality to combine path parts together. To create a path
part1/part2, expression 'part1' + '/' + 'part2' is equivalent to
path('part1','part2').
/.
dateFormat(String pattern) dateFormat(String pattern, Long epoch) dateFormat(String pattern, Date date) dateFormat(String pattern, String datestring) dateFormat(String pattern, String datestring, String dateformat)
Creates a path using date formatting. Internally this method delegates to SimpleDateFormat and needs a Date and a pattern.
Method signature with three parameters can be used to create a custom Date object which is then passed to SimpleDateFormat conversion using a dateformat pattern. This is useful in use cases where partition should be based on a date or time string found from a payload content itself. Default dateformat pattern if omitted is yyyy-MM-dd.
list(Object source, List<List<Object>> lists)
Creates a partition path part by matching a source against a lists denoted by lists.
Lets assume that data is being written and it’s possible to extract an appid from the content. We can automatically do a list based partition by using a partition method list(appid,\{\{'1TO3','APP1','APP2','APP3'},\{'4TO6','APP4','APP5','APP6'}}). This method would create three partitions, 1TO3_list, 4TO6_list and list. The latter is used if no match is found from partition lists passed to lists.
range(Object source, List<Object> list)
Creates a partition path part by matching a source against a list denoted by list using a simple binary search.
The partition method takes source as first argument and a list as the second argument. Behind the scenes this is using the JVM’s binarySearch which works on an Object level so we can pass in anything. Remember that meaningful range match only works if passed in Object and types in list are of same type like Integer. Range is defined by a binarySearch itself so mostly it is to match against an upper bound except the last range in a list. Having a list of \{1000,3000,5000} means that everything above 3000 will be matched with 5000. If that is an issue then simply adding Integer.MAX_VALUE as last range would overflow everything above 5000 into a new partition. Created partitions would then be 1000_range, 3000_range and 5000_range.
hash(Object source, int bucketcount)
Creates a partition path part by calculating hashkey using source`s hashCode and bucketcount. Using a partition method hash(timestamp,2) would then create partitions named 0_hash, 1_hash and 2_hash. Number suffixed with hash is simply calculated using _Object.hashCode() % bucketcount.
Creating a custom partition strategy is as easy as just implementing needed interfaces. Custom strategy may be needed in use cases where it is just not feasible to use SpEL expressions. This will then give total flexibility to implement partitioning as needed.
Below sample demonstrates how a simple customer id could be used as a base for partitioning.
public class CustomerPartitionStrategy implements PartitionStrategy<String, String> { CustomerPartitionResolver partitionResolver = new CustomerPartitionResolver(); CustomerPartitionKeyResolver keyResolver = new CustomerPartitionKeyResolver(); @Override public PartitionResolver<String> getPartitionResolver() { return partitionResolver; } @Override public PartitionKeyResolver<String, String> getPartitionKeyResolver() { return keyResolver; } } public class CustomerPartitionResolver implements PartitionResolver<String> { @Override public Path resolvePath(String partitionKey) { return new Path(partitionKey); } } public class CustomerPartitionKeyResolver implements PartitionKeyResolver<String, String> { @Override public String resolvePartitionKey(String entity) { if (entity.startsWith("customer1")) { return "customer1"; } else if (entity.startsWith("customer2")) { return "customer2"; } else if (entity.startsWith("customer3")) { return "customer3"; } return null; } }
We provide a number of writer implementations to be used based on the type of file to write.
HDFS client library which is usually referred as a DFS Client is using a rather complex set of buffers to make writes fast. Using a compression codec adds yet another internal buffer. One big problem with these buffers is that if a jvm suddenly dies bufferred data is naturally lost.
With TextFileWriter and TextSequenceFileWriter it is possible to enable either append or syncable mode which effectively is causing our store libraries to call sync method which will flush buffers from a client side into a currently active datanodes.
| ![[Note]](images/note.png) | Note | 
|---|---|
| Appending or synching data will be considerably slower than a normal write. It is always a trade-off between fast write and data integrity. Using append or sync with a compression is also problematic because it’s up to a codec implementation when it can actually flush its own data to a datanode. | 
Main interface reading from a store is a DataReader.
public interface DataReader<T> { T read() throws IOException; }
DataStoreReader is an extension of DataReader providing close method for a reader.
public interface DataStoreReader<T> extends DataReader<T>, Closeable { }
Some of the HDFS storage and file formats can be read using an input splits instead of reading a whole file at once. This is a fundamental concept in Hadoop’s MapReduce to parallelize data processing. Instead of reading a lot of small files, which would be a source of a Hadoop’s “small file problem”, one large file can be used. However one need to remember that not all file formats support input splitting especially when compression is used.
Support for reading input split is denoted via a Split interface which simply mark starting and ending positions.
public interface Split { long getStart(); long getLength(); long getEnd(); }
Interface Splitter defines an contract how Split’s are calculate from a given path.
public interface Splitter { List<Split> getSplits(Path path) throws IOException; }
We provide few generic Splitter implementations to construct Split’s.
StaticLengthSplitter is used to split input file with a given length.
StaticBlockSplitter is used to split input by used HDFS file block size. It’s also possible to split further down the road within the blocks itself.
SlopBlockSplitter is an extension of StaticBlockSplitter which tries to estimate how much a split can overflow to a next block to taggle unnecessary overhead if last file block is very small compared to an actual split size.
We provide a number of reader implementations to be used based on the type of file to read.
Supported compression codecs are denoted via an interface CodecInfo which simply defines if codec supports splitting, what is it’s fully qualified java class and what is its default file suffix.
public interface CodecInfo { boolean isSplittable(); String getCodecClass(); String getDefaultSuffix(); }
Codecs provides an enum for easy access to supported codecs.
| ![[Note]](images/note.png) | Note | 
|---|---|
| Lzo based compression codecs doesn’t exist in maven dependencies due to licensing restrictions and need for native libraries. Order to use it add codec classes to classpath and its native libs using java.library.path. | 
One common requirement is to persist a large number of POJOs in serialized form using HDFS. The Kite SDK project provides a Kite Data Module that provides an API for working with datasets stored in HDFS. We are using this functionality and provide a some simple helper classes to aid in configuration and use in a Spring environment.
The Kite SDK project provides support for writing data using both the Avro and Parquet data formats. The data format you choose to use influences the data types you can use in your POJO classes. We’ll discuss the basics of the Java type mapping for the two data formats but we recommend that you consult each project’s documentation for additional details.
| ![[Note]](images/note.png) | Note | 
|---|---|
| Currently, you can’t provide your own schema. This is something that we are considering changing in upcomming releases. We are also planning to provide better mapping support in line with the support we currently provide for NoSQL stores like MongoDB. | 
When using Avro as the data format the schema generation is based on reflection of thet POJO class used. Primitive data types and their corresponding wrapper classes are mapped to the corresponding Avro data type. More complex types, as well as the POJO itself, are mapped to a record type consisting of one or more fields.
The table below shows the mapping from some common types:
Table 6.1. Some common Java to Avro data types mapping
| Java type | Avro type | Comment | 
|---|---|---|
| String | string | [multiblock cell omitted] | 
| int / Integer | int | 32-bit signed integer | 
| long / Long | long | 64-bit signed integer | 
| float / Float | float | 32-bit floating point | 
| double / Double | double | 64-bit floating point | 
| boolean / Boolean | boolean | [multiblock cell omitted] | 
| byte[] | bytes | byte array | 
| java.util.Date | record | [multiblock cell omitted] | 
When using Parquet as the data format the schema generation is based on reflection of thet POJO class used. The POJO class must be a proper JavaBean and not have any nested types. We only support primitive data types and their corresponding wrapper classes plus byte arrays. We do rely on the Avro-to-Parquet mapping support that the Kite SDK uses, so the schema will be generated by Avro.
| ![[Note]](images/note.png) | Note | 
|---|---|
| The Parquet support we currently povide is considered experimental. We are planning to relax a lot of the restrictions on the POJO class in upcoming releases. | 
The table below shows the mapping from some common types:
Table 6.2. Some common Java to Parquet data types mapping
| Java type | Parquet type | Comment | 
|---|---|---|
| String | BINARY/UTF8 | [multiblock cell omitted] | 
| int / Integer | INT32 | 32-bit signed integer | 
| long / Long | INT64 | 64-bit signed integer | 
| float / Float | FLOAT | 32-bit floating point | 
| double / Double | DOUBLE | 64-bit floating point | 
| boolean / Boolean | BOOLEAN | [multiblock cell omitted] | 
| byte[] | BINARY/BYTE_ARRAY | byte array | 
In order to use the dataset support you need to configure the following classes:
The following example shows a simple configuration class:
@Configuration @ImportResource("hadoop-context.xml") public class DatasetConfig { private @Autowired org.apache.hadoop.conf.Configuration hadoopConfiguration; @Bean public DatasetRepositoryFactory datasetRepositoryFactory() { DatasetRepositoryFactory datasetRepositoryFactory = new DatasetRepositoryFactory(); datasetRepositoryFactory.setConf(hadoopConfiguration); datasetRepositoryFactory.setBasePath("/user/spring"); return datasetRepositoryFactory; } @Bean public DatasetDefinition fileInfoDatasetDefinition() { DatasetDefinition definition = new DatasetDefinition(); definition.setFormat(Formats.AVRO.getName()); definition.setTargetClass(FileInfo.class); definition.setAllowNullValues(false); return definition; } }
To write datasets to Hadoop you should use either the AvroPojoDatasetStoreWriter or the ParquetDatasetStoreWriter depending on the data format you want to use.
| ![[Tip]](images/tip.png) | Tip | 
|---|---|
| To mark your fields as nullable use the @Nullable annotation (org.apache.avro.reflect.Nullable). This will result in the schema defining your field as a union of null and your datatype. | 
We are using a FileInfo POJO that we have defined to hold some information based on the files we read from our local file system. The dataset will be stored in a directory that is the name of the class using lowercase, so in this case it would be fileinfo. This directory is placed inside the basePath specified in the configuration of the DatasetRepositoryFactory.:
package org.springframework.samples.hadoop.dataset; import org.apache.avro.reflect.Nullable; public class FileInfo { private String name; private @Nullable String path; private long size; private long modified; public FileInfo(String name, String path, long size, long modified) { this.name = name; this.path = path; this.size = size; this.modified = modified; } public FileInfo() { } public String getName() { return name; } public String getPath() { return path; } public long getSize() { return size; } public long getModified() { return modified; } }
To create a writer add the following bean definition to your configuration class:
@Bean public DataStoreWriter<FileInfo> dataStoreWriter() { return new AvroPojoDatasetStoreWriter<FileInfo>(FileInfo.class, datasetRepositoryFactory(), fileInfoDatasetDefinition()); }
Next, have your class use the writer bean:
private DataStoreWriter<FileInfo> writer; @Autowired public void setDataStoreWriter(DataStoreWriter dataStoreWriter) { this.writer = dataStoreWriter; }
Now we can use the writer, it will be opened automatically once we start writing to it:
FileInfo fileInfo = new FileInfo(file.getName(), file.getParent(), (int)file.length(), file.lastModified()); writer.write(fileInfo);
Once we are done writing we should close the writer:
try { writer.close(); } catch (IOException e) { throw new StoreException("Error closing FileInfo", e); }
We should now have dataset containing all the FileInfo entries in a
/user/spring/demo/fileinfo directory:
$ hdfs dfs -ls /user/spring/* Found 2 items drwxr-xr-x - spring supergroup 0 2014-06-09 17:09 /user/spring/fileinfo/.metadata -rw-r--r-- 3 spring supergroup 13824695 2014-06-09 17:10 /user/spring/fileinfo/6876f250-010a-404a-b8c8-0ce1ee759206.avro
The .metadata directory contains dataset information including the
Avro schema:
$ hdfs dfs -cat /user/spring/fileinfo/.metadata/schema.avsc
{
  "type" : "record",
  "name" : "FileInfo",
  "namespace" : "org.springframework.samples.hadoop.dataset",
  "fields" : [ {
    "name" : "name",
    "type" : "string"
  }, {
    "name" : "path",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "size",
    "type" : "long"
  }, {
    "name" : "modified",
    "type" : "long"
  } ]
}
To read datasets to Hadoop we use the DatasetTemplate class.
To create a DatasetTemplate add the following bean definition to your configuration class:
@Bean public DatasetOperations datasetOperations() { DatasetTemplate datasetOperations = new DatasetTemplate(); datasetOperations.setDatasetRepositoryFactory(datasetRepositoryFactory()); return datasetOperations; }
Next, have your class use the DatasetTemplate:
private DatasetOperations datasetOperations; @Autowired public void setDatasetOperations(DatasetOperations datasetOperations) { this.datasetOperations = datasetOperations; }
Now we can read and count the entries using a RecordCallback callback interface that gets called once per retrieved record:
final AtomicLong count = new AtomicLong(); datasetOperations.read(FileInfo.class, new RecordCallback<FileInfo>() { @Override public void doInRecord(FileInfo record) { count.getAndIncrement(); } }); System.out.println("File count: " + count.get());
To create datasets that are partitioned on one or more data fields we use the PartitionStrategy.Builder class that the Kite SDK project provides.
DatasetDefinition definition = new DatasetDefinition(); definition.setPartitionStrategy(new PartitionStrategy.Builder().year("modified").build());
This option lets you specify one or more paths that will be used to partition the files that the data is written to based on the content of the data. You can use any of the FieldPartitioners that are available for the Kite SDK project. We simply use what is specified to create the corresponding partition strategy. The following partitioning functions are available:
year, month, day, hour, minute creates partitions based on the value of a timestamp and creates directories named like "YEAR=2014" (works well with fields of datatype long)
specify function plus field name like:
year("timestamp")
optionally, specify a partition name to replace the default one:
year("timestamp", "YY")
dateformat creates partitions based on a timestamp and a dateformat expression provided - creates directories based on the name provided (works well with fields of datatype long)
specify function plus field name, a name for the partition and the date format like:
dateFormat("timestamp", "Y-M", "yyyyMM")
range creates partitions based on a field value and the upper bounds for each bucket that is specified (works well with fields of datatype int and string)
specify function plus field name and the upper bounds for each partition bucket like:
range("age", 20, 50, 80, Integer.MAX_VALUE)
identity creates partitions based on the exact value of a field (works well with fields of datatype string, long and int)
specify function plus field name, a name for the partition, the type of the field (String or Integer) and the number of values/buckets for the partition like:
identity("region", "R", String.class, 10)
hash creates partitions based on the hash calculated from the value of a field divided into a number of buckets that is specified (works well with all data types)
specify function plus field name and number of buckets like:
hash("lastname", 10)
Multiple expressions can be specified by simply chaining them like:
identity("region", "R", String.class, 10).year("timestamp").month("timestamp")
Spring Hadoop doesn’t have support for configuring store components using xml but have a support using JavaConfig for writer configuration.
JavaConfig is using same concepts found from other parts of a Spring Hadoop where whole configuration logic works around use of an adapter.
@Configuration @EnableDataStoreTextWriter static class Config extends SpringDataStoreTextWriterConfigurerAdapter { @Override public void configure(DataStoreTextWriterConfigurer config) throws Exception { config .basePath("/tmp/foo"); } }
What happened in above example:
@Configuration class extending
SpringDataStoreTextWriterConfigurerAdapter.
configure method having
 DataStoreTextWriterConfigurer
as its argument.
/tmp/foo.
We can also do configuration for other usual properties like,
idleTimeout, closeTimeout, partitioning strategy,
naming strategy and rollover strategy.
@Configuration @EnableDataStoreTextWriter static class Config extends SpringDataStoreTextWriterConfigurerAdapter { @Override public void configure(DataStoreTextWriterConfigurer config) throws Exception { config .basePath("/tmp/store") .idleTimeout(60000) .closeTimeout(120000) .inWritingSuffix(".tmp") .withPartitionStrategy() .map("dateFormat('yyyy/MM/dd/HH/mm', timestamp)") .and() .withNamingStrategy() .name("data") .uuid() .rolling() .name("txt", ".") .and() .withRolloverStrategy() .size("1M"); } }
What happened in above example:
.tmp which will indicate that file
is currently open for writing. Writer will automatically remove this
suffix when file is closed.
yyyy/MM/dd/HH/mm. This will partition data based on timestamp when
write operation happens.
data-38400000-8cf0-11bd-b23e-10b96e4ef00d-1.txt.
1M data is written.
Writer can be auto-wired using DataStoreWriter.
| ![[Important]](images/important.png) | Important | 
|---|---|
| Autowiring by type  | 
static class MyBean { @Autowired DataStoreWriter<String> writer; @Autowired PartitionDataStoreWriter<String, Map<String, Object>> writer; }
In some cases it is more convenient to name the bean instead letting
Spring to create that name automatically. @EnableDataStoreTextWriter
and @EnableDataStorePartitionTextWriter both have a name field
which works in a same way than normal Spring @Bean annotation. You’d
use this custom naming in cases where multiple writers are created and
auto-wiring by type would no longer work.
@Configuration @EnableDataStoreTextWriter(name={"mywriter", "myalias"}) static class Config extends SpringDataStoreTextWriterConfigurerAdapter { }
In above example bean was created with a name mywriter having an
alias named myalias.