6.5840 2026 Lecture 1: Introduction

6.5840: Distributed Systems Engineering

A "distributed system": a group of computers cooperating to provide a service
Examples:
  popular apps' back-ends, e.g. for messaging
  big web sites
  cloud providers
Focus here is distributed infrastructure:
  storage
  transaction systems
  "big data" processing frameworks
Hard to build:
  concurrency
  complex interactions
  performance bottlenecks
  partial failure

Why useful?
  to increase capacity via parallel processing
  to tolerate faults via replication
  to match the distribution of physical devices, e.g. sensors
  to increase security via isolation

Why take this course?
  interesting -- hard problems, powerful solutions
  big demand -- driven by the rise of big Web sites
  active research area -- important unsolved problems
  challenging -- the labs

COURSE STRUCTURE

http://pdos.csail.mit.edu/6.5840

Course staff:
  Frans Kaashoek and Robert Morris, lecturers
  Baltasar Dinis, TA
  Ayana Alemayehu, TA
  Upamanyu Sharma, TA
  Yun-Sheng Chang, TA
  Danny Villanueva, TA
  Brian Shi, TA
  Nour Massri, TA
  Beshr Islam Bouli, TA

Lectures: paper discussion, context, lab guidance

Papers:
  one per lecture
  research papers, some classic, some new
  ideas, problems, implementation details, evaluation
  please read the papers before class!
  the web site has a question about each paper
    submit your answer before the start of lecture
    optionally, submit a question for us

Exams:
  Mid-term exam in class
  Final exam during finals week
  material from papers, lectures, and labs
  You must attend the exams!

Labs:
  goal: deeper grasp of some important techniques
  goal: experience with distributed programming
  first lab is due a week from Friday
  one per week after that for a while
  Lab 1: distributed big-data framework (like MapReduce)
  Lab 2: client/server over an unreliable network
  Lab 3: fault tolerance using replication (Raft)
  Lab 4: a fault-tolerant database
  Lab 5: scalable database performance via sharding
  We grade the labs using a set of tests;
    we give you all the tests, none are secret.

Optional final project at the end, in groups of 2 or 3.
  The final project substitutes for Lab 5.
  You think of a project and clear it with us.
  Code, short write-up, demo on the last day.

Warning: debugging the labs can be time-consuming
  start early
  ask questions on Piazza
  come to TA office hours

MAIN TOPICS

This is a course about infrastructure:
  * Storage.
  * Communication.
  * Computation.
A big goal: hide the complexity of distribution from applications.

Topic: fault tolerance
  1000s of servers, big network -> constant failures
    We'd like to hide these failures.
  "High availability": service continues despite failures
  Big idea: replication.
    If one server crashes, can proceed using the other(s).

Topic: consistency
  General-purpose infrastructure needs well-defined behavior.
    E.g. "read(x) yields the value from the most recent write(x)."
  Guaranteeing specified behavior is hard!
    e.g. "replica" servers are hard to keep identical.

Topic: performance
  A common goal: scalable throughput
    Nx servers -> Nx total throughput via parallel CPU, RAM, disk, net.
  Scaling gets harder as N grows
    e.g. load imbalance.

Topic: tradeoffs
  Fault-tolerance, consistency, and performance are enemies.
  Fault tolerance and consistency require communication
    e.g., send data to a backup server
    e.g., check whether cached data is up-to-date
    but communication is often slow and hard to scale up
  Many designs sacrifice consistency to gain speed.
    e.g. read(x) might *not* yield the latest write(x)!
    Painful for application programmers (or users).
  We'll see many consistency/performance design points.
    (a toy stale-read sketch appears at the end of this section)

Topic: implementation
  RPC, threads, concurrency control.
  The labs...
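
To make the stale-read problem concrete, here is a toy Go sketch (not from
any paper or lab): two replicas of a key/value map with no replication
protocol at all, so a write applied at one replica is invisible at the other.
Real designs either pay the communication cost of keeping replicas in sync,
or accept stale reads like this one.

  package main

  import "fmt"

  // A replica holds its own private copy of the data.
  type Replica struct {
      data map[string]string
  }

  func NewReplica() *Replica { return &Replica{data: map[string]string{}} }

  func (r *Replica) Write(k, v string) { r.data[k] = v }

  func (r *Replica) Read(k string) string { return r.data[k] }

  func main() {
      primary := NewReplica()
      backup := NewReplica()

      // The write reaches only the primary; nothing propagates it to the backup.
      primary.Write("x", "1")

      // A client that reads from the backup sees a stale value:
      // read(x) does not yield the latest write(x).
      fmt.Println("primary read(x) =", primary.Read("x")) // "1"
      fmt.Println("backup  read(x) =", backup.Read("x"))  // "" (stale)
  }
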
CASE STUDY: MapReduce

Let's talk about MapReduce (MR)
  a good illustration of 6.5840's main topics
  hugely influential
  the focus of Lab 1

Context: multi-hour computations on multi-terabyte data-sets
  e.g. build a search index, or sort, or analyze the structure of the web
  only practical with 1000s of computers

A big goal: easy for non-specialist programmers
  programmer just defines Map and Reduce functions
    often simple sequential code
  MR manages, and hides, all aspects of distribution!
  MR is a framework / library; the "application" is just Map()/Reduce()

Abstract view of a MapReduce job -- word count
  Input1 -> Map -> a,1 b,1
  Input2 -> Map ->     b,1
  Input3 -> Map -> a,1     c,1
                    |   |   |
                    |   |   -> Reduce -> c,1
                    |   -----> Reduce -> b,2
                    ---------> Reduce -> a,2
  1) input is (already) split into M pieces
  2) MR calls Map() for each input split, producing a list of k,v pairs
       ("intermediate" data)
     each Map() call is a "task"
  3) when Maps are done,
     MR gathers all intermediate v's for each k,
     and passes each key + values to a Reduce call
  4) final output is the set of pairs emitted by the Reduce()s

Word-count code
  Map(d)
    chop d into words
    for each word w
      emit(w, "1")
  Reduce(k, v[])
    emit(len(v[]))

MapReduce scales well:
  N "worker" computers (might) get you Nx throughput.
    Map()s can run in parallel, since they don't interact.
    Same for Reduce()s.
  Thus more computers -> more throughput -- very nice!

MapReduce hides much complexity:
  sending map+reduce code to servers
  tracking which tasks have finished
  "shuffling" intermediate data from Maps to Reduces
  balancing load over servers
  recovering from crashed servers

To get these benefits, MapReduce restricts applications:
  Only one pattern (Map -> shuffle -> Reduce).
  No interaction or state (other than via intermediate output).
  Only batch: no real-time or streaming processing.

Some details (paper's Figure 1):

Input and output are stored on the GFS cluster file system
  MR needs huge parallel input and output throughput.
  GFS splits files over many servers, many disks, in 64 MB chunks
    Maps read in parallel
    Reduces write in parallel
  GFS replicates data on 2 or 3 servers, for fault tolerance
  GFS is a big win for MapReduce

MR writes Map() output to local disk
  MR splits the output into files by hash(key) mod R
    (sketched in Go after the shuffle description below)
  each "hash bucket" contains multiple keys
  the Map workers all hash the same way

The shuffle
  each Reduce task processes one hash bucket
  MR fetches each Reduce task's bucket from every Map worker
    merge, sort by key, call Reduce() for each key
  each Reduce task writes a separate output file on GFS
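
A minimal Go sketch of the hash partitioning just described. The KeyValue
type, ihash(), and partition() are illustrative (roughly in the style of
Lab 1), not the paper's actual code; the point is that every Map worker uses
the same hash function, so a given key always lands in the bucket read by
the same Reduce task.

  package main

  import (
      "fmt"
      "hash/fnv"
  )

  // One piece of intermediate data emitted by Map().
  type KeyValue struct {
      Key   string
      Value string
  }

  // ihash maps a key to a non-negative int.
  func ihash(key string) int {
      h := fnv.New32a()
      h.Write([]byte(key))
      return int(h.Sum32() & 0x7fffffff)
  }

  // partition splits one Map task's output into nReduce buckets;
  // Reduce task r later fetches bucket r from every Map worker.
  func partition(kvs []KeyValue, nReduce int) [][]KeyValue {
      buckets := make([][]KeyValue, nReduce)
      for _, kv := range kvs {
          r := ihash(kv.Key) % nReduce
          buckets[r] = append(buckets[r], kv)
      }
      return buckets
  }

  func main() {
      kvs := []KeyValue{{"a", "1"}, {"b", "1"}, {"a", "1"}, {"c", "1"}}
      for r, bucket := range partition(kvs, 3) {
          fmt.Println("bucket", r, ":", bucket)
      }
  }
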
The "Coordinator" manages all the steps in a job.
  tracks the state of each task
  hands out tasks to worker machines

What will limit performance?
  We care since that limit is the thing to optimize.
  CPU? memory? disk? network?
  In 2004 the authors were limited by network speed.
    What does MR send over the network?
      Maps read input from GFS.
      Reduces fetch Map intermediate output.
        Often as large as the input, e.g. for sorting.
      Reduces write output files to GFS.
    How fast was the paper's network?
      Section 5.1: 1800 machines, two-level switched network
        [diagram: root switch, 2nd level of switches, machines]
        each switch must have had ~42 ports (square root of 1800)
      MR's shuffle requires every worker to fetch data from every other worker
        only about 1/42nd of that traffic stays within a local switch,
        so MR's shuffle sends most data through the root switch.
      Paper's root switch: 100 to 200 gigabits/second, total
        1800 machines, so ~55 megabits/second/machine (at 100 gigabits/second).
        55 Mbit/s is small: much less than disk or RAM speed.

How does MR minimize network use?
  Coordinator tries to run each Map task on the GFS server that stores its input.
    All computers run both GFS and MR workers,
    so Map input is usually read from GFS data on the local disk, not over the network.
  Intermediate data goes over the network just once.
    The Map worker writes it to local disk;
    Reduce workers read it from the Map workers' disks over the network.
    (Storing it in GFS would require at least two trips over the network.)

How does MR get good load balance?
  Why do we care about load balance?
    If one server has more work than the others, or is slower,
    then the other servers will lie idle (wasted) at the end, waiting.
    So ideally MR divides the work so that all workers finish at the same time.
    But tasks vary in size, and computers vary in speed.
  Solution: many more tasks than worker machines.
    The coordinator hands out new tasks to workers that finish previous tasks.
    So faster servers do more tasks than slower ones,
    and slow servers are given less work, reducing their impact on total time.

What about fault tolerance?
  What if a worker computer crashes?
  We want the MR framework to hide failures.
  Does MR have to re-run the whole job from the beginning?
    Why not?
  The coordinator re-runs just the failed Map()s and Reduce()s.
    (a task re-assignment sketch appears at the end of these notes)
  Suppose MR runs a Map task twice, one Reduce sees the first run's output,
    and another Reduce sees the second run's output?
    The two Map executions had better produce identical intermediate output!
  Map and Reduce should be pure deterministic functions:
    they are only allowed to look at their arguments/input.
    no state, no file I/O, no interaction, no external communication,
      no random numbers.
  The programmer is responsible for ensuring this determinism.

Other failures/problems:
  * What if the coordinator gives two workers the same Map() task?
    perhaps the coordinator incorrectly thinks one worker died.
    it will tell Reduce workers about only one of them.
  * What if the coordinator gives two workers the same Reduce() task?
    they will both try to write the same output file on GFS!
    atomic GFS rename prevents mixing; one complete file will be visible.
  * What if a single worker is very slow -- a "straggler"?
    perhaps due to flakey hardware.
    the coordinator starts a second copy of the last few tasks.
  * What if a worker computes incorrect output, due to broken h/w or s/w?
    too bad! MR assumes "fail-stop" CPUs and software.
  * What if the coordinator crashes?

Performance? Figure 2
  X-axis is time
  Y-axis is the total rate at which a "grep"-style job reads its input
  A terabyte (1000 GB) of input
  1764 workers
  30,000 MB/s (30 GB/s) is huge!
  Why 30,000 MB/s?
    17 MB/s per worker machine -- 140 megabits/second
    more than our guess (55 Mbit/s) of per-machine network bandwidth
    so input was probably read directly from two local GFS disks,
      meaning each disk could probably read at about 9 MB/second
  Why is the main period of activity about 30 seconds?
  Why does it take 50 seconds for throughput to reach its maximum?

Current status?
  Hugely influential (Hadoop, Spark, Lab 1, &c).
  Probably no longer in use at Google.
    Replaced by Flume / FlumeJava (see the paper by Chambers et al).
    GFS replaced by Colossus (no good description), and BigTable.

Next lecture: Programming: Go, Threads, RPC
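
Appendix (task re-assignment sketch, referenced above): a minimal Go sketch
of how a coordinator might track task state and re-issue tasks whose workers
appear to have crashed or stalled. This is not the paper's code and not a
Lab 1 solution; the Task type, the timeout, and the function names are all
illustrative assumptions.

  package main

  import (
      "fmt"
      "sync"
      "time"
  )

  type TaskState int

  const (
      Idle TaskState = iota
      InProgress
      Done
  )

  type Task struct {
      ID      int
      State   TaskState
      Started time.Time
  }

  // Coordinator tracks every task's state and hands out work.
  type Coordinator struct {
      mu    sync.Mutex
      tasks []Task
  }

  // Assign returns an idle task, or a task that has been in progress longer
  // than the timeout (its worker may have crashed), or -1 if no task is
  // available right now.
  func (c *Coordinator) Assign(timeout time.Duration) int {
      c.mu.Lock()
      defer c.mu.Unlock()
      for i := range c.tasks {
          t := &c.tasks[i]
          if t.State == Idle ||
              (t.State == InProgress && time.Since(t.Started) > timeout) {
              t.State = InProgress
              t.Started = time.Now()
              return t.ID
          }
      }
      return -1
  }

  // Finish records that a worker completed a task. A late report from a
  // worker the coordinator had given up on simply marks the task Done.
  func (c *Coordinator) Finish(id int) {
      c.mu.Lock()
      defer c.mu.Unlock()
      c.tasks[id].State = Done
  }

  func main() {
      c := &Coordinator{tasks: []Task{{ID: 0}, {ID: 1}}}
      fmt.Println("assigned:", c.Assign(10*time.Second)) // task 0
      fmt.Println("assigned:", c.Assign(10*time.Second)) // task 1
      c.Finish(0)
      fmt.Println("assigned:", c.Assign(10*time.Second)) // -1: task 1 still running
  }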