Friday, 3 June 2016

How to write Spark jobs in Java for Spark Job Server

In the previous post, we learned how to set up Spark Job Server and run Spark jobs on it. So far, we have only run Scala programs on the job server. Now we'll see how to write Spark jobs in Java to run on the job server.

In Scala, a job must implement the SparkJob trait, so the job looks like this:

object SampleJob extends SparkJob {
    override def runJob(sc: SparkContext, jobConfig: Config): Any = ???
    override def validate(sc: SparkContext, config: Config): SparkJobValidation = ???
}

What these methods do:
  • The runJob method contains the implementation of the job. The SparkContext is managed by the Job Server and is passed to the job through this method. This relieves the developer of the boilerplate configuration management that comes with creating a Spark job, and allows the Job Server to manage and reuse contexts.
  • The validate method allows for an initial validation of the context and any provided configuration. If the context and configuration are OK to run the job, returning spark.jobserver.SparkJobValid lets the job execute; otherwise, returning spark.jobserver.SparkJobInvalid(reason) prevents the job from running and conveys the reason for the failure. In that case, the call immediately returns an HTTP/1.1 400 Bad Request status code.
    validate helps prevent running jobs that would eventually fail because of missing or wrong configuration, which saves both time and resources.

In Java, we need to extend the JavaSparkJob class. It has the following methods, which we override in the program:
  • runJob(jsc: JavaSparkContext, jobConfig: Config)
  • validate(sc: SparkContext, config: Config)
  • invalidate(jsc: JavaSparkContext, config: Config)
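The idea behind a validation hook can be sketched in plain Java without any Spark dependency. The snippet below is only an illustration: a Map<String, String> stands in for the Typesafe Config object, the class name InputValidationSketch is hypothetical, and the "return null when valid, otherwise an error message" convention is an assumption about how invalidate is meant to be used, so check it against the job-server-api source you build.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustration only: a Map stands in for com.typesafe.config.Config. */
final class InputValidationSketch {

    /** Returns null when the settings look usable, otherwise a reason string. */
    static String invalidate(Map<String, String> settings) {
        String filename = settings.get("input.filename");
        if (filename == null || filename.trim().isEmpty()) {
            return "Input parameter is missing. Please specify input.filename";
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, String> settings = new HashMap<>();
        // No input.filename yet, so a reason string is printed.
        System.out.println(invalidate(settings));

        settings.put("input.filename", "/tmp/input.txt");
        // The key is now present and non-empty, so this prints "null".
        System.out.println(invalidate(settings));
    }
}
```

Checking for both a missing key and a blank value up front is what lets a bad request be rejected before any cluster resources are used.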

The JavaSparkJob class is available in the job-server-api package. Build the job-server-api source code and add the resulting jar to your project. Add Spark and any other required dependencies to your pom.xml.

Let’s start with the basic WordCount example:

WordCount.java:

package spark.jobserver;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.commons.lang.StringUtils;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import spark.jobserver.JavaSparkJob;
import spark.jobserver.SparkJobInvalid;
import spark.jobserver.SparkJobValid$;
import spark.jobserver.SparkJobValidation;
import com.typesafe.config.Config;

public class WordCount extends JavaSparkJob implements Serializable {
       private static final long serialVersionUID = 1L;
       private static final Pattern SPACE = Pattern.compile(" ");
       static String fileName = StringUtils.EMPTY;

       public Object runJob(JavaSparkContext jsc, Config config) {
              try {
                     // Read the input file named in the job configuration.
                     JavaRDD<String> lines = jsc.textFile(
                                  config.getString("input.filename"), 1);
                     // Split each line into words on single spaces.
                     JavaRDD<String> words = lines
                                  .flatMap(new FlatMapFunction<String, String>() {
                                         public Iterable<String> call(String s) {
                                                return Arrays.asList(SPACE.split(s));
                                         }
                                  });
                     // Pair each word with 1, then sum the counts per word.
                     JavaPairRDD<String, Integer> counts = words.mapToPair(
                                  new PairFunction<String, String, Integer>() {
                                         public Tuple2<String, Integer> call(String s) {
                                                return new Tuple2<String, Integer>(s, 1);
                                         }
                                  }).reduceByKey(
                                  new Function2<Integer, Integer, Integer>() {
                                         public Integer call(Integer i1, Integer i2) {
                                                return i1 + i2;
                                         }
                                  });
                     // Collect the counts on the driver and return them as the job result.
                     List<Tuple2<String, Integer>> output = counts.collect();
                     System.out.println(output);
                     return output;
              } catch (Exception e) {
                     // Note: the caller only sees a null result here, not the exception.
                     e.printStackTrace();
                     return null;
              }
       }

       public SparkJobValidation validate(SparkContext sc, Config config) {
              // hasPath guards against a missing key, for which getString would throw.
              if (config.hasPath("input.filename")
                            && !config.getString("input.filename").isEmpty()) {
                     return SparkJobValid$.MODULE$;
              } else {
                     return new SparkJobInvalid(
                                  "Input parameter is missing. Please specify the filename");
              }
       }

       public String invalidate(JavaSparkContext jsc, Config config) {
              return null;
       }
}
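
To get a feel for the result before deploying anything, the same counting logic can be sketched in plain Java, with no Spark involved. This is only a local approximation for sanity-checking expected output, not part of the job; the class name LocalWordCountSketch and the sample lines are made up for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/** Local, Spark-free approximation of the word count, for illustration only. */
final class LocalWordCountSketch {
    // Same delimiter as the job: a single space.
    private static final Pattern SPACE = Pattern.compile(" ");

    /** Counts how often each space-separated token occurs across all lines. */
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : SPACE.split(line)) {
                // Equivalent of mapToPair(word -> (word, 1)) followed by reduceByKey(+).
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Prints {to=2, be=2, or=1, not=1}
        System.out.println(count(Arrays.asList("to be or", "not to be")));
    }
}
```

Because the delimiter is a single space, consecutive spaces in the input produce empty-string "words" in this sketch, and the same applies to the Spark job, which splits with the same pattern.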

The next step is to compile the code and build the jar, then upload it to the Job Server.
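
The upload and submit steps are usually done with an HTTP client such as curl, but they can also be sketched in Java. The snippet below is a rough sketch under several assumptions: that the Job Server exposes POST /jars/<appName> for uploading a jar and POST /jobs?appName=...&classPath=... for starting a job, that the base URL passed in (for example http://localhost:8090) is where your server listens, and that the job configuration is sent as the request body. The class name JobServerClientSketch and the command-line arguments are hypothetical.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

/** Sketch of a tiny client; endpoint paths are assumptions about the Job Server REST API. */
final class JobServerClientSketch {

    private static String enc(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    /** URL used to upload a jar under the given application name. */
    static String jarUrl(String baseUrl, String appName) {
        return baseUrl + "/jars/" + enc(appName);
    }

    /** URL used to start a job from an uploaded jar. */
    static String jobUrl(String baseUrl, String appName, String classPath) {
        return baseUrl + "/jobs?appName=" + enc(appName) + "&classPath=" + enc(classPath);
    }

    /** Sends the body with an HTTP POST and returns the response status code. */
    static int post(String url, byte[] body) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body);
        }
        int status = conn.getResponseCode();
        conn.disconnect();
        return status;
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 5) {
            System.out.println("usage: <baseUrl> <jarPath> <appName> <classPath> <inputFile>");
            return;
        }
        // Step 1: upload the jar built from the project.
        byte[] jar = Files.readAllBytes(Paths.get(args[1]));
        System.out.println("upload status: " + post(jarUrl(args[0], args[2]), jar));

        // Step 2: start the job, passing input.filename as the job configuration.
        byte[] jobConfig = ("input.filename = " + args[4]).getBytes(StandardCharsets.UTF_8);
        System.out.println("submit status: " + post(jobUrl(args[0], args[2], args[3]), jobConfig));
    }
}
```

The classPath argument is the package-qualified name of the job class inside the jar, and the input.filename key matches the one the job reads in runJob and checks in validate.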

Your Spark job is now ready to run on the Job Server!