One of the common tasks when using Hadoop is interacting with its runtime - whether it is a local setup or a remote cluster, one needs to properly configure and bootstrap Hadoop in order to submit the required jobs. This chapter will focus on how Spring for Apache Hadoop (SHDP) leverages Spring's lightweight IoC container to simplify the interaction with Hadoop and make deployment, testing and provisioning easier and more manageable.
To simplify configuration, SHDP provides a dedicated namespace for most of its components. However, one can opt to configure the beans directly through the usual <bean> definition. For more information about XML Schema-based configuration in Spring, see the XML Schema-based configuration appendix in the Spring Framework reference documentation.
To use the SHDP namespace, one just needs to import it inside the configuration:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:hdp="http://www.springframework.org/schema/hadoop"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">

    <bean id ... >

    <hdp:configuration ...>
</beans>
In the declaration above, xmlns:hdp declares the Spring for Apache Hadoop namespace prefix. Any name can do, but throughout the reference documentation the hdp prefix is used. http://www.springframework.org/schema/hadoop is the namespace URI and http://www.springframework.org/schema/hadoop/spring-hadoop.xsd is the namespace URI location. Note that even though the location points to an external address (which exists and is valid), Spring resolves the schema locally, as it is included in the Spring for Apache Hadoop library. Finally, <hdp:configuration ...> is a declaration example for the Hadoop namespace; notice the prefix usage.
Once declared, the namespace elements can be used simply by adding the aforementioned prefix. Note that it is possible to change the default namespace, for example from <beans> to <hdp>. This is useful for configuration composed mainly of Hadoop components as it avoids declaring the prefix. To achieve this, simply swap the namespace prefix declaration above:
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/hadoop"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:beans="http://www.springframework.org/schema/beans"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">

    <beans:bean id ... >

    <configuration ...>
</beans:beans>
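Here, the default namespace declaration of the XML file points to the Spring for Apache Hadoop namespace, while the beans prefix is declared for the Spring beans namespace. As a result, regular bean declarations use the beans: prefix (<beans:bean id ... >), whereas Hadoop elements such as <configuration ...> need no prefix at all, since hdp is now the default namespace.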
For the remainder of this document, to improve readability, the XML examples will simply refer to the <hdp> namespace without the namespace declaration, where possible.
In order to use Hadoop, one first needs to configure it, namely by creating a Configuration object. The configuration holds information about the job tracker, the input and output formats and the various other parameters of the map reduce job.
In its simplest form, the configuration definition is a one liner:
<hdp:configuration />
The declaration above defines a Configuration bean (to be precise, a factory bean of type ConfigurationFactoryBean) named, by default, hadoopConfiguration. By convention, the other elements that require a configuration use this default name. This leads to simple and very concise configurations, as the main components can automatically wire themselves up without requiring any specific configuration.
For scenarios where the defaults need to be tweaked, one can pass in additional configuration files:
<hdp:configuration resources="classpath:/custom-site.xml, classpath:/hq-site.xml"/>
In this example, two additional Hadoop configuration resources are added to the configuration.
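Note: the configuration makes use of Spring's Resource abstraction to locate the files, so the usual resource prefixes (such as classpath:, used in this example) are supported.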
In addition to referencing configuration resources, one can tweak Hadoop settings directly through Java Properties. This can be quite handy when just a few options need to be changed:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:hdp="http://www.springframework.org/schema/hadoop"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">

    <hdp:configuration>
        fs.default.name=hdfs://localhost:9000
        hadoop.tmp.dir=/tmp/hadoop
        electric=sea
    </hdp:configuration>
</beans>
One can further customize the settings by avoiding so-called hard-coded values. Externalize them instead, so they can be replaced at runtime based on the existing environment without touching the configuration:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:hdp="http://www.springframework.org/schema/hadoop"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">

    <hdp:configuration>
        fs.default.name=${hd.fs}
        hadoop.tmp.dir=file://${java.io.tmpdir}
        hangar=${number:18}
    </hdp:configuration>

    <context:property-placeholder location="classpath:hadoop.properties" />
</beans>
Through Spring's property placeholder support, SpEL and the environment abstraction (available in Spring 3.1), one can externalize environment-specific properties from the main code base, easing deployment across multiple machines. In the example above, the default file system is replaced based on the properties available in hadoop.properties. The temp dir is resolved dynamically from the java.io.tmpdir system property, and hangar falls back to 18 when no number property is defined. These approaches offer a lot of flexibility in adapting to the running environment. In fact, we use this approach extensively in the Spring for Apache Hadoop test suite to cope with the differences between the various development boxes and the CI server.
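The placeholder resolution used in the example above can be pictured with a minimal, JDK-only Java sketch. It is not Spring's implementation. A ${key} or ${key:default} placeholder is looked up first in the loaded properties file, then in JVM system properties, and finally falls back to the default after the colon. The class name PlaceholderSketch is hypothetical, and this particular lookup order is an assumption: Spring's placeholder configurers let you tune it.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class PlaceholderSketch {
    // Matches ${name} or ${name:default}; nested placeholders are not handled in this sketch.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}:]+)(?::([^}]*))?\\}");

    private PlaceholderSketch() {}

    static String resolve(String text, Properties fileProps) {
        Matcher m = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String key = m.group(1);
            String fallback = m.group(2);                        // null when there is no ":default" part
            String value = fileProps.getProperty(key);           // 1. e.g. entries of hadoop.properties
            if (value == null) value = System.getProperty(key);  // 2. JVM system properties (java.io.tmpdir, ...)
            if (value == null) value = fallback;                 // 3. inline default, e.g. 18 in ${number:18}
            if (value == null) {
                throw new IllegalArgumentException("Could not resolve placeholder '" + key + "'");
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

With hd.fs defined in the file, fs.default.name=${hd.fs} picks up the file value, ${java.io.tmpdir} comes from the JVM, and hangar=${number:18} becomes hangar=18.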
Additionally, external Properties files can be loaded, as well as Properties beans (typically declared through Spring's util namespace). Along with the nested properties declaration, this allows customized configurations to be easily declared:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:hdp="http://www.springframework.org/schema/hadoop"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
        http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">

    <!-- merge the local properties, the props bean and the two properties files -->
    <hdp:configuration properties-ref="props" properties-location="cfg-1.properties, cfg-2.properties">
        star=chasing
        captain=eo
    </hdp:configuration>

    <util:properties id="props" location="props.properties"/>
</beans>
When merging several properties, the ones defined locally win. In the example above, the inline configuration properties are the primary source, followed by the props bean, followed by the external properties files in their declared order. While it is not typical for a configuration to use so many property sources, the example showcases the various options available.
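The precedence rule just described can be captured in a small JDK-only Java sketch. Sources are passed from highest to lowest priority, and a key is taken from a lower-priority source only if no higher one already defined it. MergeSketch is a hypothetical name. Ranking cfg-1.properties ahead of cfg-2.properties simply follows the declaration order, and that ranking is an assumption here, not a verified detail of SHDP.

```java
import java.util.List;
import java.util.Properties;

final class MergeSketch {
    private MergeSketch() {}

    /** Merges sources given from highest to lowest priority; the first definition of a key wins. */
    static Properties merge(List<Properties> highestFirst) {
        Properties merged = new Properties();
        for (Properties source : highestFirst) {
            for (String key : source.stringPropertyNames()) {
                // putIfAbsent keeps any value already contributed by a higher-priority source
                merged.putIfAbsent(key, source.getProperty(key));
            }
        }
        return merged;
    }

    /** Convenience factory: of("k1", "v1", "k2", "v2", ...). */
    static Properties of(String... keyValues) {
        Properties p = new Properties();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            p.setProperty(keyValues[i], keyValues[i + 1]);
        }
        return p;
    }
}
```

Calling merge with the inline properties, the props bean and the files (in that order) reproduces the "local wins" behavior. A key defined only in a file still makes it into the result.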
Note: For more properties utilities, including using the System as a source or fallback, or control over the merging order, consider using Spring's PropertiesFactoryBean (which is what Spring for Apache Hadoop and util:properties use underneath).
It is possible to create configurations based on existing ones. This allows one to create dedicated configurations, slightly different from the main ones, usable for certain jobs (such as streaming, covered below). Simply use the configuration-ref attribute to refer to the parent configuration; all its properties will be inherited and overridden as specified by the child:
<!-- default name is 'hadoopConfiguration' -->
<hdp:configuration>
    fs.default.name=${hd.fs}
    hadoop.tmp.dir=file://${java.io.tmpdir}
</hdp:configuration>

<hdp:configuration id="custom" configuration-ref="hadoopConfiguration">
    fs.default.name=${custom.hd.fs}
</hdp:configuration>

...
Make sure, though, to give the child a different name. Otherwise both definitions share the same name, and the Spring container treats them as the same definition (usually keeping the last one found).
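Conceptually, configuration-ref behaves like copy-then-override. The minimal sketch below models that with plain java.util.Properties so it runs on the JDK alone. Hadoop's own Configuration class offers a copy constructor for the same purpose, but it is deliberately not used here. ChildConfigSketch is a hypothetical name.

```java
import java.util.Map;
import java.util.Properties;

final class ChildConfigSketch {
    private ChildConfigSketch() {}

    /** Returns a new configuration holding all parent entries, with the child's overrides applied on top. */
    static Properties derive(Properties parent, Map<String, String> overrides) {
        Properties child = new Properties();
        child.putAll(parent);     // inherit every parent setting (a copy, so the parent stays untouched)
        child.putAll(overrides);  // child-specific values win
        return child;
    }
}
```

Copying, rather than chaining through new Properties(parent) defaults, keeps the child a self-contained snapshot. The parent (hadoopConfiguration in the example) is left exactly as declared.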
Another option worth mentioning is register-url-handler which, as the name implies, automatically registers a URL handler in the running VM. This allows URLs referencing HDFS resources (by using the hdfs prefix) to be properly resolved. If the handler is not registered, such a URL throws an exception, since the VM does not know what hdfs means.
Note: Since only one URL handler factory can be registered per VM, and only once, this option is turned off by default. For the same reason, if the registration fails once enabled, the error is logged but no exception is thrown. If your hdfs URLs stop working, make sure to investigate this aspect.
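The once-per-VM restriction comes from the JDK itself: java.net.URL.setURLStreamHandlerFactory throws an Error if a factory has already been set. The sketch below registers a factory for the hdfs scheme and, mirroring the behavior described in the note, logs a failed second registration instead of propagating it. In a real setup the factory would typically be Hadoop's FsUrlStreamHandlerFactory. StubHdfsHandler is a hypothetical placeholder that makes hdfs:// URLs parseable but cannot open them.

```java
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLStreamHandler;

final class UrlHandlerSketch {
    private UrlHandlerSketch() {}

    /** Hypothetical stand-in for a real HDFS handler: hdfs:// URLs parse, but cannot be opened. */
    static final class StubHdfsHandler extends URLStreamHandler {
        @Override
        protected URLConnection openConnection(URL u) throws IOException {
            throw new IOException("stub handler, cannot open " + u);
        }
    }

    /** Tries to register the factory; returns false (and logs) if a factory was already registered. */
    static boolean tryRegister() {
        try {
            // Returning null for other protocols keeps the JDK's built-in handlers (http, file, ...)
            URL.setURLStreamHandlerFactory(protocol -> "hdfs".equals(protocol) ? new StubHdfsHandler() : null);
            return true;
        } catch (Error alreadySet) {
            System.err.println("URL handler factory already registered: " + alreadySet.getMessage());
            return false;
        }
    }

    /** Parses the spec into a URL; fails if no handler is known for its scheme. */
    static String protocolOf(String spec) {
        try {
            return URI.create(spec).toURL().getProtocol();
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("No handler for " + spec, e);
        }
    }
}
```

Before tryRegister() succeeds, protocolOf("hdfs://...") fails with an unknown-protocol error. After it succeeds, the URL parses, and any further registration attempt in the same VM only logs.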
Last but not least, a reminder that one can mix and match all these options as preferred. In general, consider externalizing configuration, since it allows easier updates without interfering with the application configuration. When dealing with multiple, similar configurations, use configuration composition, as it tends to keep the definitions concise, in sync and easy to update.
Once the Hadoop configuration is taken care of, one needs to actually submit some work to it. SHDP makes it easy to configure and run Hadoop jobs whether they are vanilla map-reduce type or streaming. Let us start with an example:
<hdp:job id="mr-job"
    input-path="/input/" output-path="/output/"
    mapper="org.apache.hadoop.examples.WordCount.TokenizerMapper"
    reducer="org.apache.hadoop.examples.WordCount.IntSumReducer"/>
The declaration above creates a typical Hadoop Job: it specifies its input and output, as well as the mapper and reducer classes. Notice that there is no reference to the Hadoop configuration above. That's because, if not specified, the default naming convention (hadoopConfiguration) is used instead. Neither are the key or value types specified: these two are automatically determined through a best-effort attempt by analyzing the class information of the mapper and the reducer. Of course, these settings can be overridden: the former through the configuration-ref attribute, the latter through the key and value attributes.
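One way to picture the best-effort detection of key and value types is reading the generic type arguments that a mapper class binds when it extends its base class. The JDK-only sketch below uses a hypothetical BaseMapper in place of Hadoop's Mapper, which declares the same four type parameters (KEYIN, VALUEIN, KEYOUT, VALUEOUT). It also uses String/Integer instead of Hadoop's writables. The sketch illustrates the general reflection technique, not SHDP's actual code, and only inspects the direct superclass.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in mirroring the type parameters of org.apache.hadoop.mapreduce.Mapper.
abstract class BaseMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {}

// Word-count style mapper: (offset, line) -> (word, count).
class SketchTokenizerMapper extends BaseMapper<Object, String, String, Integer> {}

final class TypeInferenceSketch {
    private TypeInferenceSketch() {}

    /** Returns {KEYOUT, VALUEOUT} if the direct superclass binds them to concrete classes, else null. */
    static Class<?>[] outputTypes(Class<?> mapperClass) {
        Type superType = mapperClass.getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            return null; // raw or non-generic superclass: nothing to infer, types must be set explicitly
        }
        Type[] args = ((ParameterizedType) superType).getActualTypeArguments();
        if (args.length != 4 || !(args[2] instanceof Class) || !(args[3] instanceof Class)) {
            return null; // e.g. unbound type variables: give up (best effort)
        }
        return new Class<?>[] {(Class<?>) args[2], (Class<?>) args[3]};
    }
}
```

When inference returns nothing (for example, for a mapper that extends a raw base class), the explicit key and value attributes are what fill the gap.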
There are plenty of options not shown in the example (for simplicity), such as the jar (specified directly or by class), the sort or group comparator, the combiner, the partitioner, the codecs to use or the input/output format, just to name a few. They are all supported: just take a look at the SHDP schema (Appendix A, Spring for Apache Hadoop Schema) or simply trigger auto-completion (usually CTRL+SPACE) in your IDE. If the IDE supports XML namespaces and is properly configured, it will display the available elements. Additionally, one can extend the default Hadoop configuration object and add any special properties not available in the namespace or its backing bean (JobFactoryBean).
It is worth pointing out that per-job specific configurations are supported by specifying the custom properties directly or referring to them (the same pattern used for the Hadoop configuration, described above):
<hdp:job id="mr-job"
    input-path="/input/" output-path="/output/"
    mapper="mapper class" reducer="reducer class"
    jar-by-class="class used for jar detection"
    properties-location="classpath:special-job.properties">
    electric=sea
</hdp:job>
Note: The job definition can validate the existence of the input and output paths before submitting the actual job (which is slow), to prevent its failure. Use the validate-paths attribute to catch these errors early on, without having to contact the job tracker only to get an exception.
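The kind of check behind validate-paths can be sketched against the local file system with java.nio.file, so it runs without a cluster. Each input must already exist, and the output must not, since Hadoop's file output formats refuse to write into an existing output directory. PathValidationSketch is hypothetical. A real check would go through Hadoop's FileSystem API rather than java.nio.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

final class PathValidationSketch {
    private PathValidationSketch() {}

    /** Returns the problems found; an empty list means the job may be submitted. */
    static List<String> validate(List<Path> inputs, Path output) {
        List<String> problems = new ArrayList<>();
        for (Path in : inputs) {
            if (!Files.exists(in)) {
                problems.add("Input path does not exist: " + in);
            }
        }
        if (Files.exists(output)) {
            problems.add("Output path already exists: " + output);
        }
        return problems;
    }
}
```

Collecting every problem, instead of stopping at the first one, reports all misconfigured paths in one go before anything is sent to the cluster.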
job provides additional properties, such as the generic options. One worth mentioning is jar, which allows a job (and its dependencies) to be loaded entirely from a specified jar. This is useful for isolating jobs and avoiding classpath and versioning collisions. Note that provisioning of the jar into the cluster still depends on the target environment; see the generic options section for more info (such as libs).
Hadoop Streaming jobs (or streaming, in short) are a popular feature of Hadoop, as they allow the creation of Map/Reduce jobs with any executable or script. For example, the streaming equivalent of the previous word-count example is to use the cat and wc commands.
While it is rather easy to start up streaming from the command line, doing so programmatically, such as from a Java environment, can be challenging due to the number of parameters (and their ordering) that need to be parsed. SHDP simplifies such tasks: it is as easy and straightforward as declaring a job from the previous section. In fact, most of the attributes are the same:
<hdp:streaming id="streaming"
    input-path="/input/" output-path="/output/"
    mapper="${path.cat}" reducer="${path.wc}"/>
Existing users might be wondering how they can pass the command-line arguments (such as -D or -cmdenv). The former customize the Hadoop configuration (which has been covered in the previous section), while the latter are supported through the cmd-env element:
<hdp:streaming id="streaming-env"
    input-path="/input/" output-path="/output/"
    mapper="${path.cat}" reducer="${path.wc}">
    <hdp:cmd-env>
        EXAMPLE_DIR=/home/example/dictionaries/
        ...
    </hdp:cmd-env>
</hdp:streaming>
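To see why parameter ordering matters, here is a hypothetical sketch that assembles the argument list the streaming command line expects. Hadoop's streaming documentation requires generic options such as -D to come before streaming options such as -input, -mapper and -cmdenv. The builder only produces the list of strings and launches nothing. All names in it are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class StreamingArgsSketch {
    private StreamingArgsSketch() {}

    static List<String> build(Map<String, String> hadoopProps, String input, String output,
                              String mapper, String reducer, Map<String, String> cmdEnv) {
        List<String> args = new ArrayList<>();
        // Generic options first: streaming fails if -D appears after its own options.
        for (Map.Entry<String, String> e : hadoopProps.entrySet()) {
            args.add("-D");
            args.add(e.getKey() + "=" + e.getValue());
        }
        // Streaming-specific options.
        args.add("-input");   args.add(input);
        args.add("-output");  args.add(output);
        args.add("-mapper");  args.add(mapper);
        args.add("-reducer"); args.add(reducer);
        // Each environment variable becomes its own -cmdenv NAME=VALUE pair.
        for (Map.Entry<String, String> e : cmdEnv.entrySet()) {
            args.add("-cmdenv");
            args.add(e.getKey() + "=" + e.getValue());
        }
        return args;
    }
}
```

This is exactly the bookkeeping that the streaming element, with its nested configuration properties and cmd-env entries, takes off one's hands.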
Just like job, streaming supports the generic options; see the generic options section for more information.
The jobs, after being created and configured, need to be submitted for execution to a Hadoop cluster. For non-trivial cases, a coordinating workflow solution such as Spring Batch is recommended. However, for basic job submission SHDP provides the JobRunner class, which submits several jobs sequentially (and by default waits for their completion):
<bean id="runner" class="org.springframework.data.hadoop.mapreduce.JobRunner" p:jobs-ref="job"/>

<hdp:job id="job" input-path="/input/" output-path="/output/"
    mapper="org.apache.hadoop.examples.WordCount.TokenizerMapper"
    reducer="org.apache.hadoop.examples.WordCount.IntSumReducer" />
Multiple jobs can be specified and even nested if they are not used outside the runner:
<bean id="runner" class="org.springframework.data.hadoop.mapreduce.JobRunner">
    <property name="jobs"><list>
        <!-- reference to another job named 'job' -->
        <ref bean="job"/>
        <!-- nested bean definition -->
        <hdp:job id="nested-job" .... />
    </list></property>
</bean>

<hdp:job id="job" ... />
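The sequential, wait-by-default behavior of JobRunner can be pictured with plain java.util.concurrent types. In this hypothetical sketch (not SHDP's JobRunner code), each "job" is a Callable<Boolean> standing in for a Hadoop job's submit-and-complete cycle. With waiting enabled, every job finishes before the next one is submitted. Otherwise, all jobs are submitted and their futures returned immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

final class SequentialRunnerSketch {
    private SequentialRunnerSketch() {}

    static List<Future<Boolean>> run(ExecutorService cluster, List<Callable<Boolean>> jobs,
                                     boolean waitForCompletion) {
        List<Future<Boolean>> submitted = new ArrayList<>();
        for (Callable<Boolean> job : jobs) {
            Future<Boolean> f = cluster.submit(job);
            submitted.add(f);
            if (waitForCompletion) {
                try {
                    f.get(); // block until this job is done before submitting the next one
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting for job", e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException("Job failed", e.getCause());
                }
            }
        }
        return submitted;
    }
}
```

Even on a multi-threaded executor, waiting keeps the jobs strictly ordered. This mirrors how the runner can chain jobs whose inputs depend on earlier outputs.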
For Spring Batch environments, SHDP provides a dedicated tasklet to execute Hadoop jobs as a step in a Spring Batch workflow. An example declaration is shown below:
<hdp:tasklet id="hadoop-tasklet" job-ref="mr-job" wait-for-job="true" />
The tasklet above references a Hadoop job definition named "mr-job". By default, wait-for-job is true so that the tasklet will wait for the job to complete when it executes. Setting wait-for-job to false will submit the job to the Hadoop cluster but not wait for it to complete.
It is common for Hadoop utilities and libraries to be started from the command line (for example, hadoop jar some.jar). SHDP offers generic support for such cases, provided that the packages in question are built on top of Hadoop's standard infrastructure, namely the Tool and ToolRunner classes. As opposed to command-line usage, Tool instances benefit from Spring's IoC features: they can be parameterized, created and destroyed on demand, and have their properties (such as the Hadoop configuration) injected.
Consider the typical jar example: invoking a class with some (two in this case) arguments. Notice that the Hadoop configuration properties are passed as well:
bin/hadoop jar someJar.jar org.foo.SomeTool -conf hadoop-site.xml -jt darwin:50020 -D property=value data/in.txt data/out.txt
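Tools launched this way rely on Hadoop's generic options parsing (GenericOptionsParser, used by ToolRunner). It strips options such as -conf, -jt and -D before the remaining arguments reach Tool.run. The JDK-only sketch below mimics that split for a handful of options and, like Hadoop, stops at the first non-option argument. It is a simplification with a hypothetical class name, not Hadoop's parser, and it only handles the separated "-D key=value" form.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

final class GenericOptionsSketch {
    // Generic options that take exactly one value (a subset of what Hadoop recognizes).
    private static final Set<String> WITH_VALUE =
        new HashSet<>(Arrays.asList("-conf", "-fs", "-jt", "-files", "-libjars", "-archives"));

    final Properties properties = new Properties();       // collected -D key=value pairs
    final List<String> otherOptions = new ArrayList<>();  // e.g. "-conf", "hadoop-site.xml"
    final List<String> toolArgs = new ArrayList<>();      // what the Tool would finally receive

    GenericOptionsSketch(String... args) {
        int i = 0;
        while (i < args.length) {
            String a = args[i];
            if (a.equals("-D") && i + 1 < args.length) {
                String[] kv = args[i + 1].split("=", 2);
                properties.setProperty(kv[0], kv.length > 1 ? kv[1] : "");
                i += 2;
            } else if (WITH_VALUE.contains(a) && i + 1 < args.length) {
                otherOptions.add(a);
                otherOptions.add(args[i + 1]);
                i += 2;
            } else {
                break; // first non-generic argument: everything from here on belongs to the tool
            }
        }
        toolArgs.addAll(Arrays.asList(args).subList(i, args.length));
    }
}
```

Applied to the arguments after the class name above, only data/in.txt and data/out.txt are left for the tool. The rest is precisely what SHDP's injected configuration replaces.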
Since SHDP has first-class support for configuring Hadoop, the so-called generic options aren't needed any more, even more so since there is typically only one Hadoop configuration per application. Through the tool-runner element (and its backing ToolRunner class), one typically just needs to specify the Tool implementation and its arguments:
<hdp:tool-runner id="someTool" tool-class="org.foo.SomeTool" configuration-ref="hadoopConfiguration">
    <hdp:arg value="data/in.txt"/>
    <hdp:arg value="data/out.txt"/>

    property=value
</hdp:tool-runner>
The previous example assumes the Tool dependencies (such as its class) are available in the classpath. If that is not the case, tool-runner allows a jar to be specified:
<hdp:tool-runner ... jar="myTool.jar"> ... </hdp:tool-runner>
The jar is used to instantiate and start the tool. In fact, all its dependencies are loaded from the jar, meaning they no longer need to be part of the classpath. This mechanism provides proper isolation between tools, as each of them might depend on certain libraries with different versions. Rather than adding them all into the same app (which might be impossible due to versioning conflicts), one can simply point to the different jars and be on her way. Note that when using a jar, if the main class (as specified by the Main-Class entry) is the target Tool, one can skip specifying the tool, as it will be picked up automatically.
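The Main-Class fallback and the per-jar loading can be sketched with java.util.jar and URLClassLoader. An explicitly configured class name wins; otherwise the manifest's Main-Class entry is used, and classes are then loaded through a class loader dedicated to that jar. ToolJarSketch and its methods are hypothetical. The choice of parent loader is an assumption, since real isolation strategies vary.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

final class ToolJarSketch {
    private ToolJarSketch() {}

    /** Explicit tool class if given, otherwise the manifest's Main-Class entry. */
    static String resolveToolClass(String explicitToolClass, Manifest manifest) {
        if (explicitToolClass != null) {
            return explicitToolClass;
        }
        String main = manifest == null ? null
                : manifest.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
        if (main == null) {
            throw new IllegalArgumentException("No tool class specified and no Main-Class in the jar manifest");
        }
        return main;
    }

    /** Reads the manifest of a jar on disk (may return null if the jar has none). */
    static Manifest readManifest(Path jar) {
        try (JarFile jf = new JarFile(jar.toFile())) {
            return jf.getManifest();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Loader for the tool's jar. URLClassLoader asks its parent first, so a minimal parent
     * (null means the bootstrap loader) limits what leaks in from the application classpath.
     */
    static URLClassLoader isolatedLoader(Path jar, ClassLoader parent) {
        try {
            return new URLClassLoader(new URL[] {jar.toUri().toURL()}, parent);
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```

Each tool gets its own loader, which is what lets two tools depend on conflicting library versions side by side.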
Like the rest of the SHDP elements, tool-runner allows the passed Hadoop configuration (by default hadoopConfiguration, but specified in the example for clarity) to be customized accordingly. The snippet only highlights the property initialization for simplicity, but more options are available. Since the Tool implementation usually has a default constructor, one can use the tool-class attribute; however, it is also possible to refer to another Tool instance or declare a nested one:
<hdp:tool-runner id="someTool" run-at-startup="true">
    <bean class="org.foo.AnotherTool" p:input="data/in.txt" p:output="data/out.txt"/>
</hdp:tool-runner>
This is quite convenient if the Tool class provides setters or richer constructors. Note that by default the tool-runner does not execute the Tool until its definition is actually called; this behavior can be changed through the run-at-startup attribute above.
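The difference between the default, on-demand behavior and run-at-startup can be pictured as a holder that either runs its task when it is created or defers it until the result is first requested. The sketch is hypothetical plain-JDK code: an IntSupplier stands in for Tool.run, which returns an exit code. Running the tool only once and caching the result is an assumption of the sketch, not documented SHDP behavior.

```java
import java.util.function.IntSupplier;

final class ToolRunnerSketch {
    private final IntSupplier tool;  // stands in for Tool.run(args), which returns an exit code
    private Integer exitCode;        // null until the tool has run

    ToolRunnerSketch(IntSupplier tool, boolean runAtStartup) {
        this.tool = tool;
        if (runAtStartup) {
            exitCode = tool.getAsInt(); // eager: run while the container is starting up
        }
    }

    /** Runs the tool on first access (if it has not run yet) and returns its exit code. */
    synchronized int getExitCode() {
        if (exitCode == null) {
            exitCode = tool.getAsInt(); // lazy: run only when the definition is actually used
        }
        return exitCode;
    }

    synchronized boolean hasRun() {
        return exitCode != null;
    }
}
```

Deferring execution keeps application startup fast and side-effect free, while run-at-startup suits tools whose results other beans need right away.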
tool is a nice way of migrating series of shell invocations or scripts into fully wired, managed Java objects. Consider the following shell script:
hadoop jar job1.jar -files fullpath:props.properties -Dconfig=config.properties ...
hadoop jar job2.jar arg1 arg2...
...
hadoop jar job10.jar ...
Each job is fully contained in the specified jar, including all the dependencies (which might conflict with the ones from other jobs). Additionally each invocation might provide some generic options or arguments but for the most part all will share the same configuration (as they will execute against the same cluster).
The script can be fully ported to SHDP through the tool element:
<hdp:tool id="job1" tool-class="job1.Tool" jar="job1.jar" files="fullpath:props.properties" properties-location="config.properties"/>

<hdp:tool id="job2" jar="job2.jar">
    <hdp:arg value="arg1"/>
    <hdp:arg value="arg2"/>
</hdp:tool>

<hdp:tool id="job3" jar="job3.jar"/>
...
All the features have been explained in the previous sections but let us review what happens here.
As mentioned before, each tool gets autowired with the hadoopConfiguration; job1 goes beyond this and uses its own properties instead.
For the first jar, the Tool class is specified. The rest, however, assume the jar main classes implement the Tool interface; the namespace will discover them automatically and use them accordingly.
When needed (such as with job1), additional files or libs are provisioned in the cluster. The same applies to the job arguments.
However, more things that go beyond scripting can be applied to this configuration. Each job can have multiple properties loaded or declared inline, not just from the local file system but also from the classpath or any URL for that matter. In fact, the whole configuration can be externalized and parameterized (through Spring's property placeholder and/or Environment abstraction).
Moreover, each job can be run by itself (through the JobRunner) or as part of a workflow, either through Spring's depends-on or the much more powerful Spring Batch and tool-tasklet.
For Spring Batch environments, SHDP provides a dedicated tasklet to execute Hadoop tasks as a step in a Spring Batch workflow. The tasklet element supports the same configuration options as tool-runner, except for run-at-startup (which does not apply to a workflow):
<hdp:tool-tasklet id="tool-tasklet" tool-ref="some-tool" />
The job, streaming and tool elements all support a subset of generic options, specifically archives, files and libs. libs is probably the most useful, as it enriches a job classpath (typically with some jars). The other two allow resources or archives to be copied throughout the cluster for the job to consume. Whenever faced with provisioning issues, revisit these options, as they can help significantly.
Note that the fs, jt or conf options are not supported. These are designed for command-line usage, for bootstrapping the application. This is no longer needed, as SHDP offers first-class support for defining and customizing Hadoop configurations.
DistributedCache is a Hadoop facility for efficiently distributing large, read-only, application-specific files (text, archives, jars and so on). Applications specify the files to be cached via URLs (hdfs://) using DistributedCache, and the framework copies the necessary files to the slave nodes before any tasks for the job are executed on that node. Its efficiency stems from the fact that the files are only copied once per job, and from the ability to cache archives, which are un-archived on the slaves.
Note that DistributedCache assumes that the files to be cached (and specified via hdfs:// URLs) are already present on the Hadoop FileSystem.
SHDP provides first-class configuration for the distributed cache through its cache element (backed by the DistributedCacheFactoryBean class), allowing files and archives to be easily distributed across nodes:
<hdp:cache create-symlink="true">
    <hdp:classpath value="/cp/some-library.jar#library.jar" />
    <hdp:cache value="/cache/some-archive.tgz#main-archive" />
    <hdp:cache value="/cache/some-resource.res" />
    <hdp:local value="some-file.txt" />
</hdp:cache>
The definition above registers several resources with the cache (adding them to the job cache or classpath) and creates symlinks for them. As described in the DistributedCache documentation, the declaration format is (absolute-path#link-name).
The link name is determined by the URI fragment (the text following the #, such as #library.jar or #main-archive above). If no name is specified, the cache bean will infer one based on the resource file name. Note that one does not have to specify the hdfs://node:port prefix, as it is automatically determined based on the configuration wired into the bean. This prevents environment settings from being hard-coded into the configuration, which thus remains portable.
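The naming and qualification rules above can be sketched with java.net.URI. The fragment supplies the link name, with the last path segment as the fallback. An entry without a scheme is resolved against the file system URI that would come from the wired configuration. CacheEntrySketch is hypothetical, and hdfs://localhost:9000 is only an example value.

```java
import java.net.URI;

final class CacheEntrySketch {
    private CacheEntrySketch() {}

    /** Link name from the URI fragment, or the file name when no fragment is given. */
    static String linkName(String entry) {
        URI uri = URI.create(entry);
        String fragment = uri.getFragment();
        if (fragment != null && !fragment.isEmpty()) {
            return fragment;
        }
        String path = uri.getPath();
        return path.substring(path.lastIndexOf('/') + 1);
    }

    /** Adds scheme and authority (e.g. hdfs://node:port) from the default file system URI if missing. */
    static URI qualify(String defaultFs, String entry) {
        URI uri = URI.create(entry);
        // An absolute path resolved against the base keeps the path and fragment, taking only scheme and authority.
        return uri.isAbsolute() ? uri : URI.create(defaultFs).resolve(uri);
    }
}
```

Because only the bare path is written in the XML, pointing the same definition at another cluster is a matter of changing the configuration, not the cache entries.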
Additionally, based on the resource extension, the definition differentiates between archives (.tgz, .tar.gz, .zip and .tar), which will be uncompressed, and regular files, which are copied as-is. As with the rest of the namespace declarations, the definition above relies on defaults. It requires a Hadoop Configuration and a FileSystem object, and none are specified (through configuration-ref and file-system-ref). It therefore falls back to the default naming and is wired with the bean named hadoopConfiguration, creating the FileSystem automatically.