6.824 2012 Lecture 5: Distributed Programming: MapReduce, Piccolo

Topic: Continuing theme of support for distributed systems
  This time: distributed big-data computation
  Off-line applications: index generation, web graph analysis, machine learning, &c
    run in hours rather than days
    not for interactive use!
  Old idea -- parallel databases
  Re-energized by Google's MapReduce

Good news: cluster computing easy to buy/rent
  1000s of computers connected by a LAN
  split up data+computation among machines
  communicate as needed

Good news: some computations parallelize easily
  E.g. index generation:
  Input:
    doc 25: red cat
    doc 26: big cat
  Output:
    big: 26
    cat: 25, 26
    red: 25
  Computation:
    Scan documents in parallel -> intermediate data
      red 25
      cat 25
      big 26
      cat 26
    Sort on first column
  Both stages parallelize well

Goal: make easy parallelism easy

What do we want from a distributed computation framework?
  Simplify programmer's job
  Don't constrain programmer
  High performance
  (always the same set of systems infrastructure goals!)

Challenges:
  Network is a bottleneck
    RAM 1000*1 GB/sec, disk 1000*0.1 GB/sec, net cross-section 10 GB/sec
    Minimize communication!
    Bring computation to data
  Expressing parallelism
    Does programmer or compiler/system discover parallelism?
      for(i = 0; i < 1000; i++) { G(i); }
    Must the programmer understand races, locks, &c?
  Load balance
    Some machines faster than others (other work, broken, &c)
    Some parts of job take more CPU than others
    *One* machine 2x slower -> *entire* cluster half wasted
  Failures
    With 1,000 machines, how frequent will failures be?
    This is a big deal; it really separates 5-machine systems from 1000s
    Can we do better than re-starting the entire computation?

MapReduce
  Diagram (Figure 1 from MR paper):
  * input split over GFS
  * M map worker machines
  * local intermediate storage partitioned by reduce key
  * R reduce worker machines: each responsible for some set of reduce keys
      pulls its keys from intermediate data of *all* map workers
      calls reduce function on each key + value set
  * reduce output split over GFS
  * master controls all

Programmer interface:
  map(split of input) -> set of k/v pairs
    called once per split of input
  reduce(key, set of values) -> per-worker output file
    called once per key, with all values for that key

Example: word count (see the sketch below)
  split input file into big pieces
  map(input)
    parse into words
    emit a series of word/1 pairs
  ... the system collects all the values for each word ...
  reduce(word, set)
    set of "1"s from maps for this word
    add them
    output word and sum
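A minimal single-machine sketch of word count in C++, to make the
interface concrete. Map, Reduce, EmitIntermediate, and the driver loop
are stand-ins of our own, not the MR library's actual API; real MR runs
the Maps and Reduces on many machines with a network shuffle in between.

  #include <iostream>
  #include <map>
  #include <sstream>
  #include <string>
  #include <vector>

  // Stand-in for the shuffle: collect intermediate pairs, grouped by key.
  static std::map<std::string, std::vector<std::string>> intermediate;

  void EmitIntermediate(const std::string &key, const std::string &value) {
    intermediate[key].push_back(value);
  }

  // map(): called once per input split; parses words, emits word/"1" pairs.
  void Map(const std::string &split) {
    std::istringstream in(split);
    std::string word;
    while (in >> word)
      EmitIntermediate(word, "1");
  }

  // reduce(): called once per key, with all the values for that key.
  void Reduce(const std::string &word, const std::vector<std::string> &values) {
    long sum = 0;
    for (const std::string &v : values)
      sum += std::stol(v);    // add up the "1"s
    std::cout << word << ": " << sum << "\n";
  }

  int main() {
    Map("red cat");    // two "splits" standing in for GFS chunks
    Map("big cat");
    // ... in real MR the shuffle moves data across the network here ...
    for (const auto &kv : intermediate)
      Reduce(kv.first, kv.second);
  }

Note there is no shared read/write data and no locking: Map and Reduce
only read their arguments and emit results, which is exactly what lets
the system run them anywhere, in any order, and more than once.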
Where is parallel speedup coming from?
  Maybe most of the work is Map scanning/parsing text
  Each 1000th of input text takes 1/1000th the time to scan/parse
  So 1000x speedup (optimistically)

Why a nice programming model?
  The programmer does not have to think about concurrency
    Parallel MR gets same result as running Maps+Reduces on one machine
    Cannot express a race -- so no locks needed
  Map/Reduce are functional: just read arguments, emit results
    No shared r/w data

Where does communication occur?
  No input->Map communication
    input already split, Maps run locally
    this locality is critical for performance!
  No Reduce->output communication
    Each writes its part of output locally
  Can Reduce run on the same machine as a Map to avoid communication?
    No: reduce("the") must count *all* instances of "the"
    So communication is fundamental for word count
  Must move all Map output across the network!
    May be huge: same size as input
  How to reduce communication costs?
    Local reduction on Map machine (partial sums -- see the sketch below)
    Network much more efficient for big msgs than small
      So buffer+batch: each Map emits a lot for each Reduce machine
  Communication is implicit in keys Map emits
    Programmer need not be aware of distribution
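A sketch of the local-reduction idea (a "combiner"), again with made-up
names: instead of emitting ("the", "1") once per occurrence, the Map
task keeps a local partial sum per word and emits one batched pair per
distinct word per split.

  #include <iostream>
  #include <sstream>
  #include <string>
  #include <unordered_map>

  void MapWithCombiner(const std::string &split) {
    std::unordered_map<std::string, long> partial;  // local to this Map task
    std::istringstream in(split);
    std::string word;
    while (in >> word)
      partial[word]++;          // combine locally: no network traffic yet
    // One emit per distinct word instead of one per occurrence;
    // these go out in big batched messages, one per Reduce machine.
    for (const auto &kv : partial)
      std::cout << "emit " << kv.first << " " << kv.second << "\n";
  }

  int main() {
    MapWithCombiner("the cat saw the dog and the bird and the fish");
  }

Reduce then sums partial counts rather than counting "1"s; this is only
correct because addition is associative and commutative.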
Will MR scale?
  2x machines -> 1/2 run-time, indefinitely?
  Start/barrier/stop overhead, network b/w, failures, master

What if a worker fails while running Map?
  Can we restart just that Map on another machine?
    GFS keeps a copy of each input split on 3 machines
  What if the Map had started to produce output?
    Will some Reduces see the Map's output twice?
    And thus produce e.g. word counts that are too high?

What if a worker fails while running Reduce?
  Where can a restarted worker find its Reduce input?

Load balance
  What if some Map machines are faster than others?
    Or some input splits take longer to process?
  Don't want lots of idle machines and lots of work left to do!
  Many more input splits than machines
  Master hands out more Map tasks as machines finish
    Finish at a higher rate -> Master gives you work at a higher rate
  But there's a constraint:
    Want to run each Map task on a machine that stores its input data
    There are 3 copies of every input data split
    So three efficient choices of where to run each Map task

Stragglers
  Often one machine is slow at finishing the very last task
    h/w or s/w wedged, or overloaded with some other work
  Or maybe the last task just takes a lot of CPU time
  The load balancing above only balances newly assigned tasks
  Solution: schedule multiple copies of the very last tasks!

Constraints on Map and Reduce fns help failures and load balance
  Functional, so correct to run them more than once
  Master handles this automatically -- no programmer burden

Retrospective
  Very successful inside Google
    they claim it lets novices write big parallel programs
  Hadoop very popular outside Google
  Reasonably simple

***

Does MR impose significant restrictions on programs?
  Just two phases
  Map can see only its split
  Reduce sees just one key at a time

What's the Piccolo programming model?
  any # of phases -- determined by the controller
  global key/value tables store intermediate data
  computation proceeds in rounds:
    run lots of kernel instances
      each computes, with gets/puts mixed in
    barrier
    ...

Why might Piccolo be more flexible than MR?
  1) iteration rather than two fixed phases
  2) memory-like read/write model for intermediate data
  3) tables are random-access -- can read any intermediate data
  Sounds much more flexible, like threads and shared memory!

Figure 2 example: what is PageRank?
  Goal: assign "importances" to web pages
  Important = lots of Important pages link to you
  The definition is recursive; the algorithms are iterative
  diagram: pages, links

Diagram to go w/ Figure 2's PageRank code
  (don't worry about partitioning yet)
  Tables: graph, curr, next
  Workers
  Where are the put()s and get()s? (see the sketch below)
    get_iterator yields page, outlinks
      reads through this worker's split of the input
    get curr[page] (page is a URL)
    update next[target] (target is a URL)
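A sketch of one kernel instance in C++. RankTable and its methods are
simplified stand-ins, not Piccolo's actual table API; the point is the
shape of the computation -- local gets on curr, accumulating updates on
next.

  #include <string>
  #include <unordered_map>
  #include <vector>

  struct Page { std::string url; std::vector<std::string> outlinks; };

  // Stand-in for a Piccolo global table: update() applies a
  // user-supplied accumulation function (here +=), which is what lets
  // Piccolo buffer and batch remote updates instead of doing a network
  // round trip per put().
  struct RankTable {
    std::unordered_map<std::string, double> part;
    double get(const std::string &k) { return part[k]; }
    void update(const std::string &k, double delta) { part[k] += delta; }
  };

  // One kernel instance: scans this worker's split of the graph table,
  // spreading each page's current rank across its outlinks.
  void PageRankKernel(const std::vector<Page> &my_graph_split,
                      RankTable &curr, RankTable &next) {
    for (const Page &p : my_graph_split) {
      double share = curr.get(p.url) / p.outlinks.size();  // local get
      for (const std::string &target : p.outlinks)
        next.update(target, share);  // possibly remote, batched, accumulated
    }
    // the controller then: barrier; swap curr and next; next round
  }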
Q: Are the get()s likely to be fast or slow?
   How do they ensure that they are fast?
   All tables must have the same partitioning...

Q: Is the update() likely to be fast or slow?
   How do they ensure that it is fast?
   Batching, accumulation/reduction

Q: Does the example have races?
   Will the output differ from run to run depending on interleaving?
   curr -> kernel -> next ; barrier ; swap

Q: Will arbitrary put()/get() produce races?
   Example (Figure 4): crawler
     inserts new URLs to crawl as it parses fetched pages
     workers read those URLs as they are being inserted
   races, but maybe it doesn't matter much to correctness

Would the PageRank example be hard in MR?
  You would have to run a series of MR jobs
  Carefully arrange that the output from one has the right partitioning for the next
  Maps have to read *two* inputs (the graph and the last round's ranks)
  Possible but not directly supported within MR

Section 6.4 claims three apps are possible in Piccolo but not MR (really Hadoop):
  web crawler
  n-body force field simulator
  matrix multiply
  What's the basis of the claim?

Q: Why does the paper say Piccolo tables must fit in memory?
   What would go wrong if a table were big and lived on disk?
   random-access single-key put/get would be terribly slow

Q: How does MR avoid the in-memory restriction?
   Everything streams -- no random put/get
   Map reads its input partition linearly
   Map->Reduce communication moves big batches
   The sort before Reduce can be done efficiently even on disk
   Reduce streams through keys in order, and each key's values in order

Q: If a worker crashes, can Piccolo just re-start its work somewhere else?
   * Kernels are *not* functional -- they may have already exposed updates
   * The worker stored part of the tables -- with other workers' input/output

Q: How does Piccolo recover from the crash of a worker machine?

Q: Why is the checkpoint correct? What would "correct" mean?
   Example danger for the crawler:
     Crawler on W1 fetches url U1, marks it "done", put()s link(s) to W2 &c
     W1's checkpoint saves "done", but W2 doesn't save the new links
     Now those links may never be fetched!
   Why does the example checkpoint correctly?
     W1 uses markers to ensure W2 logs the put() messages

Q: What does Piccolo do if some workers/tasks are slow?
   steal work + migrate table partitions
   can't just run any task on any worker
   migration requires care since other workers may be updating

Q: What about stragglers -- can it run multiple instances of the last tasks?

Can we sum up the +s and -s, relative to MR?
  + put/get any part of the intermediate data
  - doesn't really support arbitrary put/get; always local/batched
  - arbitrary put/get risks races
  - full ckpt rather than just restarting one task
  - more machines -> higher % of time wasted in full restarts
  - load balance is more work; must move data
  - data must fit in memory

Evaluation -- what do we want to know?
  Quantify the costs
    Are put/get expensive?
    Are checkpoints expensive?
    Is more machines -> more failures a serious problem?
    Is data migration for load balance expensive?
  Quantify the benefits
    More expressive than MR?
    Better performance than MR?

Evaluation of the programming model
  They demonstrate you can implement various things
  They claim Hadoop can't do a web crawler, n-body, or matrix multiply
  It is hard to evaluate "easy to program"!
    What could they have done?

Performance: scaling
  Good scaling = more hardware => more performance
  What do we expect?
  What does Figure 6 show? Why not perfect?
  What does Figure 7 show?

Performance: vs Hadoop, on PageRank, Section 6.4
  Do we expect Piccolo to beat Hadoop? Why?
  Why does it beat Hadoop by a whopping 11x?
    Does this show the put/get programming model is a win?
  I don't think they expected to win by much
  They mention:
    C++ vs Java
    sorting keys in the map phase
    serialize / deserialize (to disk?)
    read / write disk
  What is the sorting about?
    Maybe to align Reduce output with the Map partitioning of the next iteration?
  It's hard to see why this all adds up to 11x

Is Piccolo practical on huge clusters, given full ckpt restore on failure?

***

Lab 2: What is Fuse?
  file system operations are translated into fuse messages sent to yfs_client
  there is a fuse module inside the OS kernel that does this

app, kernel, yfs_client, extent_server
  high-level division of labor
  fuse.cc is just glue to call yfs_client methods
  yfs_client calls ec->get() / put() for low-level storage
  yfs_client knows about directory format, rename, locking, &c
  most of your code will be in yfs_client
  typically pairs of fuse.cc and yfs_client.cc methods

look at fuse.cc getattr() for an example
  calls yfs->getfile()
  which calls ec->getattr()

you have to do create for lab 2 (a sketch follows)
  fuseserver_create(...)
  fuseserver_createhelper(parent inum, name)
  yfs->create(p, n, &i)
    allocate new inum
    ec->get(p)
    add name/inum
    ec->put(p)
  return inum and getattr() to fill in attributes
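One possible shape of yfs_client::create(), following the steps above.
The directory entry format, the inum allocation scheme, and the exact
signature are design choices you make in the lab, not fixed by the
handout; treat this as a sketch, not the solution.

  int
  yfs_client::create(inum parent, const char *name, inum &file_inum)
  {
    // allocate a new inum; by lab convention the high bit marks a file
    file_inum = (rand() & 0x7fffffff) | 0x80000000;

    // create an empty extent for the new file
    if (ec->put(file_inum, "") != extent_protocol::OK)
      return IOERR;

    // read the parent directory, append a name/inum entry, write it back
    // ("name/inum/" is our made-up directory entry format)
    std::string dir;
    if (ec->get(parent, dir) != extent_protocol::OK)
      return IOERR;
    dir += std::string(name) + "/" + filename(file_inum) + "/";
    if (ec->put(parent, dir) != extent_protocol::OK)
      return IOERR;

    return OK;
  }

fuseserver_createhelper() then calls this, and on success runs
getattr() on the new inum to fill in the attributes for the reply to
the kernel.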