6.5840 2024 Lecture 9: Zookeeper Case Study

Reading: "ZooKeeper: wait-free coordination for internet-scale systems",
Patrick Hunt, Mahadev Konar, Flavio P. Junqueira, Benjamin Reed.
2010 USENIX Annual Technical Conference.

today's lecture considers ZooKeeper from two angles:
  * a simpler foundation for fault-tolerant applications.
  * high performance in a real-life service built on Raft-like replication.
ZooKeeper is also interesting because it's very widely used

if we wanted to make a fault-tolerant service like MR coordinator,
  we could replicate with Raft, and that would be OK!
  [diagram: Raft-replicated MR coordinator, workers]
  but building directly on Raft is complex
  but a replicated state machine is awkward to program
  is there a simpler way?

you can think of state machine replication (Raft) as replicating the
  computation; the state is replicated as a side-effect.
can we replicate state without replicating computation?
  yes: use fault-tolerant storage, for example ZooKeeper
  [MR coord, workers, ZK x3]
  easier to write the MR coord than with replicated state machine
    ordinary straight-line code, plus "save" calls

what might MR coord store in ZK?
  coord's IP addr, set of jobs, status of tasks, set of workers, assignments
  update data in ZK on each change
  (but big data itself in GFS, not ZK)
  ZK acting as a "configuration service"
    helps MR coord and worker find each other

what if MR coord fails?
  we weren't replicating it on a backup coord server
  but we don't need one! just pick any computer,
    start MR coord s/w on it, have it read state from ZK.
  new coord can pick up where failed one left off.
  makes the most sense in a cloud
    easy to allocate a replacement server

challenges
  detect MR coord failure
  elect new MR coord (one at a time! no split brain!)
  new coord needs to be able to recover/repair state read from ZK
    what if old coord crashed midway through complex update?
  what if old coord doesn't realize it's been replaced
    can it still read/write state in ZK?
    can it affect other entities incorrectly? e.g. tell workers to do things?
    danger of split brain!
  performance

Zookeeper data model (Figure 1)
  the state: a file-system-like tree of znodes
    file names, file content, directories, path names
  directories help different apps avoid interfering
  each znode has a version number
  types of znodes:
    regular
    ephemeral
    sequential: name + seqno

Operations (Section 2.2)
  s = openSession()
  create(s, path, data, flags)
    exclusive -- fails if path already exists
  exists(s, path, watch)
    watch=true asks for notification if path is later created/deleted
  getData(s, path, watch) -> data, version
  setData(s, path, data, version)
    if znode.version = version, then update
  getChildren(s, path, watch)
  these throw an exception if the ZK server says it has terminated the session
    so that application won't continue

ZooKeeper API well tuned for concurrency and synchronization:
  + exclusive file creation; exactly one concurrent create returns success
  + getData()/setData(x, version) supports mini-transactions
  + sessions help cope with client failure (e.g. release locks)
  + sequential files create order among multiple clients
  + watches avoid costly repeated polling
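to make the API concrete, here's a rough Go rendering of the Section 2.2
  operations as an interface; the names (Session, Flags, ErrBadVersion) and
  signatures are made up for illustration, not the real ZooKeeper client API:

  package zk

  import "errors"

  // ErrBadVersion is returned by SetData when the caller's expected version
  // doesn't match the znode's current version (the mini-transaction failed).
  var ErrBadVersion = errors.New("zk: version mismatch")

  // Flags control znode creation, per Section 2.2 of the paper.
  type Flags struct {
      Ephemeral  bool // znode is deleted when the creating session ends
      Sequential bool // ZK appends an increasing seqno to the name
  }

  // Session is a hypothetical Go rendering of the paper's session-oriented API.
  type Session interface {
      // Create makes a new znode; it fails if path already exists (exclusive).
      // For sequential znodes it returns the actual name that was created.
      Create(path string, data []byte, flags Flags) (createdPath string, err error)
      // Exists reports whether path exists; watch=true asks for a one-shot
      // notification when path is later created or deleted.
      Exists(path string, watch bool) (bool, error)
      // GetData returns the znode's contents and its current version.
      GetData(path string, watch bool) (data []byte, version int, err error)
      // SetData writes path only if its version still equals version;
      // otherwise it returns ErrBadVersion.
      SetData(path string, data []byte, version int) error
      // GetChildren lists the names of path's children.
      GetChildren(path string, watch bool) ([]string, error)
      // Every method returns an error once ZK has terminated the session,
      // so the application can't keep acting as if it were still alive.
  }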
Example: MapReduce coordinator election
  this is the paper's Simple Lock in Section 2.4
  s = openSession()
  while true:
    if create(s, "/mr/c", ephemeral=true)
      // we are the coordinator!
      setData(s, "/mr/ip", ...)
    else if exists(s, "/mr/c", watch=true)
      // we are not the coordinator
      wait for watch event
  note:
    exclusive create
      if multiple clients concurrently attempt, only one will succeed
    ephemeral znode
      coordinator failure automatically lets a new coordinator be elected
    watch
      potential replacement coordinators can wait w/o polling

what do we want to happen if the elected coordinator fails?
  * want to elect a replacement
  * must cope with crash in the middle of updating state in ZK
  * must cope with the possibility that the coordinator *didn't* fail!
  even though /mr/c looks like a lock, the possibility of failure
    makes the situation very different from e.g. Go sync.Mutex

what does ZK do?
  client failure -> client stops sending keep-alive messages to ZK
  no keep-alives -> ZK leader times out and terminates the session
  session termination -> ZK leader deletes the session's ephemeral files
    and ignores further requests from that session
    ephemeral deletions are A-linearizable ZK ops
  now a new MR coordinator can elect itself

what if the MR coordinator crashes while updating state in ZK?
  maybe store all data in a single ZK file
    individual setData() calls are atomic (all or nothing vs failure)
  what if there are multiple znodes containing state data?
    use the paper's "ready" file scheme

what if the coordinator is alive and thinks it is still coordinator,
  but ZK has decided it is dead and deleted its ephemeral /mr/c file?
  a new coordinator will likely be elected.
  will two computers think they are the coordinator?
    this could happen.
  can the old coordinator modify state in ZK?
    this cannot happen!
    when ZK times out a client's session, two things happen atomically:
      ZK deletes the client's ephemeral nodes.
      ZK stops listening to the session -- will reject all operations.
    so the old coordinator can no longer modify or read data in ZK!
    if it tries, its client ZK library will raise an exception,
      forcing the client to realize it is no longer coordinator

"Fencing" is a term for ignoring requests from a client declared dead,
  even if it is actually alive

an important pattern in distributed systems:
  a single entity (e.g. ZK) decides which computers are alive or dead
    sometimes called a failure detector
  it may not be correct, e.g. if the network drops messages
  but everyone obeys its decisions
  agreement is more important than being right, to avoid split brain
  but the possibility of being wrong => may need to fence

what if the coordinator interacts with entities other than ZK,
  that don't know about the coordinator's ZK session state?
  e.g. the coordinator talking to MapReduce workers.
  they may need to fence (i.e. ignore a deposed coordinator) -- how?
  idea: a worker could "watch" the leader znode in ZK to learn of changes.
    not perfect: window between change and watch notification arrival.
  idea: each new coordinator gets an increasing "epoch" number,
    from a file in ZK (see below).
    the coordinator sends the epoch in each message to workers.
    workers remember the highest epoch they have seen.
    workers reject messages with epochs smaller than the highest seen.
    so they'll ignore a superseded coordinator once they see a newer coordinator.

Example: allocate a unique epoch number
  next epoch number stored in a znode
  while true:
    e, version := getData("/epoch")
    if setData("/epoch", e + 1, version):
      break
  this is an atomic read-modify-write
  think about what happens if two clients execute this at the same time
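a rough Go version of this epoch-allocation loop, using the made-up Session
  interface sketched earlier (not a real client API); the counter is assumed
  to be stored as decimal text in the /epoch znode:

  package zk

  import (
      "errors"
      "strconv"
  )

  // allocEpoch atomically increments the counter stored in "/epoch" and
  // returns the new value. If another client updates "/epoch" between our
  // GetData and SetData, the version check fails and we retry; any other
  // error (e.g. ZK terminated our session) is returned to the caller.
  func allocEpoch(s Session) (int, error) {
      for {
          data, version, err := s.GetData("/epoch", false)
          if err != nil {
              return 0, err
          }
          e, err := strconv.Atoi(string(data))
          if err != nil {
              return 0, err
          }
          err = s.SetData("/epoch", []byte(strconv.Itoa(e+1)), version)
          if err == nil {
              return e + 1, nil // our read-modify-write committed
          }
          if !errors.Is(err, ErrBadVersion) {
              return 0, err // e.g. session terminated
          }
          // lost the race with another client: go around and try again
      }
  }

the epoch returned here is what a newly elected coordinator would stamp on
  every message to workers, so workers can fence out a superseded coordinator.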
how is ZK designed for good performance?
  emphasis is on handling many reading/watching clients
  [diagram: leader, lots of followers, clients]
  1) many ZK follower servers; clients are spread over them for parallelism
     a client sends all operations to its ZK follower
     the ZK follower executes reads locally, from its replica of ZK data
       to avoid loading the ZK leader
     the ZK follower forwards writes to the ZK leader
  2) watch, not poll
     the ZK follower (not the ZK leader) does the work
  3) clients of ZK can launch async operations
     i.e. send request; completion notification delivered to code separately
       unlike RPC
     a client can launch many writes without waiting
     ZK processes them efficiently in a batch; fewer msgs, disk writes
     the client library numbers them, ZK executes them in that order
     e.g. to update a bunch of znodes then create a "ready" znode

a read may not see the latest completed writes!
  since the client's follower may be behind (not in the write's majority)
  when is it ok to use stale data? when is it not ok?

ZK has separate consistency guarantees for reads and writes
  writes: "A-linearizability"
    like linearizability, but each client's async writes are ordered, not concurrent
    the ZK leader picks an order for writes, all followers execute in that order
  reads: "client FIFO order"
    a client's operations fit into the overall write order in client issue order

client FIFO order example:
  suppose a client issues these asynchronous operations: W1 W2 R3 W4
  the client's ZK server (a follower) receives them
    and forwards the writes to the ZK leader
  the follower sees a stream of writes from the ZK leader:
    other clients' writes, plus its own
    W W W1 ...
  the follower must wait for W2 to appear before executing R3,
    to obey the client FIFO rule
    W W W W1 W W2 W R3 ...
  thus, R3 is guaranteed to see any effects of W1 and W2 (but not W4)

client FIFO order is guaranteed even if the client switches ZK servers
  Writer:             Reader:
    write f
    create "done"
                        exists("done")
                        ... client switches from ZK server S1 to S2
                        read f
  the read of f is guaranteed to see the written value
    even if S2 is far enough behind that it hasn't seen either write!
  the reading client sends the zxid of create("done") to S2, and S2
    then delays the read("f") until it has caught up to that zxid

Efficiency depends on how clients use ZK!
  what's wrong with Simple Locks? (page 6)
    suppose 100s of clients are waiting for the lock?
  better: Locks without Herd Effect (sketched in Go below)
    1. create a "sequential" file
    2. list files
    3. if no lower-numbered, lock is acquired!
    4. if exists(next-lower-numbered, watch=true)
    5.   wait for event...
    6. goto 2
  Q: could a lower-numbered file be created between steps 2 and 3?
  Q: can the watch fire before it is the client's turn?
  A: yes
     lock-10 <- current lock holder
     lock-11 <- next one
     lock-12 <- my request
     if the client that created lock-11 dies before it gets the lock,
     the watch will fire but it isn't my turn yet.
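a rough Go sketch of this herd-free lock, again using the made-up Session
  interface from earlier; waitForWatch stands in for however the client
  library delivers watch notifications, and /lock/req- is just an example path:

  package zk

  import "sort"

  // acquireLock: create a sequential, ephemeral request znode under /lock,
  // and wait (via a watch) only on the request with the next-lower sequence
  // number, so a release or crash wakes just one waiter.
  func acquireLock(s Session, waitForWatch func()) (string, error) {
      // step 1: create a sequential, ephemeral request znode.
      me, err := s.Create("/lock/req-", nil, Flags{Ephemeral: true, Sequential: true})
      if err != nil {
          return "", err
      }
      for {
          // step 2: list all outstanding requests.
          children, err := s.GetChildren("/lock", false)
          if err != nil {
              return "", err
          }
          // assume sequential names sort in creation order.
          sort.Strings(children)
          prev := ""
          for _, c := range children {
              full := "/lock/" + c
              if full == me {
                  break
              }
              prev = full
          }
          // step 3: no lower-numbered request -> we hold the lock.
          if prev == "" {
              return me, nil
          }
          // step 4: watch only the next-lower request; its deletion
          // (release, or holder/waiter crash) fires our watch.
          ok, err := s.Exists(prev, true)
          if err != nil {
              return "", err
          }
          if ok {
              waitForWatch() // step 5: wait for event...
          }
          // step 6: goto 2 -- the watch may fire before it's our turn
          // (e.g. an intermediate waiter died), so re-check the list.
      }
  }

a release (deleting the holder's znode) or a crash (ephemeral cleanup) wakes
  only the single next waiter, rather than the whole herd of waiting clients.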
Some implementation details aimed at high performance:
  Data must fit in memory, so reads are fast (no need to read disk).
    So you can't store huge quantities of data in ZooKeeper.
  Writes (log entries) must be written to disk, and waited for.
    So committed updates aren't lost in a crash or power failure.
    Hurts latency; batching can help throughput.
  Periodically, complete snapshots are written to disk.
    A fuzzy technique allows snapshotting concurrently with write operations.

How is the performance?

Figure 5 -- throughput.
  Overall, can handle 10s of thousands of operations / second.
    Is this a lot? Enough?
  Why do the lines go up as they move to the right?
  Why does the x=0 performance go down as the number of servers increases?
  Why does the "3 servers" line change to be worst at 100% reads?
  What might limit it to 20,000? Why not 200,000?
    Each op is a 1000-byte write...

What about latency?
  Table 2 / Section 5.2 implies 1.2 milliseconds,
    for a single worker (client) waiting after each write request.
  Where might the 1.2 milliseconds come from?
    Disk writes? Communication? Computation?
    (How can it be this fast, given mechanical disk rotation times?)
  Why only ~2000 req/s rather than Figure 5's 20,000?

What about recovery time?
  Figure 8
  Follower failure -> just a decrease in total throughput.
  Leader failure -> a pause for timeout and election.
    Visually, on the order of a few seconds.

ZooKeeper is very widely used.
  see ZooKeeper's Wikipedia page for a list of projects that use it
  often used as a kind of fault-tolerant name service
    what's the current coordinator's IP address?
    what workers exist?
  can be used to simplify an overall fault-tolerance strategy
    store all state in ZK
      e.g. the MR queue of jobs, status of tasks
    then the service's servers needn't themselves replicate

References:
  https://zookeeper.apache.org/doc/r3.4.8/api/org/apache/zookeeper/ZooKeeper.html
  ZAB: http://dl.acm.org/citation.cfm?id=2056409
  https://zookeeper.apache.org/
  https://cs.brown.edu/~mph/Herlihy91/p124-herlihy.pdf
    (wait free, universal objects, etc.)