7. Cascading integration

SHDP provides basic support for the Cascading library through the org.springframework.data.hadoop.cascading package: one can create Flows or Cascades through XML, Java or both, and execute them either in a simple, standalone manner or as part of a Spring Batch job. In addition, dedicated Taps for Spring environments are available.

As Cascading is aimed at code configuration, one would typically configure the library programmatically. This type of configuration is supported through Spring's @Configuration and @Bean annotations (see the Spring Framework reference documentation on Java-based container configuration for more information). In short, one uses Java code (or any JVM language for that matter) to create beans. Below is an example that uses this approach to create various Cascading components (do refer to the Cascading examples for more context):

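// Imports assume Cascading 2.x package names (Hadoop-mode TextLine and Hfs)
import java.util.Map;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import cascading.cascade.Cascades;
import cascading.operation.aggregator.Count;
import cascading.operation.expression.ExpressionFunction;
import cascading.operation.text.DateParser;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

@Configuration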
public class CascadingAnalysisConfig {
    // fields that act as placeholders for externalized values
    @Value("${cascade.sec}") private String sec;
    @Value("${cascade.min}") private String min;
    
    @Bean public Pipe tsPipe() {
        DateParser dateParser = new DateParser(new Fields("ts"), "dd/MMM/yyyy:HH:mm:ss Z");
        return new Each("arrival rate", new Fields("time"), dateParser);
    }

    @Bean public Pipe tsCountPipe() {
        Pipe tsCountPipe = new Pipe("tsCount", tsPipe());
        tsCountPipe = new GroupBy(tsCountPipe, new Fields("ts"));
        return new Every(tsCountPipe, Fields.GROUP, new Count());
    }

    @Bean public Pipe tmCountPipe() {
        Pipe tmPipe = new Each(tsPipe(),
                new ExpressionFunction(new Fields("tm"), "ts - (ts % (60 * 1000))", long.class));
        Pipe tmCountPipe = new Pipe("tmCount", tmPipe);
        tmCountPipe = new GroupBy(tmCountPipe, new Fields("tm"));
        return new Every(tmCountPipe, Fields.GROUP, new Count());
    }

    @Bean public Map<String, Tap> sinks(){
        Tap tsSinkTap = new Hfs(new TextLine(), sec);
        Tap tmSinkTap = new Hfs(new TextLine(), min);
        return Cascades.tapsMap(Pipe.pipes(tsCountPipe(), tmCountPipe()), Tap.taps(tsSinkTap, tmSinkTap));
    }

    @Bean public String regex() {
        return "^([^ ]*) +[^ ]* +[^ ]* +\\[([^]]*)\\] +\\\"([^ ]*) ([^ ]*) [^ ]*\\\" ([^ ]*) ([^ ]*).*$";
    }
    
    @Bean public Fields fields() {
        return new Fields("ip", "time", "method", "event", "status", "size");
    }
}

The class above creates several objects, all part of the Cascading package, which are registered as beans named after their methods and can be injected or wired just like any other bean (notice how the beans are wired to each other by calling their methods). If needed, one can mix and match code and XML configurations inside the same application:

<!-- code configuration class -->
<bean class="org.springframework.data.hadoop.cascading.CascadingAnalysisConfig"/>

<!-- Tap created through XML rather than code (using the c: namespace from Spring 3.1) -->
<bean id="tap" class="cascading.tap.hadoop.Hfs" c:fields-ref="fields" c:string-path-value="${cascade.input}"/>

<bean id="cascade" class="org.springframework.data.hadoop.cascading.CascadeFactoryBean" p:configuration-ref="hadoopConfiguration">
  <property name="flows"><list>
    <bean class="org.springframework.data.hadoop.cascading.HadoopFlowFactoryBean"
      p:configuration-ref="hadoopConfiguration" p:source-ref="tap" p:sinks-ref="sinks">
     <property name="tails"><list>
         <ref bean="tsCountPipe"/>
         <ref bean="tmCountPipe"/>
     </list></property>
    </bean>
  </list></property>
</bean>
	
<bean id="cascade-runner" class="org.springframework.data.hadoop.cascading.CascadeRunner" p:unit-of-work-ref="cascade" p:run-at-startup="true" />

The XML above, whose main purpose is to illustrate the possible ways of configuring, uses SHDP classes to create a Cascade with one nested Flow, using the source tap declared in XML together with the pipes and sinks configured by the code class. It also shows how the cascade is run (through CascadeRunner).

Whether XML or Java config is better is up to the user and usually depends on the type of configuration required. Java config suits Cascading better, but note that the FactoryBeans above handle the life-cycle and some default configuration for both the Flow and Cascade objects. Whichever option is used, SHDP fully supports it.
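To make the record-level logic of CascadingAnalysisConfig more concrete, the following is a minimal sketch in plain JDK code of what the regex, date pattern and "tm" expression do to a single log line. It does not use Cascading or SHDP, and it is not how the flow is actually executed; the class name AccessLogSketch, its helper methods and the sample log lines are hypothetical and exist only for illustration.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, JDK-only illustration; in the real flow these steps are Cascading operations.
class AccessLogSketch {

    // Same pattern as the regex() bean in CascadingAnalysisConfig
    static final Pattern LINE = Pattern.compile(
        "^([^ ]*) +[^ ]* +[^ ]* +\\[([^]]*)\\] +\\\"([^ ]*) ([^ ]*) [^ ]*\\\" ([^ ]*) ([^ ]*).*$");

    // Returns the six captured groups (ip, time, method, event, status, size),
    // in the order declared by the fields() bean, or null if the line does not match.
    static String[] parse(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.matches()) {
            return null;
        }
        String[] fields = new String[m.groupCount()];
        for (int i = 0; i < fields.length; i++) {
            fields[i] = m.group(i + 1);
        }
        return fields;
    }

    // Converts the "time" field to epoch milliseconds using the same date pattern as tsPipe().
    static long toMillis(String time) {
        try {
            return new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)
                .parse(time).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Unparseable time: " + time, e);
        }
    }

    // Same arithmetic as the "tm" expression: truncate a timestamp to the start of its minute.
    static long minuteBucket(long ts) {
        return ts - (ts % (60 * 1000));
    }

    // Rough analogue of tmCountPipe(): group matching lines by minute and count them.
    static Map<Long, Integer> countPerMinute(String... lines) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            String[] fields = parse(line);
            if (fields == null) {
                continue; // skip lines that do not look like access-log entries
            }
            counts.merge(minuteBucket(toMillis(fields[1])), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up sample lines in Apache access-log style
        System.out.println(countPerMinute(
            "127.0.0.1 - - [01/Sep/2007:00:01:03 -0700] \"GET /index.html HTTP/1.1\" 200 1234",
            "127.0.0.1 - - [01/Sep/2007:00:01:45 -0700] \"GET /a.png HTTP/1.1\" 200 10",
            "10.0.0.2 - - [01/Sep/2007:00:02:00 -0700] \"POST /form HTTP/1.1\" 302 0"));
    }
}
```

Grouping on the raw "ts" value instead of the truncated one would give the per-second counts produced by tsCountPipe(), since the log timestamps have one-second resolution.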

7.1 Using the Cascading tasklet

For Spring Batch environments, SHDP provides a dedicated tasklet (similar to CascadeRunner above) for executing Cascade or Flow instances, on demand, as part of a batch or workflow. The declaration is straightforward:

<bean id="cascade-tasklet" class="org.springframework.data.hadoop.cascading.CascadeTasklet" p:unit-of-work-ref="cascade" />

7.2 Using Scalding

There are quite a number of DSLs built on top of Cascading, most notably Cascalog (written in Clojure) and Scalding (written in Scala). This documentation covers Scalding; however, the same concepts can be applied to the other DSLs.

As with the rest of the DSLs, Scalding offers a simplified, fluent syntax for creating units of code that build on top of Cascading. These in turn translate to MapReduce jobs that get executed on Hadoop. Once compiled, the DSL gets translated into actual JVM classes that get executed by Scalding through its own Tool instance (namely com.twitter.scalding.Tool). One has the option to either deploy the Scalding jobs directly (by invoking the aforementioned Tool) or use Scalding's scald.rb script, which does the same thing based on the various attributes passed to it. Both approaches can be used in SHDP: the former through the Tool support (described below) and the latter by invoking the scald.rb script directly through the scripting feature.

For example, to run the tutorial examples (say Tutorial1), one can issue the following command:

scripts/scald.rb --local tutorial/Tutorial1.scala

which compiles Tutorial1, creates a bundled jar and runs it on a local Hadoop instance. When using the Tool support, the compilation and the library provisioning are external tasks (just as in the case of typical Hadoop jobs). The SHDP configuration to run the tutorial looks as follows:

<!-- the tool is automatically injected with 'hadoopConfiguration' -->
<hdp:tool-runner id="scalding" tool-class="com.twitter.scalding.Tool">
   <hdp:arg value="tutorial/Tutorial1"/>
   <hdp:arg value="--local"/>
</hdp:tool-runner>

7.3 Spring-specific local Taps

Besides dedicated configuration support, SHDP also provides read-only Tap implementations useful inside Spring environments. Currently they are meant for local use only, such as testing or single-node Hadoop setups.

The Taps in org.springframework.data.hadoop.cascading.tap.local tap (pun intended) into the rich resource support from Spring Framework and Spring Integration, allowing data to flow easily in and out of a Cascading flow.

Below is a list of the types of Taps available and their backing support.

Table 7.1. Local Taps

Tap Name | Tap Type | Backing Resource | Resource Description
ResourceTap | Source | Spring Resource | classpath, file-system, URL-based or even in-memory content
MessageSourceTap | Source | Spring Integration MessageSource | Inbound adapter for anything from arbitrary streams, FTP or JDBC to RSS/Atom and Twitter
MessageHandlerTap | Sink | Spring Integration MessageHandler | The opposite of MessageSourceTap: outbound adapter for files, JMS, TCP, etc.

Note the Taps do not require any special configuration and are fully compatible with the existing Cascading local Schemes. To wit:

<bean id="cp-txt-files" class="org.springframework.data.hadoop.cascading.tap.local.ResourceTap">
	<constructor-arg><bean class="cascading.scheme.local.TextLine"/></constructor-arg>
	<constructor-arg><value>classpath:/data/*.txt</value></constructor-arg>
</bean>

The Tap above reads all the text files on the classpath, under the data folder, through Cascading's TextLine scheme. Simply wire it to a Cascading flow (as described in the previous section) and you are good to go.