6.5840 2024 Lecture 17: Ray

Ownership: A Distributed Futures System for Fine-Grained Tasks
by Stephanie Wang et al., NSDI 2021

Why are we reading this paper?
  A modern version of MapReduce, Spark, etc.
  Moving large data efficiently with futures
  Managing distributed futures efficiently using ownership
  Widely-used open-source project (e.g., by OpenAI)
    Anyscale

Evolving parallel applications
  Combine functional and stateful
  Low latency requirements
  Examples:
    Model serving (3a)
      quick response
      large data (client images)
      router and model replicas maintain state between invocations
    On-line video processing (video stabilizing in 3b)
      compute trajectory of an object
      start processing frame before seeing next frame
      actor for storing the decoded frame

Two ideas:
  futures (handle for the result of a computation)
  shard state about futures based on ownership

Futures
  program can invoke a function asynchronously, which returns a future
  program can pass the reference to a future as an argument to other functions
  program can force a future to evaluate
  benefit: system can decide where to run a future and when data is moved

Ray Python example of remote execution with futures:
  # see https://docs.ray.io/en/latest/ray-core/tips-for-first-time.html
  import ray, time
  ray.init()
  @ray.remote
  def g(i):
      return i
  f = g.remote(10)   # f is a future and ray invokes g asynchronously

Ray futures (table 1)
  first-class futures
  borrower
  Example:
    f1 = C()
    f2 = C()
    f3 = Add(shared(f1), shared(f2))   # pass f1 and f2 by reference
    c = get(f3)
  Example:
    def A():
      x = B()
      y = C(shared(x))   # pass x by reference
    f1 = A()

Implementation challenge
  many futures
  some run for a short time (a few ms)
  garbage collection of objects
  worker machine crashes while running a future
    Ray: transparent recovery of idempotent futures (fig 5)

Strawman solution: centralized coordinator
  table with future, task, and loc
  scalability bottleneck
    one round-trip for invocation and get
  Example
    ID | Task | Refs | Locations
    ----------------------------------
    x  | B()  | 1,3  | 2
    y  | C(x) | 1    | 3
    where 1 is worker running A, 2 running B, and 3 running C
    refs for reference counting
  recovery: "lineage reconstruction"
    re-execute function and descendants
    which recreate futures, and further descendants
    ex: re-executing A re-executes B and C too

Alternative: sharding the table (e.g., by obj id)
  still one round-trip
  some ops require multiple shards (garbage collection)

Ray solution: shard by ownership
  Example
    A owns "x" (A called B(), which produces x)
    C "borrows" x
      it may pass it further
      it does reference counting
  The caller of a function is the owner of the returned future
  But the value of the object is stored at the worker that runs the task
    Ex. x: A is owner of x, value of x is stored at the worker that runs B (2)
  advantage
    A can invoke C without any communication with B
  Example:
    Owner table at 1:
      ID | Task | Owner | Ref  | Loc
      x  | B()  | 1     | 2, 3 | 2
      y  | C(x) | 1     |      | 3
    Owner table at 2:
      ID | Task | Owner | Ref  | Loc
      x  | B()  | 1     |      | 2
    Owner table at 3:
      ID | Task | Owner | Ref  | Loc
      x  |      | 1     |      | 2
      y  |      | 1     |      | 3

Ownership implementation
  owner = (IP, port, workerid)
  taskId = parentid + task_index
  objectId = taskId + obj_index
  distributed scheduler (figure 8)
    schedule(t)
      id = local worker
      while true:
        ok, id = reserve @id using resource request
        if ok:
          l = lease(id)
          break
      table[t].Loc = l
    optimization: reuse leased worker

Memory management (figure 9)
  API obj store: Create, Get, Pin, Release
    Get blocks until obj is created
    Initial Create pins obj
  tab[oid].Location = locations in object store
    may have secondary copies

Failure recovery
  check locations in table
  loss of an owned object
    re-execute tasks following lineage
    use secondary copy to avoid recomputing
  loss of an owner: risk of dangling reference
    fate sharing
    example: if 1 (the owner of x) crashes, 3 may have a dangling ref
      so 3 fails when 1 fails
  Example: if worker 3 fails (after starting C but before finishing)
    worker 1 (A) will learn about it
    it is the owner of y (and has the Task info) and asks Ray to re-execute C(x)
  if workers 1 and 2 fail, worker 3 has a dangling ref to x; it will
    never be resolved to a value
    worker 3 "shares fate" with the owner of "x"
      i.e., it terminates itself, pretending a crash
    the caller of A will re-submit A

Homework: if C() in figure 6(a) is as follows:
    def C(x):
      z = D(x)
      return get(z)   # return value of future z
  Suppose the worker that runs D fails before finishing. Which worker
  would initiate the re-execution of D()?
    who owns z? the caller: C
    who resubmits D? C
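
Sketch: IDs and the owner table
  The ID scheme under "Ownership implementation" and the owner table at
  worker 1 can be sketched in plain Python as below. This is a toy model,
  not Ray's code: the Worker, Entry, and WorkerAddr classes, the submit()
  method, the string ID format ("/" and ":"), and the IP and port are all
  assumptions made for illustration. The point it tries to show is that
  the caller derives task and object IDs locally and records itself as
  owner, so no coordinator round-trip is needed at invocation time.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class WorkerAddr:
    """Owner address as listed in the notes: (IP, port, workerid)."""
    ip: str
    port: int
    worker_id: int


@dataclass
class Entry:
    """One row of an owner's table (columns follow the notes)."""
    task: str
    owner: WorkerAddr
    refs: set = field(default_factory=set)  # worker ids holding a reference
    loc: set = field(default_factory=set)   # worker ids storing the value


class Worker:
    """Hypothetical worker that owns the futures of the tasks it invokes."""

    def __init__(self, addr, task_id):
        self.addr = addr
        self.task_id = task_id    # id of the task this worker is running
        self.next_task_index = 0
        self.table = {}           # object id -> Entry, owned objects only

    def submit(self, task_desc, num_returns=1):
        # taskId = parentid + task_index, computed locally by the caller
        child_task_id = f"{self.task_id}/{self.next_task_index}"
        self.next_task_index += 1
        object_ids = []
        for obj_index in range(num_returns):
            # objectId = taskId + obj_index
            oid = f"{child_task_id}:{obj_index}"
            self.table[oid] = Entry(task=task_desc, owner=self.addr,
                                    refs={self.addr.worker_id})
            object_ids.append(oid)
        return object_ids


# Worker 1 runs A, which calls B() and then C(x).
a = Worker(WorkerAddr("10.0.0.1", 7000, 1), task_id="A")
(x,) = a.submit("B()")
(y,) = a.submit("C(x)")
a.table[x].refs.add(3)  # worker 3 (running C) borrows x
a.table[x].loc.add(2)   # value of x is stored at worker 2 (which ran B)
a.table[y].loc.add(3)   # value of y is stored at worker 3 (which ran C)
print(x, y)
```

  Because the IDs depend only on the parent's ID and a local counter,
  re-executing A after a failure would regenerate the same IDs for x and
  y, which is what lets lineage reconstruction line up with old references.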
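
Sketch: the scheduling loop
  The schedule(t) pseudocode under "distributed scheduler (figure 8)" can
  be made runnable as below. The Node class, its reserve() and lease()
  methods, the CPU-slot resource model, and the max_attempts bound are
  hypothetical stand-ins; the notes only say that the caller starts at its
  local scheduler, follows redirections until a reservation succeeds, and
  then records the leased worker in its table.

```python
class Node:
    """Hypothetical per-node scheduler with a number of free CPU slots."""

    def __init__(self, node_id, free_cpus, peers=None):
        self.node_id = node_id
        self.free_cpus = free_cpus
        self.peers = peers if peers is not None else []

    def reserve(self, cpus):
        """Grant the request here, or redirect the caller to another node."""
        if self.free_cpus >= cpus:
            self.free_cpus -= cpus
            return True, self
        for peer in self.peers:
            if peer.free_cpus >= cpus:
                return False, peer   # redirect: try this node next
        return False, self           # nothing better known: retry here

    def lease(self):
        """Hand out a worker on this node (represented by a string here)."""
        return f"worker@{self.node_id}"


def schedule(task, cpus, local_node, table, max_attempts=10):
    """Follow redirections from the local node until a lease is granted."""
    node = local_node
    # The notes use `while true`; the loop is bounded so the sketch ends.
    for _ in range(max_attempts):
        ok, node = node.reserve(cpus)
        if ok:
            table[task] = node.lease()   # table[t].Loc = l
            return table[task]
    raise RuntimeError("no node could satisfy the resource request")


n2 = Node(2, free_cpus=4)
n1 = Node(1, free_cpus=0, peers=[n2])   # local node is full
table = {}
leased = schedule("B()", 1, n1, table)
print(leased)
```

  The "reuse leased worker" optimization in the notes would correspond to
  caching the returned lease and skipping schedule() for later tasks with
  the same resource request.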
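
Sketch: failure recovery
  The two cases under "Failure recovery" can be played through with a
  small simulation. The Cluster class and its fields are hypothetical, and
  the model is deliberately simplified: it only logs which worker would
  act, and it does not re-run tasks or update locations. It assumes the
  example from the notes, where worker 1 (A) owns x and y, x is stored at
  worker 2, y at worker 3, and worker 3 borrows x.

```python
class Cluster:
    """Toy model of owner tables, borrowed references, and failures."""

    def __init__(self):
        self.alive = {1, 2, 3}
        # owner id -> {object: {"task": ..., "loc": worker storing value}}
        self.owned = {1: {"x": {"task": "B()", "loc": 2},
                          "y": {"task": "C(x)", "loc": 3}}}
        # borrower id -> {object: owner id}
        self.borrowed = {3: {"x": 1}}
        self.log = []

    def fail(self, worker):
        self.alive.discard(worker)
        # Loss of an owner: live borrowers share fate and terminate.
        for borrower, refs in self.borrowed.items():
            if borrower in self.alive and worker in refs.values():
                self.log.append(
                    f"worker {borrower} exits: owner {worker} is gone")
                self.fail(borrower)
        # Loss of an owned object: a live owner resubmits the task.
        for owner, rows in self.owned.items():
            if owner not in self.alive:
                continue
            for obj, row in rows.items():
                if row["loc"] == worker:
                    self.log.append(
                        f"worker {owner} resubmits {row['task']} "
                        f"to recreate {obj}")


# Case 1: worker 3 fails while running C; the owner of y re-executes C(x).
c1 = Cluster()
c1.fail(3)

# Case 2: the owner (worker 1) fails; borrower 3 shares its fate.
c2 = Cluster()
c2.fail(1)
print(c1.log, c2.log)
```

  The homework follows the same rule as case 1: C calls D, so C owns z and
  is the one that resubmits D.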