3. Working with the Hadoop File System

A common task in Hadoop is interacting with its file system, whether for provisioning, adding new files to be processed, parsing results, or performing cleanup. Hadoop offers several ways to achieve that: one can use its Java API (namely FileSystem) or use the hadoop command line, in particular the file system shell. However, there is no middle ground: one either has to use the (somewhat verbose, full of checked exceptions) API or fall back to the command line, outside the application. SHDP addresses this issue by bridging the two worlds, exposing both the FileSystem and the fs shell through an intuitive, easy-to-use Java API. Add your favorite JVM scripting language right inside your Spring for Apache Hadoop application and you have a powerful combination.

3.1 Configuring the file-system

The Hadoop file-system, HDFS, can be accessed in various ways - this section will cover the most popular protocols for interacting with HDFS and their pros and cons. SHDP does not enforce any specific protocol to be used - in fact, as described in this section, any FileSystem implementation can be used, allowing even implementations other than HDFS to be used.

The table below describes the common HDFS APIs in use:

Table 3.1. HDFS APIs

File System | Comm. Method | Scheme / Prefix | Read / Write | Cross Version
HDFS        | RPC          | hdfs://         | Read / Write | Same HDFS version only
HFTP        | HTTP         | hftp://         | Read only    | Version independent
WebHDFS     | HTTP (REST)  | webhdfs://      | Read / Write | Version independent

The hdfs:// protocol should be familiar to most readers - most docs (and in fact the previous chapter as well) mention it. It works out of the box and it is fairly efficient; however, because it is RPC based, it requires both the client and the Hadoop cluster to share the same version. Upgrading one without the other causes serialization errors, meaning the client cannot interact with the cluster. As an alternative one can use hftp://, which is HTTP-based, or its more secure sibling hsftp:// (based on SSL), which gives you a version-independent protocol, meaning you can use it to interact with clusters of an unknown or different version than that of the client. hftp is read only (write operations will fail right away) and it is typically used with distcp for reading data. webhdfs:// is one of the additions in Hadoop 1.0 and is a mixture between the hdfs and hftp protocols - it provides a version-independent, read-write, REST-based protocol, which means that you can read and write to/from Hadoop clusters no matter their version. Furthermore, since webhdfs:// is backed by a REST API, clients in other languages can use it with minimal effort.

[Note]

Not all file-systems work out of the box. For example WebHDFS needs to be enabled first in the cluster (through the dfs.webhdfs.enabled property - see this document for more information), while the secure hftp sibling, hsftp, requires the SSL configuration (such as certificates) to be specified. More about this (and how to use hftp/hsftp for proxying) can be found on this page.
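
Whichever scheme is chosen, the stock Hadoop API resolves the matching FileSystem implementation from the URI. The sketch below is for illustration only, assuming a local cluster with WebHDFS enabled; the host and paths are placeholders:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SchemeExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // the URI scheme decides which FileSystem implementation handles the calls
        FileSystem webHdfs = FileSystem.get(URI.create("webhdfs://localhost"), conf);
        System.out.println(webHdfs.exists(new Path("/user")));

        // read-only, version-independent access over HTTP
        FileSystem hftp = FileSystem.get(URI.create("hftp://localhost"), conf);
        System.out.println(hftp.getUri());
    }
}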

Once the scheme has been decided upon, one can specify it through the standard Hadoop configuration, either through the Hadoop configuration files or its properties:

<hdp:configuration>
  fs.default.name=webhdfs://localhost
  ...
</hdp:configuration>

This instructs Hadoop (and automatically SHDP) what the default, implied file-system is. In SHDP, one can create additional file-systems (potentially to connect to other clusters) and specify a different scheme:

<!-- manually creates the default SHDP file-system named 'hadoopFs' -->
<hdp:file-system uri="webhdfs://localhost"/>
 
<!-- create a different FileSystem instance --> 
<hdp:file-system id="old-cluster" uri="hftp://old-cluster/"/>

As with the rest of the components, the file systems can be injected where needed - such as the file system shell or inside scripts (see the next section).
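
For instance, a file system declared as above could be injected into plain Java code. The sketch below is a minimal illustration, assuming annotation-based injection is enabled and relying on the bean names from the example ('hadoopFs' for the default file-system, 'old-cluster' for the second one):

import javax.annotation.Resource;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ClusterChecker {

    // default SHDP file-system (named 'hadoopFs' as in the example above)
    @Resource(name = "hadoopFs")
    private FileSystem fs;

    // the additional file-system pointing to the old cluster
    @Resource(name = "old-cluster")
    private FileSystem oldCluster;

    public boolean existsOnBoth(String path) throws Exception {
        Path p = new Path(path);
        return fs.exists(p) && oldCluster.exists(p);
    }
}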

3.2 Scripting the Hadoop API

Since Hadoop is written in Java, accessing its APIs in a native way provides maximum control and flexibility over the interaction with Hadoop. This holds true for working with its file system; in fact all the other tools that one might use are built on top of it. The main entry point is the org.apache.hadoop.fs.FileSystem abstract class, which provides the foundation of most (if not all) of the actual file system implementations out there. Whether one is using a local, remote or distributed store, through the FileSystem API one can query and manipulate the available resources or create new ones. To do so however, one needs to write Java code, compile the classes and configure them, which is somewhat cumbersome, especially when performing simple, straightforward operations (like copying a file or deleting a directory).
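
To put that in perspective, below is a rough sketch of what those two simple operations look like through the raw FileSystem API (the paths are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RawFileSystemUsage {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        try {
            // copy a local file into HDFS
            fs.copyFromLocalFile(new Path("data/input.txt"), new Path("/user/test/input.txt"));
            // recursively delete a directory
            fs.delete(new Path("/user/test/old-output"), true);
        } finally {
            fs.close();
        }
    }
}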

JVM scripting languages (such as Groovy, JRuby, Jython or Rhino to name just a few) provide a nice complement to the Java language; they run on the JVM, can interact with Java code with no or few changes or restrictions and have a nicer, simpler, less ceremonial syntax; that is, there is no need to define a class or a method - simply write the code that you want to execute and you are done. SHDP combines the two, taking care of the configuration and the infrastructure so one can interact with the Hadoop environment from the language of choice.

Let us take a look at a JavaScript example using Rhino (which is part of JDK 6 or higher, meaning one does not need any extra libraries):

<beans xmlns="http://www.springframework.org/schema/beans" ...>		
  <hdp:configuration .../>
		
  <hdp:script id="inlined-js" language="javascript" run-at-startup="true">
    importPackage(java.util);

    name = UUID.randomUUID().toString()
    scriptName = "src/test/resources/test.properties"
    // fs - FileSystem instance based on 'hadoopConfiguration' bean
    // call FileSystem#copyFromLocalFile(Path, Path)
    fs.copyFromLocalFile(scriptName, name)
    // return the file length 
    fs.getLength(name)
  </hdp:script>
	 
</beans>

The script element, part of the SHDP namespace, builds on top of the scripting support in Spring, permitting script declarations to be evaluated and declared as normal bean definitions. Furthermore it automatically exposes Hadoop-specific objects, based on the existing configuration, to the script, such as the FileSystem (more on that in the next section). As one can see, the script is fairly straightforward: it generates a random name (using the UUID class from the java.util package) and then copies a local file into HDFS under that random name. The last line returns the length of the copied file, which becomes the value of the declaring bean (in this case inlined-js) - note that this might vary based on the scripting engine used.

[Note]
The attentive reader might have noticed that the arguments passed to the FileSystem object are not of type Path but rather String. To avoid the creation of Path objects, SHDP uses a wrapper class (SimplerFileSystem) which automatically does the conversion so you don't have to. For more information see the implicit variables section.

Note that for inlined scripts, one can use Spring's property placeholder configurer to automatically expand variables at runtime. Using one of the previous examples:

<beans ...>
  <context:property-placeholder location="classpath:hadoop.properties" />
   
  <hdp:script language="javascript" run-at-startup="true">
    ...
    tracker=${hd.fs}
    ...
  </hdp:script>
</beans>

Notice how the script above relies on the property placeholder to expand ${hd.fs} with the value from the hadoop.properties file available in the classpath.

As you might have noticed, the script element defines a runner for JVM scripts. And just like the rest of the SHDP runners, it allows one or multiple pre- and post-actions to be specified, to be executed before and after each run. Typically other runners (such as other jobs or scripts) can be specified, but any JDK Callable can be passed in. Do note that the runner will not run unless triggered manually or unless run-at-startup is set to true. For more information on runners, see the dedicated chapter.
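
To illustrate the Callable option, a plain JDK Callable such as the sketch below could be declared as a bean and referenced as a pre- or post-action (the class name, wiring and cleanup path are hypothetical; the exact runner attributes are covered in the runners chapter):

import java.util.concurrent.Callable;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// a plain JDK Callable that could act as a pre- or post-action for a runner
public class CleanupAction implements Callable<Boolean> {

    private final FileSystem fs;

    public CleanupAction(FileSystem fs) {
        this.fs = fs;
    }

    @Override
    public Boolean call() throws Exception {
        // remove a (hypothetical) staging directory around the script execution
        return fs.delete(new Path("/tmp/staging"), true);
    }
}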

3.2.1 Using scripts

Inlined scripting is quite handy for doing simple operations and, coupled with the property expansion, is quite a powerful tool that can handle a variety of use cases. However when more logic is required or the script is affected by XML formatting, encoding or syntax restrictions (such as Jython/Python for which white-spaces are important) one should consider externalization. That is, rather than declaring the script directly inside the XML, one can declare it in its own file. And speaking of Python, consider the variation of the previous example:

<hdp:script location="org/company/basic-script.py" run-at-startup="true"/>

The definition does not bring any surprises but do notice there is no need to specify the language (as in the case of an inlined declaration) since the script extension (py) already provides that information. Just for completeness, basic-script.py looks as follows:

from java.util import UUID
from org.apache.hadoop.fs import Path

print "Home dir is " + str(fs.homeDirectory)
print "Work dir is " + str(fs.workingDirectory)
print "/user exists " + str(fs.exists("/user"))

name = UUID.randomUUID().toString()
scriptName = "src/test/resources/test.properties"
fs.copyFromLocalFile(scriptName, name)
print Path(name).makeQualified(fs)

3.3 Scripting implicit variables

To ease the interaction of the script with its enclosing context, SHDP binds by default the so-called implicit variables. These are:

Table 3.2. Implicit variables

Name   | Type                                                         | Description
cfg    | org.apache.hadoop.conf.Configuration                         | Hadoop Configuration (relies on the hadoopConfiguration bean or singleton type match)
cl     | java.lang.ClassLoader                                        | ClassLoader used for executing the script
ctx    | org.springframework.context.ApplicationContext               | Enclosing application context
ctxRL  | org.springframework.core.io.support.ResourcePatternResolver | Enclosing application context ResourceLoader
distcp | org.springframework.data.hadoop.fs.DistributedCopyUtil       | Programmatic access to DistCp
fs     | org.apache.hadoop.fs.FileSystem                              | Hadoop File System (relies on the 'hadoop-fs' bean or singleton type match, falls back to creating one based on 'cfg')
fsh    | org.springframework.data.hadoop.fs.FsShell                   | File System shell, exposing hadoop 'fs' commands as an API
hdfsRL | org.springframework.data.hadoop.io.HdfsResourceLoader        | HDFS resource loader (relies on the 'hadoop-resource-loader' bean or singleton type match, falls back to creating one automatically based on 'cfg')

[Note]
If no Hadoop Configuration can be detected (either by the name hadoopConfiguration or by type), several log warnings will be made and none of the Hadoop-based variables (namely cfg, distcp, fs, fsh or hdfsRL) will be bound.

As mentioned in the Description column, the variables are first looked up (either by name or by type) in the application context and, in case they are missing, created on the spot based on the existing configuration. Note that it is possible to override or add new variables to the scripts through the property sub-element, which can set values or references to other beans:

<hdp:script location="org/company/basic-script.js" run-at-startup="true">
   <hdp:property name="foo" value="bar"/>
   <hdp:property name="ref" ref="some-bean"/>
</hdp:script>

3.3.1 Running scripts

The script namespace provides various options to adjust its behaviour depending on the script content. By default the script is simply declared - that is, no execution occurs. One however can change that so that the script gets evaluated at startup (as all the examples in this section do) through the run-at-startup flag (which is by default false) or when invoked manually (through the Callable). Similarly, by default the script gets evaluated on each run. However, for scripts that are expensive and return the same value every time, various caching options are available so that the evaluation occurs only when needed, through the evaluate attribute:

Table 3.3. script attributes

Name           | Values                              | Description
run-at-startup | false (default), true               | Whether the script is executed at startup or not
evaluate       | ALWAYS (default), IF_MODIFIED, ONCE | Whether to actually evaluate the script when invoked or use a previously computed value. ALWAYS means evaluate every time, IF_MODIFIED evaluates only if the backing resource (such as a file) has been modified in the meantime and ONCE evaluates only once.

3.3.2 Using the Scripting tasklet

For Spring Batch environments, SHDP provides a dedicated tasklet to execute scripts.

<script-tasklet id="script-tasklet">
  <script language="groovy">
    inputPath = "/user/gutenberg/input/word/"
    outputPath = "/user/gutenberg/output/word/"
    if (fsh.test(inputPath)) {
      fsh.rmr(inputPath)
    }
    if (fsh.test(outputPath)) {
      fsh.rmr(outputPath)
    }
    inputFile = "src/main/resources/data/nietzsche-chapter-1.txt"
    fsh.put(inputFile, inputPath)
  </script>
</script-tasklet>

The tasklet above embeds the script as a nested element. You can also declare a reference to another script definition, using the script-ref attribute which allows you to externalize the scripting code to an external resource.

<script-tasklet id="script-tasklet" script-ref="clean-up"/>
<hdp:script id="clean-up" location="org/company/myapp/clean-up-wordcount.groovy"/>

3.4 File System Shell (FsShell)

A handy utility provided by the Hadoop distribution is the file system shell, which allows UNIX-like commands to be executed against HDFS. One can check for the existence of files, delete, move or copy directories or files, or set permissions. However the utility is only available from the command-line, which makes it hard to use from/inside a Java application. To address this problem, SHDP provides a lightweight, fully embeddable shell, called FsShell, which mimics most of the commands available from the command line: rather than dealing with System.in or System.out, one deals with objects.

Let us take a look at using FsShell by building on the previous scripting examples:

<hdp:script location="org/company/basic-script.groovy" run-at-startup="true"/>

basic-script.groovy:
name = UUID.randomUUID().toString()
scriptName = "src/test/resources/test.properties"
fs.copyFromLocalFile(scriptName, name)

// use the shell (made available under variable fsh)
dir = "script-dir"
if (!fsh.test(dir)) {
   fsh.mkdir(dir); fsh.cp(name, dir); fsh.chmodr(700, dir)
   println "File content is " + fsh.cat(dir + "/" + name).toString()
}
println fsh.ls(dir).toString()
fsh.rmr(dir)

As mentioned in the previous section, a FsShell instance is automatically created and configured for scripts, under the name fsh. Notice how the entire block relies on the usual commands: test, mkdir, cp and so on. Their semantics are exactly the same as in the command-line version; however, one has access to a native Java API that returns actual objects (rather than Strings), making it easy to use them programmatically whether in Java or another language. Furthermore, the class offers enhanced methods (such as chmodr, which stands for recursive chmod) and multiple overloaded methods taking advantage of varargs so that multiple parameters can be specified. Consult the API for more information.
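
Since FsShell is a plain class, the same commands can be issued directly from Java code as well. The snippet below is a minimal sketch, assuming FsShell can be created from the Hadoop Configuration (the directory name is a placeholder):

import org.apache.hadoop.conf.Configuration;
import org.springframework.data.hadoop.fs.FsShell;

public class ShellUsage {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // assumption: FsShell is constructed from the Hadoop configuration
        FsShell fsh = new FsShell(conf);

        String dir = "script-dir";
        if (!fsh.test(dir)) {
            fsh.mkdir(dir);
            fsh.chmodr(700, dir);
        }
        // ls returns actual objects whose toString mimics the command-line output
        System.out.println(fsh.ls(dir));
    }
}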

To be as close as possible to the command-line shell, FsShell mimics even the messages being displayed. Take a look at the line above that prints the result of fsh.cat(). The method returns a Collection of Hadoop Path objects (which one can use programmatically). However, when invoking toString on the collection, the same printout as from the command-line shell is displayed:

File content is some text

The same goes for the rest of the methods, such as ls. The same script in JRuby would look something like this:

require 'java'
name = java.util.UUID.randomUUID().to_s
scriptName = "src/test/resources/test.properties"
$fs.copyFromLocalFile(scriptName, name)

# use the shell
dir = "script-dir/"
...
print $fsh.ls(dir).to_s

which prints out something like this:

drwx------   - user     supergroup          0 2012-01-26 14:08 /user/user/script-dir
-rw-r--r--   3 user     supergroup        344 2012-01-26 14:08 /user/user/script-dir/520cf2f6-a0b6-427e-a232-2d5426c2bc4e

As you can see, not only can you reuse the existing tools and commands with Hadoop inside SHDP, but you can also code against them in various scripting languages. And as you might have noticed, there is no special configuration required - this is automatically inferred from the enclosing application context.

[Note]
The careful reader might have noticed that besides the syntax, there are some minor differences in how the various languages interact with the Java objects. For example the automatic toString call made in Java for automatic String conversion is not necessarily supported (hence the to_s in Ruby or str in Python). This is to be expected as each language has its own semantics - for the most part these are easy to pick up but do pay attention to details.

3.4.1 DistCp API

Similar to the FsShell, SHDP provides a lightweight, fully embeddable DistCp version that builds on top of the distcp from the Hadoop distro. The semantics and configuration options are the same; however, one can use it from within a Java application without having to use the command-line. See the API for more information:

<hdp:script language="groovy">distcp.copy("${distcp.src}", "${distcp.dst}")</hdp:script>

The bean above triggers a distributed copy relying again on Spring's property placeholder variable expansion for its source and destination.
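
The same call can also be made from plain Java. The sketch below assumes a DistributedCopyUtil instance (the type behind the implicit distcp variable) is available as a bean in the application context; the context location, bean lookup and paths are placeholders:

import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.data.hadoop.fs.DistributedCopyUtil;

public class DistCpUsage {
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext ctx =
                new ClassPathXmlApplicationContext("/META-INF/spring/hadoop-context.xml");
        try {
            // assumption: a DistributedCopyUtil bean has been declared in the context
            DistributedCopyUtil distcp = ctx.getBean(DistributedCopyUtil.class);
            // same copy(source, destination) call as in the script above
            distcp.copy("hdfs://old-cluster/data", "hdfs://localhost/data");
        } finally {
            ctx.close();
        }
    }
}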