Starting with Spring for Apache Hadoop 2.3 we have added a new Spring Batch tasklet for launching Spark jobs in YARN. This support requires access to the Spark Assembly jar that is shipped as part of the Spark distribution. We recommend copying this jar file to a shared location in HDFS. In the example below we have already copied it to HDFS (e.g., with hdfs dfs -copyFromLocal) as hdfs:///app/spark/spark-assembly-1.5.0-hadoop2.6.0.jar. You also need your Spark app built and ready to be executed. In the example below we are referencing a pre-built app jar file named spark-hashtags_2.10-0.1.0.jar located in an app directory in our project. The Spark job will be launched using the Spark YARN integration, so there is no need to run a separate Spark cluster for this example.
The example Spark job will read an input file containing tweets in JSON format. It will extract and count hashtags and then print the top 10 hashtags found with their counts. This is a very simplified job, but it serves its purpose for this example.
/* Hashtags.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import scala.util.parsing.json._

object Hashtags {
  def main(args: Array[String]) {
    val tweetFile = args(0)
    val top10Dir = args(1)

    val conf = new SparkConf().setAppName("Hashtags")
    val sc = new SparkContext(conf)

    // Parse each line of the input file as a JSON map
    val tweetdata = sc.textFile(tweetFile)
    val tweets = tweetdata.map(line => JSON.parseFull(line).get.asInstanceOf[Map[String, Any]])

    // Extract the hashtags from the tweet text and count the occurrences of each tag
    val hashTags = tweets.flatMap(map => map.get("text").toString().split(" ").filter(_.startsWith("#")))
    val hashTagsCounts = hashTags.map((_, 1)).reduceByKey((a, b) => a + b)

    // Sort by count in descending order and keep the top 10
    val top10 = hashTagsCounts.map{case (t, c) => (c, t)}.sortByKey(false).map{case (c, t) => (t, c)}.take(10)

    val top10HashTags = sc.parallelize(top10)
    top10HashTags.saveAsTextFile(top10Dir)

    println("Top 10 hashtags:")
    top10.foreach(println)

    sc.stop()
  }
}
We can build this app and package it in a jar file. In this example the jar is placed in an app directory in our Spring project.
We create a Spring Boot project to host our Java code for this example. The Spring configuration class is shown below: first the Hadoop configuration, then the application property values, and finally the Job definition:
@Configuration
public class SparkYarnConfiguration {

  @Autowired
  private org.apache.hadoop.conf.Configuration hadoopConfiguration;

  @Value("${example.inputDir}")
  String inputDir;

  @Value("${example.inputFileName}")
  String inputFileName;

  @Value("${example.inputLocalDir}")
  String inputLocalDir;

  @Value("${example.outputDir}")
  String outputDir;

  @Value("${example.sparkAssembly}")
  String sparkAssembly;

  // Job definition
  @Bean
  Job tweetHashtags(JobBuilderFactory jobs, Step initScript, Step sparkTopHashtags) throws Exception {
    return jobs.get("TweetTopHashtags")
        .start(initScript)
        .next(sparkTopHashtags)
        .build();
  }
Our batch job consists of two steps. First we run an init script to copy the data file to HDFS using an HdfsScriptRunner:
  // Step 1 - Init Script
  @Bean
  Step initScript(StepBuilderFactory steps, Tasklet scriptTasklet) throws Exception {
    return steps.get("initScript")
        .tasklet(scriptTasklet)
        .build();
  }

  @Bean
  ScriptTasklet scriptTasklet(HdfsScriptRunner scriptRunner) {
    ScriptTasklet scriptTasklet = new ScriptTasklet();
    scriptTasklet.setScriptCallback(scriptRunner);
    return scriptTasklet;
  }

  @Bean
  HdfsScriptRunner scriptRunner() {
    ScriptSource script = new ResourceScriptSource(new ClassPathResource("fileCopy.js"));
    HdfsScriptRunner scriptRunner = new HdfsScriptRunner();
    scriptRunner.setConfiguration(hadoopConfiguration);
    scriptRunner.setLanguage("javascript");
    Map<String, Object> arguments = new HashMap<>();
    arguments.put("source", inputLocalDir);
    arguments.put("file", inputFileName);
    arguments.put("indir", inputDir);
    arguments.put("outdir", outputDir);
    scriptRunner.setArguments(arguments);
    scriptRunner.setScriptSource(script);
    return scriptRunner;
  }
The HdfsScriptRunner uses the following JavaScript:
// Remove any input/output directories left over from a previous run,
// then copy the local data file to the HDFS input directory
if (fsh.test(indir)) {
  fsh.rmr(indir);
}
if (fsh.test(outdir)) {
  fsh.rmr(outdir);
}
fsh.copyFromLocal(source + '/' + file, indir + '/' + file);
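The fsh object available to the script is an instance of Spring for Apache Hadoop's FsShell. If you would rather keep this step in Java, the same housekeeping could be done in a plain Tasklet; here is a minimal sketch (the HdfsCopyTasklet class and its constructor wiring are our own illustration, not part of the example project):

import org.apache.hadoop.conf.Configuration;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.data.hadoop.fs.FsShell;

// Hypothetical Java equivalent of fileCopy.js, using FsShell directly
public class HdfsCopyTasklet implements Tasklet {

  private final Configuration hadoopConfiguration;
  private final String source;  // local directory, e.g. "data"
  private final String file;    // data file name, e.g. "tweets.dat"
  private final String indir;   // HDFS input directory
  private final String outdir;  // HDFS output directory

  public HdfsCopyTasklet(Configuration hadoopConfiguration, String source,
                         String file, String indir, String outdir) {
    this.hadoopConfiguration = hadoopConfiguration;
    this.source = source;
    this.file = file;
    this.indir = indir;
    this.outdir = outdir;
  }

  @Override
  public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
    FsShell fsh = new FsShell(hadoopConfiguration);
    // Same logic as the script: clear out directories from a previous run,
    // then copy the local data file to the HDFS input directory
    if (fsh.test(indir)) {
      fsh.rmr(indir);
    }
    if (fsh.test(outdir)) {
      fsh.rmr(outdir);
    }
    fsh.copyFromLocal(source + "/" + file, indir + "/" + file);
    return RepeatStatus.FINISHED;
  }
}

The script variant used in the example has the advantage that the file-system commands can be changed without recompiling the application.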
The second step is to configure and execute the SparkYarnTasklet:
  // Step 2 - Spark Top Hashtags
  @Bean
  Step sparkTopHashtags(StepBuilderFactory steps, Tasklet sparkTopHashtagsTasklet) throws Exception {
    return steps.get("sparkTopHashtags")
        .tasklet(sparkTopHashtagsTasklet)
        .build();
  }

  @Bean
  SparkYarnTasklet sparkTopHashtagsTasklet() throws Exception {
    SparkYarnTasklet sparkTasklet = new SparkYarnTasklet();
    sparkTasklet.setSparkAssemblyJar(sparkAssembly);
    sparkTasklet.setHadoopConfiguration(hadoopConfiguration);
    sparkTasklet.setAppClass("Hashtags");
    File jarFile = new File(System.getProperty("user.dir") + "/app/spark-hashtags_2.10-0.1.0.jar");
    sparkTasklet.setAppJar(jarFile.toURI().toString());
    sparkTasklet.setExecutorMemory("1G");
    sparkTasklet.setNumExecutors(1);
    sparkTasklet.setArguments(new String[]{
        hadoopConfiguration.get("fs.defaultFS") + inputDir + "/" + inputFileName,
        hadoopConfiguration.get("fs.defaultFS") + outputDir});
    return sparkTasklet;
  }
}
For the SparkYarnTasklet, we set the following properties:

- sparkAssemblyJar: the location of the Spark Assembly jar file on HDFS
- hadoopConfiguration: a reference to the Hadoop configuration that Spring Boot autowires for us
- appClass: the name of the Spark application class, in this case Hashtags
- appJar: the path to the Spark application jar file
- executorMemory: the memory allotted to each executor, 1G in this example
- numExecutors: the number of Spark executors; we use only 1 for this small example
- arguments: the arguments passed to the Spark app as a String array, here the HDFS URIs of the input file and the output directory
We are now ready to build and run this application example. The Spring Boot driver application is the following:
@SpringBootApplication
@EnableBatchProcessing
public class SparkYarnApplication implements CommandLineRunner {

  @Autowired
  JobLauncher jobLauncher;

  @Autowired
  Job tweetTopHashtags;

  public static void main(String[] args) {
    SpringApplication.run(SparkYarnApplication.class, args);
  }

  @Override
  public void run(String... args) throws Exception {
    System.out.println("RUNNING ...");
    jobLauncher.run(tweetTopHashtags, new JobParametersBuilder().toJobParameters());
  }
}
We used the @EnableBatchProcessing annotation to enable the batch features for Spring Boot.
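One caveat worth knowing: Spring Batch only executes a job instance once for a given set of identifying job parameters, and we pass an empty set here, so the job runs a single time. If you want to re-run the example without clearing the job repository, a unique parameter can be added; a minimal sketch of this common idiom (our variation, not part of the original example):

// Hypothetical variation of the run(...) method above: a unique "time"
// parameter makes every launch a new JobInstance, avoiding a
// JobInstanceAlreadyCompleteException on subsequent runs.
// (JobParameters and JobParametersBuilder are in org.springframework.batch.core)
@Override
public void run(String... args) throws Exception {
  JobParameters params = new JobParametersBuilder()
      .addLong("time", System.currentTimeMillis())
      .toJobParameters();
  jobLauncher.run(tweetTopHashtags, params);
}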
This can now be built using the following Maven POM file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.springdeveloper.demo</groupId>
  <artifactId>batch-spark</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>batch-spark</name>
  <description>Demo project for Spring Batch SparkYarn tasklet</description>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.3.1.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.7</java.version>
    <spring-data-hadoop.version>2.3.0.RELEASE</spring-data-hadoop.version>
    <spark.version>1.5.0</spark.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-batch</artifactId>
      <exclusions>
        <exclusion>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-log4j</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.data</groupId>
      <artifactId>spring-data-hadoop-boot</artifactId>
      <version>${spring-data-hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.data</groupId>
      <artifactId>spring-data-hadoop-batch</artifactId>
      <version>${spring-data-hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.data</groupId>
      <artifactId>spring-data-hadoop-spark</artifactId>
      <version>${spring-data-hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-yarn_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>
We are using the spring-data-hadoop-spark and spring-data-hadoop-batch artifacts to bring in the batch features we need. We are also using the spring-data-hadoop-boot artifact to enable Boot to auto-configure our Hadoop configuration.
Application configuration is provided in our application.yml file. Note that spring.batch.job.enabled is set to false since we launch the job ourselves from the CommandLineRunner rather than letting Boot run it at startup:
spring:
  batch:
    job:
      enabled: false
  hadoop:
    fsUri: hdfs://borneo:8020
    resourceManagerHost: borneo
example:
  inputLocalDir: data
  inputFileName: tweets.dat
  inputDir: /tmp/hashtags/input
  outputDir: /tmp/hashtags/output
  sparkAssembly: hdfs:///app/spark/spark-assembly-1.5.0-hadoop2.6.0.jar
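As an aside, the individual @Value annotations in SparkYarnConfiguration could also be replaced by a single typed bean bound with Spring Boot's @ConfigurationProperties. A minimal sketch, assuming a class of our own invention called ExampleProperties (not part of the example project):

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

// Hypothetical alternative to the five @Value fields: binds the whole
// "example" block of application.yml to one bean that can be autowired
// into SparkYarnConfiguration.
@Component
@ConfigurationProperties(prefix = "example")
public class ExampleProperties {

  private String inputLocalDir;
  private String inputFileName;
  private String inputDir;
  private String outputDir;
  private String sparkAssembly;

  public String getInputLocalDir() { return inputLocalDir; }
  public void setInputLocalDir(String inputLocalDir) { this.inputLocalDir = inputLocalDir; }

  public String getInputFileName() { return inputFileName; }
  public void setInputFileName(String inputFileName) { this.inputFileName = inputFileName; }

  public String getInputDir() { return inputDir; }
  public void setInputDir(String inputDir) { this.inputDir = inputDir; }

  public String getOutputDir() { return outputDir; }
  public void setOutputDir(String outputDir) { this.outputDir = outputDir; }

  public String getSparkAssembly() { return sparkAssembly; }
  public void setSparkAssembly(String sparkAssembly) { this.sparkAssembly = sparkAssembly; }
}

Either approach works; the @Value fields keep the example short, while a properties class scales better as the number of settings grows.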
We are using configuration settings that work with the SpringOne-2015-Edition Vagrant Hadoop installation available at https://github.com/trisberg/hadoop-install.
To build and run this example use:

mvn clean package
java -jar target/batch-spark-0.0.1-SNAPSHOT.jar

Once the job completes, the top-10 hashtag list written by the Spark app will be in HDFS under /tmp/hashtags/output.