| //! Represents the storage trait and example implementation. |
| //! |
| //! The storage trait is used to house and eventually serialize the state of the system. |
| //! Custom implementations of this are normal and this is likely to be a key integration |
| //! point for your distributed storage. |
| |
| // Copyright 2016 PingCAP, Inc. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| // Copyright 2015 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| use std::prelude::v1::*; |
| use std::sync::{Arc, SgxRwLock, SgxRwLockReadGuard, SgxRwLockWriteGuard}; |
| |
| use eraftpb::{ConfState, Entry, HardState, Snapshot}; |
| |
| use errors::{Error, Result, StorageError}; |
| use util; |
| |
| /// Holds both the hard state (commit index, vote leader, term) and the configuration state |
| /// (Current node IDs) |
| #[derive(Debug, Clone)] |
| pub struct RaftState { |
| /// Contains the last meta information including commit index, the vote leader, and the vote term. |
| pub hard_state: HardState, |
| /// Records the current node IDs like `[1, 2, 3]` in the cluster. Every Raft node must have a unique ID in the cluster; |
| pub conf_state: ConfState, |
| } |
| |
| /// Storage saves all the information about the current Raft implementation, including Raft Log, commit index, the leader to vote for, etc. |
| /// Pay attention to what is returned when there is no Log but it needs to get the `term` at index `first_index() - 1`. To solve this, you can use a dummy Log entry to keep the last truncated Log entry. See [`entries: vec![Entry::new()]`](src/storage.rs#L85) as a reference. |
| /// |
| /// If any Storage method returns an error, the raft instance will |
| /// become inoperable and refuse to paticipate in elections; the |
| /// application is responsible for cleanup and recovery in this case. |
| pub trait Storage { |
| /// `initial_state` is called when Raft is initialized. This interface will return a `RaftState` which contains `HardState` and `ConfState`; |
| fn initial_state(&self) -> Result<RaftState>; |
| /// Returns a slice of log entries in the range `[low, high)`. |
| /// max_size limits the total size of the log entries returned, but |
| /// entries returns at least one entry if any. |
| fn entries(&self, low: u64, high: u64, max_size: u64) -> Result<Vec<Entry>>; |
| /// Returns the term of entry idx, which must be in the range |
| /// [first_index()-1, last_index()]. The term of the entry before |
| /// first_index is retained for matching purpose even though the |
| /// rest of that entry may not be available. |
| fn term(&self, idx: u64) -> Result<u64>; |
| /// Returns the index of the first log entry that is |
| /// possible available via entries (older entries have been incorporated |
| /// into the latest snapshot; if storage only contains the dummy entry the |
| /// first log entry is not available). |
| fn first_index(&self) -> Result<u64>; |
| /// The index of the last entry in the log. |
| fn last_index(&self) -> Result<u64>; |
| /// Returns the most recent snapshot. |
| /// |
| /// If snapshot is temporarily unavailable, it should return SnapshotTemporarilyUnavailable, |
| /// so raft state machine could know that Storage needs some time to prepare |
| /// snapshot and call snapshot later. |
| fn snapshot(&self) -> Result<Snapshot>; |
| } |
| |
| /// The Memory Storage Core instance holds the actual state of the storage struct. To access this |
| /// value, use the `rl` and `wl` functions on the main MemStorage implementation. |
| pub struct MemStorageCore { |
| hard_state: HardState, |
| snapshot: Snapshot, |
| // TODO: maybe vec_deque |
| // entries[i] has raft log position i+snapshot.get_metadata().get_index() |
| entries: Vec<Entry>, |
| } |
| |
| impl Default for MemStorageCore { |
| fn default() -> MemStorageCore { |
| MemStorageCore { |
| // When starting from scratch populate the list with a dummy entry at term zero. |
| entries: vec![Entry::new()], |
| hard_state: HardState::new(), |
| snapshot: Snapshot::new(), |
| } |
| } |
| } |
| |
| impl MemStorageCore { |
| /// Saves the current HardState. |
| pub fn set_hardstate(&mut self, hs: HardState) { |
| self.hard_state = hs; |
| } |
| |
| fn inner_last_index(&self) -> u64 { |
| self.entries[0].get_index() + self.entries.len() as u64 - 1 |
| } |
| |
| /// Overwrites the contents of this Storage object with those of the given snapshot. |
| pub fn apply_snapshot(&mut self, snapshot: Snapshot) -> Result<()> { |
| // handle check for old snapshot being applied |
| let index = self.snapshot.get_metadata().get_index(); |
| let snapshot_index = snapshot.get_metadata().get_index(); |
| if index >= snapshot_index { |
| return Err(Error::Store(StorageError::SnapshotOutOfDate)); |
| } |
| |
| let mut e = Entry::new(); |
| e.set_term(snapshot.get_metadata().get_term()); |
| e.set_index(snapshot.get_metadata().get_index()); |
| self.entries = vec![e]; |
| self.snapshot = snapshot; |
| Ok(()) |
| } |
| |
| /// Makes a snapshot which can be retrieved with snapshot() and |
| /// can be used to reconstruct the state at that point. |
| /// If any configuration changes have been made since the last compaction, |
| /// the result of the last apply_conf_change must be passed in. |
| pub fn create_snapshot( |
| &mut self, |
| idx: u64, |
| cs: Option<ConfState>, |
| data: Vec<u8>, |
| ) -> Result<&Snapshot> { |
| if idx <= self.snapshot.get_metadata().get_index() { |
| return Err(Error::Store(StorageError::SnapshotOutOfDate)); |
| } |
| |
| let offset = self.entries[0].get_index(); |
| if idx > self.inner_last_index() { |
| panic!( |
| "snapshot {} is out of bound lastindex({})", |
| idx, |
| self.inner_last_index() |
| ) |
| } |
| self.snapshot.mut_metadata().set_index(idx); |
| self.snapshot |
| .mut_metadata() |
| .set_term(self.entries[(idx - offset) as usize].get_term()); |
| if let Some(cs) = cs { |
| self.snapshot.mut_metadata().set_conf_state(cs) |
| } |
| self.snapshot.set_data(data); |
| Ok(&self.snapshot) |
| } |
| |
| /// Discards all log entries prior to compact_index. |
| /// It is the application's responsibility to not attempt to compact an index |
| /// greater than RaftLog.applied. |
| pub fn compact(&mut self, compact_index: u64) -> Result<()> { |
| let offset = self.entries[0].get_index(); |
| if compact_index <= offset { |
| return Err(Error::Store(StorageError::Compacted)); |
| } |
| if compact_index > self.inner_last_index() { |
| panic!( |
| "compact {} is out of bound lastindex({})", |
| compact_index, |
| self.inner_last_index() |
| ) |
| } |
| |
| let i = (compact_index - offset) as usize; |
| let entries = self.entries.drain(i..).collect(); |
| self.entries = entries; |
| Ok(()) |
| } |
| |
| /// Append the new entries to storage. |
| /// TODO: ensure the entries are continuous and |
| /// entries[0].get_index() > self.entries[0].get_index() |
| pub fn append(&mut self, ents: &[Entry]) -> Result<()> { |
| if ents.is_empty() { |
| return Ok(()); |
| } |
| let first = self.entries[0].get_index() + 1; |
| let last = ents[0].get_index() + ents.len() as u64 - 1; |
| |
| if last < first { |
| return Ok(()); |
| } |
| // truncate compacted entries |
| let te: &[Entry] = if first > ents[0].get_index() { |
| let start_ent = (first - ents[0].get_index()) as usize; |
| &ents[start_ent..] |
| } else { |
| ents |
| }; |
| |
| let offset = te[0].get_index() - self.entries[0].get_index(); |
| if self.entries.len() as u64 > offset { |
| let mut new_entries: Vec<Entry> = vec![]; |
| new_entries.extend_from_slice(&self.entries[..offset as usize]); |
| new_entries.extend_from_slice(te); |
| self.entries = new_entries; |
| } else if self.entries.len() as u64 == offset { |
| self.entries.extend_from_slice(te); |
| } else { |
| panic!( |
| "missing log entry [last: {}, append at: {}]", |
| self.inner_last_index(), |
| te[0].get_index() |
| ) |
| } |
| |
| Ok(()) |
| } |
| } |
| |
| /// `MemStorage` is a thread-safe implementation of Storage trait. |
| /// It is mainly used for test purpose. |
| #[derive(Clone, Default)] |
| pub struct MemStorage { |
| core: Arc<SgxRwLock<MemStorageCore>>, |
| } |
| |
| impl MemStorage { |
| /// Returns a new memory storage value. |
| pub fn new() -> MemStorage { |
| MemStorage { |
| ..Default::default() |
| } |
| } |
| |
| /// Opens up a read lock on the storage and returns a guard handle. Use this |
| /// with functions that don't require mutation. |
| pub fn rl(&self) -> SgxRwLockReadGuard<MemStorageCore> { |
| self.core.read().unwrap() |
| } |
| |
| /// Opens up a write lock on the storage and returns guard handle. Use this |
| /// with functions that take a mutable reference to self. |
| pub fn wl(&self) -> SgxRwLockWriteGuard<MemStorageCore> { |
| self.core.write().unwrap() |
| } |
| } |
| |
| impl Storage for MemStorage { |
| /// Implements the Storage trait. |
| fn initial_state(&self) -> Result<RaftState> { |
| let core = self.rl(); |
| Ok(RaftState { |
| hard_state: core.hard_state.clone(), |
| conf_state: core.snapshot.get_metadata().get_conf_state().clone(), |
| }) |
| } |
| |
| /// Implements the Storage trait. |
| fn entries(&self, low: u64, high: u64, max_size: u64) -> Result<Vec<Entry>> { |
| let core = self.rl(); |
| let offset = core.entries[0].get_index(); |
| if low <= offset { |
| return Err(Error::Store(StorageError::Compacted)); |
| } |
| |
| if high > core.inner_last_index() + 1 { |
| panic!("index out of bound") |
| } |
| // only contains dummy entries. |
| if core.entries.len() == 1 { |
| return Err(Error::Store(StorageError::Unavailable)); |
| } |
| |
| let lo = (low - offset) as usize; |
| let hi = (high - offset) as usize; |
| let mut ents = core.entries[lo..hi].to_vec(); |
| util::limit_size(&mut ents, max_size); |
| Ok(ents) |
| } |
| |
| /// Implements the Storage trait. |
| fn term(&self, idx: u64) -> Result<u64> { |
| let core = self.rl(); |
| let offset = core.entries[0].get_index(); |
| if idx < offset { |
| return Err(Error::Store(StorageError::Compacted)); |
| } |
| if idx - offset >= core.entries.len() as u64 { |
| return Err(Error::Store(StorageError::Unavailable)); |
| } |
| Ok(core.entries[(idx - offset) as usize].get_term()) |
| } |
| |
| /// Implements the Storage trait. |
| fn first_index(&self) -> Result<u64> { |
| let core = self.rl(); |
| Ok(core.entries[0].get_index() + 1) |
| } |
| |
| /// Implements the Storage trait. |
| fn last_index(&self) -> Result<u64> { |
| let core = self.rl(); |
| Ok(core.inner_last_index()) |
| } |
| |
| /// Implements the Storage trait. |
| fn snapshot(&self) -> Result<Snapshot> { |
| let core = self.rl(); |
| Ok(core.snapshot.clone()) |
| } |
| } |
| |
| #[cfg(test)] |
| mod test { |
| use eraftpb::{ConfState, Entry, Snapshot}; |
| use protobuf; |
| |
| use errors::{Error as RaftError, StorageError}; |
| use setup_for_test; |
| use storage::{MemStorage, Storage}; |
| |
| // TODO extract these duplicated utility functions for tests |
| |
| fn new_entry(index: u64, term: u64) -> Entry { |
| let mut e = Entry::new(); |
| e.set_term(term); |
| e.set_index(index); |
| e |
| } |
| |
| fn size_of<T: protobuf::Message>(m: &T) -> u32 { |
| m.compute_size() |
| } |
| |
| fn new_snapshot(index: u64, term: u64, nodes: Vec<u64>, data: Vec<u8>) -> Snapshot { |
| let mut s = Snapshot::new(); |
| s.mut_metadata().set_index(index); |
| s.mut_metadata().set_term(term); |
| s.mut_metadata().mut_conf_state().set_nodes(nodes); |
| s.set_data(data); |
| s |
| } |
| |
| #[test] |
| fn test_storage_term() { |
| setup_for_test(); |
| let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)]; |
| let mut tests = vec![ |
| (2, Err(RaftError::Store(StorageError::Compacted))), |
| (3, Ok(3)), |
| (4, Ok(4)), |
| (5, Ok(5)), |
| (6, Err(RaftError::Store(StorageError::Unavailable))), |
| ]; |
| |
| for (i, (idx, wterm)) in tests.drain(..).enumerate() { |
| let storage = MemStorage::new(); |
| storage.wl().entries = ents.clone(); |
| |
| let t = storage.term(idx); |
| if t != wterm { |
| panic!("#{}: expect res {:?}, got {:?}", i, wterm, t); |
| } |
| } |
| } |
| |
| #[test] |
| fn test_storage_entries() { |
| setup_for_test(); |
| let ents = vec![ |
| new_entry(3, 3), |
| new_entry(4, 4), |
| new_entry(5, 5), |
| new_entry(6, 6), |
| ]; |
| let max_u64 = u64::max_value(); |
| let mut tests = vec![ |
| ( |
| 2, |
| 6, |
| max_u64, |
| Err(RaftError::Store(StorageError::Compacted)), |
| ), |
| ( |
| 3, |
| 4, |
| max_u64, |
| Err(RaftError::Store(StorageError::Compacted)), |
| ), |
| (4, 5, max_u64, Ok(vec![new_entry(4, 4)])), |
| (4, 6, max_u64, Ok(vec![new_entry(4, 4), new_entry(5, 5)])), |
| ( |
| 4, |
| 7, |
| max_u64, |
| Ok(vec![new_entry(4, 4), new_entry(5, 5), new_entry(6, 6)]), |
| ), |
| // even if maxsize is zero, the first entry should be returned |
| (4, 7, 0, Ok(vec![new_entry(4, 4)])), |
| // limit to 2 |
| ( |
| 4, |
| 7, |
| u64::from(size_of(&ents[1]) + size_of(&ents[2])), |
| Ok(vec![new_entry(4, 4), new_entry(5, 5)]), |
| ), |
| ( |
| 4, |
| 7, |
| u64::from(size_of(&ents[1]) + size_of(&ents[2]) + size_of(&ents[3]) / 2), |
| Ok(vec![new_entry(4, 4), new_entry(5, 5)]), |
| ), |
| ( |
| 4, |
| 7, |
| u64::from(size_of(&ents[1]) + size_of(&ents[2]) + size_of(&ents[3]) - 1), |
| Ok(vec![new_entry(4, 4), new_entry(5, 5)]), |
| ), |
| // all |
| ( |
| 4, |
| 7, |
| u64::from(size_of(&ents[1]) + size_of(&ents[2]) + size_of(&ents[3])), |
| Ok(vec![new_entry(4, 4), new_entry(5, 5), new_entry(6, 6)]), |
| ), |
| ]; |
| for (i, (lo, hi, maxsize, wentries)) in tests.drain(..).enumerate() { |
| let storage = MemStorage::new(); |
| storage.wl().entries = ents.clone(); |
| let e = storage.entries(lo, hi, maxsize); |
| if e != wentries { |
| panic!("#{}: expect entries {:?}, got {:?}", i, wentries, e); |
| } |
| } |
| } |
| |
| #[test] |
| fn test_storage_last_index() { |
| setup_for_test(); |
| let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)]; |
| let storage = MemStorage::new(); |
| storage.wl().entries = ents; |
| |
| let wresult = Ok(5); |
| let result = storage.last_index(); |
| if result != wresult { |
| panic!("want {:?}, got {:?}", wresult, result); |
| } |
| |
| storage |
| .wl() |
| .append(&[new_entry(6, 5)]) |
| .expect("append failed"); |
| let wresult = Ok(6); |
| let result = storage.last_index(); |
| if result != wresult { |
| panic!("want {:?}, got {:?}", wresult, result); |
| } |
| } |
| |
| #[test] |
| fn test_storage_first_index() { |
| setup_for_test(); |
| let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)]; |
| let storage = MemStorage::new(); |
| storage.wl().entries = ents; |
| |
| let wresult = Ok(4); |
| let result = storage.first_index(); |
| if result != wresult { |
| panic!("want {:?}, got {:?}", wresult, result); |
| } |
| |
| storage.wl().compact(4).expect("compact failed"); |
| let wresult = Ok(5); |
| let result = storage.first_index(); |
| if result != wresult { |
| panic!("want {:?}, got {:?}", wresult, result); |
| } |
| } |
| |
| #[test] |
| fn test_storage_compact() { |
| setup_for_test(); |
| let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)]; |
| let mut tests = vec![ |
| (2, Err(RaftError::Store(StorageError::Compacted)), 3, 3, 3), |
| (3, Err(RaftError::Store(StorageError::Compacted)), 3, 3, 3), |
| (4, Ok(()), 4, 4, 2), |
| (5, Ok(()), 5, 5, 1), |
| ]; |
| for (i, (idx, wresult, windex, wterm, wlen)) in tests.drain(..).enumerate() { |
| let storage = MemStorage::new(); |
| storage.wl().entries = ents.clone(); |
| |
| let result = storage.wl().compact(idx); |
| if result != wresult { |
| panic!("#{}: want {:?}, got {:?}", i, wresult, result); |
| } |
| let index = storage.wl().entries[0].get_index(); |
| if index != windex { |
| panic!("#{}: want {}, index {}", i, windex, index); |
| } |
| let term = storage.wl().entries[0].get_term(); |
| if term != wterm { |
| panic!("#{}: want {}, term {}", i, wterm, term); |
| } |
| let len = storage.wl().entries.len(); |
| if len != wlen { |
| panic!("#{}: want {}, term {}", i, wlen, len); |
| } |
| } |
| } |
| |
| #[test] |
| fn test_storage_create_snapshot() { |
| setup_for_test(); |
| let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)]; |
| let nodes = vec![1, 2, 3]; |
| let mut cs = ConfState::new(); |
| cs.set_nodes(nodes.clone()); |
| let data = b"data".to_vec(); |
| |
| let mut tests = vec![ |
| (4, Ok(new_snapshot(4, 4, nodes.clone(), data.clone()))), |
| (5, Ok(new_snapshot(5, 5, nodes.clone(), data.clone()))), |
| ]; |
| for (i, (idx, wresult)) in tests.drain(..).enumerate() { |
| let storage = MemStorage::new(); |
| storage.wl().entries = ents.clone(); |
| |
| storage |
| .wl() |
| .create_snapshot(idx, Some(cs.clone()), data.clone()) |
| .expect("create snapshot failed"); |
| let result = storage.snapshot(); |
| if result != wresult { |
| panic!("#{}: want {:?}, got {:?}", i, wresult, result); |
| } |
| } |
| } |
| |
| #[test] |
| fn test_storage_append() { |
| setup_for_test(); |
| let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)]; |
| let mut tests = vec![ |
| ( |
| vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)], |
| Ok(()), |
| vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)], |
| ), |
| ( |
| vec![new_entry(3, 3), new_entry(4, 6), new_entry(5, 6)], |
| Ok(()), |
| vec![new_entry(3, 3), new_entry(4, 6), new_entry(5, 6)], |
| ), |
| ( |
| vec![ |
| new_entry(3, 3), |
| new_entry(4, 4), |
| new_entry(5, 5), |
| new_entry(6, 5), |
| ], |
| Ok(()), |
| vec![ |
| new_entry(3, 3), |
| new_entry(4, 4), |
| new_entry(5, 5), |
| new_entry(6, 5), |
| ], |
| ), |
| // truncate incoming entries, truncate the existing entries and append |
| ( |
| vec![new_entry(2, 3), new_entry(3, 3), new_entry(4, 5)], |
| Ok(()), |
| vec![new_entry(3, 3), new_entry(4, 5)], |
| ), |
| // truncate the existing entries and append |
| ( |
| vec![new_entry(4, 5)], |
| Ok(()), |
| vec![new_entry(3, 3), new_entry(4, 5)], |
| ), |
| // direct append |
| ( |
| vec![new_entry(6, 6)], |
| Ok(()), |
| vec![ |
| new_entry(3, 3), |
| new_entry(4, 4), |
| new_entry(5, 5), |
| new_entry(6, 6), |
| ], |
| ), |
| ]; |
| for (i, (entries, wresult, wentries)) in tests.drain(..).enumerate() { |
| let storage = MemStorage::new(); |
| storage.wl().entries = ents.clone(); |
| |
| let result = storage.wl().append(&entries); |
| if result != wresult { |
| panic!("#{}: want {:?}, got {:?}", i, wresult, result); |
| } |
| let e = &storage.wl().entries; |
| if *e != wentries { |
| panic!("#{}: want {:?}, entries {:?}", i, wentries, e); |
| } |
| } |
| } |
| |
| #[test] |
| fn test_storage_apply_snapshot() { |
| setup_for_test(); |
| let nodes = vec![1, 2, 3]; |
| let data = b"data".to_vec(); |
| |
| let snapshots = vec![ |
| new_snapshot(4, 4, nodes.clone(), data.clone()), |
| new_snapshot(3, 3, nodes.clone(), data.clone()), |
| ]; |
| |
| let storage = MemStorage::new(); |
| |
| // Apply snapshot successfully |
| let i = 0; |
| let wresult = Ok(()); |
| let r = storage.wl().apply_snapshot(snapshots[i].clone()); |
| if r != wresult { |
| panic!("#{}: want {:?}, got {:?}", i, wresult, r); |
| } |
| |
| // Apply snapshot fails due to StorageError::SnapshotOutOfDate |
| let i = 1; |
| let wresult = Err(RaftError::Store(StorageError::SnapshotOutOfDate)); |
| let r = storage.wl().apply_snapshot(snapshots[i].clone()); |
| if r != wresult { |
| panic!("#{}: want {:?}, got {:?}", i, wresult, r); |
| } |
| } |
| } |