THOMAS BREDILLET
- Machine Learning, Statistics, Computer Science

aka 'tnb'


Apache Spark and Machine Learning

Jul 28, 2014

I’ve recently started using Apache Spark quite heavily and I wanted to write down a few of my impressions.

What’s Apache Spark?

Apache Spark is “a fast and general engine for large-scale data processing”. Basically, this means that Spark helps you run computations on big data. You can use it by installing Spark on a cluster of servers, or even locally; there are tons of scripts to do that, and most of them are provided by the Spark project itself. Once you’ve got your cluster up and running, you can simply run a Scala, Java or Python app that uses the Apache Spark library to execute operations that harness the full power of your cluster.

Example: A simple way to count words in a bunch of files (in Scala)

  
    // sc is the SparkContext (available as "sc" in the spark-shell)
    val files = sc.textFile("s3n://myawesomebucket/alotoftxtfiles/*.txt")
    val counts = files.flatMap(line => line.split(" "))
                      .map(word => (word, 1))
                      .reduceByKey(_ + _)
    counts.saveAsTextFile("s3n://myawesomebucket/result")
  

If you want to understand how this code works, there’s nothing better than the Spark documentation.

How to use Spark

First you’ll need to set up a brand new Spark cluster (again, this can be done locally in standalone mode if you want).

I like EC2 so I’m going to use the ec2 script, it works great.
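
As a rough sketch, launching a cluster like the one I use later with the spark-ec2 script shipped in the Spark distribution looks something like this; the key pair, identity file and cluster name are placeholders, so double-check the options against the EC2 docs for your Spark version:

    # run from the ec2/ directory of the Spark distribution
    ./spark-ec2 -k my-keypair -i ~/my-keypair.pem -s 4 --instance-type=m3.2xlarge launch my-spark-cluster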

Once you have your cluster up and running, you’ll simply need to write a Spark application. I personally use SBT. My project structure is as follows:

  
    ./build.sbt
    ./src/main/scala/WordCountExample.scala
  

With my WordCountExample.scala:

  
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object WordCountExample {
  def main(args: Array[String]) {

    // Give less memory to RDD caching and more to shuffles for this shuffle-heavy job
    val conf = new SparkConf().setAppName("Aggregator")
      .set("spark.storage.memoryFraction", "0.2")
      .set("spark.shuffle.memoryFraction", "0.5")
    val sc = new SparkContext(conf)

    // Read every text file in the bucket, split into words, count and save the result
    val files = sc.textFile("s3n://myawesomebucket/alotoftxtfiles/*.txt")
    val counts = files.flatMap(line => line.split(" "))
                      .map(word => (word, 1))
                      .reduceByKey(_ + _)
    counts.saveAsTextFile("s3n://myawesomebucket/result")
  }
}

  

And my build.sbt:

  
name := "WordCountExample"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.1"

resolvers += "Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/"

  

You now have everything you need! Just run sbt package and send the resulting jar to your cluster.
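
As an example, assuming a cluster launched with the ec2 script (which sets you up as root on the master), building and shipping the jar could look like this; the master hostname is a placeholder:

    # sbt derives the jar name from the name/version/scalaVersion in build.sbt
    sbt package
    scp target/scala-2.10/wordcountexample_2.10-1.0.jar root@<spark-master-hostname>:~/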

Once that’s done, I use the following command to run the jar:

  
# point spark-submit at the jar you copied over (add --master spark://<master>:7077 if it isn't already set by your cluster's configuration)
nohup ./bin/spark-submit --class "WordCountExample" ~/wordcountexample_2.10-1.0.jar > /tmp/WordCount.log 2>&1 &
  

And that’s it, your application will be running and you can follow it in the /tmp/WordCount.log file.

You’ll see how it splits the calculation according to the number of files in your bucket, runs the map phase on each of them, and then aggregates everything back through the reduceByKey at the end.
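
If you’re curious about how the work gets split, here’s a small sketch of how you could inspect and adjust the parallelism from inside the job (same placeholder bucket as above):

    val files = sc.textFile("s3n://myawesomebucket/alotoftxtfiles/*.txt")
    // one partition per input split, roughly one per small file here
    println("Partitions: " + files.partitions.length)
    // optionally rebalance before a heavy stage; 32 is an arbitrary example
    val rebalanced = files.repartition(32)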

What I’ve used it for and what I’ve learned

Now that the basic stuff is out of the way, I can tell you a bit more about how I used it. Basically I ran two ‘benchmark’ jobs on Spark. The first one goes through hundreds of millions of lines of text, matches each line against a specific regex and counts the number of matches. The second one trains a neural network (backpropagation) by first generating a random training set and then benchmarking the NN’s performance over the whole data set. In both cases I ran this against the same cluster: 4 slaves, all machines being m3.2xlarge. Also, for the NN I used an Akka application on my MacBook with remote actors on my master that ran my jobs; overall that was very easy to set up.
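
For the regex job, here’s a minimal sketch of what the counting part can look like; the bucket path and the pattern are made up for illustration, not the actual benchmark code:

    import org.apache.spark.{SparkConf, SparkContext}

    object RegexCountExample {
      def main(args: Array[String]) {
        val sc = new SparkContext(new SparkConf().setAppName("RegexCount"))

        // hypothetical pattern: lines containing an email-like token
        val pattern = """[\w.+-]+@[\w-]+\.[\w.]+""".r

        val lines = sc.textFile("s3n://myawesomebucket/alotoftxtfiles/*.txt")
        // the filter runs distributed across the cluster, count() brings back a single number
        val matches = lines.filter(line => pattern.findFirstIn(line).isDefined).count()

        println("Matched lines: " + matches)
      }
    }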

A few things I’ve learned:

  • There are multiple ways to do the same thing (you can do the word count with a countByValue for example, see the sketch after this list). Think carefully about which function is going to distribute the job across the cluster and which function may create a bottleneck somewhere (one machine doing a heavy aggregation, for example).
  • Spark is easier to use than Hadoop
  • Don’t use the ec2 script in production, it’s better to create your own AMIs. Also the ec2 script’s security groups are very open, so be mindful.
  • There are a LOT of built-in functionalities already in Spark, it’s basically limitless
  • Spark is super fast
  • Spark works very well as an Akka actor
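
To illustrate the first point, the same word count can also be written with countByValue; unlike reduceByKey it returns a plain Scala Map to the driver, so the final aggregation lands on a single machine (same placeholder bucket as above):

    val files = sc.textFile("s3n://myawesomebucket/alotoftxtfiles/*.txt")
    // countByValue collects the whole result on the driver: convenient for small
    // vocabularies, a potential bottleneck for huge ones
    val counts: scala.collection.Map[String, Long] =
      files.flatMap(line => line.split(" ")).countByValue()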

Parting words

Spark is awesome indeed, with fantastic potential, but there are still some things that bug me. The ec2 script does its job well but it’s not at all flexible: you can’t add new machines to your cluster or remove existing slaves, for example. It would be a LOT better if the whole thing was done with AWS CloudFormation, where the slaves could sit in a nice auto scaling group.