6.824 Lecture 5: Distributed Programming: MapReduce, Dryad, and DryadLINQ

Outline:
  Why, designs, challenges
  MapReduce (Google)
  Dryad (Microsoft)
  DryadLINQ
  Lab 2

Parallel programming can be hard
  complexity: threads, shared data, mutexes, RPC, failure
  dependencies: when can F() and G() run in parallel?
    vs. when must G() follow F()?
  does it require experts?
  can the details be hidden?

One application area: big data-parallel computation
  huge data set
  natural parallelism: can work on different parts of the data independently
    image processing
    grep
    indexing
    many more

Cluster computing
  many machines in one place
  partition the data among the machines
  each machine computes on its part of the data
  machines communicate when required
  cheaper than the old plan: big shared-memory multiprocessors
    but less communication capacity

Challenges:
  Parallelizing the application
    Where to place input and output data?
    What parts of the computation to place on what machines?
    How to avoid network bottlenecks?
  How to write the application?
    Does the programmer have to indicate what parts can be parallel?
    Or can the system figure it out?
    Can the system apply optimizations?
  Balancing the computation across computers
    Statically (doable when the designer knows how much work there is)
    Dynamically
  Handling failures of nodes during the computation
    With 1,000 machines, is a failure likely in a 1-hour period?
    Often easier than with, say, banking applications (or the YFS lock server)
      Many computations have no "harmful" side-effects and clear commit points
  Scheduling several applications that want to share the infrastructure
    Time-sharing
    Strict partitioning

MapReduce

Design
  Partition the large input data set into M splits
    a split is typically a 64 MB piece of the input
  Run map on each split; each map produces R local intermediate partitions,
    assigning keys to partitions with a partition function
  Run reduce on each of the R intermediate partitions; this produces R output files

Programmer interface:
  map(key, value) -> set of k/v pairs
  reduce(key, set of values)
    reduce is called once per key, with all the values for that key

Example: word count
  split input files into big pieces
  map(split)
    takes one split as input
    parses it into words
    returns a list of word/1 pairs
  reduce(word, set)
    set is the "1"s from the maps for this word
    adds them up
    outputs the number of occurrences of that word

Implementation diagram:
  (Figure 1 from the MR paper)
  input partitioned over GFS
  M map worker machines:
    each reads one split of the input
    writes to local intermediate storage
    each intermediate store is partitioned by reduce key
  R reduce worker machines:
    pull intermediate data from *all* map workers via RPC
    local sort by key
    call the reduce function on each key + value set
    output to GFS
  output partitioned over GFS

where is the parallel speedup coming from?
will it scale?
  will it be N times as fast w/ N machines, indefinitely?
  what might limit scaling?

look at the WC implementation from the MR paper's Appendix
  Map
    input is a string with many words
    split it into words at space boundaries
    one Emit(w, "1") per word
  Reduce
    sums up the "1"s of the key's values
    just one Emit at the end
  main()
    set up input files (must be in GFS)
    specify names of output files
    R is 100
    combiner: Reduce is associative
      so it can be run on the temporary output of each Map worker
      will reduce communication
    set # of machines
    then run!
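To make the Map/Reduce interface concrete, here is a tiny single-process
word-count sketch in C++ -- an illustration, not the paper's appendix code.
map_wc() and reduce_wc() play the roles of the user's Map and Reduce;
main() stands in for the splitting, shuffle, and master logic that the MR
library provides, and every name here is made up for the example.

  // Minimal single-process sketch of the word-count Map/Reduce interface.
  // Only map_wc() and reduce_wc() correspond to "user" code; the rest
  // emulates what the MR library and master do.
  #include <cstdlib>
  #include <iostream>
  #include <map>
  #include <sstream>
  #include <string>
  #include <vector>

  typedef std::pair<std::string, std::string> KV;

  // map(key, value) -> set of k/v pairs: one ("word", "1") per word.
  void map_wc(const std::string& key, const std::string& value,
              std::vector<KV>* out) {
    std::istringstream in(value);
    std::string w;
    while (in >> w)
      out->push_back(KV(w, "1"));
  }

  // reduce(key, set of values): sum up the "1"s for one word.
  std::string reduce_wc(const std::string& key,
                        const std::vector<std::string>& values) {
    long n = 0;
    for (size_t i = 0; i < values.size(); i++)
      n += atol(values[i].c_str());
    std::ostringstream out;
    out << n;
    return out.str();
  }

  int main() {
    // Stand-in for M input splits stored in GFS.
    std::vector<std::string> splits;
    splits.push_back("the quick brown fox");
    splits.push_back("the lazy dog and the fox");

    // "Map phase": run map on every split.
    std::vector<KV> intermediate;
    for (size_t i = 0; i < splits.size(); i++)
      map_wc("split", splits[i], &intermediate);

    // "Shuffle": group intermediate values by key (the reduce workers'
    // fetch-and-sort step in the real system).
    std::map<std::string, std::vector<std::string> > groups;
    for (size_t i = 0; i < intermediate.size(); i++)
      groups[intermediate[i].first].push_back(intermediate[i].second);

    // "Reduce phase": call reduce once per key, with all its values.
    for (std::map<std::string, std::vector<std::string> >::iterator it =
             groups.begin(); it != groups.end(); ++it)
      std::cout << it->first << " " << reduce_wc(it->first, it->second) << "\n";
  }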
it's all pretty simple
  Map/Reduce are sequential code
  no locks, threads, RPC, &c

Implementation details:
  the MR system handles all the management
    creates workers, &c
  master process
    keeps track of which parts of the work are done
    keeps track of the workers
    assigns workers map and reduce jobs
    handles failures of workers
  map workers communicate the locations of their R partitions to the master
  reduce workers ask the master for those locations
    sort the input keys
    run the reduce operation
  when all workers are finished, the master returns the result to the caller

Fault tolerance -- what if a worker fails?
  assign its map and reduce jobs to another worker
  may have to re-run completed map jobs
    since the worker crash also lost the intermediate map output

Load balance:
  what's the right number of map input splits (M)?
  should M == the number of worker machines?
    no -- better to have many more splits than workers:
      some splits will take longer than others
      want to limit the damage from any one failure
      want to spread the failure-recovery load among multiple workers
  challenge: stragglers

Why MR's particular setup?
  Can we argue that Map-then-Reduce covers many situations?
  High level: only two things going on in cluster computing
    1. partitioned computation (Map)
    2. reshuffling data (Reduce)

Retrospective
  Google claims MR made huge parallel computations accessible to novices
  Maybe you have to shoe-horn your problem into the MR model
    but you win in scalability what you might lose in performance

Dryad
  Claim: MR is not flexible enough
    there are computations that should work well on a cluster
    but are awkward to express in MR
  Example:
    1. score web pages by the words they contain
    2. score web pages by # of incoming links
    3. combine the two scores
    4. sort by combined score
  This example is awkward in MR
    multiple MR runs -- maybe one for each step
    the programmer must glue them together
    step 3 has two inputs -- MR has only one
  Dryad idea: the programmer specifies an arbitrary graph
    Vertices are computations
    Edges are communication channels
    Each vertex can have several input and output channels
  Most interesting when programmed w/ DryadLINQ...

DryadLINQ
  Goals:
    allow high-level programming of Dryad graphs
    good integration with the programming language

Look at the word frequency handout (from the DryadLINQ tech report)
  count occurrences of each word
  return the top 3 (so requires a final sort by frequency)

  public static IQueryable<Pair> Histogram(IQueryable<string> input, int k)
  {
      var words = input.SelectMany(x => x.Split(' '));
      var groups = words.GroupBy(x => x);
      var counts = groups.Select(x => new Pair(x.Key, x.Count()));
      var ordered = counts.OrderByDescending(x => x.Count);
      var top = ordered.Take(k);
      return top;
  }

What does each statement do?
  input: "A line of words of wisdom"
  SelectMany: ["A", "line", "of", "words", "of", "wisdom"]
  GroupBy: [["A"], ["line"], ["of", "of"], ["words"], ["wisdom"]]
  Select: [{"A", 1}, {"line", 1}, {"of", 2}, {"words", 1}, {"wisdom", 1}]
  OrderByDescending: [{"of", 2}, {"A", 1}, {"line", 1}, {"words", 1}, {"wisdom", 1}]
  Take(3): [{"of", 2}, {"A", 1}, {"line", 1}]

How is this executed? (as a Dryad graph, see figure 7)
  the original input was stored in four partitions
  the computation proceeds in three stages
  stage 1: four machines, each reading one input partition
    split into words
    hash(w) and send each word over the network to one of the GroupBys
  stage 2: each GroupBy works on a (new) partition of the words
    e.g. a-g, h-m, n-s, t-z
    counts the # of occurrences of each word it's responsible for
    sorts by # of occurrences
      this is only a partial sort, to reduce the cost of the final sort
  stage 3: look at the top few words from each partitioned computation
    pick the top 3
  (a single-process sketch of these three stages follows below)
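The three-stage plan can be mimicked in ordinary C++; the sketch below is
an illustration of the plan described above, not DryadLINQ's generated
code. It hash-partitions the words (stage 1), counts and partially sorts
within each partition (stage 2), then merges the per-partition leaders and
takes the global top 3 (stage 3). Because each word lands in exactly one
partition, the per-partition top-k lists are enough to recover the exact
global top k.

  // Single-process sketch of the three-stage word-frequency plan.
  #include <algorithm>
  #include <iostream>
  #include <map>
  #include <sstream>
  #include <string>
  #include <vector>

  typedef std::pair<long, std::string> CountWord;   // (count, word)

  static bool by_count_desc(const CountWord& a, const CountWord& b) {
    return a.first > b.first;
  }

  int main() {
    const int R = 4;    // number of GroupBy partitions (four machines)
    const int k = 3;    // how many top words we want

    std::vector<std::string> inputs;                // four input partitions
    inputs.push_back("a line of words");
    inputs.push_back("of wisdom and of wit");
    inputs.push_back("more words of wisdom");
    inputs.push_back("a final line");

    // Stage 1: split each input partition into words and route each word
    // to a partition chosen by hash(word), as if sent over the network.
    std::vector<std::vector<std::string> > parts(R);
    for (size_t i = 0; i < inputs.size(); i++) {
      std::istringstream in(inputs[i]);
      std::string w;
      while (in >> w) {
        size_t h = 0;
        for (size_t j = 0; j < w.size(); j++) h = h * 31 + w[j];
        parts[h % R].push_back(w);
      }
    }

    // Stage 2: each partition counts its own words and sorts by count.
    // This is only a partial (per-partition) sort.
    std::vector<CountWord> leaders;
    for (int p = 0; p < R; p++) {
      std::map<std::string, long> counts;
      for (size_t i = 0; i < parts[p].size(); i++)
        counts[parts[p][i]]++;
      std::vector<CountWord> local;
      for (std::map<std::string, long>::iterator it = counts.begin();
           it != counts.end(); ++it)
        local.push_back(CountWord(it->second, it->first));
      std::sort(local.begin(), local.end(), by_count_desc);
      // Only the top k of each partition can be in the global top k.
      for (size_t i = 0; i < local.size() && i < (size_t)k; i++)
        leaders.push_back(local[i]);
    }

    // Stage 3: merge the per-partition leaders and take the global top k.
    std::sort(leaders.begin(), leaders.end(), by_count_desc);
    for (size_t i = 0; i < leaders.size() && i < (size_t)k; i++)
      std::cout << leaders[i].second << " " << leaders[i].first << "\n";
  }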
why is this cool?
  the program was pretty high level -- much shorter than it would be in MR
  the system figured out a good graph and managed the execution
  automatic optimization: moved most of the sorting early, so it runs in parallel

what optimizations can DryadLINQ do? (page 7)
  runs the initial stages on/near the machines holding the input files
  knows if data is already partitioned in the required way
    e.g. if the input to WC was already sorted by word
    this works when the input was itself computed by DryadLINQ
  samples the data to produce balanced range-partitions, e.g. for sort (fig 5)
  moves associative aggregation up above re-distributions (fig 6)
    e.g. the summing for word count: move it up to the Map side
  dynamic aggregation tree to keep network traffic local (fig 6)

DryadLINQ performance
  Table 1 / Figure 7?
  N machines, sorting 4N gigabytes of data
  does it scale well? what do we expect?
    could we hope for a flat line? 2x data, 2x machines, 1x time?
  close to the limits of the hardware?
    how long does it take to send the data over the net?
      40 seconds to send 4 GB
      if from/to the same machine, maybe 80 seconds to send+receive
    sorting 4 GB locally seems to take 119 seconds (N=1, no network I/O)

Lab 2:

What is FUSE?
  file system operations are translated into FUSE messages and sent to yfs_client
  a FUSE module inside the OS kernel does this translation

pieces: app, kernel, yfs_client, extent_server

high-level division of labor
  fuse.cc is just glue that calls yfs_client methods
  yfs_client calls ec->get() / ec->put() for low-level storage
  yfs_client knows about the directory format, rename, locking, &c
  most of your code will be in yfs_client
  you typically write pairs of fuse.cc and yfs_client.cc methods

look at getattr() in fuse.cc for an example
  it calls yfs->getfile()
    which calls ec->getattr()

you have to implement create for Lab 2
  fuseserver_create(...)
    calls fuseserver_createhelper(parent inum, name)
      which calls yfs->create(p, n, &i); create must:
        allocate a new inum
        ec->get(p) to fetch the parent directory
        add the name/inum entry
        ec->put(p) to write the directory back
        return the new inum
    then getattr() to fill in the new file's attributes
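A rough sketch of that create path in C++. The shape of create(), the
ec->get()/put() calls, and inum follow the lab's description above, but
the "name/inum"-per-line directory format and the random inum allocation
are assumptions made for illustration (the lab leaves those choices to
you), and ExtentStub is a stand-in for extent_client so the sketch
compiles on its own.

  // Sketch of the create path: allocate an inum, create an empty extent
  // for the new file, then read-modify-write the parent directory.
  #include <cstdlib>
  #include <iostream>
  #include <map>
  #include <sstream>
  #include <string>

  typedef unsigned long long inum_t;

  // Stand-in for extent_client: an in-memory map from extent id to contents.
  class ExtentStub {
    std::map<inum_t, std::string> extents;
   public:
    bool get(inum_t id, std::string& buf) {
      if (extents.find(id) == extents.end()) return false;
      buf = extents[id];
      return true;
    }
    bool put(inum_t id, const std::string& buf) {
      extents[id] = buf;
      return true;
    }
  };

  // The logic fuseserver_createhelper relies on: yfs->create(parent, name, &i).
  bool create(ExtentStub* ec, inum_t parent, const std::string& name,
              inum_t* new_inum) {
    // 1. Allocate a new inum (here: random, with a high bit set to mark
    //    "file" rather than "directory" -- an assumed convention).
    *new_inum = ((inum_t)rand() | 0x80000000ULL);

    // 2. Create an empty extent for the new file's contents.
    if (!ec->put(*new_inum, ""))
      return false;

    // 3. Fetch the parent directory, append a "name/inum" entry, write it back.
    std::string dir;
    if (!ec->get(parent, dir))
      return false;
    std::ostringstream entry;
    entry << name << "/" << *new_inum << "\n";
    dir += entry.str();
    return ec->put(parent, dir);
  }

  int main() {
    ExtentStub ec;
    ec.put(1, "");                 // root directory: inum 1, initially empty
    inum_t i;
    if (create(&ec, 1, "hello.txt", &i))
      std::cout << "created hello.txt with inum " << i << "\n";
  }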