You've probably seen a lot of topics around Yarn and the next version of Hadoop's MapReduce, called MapReduce Version 2. Originally Yarn was a component of MapReduce itself, created to overcome some performance issues in Hadoop's original design. The fundamental idea of MapReduce v2 is to split up the two major functionalities of the JobTracker, resource management and job scheduling/monitoring, into separate daemons. The idea is to have a global Resource Manager (RM) and a per-application Application Master (AM). An application is either a single job in the classical sense of MapReduce jobs or a group of jobs.
Let's take a step back and see how the original MapReduce Version 1 works. The Job Tracker is a global singleton entity responsible for managing resources, like per-node Task Trackers, and the job life-cycle. A Task Tracker is responsible for executing tasks given by the Job Tracker and periodically reporting back their status. Naturally there is much more going on behind the scenes, but the main point is that the Job Tracker has always been a bottleneck in terms of scalability. This is where Yarn steps in, by moving the load away from global resource management and job tracking into per-application masters. The global Resource Manager can then concentrate on its main task of managing resources.
Note: Yarn is usually referred to as a synonym for MapReduce Version 2. This is not exactly true, and it's easier to understand the relationship between the two by saying that MapReduce Version 2 is an application running on top of Yarn.
As we just mentioned, MapReduce Version 2 is an application running on top of Yarn. It is possible to build similar custom Yarn-based applications which have nothing to do with MapReduce; Yarn itself doesn't know that it is running MapReduce Version 2. While there's nothing wrong with doing everything from scratch, one will soon realise that the learning curve for working with raw Yarn is rather steep. This is where Spring Hadoop's support for Yarn steps in, trying to make things easier so that users can concentrate on their own code without having to worry about framework internals.
To simplify configuration, SHDP provides a dedicated namespace for
Yarn components. However, one can opt to configure the beans
directly through the usual <bean>
definition.
For more information about XML Schema-based configuration in Spring, see
this
appendix in the Spring Framework reference documentation.
To use the SHDP namespace, one just needs to import it inside the configuration:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:yarn="http://www.springframework.org/schema/yarn"
    xmlns:yarn-int="http://www.springframework.org/schema/yarn/integration"
    xmlns:yarn-batch="http://www.springframework.org/schema/yarn/batch"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/yarn
        http://www.springframework.org/schema/yarn/spring-yarn.xsd
        http://www.springframework.org/schema/yarn/integration
        http://www.springframework.org/schema/yarn/integration/spring-yarn-integration.xsd
        http://www.springframework.org/schema/yarn/batch
        http://www.springframework.org/schema/yarn/batch/spring-yarn-batch.xsd">

    <bean id ... >

    <yarn:configuration ...>

</beans>
Spring for Apache Hadoop Yarn namespace prefix for the core package. Any name can do, but throughout the reference documentation yarn will be used.
The namespace URI for the core package.
Spring for Apache Hadoop Yarn namespace prefix for the integration package. Any name can do, but throughout the reference documentation yarn-int will be used.
The namespace URI for the integration package.
Spring for Apache Hadoop Yarn namespace prefix for the batch package. Any name can do, but throughout the reference documentation yarn-batch will be used.
The namespace URI for the batch package.
The namespace URI location for the core schema. Note that even though the location points to an external address (which exists and is valid), Spring will resolve the schema locally as it is included in the Spring for Apache Hadoop Yarn library.
The namespace URI location for the integration schema, resolved the same way.
The namespace URI location for the batch schema, resolved the same way.
Declaration example for the Yarn namespace. Notice the prefix usage.
Once declared, the namespace elements can be declared simply by appending the aforementioned prefix. Note that it is possible to change the default namespace, for example from <beans> to <yarn>. This is useful for configuration composed mainly of Hadoop components as it avoids declaring the prefix. To achieve this, simply swap the namespace prefix declarations above:
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/yarn"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:beans="http://www.springframework.org/schema/beans"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/yarn
        http://www.springframework.org/schema/yarn/spring-yarn.xsd">

    <beans:bean id ... >

    <configuration ...>

</beans:beans>
The default namespace declaration for this XML file points to the Spring for Apache Hadoop Yarn namespace.
The beans namespace prefix declaration.
Bean declaration using the <beans> namespace. Notice the prefix.
Bean declaration using the <yarn> namespace. Notice the lack of prefix (as yarn is the default namespace).
In order to use Hadoop and Yarn, one needs to first configure it, namely by creating a YarnConfiguration object. The configuration holds information about the various parameters of the Yarn system.
Note: Configuration for Yarn is kept separate from the configuration of the core Hadoop support described earlier in this document, although the two are declared in a very similar way.
In its simplest form, the configuration definition is a one-liner:
<yarn:configuration />
The declaration above defines a YarnConfiguration bean (to be precise a factory bean of type ConfigurationFactoryBean) named, by default, yarnConfiguration. The default name is used, by convention, by the other elements that require a configuration - this leads to simple and very concise configurations as the main components can automatically wire themselves up without requiring any specific configuration.
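As a small illustration of this convention (using the resource localizer element described later in this chapter, with an illustrative jar path), a second element declared without any configuration attribute picks up the default yarnConfiguration bean automatically:

<yarn:configuration />

<!-- no explicit 'configuration' attribute needed; the default
     yarnConfiguration bean above is wired in by convention -->
<yarn:localresources>
    <yarn:hdfs path="/app/example.jar"/>
</yarn:localresources>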
For scenarios where the defaults need to be tweaked, one can pass in additional configuration files:
<yarn:configuration resources="classpath:/custom-site.xml, classpath:/hq-site.xml"/>
In this example, two additional Hadoop configuration resources are added to the configuration.
Note: The configuration makes use of Spring's Resource abstraction to locate the files, which allows various search patterns to be used depending on the running environment or the prefix specified by the value - in this example the classpath is used.
In addition to referencing configuration resources, one can tweak Hadoop settings directly through Java Properties. This can be quite handy when just a few options need to be changed:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:yarn="http://www.springframework.org/schema/yarn"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/yarn
        http://www.springframework.org/schema/yarn/spring-yarn.xsd">

    <yarn:configuration>
        fs.defaultFS=hdfs://localhost:9000
        hadoop.tmp.dir=/tmp/hadoop
        electric=sea
    </yarn:configuration>

</beans>
One can further customize the settings by avoiding so-called hard-coded values, externalizing them so they can be replaced at runtime based on the existing environment, without touching the configuration:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:yarn="http://www.springframework.org/schema/yarn"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/yarn
        http://www.springframework.org/schema/yarn/spring-yarn.xsd">

    <yarn:configuration>
        fs.defaultFS=${hd.fs}
        hadoop.tmp.dir=file://${java.io.tmpdir}
        hangar=${number:18}
    </yarn:configuration>

    <context:property-placeholder location="classpath:hadoop.properties" />

</beans>
Through Spring's property placeholder support, SpEL and the environment abstraction (available in Spring 3.1), one can externalize environment specific properties from the main code base, easing deployment across multiple machines. In the example above, the default file system is replaced based on the properties available in hadoop.properties while the temp dir is determined dynamically through SpEL. Both approaches offer a lot of flexibility in adapting to the running environment - in fact we use this approach extensively in the Spring for Apache Hadoop test suite to cope with the differences between the different development boxes and the CI server.
Additionally, external Properties files can be loaded and Properties beans referenced (the latter typically declared through Spring's util namespace). Along with the nested properties declaration, this allows customized configurations to be easily declared:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:yarn="http://www.springframework.org/schema/yarn"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd
        http://www.springframework.org/schema/yarn
        http://www.springframework.org/schema/yarn/spring-yarn.xsd">

    <!-- merge the local properties, the props bean and the two properties files -->
    <yarn:configuration properties-ref="props"
        properties-location="cfg-1.properties, cfg-2.properties">
        star=chasing
        captain=eo
    </yarn:configuration>

    <util:properties id="props" location="props.properties"/>

</beans>
When merging several properties, the ones defined locally win. In the example above the configuration properties are the primary source, followed by the props bean, followed by the external properties files, based on their defined order. While it's not typical for a configuration to use so many properties, the example showcases the various options available.
Note: For more properties utilities, including using the System as a source or fallback, or control over the merging order, consider using Spring's PropertiesFactoryBean (which is what Spring for Apache Hadoop Yarn and util:properties use underneath).
It is possible to create configurations based on existing ones - this allows one to create dedicated configurations, slightly different from the main ones, usable for certain jobs (such as streaming - more on that below). Simply use the configuration-ref attribute to refer to the parent configuration - all its properties will be inherited and overridden as specified by the child:
<!-- default name is 'yarnConfiguration' -->
<yarn:configuration>
    fs.defaultFS=${hd.fs}
    hadoop.tmp.dir=file://${java.io.tmpdir}
</yarn:configuration>

<yarn:configuration id="custom" configuration-ref="yarnConfiguration">
    fs.defaultFS=${custom.hd.fs}
</yarn:configuration>
...
Make sure though to specify a different name, since otherwise both definitions will have the same name and the Spring container will interpret them as the same definition (usually considering the last one found).
Last but not least, a reminder that one can mix and match all these options to one's preference. In general, consider externalizing the configuration since it allows easier updates without interfering with the application configuration. When dealing with multiple, similar configurations, use configuration composition as it tends to keep the definitions concise, in sync and easy to update.
Table 11.1. yarn:configuration attributes

Name | Values | Description
---|---|---
configuration-ref | Bean Reference | Reference to an existing Configuration bean
properties-ref | Bean Reference | Reference to an existing Properties bean
properties-location | Comma delimited list | List of Spring Resource paths
resources | Comma delimited list | List of Spring Resource paths
fs-uri | String | The HDFS filesystem address. Equivalent to the fs.defaultFS property.
rm-address | String | The Yarn Resource Manager address. Equivalent to the yarn.resourcemanager.address property.
scheduler-address | String | The Yarn Resource Manager scheduler address. Equivalent to the yarn.resourcemanager.scheduler.address property.
When the Application Master or any other Container is run in a Hadoop cluster, there are usually dependencies on various application and configuration files. These files need to be localized into a running Container by making a physical copy. Localization is a process where dependent files are copied into a node's directory structure and thus can be used within the Container itself. Yarn itself tries to provide isolation so that multiple containers and applications do not clash.
In order to use local resources, one needs to create an implementation of the ResourceLocalizer interface. In its simplest form, a resource localizer can be defined as:
<yarn:localresources>
    <yarn:hdfs path="/path/in/hdfs/my.jar"/>
</yarn:localresources>
The declaration above defines a ResourceLocalizer bean (to be precise a factory bean of type LocalResourcesFactoryBean) named, by default, yarnLocalresources. The default name is used, by convention, by the other elements that require a reference to a resource localizer. It's explained later how this reference is used when the container launch context is defined.
It is also possible to define the path as a pattern. This makes it easier to pick up all or a subset of files from a directory.
<yarn:localresources>
    <yarn:hdfs path="/path/in/hdfs/*.jar"/>
</yarn:localresources>
Behind the scenes it's not enough to simply have a reference to a file in an HDFS file system. When localizing resources into a container, Yarn needs to do a consistency check for the copied files. This is done by checking file size and timestamp, and this information needs to be passed to Yarn together with the file path. In order to do this, whoever defines these beans needs to request this information from HDFS prior to sending out the resource localizer request. This kind of behaviour exists to make sure that once localization is defined, the Container will fail fast if dependent files were replaced during the process.
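To illustrate what this metadata lookup involves, the sketch below builds a LocalResource entry with plain Hadoop APIs, much like Yarn's distributed shell example does. It is not the actual Spring Yarn internals, and the path is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;

public class LocalResourceSketch {

    public static LocalResource localResourceFor(Configuration conf) throws Exception {
        // query HDFS for the file status so that size and timestamp
        // can be handed to Yarn together with the path
        FileSystem fs = FileSystem.get(conf);
        FileStatus status = fs.getFileStatus(new Path("/path/in/hdfs/my.jar"));

        LocalResource resource = Records.newRecord(LocalResource.class);
        resource.setResource(ConverterUtils.getYarnUrlFromPath(status.getPath()));
        resource.setSize(status.getLen());
        resource.setTimestamp(status.getModificationTime());
        resource.setType(LocalResourceType.FILE);
        resource.setVisibility(LocalResourceVisibility.APPLICATION);
        return resource;
    }
}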
By default the HDFS base address comes from the Yarn configuration, and the ResourceLocalizer bean will use the configuration named yarnConfiguration. If there is a need to use something other than the default bean, the configuration parameter can be used to make a reference to other defined configurations.
<yarn:localresources configuration="yarnConfiguration">
    <yarn:hdfs path="/path/in/hdfs/my.jar"/>
</yarn:localresources>
For example, a client defining a launch context for the Application Master needs to access dependent HDFS entries. The party defining and using the ResourceLocalizer bean may thus see a different HDFS address than the Node Manager preparing the Container. Effectively, the HDFS entry given to the resource localizer needs to be accessible from the Node Manager.
To overcome this problem, the parameters local and remote can be used to define different HDFS base addresses.
<yarn:localresources local="hdfs://0.0.0.0:9000" remote="hdfs://10.10.10.10:9000">
    <yarn:hdfs path="/app/multi-context/multi-context-1.0.0.M1.jar"/>
    <yarn:hdfs path="/app/spring-yarn-core-1.0.0.BUILD-SNAPSHOT.jar"/>
</yarn:localresources>
The Yarn resource localizer uses additional parameters to define the entry type and visibility. Usage is described below:
<yarn:localresources>
    <yarn:hdfs path="/path/in/hdfs/my.jar" type="FILE" visibility="APPLICATION"/>
</yarn:localresources>
For convenience it is possible to copy files into HDFS during the localization process using the yarn:copy tag. Currently the base staging directory is /syarn/staging/xx, where xx is a unique identifier per application instance.
<yarn:localresources>
    <yarn:copy src="file:/local/path/to/files/*jar" staging="true"/>
    <yarn:hdfs path="/*" staging="true"/>
</yarn:localresources>
Table 11.2. yarn:localresources attributes

Name | Values | Description
---|---|---
configuration | Bean Reference | A reference to a configuration bean name, default is yarnConfiguration
local | HDFS Base URL | Global default if not defined at the entry level
remote | HDFS Base URL | Global default if not defined at the entry level
type | ARCHIVE, FILE, PATTERN | Global default if not defined at the entry level
visibility | PUBLIC, PRIVATE, APPLICATION | Global default if not defined at the entry level
Table 11.3. yarn:hdfs attributes

Name | Values | Description
---|---|---
path | HDFS Path | Path in HDFS
local | HDFS Base URL | Path accessible by a running container
remote | HDFS Base URL | Path accessible by a client
type | ARCHIVE, FILE (default), PATTERN | ARCHIVE - automatically unarchived by the Node Manager; FILE - regular file; PATTERN - hybrid between archive and file
visibility | PUBLIC, PRIVATE, APPLICATION (default) | PUBLIC - shared by all users on the node; PRIVATE - shared among all applications of the same user on the node; APPLICATION - shared only among containers of the same application on the node
staging | true, false (default) | Internal temporary staging directory
Table 11.4. yarn:copy attributes

Name | Values | Description
---|---|---
src | Copy sources | Comma delimited list of resource patterns
staging | true, false (default) | Internal temporary staging directory
One central concept in Yarn is the use of environment variables which can then be read from a container. While it's possible to read those variables at any time, it is considered bad design to do so. Spring Yarn will pass the variables into the application before any business methods are executed, which makes behaviour clearer and testing much easier.
<yarn:environment/>
The declaration above defines a Map bean (to be precise a factory bean of type EnvironmentFactoryBean) named, by default, yarnEnvironment. The default name is used, by convention, by the other elements that require a reference to environment variables.
For convenience it is possible to define a classpath entry directly in an environment. Most likely one is about to run some Java code with libraries, so a classpath needs to be defined anyway.
<yarn:environment include-system-env="false">
    <yarn:classpath default-yarn-app-classpath="true" delimiter=":">
        ./*
    </yarn:classpath>
</yarn:environment>
If the default-yarn-app-classpath parameter is set to true (the default value), the default Yarn entries are added to the classpath automatically. The resulting entries are shown below:
$HADOOP_CONF_DIR:
$HADOOP_COMMON_HOME/*:
$HADOOP_COMMON_HOME/lib/*:
$HADOOP_COMMON_HOME/share/hadoop/common/*:
$HADOOP_COMMON_HOME/share/hadoop/common/lib/*:
$HADOOP_HDFS_HOME/*:
$HADOOP_HDFS_HOME/lib/*:
$HADOOP_HDFS_HOME/share/hadoop/hdfs/*:
$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*:
$YARN_HOME/*:
$YARN_HOME/lib/*:
$HADOOP_YARN_HOME/share/hadoop/yarn/*:
$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*
Note: Be careful when passing environment variables between different systems. For example, if running a client on Windows and passing variables to an Application Master running on Linux, the execution wrapper in Yarn may silently fail.
Table 11.5. yarn:environment attributes

Name | Values | Description
---|---|---
include-system-env | true (default), false | Defines whether system environment variables are actually added to this bean
Table 11.6. classpath attributes

Name | Values | Description
---|---|---
default-yarn-app-classpath | true (default), false | Defines whether default Yarn entries are added to the classpath
delimiter | Delimiter string, default is ":" | Defines the delimiter used in a classpath string
The client is always your entry point when interacting with a Yarn system, whether you are about to submit a new application instance or just querying the Resource Manager for running application(s) status. Currently support for the client is very limited, and a simple command to start the Application Master can be defined. If there is just a need to query the Resource Manager, a command definition is not needed.
<yarn:client app-name="customAppName">
    <yarn:master-command>
        <![CDATA[
        /usr/local/java/bin/java
        org.springframework.yarn.am.CommandLineAppmasterRunner
        appmaster-context.xml
        yarnAppmaster
        container-count=2
        1><LOG_DIR>/AppMaster.stdout
        2><LOG_DIR>/AppMaster.stderr
        ]]>
    </yarn:master-command>
</yarn:client>
The declaration above defines a YarnClient bean (to be precise a factory bean of type YarnClientFactoryBean) named, by default, yarnClient. It also defines a command launching an Application Master using the <master-command> entry, which is also a way to define raw commands. If this yarnClient instance is used to submit an application, its name comes from the app-name attribute.
<yarn:client app-name="customAppName">
    <yarn:master-runner/>
</yarn:client>
For convenience, the <master-runner> entry can be used to define the same command entries.
<yarn:client app-name="customAppName">
    <util:properties id="customArguments">
        container-count=2
    </util:properties>
    <yarn:master-runner
        command="java"
        context-file="appmaster-context.xml"
        bean-name="yarnAppmaster"
        arguments="customArguments"
        stdout="<LOG_DIR>/AppMaster.stdout"
        stderr="<LOG_DIR>/AppMaster.stderr" />
</yarn:client>
All three previous examples are effectively identical from Spring Yarn's point of view.
Note: The <LOG_DIR> refers to Hadoop's dedicated log directory for the running container.
<yarn:client app-name="customAppName"
    configuration="customConfiguration"
    resource-localizer="customResources"
    environment="customEnv"
    priority="1"
    virtualcores="2"
    memory="11"
    queue="customqueue">
    <yarn:master-runner/>
</yarn:client>
If there is a need to change some of the parameters for the Application Master submission, memory and virtualcores define the container settings. For the submission itself, queue and priority define how it is actually done.
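Once a client is defined, it can also be driven programmatically to submit the application. The sketch below assumes the YarnClient interface exposes a submitApplication() method returning an ApplicationId (as used by Spring Yarn's own command line runner); treat the exact signature and the context file name as assumptions to verify against the javadocs:

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.yarn.client.YarnClient;

public class ClientSketch {

    public static void main(String[] args) {
        // load the context containing the <yarn:client> definition;
        // the context file name is illustrative
        ConfigurableApplicationContext ctx =
            new ClassPathXmlApplicationContext("application-context.xml");
        // the default bean name is yarnClient
        YarnClient client = ctx.getBean("yarnClient", YarnClient.class);
        ApplicationId applicationId = client.submitApplication();
        ctx.close();
    }
}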
Table 11.7. yarn:client attributes

Name | Values | Description
---|---|---
app-name | Name as string, default is empty | Yarn submitted application name
configuration | Bean Reference | A reference to a configuration bean name, default is yarnConfiguration
resourcelocalizer | Bean Reference | A reference to a resource localizer bean name, default is yarnLocalresources
environment | Bean Reference | A reference to an environment bean name, default is yarnEnvironment
template | Bean Reference | A reference to a bean implementing ClientRmOperations
memory | Memory as integer, default is "64" | Amount of memory for the appmaster resource
virtualcores | Cores as integer, default is "1" | Number of appmaster resource virtual cores
priority | Priority as integer, default is "0" | Submission priority
queue | Queue string, default is "default" | Submission queue
Table 11.8. yarn:master-command

Name | Values | Description
---|---|---
Entry content | List of commands | Commands defined in this entry are aggregated into a single command line
Table 11.9. yarn:master-runner attributes

Name | Values | Description
---|---|---
command | Main command as string, default is "java" | Command line first entry
context-file | Name of the Spring context file, default is "appmaster-context.xml" | Command line second entry
bean-name | Name of the Spring bean, default is "yarnAppmaster" | Command line third entry
arguments | Reference to Java's Properties | Added to command line parameters as key/value pairs separated by '='
stdout | Stdout, default is "<LOG_DIR>/AppMaster.stdout" | Appended with 1>
stderr | Stderr, default is "<LOG_DIR>/AppMaster.stderr" | Appended with 2>
The Application Master is responsible for container allocation, launching and monitoring.
<yarn:master>
    <yarn:container-allocator hosts="host1,host2" racks="rack1,rack2"
        virtualcores="1" memory="64" priority="0"/>
    <yarn:container-launcher username="whoami"/>
    <yarn:container-command>
        <![CDATA[
        /usr/local/java/bin/java
        org.springframework.yarn.container.CommandLineContainerRunner
        container-context.xml
        1><LOG_DIR>/Container.stdout
        2><LOG_DIR>/Container.stderr
        ]]>
    </yarn:container-command>
</yarn:master>
The declaration above defines a YarnAppmaster bean (to be precise a bean of type StaticAppmaster) named, by default, yarnAppmaster. It also defines a command launching Containers using the <container-command> entry, allocation parameters using the <container-allocator> entry, and finally launcher parameters using the <container-launcher> entry.
Currently there is a simple implementation of StaticAppmaster which is able to allocate and launch a number of containers. These containers are monitored by querying the Resource Manager for container execution completion.
<yarn:master>
    <yarn:container-runner/>
</yarn:master>
For convenience, the <container-runner> entry can be used to define the same command entries.
<yarn:master>
    <util:properties id="customArguments">
        some-argument=myvalue
    </util:properties>
    <yarn:container-runner
        command="java"
        context-file="container-context.xml"
        bean-name="yarnContainer"
        arguments="customArguments"
        stdout="<LOG_DIR>/Container.stdout"
        stderr="<LOG_DIR>/Container.stderr" />
</yarn:master>
Table 11.10. yarn:master attributes

Name | Values | Description
---|---|---
configuration | Bean Reference | A reference to a configuration bean name, default is yarnConfiguration
resourcelocalizer | Bean Reference | A reference to a resource localizer bean name, default is yarnLocalresources
environment | Bean Reference | A reference to an environment bean name, default is yarnEnvironment
Table 11.11. yarn:container-allocator attributes

Name | Values | Description
---|---|---
hosts | List of hosts | Preferred hostnames of nodes for allocation
racks | List of racks | Preferred names of racks for allocation
virtualcores | Integer | Number of virtual cpu cores of the resource
memory | Integer, in MBs | Memory of the resource
priority | Integer | Assigned priority of a request
Table 11.12. yarn:container-launcher attributes

Name | Values | Description
---|---|---
username | String | Set the user to whom the container has been allocated
Table 11.13. yarn:container-runner attributes

Name | Values | Description
---|---|---
command | Main command as string, default is "java" | Command line first entry
context-file | Name of the Spring context file, default is "container-context.xml" | Command line second entry
bean-name | Name of the Spring bean, default is "yarnContainer" | Command line third entry
arguments | Reference to Java's Properties | Added to command line parameters as key/value pairs separated by '='
stdout | Stdout, default is "<LOG_DIR>/Container.stdout" | Appended with 1>
stderr | Stderr, default is "<LOG_DIR>/Container.stderr" | Appended with 2>
There is very little Spring Yarn needs to know about the Container in terms of its configuration. There is a simple contract between org.springframework.yarn.container.CommandLineContainerRunner and the bean it tries to run by default. The default bean name is yarnContainer.
There is a simple interface, org.springframework.yarn.container.YarnContainer, which the container needs to implement.
public interface YarnContainer {
    void run();
    void setEnvironment(Map<String, String> environment);
    void setParameters(Properties parameters);
}
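As a rough sketch of what an implementation might look like (the class name and the printed message are purely illustrative), the framework sets the environment and parameters before invoking run():

import java.util.Map;
import java.util.Properties;

import org.springframework.yarn.container.YarnContainer;

public class HelloContainer implements YarnContainer {

    private Map<String, String> environment;
    private Properties parameters;

    @Override
    public void setEnvironment(Map<String, String> environment) {
        // populated by the framework before run() is called
        this.environment = environment;
    }

    @Override
    public void setParameters(Properties parameters) {
        // command line key=value arguments end up here
        this.parameters = parameters;
    }

    @Override
    public void run() {
        // the actual work of the container goes here
        System.out.println("Hello from container " + environment.get("CONTAINER_ID"));
    }
}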
There are a few different ways a Container can be defined in Spring XML configuration. Natively, without using namespaces, the bean can be defined with the correct name:

<bean id="yarnContainer" class="org.springframework.yarn.container.TestContainer"/>
The Spring Yarn namespace makes this even simpler. The example below just defines the class which implements the needed interface.
<yarn:container container-class="org.springframework.yarn.container.TestContainer"/>
It's also possible to make a reference to an existing bean. This is useful if the bean cannot be instantiated with a default constructor.
<bean id="testContainer" class="org.springframework.yarn.container.TestContainer"/> <yarn:container container-ref="testContainer"/>
It's also possible to inline the bean definition.
<yarn:container>
    <bean class="org.springframework.yarn.container.TestContainer"/>
</yarn:container>
It is fairly easy to create an application which launches a few containers and then leaves them to do their tasks. This is pretty much what the Distributed Shell example application in Yarn does. In that example a container is configured to run a simple shell command and the Application Master only tracks when containers have finished. If all you need from the framework is the ability to fire and forget, then that's all you need, but most likely a real-world Yarn application will need some sort of collaboration with the Application Master. This communication is initiated either from the Application Client or the Application Container.
The Yarn framework itself doesn't define any kind of general communication API for the Application Master. There are APIs for communicating with the Container Manager and the Resource Manager, but these are used within a layer not necessarily exposed to the user. Spring Yarn defines a general framework to talk to the Application Master through an abstraction, and currently a JSON-based RPC system exists.
This chapter concentrates on developer concepts for creating custom services for the Application Master; configuration options for the built-in services can be found in the sections below - Appmaster Service and Appmaster Service Client.
Having a communication framework between the Application Master and a Container/Client involves a few moving parts. Firstly, there has to be some sort of service running on the Application Master. Secondly, the user of this service needs to know where it is and how to connect to it. Thirdly, if not creating these services from scratch, it'd be nice if some sort of abstraction already existed.
The contract for an appmaster service is very simple: the Application Master Service needs to implement the AppmasterService interface and be registered with the Spring application context. The actual appmaster instance will then pick it up from the bean factory.
public interface AppmasterService {
    int getPort();
    boolean hasPort();
    String getHost();
}
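A minimal sketch of such a service could look like the following. The class name, the fixed host and port values are assumptions for illustration (as is the import package for the interface), and a real service would also start some transport and report its actual bound address:

import org.springframework.yarn.am.AppmasterService;

public class EchoAppmasterService implements AppmasterService {

    @Override
    public int getPort() {
        // port clients should use to connect
        return 1234;
    }

    @Override
    public boolean hasPort() {
        // true when a valid port can be reported
        return true;
    }

    @Override
    public String getHost() {
        // host clients should use to connect
        return "localhost";
    }
}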
The Application Master Service framework currently provides integration for services acting as a service for a Client or a Container. The only difference between these two roles is how the service client gets notified about the address of the service. For the Client this information is stored within the Hadoop Yarn Resource Manager; for the Container it is passed via the environment within the launch context.
<bean id="yarnAmservice" class="AppmasterServiceImpl" /> <bean id="yarnClientAmservice" class="AppmasterClientServiceImpl" />
The example above shows the default bean names, yarnAmservice and yarnClientAmservice respectively, recognised by Spring Yarn.
The interface AppmasterServiceClient is currently an empty interface, just marking a class as an appmaster service client.
public interface AppmasterServiceClient { }
The default implementations can be used to exchange messages using simple domain classes; the actual messages are converted into JSON and sent over the transport.
<yarn-int:amservice
    service-impl="org.springframework.yarn.integration.ip.mind.TestService"
    default-port="1234"/>

<yarn-int:amservice-client
    service-impl="org.springframework.yarn.integration.ip.mind.DefaultMindAppmasterServiceClient"
    host="localhost"
    port="1234"/>
@Autowired
AppmasterServiceClient appmasterServiceClient;

@Test
public void testServiceInterfaces() throws Exception {
    SimpleTestRequest request = new SimpleTestRequest();
    SimpleTestResponse response = (SimpleTestResponse)
        ((MindAppmasterServiceClient)appmasterServiceClient)
            .doMindRequest(request);
    assertThat(response.stringField, is("echo:stringFieldValue"));
}
When the default implementations for Application Master services are exchanging messages, converters are not registered automatically. There is a namespace tag, converters, to ease this configuration.
<bean id="mapper" class="org.springframework.yarn.integration.support.Jackson2ObjectMapperFactoryBean" /> <yarn-int:converter> <bean class="org.springframework.yarn.integration.convert.MindObjectToHolderConverter"> <constructor-arg ref="mapper"/> </bean> </yarn-int:converter> <yarn-int:converter> <bean class="org.springframework.yarn.integration.convert.MindHolderToObjectConverter"> <constructor-arg ref="mapper"/> <constructor-arg value="org.springframework.yarn.batch.repository.bindings"/> </bean> </yarn-int:converter>
This section is about configuration; for more about the general concepts, see Section 11.8, “Application Master Services”.
Currently Spring Yarn has support for services using Spring Integration TCP channels as a transport.
<bean id="mapper" class="org.springframework.yarn.integration.support.Jackson2ObjectMapperFactoryBean" /> <yarn-int:converter> <bean class="org.springframework.yarn.integration.convert.MindObjectToHolderConverter"> <constructor-arg ref="mapper"/> </bean> </yarn-int:converter> <yarn-int:converter> <bean class="org.springframework.yarn.integration.convert.MindHolderToObjectConverter"> <constructor-arg ref="mapper"/> <constructor-arg value="org.springframework.yarn.integration.ip.mind"/> </bean> </yarn-int:converter> <yarn-int:amservice service-impl="org.springframework.yarn.integration.ip.mind.TestService"/>
If there is a need to manually configure the server side dispatch channel, a little more configuration is needed.
<bean id="serializer" class="org.springframework.yarn.integration.ip.mind.MindRpcSerializer" /> <bean id="deserializer" class="org.springframework.yarn.integration.ip.mind.MindRpcSerializer" /> <bean id="socketSupport" class="org.springframework.yarn.integration.support.DefaultPortExposingTcpSocketSupport" /> <ip:tcp-connection-factory id="serverConnectionFactory" type="server" port="0" socket-support="socketSupport" serializer="serializer" deserializer="deserializer"/> <ip:tcp-inbound-gateway id="inboundGateway" connection-factory="serverConnectionFactory" request-channel="serverChannel" /> <int:channel id="serverChannel" /> <yarn-int:amservice service-impl="org.springframework.yarn.integration.ip.mind.TestService" channel="serverChannel" socket-support="socketSupport"/>
Table 11.14. yarn-int:amservice attributes

Name | Values | Description
---|---|---
service-impl | Class Name | Full name of the class implementing a service
service-ref | Bean Reference | Reference to a bean name implementing a service
channel | Spring Int channel | Custom message dispatching channel
socket-support | Socket support reference | Custom socket support class
This section is about configuration; for more about the general concepts, see Section 11.8, “Application Master Services”.
Currently Spring Yarn has support for services using Spring Integration TCP channels as a transport.
<bean id="mapper" class="org.springframework.yarn.integration.support.Jackson2ObjectMapperFactoryBean" /> <yarn-int:converter> <bean class="org.springframework.yarn.integration.convert.MindObjectToHolderConverter"> <constructor-arg ref="mapper"/> </bean> </yarn-int:converter> <yarn-int:converter> <bean class="org.springframework.yarn.integration.convert.MindHolderToObjectConverter"> <constructor-arg ref="mapper"/> <constructor-arg value="org.springframework.yarn.integration.ip.mind"/> </bean> </yarn-int:converter> <yarn-int:amservice-client service-impl="org.springframework.yarn.integration.ip.mind.DefaultMindAppmasterServiceClient" host="${SHDP_AMSERVICE_HOST}" port="${SHDP_AMSERVICE_PORT}"/>
If there is a need to manually configure the client side dispatch channels, a little more configuration is needed.
<bean id="serializer" class="org.springframework.yarn.integration.ip.mind.MindRpcSerializer" /> <bean id="deserializer" class="org.springframework.yarn.integration.ip.mind.MindRpcSerializer" /> <ip:tcp-connection-factory id="clientConnectionFactory" type="client" host="localhost" port="${SHDP_AMSERVICE_PORT}" serializer="serializer" deserializer="deserializer"/> <ip:tcp-outbound-gateway id="outboundGateway" connection-factory="clientConnectionFactory" request-channel="clientRequestChannel" reply-channel="clientResponseChannel" /> <int:channel id="clientRequestChannel" /> <int:channel id="clientResponseChannel" > <int:queue /> </int:channel> <yarn-int:amservice-client service-impl="org.springframework.yarn.integration.ip.mind.DefaultMindAppmasterServiceClient" request-channel="clientRequestChannel" response-channel="clientResponseChannel"/>
Table 11.15. yarn-int:amservice-client attributes

Name | Values | Description
---|---|---
service-impl | Class Name | Full name of the class implementing a service client
host | Hostname | Host of the running appmaster service
port | Port | Port of the running appmaster service
request-channel | Reference to Spring Int request channel | Custom channel
response-channel | Reference to Spring Int response channel | Custom channel
In this chapter we assume you are fairly familiar with the concepts of Spring Batch. Many batch processing problems can be solved with single threaded, single process jobs, so it is always a good idea to properly check if that meets your needs before thinking about more complex implementations. When you are ready to start implementing a job with some parallel processing, Spring Batch offers a range of options. At a high level there are two modes of parallel processing: single process, multi-threaded; and multi-process.
Spring Hadoop contains support for running Spring Batch jobs on a Hadoop cluster. For better parallel processing, Spring Batch partitioned steps can be executed on a Hadoop cluster as remote steps.
The starting point for running a Spring Batch Job is always the Application Master, whether the job is a simple job with or without partitioning. If partitioning is not used, the whole job is run within the Application Master and no Containers are launched. This may seem a bit odd - running something on Hadoop without using Containers - but one should remember that the Application Master is also just a resource allocated from the Hadoop cluster.
In order to run Spring Batch jobs on a Hadoop cluster, a few constraints exist:
Job Context - the Application Master is the main entry point for running the job.
Job Repository - the Application Master needs to have access to a repository which is located either in-memory or in a database. These are the two types natively supported by Spring Batch.
Remote Steps - due to the nature of how Spring Batch partitioning works, a remote step needs access to the job repository.
Configuration for Spring Batch Jobs is very similar to what is needed for normal batch configuration, because effectively that's what we are doing. The only difference is the way the job is launched, which in this case is automatically handled by the Application Master. The implementation of the job launching logic is very similar to the CommandLineJobRunner found in Spring Batch.
<bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/> <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"> <property name="transactionManager" ref="transactionManager"/> </bean> <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher"> <property name="jobRepository" ref="jobRepository"/> </bean>
The declaration above defines beans for the JobRepository and JobLauncher. For simplicity we used an in-memory repository, while it would be possible to switch to a repository backed by a database if persistence is needed. The bean named jobLauncher is later used within the Application Master to launch jobs.
<bean id="yarnEventPublisher" class="org.springframework.yarn.event.DefaultYarnEventPublisher"/> <yarn-batch:master/>
The declaration above defines a BatchAppmaster bean named, by default, yarnAppmaster, and a YarnEventPublisher bean named yarnEventPublisher, which is not created automatically.
The final step in our very simple batch configuration is to define the actual batch job.
<bean id="hello" class="org.springframework.yarn.examples.PrintTasklet"> <property name="message" value="Hello"/> </bean> <batch:job id="job"> <batch:step id="master"> <batch:tasklet transaction-manager="transactionManager" ref="hello"/> </batch:step> </batch:job>
The declaration above defines a simple job and tasklet. The job is named job, which is the default job name searched for by the Application Master. It is possible to use a different name by changing the launch configuration.
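For example, the job-name attribute listed in the table below can be used to point the Application Master at a differently named job (the job name here is illustrative):

<yarn-batch:master job-name="myCustomJob"/>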
Table 11.16. yarn-batch:master attributes

Name | Values | Description
---|---|---
configuration | Bean Reference | A reference to a configuration bean name, default is yarnConfiguration
resourcelocalizer | Bean Reference | A reference to a resource localizer bean name, default is yarnLocalresources
environment | Bean Reference | A reference to an environment bean name, default is yarnEnvironment
job-name | Bean Name Reference | A name reference to the Spring Batch job, default is job
job-launcher | Bean Reference | A reference to a job launcher bean name, default is jobLauncher. Target is a normal Spring Batch bean implementing JobLauncher.
Let's take a quick look at how Spring Batch partitioning is handled. The concept of running a partitioned job involves three things: remote steps, a Partition Handler and a Partitioner. Oversimplifying a little, a remote step is like any other step from a user's point of view. Spring Batch itself does not contain implementations for any proprietary grid or remoting fabrics. Spring Batch does, however, provide a useful implementation of PartitionHandler that executes Steps locally in separate threads of execution, using the TaskExecutor strategy from Spring. Spring Hadoop provides an implementation to execute Steps remotely on a Hadoop cluster.
Note: For more background information about Spring Batch Partitioning, read the Spring Batch reference documentation.
As we previously mentioned, a step executed on a remote host also needs to access the job repository. If the job repository were based on a database instance, the configuration on a container could be similar to the one on the application master. In our configuration example the job repository is in-memory based and the remote steps need access to it. Spring Yarn Batch contains an implementation of a job repository which is able to proxy requests via JSON. In order to use it, we need to enable the appmaster client service which exposes this functionality.
<bean id="jobRepositoryRemoteService" class="org.springframework.yarn.batch.repository.JobRepositoryRemoteService" > <property name="mapJobRepositoryFactoryBean" ref="&jobRepository"/> </bean> <bean id="batchService" class="org.springframework.yarn.batch.repository.BatchAppmasterService" > <property name="jobRepositoryRemoteService" ref="jobRepositoryRemoteService"/> </bean> <yarn-int:amservice service-ref="batchService"/>
The declaration above defines a JobRepositoryRemoteService bean named jobRepositoryRemoteService, which is then connected to the Application Master Service, exposing the job repository via Spring Integration TCP channels.
As job repository communication messages are exchanged via custom JSON messages, converters need to be defined.
<bean id="mapper" class="org.springframework.yarn.integration.support.Jackson2ObjectMapperFactoryBean" /> <yarn-int:converter> <bean class="org.springframework.yarn.integration.convert.MindObjectToHolderConverter"> <constructor-arg ref="mapper"/> </bean> </yarn-int:converter> <yarn-int:converter> <bean class="org.springframework.yarn.integration.convert.MindHolderToObjectConverter"> <constructor-arg ref="mapper"/> <constructor-arg value="org.springframework.yarn.batch.repository.bindings"/> </bean> </yarn-int:converter>
Previously we made a choice to use an in-memory job repository running inside the application master. Now we need to talk to this repository via the client service. We start by adding the same converters as in the application master.
<bean id="mapper" class="org.springframework.yarn.integration.support.Jackson2ObjectMapperFactoryBean" /> <yarn-int:converter> <bean class="org.springframework.yarn.integration.convert.MindObjectToHolderConverter"> <constructor-arg ref="mapper"/> </bean> </yarn-int:converter> <yarn-int:converter> <bean class="org.springframework.yarn.integration.convert.MindHolderToObjectConverter"> <constructor-arg ref="mapper"/> <constructor-arg value="org.springframework.yarn.batch.repository.bindings"/> </bean> </yarn-int:converter>
We use the general client implementation able to communicate with the service running on the Application Master.
<yarn-int:amservice-client
    service-impl="org.springframework.yarn.integration.ip.mind.DefaultMindAppmasterServiceClient"
    host="${SHDP_AMSERVICE_HOST}"
    port="${SHDP_AMSERVICE_PORT}" />
A remote step is just like any other step.
<bean id="hello" class="org.springframework.yarn.examples.PrintTasklet"> <property name="message" value="Hello"/> </bean> <batch:step id="remoteStep"> <batch:tasklet transaction-manager="transactionManager" start-limit="100" ref="hello"/> </batch:step>
We need a way to locate the step from the application context. For this we define a step locator, which is later configured into the running container.
<bean id="stepLocator" class="org.springframework.yarn.batch.partition.BeanFactoryStepLocator"/>
Spring Hadoop contains a custom job repository implementation which is able to talk back to the remote instance via the custom JSON protocol.
<bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/> <bean id="jobRepository" class="org.springframework.yarn.batch.repository.RemoteJobRepositoryFactoryBean"> <property name="transactionManager" ref="transactionManager"/> <property name="appmasterScOperations" ref="yarnAmserviceClient"/> </bean> <bean id="jobExplorer" class="org.springframework.yarn.batch.repository.RemoteJobExplorerFactoryBean"> <property name="repositoryFactory" ref="&jobRepository" /> </bean>
Finally we define a Container which understands how to work with remote steps.
<bean id="yarnContainer" class="org.springframework.yarn.batch.container.DefaultBatchYarnContainer"> <property name="stepLocator" ref="stepLocator"/> <property name="jobExplorer" ref="jobExplorer"/> <property name="integrationServiceClient" ref="yarnAmserviceClient"/> </bean>