6.824 2013 Lecture 17: MapReduce

Why MapReduce?
  Practical solution to real problems
    Programming/managing off-line processing of "big data"
    Index generation, web graph analysis, machine learning, &c
  A triumph of simplicity
  Sparked lots of work, very influential (Hadoop &c)
  Continuing source of much follow-on research

Cluster computing for Big Data
  easy to buy/rent 1000s of computers connected by a LAN
  split up data+computation among the machines
  communicate as needed

Physical view:
  1000 machines, each with a disk
  tree of network switches
  multi-terabyte data striped over all the disks

Example:
  entire crawled web partitioned over the disks
    what's a web crawler?
    each disk holds e.g. 0.1% of all web pages
  what are the 10 most popular words on the web?
    each machine looks in its locally stored partition
    then they all send their top-10 lists to a master machine
    master merges
  runs 1000x as fast as on one machine
  simple in principle, but a lot of work from scratch

Challenges:
  Network is a bottleneck
    RAM 1000*1 GB/sec, disk 1000*0.1 GB/sec, net cross-section 10 GB/sec
    Explain host link b/w vs net cross-section b/w!
      tree of switches; FDS/Clos networks
    Compute on locally stored data! Minimize communication!
  Expressing parallelism
    Does the programmer or the compiler/system discover parallelism?
    Must the programmer understand races, locks, &c?
  Load balance
    Some machines faster than others (other work, broken, &c)
    Some parts of the job take more CPU than others
    *One* machine 2x slower -> *entire* cluster half wasted
  Failures
    Likely to be frequent w/ 1000s of machines
    Paper says average of one failure per job!
    Do better than re-starting the entire computation!

What does the programmer really need to express?
  At a high level the cluster does only two things:
    1. Computation on locally stored data -- straightforward -- Map
    2. Movement of data between machines -- not so clear
  What's a good way for the programmer to express data movement?
    Needs to be pretty explicit, since it's very expensive
    But somehow abstract away the details!
  Idea: do as much work locally as possible -- Map
    Map tags "intermediate" items with a destination
    move intermediate data according to the tags -- really a shuffle
    run a "Reduce" phase on the moved intermediate data
  MR abstracts this:
    Map tags each intermediate item with a key
    All intermediate items with a given key are Reduced on the same machine
    So the Map output key implies the data movement
    But the programmer need not understand the details

Programmer interface:
  map(split of input) -> set of k/v pairs
    called once per split of input
  reduce(key, set of values) -> per-worker output file
    called once per key, with an iterator over all values for that key

Diagram:
  * input partitioned into M splits on GFS, w/ replicas: A, B, C, ...
  * Maps read their local split, produce R local intermediate files (I_A0, I_A1, ..., I_A(R-1))
  * Reduce # = hash(key) % R
  * Reduce task i fetches I_Ai, I_Bi, I_Ci, ... -- from every Map worker
  * Sort the fetched I_ files to bring the same keys together
  * Call the Reduce function
  * Write output to GFS
  * Master controls all:
      Map task list
      Reduce task list
      Location of intermediate data (which Map worker ran which Map task)

Example: word count -- paper Section 2.1 (and Appendix)
  split input file into big pieces
  map(input)
    parse into words
    emit a series of word/1 pairs
  ... each map worker accumulates R intermediate files ...
  reduce(word, set)
    set is the "1"s from all maps for this word
    add them up
    output the word and the sum
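
A minimal Go sketch of the word-count Map and Reduce above, plus the
hash(key) % R partition rule from the diagram. The KeyValue type, the
function signatures, and the partition helper are assumptions about a
hypothetical framework, not the paper's actual (C++) interface.

  package main

  import (
      "fmt"
      "hash/fnv"
      "strconv"
      "strings"
      "unicode"
  )

  // KeyValue is an assumed intermediate-pair type; a real framework defines its own.
  type KeyValue struct {
      Key   string
      Value string
  }

  // Map is called once per input split: parse into words, emit word/"1" pairs.
  func Map(split string) []KeyValue {
      words := strings.FieldsFunc(split, func(r rune) bool { return !unicode.IsLetter(r) })
      kvs := []KeyValue{}
      for _, w := range words {
          kvs = append(kvs, KeyValue{Key: w, Value: "1"})
      }
      return kvs
  }

  // Reduce is called once per key with all values for that key: add up the "1"s.
  func Reduce(word string, values []string) string {
      sum := 0
      for _, v := range values {
          n, _ := strconv.Atoi(v)
          sum += n
      }
      return strconv.Itoa(sum)
  }

  // partition picks the Reduce task for a key: hash(key) % R.
  func partition(key string, R int) int {
      h := fnv.New32a()
      h.Write([]byte(key))
      return int(h.Sum32()) % R
  }

  func main() {
      // Toy single-machine run: group values by key locally in place of a real shuffle.
      interm := map[string][]string{}
      for _, kv := range Map("the quick brown fox jumped over the lazy dog") {
          interm[kv.Key] = append(interm[kv.Key], kv.Value)
      }
      for word, vals := range interm {
          fmt.Printf("%s %s (reduce task %d of 4)\n", word, Reduce(word, vals), partition(word, 4))
      }
  }

In a real run the framework, not this code, splits the input, writes the R
intermediate files, shuffles by partition(), sorts, and calls Reduce once per
key; the two user functions stay serial and pure.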

Why a nice programming model?
  The programmer does not have to think about concurrency
  Map and Reduce are serial functions: input -> output
  No shared memory, threads, locks, RPC, failure handling, &c

What restrictions does MapReduce impose on the programmer?
  Just the two passes
  Map and Reduce must be pure functions -- can't share state
  Must be deterministic, for fault-tolerance via re-execution

Will MR scale? 2x machines -> 1/2 run-time, indefinitely?
  Map calls probably scale
    2x machines -> each Map's input 1/2 as big -> done in 1/2 the time
    limited since the input may not be infinitely partitionable
  Reduce calls probably scale
    2x machines -> each handles 1/2 as many keys -> done in 1/2 the time
    limited by the number of keys (if only N keys, can harness <= N machines)
  Bottlenecks:
    Will cross-section net b/w scale with the # of machines?
      If not, the network will ultimately dominate run-time
      If all time is spent in the Map->Reduce shuffle, adding machines won't help
    Start/barrier/stop overhead
      i.e. with too little data or too many machines, may never fully ramp up
    Failures may prevent or delay completion
    Imbalanced load -- all machines idle waiting for the slowest

What's a not-so-scalable application?
  Create a text index ("inverted index")
  Input: a collection of documents, e.g. a crawled copy of the entire web
    doc 31: I am aaron
    doc 32: aaron at 8:00 am
  Output:
    aaron: 31/3 32/1 ...
    am: 31/2 32/4 ...
  Map: read the file for document i
    split into words
    for each offset j
      emit key=word[j] value=i/j
  Reduce(word, list of doc/offset)
    emit the word and the list of doc/offset
  Index construction is equivalent to a distributed sort
    Need to bring the info about each word together
    MR takes care of the sort (shuffle, then local intermediate sort)
  Note that the index is roughly the same size as the original input

How long will this take?
  Suppose:
    10 terabytes of data (10,000 GBytes)
    network cross-section 10 GByte/sec
    1000 machines
    disks run at 100 MB/sec
    fast CPUs
  Map reads all the data from disk
    each machine reads 10 GBytes, takes 100 seconds of wall-clock time
    intermediate output is as big as the input
    so 200 seconds, including writing the intermediate data to disk
  Shuffle moves all the data
    moving 10 terabytes across a 10 GByte/sec cross-section takes 1,000 seconds
  Reduce just writes, about 100 seconds
  Conclusion:
    disk: 300 seconds; net: 1,000 seconds
    Bottlenecked by the net
    More machines/disks won't help

Can we avoid the shuffle for index generation?

Do all computations require a big shuffle?
  Word count -- partial reduction on the Map side
    "Combiner function", Section 4.3
    x 1, x 1, x 1, y 1  ->  x 3, y 1
    Result: much less data to shuffle
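
Here's a minimal sketch of such a combiner for word count, reusing the
KeyValue shape assumed in the earlier sketch. Section 4.3 says the combiner is
typically the same code as the Reduce function; this one just sums counts on
the Map worker before anything is written to the intermediate files.

  package main

  import (
      "fmt"
      "strconv"
  )

  // KeyValue matches the assumed intermediate-pair type from the earlier sketch.
  type KeyValue struct {
      Key   string
      Value string
  }

  // Combine partially reduces one Map worker's own output, so
  // x/1, x/1, x/1, y/1 becomes x/3, y/1 and far less data crosses the
  // network during the shuffle.
  func Combine(kvs []KeyValue) []KeyValue {
      counts := map[string]int{}
      for _, kv := range kvs {
          n, _ := strconv.Atoi(kv.Value) // values are small counts, e.g. "1"
          counts[kv.Key] += n
      }
      out := []KeyValue{}
      for word, n := range counts {
          out = append(out, KeyValue{Key: word, Value: strconv.Itoa(n)})
      }
      return out
  }

  func main() {
      in := []KeyValue{{"x", "1"}, {"x", "1"}, {"x", "1"}, {"y", "1"}}
      fmt.Println(Combine(in)) // e.g. [{x 3} {y 1}] (map iteration order varies)
  }

Note that a combiner doesn't help the inverted index: every word/doc-offset
pair is distinct, so there is nothing to merge on the Map side and the full
shuffle remains.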

What if a worker fails while running Map?
  Can we restart just that Map on another machine?
    GFS keeps a copy of each input split on 3 machines
  If a Map finishes, and then its worker fails, do we need to re-run it?
    Where is the Map's intermediate output stored?
  What if the Map had started to produce output?
    Will some Reduces see the Map's output twice?
    And thus produce e.g. word counts that are too high?
  How do Reduces know where to find the re-computed Map output?
  Why does Map write intermediate data to local disk?
    Why not pro-actively send it to the Reduce machine?
    Why not store it in GFS for availability?

What if a worker fails while running Reduce?
  Where can a restarted worker find the Reduce input?
  If a Reduce finishes, and then its worker fails, do we need to re-run it?
    Where is the Reduce's output stored?

Load balance
  What if some Map machines are faster than others?
    Or some input splits take longer to process?
  Don't want lots of idle machines and lots of work left to do!
  Many more input splits than machines
    Master hands out more Map tasks as machines finish
    Finish at a higher rate -> Master gives you work at a higher rate
  But there's a constraint:
    Want to run each Map task on a machine that stores its input data
    There are 3 copies of every input data split
    So only three efficient choices of where to run each Map task

Stragglers
  Often one machine is slow at finishing the very last task
    h/w or s/w wedged, or overloaded with some other work
  Load balance only balances newly assigned tasks
  Solution: always schedule multiple copies of the very last tasks!

How many Map/Reduce tasks vs workers should we have?
  More => finer-grained load balance.
  More => less redundant work for straggler reduction.
  More => farm the tasks of a failed worker over more machines, re-execute faster.
  More => overlap Map and shuffle, shuffle and Reduce.
  Less => fewer intermediate files, less master overhead.
  They use M = 10x the number of workers, R = 2x.
  M and R may also be constrained by how the data is laid out in GFS.

Constraints on the Map and Reduce fns help failures and load balance
  Pure functions, so it's correct to run them more than once
  No side effects, no communication, just input and output
  Same input => same output
  Master handles all of this automatically -- no programmer burden

Let's look at performance
  Figure 2 / Section 5.2: a simple grep
    Just Map, essentially no shuffle or reduce
    One terabyte of input
    1000 machines
    x-axis is time, y-axis is input read rate
    Why does it go up, then go down?
    Why does it go up to 30,000 MB/s?
      Why not 3,000 or 300,000? What limits the peak rate?
    Why does it take 60 seconds to reach the peak rate?
      Why doesn't it go up to 30,000 immediately?
  Figure 3(a) / Section 5.3: sorting a terabyte
    Top graph -- input rate
      Why a peak of 10,000 MB/s? Why less than Figure 2's 30,000 MB/s?
      Why does the read phase last about 100 seconds?
    Middle graph -- shuffle rate
      How is the shuffle able to start before the Map phase finishes?
      Why does it peak at 5,000 MB/s?
      Why a gap, and then it starts again?
    Lower graph -- Reduce output rate
      How can Reduces start before the shuffle has finished?
      Why is the output rate so much lower than the input rate?
      Why the gap between the apparent end of output and the vertical "Done" line?

Are there computations not well suited to MapReduce?
  page-rank
    iteratively adjust each page's score based on the scores of linking pages
    not just two phases; random access to intermediate results
    (see the sketch at the end of these notes)
  n-body gravity simulation
    iterative: forces depend on the positions of all other bodies
    again, many phases, random access to intermediate data
  Lots of research on MR follow-ons (FDS, Berkeley Spark)
  But:
    More flexibility => more communication, more pressure on the network
    More flexibility => more difficult to recover from failure
      To re-execute, must re-construct the input from previous iterations

Retrospective
  Still very successful inside Google
  Hadoop very popular outside Google
  Lots of tuning, but the same model, despite its simplicity
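
To make "not just two phases" concrete, here is a toy, single-machine Go
sketch of page-rank driven as a chain of Map/Reduce-style passes; the graph,
the 0.85 damping factor, and the iteration count are made-up example values.
Each iteration is a whole job, and the rank vector (the intermediate result)
must flow out of one job and back in as the next job's input.

  package main

  import "fmt"

  // links: page -> pages it links to (a made-up toy graph).
  var links = map[string][]string{
      "a": {"b", "c"},
      "b": {"c"},
      "c": {"a"},
  }

  // mapRank: for one page, emit a share of its rank to each page it links to.
  func mapRank(page string, rank float64, emit func(dst string, share float64)) {
      for _, dst := range links[page] {
          emit(dst, rank/float64(len(links[page])))
      }
  }

  // reduceRank: sum the incoming shares for one page and apply damping.
  func reduceRank(contribs []float64) float64 {
      sum := 0.0
      for _, c := range contribs {
          sum += c
      }
      return 0.15 + 0.85*sum
  }

  func main() {
      ranks := map[string]float64{"a": 1, "b": 1, "c": 1}
      for iter := 0; iter < 10; iter++ {
          // One "job": group Map output by destination page (the shuffle),
          // then Reduce each group.
          interm := map[string][]float64{}
          for page, rank := range ranks {
              mapRank(page, rank, func(dst string, share float64) {
                  interm[dst] = append(interm[dst], share)
              })
          }
          next := map[string]float64{}
          for page, contribs := range interm {
              next[page] = reduceRank(contribs)
          }
          ranks = next // in real MR: write to GFS and start the next job from there
      }
      fmt.Println(ranks)
  }

Systems like Spark keep such intermediate results in memory across iterations,
which is exactly the flexibility vs. failure-recovery trade-off noted above.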