10. Apache Spark integration

Starting with Spring for Apache Hadoop 2.3, we have added a new Spring Batch tasklet for launching Spark jobs in YARN. This support requires access to the Spark Assembly jar that is shipped as part of the Spark distribution. We recommend copying this jar file to a shared location in HDFS. In the example below, we have already copied this jar file to HDFS with the path hdfs:///app/spark/spark-assembly-1.5.0-hadoop2.6.0.jar. You also need your Spark app built and ready to be executed. The example references a pre-built app jar file named spark-hashtags_2.10-0.1.0.jar located in an app directory in our project. The Spark job is launched using the Spark YARN integration, so there is no need for a separate Spark cluster.

10.1 Simple example for running a Spark YARN Tasklet

The example Spark job reads an input file containing tweets in JSON format, one tweet per line. It extracts and counts hashtags, writes the top 10 hashtags with their counts to an output directory, and prints them. This is a deliberately simplified job, but it is enough to demonstrate the tasklet.

/* Hashtags.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import scala.util.parsing.json._

object Hashtags {
  def main(args: Array[String]) {
    val tweetFile = args(0)
    val top10Dir = args(1)

    val conf = new SparkConf().setAppName("Hashtags")
    val sc = new SparkContext(conf)

    val tweetdata = sc.textFile(tweetFile)
    val tweets = tweetdata.map(line => JSON.parseFull(line).get.asInstanceOf[Map[String, Any]])

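    // Map.get returns an Option, whose toString would wrap the text in "Some(...)"; use getOrElse to get the raw text
    val hashTags = tweets.flatMap(map => map.getOrElse("text", "").toString.split(" ").filter(_.startsWith("#")))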
    val hashTagsCounts = hashTags.map((_, 1)).reduceByKey((a, b) => a + b)
    val top10 = hashTagsCounts.map{case (t, c) => (c, t)}.sortByKey(false).map{case (c, t) => (t, c)}.take(10)

    val top10HashTags = sc.parallelize(top10)
    top10HashTags.saveAsTextFile(top10Dir)

    println("Top 10 hashtags:")
    top10.foreach(println)

    sc.stop()
  }
}
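
To make the counting logic easier to follow without a cluster, the same steps can be sketched in plain Java. This is an illustration only and not part of the sample project: the class name HashtagCountSketch and its topHashtags method are hypothetical, and the sketch assumes the tweet text has already been extracted from the JSON.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone sketch; it mirrors the Scala job's logic without Spark.
class HashtagCountSketch {

  // Splits each text on single spaces, keeps the tokens that start with '#',
  // counts them, and returns at most 'limit' entries ordered by descending count.
  // The relative order of hashtags with equal counts is not defined.
  static List<Map.Entry<String, Integer>> topHashtags(List<String> texts, int limit) {
    Map<String, Integer> counts = new HashMap<>();
    for (String text : texts) {
      for (String token : text.split(" ")) {
        if (token.startsWith("#")) {
          counts.merge(token, 1, Integer::sum);
        }
      }
    }
    List<Map.Entry<String, Integer>> sorted = new ArrayList<>(counts.entrySet());
    sorted.sort(Map.Entry.<String, Integer>comparingByValue(Comparator.reverseOrder()));
    return sorted.subList(0, Math.min(limit, sorted.size()));
  }

  public static void main(String[] args) {
    List<String> texts = Arrays.asList(
        "learning #spark on #yarn",
        "#spark is fast",
        "more #spark and #yarn",
        "#hadoop");
    for (Map.Entry<String, Integer> entry : topHashtags(texts, 10)) {
      System.out.println(entry.getKey() + " " + entry.getValue());
    }
  }
}
```

The Scala job above remains the code that is actually submitted to YARN; the sketch only shows what the flatMap, reduceByKey and sortByKey steps compute.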

We can build this app and package it in a jar file. In this example it is placed in an app directory in our Spring project.

We create a Spring Boot project to host our Java code for this example. The Spring configuration class is shown below in parts, starting with the Hadoop configuration, the application property values and the job definition:

@Configuration
public class SparkYarnConfiguration {

  @Autowired
  private org.apache.hadoop.conf.Configuration hadoopConfiguration;

  @Value("${example.inputDir}")
  String inputDir;

  @Value("${example.inputFileName}")
  String inputFileName;

  @Value("${example.inputLocalDir}")
  String inputLocalDir;

  @Value("${example.outputDir}")
  String outputDir;

  @Value("${example.sparkAssembly}")
  String sparkAssembly;

  // Job definition
  @Bean
  Job tweetHashtags(JobBuilderFactory jobs, Step initScript, Step sparkTopHashtags) throws Exception {
      return jobs.get("TweetTopHashtags")
          .start(initScript)
          .next(sparkTopHashtags)
          .build();
  }

Our batch job consists of two steps. First we run an init script to copy the data file to HDFS using an HdfsScriptRunner:

  // Step 1 - Init Script
  @Bean
  Step initScript(StepBuilderFactory steps, Tasklet scriptTasklet) throws Exception {
    return steps.get("initScript")
        .tasklet(scriptTasklet)
        .build();
  }

  @Bean
  ScriptTasklet scriptTasklet(HdfsScriptRunner scriptRunner) {
    ScriptTasklet scriptTasklet = new ScriptTasklet();
    scriptTasklet.setScriptCallback(scriptRunner);
    return scriptTasklet;
  }

  @Bean
  HdfsScriptRunner scriptRunner() {
    ScriptSource script = new ResourceScriptSource(new ClassPathResource("fileCopy.js"));
    HdfsScriptRunner scriptRunner = new HdfsScriptRunner();
    scriptRunner.setConfiguration(hadoopConfiguration);
    scriptRunner.setLanguage("javascript");
    Map<String, Object> arguments = new HashMap<>();
    arguments.put("source", inputLocalDir);
    arguments.put("file", inputFileName);
    arguments.put("indir", inputDir);
    arguments.put("outdir", outputDir);
    scriptRunner.setArguments(arguments);
    scriptRunner.setScriptSource(script);
    return scriptRunner;
  }

The HdfsScriptRunner uses the following JavaScript:

if (fsh.test(indir)) {
  fsh.rmr(indir);
}
if (fsh.test(outdir)) {
  fsh.rmr(outdir);
}
fsh.copyFromLocal(source+'/'+file, indir+'/'+file);
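
The order of operations in this script can be sketched in Java as well. The sketch below is only an analogue: it uses java.nio.file against the local filesystem instead of HDFS, and the class FileCopySketch with its removeIfExists and prepare methods is hypothetical. It is meant to show the sequence (remove stale input and output directories, then copy the data file), not how Spring for Apache Hadoop implements fsh.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical local-filesystem analogue of fileCopy.js; it does not touch HDFS.
class FileCopySketch {

  // Local counterpart of "if (fsh.test(dir)) fsh.rmr(dir)".
  static void removeIfExists(Path dir) throws IOException {
    if (!Files.exists(dir)) {
      return;
    }
    List<Path> paths;
    try (Stream<Path> walk = Files.walk(dir)) {
      // Reverse order lists children before their parent directories.
      paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
    }
    for (Path path : paths) {
      Files.delete(path);
    }
  }

  // Clears both directories, then copies source/file to indir/file.
  static void prepare(Path source, String file, Path indir, Path outdir) throws IOException {
    removeIfExists(indir);
    removeIfExists(outdir);
    // The local filesystem needs the target directory to exist before the copy.
    Files.createDirectories(indir);
    Files.copy(source.resolve(file), indir.resolve(file), StandardCopyOption.REPLACE_EXISTING);
  }

  public static void main(String[] args) throws IOException {
    Path base = Files.createTempDirectory("file-copy-sketch");
    Path source = Files.createDirectories(base.resolve("data"));
    Files.write(source.resolve("tweets.dat"), "{}".getBytes(StandardCharsets.UTF_8));
    prepare(source, "tweets.dat", base.resolve("input"), base.resolve("output"));
    System.out.println(Files.exists(base.resolve("input").resolve("tweets.dat")));
  }
}
```

Removing the output directory up front matters because the Spark job writes its results with saveAsTextFile, which fails if the target directory already exists.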

The second step is to configure and execute the SparkYarnTasklet:

  // Step 2 - Spark Top Hashtags
  @Bean
  Step sparkTopHashtags(StepBuilderFactory steps, Tasklet sparkTopHashtagsTasklet) throws Exception {
    return steps.get("sparkTopHashtags")
        .tasklet(sparkTopHashtagsTasklet)
        .build();
  }

  @Bean
  SparkYarnTasklet sparkTopHashtagsTasklet() throws Exception {
    SparkYarnTasklet sparkTasklet = new SparkYarnTasklet();
    sparkTasklet.setSparkAssemblyJar(sparkAssembly);
    sparkTasklet.setHadoopConfiguration(hadoopConfiguration);
    sparkTasklet.setAppClass("Hashtags");
    File jarFile = new File(System.getProperty("user.dir") + "/app/spark-hashtags_2.10-0.1.0.jar");
    sparkTasklet.setAppJar(jarFile.toURI().toString());
    sparkTasklet.setExecutorMemory("1G");
    sparkTasklet.setNumExecutors(1);
    sparkTasklet.setArguments(new String[]{
        hadoopConfiguration.get("fs.defaultFS") + inputDir + "/" + inputFileName,
        hadoopConfiguration.get("fs.defaultFS") + outputDir});
    return sparkTasklet;
  }
}

For the SparkYarnTasklet, we set the following properties:

  • sparkAssemblyJar: the path to the Spark Assembly jar file
  • hadoopConfiguration: a reference to the standard Spring Hadoop Configuration that is autowired by Spring Boot
  • appClass: the name of the Spark application class, in this case "Hashtags"
  • appJar: the path to the Spark application jar file
  • executorMemory: the memory for each executor, "1G" in this example
  • numExecutors: the number of Spark executors, we use only 1 for this small example
  • arguments: app arguments as a String array; for this app they are the HDFS path to the input data file and the HDFS path to the output directory
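
As a quick check of what the Spark app receives, the following sketch reproduces the string concatenation used for the arguments property. The class SparkArgumentsSketch and its buildArguments method are hypothetical, and the sketch assumes that fs.defaultFS resolves to the spring.hadoop.fsUri value from the application.yml file of this example.

```java
// Hypothetical helper that mirrors how the tasklet arguments are concatenated.
class SparkArgumentsSketch {

  // First argument: input file URI; second argument: output directory URI.
  static String[] buildArguments(String defaultFs, String inputDir,
      String inputFileName, String outputDir) {
    return new String[] {
        defaultFs + inputDir + "/" + inputFileName,
        defaultFs + outputDir };
  }

  public static void main(String[] args) {
    // Values taken from the application.yml of this example;
    // fs.defaultFS is assumed to equal spring.hadoop.fsUri.
    String[] sparkArgs = buildArguments(
        "hdfs://borneo:8020", "/tmp/hashtags/input", "tweets.dat", "/tmp/hashtags/output");
    for (String arg : sparkArgs) {
      System.out.println(arg);
    }
    // prints:
    // hdfs://borneo:8020/tmp/hashtags/input/tweets.dat
    // hdfs://borneo:8020/tmp/hashtags/output
  }
}
```

These two values correspond to args(0) and args(1) in the Scala main method.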

We are now ready to build and run this application example. The Spring Boot driver application is the following:

@SpringBootApplication
@EnableBatchProcessing
public class SparkYarnApplication implements CommandLineRunner {

  @Autowired
  JobLauncher jobLauncher;

  @Autowired
  Job tweetTopHashtags;

  public static void main(String[] args) {
    SpringApplication.run(SparkYarnApplication.class, args);
  }

  @Override
  public void run(String... args) throws Exception {
    System.out.println("RUNNING ...");
    jobLauncher.run(tweetTopHashtags, new JobParametersBuilder().toJobParameters());
  }

}

We used the @EnableBatchProcessing annotation to enable the batch features for Spring Boot.

This can now be built using the following Maven POM file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.springdeveloper.demo</groupId>
  <artifactId>batch-spark</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>batch-spark</name>
  <description>Demo project for Spring Batch SparkYarn tasklet</description>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.3.1.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.7</java.version>
    <spring-data-hadoop.version>2.3.0.RELEASE</spring-data-hadoop.version>
    <spark.version>1.5.0</spark.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-batch</artifactId>
      <exclusions>
        <exclusion>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-log4j</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.data</groupId>
      <artifactId>spring-data-hadoop-boot</artifactId>
      <version>${spring-data-hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.data</groupId>
      <artifactId>spring-data-hadoop-batch</artifactId>
      <version>${spring-data-hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.data</groupId>
      <artifactId>spring-data-hadoop-spark</artifactId>
      <version>${spring-data-hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-yarn_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>

</project>

We are using the spring-data-hadoop-spark and spring-data-hadoop-batch artifacts for bringing in the batch features we need. We are also using the spring-data-hadoop-boot artifact to enable Boot to autoconfigure our Hadoop configuration.

Application configuration is provided in our application.yml file:

spring:
  batch:
    job:
      enabled: false
  hadoop:
    fsUri: hdfs://borneo:8020
    resourceManagerHost: borneo
example:
  inputLocalDir: data
  inputFileName: tweets.dat
  inputDir: /tmp/hashtags/input
  outputDir: /tmp/hashtags/output
  sparkAssembly: hdfs:///app/spark/spark-assembly-1.5.0-hadoop2.6.0.jar

We are using configuration settings that work with the SpringOne-2015-Edition Vagrant Hadoop installation available at https://github.com/trisberg/hadoop-install.

To build and run this example, use:

mvn clean package
java -jar target/batch-spark-0.0.1-SNAPSHOT.jar