10. Apache Spark integration

Starting with Spring for Apache Hadoop 2.3, we have added a new Spring Batch tasklet for launching Spark jobs on YARN. This support requires access to the Spark Assembly jar that is shipped as part of the Spark distribution. We recommend copying this jar file to a shared location in HDFS. In the example below, we have already copied this jar file to HDFS with the path hdfs:///app/spark/spark-assembly-1.5.0-hadoop2.6.0.jar. You also need your Spark app built and ready to be executed. In the example below, we reference a pre-built app jar file named spark-hashtags_2.10-0.1.0.jar located in an app directory in our project. The Spark job is launched using the Spark YARN integration, so there is no need for a separate Spark cluster for this example.
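
The assembly path hdfs:///app/spark/... uses three slashes: it names the hdfs scheme but leaves out the host and port, which Hadoop fills in from the default file system (fs.defaultFS) when the schemes match. The following is a simplified sketch of that rule using java.net.URI. The qualify helper is hypothetical, it is not Hadoop's implementation, and it ignores any query or fragment.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Hypothetical sketch: fill in a missing authority (host:port) from the default file system URI. */
public class DefaultFsResolver {

    static URI qualify(URI path, URI defaultFs) throws URISyntaxException {
        boolean sameScheme = path.getScheme() != null && path.getScheme().equals(defaultFs.getScheme());
        if (sameScheme && path.getAuthority() == null && defaultFs.getAuthority() != null) {
            // e.g. hdfs:///app/... has no host:port, so borrow it from fs.defaultFS
            return new URI(path.getScheme(), defaultFs.getAuthority(), path.getPath(), null, null);
        }
        return path; // already fully qualified, or a different scheme: leave unchanged
    }

    public static void main(String[] args) throws URISyntaxException {
        URI assembly = new URI("hdfs:///app/spark/spark-assembly-1.5.0-hadoop2.6.0.jar");
        URI defaultFs = new URI("hdfs://borneo:8020");
        System.out.println(qualify(assembly, defaultFs));
    }
}
```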

10.1 Simple example for running a Spark YARN Tasklet

The example Spark job reads an input file containing tweets in JSON format. It extracts and counts the hashtags and then prints the top 10 hashtags found, along with their counts. This is a very simplified example, but it is sufficient to demonstrate the tasklet.

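/* Hashtags.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import scala.util.parsing.json._

object Hashtags {
  def main(args: Array[String]) {
    val tweetFile = args(0)  // HDFS path of the input file, one JSON tweet per line
    val top10Dir = args(1)   // HDFS directory for the results

    val conf = new SparkConf().setAppName("Hashtags")
    val sc = new SparkContext(conf)

    val tweetdata = sc.textFile(tweetFile)
    val tweets = tweetdata.map(line => JSON.parseFull(line).get.asInstanceOf[Map[String, Any]])

    // take the tweet's "text" field (empty if missing) and keep the words that start with '#'
    val hashTags = tweets.flatMap(map => map.getOrElse("text", "").toString.split(" ").filter(_.startsWith("#")))
    val hashTagsCounts = hashTags.map((_, 1)).reduceByKey((a, b) => a + b)
    // swap to (count, tag), sort by count descending, swap back and keep the first 10
    val top10 = hashTagsCounts.map{case (t, c) => (c, t)}.sortByKey(false).map{case (c, t) => (t, c)}.take(10)

    // write the results to the output directory, then print them
    val top10HashTags = sc.parallelize(top10)
    top10HashTags.saveAsTextFile(top10Dir)

    println("Top 10 hashtags:")
    top10.foreach(println)

    sc.stop()
  }
}
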
We can build this app and package it in a jar file. In this example it is placed in an app directory in our Spring project.
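
The counting logic in Hashtags.scala can be checked without a cluster. Below is a minimal local sketch of the same pipeline (split on spaces, keep words starting with #, count, sort by count descending, take the top N) written with plain Java streams instead of Spark. It assumes the tweet text has already been extracted from the JSON, and the class and method names are illustrative only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Local, non-Spark sketch of the counting done by Hashtags.scala (illustrative names). */
public class HashtagTopN {

    /** Returns the n most frequent hashtags in the given tweet texts, most frequent first. */
    static List<Map.Entry<String, Long>> topHashtags(List<String> tweetTexts, int n) {
        Map<String, Long> counts = tweetTexts.stream()
            // split on single spaces and keep words starting with '#', like the Scala job
            .flatMap(text -> Arrays.stream(text.split(" ")))
            .filter(word -> word.startsWith("#"))
            // equivalent of map((_, 1)).reduceByKey(_ + _)
            .collect(Collectors.groupingBy(word -> word, Collectors.counting()));
        return counts.entrySet().stream()
            // highest count first; ties broken alphabetically for deterministic output
            .sorted(Map.Entry.<String, Long>comparingByValue().reversed()
                .thenComparing(Map.Entry.<String, Long>comparingByKey()))
            .limit(n)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> tweets = Arrays.asList(
            "Learning #spark on #yarn",
            "#spark jobs from #springbatch",
            "More #spark");
        System.out.println("Top 10 hashtags:");
        for (Map.Entry<String, Long> e : topHashtags(tweets, 10)) {
            System.out.println(e.getKey() + " " + e.getValue());
        }
    }
}
```

Unlike sortByKey(false) in the Spark job, the sketch breaks ties alphabetically so that its output is stable from run to run.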

We create a Spring Boot project to host the Java code for this example. The Spring configuration class is shown below. It starts with the Hadoop configuration, the application property values, and the Job definition:

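@Configuration
public class SparkYarnConfiguration {

  @Autowired
  private org.apache.hadoop.conf.Configuration hadoopConfiguration;

  @Value("${example.inputDir}")
  String inputDir;

  @Value("${example.inputFileName}")
  String inputFileName;

  @Value("${example.inputLocalDir}")
  String inputLocalDir;

  @Value("${example.outputDir}")
  String outputDir;

  @Value("${example.sparkAssembly}")
  String sparkAssembly;

  // Job definition: run the init script step, then the Spark step
  @Bean
  Job tweetHashtags(JobBuilderFactory jobs, Step initScript, Step sparkTopHashtags) throws Exception {
      return jobs.get("TweetTopHashtags")
          .start(initScript)
          .next(sparkTopHashtags)
          .build();
  }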

Our batch job consists of two steps. First, we run an init script that copies the data file to HDFS using an HdfsScriptRunner:

  // Step 1 - Init Script
  @Bean
  Step initScript(StepBuilderFactory steps, Tasklet scriptTasklet) throws Exception {
    return steps.get("initScript")
        .tasklet(scriptTasklet)
        .build();
  }

  @Bean
  ScriptTasklet scriptTasklet(HdfsScriptRunner scriptRunner) {
    ScriptTasklet scriptTasklet = new ScriptTasklet();
    scriptTasklet.setScriptCallback(scriptRunner);
    return scriptTasklet;
  }

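  @Bean HdfsScriptRunner scriptRunner() {
    ScriptSource script = new ResourceScriptSource(new ClassPathResource("fileCopy.js"));
    HdfsScriptRunner scriptRunner = new HdfsScriptRunner();
    scriptRunner.setConfiguration(hadoopConfiguration);
    scriptRunner.setLanguage("javascript");
    // variables made available to fileCopy.js
    Map<String, Object> arguments = new HashMap<>();
    arguments.put("source", inputLocalDir);
    arguments.put("file", inputFileName);
    arguments.put("indir", inputDir);
    arguments.put("outdir", outputDir);
    scriptRunner.setArguments(arguments);
    scriptRunner.setScriptSource(script);
    return scriptRunner;
  }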

The HdfsScriptRunner uses the following JavaScript:

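// remove the input and output directories left over from a previous run, then copy the data file
if (fsh.test(indir)) { fsh.rmr(indir); }
if (fsh.test(outdir)) { fsh.rmr(outdir); }
fsh.copyFromLocal(source+'/'+file, indir+'/'+file);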
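
fileCopy.js works against HDFS through the fsh shell object available to scripts run by HdfsScriptRunner. To show the same check-then-copy sequence without a cluster, here is a hypothetical local-filesystem analogue in plain Java using java.nio.file. It assumes the existing input and output directories are meant to be removed before the copy; the class and method names are illustrative, and this is not what the example actually runs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

/** Hypothetical local analogue of fileCopy.js; the real script runs against HDFS via fsh. */
public class LocalFileCopy {

    /** Deletes a directory tree if it exists (stands in for the fsh.test(...) checks). */
    static void deleteIfExists(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            // reverse order visits children before their parent directories
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }

    /** Clears indir and outdir, then copies source/file to indir/file and returns the copy. */
    static Path prepare(Path source, String file, Path indir, Path outdir) throws IOException {
        deleteIfExists(indir);
        deleteIfExists(outdir); // by default Spark refuses to write into an existing output directory
        Files.createDirectories(indir);
        return Files.copy(source.resolve(file), indir.resolve(file));
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("hashtags");
        Path data = Files.createDirectories(base.resolve("data"));
        Files.write(data.resolve("tweets.dat"), "{\"text\":\"hello #spark\"}\n".getBytes(StandardCharsets.UTF_8));
        Path copied = prepare(data, "tweets.dat", base.resolve("input"), base.resolve("output"));
        System.out.println("Copied to " + copied);
    }
}
```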

The second step is to configure and execute the SparkYarnTasklet:

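  // Step 2 - Spark Top Hashtags
  @Bean Step sparkTopHashtags(StepBuilderFactory steps, Tasklet sparkTopHashtagsTasklet) throws Exception {
    return steps.get("sparkTopHashtags")
        .tasklet(sparkTopHashtagsTasklet)
        .build();
  }

  @Bean SparkYarnTasklet sparkTopHashtagsTasklet() throws Exception {
    SparkYarnTasklet sparkTasklet = new SparkYarnTasklet();
    sparkTasklet.setSparkAssemblyJar(sparkAssembly);
    sparkTasklet.setHadoopConfiguration(hadoopConfiguration);
    sparkTasklet.setAppClass("Hashtags");
    File jarFile = new File(System.getProperty("user.dir") + "/app/spark-hashtags_2.10-0.1.0.jar");
    sparkTasklet.setAppJar(jarFile.toURI().toString());
    sparkTasklet.setExecutorMemory("1G");
    sparkTasklet.setNumExecutors(1);
    // args(0): input file in HDFS, args(1): output directory in HDFS
    sparkTasklet.setArguments(new String[]{
        hadoopConfiguration.get("fs.defaultFS") + inputDir + "/" + inputFileName,
        hadoopConfiguration.get("fs.defaultFS") + outputDir});
    return sparkTasklet;
  }
}
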
For the SparkYarnTasklet, we set the following properties:

  • sparkAssembly: the path to the Spark Assembly jar file
  • hadoopConfiguration: a reference to the standard Hadoop Configuration, which Spring Boot autowires for us
  • appClass: the name of the Spark application class, in this case "Hashtags"
  • appJar: the path to the Spark application jar file
  • executorMemory: the memory for each executor, "1G" in this example
  • numExecutors: the number of Spark executors; we use only 1 for this small example
  • arguments: the app arguments as a String array; for this app, these are the HDFS path to the input data file and the HDFS output directory
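
The arguments array is assembled by prefixing absolute paths with the value of fs.defaultFS (presumably hdfs://borneo:8020 here, the fsUri from the application configuration). A small sketch of that construction follows. The qualify and appArguments helpers are hypothetical, and the trailing-slash handling is our own addition rather than something the example does.

```java
/** Hypothetical helpers mirroring how the Spark app arguments are assembled. */
public class SparkArgs {

    /** Joins a file system URI such as "hdfs://host:8020" with a path, avoiding a doubled slash. */
    static String qualify(String defaultFs, String path) {
        String fs = defaultFs.endsWith("/") ? defaultFs.substring(0, defaultFs.length() - 1) : defaultFs;
        String absolute = path.startsWith("/") ? path : "/" + path;
        return fs + absolute;
    }

    /** Builds {input file, output directory}, matching args(0) and args(1) in Hashtags.scala. */
    static String[] appArguments(String defaultFs, String inputDir, String inputFileName, String outputDir) {
        return new String[] {
            qualify(defaultFs, inputDir + "/" + inputFileName),
            qualify(defaultFs, outputDir)
        };
    }

    public static void main(String[] args) {
        String[] appArgs = appArguments("hdfs://borneo:8020", "/tmp/hashtags/input", "tweets.dat", "/tmp/hashtags/output");
        for (String a : appArgs) {
            System.out.println(a);
        }
    }
}
```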

We are now ready to build and run this application example. The Spring Boot driver application is the following:

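import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class SparkYarnApplication implements CommandLineRunner {

  @Autowired
  JobLauncher jobLauncher;

  @Autowired
  Job tweetTopHashtags;

  public static void main(String[] args) {
    SpringApplication.run(SparkYarnApplication.class, args);
  }

  @Override
  public void run(String... args) throws Exception {
    System.out.println("RUNNING ...");
    jobLauncher.run(tweetTopHashtags, new JobParametersBuilder().toJobParameters());
  }
}
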
We used the @EnableBatchProcessing annotation to enable the batch features for Spring Boot.
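
When the runner calls jobLauncher.run(...), Spring Batch executes the job's two steps in order, and a failure in the first step stops the job before the Spark step is submitted. The plain-Java model below only illustrates that control flow. SimpleStep, runInOrder and the step bodies are hypothetical stand-ins, not Spring Batch APIs; the real framework also records step and job status in its job repository.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a job whose steps run strictly one after another. */
public class SequentialJobSketch {

    /** Stand-in for a step: it either completes normally or throws. Not a Spring Batch type. */
    interface SimpleStep {
        void execute() throws Exception;
    }

    /** Runs the steps in insertion order, stops at the first failure, returns the completed step names. */
    static List<String> runInOrder(Map<String, SimpleStep> steps) {
        List<String> completed = new ArrayList<>();
        for (Map.Entry<String, SimpleStep> step : steps.entrySet()) {
            try {
                step.getValue().execute();
                completed.add(step.getKey());
            } catch (Exception e) {
                System.out.println("Step " + step.getKey() + " failed: " + e.getMessage());
                break; // remaining steps are skipped and the job ends as failed
            }
        }
        return completed;
    }

    public static void main(String[] args) {
        Map<String, SimpleStep> steps = new LinkedHashMap<>();
        steps.put("initScript", () -> System.out.println("copy data file to HDFS"));
        steps.put("sparkTopHashtags", () -> System.out.println("submit Spark app to YARN"));
        System.out.println("Completed: " + runInOrder(steps));
    }
}
```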

This can now be built using the following Maven POM file:

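<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.3.1.RELEASE</version> <!-- assumed Spring Boot 1.3.x version -->
    <relativePath/> <!-- lookup parent from repository -->
  </parent>
  <groupId>com.example</groupId> <!-- hypothetical groupId -->
  <artifactId>batch-spark</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <description>Demo project for Spring Batch SparkYarn tasklet</description>
  <dependencies>
    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-batch</artifactId></dependency>
    <dependency><groupId>org.springframework.data</groupId><artifactId>spring-data-hadoop-boot</artifactId><version>2.3.0.RELEASE</version></dependency>
    <dependency><groupId>org.springframework.data</groupId><artifactId>spring-data-hadoop-batch</artifactId><version>2.3.0.RELEASE</version></dependency>
    <dependency><groupId>org.springframework.data</groupId><artifactId>spring-data-hadoop-spark</artifactId><version>2.3.0.RELEASE</version></dependency>
  </dependencies>
  <build>
    <plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins>
  </build>
</project>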

We use the spring-data-hadoop-spark and spring-data-hadoop-batch artifacts to bring in the batch features we need. We also use the spring-data-hadoop-boot artifact so that Boot autoconfigures our Hadoop configuration.

Application configuration is provided in our application.yml file:

spring:
  batch:
    job:
      enabled: false  # the job is launched from SparkYarnApplication instead of at startup
  hadoop:
    fsUri: hdfs://borneo:8020
    resourceManagerHost: borneo

example:
  inputLocalDir: data
  inputFileName: tweets.dat
  inputDir: /tmp/hashtags/input
  outputDir: /tmp/hashtags/output
  sparkAssembly: hdfs:///app/spark/spark-assembly-1.5.0-hadoop2.6.0.jar
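
Spring Boot reads application.yml as a tree and exposes each leaf value under its dotted path; for example, an fsUri key nested under hadoop, which is nested under spring, is available as spring.hadoop.fsUri. A minimal sketch of that flattening in plain Java follows. It uses a hand-built nested Map instead of a YAML parser to stay dependency-free, the class name is illustrative, and lists, profiles and Boot's relaxed binding are not modeled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: flatten a nested YAML-like tree into dotted property keys. */
public class YamlKeyFlattener {

    static Map<String, Object> flatten(Map<String, ?> tree) {
        Map<String, Object> flat = new LinkedHashMap<>();
        flattenInto("", tree, flat);
        return flat;
    }

    private static void flattenInto(String prefix, Map<String, ?> node, Map<String, Object> flat) {
        for (Map.Entry<String, ?> entry : node.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, ?> child = (Map<String, ?>) value;
                flattenInto(key, child, flat);  // descend into the nested section
            } else {
                flat.put(key, value);           // leaf: record it under its full dotted path
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> hadoop = new LinkedHashMap<>();
        hadoop.put("fsUri", "hdfs://borneo:8020");
        hadoop.put("resourceManagerHost", "borneo");
        Map<String, Object> spring = new LinkedHashMap<>();
        spring.put("hadoop", hadoop);
        Map<String, Object> root = new LinkedHashMap<>();
        root.put("spring", spring);
        System.out.println(flatten(root));
    }
}
```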

We are using configuration settings that work with the SpringOne-2015-Edition Vagrant Hadoop installation available at https://github.com/trisberg/hadoop-install.

To build and run this example, use:

mvn clean package
java -jar target/batch-spark-0.0.1-SNAPSHOT.jar