عجفت الغور

paxos

algorithms, distributed systems

  • FLP impossibility result: in an asynchronous system where even one process may crash, no deterministic algorithm can guarantee that consensus is always reached

Tradeoffs Table

Algorithm   | Leader Election RTTs | Voting RTTs (General Case) | Voting RTTs (Optimistic Case)
Fast Paxos  | n/a (leaderless)     | 2                          | 1
Multi-Paxos | 1                    | 2                          | n/a
EPaxos      | n/a (leaderless)     | 2                          | n/a
Raft        | 1                    | 2                          | n/a

Implementation of Paxos

  • Paxos is generally used to keep track of a log and decide which values to commit to it
  • Divided into proposers, acceptors, and learners
    • Proposers send their proposals to the other replica group members and try to get a majority of them to support their proposal.
    • Acceptors are the group members who receive proposals and either accept or reject them
    • Learners keep track of accepted values and mark a value as chosen once a majority of acceptors has accepted it. Learners do not vote; they learn the outcome of consensus

Steps

  • Two phases: first a majority voting step, then a commit step.
  • There are two phases because a single round can end in a split vote: if each acceptor only accepts the first value it receives, no value may reach a majority
    • Conflicting choices can also happen and result in ABA
    • However, two phases alone are not enough to enforce safety; proposals also have to be ordered
  • Proposal numbers play a vital role, acting as logical clocks.

Actual steps:

  1. Handshake phase (prepare-promise messages): each proposer sends a prepare(proposal_number) request to all acceptors, and each acceptor either promises or rejects it
    1. If the acceptor hasn't yet promised to any other prepare request, it records the proposal number and sends a promise to the proposer
    2. If the acceptor has already promised to a prepare request with a lower proposal number, it promises the new one instead, breaking the earlier promise.
    3. If the acceptor has already promised to a prepare request with a higher proposal number, it simply rejects the current one
    4. If the acceptor has already accepted an accept request with a lower proposal number, it promises the current prepare and sends back the accepted proposal number and its corresponding value
  2. Value acceptance phase (accept-accepted messages)
    • If the proposer receives a promise from a majority of acceptors, it sends an accept request accept(proposal_number, value)
    • The value in the accept request is picked as follows:
      1. If the proposer didn't receive any accepted proposals along with the promises, it proposes a value of its choice
      2. If the proposer received already accepted proposals along with the promises, it must select the value with the highest proposal number
    • An acceptor, on receiving an accept request, responds to the proposer
      1. If the latest proposal number promised by the acceptor is the same as the one in the accept request, it accepts the request and responds with an accepted message. If the proposer receives acceptances from a majority, the value is chosen and broadcast to everyone (the learners)
      2. If the acceptor has since promised a proposal with a higher number, it rejects the accept request, and the protocol has to start again with a higher proposal number
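The two phases above can be sketched as a minimal, single-process simulation of single-decree Paxos. It assumes every message is delivered instantly as a method call, and the names (`Acceptor`, `propose`) are hypothetical; learners are folded into the proposer's return value.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Acceptor:
    promised: int = -1        # highest proposal number promised so far
    accepted_n: int = -1      # proposal number of the accepted value (-1 = none)
    accepted_value: Any = None

    def prepare(self, n: int):
        """Phase 1: promise not to accept proposals numbered below n."""
        if n > self.promised:
            self.promised = n
            # Report any previously accepted proposal back to the proposer.
            return ("promise", self.accepted_n, self.accepted_value)
        return ("reject", self.promised, None)

    def accept(self, n: int, value: Any) -> str:
        """Phase 2: accept unless a higher-numbered prepare was promised."""
        if n >= self.promised:
            self.promised = n
            self.accepted_n = n
            self.accepted_value = value
            return "accepted"
        return "reject"


def propose(acceptors, n: int, my_value: Any) -> Optional[Any]:
    """Run one round as proposer n; return the chosen value or None."""
    majority = len(acceptors) // 2 + 1
    promises = [r for r in (a.prepare(n) for a in acceptors) if r[0] == "promise"]
    if len(promises) < majority:
        return None  # must retry with a higher proposal number
    # Adopt the value with the highest accepted proposal number, if any.
    _, prior_n, prior_value = max(promises, key=lambda r: r[1])
    value = prior_value if prior_n >= 0 else my_value
    acks = sum(a.accept(n, value) == "accepted" for a in acceptors)
    return value if acks >= majority else None
```

With three acceptors, `propose(acceptors, 1, "A")` chooses `"A"`; a later `propose(acceptors, 2, "B")` also returns `"A"`, because the second proposer must adopt the already accepted value, and `propose(acceptors, 1, "C")` is rejected in phase 1.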

Liveness Issues

  • Since an acceptor always promises to a higher-numbered prepare, two proposers can keep preempting each other when messages lag: each one's accept requests are rejected because a newer prepare arrived in between.
  • If the acceptors always receive a prepare request with a higher proposal number before the accept request for the proposal they promised earlier, none of the accept requests will ever be accepted.
  • To solve this, only a single distinguished proposer (a leader) is allowed to issue proposals
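The livelock can be replayed deterministically. This sketch assumes a hypothetical schedule in which every prepare from one proposer lands just before the other proposer's accept; values are left out because only the proposal numbers matter here.

```python
class Acceptor:
    def __init__(self):
        self.promised = -1  # highest proposal number promised so far

    def prepare(self, n):
        if n > self.promised:
            self.promised = n
            return True
        return False

    def accept(self, n):
        if n >= self.promised:
            self.promised = n
            return True
        return False


def has_majority(votes, total):
    return sum(votes) > total // 2


def dueling(acceptors, rounds):
    """Two proposers alternate; each prepare lands just before the rival's accept."""
    total = len(acceptors)
    chosen = 0
    pending = 1
    # The first proposer completes phase 1 with proposal 1.
    has_majority([a.prepare(pending) for a in acceptors], total)
    next_n = 2
    for _ in range(rounds):
        # The other proposer's prepare(next_n) reaches the acceptors first...
        has_majority([a.prepare(next_n) for a in acceptors], total)
        # ...so accept(pending) is rejected and that proposer retries higher.
        if has_majority([a.accept(pending) for a in acceptors], total):
            chosen += 1
        pending, next_n = next_n, next_n + 1
    return chosen


def single_leader(acceptors):
    """With one proposer, nothing preempts it between phase 1 and phase 2."""
    total = len(acceptors)
    return (has_majority([a.prepare(1) for a in acceptors], total)
            and has_majority([a.accept(1) for a in acceptors], total))
```

Under this schedule `dueling` never gets a value chosen no matter how many rounds run, while `single_leader` succeeds on the first try.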

Other Considerations

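  • Why a majority instead of the highest vote count? A plurality could result in ties, while any two majorities share at least one acceptor
  • Why two steps? If acceptors always accepted whatever value arrived, a majority could accept a value and later overwrite it with a different one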

Multipaxos

  • Basically, between Paxos rounds, if the leader stays the same, it can skip the prepare phase and just propose and commit, since it knows there's no competing proposer
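The steady-state shortcut can be sketched by counting round trips for a leader that runs phase 1 once and then only phase 2 per log slot. Names (`Leader`, `append`, `round_trips`) are hypothetical, and the sketch omits recovering values already accepted in earlier slots on leader change, so it is only meaningful for a single stable leader.

```python
class Acceptor:
    def __init__(self):
        self.promised = -1
        self.log = {}  # slot -> (ballot, value)

    def prepare(self, ballot):
        # One prepare covers every log slot. Full Multi-Paxos would also return
        # the values already accepted in each slot; that is omitted here.
        if ballot > self.promised:
            self.promised = ballot
            return True
        return False

    def accept(self, ballot, slot, value):
        if ballot >= self.promised:
            self.promised = ballot
            self.log[slot] = (ballot, value)
            return True
        return False


class Leader:
    def __init__(self, acceptors, ballot):
        self.acceptors = acceptors
        self.ballot = ballot
        self.next_slot = 0
        self.is_leader = False
        self.round_trips = 0  # one per broadcast to the acceptors

    def _majority(self, oks):
        return sum(oks) > len(self.acceptors) // 2

    def append(self, value):
        if not self.is_leader:
            # Phase 1 runs once, when leadership is established.
            self.round_trips += 1
            self.is_leader = self._majority([a.prepare(self.ballot) for a in self.acceptors])
            if not self.is_leader:
                return False
        # Steady state: phase 2 only, one round trip per log entry.
        self.round_trips += 1
        ok = self._majority([a.accept(self.ballot, self.next_slot, value) for a in self.acceptors])
        if ok:
            self.next_slot += 1
        else:
            self.is_leader = False  # preempted by a higher ballot
        return ok
```

Appending three entries costs four round trips (one prepare plus three accepts), versus six if every entry ran both phases.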

Boxwood

Chubby

  • Google locking system that predates ZooKeeper
  • Used by GFS (Google File System) to appoint a manager server and to store small amounts of data
  • Used by Bigtable to elect the Bigtable manager, discover servers, let clients locate the Bigtable manager, and as low-volume storage

Requirements

  • Provides 3 APIs: whole file reads and writes, advisory locks, and notification of events
  • Coarse-grained locking service: clients can take locks with minimal overhead, and it doubles as reliable low-volume storage
  • Also needs to be available and reliable, and have decent throughput

Design

Chubby Cell

  • A Chubby cell is a small set of servers (usually 5) that replicate with each other
  • Each server has a namespace composed of directories and files, with ACLs
  • One replica is elected as the primary, and it handles all read and write operations
  • Replicas copy the database using a consensus protocol and elect new primaries as needed
  • Clients discover the primary by asking any server in the cell and cache the answer on the client side, using it until that server stops being the primary
  • Two types of requests
    • Writes: propagated asynchronously to all replica servers and only acknowledged once a quorum responds
    • Reads: serviced by the primary replica
  • Each file or directory within a Chubby cell is known as a node (similar to an inode)
    • Each node has a unique name; there are ephemeral and permanent nodes
    • Ephemeral nodes are similar to temp files: they get deleted when no client has them open
  • Access is controlled by ACLs, which are themselves files in an ACL directory listing the authorized names

Chubby Library

  • Each client communicates with Chubby cells through the Chubby client library
  • Keeps track of the primary replicas to communicate with

Locking

  • Multiple clients can hold a lock in reader (shared) mode, but only a single client can hold it in writer (exclusive) mode
  • Locks are advisory, which means you don't need to hold a lock to read a file. Clients must cooperate to avoid conflicts, but advisory locks scale better than mandatory locks, and clients can easily emulate strict locking (just have them explicitly check for locks)
  • If clients are holding a read lock, a write lock cannot be acquired
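The reader/writer rules above can be modeled with a tiny non-blocking lock table. This is a hypothetical sketch (`AdvisoryRWLock`, `try_acquire`), not Chubby's API; because the lock is advisory, it only arbitrates between lock requests and never guards the file itself.

```python
class AdvisoryRWLock:
    """Shared (reader) / exclusive (writer) lock on one Chubby-like node."""

    def __init__(self):
        self.readers = set()  # clients holding the lock in shared mode
        self.writer = None    # client holding the lock in exclusive mode

    def try_acquire(self, client, mode):
        if mode == "shared":
            # Any number of readers, as long as no writer holds the lock.
            if self.writer is None:
                self.readers.add(client)
                return True
            return False
        if mode == "exclusive":
            # A writer needs the lock to be completely free.
            if self.writer is None and not self.readers:
                self.writer = client
                return True
            return False
        raise ValueError(f"unknown mode: {mode}")

    def release(self, client):
        self.readers.discard(client)
        if self.writer == client:
            self.writer = None
```

Two clients can share the lock, a third client's exclusive request fails until both release, and once a writer holds it new shared requests fail.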

Sequencers and lock delays

  • If you hold a lock and never release it, then we have issues
  • After acquiring a lock, the holder can request a sequencer: an opaque string holding the lock's name, its mode, and a generation (sequence) number, which it attaches to requests sent to other servers
  • Locks that are not released eventually expire, and the generation number records the lock's current state (similar to a Lamport clock)
  • If a lock becomes free because its holder failed or became inaccessible, the lock server does not let any other client claim it for a specific time period (the lock-delay). This protects against delayed or reordered requests from a faulty holder arriving after another client has claimed the lock.
  • To let systems know what is happening, Chubby sends events: file contents modified, child nodes added/removed/modified, primary failover, handle invalidated, lock acquired, and conflicting lock requests
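How a sequencer stops a stale holder can be sketched with a resource server that remembers the newest generation it has seen per lock and rejects anything older. All names here (`Sequencer`, `LockServer`, `ResourceServer`, the lock path) are hypothetical, and the lock server only models generation numbers, not mutual exclusion or expiry.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Sequencer:
    lock_name: str
    mode: str        # "exclusive" or "shared"
    generation: int  # increases every time the lock is acquired


class LockServer:
    """Only models generation numbers, not mutual exclusion or expiry."""

    def __init__(self):
        self.generations = {}

    def acquire(self, lock_name, mode="exclusive"):
        gen = self.generations.get(lock_name, 0) + 1
        self.generations[lock_name] = gen
        return Sequencer(lock_name, mode, gen)


class ResourceServer:
    """Rejects requests whose sequencer is older than the newest one it has seen."""

    def __init__(self):
        self.latest = {}  # lock name -> newest generation observed
        self.data = {}

    def write(self, seq, key, value):
        if seq.generation < self.latest.get(seq.lock_name, 0):
            return False  # stale holder, e.g. a request delayed past its lock's expiry
        self.latest[seq.lock_name] = seq.generation
        self.data[key] = value
        return True


# An old holder's delayed write arrives after the lock changed hands.
locks, store = LockServer(), ResourceServer()
old = locks.acquire("/ls/cell/service-lock")  # hypothetical lock path
new = locks.acquire("/ls/cell/service-lock")  # acquired after the old lock expired
ok_new = store.write(new, "k", "from new")
ok_old = store.write(old, "k", "from old")
```

Comparing against the newest sequencer seen is the cheap check; a server could instead ask the lock service whether a sequencer is still valid.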

Caching

  • The primary keeps a list of what each client is caching and sends invalidations to keep the caches consistent
  • If data or metadata needs to change, the primary blocks the modification and sends invalidations to every client with the relevant data cached
  • When a client receives an invalidation, it flushes the invalid state and acknowledges it on its next KeepAlive
    • While invalidations remain unacknowledged, the primary treats the node as uncachable
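The invalidation flow can be sketched as a primary that holds a write until every client caching the node has acknowledged. The class and method names are hypothetical, acknowledgements are plain method calls instead of KeepAlives, and only one pending write per node is modeled.

```python
class Primary:
    """Toy Chubby-style primary: a write waits until every cacher has flushed."""

    def __init__(self):
        self.data = {}
        self.cachers = {}  # node -> clients holding a cached copy
        self.pending = {}  # node -> (new value, clients that have not acked yet)

    def read(self, client, node):
        # While invalidations are outstanding the node is treated as uncachable.
        cacheable = node not in self.pending
        if cacheable:
            self.cachers.setdefault(node, set()).add(client)
        return self.data.get(node), cacheable

    def write(self, node, value):
        """Start a write; returns the clients that must be sent an invalidation."""
        waiting = self.cachers.pop(node, set())
        if not waiting:
            self.data[node] = value  # nobody caches it: apply immediately
            return set()
        self.pending[node] = (value, set(waiting))
        return set(waiting)

    def ack_invalidation(self, client, node):
        """A client flushed its copy; apply the write once all acks are in."""
        if node not in self.pending:
            return
        value, waiting = self.pending[node]
        waiting.discard(client)
        if not waiting:
            del self.pending[node]
            self.data[node] = value
```

A read that arrives while invalidations are outstanding still gets the old value but is told not to cache it; the new value becomes visible only after the last acknowledgement.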

Sessions

  • A client and a cell keep track of each other with sessions that are held with keepalives
  • A session comes with a lease: a time period during which the primary keeps the client updated and promises not to terminate the session unilaterally
  • The local client uses the lease to detect that something has gone wrong, e.g. when KeepAlive replies stop arriving
    • If the local lease times out, the client
      • Empties and disables its cache
      • Waits for a grace period while retrying KeepAlives
      • If it reconnects, re-enables the cache
      • If nothing arrives, assumes the session has terminated
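The client-side lease handling above can be sketched as a small state machine driven by an injected clock. `ChubbySession`, `tick`, and `on_keepalive` are hypothetical names, and the state labels "ok", "jeopardy", and "expired" are this sketch's own.

```python
class ChubbySession:
    """Client-side view of a Chubby session lease (times in seconds)."""

    def __init__(self, lease_timeout, grace_period):
        self.lease_timeout = lease_timeout
        self.grace_period = grace_period
        self.last_keepalive = 0.0
        self.cache = {}
        self.cache_enabled = True
        self.state = "ok"

    def on_keepalive(self, now):
        """A KeepAlive reply arrived: extend the local lease, re-enable the cache."""
        if self.state == "expired":
            return  # too late: the session is already considered terminated
        self.last_keepalive = now
        self.cache_enabled = True
        self.state = "ok"

    def tick(self, now):
        """Check the local lease at time `now` and update the session state."""
        elapsed = now - self.last_keepalive
        if elapsed > self.lease_timeout + self.grace_period:
            self.state = "expired"   # nothing during the grace period: give up
            self.cache.clear()
            self.cache_enabled = False
        elif elapsed > self.lease_timeout:
            self.state = "jeopardy"  # lease lapsed: flush and disable the cache
            self.cache.clear()
            self.cache_enabled = False
        return self.state
```

For example, `ChubbySession(lease_timeout=12, grace_period=45)` uses the 12 s lease mentioned for Chubby below, with 45 s as an assumed grace period.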

Failovers

  • If a replica cannot communicate with the primary after the primary's lease ends, it starts an election
  • Periodically, the primary renews its lease to keep its leadership
  • Each cell's primary periodically takes snapshots of its database and backs them up
  • Mirroring can also occur across different regions, and a mirror that cannot be accessed remains unaltered until communication is reestablished. Updated files are identified by comparing their checksums
  • Failure steps for replicas that do not recover after a few hours
    • A replacement system is used to provision a new replica
    • It starts the lock server binary
    • DNS tables are updated
    • The current primary also needs this information, which it picks up by periodically polling DNS
    • Cell DB is updated
    • Replica servers update themselves with the new member
    • The new server receives a recent copy of the database
    • Once it has processed a write that the current primary is waiting to commit, it is permitted to vote in elections

Proxies and Partitioning

  • Proxies act as load balancers: they absorb KeepAlives and reads so that less traffic reaches the primary
  • Partitioning shards Chubby's namespace, with each partition served by its own set of replicas

Design Decisions

  • Use a lock service for centrally managed things
  • Permit a huge number of clients to access a Chubby file without using many servers
  • Use event notification instead of polling because clients and replicas may want to know when things have changed
  • Cache the data on the client side
  • Use consistent caching because developers get confused by non-intuitive caching semantics
  • Redirecting all reads and writes through a single node was how they provided strong consistency. Caches and proxies make up for the lost R/W throughput

Differences from Boxwood

  • Chubby combines its aspects (lock system, small-file storage, and session/lease management) in one service, whereas Boxwood split them into three separate pieces
  • Chubby has a more advanced interface than Boxwood
  • Boxwood had a 200 ms lease period, whereas Chubby's is 12 s
  • Chubby has a grace period to prevent losing locks

Coarse grained vs fine-grained locking

  • Coarse-grained locks put much less load on the lock server, since they are rarely acquired
  • However, transferring a lock from one client to another requires expensive recovery procedures
  • Fine-grained locks are acquired frequently, which makes the lock server's availability critical
  • Time penalty for dropping locks would not be severe, since locks are only held for a short period

Zookeeper

API

  • create(path, data[], mode, flag)
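    • Creates a znode (like an inode, a unit of abstraction that clients can lock on) holding data[] and returns its name; mode/flag select regular vs. ephemeral and optionally SEQUENTIAL naming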
  • setData(path, data[], version)
    • Writes data[] to the znode at path if version matches the znode's current version (-1 skips the check)
  • getData(path, watch)
    • returns data[], also allows clients to watch
  • getChildren(path, watch)
    • Returns all the children names of the znode at the specified path
  • exists(path, watch)
    • Checks whether a znode exists
  • delete(path, version)
    • Deletes the znode at path if version matches its current version (versions increase monotonically; -1 skips the check)
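The version and SEQUENTIAL semantics of the API above can be sketched with an in-memory tree. `ZNodeTree` and its snake_case methods are hypothetical stand-ins, not the real ZooKeeper client; sessions, ephemerals, watches, and ACLs are left out, and the sequence counter is a simplified per-parent counter.

```python
class BadVersion(Exception):
    """Raised when a conditional update names a stale version."""


class ZNodeTree:
    """Toy, single-process model of some ZooKeeper data-tree semantics."""

    def __init__(self):
        self.nodes = {"/": {"data": b"", "version": 0, "next_seq": 0}}

    @staticmethod
    def _parent(path):
        return path.rsplit("/", 1)[0] or "/"

    def create(self, path, data=b"", sequential=False):
        parent = self._parent(path)
        if parent not in self.nodes:
            raise KeyError(f"parent does not exist: {parent}")
        if sequential:
            # Append a zero-padded per-parent counter (ZooKeeper uses 10 digits).
            seq = self.nodes[parent]["next_seq"]
            self.nodes[parent]["next_seq"] += 1
            path = f"{path}{seq:010d}"
        if path in self.nodes:
            raise KeyError(f"node already exists: {path}")
        self.nodes[path] = {"data": data, "version": 0, "next_seq": 0}
        return path  # the actual name, which matters for SEQUENTIAL nodes

    def get_data(self, path):
        node = self.nodes[path]
        return node["data"], node["version"]

    def set_data(self, path, data, version=-1):
        node = self.nodes[path]
        if version != -1 and version != node["version"]:  # -1 = unconditional
            raise BadVersion(path)
        node["data"] = data
        node["version"] += 1
        return node["version"]

    def get_children(self, path):
        prefix = path.rstrip("/") + "/"
        names = (p[len(prefix):] for p in self.nodes if p.startswith(prefix))
        return sorted(n for n in names if n and "/" not in n)

    def delete(self, path, version=-1):
        node = self.nodes[path]
        if version != -1 and version != node["version"]:
            raise BadVersion(path)
        if self.get_children(path):
            raise ValueError(f"node has children: {path}")
        del self.nodes[path]
```

A conditional `set_data` with a stale version raises `BadVersion`, which is how clients build optimistic read-modify-write loops on top of the API.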

Server

  • Fully replicated, like etcd, into a set called a ZooKeeper ensemble
  • One server is elected leader and the others become followers
  • Unlike Chubby, clients can connect to any server
    • Higher availability, but weaker consistency
    • Call sync() before a read to make the server catch up with the leader, so the read reflects all writes completed before the sync
  • The leader broadcasts operations to the followers and applies write operations to the coordination data held in its memory
  • Followers can also receive write requests from clients. They forward each request to the leader (multiple writes can be batched), and the leader broadcasts the resulting changes to all followers
    • Basically a follower gets a write, sends a request to the leader, the leader broadcasts the state to everyone after it’s done

Replicated Database

  • In memory copy of the database so that reads and writes can be done locally
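  • ZK logs every delivered update to a WAL on disk and periodically takes snapshots of the in-memory database
    • Snapshots are fuzzy: the tree is not locked while a snapshot is taken, so it may include only some of the updates made during the snapshot
    • Transactions are idempotent, so replaying WAL entries that are already reflected in a fuzzy snapshot is harmless
    • A snapshot is a depth-first scan of the tree that atomically reads each znode's metadata and data and writes them to disk; on recovery, a server loads the latest snapshot and replays the WAL entries logged after it
      • This is why ZK recovery is slow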
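Why a fuzzy snapshot is still usable can be sketched in a few lines: the snapshot below captures a state that never existed, yet replaying the log from the snapshot's starting zxid converges to the live state because each transaction carries its absolute result. The function names and the tuple layout `(zxid, path, value, version)` are hypothetical, and a sorted walk stands in for the depth-first scan.

```python
def apply_txn(state, txn):
    """setDataTXN-style: carries the resulting value and version, not a delta."""
    _zxid, path, value, version = txn
    state[path] = (value, version)  # applying the same txn twice is harmless


def take_fuzzy_snapshot(live, concurrent_txns):
    """Walk the tree without locking it; `concurrent_txns` land halfway through."""
    snap = {}
    paths = sorted(live)  # stand-in for the depth-first scan order
    half = len(paths) // 2
    for p in paths[:half]:
        snap[p] = live[p]
    for txn in concurrent_txns:  # updates applied while the scan is in progress
        apply_txn(live, txn)
    for p in paths[half:]:
        snap[p] = live[p]
    return snap


def recover(snapshot, snapshot_zxid, log):
    """Load the snapshot, then replay every logged txn after the snapshot started."""
    state = dict(snapshot)
    for txn in log:
        if txn[0] > snapshot_zxid:
            apply_txn(state, txn)
    return state


# zxid, path, new value, new version
log = [(1, "/a", "a1", 1), (2, "/b", "b1", 1), (3, "/a", "a2", 2), (4, "/b", "b2", 2)]
live = {}
for txn in log[:2]:
    apply_txn(live, txn)
snap = take_fuzzy_snapshot(live, concurrent_txns=log[2:])  # starts after zxid 2
recovered = recover(snap, snapshot_zxid=2, log=log)
```

The snapshot holds `/a` from before zxid 3 but `/b` from after zxid 4; replaying zxids 3 and 4 re-applies zxid 4 harmlessly and ends at the same state as the live tree.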

Atomic Broadcast

  • Used by ZK to broadcast the write request to the replicated database on all servers
    • Followers forward writes to leaders
  • Uses the ZAB protocol
    • Two modes
      1. Broadcast is used to send messages
      2. Recovery is used to synchronize
    • ZAB needs \(2f + 1\) servers to tolerate \(f\) faults, since every quorum is a majority of at least \(f + 1\) servers
  • When a leader dies, a new leader is elected and makes sure all updates from the previous leader are incorporated into its replicated DB before it broadcasts its own requests
  • At every transaction, the leader is broadcasting what it is working on into a replicated queue
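The \(2f + 1\) quorum arithmetic can be checked with a few helper functions (hypothetical names). It also shows why even-sized ensembles are unattractive: adding a fourth server raises the quorum without tolerating any extra failure.

```python
def ensemble_size(f):
    """Servers needed to tolerate f crashed servers with majority quorums."""
    return 2 * f + 1


def quorum_size(n):
    """Smallest majority of n servers; any two majorities share a server."""
    return n // 2 + 1


def tolerated_failures(n):
    """Servers that can fail while a quorum is still reachable."""
    return n - quorum_size(n)


for n in (3, 4, 5):
    print(n, quorum_size(n), tolerated_failures(n))
```

Three servers need a quorum of 2 and survive 1 failure; four need 3 and still survive only 1; five need 3 and survive 2.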

Request Processor

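  • Component that keeps transactions atomic; only the leader uses it
  • Transactions within ZK are idempotent
  • All requests are linearized on the leader, which converts each write request into an idempotent transaction: e.g. a conditional setData becomes a setDataTXN carrying the new data and version (or an errorTXN if the version check fails)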
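The leader-side conversion can be sketched as a function that checks the condition once and emits a transaction holding the precomputed result, so any server can apply it any number of times. `SetDataTxn`, `ErrorTxn`, `to_txn`, and the reason strings are hypothetical names modeled on the setDataTXN/errorTXN idea.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SetDataTxn:
    path: str
    data: bytes
    version: int  # the version the znode has after this txn


@dataclass(frozen=True)
class ErrorTxn:
    path: str
    reason: str


def to_txn(tree, path, data, expected_version):
    """Leader side: turn a conditional setData request into an idempotent txn."""
    if path not in tree:
        return ErrorTxn(path, "NoNode")
    _current_data, current_version = tree[path]
    if expected_version != -1 and expected_version != current_version:
        return ErrorTxn(path, "BadVersion")
    return SetDataTxn(path, data, current_version + 1)


def apply_txn(tree, txn):
    """Every server: install the precomputed result; errors change nothing."""
    if isinstance(txn, SetDataTxn):
        tree[txn.path] = (txn.data, txn.version)
```

Because the condition is evaluated once on the leader, a replayed `SetDataTxn` never re-checks a version that has since moved on.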

Client-server interaction

  • getData() and getChildren() can both be performed locally without notifying others
  • Each read is processed locally and tagged with the zxid of the last transaction the server has seen, so reads are ordered relative to writes
  • Writes take multiple steps
    1. The write request is sent to the leader
    2. The leader broadcasts it so the data is replicated among the servers in the ensemble
    3. Each server applies the write to its replicated database
    4. Each server notifies the client(s) that have set watches on that data
  • Everything is FIFO, including read requests when there’s a write
  • However if a read is in progress, multiple readers can read in parallel
  • Like a giant R/W lock
  • Transmission is async by default, at the cost of possibly inconsistent or stale reads
  • the zxid allows the client to maintain a consistent view across different servers
  • Session timeouts and heartbeats are used to maintain connections from servers to clients for watches, which is decided by the leader
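How the zxid keeps a client's view from going backwards can be sketched as follows: the client remembers the highest zxid it has seen, and a server that is behind it must not take over the session until it catches up. `Client`, `Server`, and `pick_server` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class Server:
    name: str
    last_zxid: int  # zxid of the last transaction this server has applied


class Client:
    def __init__(self):
        self.last_zxid = 0  # highest zxid seen in any response

    def observe(self, zxid):
        self.last_zxid = max(self.last_zxid, zxid)

    def can_reconnect(self, server):
        # A server that is behind the client must catch up before taking the
        # session; otherwise the client could see state go backwards.
        return server.last_zxid >= self.last_zxid

    def pick_server(self, servers):
        for s in servers:
            if self.can_reconnect(s):
                return s
        return None
```

A client that has seen zxid `0x105` will skip a server still at `0x100` and reconnect to one that has reached `0x105`.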

Primitives

  • https://zookeeper.apache.org/doc/r3.4.2/recipes.pdf

  • Config management and service discovery

    newZnode = create("/config/port", 8090, EPHEMERAL) # set a port
    getData("/config/port", true) # get a watch
    setData("/config/port", value, version) # new version
    
  • Rendezvous

    newZnode = create("/rendezvous/candidateOne", "", EPHEMERAL) # create a znode to write
    getData("/rendezvous/candidateOne", true) # get a watch
    setData("/rendezvous/candidateOne", {"10.0.0.1", 8080}, -1) # -1 skips the version check
    
  • Group Membership

    groupZnode = create("/groups/member0001", REGULAR) # ephemeral znodes cannot have children
    newZnode = create("/groups/member0001/processOne", EPHEMERAL, SEQUENTIAL) # set a sequential flag to generate a unique name
    childrenList = getChildren("/groups/member0001", true)
    
  • Locks

    newLock = create("/locks/lock-1", EPHEMERAL) # fails if another client holds the lock
    exists("/locks/lock-1", true) # if create failed, watch the lock znode and retry when it is deleted
    delete(newLock, -1) # to release the lock
    
    • issues
      • herd effect: when a lock is released, many clients stampede to get the lock

        • Solved by using SEQUENTIAL: the znode with the lowest sequence number holds the lock, and each waiter only watches the znode just before it
          newZnode = create(path + "/lock-", EPHEMERAL, SEQUENTIAL)
          do
              childrenList = getChildren(path, false)
              if newZnode is lowest znode in children
                  exit
              newPath = znode in childrenList ordered just before newZnode
              if exists(newPath, true)
                  wait for event
          while true
          
      • the basic recipe only provides exclusive locks

        • Can implement R/W locks; a writer uses the same recipe with a "/write-" prefix
          newZnode = create(path + "/write-", EPHEMERAL, SEQUENTIAL)
          do
              childrenList = getChildren(path, false)
              if newZnode is lowest znode in children
                  exit
              newPath = znode in childrenList ordered just before newZnode
              if exists(newPath, true)
                  wait for event
          while true
          
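        • A reader creates path + "/read-" (EPHEMERAL, SEQUENTIAL) and only waits on the closest lower "/write-" znode, so reads proceed alongside other reads but block on earlier writes, while writes block on all earlier reads and writes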
    • Barriers
      processNode = path + "/" + processName # just a name, not created yet
      exists(path + "/ready", true)
      create(processNode, EPHEMERAL)
      childrenList = getChildren(path, false)
      if fewer children in childrenList than barrierThreshold
        wait for watch event
      else
        create(path + "/ready", REGULAR)
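The decision logic of the lock and barrier recipes above can be tested without a ZooKeeper server by running it on a plain list of child names. This sketch uses hypothetical function names and relies on ZooKeeper appending a 10-digit sequence number shared by all children of the same parent, so `read-` and `write-` znodes are ordered together; a `None` result means "proceed", otherwise it is the single znode to watch, which avoids the herd effect.

```python
import re


def seq_of(name):
    """Sequence suffix of a SEQUENTIAL znode, e.g. 'lock-0000000007' -> 7."""
    return int(re.search(r"(\d+)$", name).group(1))


def lock_predecessor(children, me):
    """Exclusive/write lock: None if `me` holds the lock, else the znode to watch."""
    lower = [c for c in children if seq_of(c) < seq_of(me)]
    return max(lower, key=seq_of) if lower else None


def read_lock_predecessor(children, me):
    """Read lock: only wait on the closest lower write- znode."""
    lower_writes = [c for c in children
                    if c.startswith("write-") and seq_of(c) < seq_of(me)]
    return max(lower_writes, key=seq_of) if lower_writes else None


def barrier_ready(children, threshold):
    """Barrier enter: create the ready znode once enough processes have joined."""
    members = [c for c in children if c != "ready"]
    return len(members) >= threshold
```

With children `write-0000000001`, `read-0000000002`, `read-0000000003`, `write-0000000004`, the first writer holds the lock, both readers wait only on it, and the second writer watches `read-0000000003`.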
      

Performance

  • Better load distribution than Chubby

  • Atomic broadcast decreases performance and has a CPU cost

  • A leader failure brings the application to a standstill while the ensemble recovers

  • Solves Chubby-like problems with different tradeoffs, giving up some consistency for throughput