--- title: CRDTs subtitle: Guaranteed Eventual Consistency author: Levi Pearson ... # Introduction ## Goals - To Understand + What makes a CRDT + Theoretical Impact + Practical Relevance + Some Existing CRDTs ## Source Paper + *A Comprehensive Study of Convergent and Commutative Replicated Data Types* 2011 INRIA Research Report by Shapiro, Preguiça, Baquero, Zawirski + Many related publications on CRDTs from same authors + Zawirski's Ph.D. Thesis is related to size-optimizing CRDTs ## Outline + Basic CRDT Concept + Detailed Model of CRDTs + Portfolio of CRDT Specifications + Garbage Collection Issues + A Shopping Cart Example # CRDTs in a Nutshell ## Motivations + **Eventual Consistency** for scalability + Improve theoretical grounding + Weaken coordination requirements ## Eventual Consistency + Updates are executed at replicas + Later, sent to all other replicas + Eventually, all replicas see all updates + They apply asynchronously and possibly in different orders + Concurrent updates may conflict and require *arbitration* ## Strong Eventual Consistency + Formal model of Eventual Consistency + No conflict-resolution or roll-back required; *Conflict-Freedom* + Extremely scalable and fault-tolerant + Based on mathematical properties ## Two Related Formulations + *Convergent* Replicated Data Types (CvRDT) - State-based - Weak network requirements - Based on *monotonic join semilattices* + *Commutative* Replicated Data Types (CmRDT) - Operation-based - Requires reliable broadcast with *delivery order* - Based on *commutativity of concurrent operations* + Both can always simulate one another # System Model of CRDTs ## Distributed Systems + Processes interconnected by asynchronous network + Network can partition and recover + Nodes can operate disconnected for some time + Processes may crash and recover, memory survives crashes + Assumption of non-Byzantine behavior ## Atoms and Objects + Processes store **atoms** and **objects** + Atoms are base immutable data; directly comparable values + Objects are mutable, replicated data; collections of objects and atoms + Objects with the same identity but different location are **replicas** + Objects are independent; no transactions ## Operations + There are **clients** that query and modify objects via **operations** + A client acts on one replica, the **source** replica + The operation executes locally at the source and then remotely - *Phase 1*: Operation called at source; possibly some processing occurs - *Phase 2*: Update transmitted asynchronously to **downstream** replicas ## State-Based Replication + Update happens entirely at source replica via **update** + Modified payload transmitted downstream + Receivers use **merge** operation to update their payload + Replicas can be compared via **compare** operation + Operations guarded by **source pre-conditions** ## State-Based Causal History Model + Initially, set $C(x_i)$ of each replica $x_i$ is $\emptyset$ + After update $f$, $C(f(x_i))$ is $C(x_i) \cup \{f\}$ + After merge of $C(x_i)$ and $C(x_j)$, the state is $C(x_i) \cup C(x_j)$ + The "happens-before" relation $f \to g \Leftrightarrow C(f) \subset C(g)$ ## State-Based Network Assumptions + States transmit between replicas at unspecified times + Transmissions occur infinitely often + Communication forms a connected graph ## Operation-Based Replication + Phase 1 of **update** is local to source replica - Precondition guard must be true - Executes immediately and atomically - No side-effects - May return results + Phase 2 of **update** executes asynchronously on downstream replicas - Downstream precondition must be true - Can't return results - Updates downstream state - Arguments prepared by Phase 1 - Executes atomically ## Operation-Based Causal History Model + Initially, set $C(x_i)$ of each replica $x_i$ is $\emptyset$ + After downstream phase, $f$, $C(f(x_i))$ is $C(x_i) \cup \{f\}$ + The "happens-before" relation $f \to g \Leftrightarrow C(f) \subset C(g)$ + **causal delivery** means that if $f \to g$, then $f$ is delivered before $g$ ## Operation-Based Network Assumptions + Presence of a reliable broadcast mechanism + Delivery of every update in a **delivery order** + The delivery order maintains downstream preconditions + A delivery order of causal delivery is always sufficient ## Definition of Convergence + Two replicas $x_i$ and $x_j$ of $x$ *converge eventually* if: - Safety: $\forall{i,j}: C(x_i) = C(x_j)$ implies payloads of $i$ and $j$ are equivalent - Liveness: $\forall{i,j}: f \in C(x_i)$ implies *eventually* $f \in C(x_j)$ + States are equivalent when all **query** operations return the same values ## Convergent Replicated Data Type (CvRDT) + Based on concept of **join semilattice** - A *set* of values equipped with - a *partial order* relation ($\leq$ or $\sqsubseteq$) on them - and a *least upper bound* (LUB or $\sqcup$) operation, aka *join* + LUB: if $m = x \sqcup y$ then $m$ is a LUB under $\leq$ **iff** - $x \leq m \land y \leq m$ - There is no $m' \leq m$ where $x \leq m' \land y \leq m'$ - The "smallest" $m$ in the set that is "at least as big" as both $x$ and $y$ - LUB is *commutative*, *idempotent*, and *associative* + An *ordered set* is a *join semilattice* if for all pairs, a LUB exists + For CvRDTs, **merge** is the LUB operation on the payloads + All operations are *non-decreasing* with respect to the ordering ## Example CvRDT + An integer counter; our set is $\mathbb{Z}$ + Our ordering is numeric $\leq$ + For merge, we use $max()$ + Our payload, `x`, is initialized to `0` + Operation **value** returning an integer $j$ is `let j = x` + Operation **compare(a,b)** is `a.x ` $\leq$ ` b.x` + Operation **inc** is `x := x + 1` + Operation **merge(a,b)** is `max(a,b)` ## Commutative Replicated Data Type (CmRDT) + Based on a reliable broadcast strong enough to guarantee **delivery order** + **Delivery order**, $<$, is specified per CmRDT + Some operations are not ordered; they are *concurrent*, $\|$ + $f \| g \Leftrightarrow f \nless g \land g \nless f$ + All replicas converge to the same state when: - all concurrent operations commute, so - all execution orders consistent with delivery order are equivalent + Reliable causal delivery - doesn't require agreement - is immune to partitioning (connected subsets still make deliveries) - eventually delivers all updates to all nodes + Delivery order is never stricter than causal delivery ## Relationship of CvRDT and CmRDT + State-based CvRDT are: - easy to reason about (relatively) - requires only weak network properties, no membership tracking - may be inefficient for large objects - used in NFS, AFS, Coda, Dynamo, Riak + Operation-based CmRDT are: - harder to reason about; requires reasoning about history - have greater expressive power - require an underlying reliable broadcast protocol, need membership tracking - used in Bayou, Rover, IceCube, Telex + They can always emulate one another (although not necessarily efficiently) # Portfolio of CRDTs ## Categories + Counters + Registers + Sets + Graphs + Co-op Text Editing ## State-Based Outline | **payload** *Payload type; instantiated at all replicas* | **initial** *Inital value* | **query** *Query*( *arguments* ) : *returns* | **pre** *Precondition* | **let** *Evaluate synchronously, no side effects* | **update** *Source-local operation*( *arguments* ) : *returns* | **pre** *Precondition* | **let** *Evaluate at source, synchronously* | *Side effects at source to execute synchronously* | **compare** (value1, value2) : boolean *b* | *Is value1* $\leq$ *value2 in semilattice?* | **merge** (value1, value2) : payload mergedValue | *LUB merge of value1 and value2, at any replica* ## Op-Based Outline | **payload** *Payload type; intstantiated at all replicas* | **initial** *Initial value* | **query** *Source-local operation*( *arguments* ) : *returns* | **pre** *Precondition* | **let** *Execute at source, synchronously, no side-effects* | **update** *Global update*( *arguments* ) : *returns* | **atSource**( *arguments* ) : *returns* | **pre** *Precondition at source* | **let** *1st phase: synchronous, at source, no side-effects* | **downstream** ( *arguments passed downstream* ) | **pre** *Precondition against downstream state* | *2nd phase, asynchronous, side-effects to downstream state* ## Op-based Counter | **payload** integer $i$ | **initial** $0$ | **query** *value*() : integer $j$ | **let** $j = i$ | **update** *increment*() | **downstream**() | $i := i + 1$ | **update** *decrement*() | **downstream**() | $i := i - 1$ ## G-Counter | **payload** integer[$n$] $P$ | **initial** $[0,0,\dots,0]$ | **update** *increment*() | **let** $g = myID()$ | $P[g] := P[g] + 1$ | **query** *value*() : integer $v$ | **let** $v = \sum_i{P[i]}$ | **compare**($X, Y$) : boolean $b$ | **let** $b = (\forall i \in [0, n-1] : X.P[i] \leq Y.P[i])$ | **merge**($X, Y$) : payload $Z$ | **let** $\forall i \in [0, n - 1] : Z.P[i] = max(X.P[i], Y.P[i])$ ## PN-Counter | **payload** integer[$n$] $P$, integer[$n$] $N$ | **initial** $[0,0,\dots,0], [0,0,\dots,0]$ | **update** *increment*() | **let** $g = myID()$; $P[g] := P[g] + 1$ | **update** *decrement*() | **let** $g = myID()$; $N[g] := N[g] + 1$ | **query** *value*() : integer $v$ | **let** $v = \sum_i{P[i]} - \sum_i{N[i]}$ | **compare**($X, Y$) : boolean $b$ | **let** $b = (\forall i \in [0, n-1] : X.P[i] \leq Y.P[i] \land \forall i \in [0,n - 1] : X.N[i] \leq Y.N[i])$ | **merge**($X, Y$) : payload $Z$ | **let** $\forall i \in [0, n - 1] : Z.P[i] = max(X.P[i], Y.P[i])$ | **let** $\forall i \in [0, n - 1] : Z.N[i] = max(X.N[i], Y.N[i])$ ## LWW-Register | **payload** $X$ $x$, timestamp $t$ | **initial** $\bot, 0$ | **update** *assign*( $X$ $w$ ) | $x, t := w, now()$ | **query** *value*() : $X$ $w$ | **let** $w = x$ | **compare**($R, R'$) : boolean $b$ | **let** $b = (R.t \leq R'.t)$ | **merge**($R, R'$) : payload $R''$ | **if** $R.t \leq R'.t$ **then** $R''.x, R''.t = R'.x, R'.t$ | **else** $R''.x, R''.t = R.x, R.t$ ## LWW-Register (Op-based) | **payload** $X$ $x$, timestamp $t$ | **initial** $\bot, 0$ | **query** *value*() : $X$ $w$ | **let** $w = x$ | **update** *assign*( $X$ $x'$ ) | **atSource**() $t'$ | **let** $t' = now()$ | **downstream**($x', t'$) | **if** $t < t'$ **then** $x, t := x', t'$ ## MV-Register | **payload** set $S$ of $(x, V)$ pairs; $x \in X$; $V$ its version vector | **initial** $\{(\bot, [0,\dots,0])\}$ | **query** *incVV*() : integer[$n$] $V'$ | **let** $g = myID()$ | **let** $\mathcal{V} = \{V \mid \exists x : (x,V) \in S\}$ | **let** $V' = [ max_{V \in \mathcal{V}}(V[j]) ]_{j \neq g}$ | **let** $V'[g] = max_{V \in \mathcal{V}}(V[g]) + 1$ | **update** *assign*( set $R$ ) | **let** $V = incVV()$ | $S := R \times \{V\}$ ## MV-Register (cont.) | **query** *value*() : set $S'$ | **let** $S' = S$ | **compare**($A, B$) : boolean $b$ | **let** $b = (\forall(x,V) \in A, (x', V') \in B : V \leq V')$ | **merge**($A, B$) : payload $C$ | **let** $A' = \{(x,V) \in A \mid \forall(y,W) \in B : V \| W \lor V \leq W\}$ | **let** $B' = \{(y,W) \in B \mid \forall(x,V) \in A : W \| V \lor W \leq V\}$ | **let** $C = A' \cup B'$ ## G-Set | **payload** set $A$ | **initial** $\emptyset$ | **update** *add*(element $e$) | $A := A \cup \{e\}$ | **query** *lookup*(element $e$) : boolean $b$ | **let** $b = (e \in A)$ | **compare**($S, T$) : boolean $b$ | **let** $b = (S.A \subseteq T.A)$ | **merge**($S, T$) : payload $U$ | **let** $U.A = S.A \cup T.A$ ## 2P-Set | **payload** set $A$, set $R$ | **initial** $\emptyset, \emptyset$ | **query** *lookup*(element $e$) : boolean $b$ | **let** $b = (e \in A \land e \notin R)$ | **update** *add*(element $e$) | $A := A \cup \{e\}$ | **update** *remove*(element $e$) | **pre** $lookup(e)$ | $R := R \cup \{e\}$ | **compare**($S, T$) : boolean $b$ | **let** $b = (S.A \subseteq T.A \lor S.R \subseteq T.R)$ | **merge**($S, T$) : payload $U$ | **let** $U.A = S.A \cup T.A$ | **let** $U.R = S.R \cup T.R$ ## U-Set | **payload** set $S$ | **initial** $\emptyset$ | **query** *lookup*(element $e$) : boolean $b$ | **let** $b = (e \in S)$ | **update** *add*(element $e$) | **atSource**($e$) | **pre** $e$ is unique | **downstream**($e$) | $S := S \cup \{e\}$ ## U-Set (cont.) | **update** *remove*(element $e$) | **atSource**($e$) | **pre** $lookup(e)$ | **downstream**($e$) | **pre** $add(e)$ has been delivered | $S := S \setminus \{e\}$ # Thanks!