Once the Hadoop configuration is taken care of, one needs to actually submit some work to it. SHDP makes it easy to configure and run Hadoop jobs whether they are vanilla map-reduce type or streaming. Let us start with an example:
<hdp:job id="mr-job" input-path="/input/" output-path="/ouput/" mapper="org.apache.hadoop.examples.WordCount.TokenizerMapper" reducer="org.apache.hadoop.examples.WordCount.IntSumReducer"/>
The declaration above creates a typical Hadoop Job: it specifies its
input and output paths and the mapper and reducer classes. Notice that
there is no reference to the Hadoop configuration above - if none is
specified, the default naming convention (hadoopConfiguration) is used
instead. Nor is there any reference to the key or value types - these
are determined automatically, on a best-effort basis, by analyzing the
class information of the mapper and the reducer. Of course, these
settings can be overridden: the former through the configuration-ref
attribute, the latter through the key and value attributes. There are
plenty of options not shown in the example (for simplicity), such as
the jar (specified directly or by class), the sort or group comparator,
the combiner, the partitioner, the codecs to use or the input/output
formats, to name a few - they are all supported; just take a look at
the SHDP schema or simply trigger auto-completion (usually CTRL+SPACE)
in your IDE; if it supports XML namespaces and is properly configured,
it will display the available elements. Additionally one can extend the
default Hadoop configuration object and add any special properties not
available in the namespace or its backing bean (JobFactoryBean).
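For instance, a minimal sketch of such overrides (the configuration
bean name myCluster and the Writable types shown here are purely
illustrative) might look like:

<hdp:job id="mr-job"
    configuration-ref="myCluster"
    input-path="/input/" output-path="/output/"
    mapper="org.apache.hadoop.examples.WordCount.TokenizerMapper"
    reducer="org.apache.hadoop.examples.WordCount.IntSumReducer"
    key="org.apache.hadoop.io.Text"
    value="org.apache.hadoop.io.IntWritable"/>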
It is worth pointing out that per-job specific configurations are supported by specifying the custom properties directly or referring to them (more information on the pattern is available #hadoop:config:properties[here]):
<hdp:job id="mr-job" input-path="/input/" output-path="/ouput/" mapper="mapper class" reducer="reducer class" jar-by-class="class used for jar detection" properties-location="classpath:special-job.properties"> electric=sea </hdp:job>
<hdp:job> provides additional properties, such as the
#hadoop:generic-options[generic options]; one worth mentioning is jar,
which allows a job (and its dependencies) to be loaded entirely from a
specified jar. This is useful for isolating jobs and avoiding classpath
and versioning collisions. Note that provisioning of the jar into the
cluster still depends on the target environment - see the
aforementioned #hadoop:generic-options[section] for more info (such as
libs).
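As a sketch (the jar name and class names below are illustrative), a
job loaded entirely from its own jar could be declared as:

<hdp:job id="isolated-job"
    jar="word-count-job.jar"
    input-path="/input/" output-path="/output/"
    mapper="org.example.TokenizerMapper"
    reducer="org.example.IntSumReducer"/>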
Hadoop Streaming jobs (or streaming, in short) are a popular feature
of Hadoop as they allow the creation of Map/Reduce jobs with any
executable or script (the equivalent of the previous word-count example
would be using the cat and wc commands). While it is rather easy to
start streaming from the command line, doing so programmatically, such
as from a Java environment, can be challenging due to the various
parameters (and their ordering) that need to be parsed. SHDP simplifies
such a task - it is as easy and straightforward as declaring a job from
the previous section; in fact most of the attributes are the same:
<hdp:streaming id="streaming" input-path="/input/" output-path="/ouput/" mapper="${path.cat}" reducer="${path.wc}"/>
Existing users might be wondering how they can pass command-line
arguments (such as -D or -cmdenv). While the former customizes the
Hadoop configuration (which has been covered in the previous
#hadoop:config[section]), the latter is supported through the cmd-env
element:
<hdp:streaming id="streaming-env" input-path="/input/" output-path="/ouput/" mapper="${path.cat}" reducer="${path.wc}"> <hdp:cmd-env> EXAMPLE_DIR=/home/example/dictionaries/ ... </hdp:cmd-env> </hdp:streaming>
Just like job, streaming supports the
#hadoop:generic-options[generic options]; follow the link for more
information.
The jobs, after being created and configured, need to be submitted for
execution to a Hadoop cluster. For non-trivial cases, a coordinating
workflow solution such as Spring Batch is recommended. However, for
basic job submission SHDP provides the job-runner element (backed by
the JobRunner class) which submits several jobs sequentially (and by
default waits for their completion):
<hdp:job-runner id="myjob-runner" pre-action="cleanup-script" post-action="export-results" job-ref="myjob" run-at-startup="true"/> <hdp:job id="myjob" input-path="/input/" output-path="/output/" mapper="org.apache.hadoop.examples.WordCount.TokenizerMapper" reducer="org.apache.hadoop.examples.WordCount.IntSumReducer" />
Multiple jobs can be specified and even nested if they are not used outside the runner:
<hdp:job-runner id="myjobs-runner" pre-action="cleanup-script" job-ref="myjob1, myjob2" run-at-startup="true"/> <hdp:job id="myjob1" ... /> <hdp:streaming id="myjob2" ... />
One or multiple Map-Reduce jobs can be specified through the job-ref
attribute, in the order of execution. The runner triggers the execution
during application start-up (notice the run-at-startup flag, which is
false by default). Do note that the runner will not run unless
triggered manually or unless run-at-startup is set to true.
Additionally the runner (as in fact do all runners in SHDP) allows one
or multiple pre and post actions to be specified, to be executed before
and after each run. Typically other runners (such as other jobs or
scripts) are specified, but any JDK Callable can be passed in. For more
information on runners, see the dedicated chapter.
Note: As the Hadoop job submission and execution (when
wait-for-completion is true) is blocking, JobRunner uses a JDK Executor
to start (or stop) a job. The default implementation, SyncTaskExecutor,
uses the calling thread to execute the job, mimicking the hadoop
command-line behaviour. However, as Hadoop jobs are time-consuming, in
some cases this can lead to an application freeze, preventing normal
operations or even application shutdown from occurring properly. Before
going into production, it is recommended to double-check whether this
strategy is suitable or whether a throttled or pooled implementation is
better. One can customize the behaviour through the executor-ref
parameter.
The job runner also allows running jobs to be cancelled (or killed) at
shutdown. This applies only to jobs that the runner waits for
(wait-for-completion is true) using a different executor than the
default - that is, using a different thread than the calling one (since
otherwise the calling thread has to wait for the job to finish before
executing the next task). To customize this behaviour, one should set
the kill-job-at-shutdown attribute to false and/or change the
executor-ref implementation.
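As a sketch (the executor bean below is just one possible choice), a
runner using its own executor and leaving jobs running at shutdown
might be declared as follows:

<!-- a simple asynchronous executor; a throttled or pooled
     implementation may be preferable in production -->
<bean id="jobExecutor"
    class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>

<hdp:job-runner id="myjob-runner" job-ref="myjob"
    executor-ref="jobExecutor" kill-job-at-shutdown="false"
    run-at-startup="true"/>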
For Spring Batch environments, SHDP provides a dedicated tasklet to execute Hadoop jobs as a step in a Spring Batch workflow. An example declaration is shown below:
<hdp:job-tasklet id="hadoop-tasklet" job-ref="mr-job" wait-for-completion="true" />
The tasklet above references a Hadoop job definition named "mr-job".
By default, wait-for-completion is true so that the tasklet will wait
for the job to complete when it executes. Setting wait-for-completion
to false will submit the job to the Hadoop cluster but not wait for it
to complete.
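For illustration, assuming the standard Spring Batch XML namespace is
configured, the tasklet could be plugged into a batch step roughly as
follows (the job and step names are made up):

<batch:job id="wordcountBatchJob">
    <batch:step id="hadoopStep">
        <batch:tasklet ref="hadoop-tasklet"/>
    </batch:step>
</batch:job>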
It is common for Hadoop utilities and libraries to be started from the
command line (for example: hadoop jar some.jar). SHDP offers generic
support for such cases, provided that the packages in question are
built on top of Hadoop's standard infrastructure, namely the Tool and
ToolRunner classes. As opposed to the command-line usage, Tool
instances benefit from Spring's IoC features; they can be
parameterized, created and destroyed on demand and have their
properties (such as the Hadoop configuration) injected.

Consider the typical jar example - invoking a class with some (two in
this case) arguments (notice that the Hadoop configuration properties
are passed as well):
bin/hadoop jar -conf hadoop-site.xml -jt darwin:50020 -Dproperty=value someJar.jar
Since SHDP has first-class support for #hadoop:config[configuring]
Hadoop, the so-called generic options are no longer needed, all the
more so since typically there is only one Hadoop configuration per
application. Through the tool-runner element (and its backing
ToolRunner class) one typically just needs to specify the Tool
implementation and its arguments:
<hdp:tool-runner id="someTool" tool-class="org.foo.SomeTool" run-at-startup="true"> <hdp:arg value="data/in.txt"/> <hdp:arg value="data/out.txt"/> property=value </hdp:tool-runner>
Additionally the runner (just like the job runner) allows one or
multiple pre and post actions to be specified, to be executed before
and after each run. Typically other runners (such as other jobs or
scripts) are specified, but any JDK Callable can be passed in. Do note
that the runner will not run unless triggered manually or unless
run-at-startup is set to true. For more information on runners, see the
dedicated chapter.
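For example, a cleanup script could run before each tool execution (a
sketch only; the script bean and its location are hypothetical, and the
pre-action attribute mirrors the job runner example shown earlier):

<hdp:script id="cleanup-script" location="cleanup.groovy"/>

<hdp:tool-runner id="someTool" tool-class="org.foo.SomeTool"
    pre-action="cleanup-script" run-at-startup="true">
    <hdp:arg value="data/in.txt"/>
    <hdp:arg value="data/out.txt"/>
</hdp:tool-runner>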
The previous example assumes the Tool dependencies (such as its class)
are available in the classpath. If that is not the case, tool-runner
allows a jar to be specified:
<hdp:tool-runner ... jar="myTool.jar"> ... </hdp:tool-runner>
The jar is used to instantiate and start the tool - in fact all its
dependencies are loaded from the jar, meaning they no longer need to be
part of the classpath. This mechanism provides proper isolation between
tools, as each of them might depend on certain libraries with different
versions; rather than adding them all to the same application (which
might be impossible due to versioning conflicts), one can simply point
to the different jars and be done with it. Note that when using a jar,
if the main class (as specified by the Main-Class entry) is the target
Tool, one can skip specifying the tool as it will be picked up
automatically.
Like the rest of the SHDP elements, tool-runner allows the passed
Hadoop configuration (by default hadoopConfiguration, but specified in
the example for clarity) to be #hadoop:config:properties[customized]
accordingly; the snippet only highlights the property initialization
for simplicity, but more options are available. Since usually the Tool
implementation has a default, no-argument constructor, one can simply
use the tool-class attribute. However it is also possible to refer to
another Tool instance or declare a nested one:
<hdp:tool-runner id="someTool" run-at-startup="true"> <hdp:tool> <bean class="org.foo.AnotherTool" p:input="data/in.txt" p:output="data/out.txt"/> </hdp:tool> </hdp:tool-runner>
This is quite convenient if the Tool class provides setters or richer
constructors. Note that by default the tool-runner does not execute the
Tool until its definition is actually called - this behavior can be
changed through the run-at-startup attribute above.
tool-runner is a nice way of migrating a series of shell invocations
or scripts into fully wired, managed Java objects. Consider the
following shell script:
hadoop jar job1.jar -files fullpath:props.properties -Dconfig=config.properties ...
hadoop jar job2.jar arg1 arg2...
...
hadoop jar job10.jar ...
Each job is fully contained in the specified jar, including all the dependencies (which might conflict with the ones from other jobs). Additionally each invocation might provide some generic options or arguments but for the most part all will share the same configuration (as they will execute against the same cluster).
The script can be fully ported to SHDP, through the tool-runner
element:
<hdp:tool-runner id="job1" tool-class="job1.Tool" jar="job1.jar" files="fullpath:props.properties" properties-location="config.properties"/> <hdp:tool-runner id="job2" jar="job2.jar"> <hdp:arg value="arg1"/> <hdp:arg value="arg2"/> </hdp:tool-runner> <hdp:tool-runner id="job3" jar="job3.jar"/> ...
All the features have been explained in the previous sections but let
us review what happens here. As mentioned before, each tool gets
autowired with the hadoopConfiguration; job1 goes beyond this and uses
its own properties instead. For the first jar the Tool class is
specified, while the rest assume the jars' Main-Classes implement the
Tool interface; the namespace will discover and use them automatically.
When needed (such as with job1), additional files or libs are
provisioned in the cluster. The same goes for the job arguments.

However, this configuration goes beyond what scripting offers - each
job can have multiple properties loaded or declared inline, not just
from the local file system but also from the classpath or any URL for
that matter. In fact, the whole configuration can be externalized and
parameterized (through Spring's property placeholder and/or Environment
abstraction). Moreover, each job can be run by itself (through the
JobRunner) or as part of a workflow - either through Spring's
depends-on or the much more powerful Spring Batch and tool-tasklet.
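As a minimal sketch of the depends-on approach (bean names are
illustrative), Spring will only create - and, with run-at-startup,
execute - job2 after job1 has been fully initialized:

<hdp:tool-runner id="job1" tool-class="job1.Tool" jar="job1.jar" run-at-startup="true"/>
<hdp:tool-runner id="job2" jar="job2.jar" depends-on="job1" run-at-startup="true"/>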
For Spring Batch environments, SHDP provides a dedicated tasklet to
execute Hadoop tasks as a step in a Spring Batch workflow. The tasklet
element supports the same configuration options as
#hadoop:tool-runner[tool-runner] except for run-at-startup (which does
not apply to a workflow):
<hdp:tool-tasklet id="tool-tasklet" tool-ref="some-tool" />
SHDP also provides support for executing vanilla Hadoop jars. Thus the famous WordCount example:
bin/hadoop jar hadoop-examples.jar wordcount /wordcount/input /wordcount/output
becomes
<hdp:jar-runner id="wordcount" jar="hadoop-examples.jar" run-at-startup="true"> <hdp:arg value="wordcount"/> <hdp:arg value="/wordcount/input"/> <hdp:arg value="/wordcount/output"/> </hdp:jar-runner>
Note: Just like the hadoop jar command, by default the jar support
reads the jar's Main-Class if none is specified. This can be customized
through the main-class attribute.
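For instance, a sketch that bypasses the jar's Main-Class and points at
a specific class instead (the class and arguments here are purely
illustrative):

<hdp:jar-runner id="wordcount-custom" jar="hadoop-examples.jar"
    main-class="org.apache.hadoop.examples.WordCount" run-at-startup="true">
    <hdp:arg value="/wordcount/input"/>
    <hdp:arg value="/wordcount/output"/>
</hdp:jar-runner>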
Additionally the runner (just like the job runner) allows one or
multiple pre and post actions to be specified, to be executed before
and after each run. Typically other runners (such as other jobs or
scripts) are specified, but any JDK Callable can be passed in. Do note
that the runner will not run unless triggered manually or unless
run-at-startup is set to true. For more information on runners, see the
dedicated chapter.
The jar support provides a nice and easy migration path from jar
invocations on the command line to SHDP (note that the Hadoop
#hadoop:generic-options[generic options] are also supported),
especially since SHDP enables Hadoop Configuration objects, created
during the jar execution, to automatically inherit the context Hadoop
configuration. In fact, just like other SHDP elements, the jar element
allows #hadoop:config:properties[configuration properties] to be
declared locally, just for the jar run. For example, given the
following declaration:
<hdp:jar-runner id="wordcount" jar="hadoop-examples.jar" run-at-startup="true"> <hdp:arg value="wordcount"/> ... speed=fast </hdp:jar-runner>
inside the jar code, one could do the following:
assert "fast".equals(new Configuration().get("speed"));
This enables basic Hadoop jars to use, without changes, the enclosing
application's Hadoop configuration.
And while we think it is a useful feature (that is why we added it in
the first place), we strongly recommend using the tool support instead,
or migrating to it; there are several reasons for this, mainly that
there are no contracts to rely on, which leads to very poor
embeddability caused by:
No standard Configuration injection

While SHDP does a best effort to pass the Hadoop configuration to the
jar, there is no guarantee the jar itself does not use a special
initialization mechanism, ignoring the passed properties. After all, a
vanilla Configuration is not very useful, so applications tend to
provide custom code to address this.
System.exit() calls

Most jar examples out there (including WordCount) assume they are
started from the command line and, among other things, call
System.exit to shut down the JVM, whether the code completed
successfully or not. SHDP prevents this from happening (otherwise the
entire application context would shut down abruptly), but it is a clear
sign of poor code collaboration.
SHDP tries to use sensible defaults to provide the best integration
experience possible, but at the end of the day, without any contract in
place, there are no guarantees. Hence using the Tool interface is a
much better alternative.
As with the rest of its tasks, for Spring Batch environments SHDP
provides a dedicated tasklet to execute Hadoop jars as a step in a
Spring Batch workflow. The tasklet element supports the same
configuration options as #hadoop:jar-runner[jar-runner] except for
run-at-startup (which does not apply to a workflow):
<hdp:jar-tasklet id="jar-tasklet" jar="some-jar.jar" />
DistributedCache is a Hadoop facility for efficiently distributing
application-specific, large, read-only files (text, archives, jars and
so on). Applications specify the files to be cached via URLs (hdfs://)
using DistributedCache, and the framework will copy the necessary files
to the slave nodes before any tasks for the job are executed on that
node. Its efficiency stems from the fact that the files are only copied
once per job, and from the ability to cache archives which are
un-archived on the slaves. Note that DistributedCache assumes that the
files to be cached (and specified via hdfs:// URLs) are already present
on the Hadoop FileSystem.
SHDP provides first-class configuration for the distributed cache
through its cache element (backed by the DistributedCacheFactoryBean
class), allowing files and archives to be easily distributed across
nodes:
<hdp:cache create-symlink="true"> <hdp:classpath value="/cp/some-library.jar#library.jar" /> <hdp:cache value="/cache/some-archive.tgz#main-archive" /> <hdp:cache value="/cache/some-resource.res" /> <hdp:local value="some-file.txt" /> </hdp:cache>
The definition above registers several resources with the cache
(adding them to the job cache or classpath) and creates symlinks for
them. As described in the DistributedCache documentation, the
declaration format is (absolute-path#link-name). The link name is
determined by the URI fragment (the text following the #, such as
#library.jar or #main-archive above) - if no name is specified, the
cache bean will infer one based on the resource file name. Note that
one does not have to specify the hdfs://node:port prefix as these are
automatically determined based on the configuration wired into the
bean; this prevents environment settings from being hard-coded into the
configuration, which thus becomes portable. Additionally, based on the
resource extension, the definition differentiates between archives
(.tgz, .tar.gz, .zip and .tar), which will be uncompressed, and regular
files, which are copied as-is. As with the rest of the namespace
declarations, the definition above relies on defaults - since it
requires Hadoop Configuration and FileSystem objects and none are
specified (through configuration-ref and file-system-ref), it falls
back to the default naming and is wired with the bean named
hadoopConfiguration, creating the FileSystem automatically.
Warning: Clients setting up a classpath in the DistributedCache and
running on Windows platforms should set the System path.separator
property to : - see HADOOP-9123. One way of doing this is through a
small script executed at start-up:

<hdp:script language="javascript" run-at-startup="true">
    // set System 'path.separator' to ':' - see HADOOP-9123
    java.lang.System.setProperty("path.separator", ":")
</hdp:script>
The job, streaming and tool elements all support a subset of generic
options, specifically archives, files and libs. libs is probably the
most useful as it enriches a job's classpath (typically with some jars)
- however the other two allow resources or archives to be copied
throughout the cluster for the job to consume. Whenever faced with
provisioning issues, revisit these options as they can help
significantly. Note that the fs, jt or conf options are not supported -
these are designed for command-line usage, for bootstrapping the
application. This is no longer needed, as SHDP offers first-class
support for defining and customizing Hadoop configurations.
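As an illustrative sketch (the paths and jar names are made up), these
options can be declared directly on the job definition:

<hdp:job id="mr-job"
    input-path="/input/" output-path="/output/"
    mapper="org.apache.hadoop.examples.WordCount.TokenizerMapper"
    reducer="org.apache.hadoop.examples.WordCount.IntSumReducer"
    libs="hdfs:///libs/custom-writables.jar"
    files="hdfs:///config/lookup.properties"
    archives="hdfs:///data/dictionary.zip"/>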