5. Writing and reading data using the Hadoop File System

The Store sub-project of Spring for Apache Hadoop provides abstractions for writing and reading various types of data residing in HDFS. We currently support different file types either via our own store accessors or by using the Dataset support in Kite SDK.

Currently, the Store sub-project has neither an XML namespace nor JavaConfig-based configuration classes, because it is considered a foundational library. However, this may change in future releases.

5.1 Store Abstraction

Native store abstractions provide various writer and reader interfaces so that the end user doesn't have to worry about the underlying implementation that actually does the work on files in HDFS. Implementations are usually strongly typed and provide constructors and setters for additional setup such as naming, compression codecs and everything else that defines their behaviour. The interfaces are meant to be used from integration components which don’t need to know the internal workings of writers and readers.

5.1.1 Writing Data

The main interface for writing into a store is DataWriter, which has a single method, write. It simply writes an entity, and the backing implementation handles the rest.

public interface DataWriter<T> {
  void write(T entity) throws IOException;
}

The DataStoreWriter interface adds methods to close and flush a writer. Some of the writers have a property to close a stream after an idle time has been reached but generally this interface is meant for programmatic control of these operations.

public interface DataStoreWriter<T> extends DataWriter<T>, Flushable, Closeable {
}
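As a rough illustration of how these two interfaces are meant to be used together, the following sketch writes a few entities and relies on try-with-resources to close the writer. The interfaces are re-declared locally only so that the sketch compiles without Spring for Apache Hadoop on the classpath; InMemoryWriter and WriterSketch are hypothetical names and not part of the framework.

```java
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Local copies of the interfaces shown above (assumption: same shape as the framework's).
interface DataWriter<T> {
  void write(T entity) throws IOException;
}

interface DataStoreWriter<T> extends DataWriter<T>, Flushable, Closeable {
}

// Hypothetical writer that buffers entities and "stores" them in memory on flush.
final class InMemoryWriter implements DataStoreWriter<String> {
  private final List<String> buffer = new ArrayList<>();
  private final List<String> stored = new ArrayList<>();
  private boolean closed;

  @Override
  public void write(String entity) throws IOException {
    if (closed) {
      throw new IOException("writer is closed");
    }
    buffer.add(entity);
  }

  @Override
  public void flush() {
    stored.addAll(buffer);
    buffer.clear();
  }

  @Override
  public void close() {
    flush();
    closed = true;
  }

  List<String> stored() {
    return stored;
  }
}

final class WriterSketch {
  // Writes all lines and returns what ended up in the in-memory store.
  static List<String> writeAll(List<String> lines) {
    InMemoryWriter writer = new InMemoryWriter();
    // try-with-resources closes the writer, which in this sketch also flushes it.
    try (DataStoreWriter<String> w = writer) {
      for (String line : lines) {
        w.write(line);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return writer.stored();
  }

  public static void main(String[] args) {
    System.out.println(writeAll(Arrays.asList("first", "second")));
  }
}
```

Code that only needs to write entities can depend on DataWriter alone, while the component that owns the writer's lifecycle uses DataStoreWriter to flush and close it.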

File Naming

Different file naming strategies are used to automatically determine the name of the file to be used. Writers without additional naming configuration will usually use the given base path as is. As soon as any type of strategy is configured, the given base path is considered to be a base directory, and the name of the file is resolved by the file naming strategies.

For example, if the defined base path is “/tmp/path” and the StaticFileNamingStrategy with a “data” parameter is used, then the actual file path resolved would be “/tmp/path/data”.

Path path = new Path("/tmp/path");
Configuration config = new Configuration();
// the third argument is the codec; null means no compression
TextFileWriter writer = new TextFileWriter(config, path, null);
StaticFileNamingStrategy fileNamingStrategy = new StaticFileNamingStrategy("data");
writer.setFileNamingStrategy(fileNamingStrategy);

At first glance this may feel a little complicated, but it makes sense once more file naming strategies are added. These also provide facilities for using writers in parallel, or for a re-launched writer to create a new file based on already existing files in the directory. For example, RollingFileNamingStrategy adds a simple increasing value to a file name and tries to initialize itself with the correct position.

Built-in strategies currently supported are StaticFileNamingStrategy, RollingFileNamingStrategy, UuidFileNamingStrategy and CodecFileNamingStrategy. ChainedFileNamingStrategy can be used to chain multiple strategies together where each individual strategy will provide its own part.

File Rollover

A file rolling strategy is used to determine the condition under which a writer should automatically close the current stream and open the next file. This is usually done together with RollingFileNamingStrategy to roll over when a certain file size limit has been reached.

Currently, only one strategy, SizeRolloverStrategy, is supported.
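To make the interplay between size-based rollover and a rolling file name more concrete, here is a small model in plain Java. It is a sketch under stated assumptions, not the framework's implementation: the "-N" suffix format, the check happening before each write, and the one-byte line delimiter are all assumptions, and RolloverSketch is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class RolloverSketch {

  // Returns, for each entity, the name of the file it would be written to.
  static List<String> assignFiles(List<String> entities, String baseName, long maxBytes) {
    List<String> names = new ArrayList<>();
    int counter = 0;   // rolling part of the file name (assumed "-N" suffix)
    long written = 0;  // bytes written to the current file so far
    for (String entity : entities) {
      if (written >= maxBytes) {
        // Size limit reached: "close" the current file and move on to the next one.
        counter++;
        written = 0;
      }
      names.add(baseName + "-" + counter);
      // Assumed layout: UTF-8 encoded entity followed by a one-byte delimiter.
      written += entity.getBytes(StandardCharsets.UTF_8).length + 1;
    }
    return names;
  }

  public static void main(String[] args) {
    // prints [data-0, data-0, data-1]
    System.out.println(assignFiles(Arrays.asList("aaaa", "bbbb", "cccc"), "data", 8));
  }
}
```

In this model the third entity lands in a new file because the first two already pushed the current file past the 8-byte limit.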

Partitioning

Partitioning is the concept of choosing a target file on demand, based either on the content to be written or on any other information available to a writer at the time of the write operation. While it would be perfectly alright to use multiple writers manually, the framework already does all the heavy lifting around partitioning. We work through interfaces and provide a generic default implementation, while still allowing a customized version to be plugged in if there’s a need for it.

PartitionStrategy is a strategy interface defining PartitionResolver and PartitionKeyResolver.

public interface PartitionStrategy<T,K> {
  PartitionResolver<K> getPartitionResolver();
  PartitionKeyResolver<T, K> getPartitionKeyResolver();
}

PartitionResolver is an interface used to resolve arbitrary partition keys into a path. We don’t force any specific partition key type at the interface level itself, but usually the implementation needs to be aware of its type.

public interface PartitionResolver<K> {
  Path resolvePath(K partitionKey);
}

PartitionKeyResolver is an interface which is responsible for creating a partition key from an entity. This is needed because writer interfaces allow us to write entities without an explicit partition key.

public interface PartitionKeyResolver<T, K> {
  K resolvePartitionKey(T entity);
}

PartitionDataStoreWriter is an extension of DataStoreWriter adding a method to write an entity with a partition key. In this context the partition key is something that the partition strategy is able to use.

public interface PartitionDataStoreWriter<T,K> extends DataStoreWriter<T> {
  void write(T entity, K partitionKey) throws IOException;
}

DefaultPartitionStrategy

DefaultPartitionStrategy is a generic default implementation meant to be used together with an expression written in Spring's SpEL expression language. The PartitionResolver used in DefaultPartitionStrategy expects the partition key to be of type Map<String,Object>, and the partition key created by the PartitionKeyResolver is a DefaultPartitionKey, which itself is a Map<String,Object>.

In order to make it easy to work with SpEL and partitioning, map values can be directly accessed with keys, and additional partitioning methods have been registered.

Partition Path Expression

The SpEL expression is evaluated against the partition key passed into an HDFS writer.

Accessing Properties

If the partition key is of type Map, any property given to a SpEL expression is automatically resolved from the map.

Custom Methods

In addition to normal SpEL functionality, a few custom methods have been added to make it easier to build partition paths. These custom methods can be used to work with normal partition concepts like date formatting, lists, ranges and hashes.

path
path(String... paths)

You can concatenate paths together with a / delimiter. This method can be used to make the expression less verbose than using native SpEL functionality to combine path parts together. To create a path part1/part2, the expression 'part1' + '/' + 'part2' is equivalent to path('part1','part2').

Parameters

paths.  Any number of path parts

Return Value

Concatenated value of paths delimited with /.
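The behaviour described for path can be sketched in plain Java as a simple join. This is only an illustration of the documented result; PathSketch is a hypothetical name, and the framework's own implementation may differ.

```java
final class PathSketch {

  // Joins any number of path parts with a "/" delimiter.
  static String path(String... paths) {
    return String.join("/", paths);
  }

  public static void main(String[] args) {
    // prints part1/part2
    System.out.println(path("part1", "part2"));
  }
}
```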

dateFormat
dateFormat(String pattern)
dateFormat(String pattern, Long epoch)
dateFormat(String pattern, Date date)
dateFormat(String pattern, String datestring)
dateFormat(String pattern, String datestring, String dateformat)

Creates a path using date formatting. Internally this method delegates to SimpleDateFormat and needs a Date and a pattern.

The method signature with three parameters can be used to create a custom Date object, which is then passed to the SimpleDateFormat conversion, using a dateformat pattern. This is useful in use cases where the partition should be based on a date or time string found in the payload content itself. The default dateformat pattern, if omitted, is yyyy-MM-dd.

Parameters

pattern.  Pattern compatible with SimpleDateFormat to produce a final output.

epoch.  Timestamp as Long which is converted into a Date.

date.  A Date to be formatted.

dateformat.  Secondary pattern to convert datestring into a Date.

datestring.  Date as a String

Return Value

A path part representation which can be a simple file or directory name or a directory structure.
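The variants of dateFormat can be approximated with SimpleDateFormat as in the sketch below. Several details are assumptions made for the sake of a deterministic example: the epoch is treated as milliseconds, the time zone is fixed to UTC, and Locale.ROOT is used. DateFormatSketch is a hypothetical name, and the no-date variant (which would use the current time) is left out.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

final class DateFormatSketch {

  // Assumption: UTC is used so that the output does not depend on the host time zone.
  private static final TimeZone UTC = TimeZone.getTimeZone("UTC");

  private static SimpleDateFormat formatter(String pattern) {
    SimpleDateFormat format = new SimpleDateFormat(pattern, Locale.ROOT);
    format.setTimeZone(UTC);
    return format;
  }

  // Formats a Date using the output pattern.
  static String dateFormat(String pattern, Date date) {
    return formatter(pattern).format(date);
  }

  // Assumption: the epoch value is in milliseconds, as expected by java.util.Date.
  static String dateFormat(String pattern, long epoch) {
    return dateFormat(pattern, new Date(epoch));
  }

  // Parses datestring with the secondary pattern, then formats it with the output pattern.
  static String dateFormat(String pattern, String datestring, String dateformat) {
    try {
      return dateFormat(pattern, formatter(dateformat).parse(datestring));
    } catch (ParseException e) {
      throw new IllegalArgumentException("Cannot parse '" + datestring + "' with " + dateformat, e);
    }
  }

  // Uses the default secondary pattern yyyy-MM-dd.
  static String dateFormat(String pattern, String datestring) {
    return dateFormat(pattern, datestring, "yyyy-MM-dd");
  }

  public static void main(String[] args) {
    System.out.println(dateFormat("yyyy/MM/dd", 0L));           // prints 1970/01/01
    System.out.println(dateFormat("yyyy/MM", "2014-06-09"));    // prints 2014/06
  }
}
```

Because the output pattern may itself contain / characters, a single call can produce a nested directory structure such as 2014/06/09.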

list
list(Object source, List<List<Object>> lists)

Creates a partition path part by matching a source against the lists denoted by lists.

Let's assume that data is being written and it’s possible to extract an appid from the content. We can automatically do a list-based partition by using the partition method list(appid,{{'1TO3','APP1','APP2','APP3'},{'4TO6','APP4','APP5','APP6'}}). This method would create three partitions: 1TO3_list, 4TO6_list and list. The last one is used if no match is found in the partition lists passed to lists.

Parameters

source.  An Object to be matched against lists.

lists.  A definition of list of lists.

Return Value

A path part prefixed with a matched key i.e. XXX_list or list if no match.
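The matching rule can be modelled in a few lines of plain Java. This sketch assumes, based on the appid example, that the first element of each inner list is the partition key and the remaining elements are the values to match; ListPartitionSketch is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;

final class ListPartitionSketch {

  // Returns "<key>_list" for the first inner list containing source, or "list" if none matches.
  static String list(Object source, List<List<Object>> lists) {
    for (List<Object> candidate : lists) {
      // Element 0 is the partition key, elements 1..n are the values to match against.
      if (candidate.size() > 1 && candidate.subList(1, candidate.size()).contains(source)) {
        return candidate.get(0) + "_list";
      }
    }
    return "list";
  }

  public static void main(String[] args) {
    List<List<Object>> lists = Arrays.asList(
        Arrays.<Object>asList("1TO3", "APP1", "APP2", "APP3"),
        Arrays.<Object>asList("4TO6", "APP4", "APP5", "APP6"));
    System.out.println(list("APP2", lists)); // prints 1TO3_list
    System.out.println(list("APP9", lists)); // prints list
  }
}
```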

range
range(Object source, List<Object> list)

Creates a partition path part by matching a source against the list denoted by list using a simple binary search.

The partition method takes source as the first argument and a list as the second argument. Behind the scenes this uses Java's binarySearch, which works at the Object level, so anything can be passed in. Remember that a meaningful range match only works if the passed-in Object and the entries in the list are of the same type, such as Integer. The range is defined by the binarySearch itself, so a value is mostly matched against an upper bound, except for the last range in the list. Having a list of {1000,3000,5000} means that everything above 3000 will be matched with 5000, and the created partitions would be 1000_range, 3000_range and 5000_range. If that is an issue, simply adding Integer.MAX_VALUE as the last range would overflow everything above 5000 into a new partition.

Parameters

source.  An Object to be matched against list.

list.  A definition of list.

Return Value

A path part prefixed with a matched key i.e. XXX_range.
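The upper-bound matching can be illustrated with Collections.binarySearch from the JDK. This is a sketch of the behaviour described above, not the framework's code: values beyond the last entry are clamped to the last range, and the list is assumed to be sorted in ascending order. RangePartitionSketch is a hypothetical name.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class RangePartitionSketch {

  // Returns "<bound>_range", where bound is the smallest list entry >= source,
  // or the last entry if source is larger than every entry.
  static <T extends Comparable<? super T>> String range(T source, List<T> list) {
    if (list.isEmpty()) {
      throw new IllegalArgumentException("list must not be empty");
    }
    int pos = Collections.binarySearch(list, source);
    if (pos < 0) {
      // Not found: binarySearch returns -(insertion point) - 1.
      pos = -pos - 1;
    }
    if (pos >= list.size()) {
      // Above the last bound: fall into the last range.
      pos = list.size() - 1;
    }
    return list.get(pos) + "_range";
  }

  public static void main(String[] args) {
    List<Integer> bounds = Arrays.asList(1000, 3000, 5000);
    System.out.println(range(500, bounds));  // prints 1000_range
    System.out.println(range(2000, bounds)); // prints 3000_range
    System.out.println(range(9000, bounds)); // prints 5000_range
  }
}
```

With Integer.MAX_VALUE appended to the list, the call range(9000, ...) would instead resolve to 2147483647_range.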

hash
hash(Object source, int bucketcount)

Creates a partition path part by calculating a hash key from the source's hashCode and bucketcount. Using the partition method hash(timestamp,2) would then create partitions named 0_hash and 1_hash. The number prefixed to _hash is simply calculated using Object.hashCode() % bucketcount.

Parameters

source.  An Object whose hashCode will be used.

bucketcount.  A number of buckets

Return Value

A path part prefixed with a hash key i.e. XXX_hash.
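The bucket calculation can be sketched as follows. One deliberate deviation is an assumption of this sketch: it uses Math.floorMod rather than the plain % operator, so that negative hash codes still map into the range 0 to bucketcount - 1. HashPartitionSketch is a hypothetical name.

```java
final class HashPartitionSketch {

  // Maps the source's hashCode onto one of bucketcount buckets.
  static String hash(Object source, int bucketcount) {
    // floorMod keeps the result non-negative even for negative hash codes
    // (assumption of this sketch; plain % could yield a negative bucket in Java).
    return Math.floorMod(source.hashCode(), bucketcount) + "_hash";
  }

  public static void main(String[] args) {
    System.out.println(hash(7, 2));  // prints 1_hash
    System.out.println(hash(10, 2)); // prints 0_hash
  }
}
```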

Creating a Custom Partition Strategy

Creating a custom partition strategy is as easy as implementing the needed interfaces. A custom strategy may be needed in use cases where it is not feasible to use SpEL expressions, and it gives total flexibility to implement partitioning as needed.

The sample below demonstrates how a simple customer id could be used as a basis for partitioning.

public class CustomerPartitionStrategy implements PartitionStrategy<String, String> {

  CustomerPartitionResolver partitionResolver = new CustomerPartitionResolver();
  CustomerPartitionKeyResolver keyResolver = new CustomerPartitionKeyResolver();

  @Override
  public PartitionResolver<String> getPartitionResolver() {
    return partitionResolver;
  }

  @Override
  public PartitionKeyResolver<String, String> getPartitionKeyResolver() {
    return keyResolver;
  }
}

public class CustomerPartitionResolver implements PartitionResolver<String> {

  @Override
  public Path resolvePath(String partitionKey) {
    return new Path(partitionKey);
  }
}

public class CustomerPartitionKeyResolver implements PartitionKeyResolver<String, String> {

  @Override
  public String resolvePartitionKey(String entity) {
    if (entity.startsWith("customer1")) {
      return "customer1";
    } else if (entity.startsWith("customer2")) {
      return "customer2";
    } else if (entity.startsWith("customer3")) {
      return "customer3";
    }
    return null;
  }
}

Writer Implementations

We provide a number of writer implementations to be used based on the type of file to write.

  • TextFileWriter is an implementation meant to write simple text data where entities are separated by a delimiter. A simple example of this is a text file with line terminations.

  • DelimitedTextFileWriter is an extension on top of TextFileWriter where the written entity itself is also delimited. A simple example of this is a CSV file.

  • TextSequenceFileWriter is an implementation similar to TextFileWriter, except that the backing file is a Hadoop SequenceFile.

  • PartitionTextFileWriter wraps multiple TextFileWriters, providing automatic partitioning functionality.

5.1.2 Reading Data

The main interface for reading from a store is DataReader.

public interface DataReader<T> {
  T read() throws IOException;
}

DataStoreReader is an extension of DataReader providing a close method for a reader.

public interface DataStoreReader<T> extends DataReader<T>, Closeable {
}

Input Splits

Some HDFS storage and file formats can be read using input splits instead of reading a whole file at once. This is a fundamental concept in Hadoop’s MapReduce for parallelizing data processing. Instead of reading a lot of small files, which would be a source of Hadoop’s “small file problem”, one large file can be used. However, one needs to remember that not all file formats support input splitting, especially when compression is used.

Support for reading an input split is denoted via the Split interface, which simply marks starting and ending positions.

public interface Split {
  long getStart();
  long getLength();
  long getEnd();
}

The Splitter interface defines a contract for how Splits are calculated from a given path.

public interface Splitter {
  List<Split> getSplits(Path path) throws IOException;
}

We provide a few generic Splitter implementations to construct Splits.

StaticLengthSplitter is used to split an input file by a given length.

StaticBlockSplitter is used to split input by the HDFS file block size in use. It’s also possible to split further within the blocks themselves.

SlopBlockSplitter is an extension of StaticBlockSplitter which tries to estimate how much a split can overflow into the next block, to tackle unnecessary overhead if the last file block is very small compared to an actual split size.
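The idea behind splitting by a fixed length can be shown with simple arithmetic on the file length. The sketch below is a stand-alone model and does not use the framework classes: SimpleSplit is a hypothetical value object mirroring the Split interface, and treating getEnd() as start plus length is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

final class SplitSketch {

  // Hypothetical value object with the same accessors as the Split interface.
  static final class SimpleSplit {
    private final long start;
    private final long length;

    SimpleSplit(long start, long length) {
      this.start = start;
      this.length = length;
    }

    long getStart() {
      return start;
    }

    long getLength() {
      return length;
    }

    // Assumption: the end position is start + length.
    long getEnd() {
      return start + length;
    }
  }

  // Cuts a file of fileLength bytes into consecutive splits of at most splitLength bytes.
  static List<SimpleSplit> staticLengthSplits(long fileLength, long splitLength) {
    if (splitLength <= 0) {
      throw new IllegalArgumentException("splitLength must be positive");
    }
    List<SimpleSplit> splits = new ArrayList<>();
    for (long start = 0; start < fileLength; start += splitLength) {
      // The last split may be shorter than splitLength.
      splits.add(new SimpleSplit(start, Math.min(splitLength, fileLength - start)));
    }
    return splits;
  }

  public static void main(String[] args) {
    for (SimpleSplit split : staticLengthSplits(250, 100)) {
      System.out.println(split.getStart() + " - " + split.getEnd());
    }
  }
}
```

For a 250-byte file and a split length of 100, this yields three splits, the last of which is only 50 bytes long. A small trailing split like this is the kind of overhead that SlopBlockSplitter is described as trying to avoid.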

Reader Implementations

We provide a number of reader implementations to be used based on the type of file to read.

  • TextFileReader is used to read data written by a TextFileWriter.

  • DelimitedTextFileReader is used to read data written by a DelimitedTextFileWriter.

  • TextSequenceFileReader is used to read data written by a TextSequenceFileWriter.

5.1.3 Using Codecs

Supported compression codecs are denoted via the CodecInfo interface, which simply defines whether a codec supports splitting, what its fully qualified Java class is and what its default file suffix is.

public interface CodecInfo {
  boolean isSplittable();
  String getCodecClass();
  String getDefaultSuffix();
}

The Codecs enum provides easy access to the supported codecs.

  • GZIP - org.apache.hadoop.io.compress.GzipCodec

  • SNAPPY - org.apache.hadoop.io.compress.SnappyCodec

  • BZIP2 - org.apache.hadoop.io.compress.BZip2Codec

  • LZO - com.hadoop.compression.lzo.LzoCodec (non-splittable)

  • SLZO - com.hadoop.compression.lzo.LzoCodec (splittable)

5.2 Persisting POJO datasets using Kite SDK

One common requirement is to persist a large number of POJOs in serialized form using HDFS. The Kite SDK project provides a Kite Data Module that provides an API for working with datasets stored in HDFS. We use this functionality and provide some simple helper classes to aid in configuration and use in a Spring environment.

5.2.1 Data Formats

The Kite SDK project provides support for writing data using both the Avro and Parquet data formats. The data format you choose to use influences the data types you can use in your POJO classes. We'll discuss the basics of the Java type mapping for the two data formats but we recommend that you consult each project's documentation for additional details.

Note

Currently, you can't provide your own schema. This is something that we are considering changing in upcoming releases. We are also planning to provide better mapping support in line with the support we currently provide for NoSQL stores like MongoDB.

Using Avro

When using Avro as the data format, the schema generation is based on reflection of the POJO class used. Primitive data types and their corresponding wrapper classes are mapped to the corresponding Avro data type. More complex types, as well as the POJO itself, are mapped to a record type consisting of one or more fields.

The table below shows the mapping from some common types:

Table 5.1. Some common Java to Avro data types mapping

Java type            Avro type   Comment
String               string
int / Integer        int         32-bit signed integer
long / Long          long        64-bit signed integer
float / Float        float       32-bit floating point
double / Double      double      64-bit floating point
boolean / Boolean    boolean
byte[]               bytes       byte array
java.util.Date       record

Using Parquet

When using Parquet as the data format, the schema generation is based on reflection of the POJO class used. The POJO class must be a proper JavaBean and must not have any nested types. We only support primitive data types and their corresponding wrapper classes plus byte arrays. We rely on the Avro-to-Parquet mapping support that the Kite SDK uses, so the schema will be generated by Avro.

Note

The Parquet support we currently provide is considered experimental. We are planning to relax a lot of the restrictions on the POJO class in upcoming releases.

The table below shows the mapping from some common types:

Table 5.2. Some common Java to Parquet data types mapping

Java type            Parquet type        Comment
String               BINARY/UTF8
int / Integer        INT32               32-bit signed integer
long / Long          INT64               64-bit signed integer
float / Float        FLOAT               32-bit floating point
double / Double      DOUBLE              64-bit floating point
boolean / Boolean    BOOLEAN
byte[]               BINARY/BYTE_ARRAY   byte array

5.2.2 Configuring the dataset support

In order to use the dataset support you need to configure the following classes:

  • DatasetRepositoryFactory that needs an org.apache.hadoop.conf.Configuration, so we know how to connect to HDFS, and a base path where the data will be written.

  • DatasetDefinition that defines the dataset you are writing. Configuration options include the POJO class that is being stored and the type of format to use (Avro or Parquet). You can also specify whether to allow null values for all fields (default is false) and an optional partition strategy to use for the dataset (see below for partitioning).

The following example shows a simple configuration class:

@Configuration
@ImportResource("hadoop-context.xml")
public class DatasetConfig {

  private @Autowired org.apache.hadoop.conf.Configuration hadoopConfiguration;

  @Bean
  public DatasetRepositoryFactory datasetRepositoryFactory() {
    DatasetRepositoryFactory datasetRepositoryFactory = new DatasetRepositoryFactory();
    datasetRepositoryFactory.setConf(hadoopConfiguration);
    datasetRepositoryFactory.setBasePath("/user/spring");
    return datasetRepositoryFactory;
  }

  @Bean
  public DatasetDefinition fileInfoDatasetDefinition() {
    DatasetDefinition definition = new DatasetDefinition();
    definition.setFormat(Formats.AVRO.getName());
    definition.setTargetClass(FileInfo.class);
    definition.setAllowNullValues(false);
    return definition;
  }
}

5.2.3 Writing datasets

To write datasets to Hadoop you should use either the AvroPojoDatasetStoreWriter or the ParquetDatasetStoreWriter depending on the data format you want to use.

Tip

To mark your fields as nullable use the @Nullable annotation (org.apache.avro.reflect.Nullable). This will result in the schema defining your field as a union of null and your datatype.

We are using a FileInfo POJO that we have defined to hold some information based on the files we read from our local file system. The dataset will be stored in a directory named after the class in lowercase, so in this case it would be fileinfo. This directory is placed inside the basePath specified in the configuration of the DatasetRepositoryFactory:

package org.springframework.samples.hadoop.dataset;

import org.apache.avro.reflect.Nullable;

public class FileInfo {
  private String name;
  private @Nullable String path;
  private long size;
  private long modified;

  public FileInfo(String name, String path, long size, long modified) {
    this.name = name;
    this.path = path;
    this.size = size;
    this.modified = modified;
  }

  public FileInfo() {
  }

  public String getName() {
      return name;
  }

  public String getPath() {
      return path;
  }

  public long getSize() {
      return size;
  }

  public long getModified() {
      return modified;
  }
}

To create a writer add the following bean definition to your configuration class:

  @Bean
  public DataStoreWriter<FileInfo> dataStoreWriter() {
    return new AvroPojoDatasetStoreWriter<FileInfo>(FileInfo.class, 
        datasetRepositoryFactory(), fileInfoDatasetDefinition());
  }

Next, have your class use the writer bean:

    private DataStoreWriter<FileInfo> writer;

    @Autowired
    public void setDataStoreWriter(DataStoreWriter<FileInfo> dataStoreWriter) {
        this.writer = dataStoreWriter;
    }

Now we can use the writer. It will be opened automatically once we start writing to it:

      FileInfo fileInfo = new FileInfo(file.getName(), 
          file.getParent(), file.length(), file.lastModified());
      writer.write(fileInfo);

Once we are done writing we should close the writer:

    try {
      writer.close();
    } catch (IOException e) {
      throw new StoreException("Error closing FileInfo", e);
    }

We should now have a dataset containing all the FileInfo entries in the /user/spring/fileinfo directory:

$ hdfs dfs -ls /user/spring/*
Found 2 items
drwxr-xr-x   - spring supergroup          0 2014-06-09 17:09 /user/spring/fileinfo/.metadata
-rw-r--r--   3 spring supergroup   13824695 2014-06-09 17:10 /user/spring/fileinfo/6876f250-010a-404a-b8c8-0ce1ee759206.avro

The .metadata directory contains dataset information including the Avro schema:

$ hdfs dfs -cat /user/spring/fileinfo/.metadata/schema.avsc
{
  "type" : "record",
  "name" : "FileInfo",
  "namespace" : "org.springframework.samples.hadoop.dataset",
  "fields" : [ {
    "name" : "name",
    "type" : "string"
  }, {
    "name" : "path",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "size",
    "type" : "long"
  }, {
    "name" : "modified",
    "type" : "long"
  } ]
} 

5.2.4 Reading datasets

To read datasets from Hadoop we use the DatasetTemplate class.

To create a DatasetTemplate add the following bean definition to your configuration class:

  @Bean
  public DatasetOperations datasetOperations() {
    DatasetTemplate datasetOperations = new DatasetTemplate();
    datasetOperations.setDatasetRepositoryFactory(datasetRepositoryFactory());
    return datasetOperations;
  }

Next, have your class use the DatasetTemplate:

  private DatasetOperations datasetOperations;

  @Autowired
  public void setDatasetOperations(DatasetOperations datasetOperations) {
      this.datasetOperations = datasetOperations;
  }

Now we can read and count the entries using a RecordCallback callback interface that gets called once per retrieved record:

        final AtomicLong count = new AtomicLong();
        datasetOperations.read(FileInfo.class, new RecordCallback<FileInfo>() {
            @Override
            public void doInRecord(FileInfo record) {
                count.getAndIncrement();
            }
        });
        System.out.println("File count: " + count.get());

5.2.5 Partitioning datasets

To create datasets that are partitioned on one or more data fields we use the PartitionStrategy.Builder class that the Kite SDK project provides.

DatasetDefinition definition = new DatasetDefinition();
definition.setPartitionStrategy(new PartitionStrategy.Builder().year("modified").build());

This option lets you specify one or more paths that will be used to partition the files that the data is written to based on the content of the data. You can use any of the FieldPartitioners that are available for the Kite SDK project. We simply use what is specified to create the corresponding partition strategy. The following partitioning functions are available:

  • year, month, day, hour, minute create partitions based on the value of a timestamp and create directories named like "YEAR=2014" (works well with fields of datatype long)

    • specify function plus field name like:

      year("timestamp")

    • optionally, specify a partition name to replace the default one:

      year("timestamp", "YY")

  • dateFormat creates partitions based on a timestamp and a date format expression provided, and creates directories based on the name provided (works well with fields of datatype long)

    • specify function plus field name, a name for the partition and the date format like:

      dateFormat("timestamp", "Y-M", "yyyyMM")

  • range creates partitions based on a field value and the upper bounds specified for each bucket (works well with fields of datatype int and string)

    • specify function plus field name and the upper bounds for each partition bucket like:

      range("age", 20, 50, 80, Integer.MAX_VALUE)

  • identity creates partitions based on the exact value of a field (works well with fields of datatype string, long and int)

    • specify function plus field name, a name for the partition, the type of the field (String or Integer) and the number of values/buckets for the partition like:

      identity("region", "R", String.class, 10)

  • hash creates partitions based on the hash calculated from the value of a field, divided into the specified number of buckets (works well with all data types)

    • specify function plus field name and number of buckets like:

      hash("lastname", 10)

Multiple expressions can be specified by simply chaining them like:

identity("region", "R", String.class, 10).year("timestamp").month("timestamp")