SHDP provides basic support for the Cascading library through the spring-cascading sub-project. All support is provided in the org.springframework.data.hadoop.cascading package: one can create Flows or Cascades through XML and/or Java and execute them, either directly or as part of a Spring Batch job. In addition, dedicated Taps for Spring environments are available.
As Cascading is aimed at code configuration, one would typically configure the library programmatically. Such code can easily be integrated into Spring in various ways, through factory methods or @Configuration and @Bean (see this chapter for more information). In short, one uses Java code (or any JVM language, for that matter) to create beans.
The Cascading support provides a dedicated namespace in addition to the regular namespace for Spring for Apache Hadoop. You can include this namespace using the following declaration:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:hdp="http://www.springframework.org/schema/hadoop"
    xmlns:casc="http://www.springframework.org/schema/cascading"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd
        http://www.springframework.org/schema/cascading http://www.springframework.org/schema/cascading/spring-cascading.xsd">
    <bean id ... >
    <casc:cascading-runner id="runner" ...>
</beans>
Spring for Cascading namespace prefix. Any name can do, but throughout the reference documentation casc is used.
The namespace URI.
The namespace URI location. Note that even though the location points to an external address (which exists and is valid), Spring will resolve the schema locally as it is included in the Spring Cascading library.
Declaration example for the Cascading namespace. Notice the prefix usage.
For example, looking at the official Cascading sample (Cascading for the Impatient, Part 2), one can simply call the Cascading setup method from within the Spring container (original vs updated):
public class Impatient {
    public static FlowDef createFlowDef(String docPath, String wcPath) {
        // create source and sink taps
        Tap docTap = new Hfs(new TextDelimited(true, "\t"), docPath);
        Tap wcTap = new Hfs(new TextDelimited(true, "\t"), wcPath);

        // specify a regex operation to split the "document" text lines into a token stream
        Fields token = new Fields("token");
        Fields text = new Fields("text");
        RegexSplitGenerator splitter = new RegexSplitGenerator(token, "[ \\[\\]\\(\\),.]");
        // only returns "token"
        Pipe docPipe = new Each("token", text, splitter, Fields.RESULTS);

        // determine the word counts
        Pipe wcPipe = new Pipe("wc", docPipe);
        wcPipe = new GroupBy(wcPipe, token);
        wcPipe = new Every(wcPipe, Fields.ALL, new Count(), Fields.ALL);

        // connect the taps, pipes, etc., into a flow
        FlowDef flowDef = FlowDef.flowDef().setName("wc").addSource(docPipe, docTap).addTailSink(wcPipe, wcTap);
        return flowDef;
    }
}
The entire Cascading configuration (defining the Flow) is encapsulated within one method, which can be called by the container:
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:hdp="http://www.springframework.org/schema/hadoop"
    xmlns:casc="http://www.springframework.org/schema/cascading"
    xmlns:c="http://www.springframework.org/schema/c"
    xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd
        http://www.springframework.org/schema/cascading http://www.springframework.org/schema/cascading/spring-cascading.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- factory-method approach called with two parameters available as property placeholders -->
    <bean id="flowDef" class="impatient.Main" factory-method="createFlowDef" c:_0="${in}" c:_1="${out}"/>

    <casc:cascading-flow id="wc" definition-ref="flowDef" write-dot="dot/wc.dot"/>
    <casc:cascading-cascade id="cascade" flow-ref="wc"/>
    <casc:cascading-runner unit-of-work-ref="cascade" run-at-startup="true"/>
</beans>
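For readers less familiar with the pipe assembly built by createFlowDef, the computation it describes (split each text line on a delimiter regex, group by token, count) can be approximated in plain Java. This is only a sketch of the semantics, not how Cascading executes the flow; the class name WordCountSketch and its count method are hypothetical, and only the delimiter regex is taken from the sample above.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, JDK-only illustration of what the "wc" flow computes.
final class WordCountSketch {

    // Same delimiter expression that the sample passes to RegexSplitGenerator.
    static final String DELIMITERS = "[ \\[\\]\\(\\),.]";

    // Splits the text into tokens and counts the occurrences of each token.
    static Map<String, Long> count(String text) {
        Map<String, Long> counts = new TreeMap<>();
        for (String token : text.split(DELIMITERS)) {
            // Adjacent delimiters produce empty tokens; they are counted as well.
            counts.merge(token, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count("rain shadow, rain"));
    }
}
```

Because the regex splits on every single delimiter character, a sequence such as a comma followed by a space yields an empty token, which is why the sketch does not filter anything out.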
Note that no jar needs to be set up: the Cascading namespace (in particular cascading-flow, backed by HadoopFlowFactoryBean) tries to set up the resulting job classpath automatically. By default, it adds the Cascading library and its dependencies to the Hadoop DistributedCache so that the jars are found when the job runs inside the Hadoop cluster. When using custom jars (for example, to add custom Cascading functions) or when running against a cluster that is already provisioned, one can customize this behaviour through the jar-setup, jar and jar-by-class settings. For Cascading users, these settings are the equivalent of AppProps.setApplicationJarClass().
Furthermore, one can break the configuration method down into multiple pieces, which is useful for reusing components across multiple flows/cascades. This goes hand in hand with Spring's @Configuration feature; the example below configures a Cascade's pipes and taps as individual beans (see the original example):
@Configuration
public class CascadingAnalysisConfig {
    // fields that act as placeholders for externalized values
    @Value("${cascade.sec}") private String sec;
    @Value("${cascade.min}") private String min;

    @Bean public Pipe tsPipe() {
        DateParser dateParser = new DateParser(new Fields("ts"), "dd/MMM/yyyy:HH:mm:ss Z");
        return new Each("arrival rate", new Fields("time"), dateParser);
    }

    @Bean public Pipe tsCountPipe() {
        Pipe tsCountPipe = new Pipe("tsCount", tsPipe());
        tsCountPipe = new GroupBy(tsCountPipe, new Fields("ts"));
        return new Every(tsCountPipe, Fields.GROUP, new Count());
    }

    @Bean public Pipe tmCountPipe() {
        Pipe tmPipe = new Each(tsPipe(),
                new ExpressionFunction(new Fields("tm"), "ts - (ts % (60 * 1000))", long.class));
        Pipe tmCountPipe = new Pipe("tmCount", tmPipe);
        tmCountPipe = new GroupBy(tmCountPipe, new Fields("tm"));
        return new Every(tmCountPipe, Fields.GROUP, new Count());
    }

    @Bean public Map<String, Tap> sinks() {
        Tap tsSinkTap = new Hfs(new TextLine(), sec);
        Tap tmSinkTap = new Hfs(new TextLine(), min);
        return Cascades.tapsMap(Pipe.pipes(tsCountPipe(), tmCountPipe()), Tap.taps(tsSinkTap, tmSinkTap));
    }

    @Bean public String regex() {
        return "^([^ ]*) +[^ ]* +[^ ]* +\\[([^]]*)\\] +\\\"([^ ]*) ([^ ]*) [^ ]*\\\" ([^ ]*) ([^ ]*).*$";
    }

    @Bean public Fields fields() {
        return new Fields("ip", "time", "method", "event", "status", "size");
    }
}
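To make the values flowing through these beans more concrete, the following JDK-only sketch applies the same regular expression, date pattern and minute-rounding expression to a single access-log line. It is an illustration under assumptions: the class LogLineSketch, its methods and the sample log line used in main are hypothetical, and java.time is used here in place of whatever date parsing Cascading's DateParser performs internally.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, JDK-only illustration; not part of SHDP or Cascading.
final class LogLineSketch {

    // Same expression as returned by the regex() bean: six capturing groups,
    // matching the field names ip, time, method, event, status, size.
    static final Pattern LOG_LINE = Pattern.compile(
            "^([^ ]*) +[^ ]* +[^ ]* +\\[([^]]*)\\] +\\\"([^ ]*) ([^ ]*) [^ ]*\\\" ([^ ]*) ([^ ]*).*$");

    // Same date pattern as passed to DateParser in tsPipe().
    static final DateTimeFormatter TIME_FORMAT =
            DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);

    // Returns the six captured fields of one log line.
    static String[] parse(String line) {
        Matcher m = LOG_LINE.matcher(line);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a recognised log line: " + line);
        }
        String[] fields = new String[m.groupCount()];
        for (int i = 0; i < fields.length; i++) {
            fields[i] = m.group(i + 1);
        }
        return fields;
    }

    // Converts the "time" field into milliseconds since the epoch (the "ts" value).
    static long toEpochMillis(String time) {
        return OffsetDateTime.parse(time, TIME_FORMAT).toInstant().toEpochMilli();
    }

    // Same arithmetic as the ExpressionFunction in tmCountPipe():
    // drops the seconds and milliseconds, leaving the start of the minute.
    static long minuteBucket(long ts) {
        return ts - (ts % (60 * 1000));
    }

    public static void main(String[] args) {
        String line = "127.0.0.1 - - [01/Sep/2007:00:01:03 -0700] \"GET /index.html HTTP/1.1\" 200 1234";
        String[] fields = parse(line);
        long ts = toEpochMillis(fields[1]);
        System.out.println(String.join(" | ", fields));
        System.out.println(ts + " -> " + minuteBucket(ts));
    }
}
```

Grouping on the rounded value is what lets tmCountPipe count arrivals per minute, while tsCountPipe groups on the unrounded timestamp.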
The CascadingAnalysisConfig class creates several objects, all part of the Cascading package and named after the methods that create them, which can be injected or wired just like any other bean (notice how the wiring between the beans is done by pointing to their methods). If needed, one can mix and match code and XML configurations inside the same application:
<!-- code configuration class -->
<bean class="org.springframework.data.hadoop.cascading.CascadingAnalysisConfig"/>

<!-- Tap created through XML rather than code (using Spring's 3.1 c: namespace) -->
<bean id="tap" class="cascading.tap.hadoop.Hfs" c:fields-ref="fields" c:string-path-value="${cascade.input}"/>

<!-- standard bean declaration used to showcase the container flexibility -->
<!-- note the tap and sinks are imported from the CascadingAnalysisConfig bean -->
<bean id="analysisFlow" class="org.springframework.data.hadoop.cascading.HadoopFlowFactoryBean"
    p:configuration-ref="hadoopConfiguration" p:source-ref="tap" p:sinks-ref="sinks">
    <property name="tails"><list>
        <ref bean="tsCountPipe"/>
        <ref bean="tmCountPipe"/>
    </list></property>
</bean>

<casc:cascading-cascade id="cascade" flow-ref="analysisFlow"/>
<casc:cascading-runner unit-of-work-ref="cascade" run-at-startup="true"/>
The XML above, whose main purpose is to illustrate possible ways of configuring, uses SHDP classes to create a Cascade with one nested Flow using the taps and sinks configured by the code class. It also shows how the cascade is run (through cascading-runner). The runner triggers the execution during application start-up (notice the run-at-startup flag, which is false by default). Do note that the runner will not run unless it is triggered manually or run-at-startup is set to true. Additionally, the runner (as do all runners in SHDP) allows one or multiple pre and post actions to be specified, to be executed before and after each run. Typically other runners (such as other jobs or scripts) can be specified, but any JDK Callable can be passed in. For more information on runners, see the dedicated chapter.
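Since any JDK Callable qualifies as a pre or post action, such an action can be an ordinary class with no SHDP dependency. The sketch below is a hypothetical example (the class name CleanLocalDir and its behaviour are assumptions made for illustration): it removes a directory on the local file system, the kind of clean-up one might run before a flow writes its output. It does not touch HDFS, and how the bean is referenced from the runner is left to the runners chapter.

```java
import java.io.File;
import java.util.concurrent.Callable;

// Hypothetical pre-action: deletes a local directory tree before a run.
final class CleanLocalDir implements Callable<Boolean> {

    private final File dir;

    CleanLocalDir(String path) {
        this.dir = new File(path);
    }

    // Returns true if the directory is gone once the call completes.
    @Override
    public Boolean call() {
        return deleteRecursively(dir);
    }

    private static boolean deleteRecursively(File file) {
        // listFiles() returns null for plain files and for missing paths.
        File[] children = file.listFiles();
        if (children != null) {
            for (File child : children) {
                deleteRecursively(child);
            }
        }
        // A path that does not exist counts as already clean.
        return !file.exists() || file.delete();
    }
}
```

Declared as a regular Spring bean with the directory passed as a constructor argument, an instance of this class could then be handed to a runner in the same way as another job or script.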
Whether XML or Java config is better is up to the user and usually depends on the type of configuration required. Java config suits Cascading better, but note that the FactoryBeans above handle the lifecycle and some default configuration for both the Flow and Cascade objects. Whichever option is used, SHDP fully supports it.
For Spring Batch environments, SHDP provides a dedicated tasklet (similar to CascadeRunner above) for executing Cascade or Flow instances, on demand, as part of a batch or workflow. The declaration is pretty straightforward:
<casc:tasklet p:unit-of-work-ref="cascade" />
There are quite a number of DSLs built on top of Cascading, most notably Cascalog (written in Clojure) and Scalding (written in Scala). This documentation covers Scalding; however, the same concepts can be applied across the board to all DSLs.
As with the rest of the DSLs, Scalding offers a simplified, fluent syntax for creating units of code that build on top of Cascading. These in turn translate to Map Reduce jobs that get executed on Hadoop. Once compiled, the DSL gets translated into actual JVM classes that get executed by Scalding through its own Tool instance (namely com.twitter.scalding.Tool). One has the option of either deploying the Scalding jobs directly (by invoking the aforementioned Tool) or using Scalding's scald.rb script, which does the same thing based on the various attributes passed to it. Both approaches can be used in SHDP: the former through the Tool support (described below) and the latter by invoking the scald.rb script directly through the scripting feature.
For example, to run the tutorial examples (say Tutorial1), one can issue the following command:
scripts/scald.rb --local tutorial/Tutorial1.scala
which compiles Tutorial1, creates a bundled jar and runs it on a local Hadoop instance. When using the Tool support, the compilation and the library provisioning are external tasks (just as in the case of typical Hadoop jobs). The SHDP configuration to run the tutorial looks as follows:
<!-- the tool automatically is injected with 'hadoopConfiguration' -->
<hdp:tool-runner id="scalding" tool-class="com.twitter.scalding.Tool">
    <hdp:arg value="tutorial/Tutorial1"/>
    <hdp:arg value="--local"/>
</hdp:tool-runner>
Besides dedicated configuration support, SHDP also provides read-only Tap implementations useful inside Spring environments. Currently they are meant for local use only, such as testing or single-node Hadoop setups.
The Taps in org.springframework.data.hadoop.cascading.tap.local tap (pun intended) into the rich resource support from Spring Framework and Spring Integration, allowing data to flow easily in and out of a Cascading flow.
Below is a list of the types of Taps available and their backing support.
Table 8.1. Local Taps
Tap Name | Tap Type | Backing Resource | Resource Description |
---|---|---|---|
ResourceTap | Source | Spring Resource | classpath, file-system, URL-based or even in-memory content |
MessageSourceTap | Source | Spring Integration MessageSource | Inbound adapter for anything from arbitrary streams, FTP or JDBC to RSS/Atom and Twitter |
MessageHandlerTap | Sink | Spring Integration MessageHandler | The opposite of MessageSourceTap: outbound adapter for files, JMS, TCP, etc. |
Note that the Taps do not require any special configuration and are fully compatible with the existing Cascading local Schemes. To wit:
<bean id="cp-txt-files" class="org.springframework.data.hadoop.cascading.tap.local.ResourceTap">
    <constructor-arg><bean class="cascading.scheme.local.TextLine"/></constructor-arg>
    <constructor-arg><value>classpath:/data/*.txt</value></constructor-arg>
</bean>
The Tap above reads all the text files on the classpath, under the data folder, through Cascading's TextLine. Simply wire it to a Cascading flow (as described in the previous section) and you are good to go.