| Copyright 2014 Cloudera, Inc. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| |
| This document introduces how Kudu will handle log replication and consistency |
| using an algorithm known as Viewstamped Replication (VR) and a series of |
| practical algorithms/techniques for recovery, reconfiguration, compactions etc. |
| It covers the concepts directly related to Kudu; for any missing information, |
| please refer to the original papers [1,3,4]. |
| |
| Quorums (called configs in the rest of this document) are, in Kudu, sets of |
| collaborating processes that serve the purpose of keeping a consistent, |
| replicated log of operations on a given data set, e.g. a tablet. This |
| replicated, consistent log also plays the role of the Write Ahead Log (WAL) |
| for the tablet. Throughout this document we use "config participant" and |
| "process" interchangeably. These do not represent machines or OS processes, |
| as machines and/or application daemons will participate in multiple configs. |
| |
| ============================================================ |
| The write ahead log (WAL) |
| ============================================================ |
| |
| The WAL provides strict ordering and durability guarantees: |
| |
| 1) If calls to Reserve() are externally synchronized, the order in |
| which entries were reserved is the order in which they will be |
| committed to disk. |
| |
| 2) If fsync is enabled (via the 'log_force_fsync_all' flag -- see |
| log_util.cc; note: this is _DISABLED_ by default), then every single |
| transaction is guaranteed to be synchronized to disk before its |
| execution is deemed successful. |
| |
| The Log uses group commit to increase performance, primarily by allowing |
| throughput to scale with the number of writer threads while |
| maintaining close to constant latency. |
| |
| ============================================================ |
| Basic WAL usage |
| ============================================================ |
| |
| To add operations to the log, the caller must obtain the lock and |
| call Reserve() with a collection of operations and a pointer to the |
| reserved entry (the latter being an out parameter). Then, the caller |
| may release the lock and call the AsyncAppend() method with the |
| reserved entry and a callback that will be invoked upon completion of |
| the append. The AsyncAppend() method performs serialization and copying |
| outside of the lock. |
| |
| For sample usage see local_consensus.cc and mt-log-test.cc. |
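| |
| The reserve-then-append sequence above can be illustrated with a minimal, |
| single-threaded sketch. ToyLog, ToyEntry and DrainReady() are hypothetical |
| names invented for this example and are not Kudu's actual classes or |
| signatures; the real Log performs the drain step on a separate thread. |
| |
```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a reserved log entry (not Kudu's real type).
struct ToyEntry {
  std::vector<std::string> ops;        // operations captured by Reserve()
  std::string buffer;                  // serialized bytes, filled by AsyncAppend()
  bool ready = false;                  // true once serialization has finished
  std::function<void(bool)> callback;  // invoked when the append completes
};

class ToyLog {
 public:
  // The caller is assumed to hold its own lock while calling Reserve(), so
  // queue order equals reservation order. 'reserved' is an out parameter.
  void Reserve(const std::vector<std::string>& ops, ToyEntry** reserved) {
    std::unique_ptr<ToyEntry> entry(new ToyEntry());
    entry->ops = ops;
    *reserved = entry.get();
    queue_.push_back(std::move(entry));
  }

  // Called after the caller has released its lock: serialization and copying
  // touch only the entry's private buffer.
  void AsyncAppend(ToyEntry* entry, std::function<void(bool)> callback) {
    for (const std::string& op : entry->ops) {
      entry->buffer += op;
      entry->buffer += '\n';
    }
    entry->callback = std::move(callback);
    entry->ready = true;
  }

  // Stand-in for the appender thread: writes entries in reservation order and
  // stops at the first entry that is not ready yet (a real thread would wait).
  std::size_t DrainReady() {
    std::size_t written = 0;
    while (!queue_.empty() && queue_.front()->ready) {
      disk_ += queue_.front()->buffer;
      queue_.front()->callback(true);
      queue_.pop_front();
      ++written;
    }
    return written;
  }

  const std::string& disk() const { return disk_; }

 private:
  std::deque<std::unique_ptr<ToyEntry>> queue_;
  std::string disk_;  // models the bytes appended to the log segment
};
```
| |
| In this sketch, an entry that finishes serialization early is still written |
| only after every entry reserved before it, which is the ordering guarantee |
| described for Reserve(). |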
| |
| ============================================================= |
| Group commit implementation details |
| ============================================================= |
| |
| Currently, the group commit implementation uses a blocking queue (see |
| Log::entry_queue_ in log.h) and a separate long-running thread (see |
| Log::AppendThread in log.cc). Since access to the queue is |
| synchronized via a lock and only a single thread removes elements from |
| the queue, the order in which the elements are added to the queue will |
| be the same as the order in which they are removed from it. |
| |
| The size of the queue is currently based on the number of entries, but |
| this will eventually be changed to be based on the total size of all |
| queued entries in bytes. |
| |
| ============================================================= |
| Reserving a slot for the entry |
| ============================================================= |
| |
| Currently Reserve() allocates memory for a new entry on the heap each |
| time, marks the entry internally as "reserved" via a state enum, and |
| adds it to the above-mentioned queue. In the future, a ring-buffer or |
| another similar data structure could be used that would take the place |
| of the queue and make allocation unnecessary. |
| |
| ============================================================ |
| Copying the entry contents to the reserved slot |
| ============================================================ |
| |
| AsyncAppend() serializes the contents of the entry to a buffer field |
| in the entry object (currently the buffer is allocated at the same |
| time as the entry itself); this avoids the contention that would occur if |
| a shared buffer were used. |
| |
| ============================================================ |
| Synchronizing the entry contents to disk |
| ============================================================ |
| |
| A separate appender thread waits until entries are added to the |
| queue. Once the queue is no longer empty, the thread grabs all |
| elements on the queue. Then, for each dequeued entry, the appender |
| waits until the entry is marked ready (see "Copying the entry contents |
| to the reserved slot" above) and then appends the entry to the current |
| log segment without synchronizing the underlying file with the filesystem |
| (env::WritableFile::Append()). |
| |
| Note: this could be further optimized by calling AppendVector() with a |
| vector of buffers from all of the consumed entries. |
| |
| Once all entries are successfully appended, the appender thread syncs |
| the file to disk (env::WritableFile::Sync()) and (again) waits until |
| more entries are added to the queue, or until the queue or the |
| appender thread is shut down. |
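| |
| One iteration of the appender loop described above might look like the |
| following sketch. FakeFile and AppendBatch() are hypothetical names used |
| only for illustration; FakeFile is an in-memory stand-in that counts syncs |
| and is not the env::WritableFile interface. |
| |
```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical in-memory file that distinguishes appended from synced bytes.
class FakeFile {
 public:
  void Append(const std::string& data) { unsynced_ += data; }
  void Sync() {
    durable_ += unsynced_;
    unsynced_.clear();
    ++sync_calls_;
  }
  const std::string& durable() const { return durable_; }
  int sync_calls() const { return sync_calls_; }

 private:
  std::string unsynced_;  // appended but not yet synced
  std::string durable_;   // bytes covered by a completed Sync()
  int sync_calls_ = 0;
};

// Grabs everything queued so far, appends each entry in queue order, then
// issues a single Sync() for the whole batch. Returns the batch size.
std::size_t AppendBatch(std::deque<std::string>* queue, FakeFile* file) {
  std::deque<std::string> batch;
  batch.swap(*queue);  // take all elements currently on the queue
  for (const std::string& entry : batch) {
    file->Append(entry);
  }
  if (!batch.empty()) {
    file->Sync();
  }
  return batch.size();
}
```
| |
| The cost of the sync is shared by every entry in the batch, which is why |
| throughput can grow with the number of writers while latency stays close to |
| that of a single sync. |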
| |
| ============================================================ |
| Log segment files and asynchronous preallocation |
| ============================================================ |
| |
| Log uses PosixWritableFile() for underlying storage. If preallocation |
| is enabled ('--log_preallocate_segments' flag, defined in log_util.cc, |
| true by default), then whenever a new segment is created, the |
| underlying file is preallocated to a certain size in megabytes |
| ('--log_segment_size_mb', defined in log_util.cc, default 64). While |
| the offset in the segment file is below the preallocated length, |
| the cheaper fdatasync() operation is used instead of fsync(). |
| |
| When the size of the current segment exceeds the preallocated size, a |
| task is launched in a separate thread that begins preallocating the |
| underlying file for the new log segment; meanwhile, until the task |
| finishes, appends still go to the existing file. Once the new file is |
| preallocated, it is renamed to the correct name for the next segment |
| and is swapped in place of the current segment. |
| |
| When the current segment is closed without reaching the preallocated |
| size, the underlying file is truncated to the last written offset |
| (i.e., the actual size). |
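| |
| The three size-based decisions in this section can be summarized in a small |
| sketch. The function names are hypothetical, and treating 1 MB as 1024 * 1024 |
| bytes is an assumption of this example rather than something stated above. |
| |
```cpp
#include <cassert>
#include <cstdint>

enum class SyncKind { kFdatasync, kFsync };

// Assumption for this sketch: 1 MB = 1024 * 1024 bytes.
const std::uint64_t kBytesPerMb = 1024 * 1024;

// While the offset is below the preallocated length, the cheaper fdatasync()
// is chosen; otherwise fsync() is chosen.
SyncKind ChooseSync(std::uint64_t offset, std::uint64_t preallocated_bytes) {
  return offset < preallocated_bytes ? SyncKind::kFdatasync : SyncKind::kFsync;
}

// Preallocation of the next segment starts once the current segment's size
// exceeds its preallocated size.
bool ShouldPreallocateNext(std::uint64_t segment_size,
                           std::uint64_t preallocated_bytes) {
  return segment_size > preallocated_bytes;
}

// Number of unused preallocated bytes dropped when a segment is closed and
// truncated to the last written offset.
std::uint64_t TruncatedBytesOnClose(std::uint64_t last_written_offset,
                                    std::uint64_t preallocated_bytes) {
  return preallocated_bytes > last_written_offset
             ? preallocated_bytes - last_written_offset
             : 0;
}
```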
| |
| ============================================================ |
| Quorums and roles within configs |
| ============================================================ |
| |
| A config in Kudu is a fault-tolerant, consistent unit that serves requests for |
| a single tablet. A config has 2f+1 participants, where f is the number of |
| participants that may be faulty at the same time. As long as a majority |
| (f+1) of the participants is available, the config will keep serving requests |
| for its tablet, and it is guaranteed that clients perceive a fully consistent, |
| linearizable view of both data and operations on that data. The f parameter, |
| defined table-wide through configuration, implicitly defines the size of the |
| config: f=0 indicates a single node config, f=1 indicates a 3 node config, |
| f=2 indicates a 5 node config, etc. Configs may overlap in the sense that |
| each physical machine may be participating in multiple configs, usually one |
| per tablet that it serves. |
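| |
| The relationship between f, the config size and the majority size is plain |
| arithmetic, sketched here with hypothetical helper names: |
| |
```cpp
#include <cassert>

// Number of participants needed to tolerate f faulty ones.
int ConfigSize(int f) { return 2 * f + 1; }

// Smallest number of participants that forms a majority of the config.
int MajoritySize(int config_size) { return config_size / 2 + 1; }

// Largest number of faulty participants that still leaves a majority.
int ToleratedFaults(int config_size) { return (config_size - 1) / 2; }
```
| |
| For example, a 5 node config has a majority of 3 and tolerates 2 faulty |
| participants. |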
| |
| Within a single config, in steady state, i.e. when no peer is faulty, there |
| are two main types of peers: the leader peer and the follower peers. |
| The leader peer dictates the serialization of the operations throughout the |
| config. Its version of the sequence of data altering requests is the "truth", |
| and any data altering request is only considered final (i.e. can be |
| acknowledged to the client as successful) when a majority of the config |
| acknowledges that they "agree" with the leader's view of the event order. |
| In practice this means that all write requests are sent directly to the |
| leader, which then replicates them to enough followers to form a majority of |
| the config before sending an ACK to the client. Follower peers are completely |
| passive in steady state, only receiving data from the leader and |
| acknowledging back. Follower peers only become active when the leader process |
| stops and one of the followers (if there are any) must be elected leader. |
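| |
| A minimal sketch of the majority rule for a single write follows. AckTracker |
| is a hypothetical name, and counting the leader's own copy toward the |
| majority is an assumption of this example (it is how Raft [1] counts). |
| |
```cpp
#include <cassert>
#include <cstddef>
#include <set>
#include <string>

// Tracks which voting peers have acknowledged one replicated operation.
class AckTracker {
 public:
  // Assumption: the leader's own copy counts as the first acknowledgement.
  AckTracker(const std::set<std::string>& voters, const std::string& leader)
      : voters_(voters) {
    Ack(leader);
  }

  // Records an acknowledgement; peers that are not voters are ignored, and
  // repeated acknowledgements from the same peer count only once.
  void Ack(const std::string& peer) {
    if (voters_.count(peer) > 0) {
      acked_.insert(peer);
    }
  }

  // True once a majority of the voting peers has acknowledged, i.e. when the
  // leader may send an ACK to the client.
  bool IsCommitted() const {
    return acked_.size() >= voters_.size() / 2 + 1;
  }

 private:
  std::set<std::string> voters_;
  std::set<std::string> acked_;
};
```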
| |
| Participants in a config may be assigned the following roles: |
| |
| LEADER - The current leader of the config, receives requests from clients |
| and serializes them to other nodes. |
| |
| FOLLOWER - Active participants in the config, whose votes count towards |
| majority, replication count etc. |
| |
| LEARNER - Passive participants in the config, whose votes do not count |
| towards majority or replication count. New nodes joining the config |
| will have this role until they catch up and can be promoted to FOLLOWER. |
| |
| NON_PARTICIPANT - A peer that does not participate in a particular |
| config. Mostly used to mark prior participants that stopped being so |
| on a configuration change. |
| |
| The following diagram illustrates the possible state changes: |
| |
|                       +------------+ |
|                       |  NON_PART  +---+ |
|                       +-----+------+   | |
|   Exist. RaftConfig?        |          | |
|                       +-----v------+   | |
|                       |  LEARNER   +   | New RaftConfig? |
|                       +-----+------+   | |
|                             |          | |
|                       +-----v------+   | |
|                   +-->+  FOLLOW.   +<--+ |
|                   |   +-----+------+ |
|                   |         | |
|                   |   +-----v------+ |
|   Step Down       +<--+ CANDIDATE  | |
|                   ^   +-----+------+ |
|                   |         | |
|                   |   +-----v------+ |
|                   +<--+   LEADER   | |
|                       +------------+ |
| |
| Additionally, all states can transition to NON_PARTICIPANT on configuration |
| changes and/or peer timeout/death. |
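| |
| The state changes in the diagram can be written down as a transition check. |
| This is one reading of the diagram, not code from Kudu: Role and |
| CanTransition() are hypothetical names, and CANDIDATE is taken from the |
| diagram even though it is not in the role list above. |
| |
```cpp
#include <cassert>

enum class Role { kNonParticipant, kLearner, kFollower, kCandidate, kLeader };

// Returns true if the diagram allows a peer to move from 'from' to 'to'.
bool CanTransition(Role from, Role to) {
  if (from == to) return false;
  // Any state may become NON_PARTICIPANT (config change, timeout, death).
  if (to == Role::kNonParticipant) return true;
  switch (from) {
    case Role::kNonParticipant:
      // LEARNER when joining an existing config, FOLLOWER in a new config.
      return to == Role::kLearner || to == Role::kFollower;
    case Role::kLearner:
      return to == Role::kFollower;
    case Role::kFollower:
      return to == Role::kCandidate;
    case Role::kCandidate:
      // Wins the election, or steps down.
      return to == Role::kLeader || to == Role::kFollower;
    case Role::kLeader:
      // Steps down.
      return to == Role::kFollower;
  }
  return false;
}
```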
| |
| ============================================================ |
| Assembling/Rebooting a RaftConfig and RaftConfig States |
| ============================================================ |
| |
| Prior to starting/rebooting a peer, the state in the WAL must have been |
| replayed in a bootstrap phase. This process will yield an up-to-date Log and |
| Tablet. The new/rebooting peer is then Init()'ed with this Log. The Log is |
| queried for the last committed configuration entry. (A Raft configuration |
| consists of a set of peers (uuid and last known address) and hinted* roles.) |
| If there is none, it means this is a new config. |
| |
| After the peer has been Init()'ed, Start(Configuration) is called. The provided |
| configuration is a hint which is only taken into account if there was no previous |
| configuration*. |
| |
| Regardless of whether the configuration is a new one (new config) |
| or an old one (rebooting config), the config cannot start until a |
| leader has been elected and has replicated the configuration through |
| consensus. This ensures that a majority of nodes agree that this is |
| the most recent configuration. |
| |
| The provided configuration will always specify a leader. In the case |
| of a new config, it is chosen by the master, and in the case of a |
| rebooted one, it is the leader from the configuration that was active |
| before the node crashed. In either case, replicating this initial |
| configuration entry happens in the exact same way as any other config |
| entry, i.e. the LEADER will try to replicate it to FOLLOWERS. As usual, if |
| the LEADER fails, leader election is triggered and the new LEADER will |
| try to replicate a new configuration. |
| |
| Only after the config has successfully replicated the initial configuration |
| entry is the config ready to accept writes. |
| |
| |
| Peers in the config can therefore be in the following states: |
| |
| BOOTSTRAPPING - The phase prior to initialization where the Log is being |
| replayed. If a majority of peers are still BOOTSTRAPPING, the config doesn't |
| exist yet. |
| |
| CONFIGURING - Until the current configuration is pushed through consensus. |
| This is true for both new configs and rebooting configs. The peers do not |
| accept client requests in this state. In this state, the Leader tries to |
| replicate the configuration. Followers run failure detection and trigger |
| leader election if the hinted leader doesn't successfully replicate within |
| the configured timeout period. |
| |
| RUNNING - The LEADER peer accepts writes and replicates them through |
| consensus. FOLLOWER replicas accept writes from the leader and ACK. |
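| |
| The rules attached to these states can be condensed into two predicates. |
| PeerState, PeerRole and both function names are hypothetical and serve only |
| to restate the text above. |
| |
```cpp
#include <cassert>

enum class PeerState { kBootstrapping, kConfiguring, kRunning };
enum class PeerRole { kLeader, kFollower };

// Client writes are accepted only by the LEADER, and only once the config
// is RUNNING (i.e. the configuration has been pushed through consensus).
bool AcceptsClientWrites(PeerState state, PeerRole role) {
  return state == PeerState::kRunning && role == PeerRole::kLeader;
}

// The config does not exist yet while a majority of its peers are still
// BOOTSTRAPPING.
bool ConfigExists(int bootstrapping_peers, int config_size) {
  const int majority = config_size / 2 + 1;
  return bootstrapping_peers < majority;
}
```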
| |
| * The configuration provided on Start() can only be taken into account if there |
| is an appropriate leader election algorithm. This can be added later but is not |
| present in the initial implementation. Roles are hinted in the sense that the |
| config initiator (usually the master) might hint what the roles for the peers |
| in the config should be, but the config is the ultimate decider on whether that |
| is possible or not. |
| |
| ============================================================ |
| References |
| ============================================================ |
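| [1] In Search of an Understandable Consensus Algorithm. D. Ongaro and |
| J. Ousterhout |
| http://ramcloud.stanford.edu/raft.pdf |
| |
| [2] ARIES: A Transaction Recovery Method Supporting Fine-Granularity Locking |
| and Partial Rollbacks Using Write-Ahead Logging. C. Mohan et al. |
| http://www.cs.berkeley.edu/~brewer/cs262/Aries.pdf |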
| |
| [3] Viewstamped Replication: A New Primary Copy Method to Support |
| Highly-Available Distributed Systems. B. Oki, B. Liskov |
| http://www.pmg.csail.mit.edu/papers/vr.pdf |
| |
| [4] Viewstamped Replication Revisited. B. Liskov and J. Cowling |
| http://pmg.csail.mit.edu/papers/vr-revisited.pdf |
| |
| [5] Aether: A Scalable Approach to Logging |
| http://infoscience.epfl.ch/record/149436/files/vldb10aether.pdf |