The Store sub-project of Spring for Apache Hadoop provides abstractions for writing and reading various types of data residing in HDFS. We currently support different file types either via our own store accessors or by using the Dataset support in Kite SDK.
Currently, the Store sub-project doesn’t have an XML namespace or javaconfig based configuration classes as it's considered to be a foundational library. However, this may change in future releases.
Native store abstractions provide various writer and reader interfaces so that the end user doesn't have to worry about the underlying implementation actually doing the work on files in HDFS. Implementations are usually strongly typed and provide constructors and setters for additional setup to work with naming, compression codecs and everything else defining the behaviour. Interfaces are meant to be used from integration components which don't need to know the internal workings of writers and readers.
The main interface for writing into a store is DataWriter, which has a single write method; it simply writes an entity and the backing implementation handles the rest.
```java
public interface DataWriter<T> {
    void write(T entity) throws IOException;
}
```
The DataStoreWriter
interface adds methods to close and
flush a writer. Some of the writers have a property to close a stream after
an idle time has been reached but generally this interface is meant for
programmatic control of these operations.
```java
public interface DataStoreWriter<T> extends DataWriter<T>, Flushable, Closeable {
}
```
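As a minimal usage sketch, the snippet below drives a writer purely through these interfaces. It uses the TextFileWriter implementation described later in this chapter, whose (Configuration, Path, CodecInfo) constructor also appears in the naming-strategy example below; treat the concrete writer choice as illustrative.

```java
// Minimal sketch: any DataStoreWriter can be driven this way; TextFileWriter is just
// one concrete implementation (described later in this section).
DataStoreWriter<String> writer = new TextFileWriter(new Configuration(), new Path("/tmp/store"), null);
try {
    writer.write("hello");
    writer.flush();
}
finally {
    writer.close();
}
```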
Different file naming strategies are used to automatically determine the name of a file to be used. Writers without additional naming configuration will usually use the given base path as is. As soon as any type of strategy is configured, the given base path is considered to be a base directory and the name of the file is resolved by the file naming strategies.
For example, if the defined base path is “/tmp/path” and a StaticFileNamingStrategy with the parameter “data” is used, then the actual file path resolves to “/tmp/path/data”.
```java
Path path = new Path("/tmp/path");
Configuration config = new Configuration();

TextFileWriter writer = new TextFileWriter(config, path, null);
StaticFileNamingStrategy fileNamingStrategy = new StaticFileNamingStrategy("data");
writer.setFileNamingStrategy(fileNamingStrategy);
```
At first look this may feel a little complicated, but it will make sense after
more file naming strategies are added. These will also provide facilities for using
writers in parallel, or for a re-launched writer to be able to create a new file based on already
existing files in the directory. For example, RollingFileNamingStrategy
will add a simple increasing value to a file name and will try to initialize
itself with the correct position.
Built-in strategies currently supported are StaticFileNamingStrategy, RollingFileNamingStrategy, UuidFileNamingStrategy and CodecFileNamingStrategy. ChainedFileNamingStrategy can be used to chain multiple strategies together where each individual strategy will provide its own part.
A file rolling strategy is used to determine the condition in a writer under which the current stream should be automatically closed and the next file opened. This is usually done together with RollingFileNamingStrategy to roll over when a certain file size limit has been reached. Currently, only one strategy, SizeRolloverStrategy, is supported.
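As a sketch of how naming and rolling strategies can be combined, the snippet below chains a static name with a rolling index and rolls over at roughly 10 MB. The exact constructor shapes (ChainedFileNamingStrategy taking a list of strategies, SizeRolloverStrategy taking a size in bytes) and the setRolloverStrategy setter are assumptions based on the strategies described here, so double-check them against the API.

```java
TextFileWriter writer = new TextFileWriter(config, new Path("/tmp/path"), null);

// resolve file names from two parts: a static "data" part plus a rolling index
writer.setFileNamingStrategy(new ChainedFileNamingStrategy(
        Arrays.asList(new FileNamingStrategy[] {
                new StaticFileNamingStrategy("data"),
                new RollingFileNamingStrategy() })));

// close the current file and move on to the next one once about 10 MB has been written
writer.setRolloverStrategy(new SizeRolloverStrategy(10 * 1024 * 1024));
```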
Partitioning is a concept of choosing a target file on demand, based either on the content to be written or on any other information available to a writer at the time of the write operation. While it would be perfectly alright to use multiple writers manually, the framework already does all the heavy lifting around partitioning. We work through interfaces and provide a generic default implementation, while still allowing you to plug in a customized version if there is a need for it.
PartitionStrategy is a strategy interface defining a PartitionResolver and a PartitionKeyResolver.
```java
public interface PartitionStrategy<T,K> {
    PartitionResolver<K> getPartitionResolver();
    PartitionKeyResolver<T, K> getPartitionKeyResolver();
}
```
PartitionResolver is an interface used to resolve arbitrary partition keys into a path. We don't force any specific partition key type at the interface level itself, but usually the implementation needs to be aware of its type.
```java
public interface PartitionResolver<K> {
    Path resolvePath(K partitionKey);
}
```
PartitionKeyResolver
is an interface which is
responsible for creating a partition key from an entity. This is needed because writer
interfaces allow us to write entities without an explicit partition key.
```java
public interface PartitionKeyResolver<T, K> {
    K resolvePartitionKey(T entity);
}
```
PartitionDataStoreWriter is an extension of DataStoreWriter adding a method to write an entity with a partition key. In this context the partition key is something the partition strategy is able to use.
```java
public interface PartitionDataStoreWriter<T,K> extends DataStoreWriter<T> {
    void write(T entity, K partitionKey) throws IOException;
}
```
DefaultPartitionStrategy is a generic default implementation meant to be used together with an expression written in Spring's SpEL expression language. The PartitionResolver used in DefaultPartitionStrategy expects the partition key to be of type Map<String,Object>, and the partition key created by the PartitionKeyResolver is a DefaultPartitionKey, which itself is a Map<String,Object>.
In order to make it easy to work with SpEL and partitioning, map values can be directly accessed with keys and additional partitioning methods have been registered.
The SpEL expression is evaluated against a partition key passed into an HDFS writer. If the partition key is of type Map, any property given to the SpEL expression is automatically resolved from the map.
In addition to normal SpEL functionality, a few custom methods have been added to make it easier to build partition paths. These custom methods can be used to work with normal partition concepts like date formatting, lists, ranges and hashes.
path(String... paths)
You can concatenate paths together with a / delimiter. This method can be used to make the expression less verbose than using native SpEL functionality to combine path parts. To create a path part1/part2, the expression 'part1' + '/' + 'part2' is equivalent to path('part1','part2').
dateFormat(String pattern)
dateFormat(String pattern, Long epoch)
dateFormat(String pattern, Date date)
dateFormat(String pattern, String datestring)
dateFormat(String pattern, String datestring, String dateformat)
Creates a path using date formatting. Internally this method delegates to SimpleDateFormat and needs a Date and a pattern. The method signature with three parameters can be used to create a custom Date object which is then passed through a SimpleDateFormat conversion using the dateformat pattern. This is useful in use cases where the partition should be based on a date or time string found in the payload content itself. The default dateformat pattern, if omitted, is yyyy-MM-dd.
pattern. Pattern compatible with SimpleDateFormat to produce the final output.
epoch. Timestamp as a Long which is converted into a Date.
date. A Date to be formatted.
datestring. Date as a String.
dateformat. Secondary pattern to convert datestring into a Date.
list(Object source, List<List<Object>> lists)
Creates a partition path part by matching a source against lists denoted by lists.
Let's assume that data is being written and it's possible to extract an appid from the content. We can automatically do a list-based partition by using the partition method list(appid,{{'1TO3','APP1','APP2','APP3'},{'4TO6','APP4','APP5','APP6'}}). This method would create three partitions, 1TO3_list, 4TO6_list and list. The latter is used if no match is found in the partition lists passed to lists.
range(Object source, List<Object> list)
Creates a partition path part by matching a source against a list denoted by list using a simple binary search.
The partition method takes source as the first argument and a list as the second argument. Behind the scenes this uses the JVM's binarySearch, which works on an Object level, so we can pass in anything. Remember that a meaningful range match only works if the passed-in Object and the types in the list are of the same type, like Integer. The range is defined by the binarySearch itself, so mostly it matches against an upper bound, except for the last range in a list.
Having a list of {1000,3000,5000} means that everything above 3000 will be matched with 5000. If that is an issue, then simply adding Integer.MAX_VALUE as the last range would overflow everything above 5000 into a new partition. The created partitions would then be 1000_range, 3000_range and 5000_range.
hash(Object source, int bucketcount)
Creates a partition path part by calculating a hashkey using the source's hashCode and bucketcount. Using the partition method hash(timestamp,2) would then create partitions named 0_hash, 1_hash and 2_hash. The number suffixed with _hash is simply calculated using Object.hashCode() % bucketcount.
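To see how these methods come together, the sketch below builds a DefaultPartitionStrategy from a SpEL expression that uses dateFormat and hash, and resolves a partition path from a map-based partition key. The DefaultPartitionStrategy(String expression) constructor is an assumption; the resolver interfaces are the ones defined above.

```java
// year/month from a 'timestamp' key plus a two-bucket hash of a 'region' key
String expression = "dateFormat('yyyy/MM', timestamp) + '/' + hash(region, 2)";
DefaultPartitionStrategy<String> strategy = new DefaultPartitionStrategy<String>(expression);

Map<String, Object> partitionKey = new HashMap<String, Object>();
partitionKey.put("timestamp", System.currentTimeMillis());
partitionKey.put("region", "EMEA");

// the partition path a partition-aware writer would typically append to its base path
Path partitionPath = strategy.getPartitionResolver().resolvePath(partitionKey);
```

In practice you would normally hand such a strategy to a PartitionTextFileWriter (listed with the other writer implementations below) and let write(entity, partitionKey) resolve the target file for you.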
Creating a custom partition strategy is as easy as implementing the needed interfaces. A custom strategy may be needed in use cases where it is simply not feasible to use SpEL expressions. This then gives total flexibility to implement partitioning as needed.
The sample below demonstrates how a simple customer id could be used as a base for partitioning.
```java
public class CustomerPartitionStrategy implements PartitionStrategy<String, String> {

    CustomerPartitionResolver partitionResolver = new CustomerPartitionResolver();
    CustomerPartitionKeyResolver keyResolver = new CustomerPartitionKeyResolver();

    @Override
    public PartitionResolver<String> getPartitionResolver() {
        return partitionResolver;
    }

    @Override
    public PartitionKeyResolver<String, String> getPartitionKeyResolver() {
        return keyResolver;
    }
}

public class CustomerPartitionResolver implements PartitionResolver<String> {

    @Override
    public Path resolvePath(String partitionKey) {
        return new Path(partitionKey);
    }
}

public class CustomerPartitionKeyResolver implements PartitionKeyResolver<String, String> {

    @Override
    public String resolvePartitionKey(String entity) {
        if (entity.startsWith("customer1")) {
            return "customer1";
        } else if (entity.startsWith("customer2")) {
            return "customer2";
        } else if (entity.startsWith("customer3")) {
            return "customer3";
        }
        return null;
    }
}
```
We provide a number of writer implementations to be used based on the type of file to write.
TextFileWriter - an implementation meant to write simple text data where entities are separated by a delimiter. A simple example of this is a text file with line terminations.
DelimitedTextFileWriter - an extension on top of TextFileWriter where the written entity itself is also delimited. A simple example of this is a CSV file.
TextSequenceFileWriter - a similar implementation to TextFileWriter except that the backing file is a Hadoop SequenceFile.
PartitionTextFileWriter - wraps multiple TextFileWriters, providing automatic partitioning functionality.
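As a short sketch, a sequence-file backed writer could be used much like the TextFileWriter shown earlier; the identical (Configuration, Path, CodecInfo) constructor is an assumption based on the similarity described above.

```java
// assumed to mirror TextFileWriter: text entities, SequenceFile-backed storage
TextSequenceFileWriter writer = new TextSequenceFileWriter(config, new Path("/tmp/seq"), null);
writer.write("hello");
writer.close();
```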
The main interface for reading from a store is DataReader.
```java
public interface DataReader<T> {
    T read() throws IOException;
}
```
DataStoreReader is an extension of DataReader providing a close method for a reader.
```java
public interface DataStoreReader<T> extends DataReader<T>, Closeable {
}
```
Some of the HDFS storage and file formats can be read using input splits instead of reading a whole file at once. This is a fundamental concept in Hadoop's MapReduce used to parallelize data processing. Instead of reading a lot of small files, which would be a source of Hadoop's “small file problem”, one large file can be used. However, one needs to remember that not all file formats support input splitting, especially when compression is used.
Support for reading input splits is denoted via a Split interface which simply marks the starting and ending positions.
```java
public interface Split {
    long getStart();
    long getLength();
    long getEnd();
}
```
The Splitter interface defines a contract for how Splits are calculated from a given path.
```java
public interface Splitter {
    List<Split> getSplits(Path path) throws IOException;
}
```
We provide a few generic Splitter implementations to construct Splits.
StaticLengthSplitter is used to split the input file using a given length.
StaticBlockSplitter is used to split the input by the HDFS file block size. It is also possible to split further within the blocks themselves.
SlopBlockSplitter is an extension of StaticBlockSplitter which tries to estimate how much a split can overflow into the next block, to tackle unnecessary overhead when the last file block is very small compared to the actual split size.
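A splitter sketch might look like the following; the StaticLengthSplitter constructor shown here (a Configuration plus a split length in bytes) is an assumption, while getSplits and the Split accessors are the interfaces defined above.

```java
// split the file into roughly 10 MB regions that readers can process independently
Splitter splitter = new StaticLengthSplitter(config, 10 * 1024 * 1024);
for (Split split : splitter.getSplits(new Path("/tmp/store/data"))) {
    System.out.println(split.getStart() + " - " + split.getEnd());
}
```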
We provide a number of reader implementations to be used based on the type of file to read.
TextFileReader - used to read data written by a TextFileWriter.
DelimitedTextFileReader - used to read data written by a DelimitedTextFileWriter.
TextSequenceFileReader - used to read data written by a TextSequenceFileWriter.
Supported compression codecs are denoted via a CodecInfo interface which simply defines whether a codec supports splitting, what its fully qualified Java class is and what its default file suffix is.
```java
public interface CodecInfo {
    boolean isSplittable();
    String getCodecClass();
    String getDefaultSuffix();
}
```
Codecs
provides an enum for easy access to
supported codecs.
GZIP - org.apache.hadoop.io.compress.GzipCodec
SNAPPY - org.apache.hadoop.io.compress.SnappyCodec
BZIP2 - org.apache.hadoop.io.compress.BZip2Codec
LZO - com.hadoop.compression.lzo.LzoCodec (non-splittable)
SLZO - com.hadoop.compression.lzo.LzoCodec (splittable)
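The enum values can be used to hand a CodecInfo to a writer or reader; the getCodecInfo() accessor used below is an assumption.

```java
// write gzip-compressed text files; CodecFileNamingStrategy could add the codec suffix
TextFileWriter writer = new TextFileWriter(config, new Path("/tmp/path"), Codecs.GZIP.getCodecInfo());
```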
One common requirement is to persist a large number of POJOs in serialized form using HDFS. The Kite SDK project provides the Kite Data Module, which offers an API for working with datasets stored in HDFS. We use this functionality and provide some simple helper classes to aid in configuration and use in a Spring environment.
The Kite SDK project provides support for writing data using both the Avro and Parquet data formats. The data format you choose to use influences the data types you can use in your POJO classes. We'll discuss the basics of the Java type mapping for the two data formats but we recommend that you consult each project's documentation for additional details.
Note: Currently, you can't provide your own schema. This is something that we are considering changing in upcoming releases. We are also planning to provide better mapping support in line with the support we currently provide for NoSQL stores like MongoDB.
When using Avro as the data format, the schema generation is based on reflection of the POJO class used. Primitive data types and their corresponding wrapper classes are mapped to the corresponding Avro data type. More complex types, as well as the POJO itself, are mapped to a record type consisting of one or more fields.
The table below shows the mapping from some common types:
Table 5.1. Some common Java to Avro data types mapping
Java type | Avro type | Comment |
---|---|---|
String | string | |
int / Integer | int | 32-bit signed integer |
long / Long | long | 64-bit signed integer |
float / Float | float | 32-bit floating point |
double / Double | double | 64-bit floating point |
boolean / Boolean | boolean | |
byte[] | bytes | byte array |
java.util.Date | record | |
When using Parquet as the data format, the schema generation is based on reflection of the POJO class used. The POJO class must be a proper JavaBean and must not have any nested types. We only support primitive data types and their corresponding wrapper classes plus byte arrays. We do rely on the Avro-to-Parquet mapping support that the Kite SDK uses, so the schema will be generated by Avro.
Note: The Parquet support we currently provide is considered experimental. We are planning to relax a lot of the restrictions on the POJO class in upcoming releases.
The table below shows the mapping from some common types:
Table 5.2. Some common Java to Parquet data types mapping
Java type | Parquet type | Comment |
---|---|---|
String | BINARY/UTF8 | |
int / Integer | INT32 | 32-bit signed integer |
long / Long | INT64 | 64-bit signed integer |
float / Float | FLOAT | 32-bit floating point |
double / Double | DOUBLE | 64-bit floating point |
boolean / Boolean | BOOLEAN | |
byte[] | BINARY/BYTE_ARRAY | byte array |
In order to use the dataset support you need to configure the following classes:
DatasetRepositoryFactory, which needs an org.apache.hadoop.conf.Configuration, so we know how to connect to HDFS, and a base path where the data will be written.
DatasetDefinition, which defines the dataset you are writing. Configuration options include the POJO class being stored and the format to use (Avro or Parquet). You can also specify whether to allow null values for all fields (the default is false) and an optional partition strategy to use for the dataset (see below for partitioning).
The following example shows a simple configuration class:
```java
@Configuration
@ImportResource("hadoop-context.xml")
public class DatasetConfig {

    private @Autowired org.apache.hadoop.conf.Configuration hadoopConfiguration;

    @Bean
    public DatasetRepositoryFactory datasetRepositoryFactory() {
        DatasetRepositoryFactory datasetRepositoryFactory = new DatasetRepositoryFactory();
        datasetRepositoryFactory.setConf(hadoopConfiguration);
        datasetRepositoryFactory.setBasePath("/user/spring");
        return datasetRepositoryFactory;
    }

    @Bean
    public DatasetDefinition fileInfoDatasetDefinition() {
        DatasetDefinition definition = new DatasetDefinition();
        definition.setFormat(Formats.AVRO.getName());
        definition.setTargetClass(FileInfo.class);
        definition.setAllowNullValues(false);
        return definition;
    }
}
```
To write datasets to Hadoop you should use either the AvroPojoDatasetStoreWriter
or
the ParquetDatasetStoreWriter
depending on the data format you want to use.
Tip: To mark your fields as nullable use the @Nullable annotation (org.apache.avro.reflect.Nullable), as shown in the FileInfo class below.
We are using a FileInfo POJO that we have defined to hold some information based on the files we read from our local file system. The dataset will be stored in a directory that is the name of the class in lowercase, so in this case it would be fileinfo. This directory is placed inside the basePath specified in the configuration of the DatasetRepositoryFactory.
```java
package org.springframework.samples.hadoop.dataset;

import org.apache.avro.reflect.Nullable;

public class FileInfo {

    private String name;
    private @Nullable String path;
    private long size;
    private long modified;

    public FileInfo(String name, String path, long size, long modified) {
        this.name = name;
        this.path = path;
        this.size = size;
        this.modified = modified;
    }

    public FileInfo() {
    }

    public String getName() {
        return name;
    }

    public String getPath() {
        return path;
    }

    public long getSize() {
        return size;
    }

    public long getModified() {
        return modified;
    }
}
```
To create a writer add the following bean definition to your configuration class:
```java
@Bean
public DataStoreWriter<FileInfo> dataStoreWriter() {
    return new AvroPojoDatasetStoreWriter<FileInfo>(FileInfo.class,
            datasetRepositoryFactory(), fileInfoDatasetDefinition());
}
```
Next, have your class use the writer bean:
```java
private DataStoreWriter<FileInfo> writer;

@Autowired
public void setDataStoreWriter(DataStoreWriter dataStoreWriter) {
    this.writer = dataStoreWriter;
}
```
Now we can use the writer; it will be opened automatically once we start writing to it:
```java
FileInfo fileInfo = new FileInfo(file.getName(), file.getParent(), file.length(), file.lastModified());
writer.write(fileInfo);
```
Once we are done writing we should close the writer:
```java
try {
    writer.close();
} catch (IOException e) {
    throw new StoreException("Error closing FileInfo", e);
}
```
We should now have a dataset containing all the FileInfo entries in a /user/spring/fileinfo directory:
```
$ hdfs dfs -ls /user/spring/*
Found 2 items
drwxr-xr-x   - spring supergroup          0 2014-06-09 17:09 /user/spring/fileinfo/.metadata
-rw-r--r--   3 spring supergroup   13824695 2014-06-09 17:10 /user/spring/fileinfo/6876f250-010a-404a-b8c8-0ce1ee759206.avro
```
The .metadata
directory contains dataset information including the Avro schema:
```
$ hdfs dfs -cat /user/spring/fileinfo/.metadata/schema.avsc
{
  "type" : "record",
  "name" : "FileInfo",
  "namespace" : "org.springframework.samples.hadoop.dataset",
  "fields" : [ {
    "name" : "name",
    "type" : "string"
  }, {
    "name" : "path",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "size",
    "type" : "long"
  }, {
    "name" : "modified",
    "type" : "long"
  } ]
}
```
To read datasets from Hadoop we use the DatasetTemplate class.
To create a DatasetTemplate
add the following bean definition to your configuration class:
```java
@Bean
public DatasetOperations datasetOperations() {
    DatasetTemplate datasetOperations = new DatasetTemplate();
    datasetOperations.setDatasetRepositoryFactory(datasetRepositoryFactory());
    return datasetOperations;
}
```
Next, have your class use the DatasetTemplate:
```java
private DatasetOperations datasetOperations;

@Autowired
public void setDatasetOperations(DatasetOperations datasetOperations) {
    this.datasetOperations = datasetOperations;
}
```
Now we can read and count the entries using a RecordCallback
callback interface that gets called once per
retrieved record:
```java
final AtomicLong count = new AtomicLong();
datasetOperations.read(FileInfo.class, new RecordCallback<FileInfo>() {
    @Override
    public void doInRecord(FileInfo record) {
        count.getAndIncrement();
    }
});
System.out.println("File count: " + count.get());
```
To create datasets that are partitioned on one or more data fields we use the PartitionStrategy.Builder
class that the Kite SDK project provides.
```java
DatasetDefinition definition = new DatasetDefinition();
definition.setPartitionStrategy(new PartitionStrategy.Builder().year("modified").build());
```
This option lets you specify one or more paths that will be used to partition the files that the data is written to, based on the content of the data. You can use any of the FieldPartitioners that are available in the Kite SDK project. We simply use what is specified to create the corresponding partition strategy. The following partitioning functions are available:
year, month, day, hour, minute create partitions based on the value of a timestamp and create directories named like "YEAR=2014" (works well with fields of datatype long)
specify function plus field name like:
year("timestamp")
optionally, specify a partition name to replace the default one:
year("timestamp", "YY")
dateformat creates partitions based on a timestamp and the dateformat expression provided, creating directories based on the name provided (works well with fields of datatype long)
specify function plus field name, a name for the partition and the date format like:
dateFormat("timestamp", "Y-M", "yyyyMM")
range creates partitions based on a field value and the upper bounds for each bucket that is specified (works well with fields of datatype int and string)
specify function plus field name and the upper bounds for each partition bucket like:
range("age", 20, 50, 80, Integer.MAX_VALUE)
identity creates partitions based on the exact value of a field (works well with fields of datatype string, long and int)
specify function plus field name, a name for the partition, the type of the field (String or Integer) and the number of values/buckets for the partition like:
identity("region", "R", String.class, 10)
hash creates partitions based on a hash calculated from the value of a field, divided into the specified number of buckets (works well with all data types)
specify function plus field name and number of buckets like:
hash("lastname", 10)
Multiple expressions can be specified by simply chaining them like:
identity("region", "R", String.class, 10).year("timestamp").month("timestamp")