A common task in Hadoop is interacting with its file system, whether for
provisioning, adding new files to be processed, parsing results, or
performing cleanup. Hadoop offers several ways to achieve that: one can
use its Java API (namely
FileSystem
)
or use the hadoop
command line, in particular the file system
shell.
However there is no middle ground, one either has to use the (somewhat
verbose, full of checked exceptions) API or fall back to the command
line, outside the application. SHDP addresses this issue by bridging the
two worlds, exposing both the FileSystem
and the fs shell through an
intuitive, easy-to-use Java API. Add your favorite
JVM scripting
language right inside your Spring for Apache Hadoop application and you
have a powerful combination.
The Hadoop file-system, HDFS, can be accessed in various ways - this
section will cover the most popular protocols for interacting with HDFS
and their pros and cons. SHDP does not enforce any specific protocol to
be used - in fact, as described in this section any FileSystem
implementation can be used, allowing even other implementations than
HDFS to be used.
The table below describes the common HDFS APIs in use:
Table 4.1. HDFS APIs
File System | Comm. Method | Scheme / Prefix | Read / Write | Cross Version |
---|---|---|---|---|
HDFS | RPC |
| Read / Write | Same HDFS version only |
HFTP | HTTP |
| Read only | Version independent |
WebHDFS | HTTP (REST) |
| Read / Write | Version independent |
This chapter focuses on the core file-system protocols supported by
Hadoop. S3 (see the Appendix), FTP and the
rest of the other FileSystem
implementations are supported as well -
Spring for Apache Hadoop has no dependency on the underlying system
rather just on the public Hadoop API.
hdfs://
protocol should be familiar to most readers - most docs (and
in fact the previous chapter as well) mention it. It works out of the
box and it’s fairly efficient. However because it is RPC based, it
requires both the client and the Hadoop cluster to share the same
version. Upgrading one without the other causes serialization errors
meaning the client cannot interact with the cluster. As an alternative
one can use hftp://
which is HTTP-based or its more secure brother
hsftp://
(based on SSL) which gives you a version independent protocol
meaning you can use it to interact with clusters with an unknown or
different version than that of the client. hftp
is read only (write
operations will fail right away) and it is typically used with disctp
for reading data. webhdfs://
is one of the additions in Hadoop 1.0 and
is a mixture between hdfs
and hftp
protocol - it provides a
version-independent, read-write, REST-based protocol which means that
you can read and write to/from Hadoop clusters no matter their version.
Furthermore, since webhdfs://
is backed by a REST API, clients in
other languages can use it with minimal effort.
Note | |
---|---|
Not all file systems work out of the box. For example WebHDFS needs to
be enabled first in the cluster (through |
Once the scheme has been decided upon, one can specify it through the standard Hadoop #hadoop:config[configuration], either through the Hadoop configuration files or its properties:
<hdp:configuration> fs.defaultFS=webhdfs://localhost ... </hdp:configuration>
This instructs Hadoop (and automatically SHDP) what the default, implied file-system is. In SHDP, one can create additional file-systems (potentially to connect to other clusters) and specify a different scheme:
<!-- manually creates the default SHDP file-system named 'hadoopFs' --> <hdp:file-system uri="webhdfs://localhost"/> <!-- creates a different FileSystem instance --> <hdp:file-system id="old-cluster" uri="hftp://old-cluster/"/>
As with the rest of the components, the file systems can be injected where needed - such as file shell or inside scripts (see the next section).
In Spring the ResourceLoader interface is meant to be implemented by objects that can return (i.e. load) Resource instances.
public interface ResourceLoader { Resource getResource(String location); }
All application contexts implement the ResourceLoader interface, and therefore all application contexts may be used to obtain Resource instances.
When you call getResource()
on a specific application context, and the
location path specified doesn’t have a specific prefix, you will get
back a Resource
type that is appropriate to that particular
application context. For example, assume the following snippet of code
was executed against a ClassPathXmlApplicationContext instance:
Resource template = ctx.getResource("some/resource/path/myTemplate.txt");
What would be returned would be a ClassPathResource
; if the same
method was executed against a FileSystemXmlApplicationContext instance,
you’d get back a FileSystemResource. For a WebApplicationContext, you’d
get back a ServletContextResource, and so on.
As such, you can load resources in a fashion appropriate to the particular application context.
On the other hand, you may also force ClassPathResource to be used,
regardless of the application context type, by specifying the special
classpath:
prefix:
Resource template = ctx.getResource("classpath:some/resource/path/myTemplate.txt");
Note | |
---|---|
More information about the generic usage of resource loading, check the Spring Framework Documentation. |
Spring Hadoop
is adding its own functionality into generic concept of
resource loading. Resource abstraction in Spring has always been a way
to ease resource access in terms of not having a need to know where
there resource is and how it’s accessed. This abstraction also goes
beyond a single resource by allowing to use patterns to access multiple
resources.
Lets first see how HdfsResourceLoader is used manually.
<hdp:file-system /> <hdp:resource-loader id="loader" file-system-ref="hadoopFs" /> <hdp:resource-loader id="loaderWithUser" user="myuser" uri="hdfs://localhost:8020" />
In above configuration we created two beans, one with reference to
existing Hadoop FileSystem bean
and one with impersonated user.
// get path '/tmp/file.txt' Resource resource = loader.getResource("/tmp/file.txt"); // get path '/tmp/file.txt' with user impersonation Resource resource = loaderWithUser.getResource("/tmp/file.txt"); // get path '/user/<current user>/file.txt' Resource resource = loader.getResource("file.txt"); // get path '/user/myuser/file.txt' Resource resource = loaderWithUser.getResource("file.txt"); // get all paths under '/tmp/' Resource[] resources = loader.getResources("/tmp/*"); // get all paths under '/tmp/' recursively Resource[] resources = loader.getResources("/tmp/**/*"); // get all paths under '/tmp/' using more complex ant path matching Resource[] resources = loader.getResources("/tmp/?ile?.txt");
What would be returned in above examples would be instances of HdfsResources.
If there is a need for Spring Application Context to be aware of
HdfsResourceLoader it needs to be registered using
hdp:resource-loader-registrar
namespace tag.
<hdp:file-system /> <hdp:resource-loader file-system-ref="hadoopFs" handle-noprefix="false" /> <hdp:resource-loader-registrar />
Note | |
---|---|
On default the HdfsResourceLoader will handle all resource paths without
prefix. Attribute |
// get 'default.txt' from current user's home directory Resource[] resources = context.getResources("hdfs:default.txt"); // get all files from hdfs root Resource[] resources = context.getResources("hdfs:/*"); // let context handle classpath prefix Resource[] resources = context.getResources("classpath:cfg*properties");
What would be returned in above examples would be instances of HdfsResources and ClassPathResource for the last one. If requesting resource paths without existing prefix, this example would fall back into Spring Application Context. It may be advisable to let HdfsResourceLoader to handle paths without prefix if your application doesn’t rely on loading resources from underlying context without prefixes.
Table 4.2. hdp:resource-loader
attributes
Name | Values | Description |
---|---|---|
| Bean Reference | Reference to existing Hadoop FileSystem bean |
| Boolean(defaults to true) | Indicates whether to use (or not) the codecs found inside the Hadoop configuration when accessing the resource input stream. |
| String | The security user (ugi) to use for impersonation at runtime. |
| String | The underlying HDFS system URI. |
| Boolean(defaults to true) | Indicates if loader should handle resource paths without prefix. |
Table 4.3. hdp:resource-loader-registrar
attributes
Name | Values | Description |
---|---|---|
| Bean Reference | Reference to existing Hdfs resource loader bean. Default value is 'hadoopResourceLoader'. |
SHDP scripting supports any
JSR-223 (also known as
javax.scripting
) compliant scripting engine. Simply add the engine jar
to the classpath and the application should be able to find it. Most
languages (such as Groovy or JRuby) provide JSR-233 support out of the
box; for those that do not see the
scripting project that provides
various adapters.
Since Hadoop is written in Java, accessing its APIs in a native way provides maximum control and flexibility over the interaction with Hadoop. This holds true for working with its file systems; in fact all the other tools that one might use are built upon these. The main entry point is the org.apache.hadoop.fs.FileSystem abstract class which provides the foundation of most (if not all) of the actual file system implementations out there. Whether one is using a local, remote or distributed store through the FileSystem API she can query and manipulate the available resources or create new ones. To do so however, one needs to write Java code, compile the classes and configure them which is somewhat cumbersome especially when performing simple, straightforward operations (like copy a file or delete a directory).
JVM scripting languages (such as Groovy, JRuby, Jython or Rhino to name just a few) provide a nice solution to the Java language; they run on the JVM, can interact with the Java code with no or few changes or restrictions and have a nicer, simpler, less ceremonial syntax; that is, there is no need to define a class or a method - simply write the code that you want to execute and you are done. SHDP combines the two, taking care of the configuration and the infrastructure so one can interact with the Hadoop environment from her language of choice.
Let us take a look at a JavaScript example using Rhino (which is part of JDK 6 or higher, meaning one does not need any extra libraries):
<beans xmlns="http://www.springframework.org/schema/beans" ...> <hdp:configuration .../> <hdp:script id="inlined-js" language="javascript" run-at-startup="true"> try {load("nashorn:mozilla_compat.js");} catch (e) {} // for Java 8 importPackage(java.util); name = UUID.randomUUID().toString() scriptName = "src/test/resources/test.properties" // - FileSystem instance based on 'hadoopConfiguration' bean // call FileSystem#copyFromLocal(Path, Path) .copyFromLocalFile(scriptName, name) // return the file length .getLength(name) </hdp:script> </beans>
The script
element, part of the SHDP namespace, builds on top of the
scripting support in Spring permitting script declarations to be
evaluated and declared as normal bean definitions. Furthermore it
automatically exposes Hadoop-specific objects, based on the existing
configuration, to the script such as the FileSystem
(more on that in
the next section). As one can see, the script is fairly obvious: it
generates a random name (using the UUID class from java.util
package)
and then copies a local file into HDFS under the random name. The last
line returns the length of the copied file which becomes the value of
the declaring bean (in this case inlined-js
) - note that this might
vary based on the scripting engine used.
Note | |
---|---|
The attentive reader might have noticed that the arguments passed to the
FileSystem object are not of type Path but rather String. To avoid the
creation of Path object, SHDP uses a wrapper class |
Note that for inlined scripts, one can use Spring’s property placeholder configurer to automatically expand variables at runtime. Using one of the examples seen before:
<beans ... > <context:property-placeholder location="classpath:hadoop.properties" /> <hdp:script language="javascript" run-at-startup="true"> ... tracker= ... </hdp:script> </beans>
Notice how the script above relies on the property placeholder to expand
${hd.fs}
with the values from hadoop.properties
file available in
the classpath.
As you might have noticed, the script
element defines a runner for JVM
scripts. And just like the rest of the SHDP runners, it allows one or
multiple pre
and post
actions to be specified to be executed before
and after each run. Typically other runners (such as other jobs or
scripts) can be specified but any JDK Callable
can be passed in. Do
note that the runner will not run unless triggered manually or if
run-at-startup
is set to true
. For more information on runners, see
the dedicated chapter.
Inlined scripting is quite handy for doing simple operations and coupled with the property expansion is quite a powerful tool that can handle a variety of use cases. However when more logic is required or the script is affected by XML formatting, encoding or syntax restrictions (such as Jython/Python for which white-spaces are important) one should consider externalization. That is, rather than declaring the script directly inside the XML, one can declare it in its own file. And speaking of Python, consider the variation of the previous example:
<hdp:script location="org/company/basic-script.py" run-at-startup="true"/>
The definition does not bring any surprises but do notice there is no
need to specify the language (as in the case of a inlined declaration)
since script extension (py
) already provides that information. Just
for completeness, the basic-script.py
looks as follows:
from java.util import UUID from org.apache.hadoop.fs import Path print "Home dir is " + str(fs.homeDirectory) print "Work dir is " + str(fs.workingDirectory) print "/user exists " + str(fs.exists("/user")) name = UUID.randomUUID().toString() scriptName = "src/test/resources/test.properties" fs.copyFromLocalFile(scriptName, name) print Path(name).makeQualified(fs)
To ease the interaction of the script with its enclosing context, SHDP binds by default the so-called implicit variables. These are:
Table 4.4. Implicit variables
Name | Type | Description |
---|---|---|
cfg | Hadoop Configuration (relies on hadoopConfiguration bean or singleton type match) | |
cl | ClassLoader used for executing the script | |
ctx | Enclosing application context | |
ctxRL | Enclosing application context ResourceLoader | |
distcp | Programmatic access to DistCp | |
fs | Hadoop File System (relies on 'hadoop-fs' bean or singleton type match, falls back to creating one based on 'cfg') | |
fsh | File System shell, exposing hadoop 'fs' commands as an API | |
hdfsRL | Hdfs resource loader (relies on 'hadoop-resource-loader' or singleton type match, falls back to creating one automatically based on 'cfg') |
Note | |
---|---|
If no Hadoop Configuration can be detected (either by name hadoopConfiguration or by type), several log warnings will be made and none of the Hadoop-based variables (namely cfg , distcp , fs , fsh , distcp or hdfsRL) will be bound. |
As mentioned in the Description column, the variables are first looked
(either by name or by type) in the application context and, in case they
are missing, created on the spot based on the existing configuration.
Note that it is possible to override or add new variables to the scripts
through the property
sub-element that can set values or references to
other beans:
<hdp:script location="org/company/basic-script.js" run-at-startup="true"> <hdp:property name="foo" value="bar"/> <hdp:property name="ref" ref="some-bean"/> </hdp:script>
The script
namespace provides various options to adjust its behaviour
depending on the script content. By default the script is simply
declared - that is, no execution occurs. One however can change that so
that the script gets evaluated at startup (as all the examples in this
section do) through the run-at-startup
flag (which is by default
false
) or when invoked manually (through the Callable). Similarily, by
default the script gets evaluated on each run. However for scripts that
are expensive and return the same value every time one has various
caching options, so the evaluation occurs only when needed through the
evaluate
attribute:
Table 4.5. script
attributes
Name | Values | Description |
---|---|---|
|
| Wether the script is executed at startup or not |
|
| Wether to
actually evaluate the script when invoked or used a previous value.
|
For Spring Batch environments, SHDP provides a dedicated tasklet to execute scripts.
<script-tasklet id="script-tasklet"> <script language="groovy"> inputPath = "/user/gutenberg/input/word/" outputPath = "/user/gutenberg/output/word/" if (fsh.test(inputPath)) { fsh.rmr(inputPath) } if (fsh.test(outputPath)) { fsh.rmr(outputPath) } inputFile = "src/main/resources/data/nietzsche-chapter-1.txt" fsh.put(inputFile, inputPath) </script> </script-tasklet>
The tasklet above embedds the script as a nested element. You can also declare a reference to another script definition, using the script-ref attribute which allows you to externalize the scripting code to an external resource.
<script-tasklet id="script-tasklet" script-ref="clean-up"/> <hdp:script id="clean-up" location="org/company/myapp/clean-up-wordcount.groovy"/>
A handy utility provided by the Hadoop distribution is the file system
shell
which allows UNIX-like commands to be executed against HDFS. One can
check for the existence of files, delete, move, copy directories or
files or set up permissions. However the utility is only available from
the command-line which makes it hard to use from/inside a Java
application. To address this problem, SHDP provides a lightweight, fully
embeddable shell, called FsShell
which mimics most of the commands
available from the command line: rather than dealing with System.in
or
System.out
, one deals with objects.
Let us take a look at using FsShell
by building on the previous
scripting examples:
<hdp:script location="org/company/basic-script.groovy" run-at-startup="true"/>
name = UUID.randomUUID().toString() scriptName = "src/test/resources/test.properties" fs.copyFromLocalFile(scriptName, name) // use the shell made available under variable dir = "script-dir" if (!fsh.test(dir)) { fsh.mkdir(dir); fsh.cp(name, dir); fsh.chmodr(700, dir) println "File content is " + fsh.cat(dir + name).toString() } println fsh.ls(dir).toString() fsh.rmr(dir)
As mentioned in the previous section, a FsShell
instance is
automatically created and configured for scripts, under the name fsh.
Notice how the entire block relies on the usual commands: test
,
mkdir
, cp
and so on. Their semantics are exactly the same as in the
command-line version however one has access to a native Java API that
returns actual objects (rather than String`s) making it easy to use
them programmatically whether in Java or another language. Furthermore,
the class offers enhanced methods (such as `chmodr
which stands for
recursive chmod
) and multiple overloaded methods taking advantage of
varargs
so that multiple parameters can be specified. Consult the
API
for more information.
To be as close as possible to the command-line shell, FsShell
mimics
even the messages being displayed. Take a look at line 9 which prints
the result of fsh.cat()
. The method returns a Collection
of Hadoop
Path
objects (which one can use programatically). However when
invoking toString
on the collection, the same printout as from the
command-line shell is being displayed:
File content is
The same goes for the rest of the methods, such as ls
. The same script
in JRuby would look something like this:
require 'java' name = java.util.UUID.randomUUID().to_s scriptName = "src/test/resources/test.properties" $fs.copyFromLocalFile(scriptName, name) # use the shell dir = "script-dir/" ... print $fsh.ls(dir).to_s
which prints out something like this:
drwx------ - user supergroup 0 2012-01-26 14:08 /user/user/script-dir -rw-r--r-- 3 user supergroup 344 2012-01-26 14:08 /user/user/script-dir/520cf2f6-a0b6-427e-a232-2d5426c2bc4e
As you can see, not only can you reuse the existing tools and commands with Hadoop inside SHDP, but you can also code against them in various scripting languages. And as you might have noticed, there is no special configuration required - this is automatically inferred from the enclosing application context.
Note | |
---|---|
The careful reader might have noticed that besides the syntax, there are some minor differences in how the various languages interact with the java objects. For example the automatic toString call called in Java for doing automatic String conversion is not necessarily supported (hence the to_s in Ruby or str in Python). This is to be expected as each language has its own semantics - for the most part these are easy to pick up but do pay attention to details. |
Similar to the FsShell
, SHDP provides a lightweight, fully embeddable
DistCp
version that builds on top of the distcp
from the Hadoop distro. The
semantics and configuration options are the same however, one can use it
from within a Java application without having to use the command-line.
See the
API
for more information:
<hdp:script language="groovy">distcp.copy("${distcp.src}", "${distcp.dst}")</hdp:script>
The bean above triggers a distributed copy relying again on Spring’s property placeholder variable expansion for its source and destination.