One of the common tasks when using Hadoop is interacting with its runtime - whether it is a local setup or a remote cluster, one needs to properly configure and bootstrap Hadoop in order to submit the required jobs. This chapter will focus on how Spring Hadoop (SHDP) leverages Spring's lightweight IoC container to simplify the interaction with Hadoop and make deployment, testing and provisioning easier and more manageable.
To simplify configuration, SHDP provides a dedicated namespace for most of its components. However, one can opt to configure the beans directly through the usual <bean> definition. For more information about XML Schema-based configuration in Spring, see the corresponding appendix in the Spring Framework reference documentation.
To use the SHDP namespace, one just needs to import it inside the configuration:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:hdp="http://www.springframework.org/schema/hadoop"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">

    <bean id ... >

    <hdp:configuration ...>
</beans>
In this declaration, xmlns:hdp sets the Spring Hadoop namespace prefix. Any name can be used, but throughout the reference documentation hdp is used.
http://www.springframework.org/schema/hadoop is the namespace URI.
The xsi:schemaLocation entry gives the namespace URI location. Note that even though the location points to an external address (which exists and is valid), Spring will resolve the schema locally as it is included in the Spring Hadoop library.
<hdp:configuration ...> is a declaration example for the Hadoop namespace. Notice the prefix usage.
Once declared, the namespace elements can be used simply by prepending the aforementioned prefix. Note that it is possible to change the default namespace, for example from <beans> to <hdp>. This is useful for configurations composed mainly of Hadoop components, as it avoids declaring the prefix. To achieve this, simply swap the namespace prefix declarations above:
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/hadoop"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:beans="http://www.springframework.org/schema/beans"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">

    <beans:bean id ... >

    <configuration ...>
</beans:beans>
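The default namespace declaration (xmlns) for this XML file points to the Spring Hadoop namespace.
xmlns:beans is the beans namespace prefix declaration.
<beans:bean> is a bean declaration using the beans namespace. Notice the prefix.
<configuration> is a bean declaration using the hdp namespace. Notice the lack of a prefix, since hdp is now the default namespace.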
For the remainder of this document, to improve readability, the XML examples will simply refer to the <hdp> namespace without the namespace declaration, where possible.
In order to use Hadoop, one first needs to configure it, namely by creating a Configuration object. The configuration holds information about the job tracker, the input and output formats and the various other parameters of the map reduce job.
In its simplest form, the configuration definition is a one liner:
<hdp:configuration />
The declaration above defines a Configuration bean (to be precise, a factory bean of type ConfigurationFactoryBean) named, by default, hadoop-configuration. The default name is used, by convention, by the other elements that require a configuration. This leads to simple and very concise configurations, as the main components can automatically wire themselves up without requiring any specific configuration.
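For comparison, the namespace element can be approximated with a plain bean definition. The following is only a sketch: the package name of ConfigurationFactoryBean is an assumption and should be checked against the SHDP release in use.

```xml
<!-- Rough equivalent of <hdp:configuration /> written as a plain bean. -->
<!-- Assumption: the fully qualified class name below matches your SHDP version. -->
<bean id="hadoop-configuration"
      class="org.springframework.data.hadoop.configuration.ConfigurationFactoryBean"/>
```

Keeping the default bean name matters here, since the other namespace elements look the configuration up by that name when none is given.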
For scenarios where the defaults need to be tweaked, one can pass in additional configuration files:
<hdp:configuration resources="classpath:/custom-site.xml, classpath:/hq-site.xml"/>
In this example, two additional Hadoop configuration resources are added to the configuration.
Note: The configuration makes use of Spring's Resource abstraction to locate the files, which is why a prefix such as classpath: can be used in the example above.
In addition to referencing configuration resources, one can tweak Hadoop settings directly through Java Properties. This can be quite handy when just a few options need to be changed:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:hdp="http://www.springframework.org/schema/hadoop"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">

    <hdp:configuration>
        fs.default.name=hdfs://localhost:9000
        hadoop.tmp.dir=/tmp/hadoop
        electric=sea
    </hdp:configuration>
</beans>
One can further customize the settings by externalizing hard-coded values, so that they can be replaced at runtime, based on the existing environment, without touching the configuration:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:hdp="http://www.springframework.org/schema/hadoop"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">

    <hdp:configuration>
        fs.default.name=${hd.fs}
        hadoop.tmp.dir=file://${java.io.tmpdir}
        hangar=${number:18}
    </hdp:configuration>

    <context:property-placeholder location="classpath:hadoop.properties" />
</beans>
Through Spring's property placeholder support, SpEL and the environment abstraction (available in Spring 3.1), one can externalize environment-specific properties from the main code base, easing deployment across multiple machines. In the example above, the default file system is replaced based on the properties available in hadoop.properties, while the temp dir is determined dynamically from the java.io.tmpdir system property. Both approaches offer a lot of flexibility in adapting to the running environment; in fact, we use this approach extensively in the Spring Hadoop test suite to cope with the differences between the development boxes and the CI server.
Additionally, external Properties files can be loaded, as can Properties beans (typically declared through Spring's util namespace). Along with the nested properties declaration, this allows customized configurations to be easily declared:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:hdp="http://www.springframework.org/schema/hadoop"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
        http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">

    <!-- merge the local properties, the props bean and the two properties files -->
    <hdp:configuration properties-ref="props" properties-location="cfg-1.properties, cfg-2.properties">
        star=chasing
        captain=eo
    </hdp:configuration>

    <util:properties id="props" location="props.properties"/>
</beans>
When merging several properties, the ones defined locally win. In the example above, the properties nested in the configuration element are the primary source, followed by the props bean, followed by the external properties files in their defined order. While it is not typical for a configuration to refer to so many properties sources, the example showcases the various options available.
Note: For more properties utilities, including using the System as a source or fallback, or control over the merging order, consider using Spring's PropertiesFactoryBean (which is what Spring Hadoop and util:properties use underneath).
It is possible to create configurations based on existing ones. This allows one to create dedicated configurations, slightly different from the main ones, usable for certain jobs (such as streaming, more on that below). Simply use the configuration-ref attribute to refer to the parent configuration: all its properties will be inherited and overridden as specified by the child:
<!-- default name is 'hadoop-configuration' -->
<hdp:configuration>
    fs.default.name=${hd.fs}
    hadoop.tmp.dir=file://${java.io.tmpdir}
</hdp:configuration>

<hdp:configuration id="custom" configuration-ref="hadoop-configuration">
    fs.default.name=${custom.hd.fs}
</hdp:configuration>
...
Make sure, though, to specify a different name for the child: otherwise, since both definitions would have the same name, the Spring container will interpret them as the same definition (and will usually consider only the last one found).
Another option worth mentioning is register-url-handler which, as the name implies, automatically registers a URL handler in the running VM. This allows URLs referencing hdfs resources (by using the hdfs prefix) to be properly resolved; if the handler is not registered, such a URL will throw an exception since the VM does not know what hdfs means.
Note: Since only one URL handler can be registered per VM, and at most once, this option is turned off by default. For the same reason, if registration fails once the option is enabled, the error is logged but no exception is thrown. If your hdfs URLs stop working, make sure to investigate this aspect.
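As a minimal sketch, enabling the handler could look like the following. The attribute name comes from the text above; that it takes a boolean value is an assumption.

```xml
<!-- Opt in to the hdfs URL handler; the option is off by default. -->
<!-- Assumption: the attribute accepts true/false. -->
<hdp:configuration register-url-handler="true"/>
```

Because registration can happen at most once per VM, it is probably best to enable this on a single configuration definition only.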
Last but not least, a reminder that one can mix and match all these options as needed. In general, consider externalizing configuration since it allows easier updates without interfering with the application configuration. When dealing with multiple, similar configurations, use configuration composition as it tends to keep the definitions concise, in sync and easy to update.
Once the Hadoop configuration is taken care of, one needs to actually submit some work to it. SHDP makes it easy to configure and run Hadoop jobs whether they are vanilla map-reduce type or streaming. Let us start with an example:
<hdp:job id="mr-job"
    input-path="/input/" output-path="/output/"
    mapper="org.apache.hadoop.examples.WordCount.TokenizerMapper"
    reducer="org.apache.hadoop.examples.WordCount.IntSumReducer"/>
The declaration above creates a typical Hadoop Job: it specifies its input and output, and the mapper and reducer classes. Notice that there is no reference to the Hadoop configuration above; that is because, if not specified, the default naming convention (hadoop-configuration) is used instead. Neither are the key or value types specified: these two are automatically determined through a best-effort attempt by analyzing the class information of the mapper and the reducer. Of course, these settings can be overridden: the former through the configuration-ref attribute, the latter through the key and value attributes.
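To illustrate the overrides, here is a hedged variation of the word count job that names everything explicitly. The job id is illustrative, and a configuration bean named custom is assumed to be defined elsewhere in the context.

```xml
<!-- Explicit configuration and key/value types instead of the defaults. -->
<!-- Assumption: a configuration bean with id 'custom' exists in the context. -->
<hdp:job id="mr-job-explicit"
    configuration-ref="custom"
    input-path="/input/" output-path="/output/"
    mapper="org.apache.hadoop.examples.WordCount.TokenizerMapper"
    reducer="org.apache.hadoop.examples.WordCount.IntSumReducer"
    key="org.apache.hadoop.io.Text"
    value="org.apache.hadoop.io.IntWritable"/>
```

Spelling out the types is mostly useful when the best-effort detection cannot infer them, for example when the mapper or reducer is declared with raw or heavily generic types.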
There are plenty of options not shown in the example (for simplicity), such as the jar (specified directly or by class), the sort or group comparator, the combiner, the partitioner, the codecs to use or the input/output format, to name just a few. They are all supported: take a look at the SHDP schema (Appendix A, Spring Data Hadoop Schema) or simply trigger auto-completion (usually CTRL+SPACE) in your IDE; if it supports XML namespaces and is properly configured, it will display the available elements. Additionally, one can extend the default Hadoop configuration object and add any special properties not available in the namespace or its backing bean (JobFactoryBean).
It is worth pointing out that per-job configurations are supported by specifying the custom properties directly or by referring to them (the same properties pattern described for the configuration element above):
<hdp:job id="mr-job"
    input-path="/input/" output-path="/output/"
    mapper="mapper class" reducer="reducer class"
    jar-by-class="class used for jar detection"
    properties-location="classpath:special-job.properties">
    electric=sea
</hdp:job>
Note: The job definition can validate the existence of the input and output paths before submitting the actual job (which is slow), to prevent its failure. Take a look at the validate-paths attribute to catch these errors early on, without having to touch the job tracker only to get an exception.
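A sketch of a job with path validation switched on explicitly follows. The attribute name is taken from the note above; the boolean value and the job id are assumptions, so consult the schema for the actual default.

```xml
<!-- Ask SHDP to check the input and output paths before submitting the job. -->
<!-- Assumption: validate-paths accepts true/false. -->
<hdp:job id="mr-job-validated"
    input-path="/input/" output-path="/output/"
    mapper="org.apache.hadoop.examples.WordCount.TokenizerMapper"
    reducer="org.apache.hadoop.examples.WordCount.IntSumReducer"
    validate-paths="true"/>
```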
Hadoop Streaming jobs (or streaming, for short) are a popular feature of Hadoop as they allow the creation of Map/Reduce jobs with any executable or script (the equivalent of the previous word-counting example is to use the cat and wc commands). While it is rather easy to start up streaming from the command line, doing so programmatically, such as from a Java environment, can be challenging due to the number of parameters (and their ordering) that need to be parsed. SHDP simplifies such tasks: it is as easy and straightforward as declaring a job from the previous section; in fact, most of the attributes are the same:
<hdp:streaming id="streaming"
    input-path="/input/" output-path="/output/"
    mapper="${path.cat}" reducer="${path.wc}"/>
Existing users might be wondering how they can pass command line arguments (such as -D or -cmdenv). The former customize the Hadoop configuration (which has been covered in the previous section) while the latter are supported through the cmd-env element:
<hdp:streaming id="streaming-env"
    input-path="/input/" output-path="/output/"
    mapper="${path.cat}" reducer="${path.wc}">
    <hdp:cmd-env>
        EXAMPLE_DIR=/home/example/dictionaries/
        ...
    </hdp:cmd-env>
</hdp:streaming>
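For the -D style options, one possible approach is a dedicated child configuration wired into the streaming job. This is a sketch under assumptions: the bean ids are illustrative, and it presumes the streaming element accepts configuration-ref like the job element does.

```xml
<!-- Settings that would otherwise be passed as -D options on the command line. -->
<hdp:configuration id="streaming-config" configuration-ref="hadoop-configuration">
    mapred.reduce.tasks=1
</hdp:configuration>

<!-- Assumption: hdp:streaming supports the configuration-ref attribute. -->
<hdp:streaming id="streaming-custom"
    configuration-ref="streaming-config"
    input-path="/input/" output-path="/output/"
    mapper="${path.cat}" reducer="${path.wc}"/>
```

Using a child configuration keeps the streaming-specific tweaks out of the default hadoop-configuration bean that other jobs rely on.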
The jobs, after being created and configured, need to be submitted for execution to a Hadoop cluster. For non-trivial cases, a coordinating workflow solution such as Spring Batch is recommended. However, for basic job submission SHDP provides the JobRunner class, which submits several jobs sequentially (and by default waits for their completion):
<bean id="runner" class="org.springframework.data.hadoop.mapreduce.JobRunner" p:jobs-ref="job"/>

<hdp:job id="job"
    input-path="/input/" output-path="/output/"
    mapper="org.apache.hadoop.examples.WordCount.TokenizerMapper"
    reducer="org.apache.hadoop.examples.WordCount.IntSumReducer" />
Multiple jobs can be specified and even nested if they are not used outside the runner:
<bean id="runner" class="org.springframework.data.hadoop.mapreduce.JobRunner">
    <property name="jobs">
        <list>
            <!-- reference to another job named 'streaming-job' -->
            <ref bean="streaming-job"/>
            <!-- nested bean definition -->
            <hdp:job id="nested-job" .... />
        </list>
    </property>
</bean>

<hdp:job id="job" ... />
DistributedCache is a Hadoop facility for distributing application-specific, large, read-only files (text, archives, jars and so on) efficiently. Applications specify the files to be cached via urls (hdfs://) using DistributedCache and the framework will copy the necessary files to the slave nodes before any tasks for the job are executed on that node. Its efficiency stems from the fact that the files are only copied once per job and from the ability to cache archives, which are un-archived on the slaves.
Note that DistributedCache assumes that the files to be cached (and specified via hdfs:// urls) are already present on the Hadoop FileSystem.
SHDP provides first-class configuration for the distributed cache through its cache element (backed by the DistributedCacheFactoryBean class), allowing files and archives to be easily distributed across nodes:
<hdp:cache create-symlink="true">
    <hdp:classpath value="/cp/some-library.jar#library.jar" />
    <hdp:cache value="/cache/some-archive.tgz#main-archive" />
    <hdp:cache value="/cache/some-resource.res" />
    <hdp:local value="some-file.txt" />
</hdp:cache>
The definition above registers several resources with the cache (adding them to the job cache or classpath) and creates symlinks for them. As described in the DistributedCache documentation, the declaration format is (absolute-path#link-name). The link name is determined by the URI fragment (the text following the #, such as #library.jar or #main-archive above); if no name is specified, the cache bean will infer one based on the resource file name. Note that one does not have to specify the hdfs://node:port prefix, as it is automatically determined based on the configuration wired into the bean; this prevents environment settings from being hard-coded into the configuration, which thereby becomes portable.
Additionally, based on the resource extension, the definition differentiates between archives (.tgz, .tar.gz, .zip and .tar), which will be uncompressed, and regular files, which are copied as-is. As with the rest of the namespace declarations, the definition above relies on defaults: since it requires a Hadoop Configuration and a FileSystem object and none are specified (through configuration-ref and file-system-ref), it falls back to the default naming and is wired with the bean named hadoop-configuration, creating the FileSystem automatically.
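When the defaults do not fit, both references can be given explicitly. In the sketch below the bean names custom and custom-fs are hypothetical and assumed to be defined elsewhere in the context; only the two attribute names come from the text above.

```xml
<!-- Wire the cache to a specific configuration and file system instead of the defaults. -->
<!-- Assumption: beans named 'custom' (Configuration) and 'custom-fs' (FileSystem) exist. -->
<hdp:cache create-symlink="true"
    configuration-ref="custom"
    file-system-ref="custom-fs">
    <hdp:cache value="/cache/some-archive.tgz#main-archive" />
</hdp:cache>
```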
For Spring Batch environments, SHDP provides a dedicated tasklet to execute Hadoop jobs as a step in a Spring Batch workflow. An example declaration is shown below:
<hdp:tasklet id="hadoop-tasklet" job-ref="mr-job" wait-for-job="true" />
The tasklet above references a Hadoop job definition named "mr-job". By default, wait-for-job is true so that the tasklet will wait for the job to complete when it executes. Setting wait-for-job to false will submit the job to the Hadoop cluster but not wait for it to complete.
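The tasklet can then be referenced from a regular Spring Batch job definition. The following is a sketch that assumes the Spring Batch namespace is declared with the batch prefix and that the usual Spring Batch infrastructure (such as a job repository) is configured; the job and step ids are illustrative.

```xml
<!-- Assumes xmlns:batch="http://www.springframework.org/schema/batch" is declared. -->
<batch:job id="hadoop-batch-job">
    <batch:step id="run-mr-job">
        <!-- Delegates the step to the SHDP tasklet declared above. -->
        <batch:tasklet ref="hadoop-tasklet"/>
    </batch:step>
</batch:job>
```

With wait-for-job left at its default, the step only completes once the Hadoop job has finished, which lets subsequent steps depend on its output.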