6.5840 2024 Lecture 19: Boki

Boki: Stateful Serverless Computing with Shared Logs,
by Zhipeng Jia and Emmett Witchel, SOSP 2021

Why are we reading this paper?
  Building applications using a log is a powerful idea
    logs are useful for replicated state machines (e.g., Raft)
    but can also be used in other ways (e.g., exactly-once durable functions)
    make a log available to applications as a first-class abstraction
  Clever sharded, fault-tolerant implementation of a shared log
  Example of a recent research paper

Motivation: serverless functions
  Serverless computing is convenient for developers
    provider runs the function; developer doesn't have to provision machines
  But some challenges:
    State lives in cloud storage
      each function invocation must fetch it
      Boki supports local state between function invocations through its
        LogBook API
    Provider re-executes a serverless function that crashed
      ok if the function is idempotent
      with Boki's LogBook API, can make function workflows exactly-once

External storage example
  def inc(x):
      v = read(x)    # step: reads from db and appends result to log
                     # (see workloads/workflow/boki/pkg/cayonlib/dblib.go)
      write(x, v+1)  # step: as in 6(a)
      return v+1
  Outcome: assume x starts at 0; then inc() should return 1
    and not 2, if it is re-executed

Goal: exactly-once execution
  approach: build both state and exactly-once execution on a shared log
  a shared log is also good for other purposes
    e.g., implementing transactions

Boki's shared log: LogBooks
  one shared log holds several LogBooks
  one LogBook per workflow (one bookid)
  app() shares the tail index with functions 1 and 2

LogBook API:
  struct LogRecord {
    seqnum uint64
    tags   []tag  // 64-bit ints to identify streams within a LogBook
  }
  // returns log index of appended record
  logAppend(tags []tag, data []byte, r *LogRecord) uint64

Ordering
  LogBooks are merged into one shared log
    thus the index returned by logAppend increases by jumps
    4 appends may return: 0, 3, 9, 12
  to traverse a LogBook:
    logReadNext w. min seq#, logReadPrev w. max seq#

Why not a private log per app?
  an application may want to use multiple log books
  ordering across log books

Using the log: tags identify the steps of inc()
  what if inc() crashes just before it returns?
    log before crash: ["read", x, v, tag_1], ["write", x, v+1, tag_2]
    re-execute inc():
      append read
        logReadNext returns the result from the first execution's read (seq 1)
      append write
        logReadNext returns the record from the first execution's write (seq 2)
      conditional rawDBWrite fails (version # in DB is rec.seqnum)
  what if inc() crashes between logAppend() and rawDBWrite() in inc()'s write?
    read returns the previous result from the log
    write writes it to the DB this time
    after success, trim the log
  (a Go sketch of this step pattern appears after the object store below)

Two concurrent executions of inc()
  option 1: use locks
    fig 6(b)
    log index used to determine the winner
  option 2: use OCC transactions

Locks: fig 6(b) and 7
  lock(me):
      rec = findTail()  # "first" tail of log (i.e., w. first valid prev)
      if rec["holder"] == "E":  # "E": lock is empty, i.e., not held
          logAppend(data: {"holder": me, "prev": rec.seqnum})
          rec = findTail()
          if rec["holder"] == me:
              return rec
      return None

Object store: fig 6
  Set(x): append a log record with tag x
  Get(x): replay the log records with tag x
  example log: [Set, "x", 20], [Set, "x", 10]
  fault tolerance of the object store comes from the shared log
  (sketched in Go below)

State-machine replication (as in lab 4)
  append commands to the log
  functions read the log to build the state
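To make the step mechanism concrete, here is a minimal Go sketch of an
exactly-once step. This is not Boki's actual API: the logBook interface,
its Append/ReadNext methods, and the tag encoding are hypothetical
stand-ins for the real library (cayonlib). It illustrates the rule the
notes describe: a step appends its result, then treats the FIRST record
carrying its tag as authoritative, so a re-executed step observes the
original outcome.

  package sketch

  // Hypothetical stand-ins for Boki's LogBook API (illustration only).
  type LogRecord struct {
      SeqNum uint64
      Tags   []uint64
      Data   []byte
  }

  type logBook interface {
      // Append adds a record and returns its sequence number.
      Append(tags []uint64, data []byte) uint64
      // ReadNext returns the first record tagged `tag` with
      // seqnum >= minSeq, or nil if there is none.
      ReadNext(tag uint64, minSeq uint64) *LogRecord
  }

  // step logs `result` under `tag`, then returns the data of the earliest
  // record carrying that tag. If a previous execution already logged this
  // step, the earlier record wins, so re-execution returns the original
  // result rather than a new one.
  func step(lb logBook, tag uint64, result []byte) []byte {
      lb.Append([]uint64{tag}, result)
      rec := lb.ReadNext(tag, 0) // earliest record with this tag; non-nil,
      return rec.Data            // since we just appended one
  }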
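Along the same lines, a sketch of the log-backed object store (Set appends,
Get replays), reusing the hypothetical logBook interface from the step
sketch above:

  // Set records a new value for the key identified by `tag`.
  func Set(lb logBook, tag uint64, value []byte) {
      lb.Append([]uint64{tag}, value)
  }

  // Get replays every record tagged `tag`, in log order, and returns the
  // last value (nil if the key was never set). With the example log
  // [Set "x" 20], [Set "x" 10], Get returns 10. Fault tolerance comes
  // entirely from the shared log's replication.
  func Get(lb logBook, tag uint64) []byte {
      var value []byte
      var seq uint64
      for {
          rec := lb.ReadNext(tag, seq)
          if rec == nil {
              return value
          }
          value = rec.Data
          seq = rec.SeqNum + 1
      }
  }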
How to implement the shared log efficiently?
  many functions append to the log concurrently
  replaying the log for reads results in long delays

Approach: multiplex many LogBooks over a *sharded* physical log
  LogBooks:           [LB A] [LB B] ..... [LB i] ...
  Log with n shards:  [A0 A1 B0 B1 F0] [A2 C0 C1 D0 E0] ... [ ... ]
  records of one log book end up in several shards (e.g., LB A's records)
  each engine uses its own shard
    allows parallel appends to a log book
  Challenges:
    appends: sequencing of records across shards
    reads: finding the shard with the record, and avoiding replay

Why not one physical log per log book?
  some LogBooks are small, and come and go
  wasteful to allocate a physical log for each one

Sequencing using metalog
  log ordering within a shard
  each shard periodically sends its tail index to the sequencer

            shard 0's tail
             |
             V
  Metalog:  [2, 3, ..., n] [3, 4, ..., n]   (log of progress vectors)
             ||
  Order:    [A0 A1 A2 C0 C1 ..] [B0 D0 ...]
    the first metalog entry orders the first two records of shard 0 (A0, A1),
      then the first 3 of shard 1 (A2, C0, C1), etc.
  (a sketch of this deterministic merge appears at the end of these notes)

System diagram
  LogBook engine on each node
    only ephemeral state (log index, record cache)
  storage nodes store records
  sequencer nodes store metalogs
  seq# = (term_id, log_id, pos)
    term_id is like Raft's term
    reconfiguring is as in Grove

Append
  engine writes to its shard
    shard is replicated on 3 nodes w. a primary-backup protocol
    each engine appends to a different shard
  periodically, storage nodes report progress to the sequencer
    (300 usec in benchmarks?)
  engines learn from the sequencer the sequence number of the appended record

Read
  each function node builds an index from LogBook to sequence numbers
    A: [0 1 2 .. 12 ...]
    reconstructed from the progress vectors and shards
    updated on append
  LogRead(A, min_seq=5) returns the record at 12

Challenge: read your own writes
  two functions in a workflow
    func 1 runs on node 1 and func 2 runs on node 2
    func 2 must see func 1's writes
  solution: func 2 inherits the metalog position from func 1
    reads block until node 2 catches up

How well does Boki perform?
  see Figure 11

Summary
  Shared log to build applications
  Sharded log implementation
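Referenced from the metalog section above: a Go sketch of the deterministic
merge that every node can compute from the metalog. The representation
(record IDs as strings, progress vectors as int slices) is made up for
illustration; only the ordering rule comes from the paper.

  package sketch

  // mergeOrder computes the global record order from the metalog.
  // shards[i] holds shard i's records in local order; metalog[k][i] says
  // how many of shard i's records are ordered as of metalog entry k.
  func mergeOrder(shards [][]string, metalog [][]int) []string {
      var order []string
      done := make([]int, len(shards)) // records already ordered, per shard
      for _, progress := range metalog {
          for i, upto := range progress {
              // emit shard i's newly covered records, in shard order
              order = append(order, shards[i][done[i]:upto]...)
              done[i] = upto
          }
      }
      return order
  }

  // With the example above,
  //   shards  = {{"A0","A1","B0","B1","F0"}, {"A2","C0","C1","D0","E0"}}
  //   metalog = {{2, 3}, {3, 4}}
  // mergeOrder returns [A0 A1 A2 C0 C1 B0 D0], matching the Order line.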