Starting with Spring for Apache Hadoop 2.3 we have added a new Spring Batch tasklet for launching Spark jobs in YARN. This support requires access to the Spark Assembly jar that is shipped as part of the Spark distribution. We recommend copying this jar file to a shared location in HDFS. In the example below we have already copied this jar file to HDFS with the path hdfs:///app/spark/spark-assembly-1.5.0-hadoop2.6.0.jar. You also need your Spark app built and ready to be executed. In the example below we are referencing a pre-built app jar file named spark-hashtags_2.10-0.1.0.jar located in an app directory in our project. The Spark job will be launched using the Spark YARN integration, so there is no need to have a separate Spark cluster for this example.
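If you have not copied the assembly jar to HDFS yet, you can do so with the hdfs command line client. A minimal sketch, assuming the jar is in your current directory and you have write access to /app/spark in HDFS:

hdfs dfs -mkdir -p /app/spark
hdfs dfs -copyFromLocal spark-assembly-1.5.0-hadoop2.6.0.jar /app/spark/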
The example Spark job will read an input file containing tweets in a JSON format. It will extract and count hashtags and then print the top 10 hashtags found with their counts. This is a deliberately simple job, but it serves its purpose for demonstrating the tasklet.
/* Hashtags.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import scala.util.parsing.json._

object Hashtags {
  def main(args: Array[String]) {
    val tweetFile = args(0)
    val top10Dir = args(1)

    val conf = new SparkConf().setAppName("Hashtags")
    val sc = new SparkContext(conf)

    // parse each line of the input file as a JSON map
    val tweetdata = sc.textFile(tweetFile)
    val tweets = tweetdata.map(line => JSON.parseFull(line).get.asInstanceOf[Map[String, Any]])

    // extract the words starting with '#' from each tweet's text field
    val hashTags = tweets.flatMap(map => map("text").toString.split(" ").filter(_.startsWith("#")))
    val hashTagsCounts = hashTags.map((_, 1)).reduceByKey((a, b) => a + b)

    // swap to (count, tag) so we can sort by count, then swap back and take the top 10
    val top10 = hashTagsCounts.map{case (t, c) => (c, t)}.sortByKey(false).map{case (c, t) => (t, c)}.take(10)

    val top10HashTags = sc.parallelize(top10)
    top10HashTags.saveAsTextFile(top10Dir)

    println("Top 10 hashtags:")
    top10.foreach(println)

    sc.stop()
  }
}
We can build this app and package it in a jar file. In this example it is placed in an app directory in our Spring project.
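For reference, a minimal sbt build that would produce a jar with this name might look like the following. The actual build file is not part of this post, and the Scala patch version is an assumption; the _2.10 in the jar name only pins the major version. Spark is marked as provided since the assembly jar supplies it at runtime:

name := "spark-hashtags"

version := "0.1.0"

scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.0" % "provided"

Running sbt package then leaves the jar in target/scala-2.10/, from where it can be copied to the app directory.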
We create a Spring Boot project to host our Java code for this example. The Spring configuration class is shown below: first the Hadoop configuration and the application property values, then the Job definition:
@Configuration
public class SparkYarnConfiguration {

    @Autowired
    private org.apache.hadoop.conf.Configuration hadoopConfiguration;

    @Value("${example.inputDir}")
    String inputDir;

    @Value("${example.inputFileName}")
    String inputFileName;

    @Value("${example.inputLocalDir}")
    String inputLocalDir;

    @Value("${example.outputDir}")
    String outputDir;

    @Value("${example.sparkAssembly}")
    String sparkAssembly;

    // Job definition
    @Bean
    Job tweetHashtags(JobBuilderFactory jobs, Step initScript, Step sparkTopHashtags) throws Exception {
        return jobs.get("TweetTopHashtags")
                .start(initScript)
                .next(sparkTopHashtags)
                .build();
    }
Our batch job consists of two steps. First we run an init script to copy the data file to HDFS using an HdfsScriptRunner:
    // Step 1 - Init Script
    @Bean
    Step initScript(StepBuilderFactory steps, Tasklet scriptTasklet) throws Exception {
        return steps.get("initScript")
                .tasklet(scriptTasklet)
                .build();
    }

    @Bean
    ScriptTasklet scriptTasklet(HdfsScriptRunner scriptRunner) {
        ScriptTasklet scriptTasklet = new ScriptTasklet();
        scriptTasklet.setScriptCallback(scriptRunner);
        return scriptTasklet;
    }

    @Bean
    HdfsScriptRunner scriptRunner() {
        ScriptSource script = new ResourceScriptSource(new ClassPathResource("fileCopy.js"));
        HdfsScriptRunner scriptRunner = new HdfsScriptRunner();
        scriptRunner.setConfiguration(hadoopConfiguration);
        scriptRunner.setLanguage("javascript");
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("source", inputLocalDir);
        arguments.put("file", inputFileName);
        arguments.put("indir", inputDir);
        arguments.put("outdir", outputDir);
        scriptRunner.setArguments(arguments);
        scriptRunner.setScriptSource(script);
        return scriptRunner;
    }
The HdfsScriptRunner uses the following JavaScript, where fsh is the FsShell instance that Spring for Apache Hadoop exposes to scripts, and source, file, indir and outdir are the arguments we configured above:
if (fsh.test(indir)) {
    fsh.rmr(indir);
}
if (fsh.test(outdir)) {
    fsh.rmr(outdir);
}
fsh.copyFromLocal(source + '/' + file, indir + '/' + file);
The second step is to configure and execute the SparkYarnTasklet:
    // Step 2 - Spark Top Hashtags
    @Bean
    Step sparkTopHashtags(StepBuilderFactory steps, Tasklet sparkTopHashtagsTasklet) throws Exception {
        return steps.get("sparkTopHashtags")
                .tasklet(sparkTopHashtagsTasklet)
                .build();
    }

    @Bean
    SparkYarnTasklet sparkTopHashtagsTasklet() throws Exception {
        SparkYarnTasklet sparkTasklet = new SparkYarnTasklet();
        sparkTasklet.setSparkAssemblyJar(sparkAssembly);
        sparkTasklet.setHadoopConfiguration(hadoopConfiguration);
        sparkTasklet.setAppClass("Hashtags");
        File jarFile = new File(System.getProperty("user.dir") + "/app/spark-hashtags_2.10-0.1.0.jar");
        sparkTasklet.setAppJar(jarFile.toURI().toString());
        sparkTasklet.setExecutorMemory("1G");
        sparkTasklet.setNumExecutors(1);
        sparkTasklet.setArguments(new String[]{
                hadoopConfiguration.get("fs.defaultFS") + inputDir + "/" + inputFileName,
                hadoopConfiguration.get("fs.defaultFS") + outputDir});
        return sparkTasklet;
    }
}
For the SparkYarnTasklet, we set the following properties:

- sparkAssemblyJar: the HDFS location of the Spark Assembly jar
- hadoopConfiguration: a reference to the Hadoop configuration we are using
- appClass: the name of the Spark application class, in this case "Hashtags"
- appJar: the path to the Spark application jar file
- executorMemory: the memory to allocate for each executor, "1G" in this example
- numExecutors: the number of Spark executors, we use only 1 for this small example
- arguments: the arguments passed to the Spark app, here the HDFS URIs of the input file and the output directory
We are now ready to build and run this application example. The Spring Boot driver application is the following:
@SpringBootApplication
@EnableBatchProcessing
public class SparkYarnApplication implements CommandLineRunner {

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job tweetTopHashtags;

    public static void main(String[] args) {
        SpringApplication.run(SparkYarnApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        System.out.println("RUNNING ...");
        jobLauncher.run(tweetTopHashtags, new JobParametersBuilder().toJobParameters());
    }
}
We used the @EnableBatchProcessing annotation to enable the batch features for Spring Boot.
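One thing to be aware of: we launch the job with an empty JobParameters, so Spring Batch treats every launch as the same job instance and will refuse to run it again once it has completed. If you want to re-run the example, one common approach is to add a unique parameter to each launch, for instance:

jobLauncher.run(tweetTopHashtags, new JobParametersBuilder()
        .addLong("run.id", System.currentTimeMillis())
        .toJobParameters());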
This can now be built using the following Maven POM file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.springdeveloper.demo</groupId>
    <artifactId>batch-spark</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>batch-spark</name>
    <description>Demo project for Spring Batch SparkYarn tasklet</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.3.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.7</java.version>
        <spring-data-hadoop.version>2.3.0.RELEASE</spring-data-hadoop.version>
        <spark.version>1.5.0</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-hadoop-boot</artifactId>
            <version>${spring-data-hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-hadoop-batch</artifactId>
            <version>${spring-data-hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-hadoop-spark</artifactId>
            <version>${spring-data-hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-yarn_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
We are using the spring-data-hadoop-spark and spring-data-hadoop-batch artifacts for bringing in the batch features we need.
We are also using the spring-data-hadoop-boot artifact to enable Boot to autoconfigure our Hadoop configuration.
Application configuration is provided in our application.yml file:
spring:
batch:
job:
enabled: false
hadoop:
fsUri: hdfs://borneo:8020
resourceManagerHost: borneo
example:
inputLocalDir: data
inputFileName: tweets.dat
inputDir: /tmp/hashtags/input
outputDir: /tmp/hashtags/output
sparkAssembly: hdfs:///app/spark/spark-assembly-1.5.0-hadoop2.6.0.jar
Note that spring.batch.job.enabled is set to false since we launch the job ourselves from the CommandLineRunner rather than letting Boot run it at startup. We are using configuration settings that work with the SpringOne-2015-Edition of the Vagrant hadoop installation available at https://github.com/trisberg/hadoop-install.
To build and run this example use:

mvn clean package
java -jar target/batch-spark-0.0.1-SNAPSHOT.jar
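Once the job has finished, you can check the results that the Spark app wrote to HDFS, for example with:

hdfs dfs -cat /tmp/hashtags/output/part-*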