SHDP provides basic support for the Cascading library through the org.springframework.data.hadoop.cascading package: one can create Flows or Cascades, through XML and/or Java, and execute them, either in a simplistic manner or as part of a Spring Batch job.
In addition, dedicated Taps for Spring environments are available.
As Cascading is aimed at code configuration, typically one would configure the library programmatically. Such code can easily be integrated into Spring in various ways - through factory methods or @Configuration and @Bean (see this chapter for more information). In short, one uses Java code (or any JVM language for that matter) to create beans.
For example, looking at the official Cascading sample (Cascading for the Impatient, Part 2), one can simply call the Cascading setup method from within the Spring container (original vs. updated):
import cascading.flow.FlowDef;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexSplitGenerator;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class Impatient {

    public static FlowDef createFlowDef(String docPath, String wcPath) {
        // create source and sink taps
        Tap docTap = new Hfs(new TextDelimited(true, "\t"), docPath);
        Tap wcTap = new Hfs(new TextDelimited(true, "\t"), wcPath);

        // specify a regex operation to split the "document" text lines into a token stream
        Fields token = new Fields("token");
        Fields text = new Fields("text");
        RegexSplitGenerator splitter = new RegexSplitGenerator(token, "[ \\[\\]\\(\\),.]");
        // only returns "token"
        Pipe docPipe = new Each("token", text, splitter, Fields.RESULTS);

        // determine the word counts
        Pipe wcPipe = new Pipe("wc", docPipe);
        wcPipe = new GroupBy(wcPipe, token);
        wcPipe = new Every(wcPipe, Fields.ALL, new Count(), Fields.ALL);

        // connect the taps, pipes, etc., into a flow
        FlowDef flowDef = FlowDef.flowDef().setName("wc").addSource(docPipe, docTap).addTailSink(wcPipe, wcTap);
        return flowDef;
    }
}
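To make concrete what the flow built by createFlowDef computes, here is a minimal plain-Java sketch of the same idea: split each line with the same delimiter regex, group by token, and count. It does not use Cascading or Hadoop. The class name WordCountSketch and its count method are hypothetical names introduced only for this illustration, and skipping empty tokens is a simplification that the Cascading flow itself may not apply.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class; not part of SHDP or Cascading.
public class WordCountSketch {

    // Same delimiter regex that createFlowDef passes to RegexSplitGenerator.
    public static final String DELIMITERS = "[ \\[\\]\\(\\),.]";

    // Splits every line into tokens, then groups by token and counts occurrences.
    public static Map<String, Long> count(Iterable<String> lines) {
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            for (String token : line.split(DELIMITERS)) {
                if (token.isEmpty()) {
                    continue; // simplification: ignore empty tokens between adjacent delimiters
                }
                counts.merge(token, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two made-up input lines standing in for the "text" field of the document.
        System.out.println(count(Arrays.asList("rain (shadow), rain.", "[dry] shadow")));
    }
}
```

In the real flow, the split corresponds to the Each pipe, the grouping to GroupBy, and the counting to Every with Count; the sketch only mirrors that data flow in memory.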
The entire Cascading configuration (defining the Flow) is encapsulated within one method, which can be called by the container:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:hdp="http://www.springframework.org/schema/hadoop"
       xmlns:c="http://www.springframework.org/schema/c"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd
         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- factory-method approach called with two parameters available as property placeholders -->
    <bean id="flowDef" class="impatient.Main" factory-method="createFlowDef" c:_0="${in}" c:_1="${out}"/>

    <hdp:cascading-flow id="wc" definition-ref="flowDef" write-dot="dot/wc.dot"/>
    <hdp:cascading-cascade id="cascade" flow-ref="wc"/>
    <hdp:cascading-runner unit-of-work-ref="cascade" run-at-startup="true"/>
</beans>
Note that no jar needs to be set up - the Cascading namespace (in particular cascading-flow, backed by HadoopFlowFactoryBean) tries to automatically set up the resulting job classpath. By default, it will automatically add the Cascading library and its dependencies to the Hadoop DistributedCache so that when the job runs inside the Hadoop cluster, the jars are properly found. When using custom jars (for example to add custom Cascading functions) that already include Cascading, or when running against a cluster that is already provisioned, one can customize this behaviour through the add-cascading-jars, jar and jar-by-class attributes.
For Cascading users, these settings are the equivalent of AppProps.setApplicationJarClass().
Furthermore, one can break down the configuration method into multiple pieces, which is useful for reusing components between multiple flows/cascades. This goes hand in hand with Spring's @Configuration feature - see the example below, which configures a Cascade's pipes and taps as individual beans (see the original example):
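import java.util.Map;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import cascading.cascade.Cascades;
import cascading.operation.aggregator.Count;
import cascading.operation.expression.ExpressionFunction;
import cascading.operation.text.DateParser;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

@Configuration
public class CascadingAnalysisConfig {

    // fields that act as placeholders for externalized values
    @Value("${cascade.sec}") private String sec;
    @Value("${cascade.min}") private String min;

    // parses the "time" field into a "ts" timestamp
    @Bean public Pipe tsPipe() {
        DateParser dateParser = new DateParser(new Fields("ts"), "dd/MMM/yyyy:HH:mm:ss Z");
        return new Each("arrival rate", new Fields("time"), dateParser);
    }

    // counts arrivals per timestamp
    @Bean public Pipe tsCountPipe() {
        Pipe tsCountPipe = new Pipe("tsCount", tsPipe());
        tsCountPipe = new GroupBy(tsCountPipe, new Fields("ts"));
        return new Every(tsCountPipe, Fields.GROUP, new Count());
    }

    // counts arrivals per minute
    @Bean public Pipe tmCountPipe() {
        Pipe tmPipe = new Each(tsPipe(),
                new ExpressionFunction(new Fields("tm"), "ts - (ts % (60 * 1000))", long.class));
        Pipe tmCountPipe = new Pipe("tmCount", tmPipe);
        tmCountPipe = new GroupBy(tmCountPipe, new Fields("tm"));
        return new Every(tmCountPipe, Fields.GROUP, new Count());
    }

    // maps each tail pipe to its sink tap
    @Bean public Map<String, Tap> sinks() {
        Tap tsSinkTap = new Hfs(new TextLine(), sec);
        Tap tmSinkTap = new Hfs(new TextLine(), min);
        return Cascades.tapsMap(Pipe.pipes(tsCountPipe(), tmCountPipe()), Tap.taps(tsSinkTap, tmSinkTap));
    }

    @Bean public String regex() {
        return "^([^ ]*) +[^ ]* +[^ ]* +\\[([^]]*)\\] +\\\"([^ ]*) ([^ ]*) [^ ]*\\\" ([^ ]*) ([^ ]*).*$";
    }

    @Bean public Fields fields() {
        return new Fields("ip", "time", "method", "event", "status", "size");
    }
}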
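The two derived fields in the configuration above can be illustrated without Cascading. The following is a minimal sketch, assuming that ts holds epoch milliseconds (which the 60 * 1000 factor in the expression suggests) and that the date pattern is interpreted the way java.text.SimpleDateFormat interprets it. The class MinuteBucketSketch, its method names, the sample timestamp, and the use of Locale.US are all assumptions made for this illustration.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;

// Hypothetical illustration class; not part of SHDP or Cascading.
public class MinuteBucketSketch {

    // Same pattern string that the configuration passes to DateParser.
    public static final String PATTERN = "dd/MMM/yyyy:HH:mm:ss Z";

    // Parses an access-log style timestamp into epoch milliseconds.
    public static long parseTs(String time) {
        try {
            // Locale.US is an assumption so that English month names such as "Oct" parse.
            return new SimpleDateFormat(PATTERN, Locale.US).parse(time).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Unparseable timestamp: " + time, e);
        }
    }

    // Same arithmetic as the ExpressionFunction: ts - (ts % (60 * 1000)).
    public static long toMinute(long ts) {
        return ts - (ts % (60 * 1000));
    }

    public static void main(String[] args) {
        long ts = parseTs("10/Oct/2000:13:55:36 -0700"); // made-up sample value
        System.out.println(ts + " -> " + toMinute(ts));
    }
}
```

Subtracting the remainder truncates the timestamp to the start of its minute, so all events within the same minute share one tm value and end up in the same group of the tmCount pipe.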
The class above creates several objects (all part of the Cascading package), each named after its method, which can be injected or wired just like any other bean (notice how the wiring between the beans is done by pointing to their methods). One can mix and match (if needed) code and XML configurations inside the same application:
<!-- code configuration class -->
<bean class="org.springframework.data.hadoop.cascading.CascadingAnalysisConfig"/>

<!-- Tap created through XML rather than code (using Spring's 3.1 c: namespace)-->
<bean id="tap" class="cascading.tap.hadoop.Hfs" c:fields-ref="fields" c:string-path-value="${cascade.input}"/>

<!-- standard bean declaration used to showcase the container flexibility -->
<!-- note the tap and sinks are imported from the CascadingAnalysisConfig bean -->
<bean id="analysisFlow" class="org.springframework.data.hadoop.cascading.HadoopFlowFactoryBean"
      p:configuration-ref="hadoopConfiguration" p:source-ref="tap" p:sinks-ref="sinks">
    <property name="tails"><list>
        <ref bean="tsCountPipe"/>
        <ref bean="tmCountPipe"/>
    </list></property>
</bean>

<hdp:cascading-cascade id="cascade" flow-ref="analysisFlow"/>
<hdp:cascading-runner unit-of-work-ref="cascade" run-at-startup="true"/>
The XML above, whose main purpose is to illustrate possible ways of configuring, uses SHDP classes to create a Cascade with one nested Flow using the taps and sinks configured by the code class. Additionally, it shows how the cascade is run (through cascading-runner).
The runner triggers the execution during application start-up (notice the run-at-startup flag, which is false by default). Note that the runner will not run unless it is triggered manually or run-at-startup is set to true.
Additionally, the runner (as in fact do all runners in SHDP) allows one or multiple pre and post actions to be specified, to be executed before and after each run. Typically other runners (such as other jobs or scripts) can be specified, but any JDK Callable can be passed in. For more information on runners, see the dedicated chapter.
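The pre and post action idea can be sketched with plain JDK types. The class below is a hypothetical stand-in, not the SHDP runner: its name, its methods, and the choice to skip post actions when the unit of work fails are all assumptions made for illustration. It only shows how arbitrary Callable instances can be chained around a unit of work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical illustration class; not the SHDP runner implementation.
public class PrePostRunnerSketch {

    private final List<Callable<?>> preActions = new ArrayList<>();
    private final List<Callable<?>> postActions = new ArrayList<>();
    private final Callable<?> unitOfWork;

    public PrePostRunnerSketch(Callable<?> unitOfWork) {
        this.unitOfWork = unitOfWork;
    }

    // Registers an action to execute before the unit of work.
    public PrePostRunnerSketch pre(Callable<?> action) {
        preActions.add(action);
        return this;
    }

    // Registers an action to execute after the unit of work.
    public PrePostRunnerSketch post(Callable<?> action) {
        postActions.add(action);
        return this;
    }

    // Runs pre actions, then the unit of work, then post actions, in registration order.
    public Object run() {
        try {
            for (Callable<?> action : preActions) {
                action.call();
            }
            Object result = unitOfWork.call();
            // Assumption: post actions are reached only if everything before them succeeded.
            for (Callable<?> action : postActions) {
                action.call();
            }
            return result;
        } catch (Exception e) {
            throw new IllegalStateException("Run failed", e);
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        new PrePostRunnerSketch(() -> log.add("cascade"))
                .pre(() -> log.add("prepare input"))
                .post(() -> log.add("clean up"))
                .run();
        System.out.println(log);
    }
}
```

Because each action is just a Callable, another runner, a script wrapper, or a simple lambda can be plugged in the same way.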
Whether XML or Java config is better is up to the user and usually depends on the type of configuration required. Java config suits Cascading better, but note that the FactoryBeans above handle the life-cycle and some default configuration for both the Flow and Cascade objects. Either way, whichever option is used, SHDP fully supports it.
For Spring Batch environments, SHDP provides a dedicated tasklet (similar to the CascadeRunner above) for executing Cascade or Flow instances, on demand, as part of a batch or workflow. The declaration is pretty straightforward:
<hdp:tasklet p:unit-of-work-ref="cascade" />
There are quite a number of DSLs built on top of Cascading, most notably Cascalog (written in Clojure) and Scalding (written in Scala). This documentation covers Scalding; however, the same concepts can be applied across the board to the other DSLs.
As with the rest of the DSLs, Scalding offers a simplified, fluent syntax for creating units of code that build on top of Cascading. These in turn translate to Map Reduce jobs that get executed on Hadoop.
Once compiled, the DSL gets translated into actual JVM classes that get executed by Scalding through its own Tool instance (namely com.twitter.scalding.Tool).
One has the option to either deploy the Scalding jobs directly (by invoking the aforementioned Tool) or use Scalding's scald.rb script, which does the same thing based on the various attributes passed to it. Both approaches can be used in SHDP, the former through the Tool support (described below) and the latter by invoking the scald.rb script directly through the scripting feature.
For example, to run the tutorial examples (say Tutorial1), one can issue the following command:
scripts/scald.rb --local tutorial/Tutorial1.scala
which compiles Tutorial1, creates a bundled jar and runs it on a local Hadoop instance. When using the Tool support, the compilation and the library provisioning are external tasks (just as in the case of typical Hadoop jobs). The SHDP configuration to run the tutorial looks as follows:
<!-- the tool automatically is injected with 'hadoopConfiguration' -->
<hdp:tool-runner id="scalding" tool-class="com.twitter.scalding.Tool">
    <hdp:arg value="tutorial/Tutorial1"/>
    <hdp:arg value="--local"/>
</hdp:tool-runner>
Besides dedicated configuration support, SHDP also provides read-only Tap implementations useful inside Spring environments. Currently they are meant for local use only, such as testing or single-node Hadoop setups.
The Taps in org.springframework.data.hadoop.cascading.tap.local tap (pun intended) into the rich resource support from Spring Framework and Spring Integration, allowing data to flow easily in and out of a Cascading flow.
Below is a list of the types of Taps available and their backing support.
Table 8.1. Local Taps
Tap Name | Tap Type | Backing Resource | Resource Description |
---|---|---|---|
ResourceTap | Source | Spring Resource | classpath, file-system, URL-based or even in-memory content |
MessageSourceTap | Source | Spring Integration MessageSource | Inbound adapter for anything from arbitrary streams, FTP or JDBC to RSS/Atom and Twitter |
MessageHandlerTap | Sink | Spring Integration MessageHandler | The opposite of MessageSourceTap: outbound adapter for files, JMS, TCP, etc. |
Note that the Taps do not require any special configuration and are fully compatible with the existing Cascading local Schemes. To wit:
<bean id="cp-txt-files" class="org.springframework.data.hadoop.cascading.tap.local.ResourceTap">
    <constructor-arg><bean class="cascading.scheme.local.TextLine"/></constructor-arg>
    <constructor-arg><value>classpath:/data/*.txt</value></constructor-arg>
</bean>
The Tap above reads all the text files in the classpath, under the data folder, through Cascading's TextLine. Simply wire that to a Cascading flow (as described in the previous section) and you are good to go.