6.5840 2023 Lecture 17: Spark

Resilient Distributed Datasets: A Fault-Tolerant Abstraction for
In-Memory Cluster Computing, Zaharia et al., NSDI 2012

today we're switching topics from storage to computation

why are we looking at Spark?
  widely-used for datacenter computations
  generalizes MapReduce into dataflow
  better than MapReduce for iterative applications
  interactive exploration of a big dataset
  idea: dataflow graphs with RDDs
  three main topics:
    programming model
    execution strategy
    fault tolerance

let's look at page-rank
  here's SparkPageRank.scala from the Spark source repository
  like the code in Section 3.2.2, with more detail

  val lines = spark.read.textFile("in").rdd
  val links1 = lines.map{ s =>
    val parts = s.split("\\s+")
    (parts(0), parts(1))
  }
  val links2 = links1.distinct()
  val links3 = links2.groupByKey()
  val links4 = links3.cache()
  var ranks = links4.mapValues(v => 1.0)

  for (i <- 1 to 10) {
    val jj = links4.join(ranks)
    val contribs = jj.values.flatMap{
      case (urls, rank) =>
        urls.map(url => (url, rank / urls.size))
    }
    ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
  }

  val output = ranks.collect()
  output.foreach(tup => println(s"${tup._1} has rank: ${tup._2} ."))

page-rank is a classic big-data algorithm example
  invented by Google to estimate importance of web pages, based on web links
  Google uses page-rank to sort search results
  page-rank input has one line per link, extracted from a big web crawl:
    from-url to-url
  the input is huge! stored in HDFS (like GFS), partitioned (split) into many chunks
  page-rank algorithm
    iterative, essentially simulates multiple rounds of users clicking links
      85% chance of following one of the links from the current page
      15% chance of visiting a random page
    pushes probabilities along links on each iteration
    probabilities ("ranks") gradually converge
  the multiple iterations are a bit awkward and inefficient in MapReduce

my example input -- file "in":
  [diagram: the link graph]
  u1 u3
  u1 u1
  u2 u3
  u2 u2
  u3 u1

I'll run page-rank in Spark (local machine, not a cluster):
  ./bin/run-example SparkPageRank in 10
  u2 has rank: 0.2610116705534049 .
  u3 has rank: 0.9999999999999998 .
  u1 has rank: 1.7389883294465944 .
  apparently u1 is the most important page.

let's run some of the page-rank code in the Spark/Scala interpreter
  ./bin/spark-shell

  val lines = spark.read.textFile("in").rdd
  -- "in" is a file on HDFS (like GFS)
  -- what is lines? does it contain the content of file "in"?

  lines.collect()
  -- lines yields a list of strings, one per line of input
  -- if we run lines.collect() again, it re-reads file "in"
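
  a quick way to see for ourselves that transformations don't run eagerly
  (a sketch, not part of the paper's example; in the local-mode shell the
  map's printlns show up in our terminal):

  val noisy = lines.map{ s => println("map saw: " + s); s }
  -- nothing prints yet: map doesn't process anything, it just records what to do
  noisy.collect()
  -- now "map saw: ..." appears once per input line, as the map actually runs
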
  val links1 = lines.map{ s =>
    val parts = s.split("\\s+"); (parts(0), parts(1))
  }
  links1.collect()
  -- need to separate from and to strings
  -- map, split, tuple
  -- acts on each line in turn
  -- parses each string "x y" into the tuple ( "x", "y" )
  -- map runs in parallel on each input partition
  -- looks at just one record (line) at a time
  -- [diagram: partitioned RDD, parallel map execution]

  val links2 = links1.distinct()
  -- need to eliminate duplicate from/to pairs
  -- distinct() sorts or hashes to bring duplicates together

  val links3 = links2.groupByKey()
  -- need to consider all links out of each page together
  -- groupByKey() sorts or hashes to bring instances of each key together

  val links4 = links3.cache()
  -- we're going to use links in each iteration
  -- the default is to (re-)compute an RDD each time we use it
  -- cache() == persist in memory

  var ranks = links4.mapValues(v => 1.0)
  -- this will be the output
  -- iteratively updated; these are the starting values
  -- we'll compute a new version of ranks on each iteration

  -- now for the first loop iteration

  val jj = links4.join(ranks)
  -- the join brings each page's link list and current rank together
  --   to generate a single record with the info needed to process one page:
  --   the page's url, its outgoing urls, and its current rank
  -- why do we have to explicitly construct this record?
  --   why not look info up as needed in the ranks and links4 tables?
  --   we cannot look at global tables in this model!
  --   the model restricts us to thinking about one record at a time,
  --   so we need to construct single records that have all the info we need

  val contribs = jj.values.flatMap{
    case (urls, rank) =>
      urls.map(url => (url, rank / urls.size))
  }
  -- for each link, the "from" page's rank divided by its number of links
  -- the key is the "to" url (not the "from" url)

  ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
  -- sums up the contributions of the links that lead to each page
  -- reduceByKey shuffles to bring together all contributions to each url, and sums
  -- mapValues just modifies each sum

  -- second loop iteration
  val jj2 = links4.join(ranks)
  val contribs2 = jj2.values.flatMap{
    case (urls, rank) =>
      urls.map(url => (url, rank / urls.size))
  }
  ranks = contribs2.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
  -- we're producing a new ranks RDD here, not modifying the old one
  -- the loop &c just creates a lineage graph
  -- it does not do any real work

  val output = ranks.collect()
  -- collect() is an action
  -- it causes the whole computation to execute!

  output.foreach(tup => println(s"${tup._1} has rank: ${tup._2} ."))

for multi-step computation, Spark is more convenient than MapReduce
  we get to say what the stages are!
    when to map, when to shuffle, when to reduce
  until the final collect(), this code just creates a lineage graph
    it does not process the data
    "lazy"

what does the lineage graph look like?
  Figure 3
  it's a graph of transform stages -- a data-flow graph
  it's a complete recipe for the computation
  note what the loop added to the graph -- there is not actually a cycle
    there's a *new* ranks/contribs for each loop iteration
  note the re-use of the links RDD
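
  we can ask an RDD to describe its own lineage; a sketch of what to type
  in the shell (output omitted here, since its format varies across Spark
  versions):

  ranks.toDebugString
  -- prints one line per RDD in ranks' lineage, back to the input file
  -- the indentation changes at the shuffle boundaries
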
why is this neat?
  - easy program control over construction of the lineage graph
    - a simple program can drive a huge computation
  - programmer doesn't have to worry about details of distribution
  - programmer is guided to using constructs that are efficient
  - nice integration between language and cluster computation
    - the maps inject a closure and program data into the cluster computation
    - RDDs use language data types, like arrays
    - easy to get results back from the cluster computation

what does execution look like?
  [diagram: driver, partitioned input file, workers] -- Figure 2
  * the Scala code runs on the "driver" machine
  * the driver uses the lineage graph to tell workers what to do
  * input is in HDFS (like GFS)
  * input data files are already "partitioned" over many storage servers
    first 1,000,000 lines in one partition, next lines in another, &c.
  * more partitions than machines, for load balance
  * each worker machine takes a partition, applies the lineage graph in order

narrow dependencies
  computations that are independent on different partitions
  like map()
  a worker can "pipeline" a series of narrow transformations
  excellent parallel speedup!
  like MapReduce's Map phase.

what about distinct()? groupByKey()? join()? reduceByKey()?
  these need to look at data from *all* partitions, not just one
    because all records with a given key must be considered together
  these are the paper's "wide" dependencies

how are wide dependencies implemented?
  [diagram]
  the driver knows where the wide dependencies are
    e.g. between the map() and the distinct() in page-rank
    upstream transformation, downstream transformation
  the data must be "shuffled" into new partitions
    e.g. to bring all records with a given key together
  after the upstream transformation:
    split the output up by the shuffle criterion (typically some key)
    arrange it into buckets in memory, one per downstream partition
  before the downstream transformation:
    (wait until the upstream transformation completes -- the driver manages this)
    each worker fetches its bucket from each upstream worker
  now the data is partitioned in a different way
  just like the Map->Reduce shuffle
  wide is expensive!
    all the data is moved across the network
    it's a barrier -- all workers must wait until all are done

what if shuffle output is re-used?
  e.g. links4 in our page-rank
  by default, it must be re-computed, all the way from the original input
  persist() and cache() cause links to be saved in memory for re-use
  re-using persisted data is a big advantage over MapReduce

Spark can optimize using its view of the whole lineage graph
  stream records, one at a time, through a sequence of narrow transformations
    increases locality, good for CPU data caches
    avoids having to store an entire partition of records in memory
  eliminate shuffles when inputs are already partitioned in the same way
    (see the partitionBy sketch just before the evaluation discussion, below)
  it can do this b/c it lazily produced the lineage graph before starting computation

what about fault tolerance?
  1000s of workers -> frequent worker failures!
  the failure recovery strategy is important for ordinary performance
    it drives the RDD immutable / deterministic design choices
  main trick: recompute rather than replicate

what goes wrong if one worker crashes?
  we lose that worker's partition(s) of the work done (RDDs) so far
  [diagram: narrow dependencies]
  the driver re-runs the transformations on the crashed machine's partitions
    on other machines
  usually each machine is responsible for many partitions
    so the re-computation work can be spread over many machines
    thus re-computation is pretty fast
  for narrow dependencies, only the lost partitions have to be re-executed

what about failures when there are wide dependencies?
  [diagram: wide dependency]
  re-computing one failed partition requires information from *all* partitions
  so *all* partitions may need to re-execute from the start!
    even though they didn't fail
  Spark supports checkpoints to HDFS to cope with this
    the driver then only has to recompute along the lineage from the latest checkpoint
    for page-rank, perhaps checkpoint ranks every 10th iteration
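
  a sketch of what that might look like in the page-rank loop (the
  checkpoint directory is my invention; on a real cluster it would be
  an HDFS path):

    sc.setCheckpointDir("/tmp/spark-checkpoints")
    for (i <- 1 to 10) {
      val jj = links4.join(ranks)
      val contribs = jj.values.flatMap{
        case (urls, rank) =>
          urls.map(url => (url, rank / urls.size))
      }
      ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
      if (i % 10 == 0) {
        ranks.checkpoint()  // mark this ranks RDD for saving
        ranks.count()       // any action forces the checkpoint to be written
      }
    }
    -- recovery now replays the lineage from the saved ranks, not from "in"
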
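looking ahead to the evaluation: its "Spark + Controlled Partitioning" variant
tells Spark up front how to partition the links, so the per-iteration join
doesn't have to shuffle them; this is the partitionBy optimization from
Section 3.2.2 of the paper. a sketch (the names and the partition count are
arbitrary choices of mine):

  import org.apache.spark.HashPartitioner
  val plinks = links3.partitionBy(new HashPartitioner(8)).cache()
  var pranks = plinks.mapValues(v => 1.0)
  -- mapValues preserves partitioning, so pranks is partitioned like plinks
  val pjj = plinks.join(pranks)
  -- the join sees matching partitionings, so it needn't shuffle its inputs
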
what about the evaluation section? what do we expect?
  what does the paper promise?
    operate out of memory, rather than from HDFS/GFS
    optimize complex graphs
  but note it's hard to evaluate programmer convenience or flexibility

figure 7 / Section 6.1
  what are they comparing?
    Hadoop
    HadoopBinMem
    Spark
  why do they separate the first iteration from later ones?
    the first iteration includes the initial read from HDFS -- slow, and the same for all
  why is Spark 20x faster for later iterations?
    vs Hadoop? Hadoop reads/writes HDFS on every iteration
    vs HadoopBinMem? lots of overhead for Hadoop to get at even in-memory HDFS data
      Spark keeps data as native Java objects in memory
  why is Spark's advantage less for K-Means than for Logistic Regression?
    Spark wins mostly by avoiding Hadoop's HDFS reads/writes
    if computation time is large compared to HDFS ops, there's less room to win
  take-away: it's faster to keep data in memory than on disk

figure 10 / Section 6.2
  this is PageRank
  2x to 3x faster than Hadoop, presumably due to memory vs disk
    why not 20x? we don't know
    maybe the time is dominated by shuffles, which are the same for both?
  Spark + Controlled Partitioning eliminates one shuffle
    by specifying the partitioning, as in the partitionBy sketch above
    2x improvement
    probably 2x b/c each iteration has two shuffles: the join, and the reduceByKey
    and maybe the shuffles dominate performance

limitations?
  geared up for batch processing of huge data
    all records are treated the same way -- data-parallel
  not useful for e.g. web site interactions, shopping cart, &c.
    no notion of reading or updating individual data items
  not useful for steady streams of data
    (but Spark has subsequently evolved)

summary
  Spark improves expressivity and performance vs MapReduce
  what were the keys to success?
    the application gets to describe the data-flow graph
    Spark can optimize using its prior knowledge of the data-flow graph
      leave data in memory between transformations, vs write to GFS then read
  Spark: successful, widely used

----

install demo:

$ docker pull apache/spark
$ docker run -u root -it apache/spark bash
# cd ..
# cat > in
u1 u3
u1 u1
u2 u3
u2 u2
u3 u1
# ...
#