Integrate raft algorithm (WIP)

couch_raft.erl is a complete implementation of the raft algorithm but
currently only manages an in-memory state machine and log.

Preliminary work is also here to add a new btree inside the `.couch`
files, which will be the real raft log. The intent is that log entries
can be removed from this log and applied to the by_id and by_seq trees
atomically.

The raft log is preserved over compaction in the same manner as local
docs: all entries are slurped into memory and written in one
pass. This should be fine, as the log should stay short: committed
entries can be promptly removed. It's probably not fine for local
docs, though...

Anyway, it's progress and hopefully we're going somewhere cool.
diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 486ed7c..8feb360 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -66,6 +66,12 @@
     purge_docs/3,
     copy_purge_infos/2,
 
+    % raft log storage; entries are {Index, Term, Value} tuples
+    raft_lookup/2,
+    raft_insert/2,
+    raft_discard/2,
+    raft_last/1,
+
     commit_data/1,
 
     open_write_stream/2,
@@ -102,7 +107,12 @@
     purge_tree_join/2,
     purge_tree_reduce/2,
     purge_seq_tree_split/1,
-    purge_seq_tree_join/2
+    purge_seq_tree_join/2,
+
+    % couch_btree split/join/reduce callbacks for the raft log tree
+    raft_tree_split/1,
+    raft_tree_join/2,
+    raft_tree_reduce/2
 ]).
 
 % Used by the compactor
@@ -631,6 +640,53 @@
     {ok, Changes} = couch_btree:fold_reduce(BTree, FoldFun, 0, Opts),
     Changes.
 
+% Insert raft log entries, given as {Index, Term, Value} tuples (the tree
+% is keyed by Index, see raft_tree_split/1).
+raft_insert(#st{} = St, Entries) when is_list(Entries) ->
+    #st{
+        raft_tree = RaftTree0
+    } = St,
+    {ok, RaftTree1} = couch_btree:add_remove(RaftTree0, Entries, []),
+    {ok, St#st{
+        raft_tree = RaftTree1,
+        needs_commit = true
+    }}.
+
+% Look up raft log entries by index. Returns one element per requested
+% index: the entry found by couch_btree:lookup/2, or not_found.
+raft_lookup(#st{} = St, Indexes) ->
+    Results = couch_btree:lookup(St#st.raft_tree, Indexes),
+    lists:map(
+        fun
+            ({ok, Entry}) -> Entry;
+            (not_found) -> not_found
+        end,
+        Results
+    ).
+
+% Remove all raft log entries from the first stored index up to and
+% including UpTo. UpTo below the first stored index removes nothing.
+raft_discard(#st{} = St, UpTo) ->
+    #st{
+        raft_tree = RaftTree0
+    } = St,
+    {ok, {First, _Last}} = couch_btree:full_reduce(RaftTree0),
+    {FirstIndex, _FirstTerm} = First,
+    % lists:seq(F, T) raises for T < F - 1 (e.g. a repeated discard of an
+    % already-removed prefix), which would crash the db updater; clamp to [].
+    Remove = lists:seq(FirstIndex, max(UpTo, FirstIndex - 1)),
+    {ok, RaftTree1} = couch_btree:add_remove(RaftTree0, [], Remove),
+    {ok, St#st{
+        raft_tree = RaftTree1,
+        needs_commit = true
+    }}.
+
+% Return {Index, Term} of the last raft log entry; {0, 0} for an empty
+% reduction (see raft_tree_reduce/2).
+raft_last(#st{} = St) ->
+    {ok, {_First, Last}} = couch_btree:full_reduce(St#st.raft_tree),
+    Last.
+
 start_compaction(St, DbName, Options, Parent) ->
     Args = [St, DbName, Options, Parent],
     Pid = spawn_link(couch_bt_engine_compactor, start, Args),
@@ -799,6 +846,25 @@
 purge_tree_reduce(rereduce, Reds) ->
     lists:sum(Reds).
 
+% The raft log tree is keyed by Index; the stored value is {Term, Value}.
+raft_tree_split({Index, Term, Value}) ->
+    {Index, {Term, Value}}.
+
+raft_tree_join(Key, {EntryTerm, EntryValue}) ->
+    {Key, EntryTerm, EntryValue}.
+
+% Reductions have the shape {{FirstIndex, FirstTerm}, {LastIndex, LastTerm}};
+% an empty entry list reduces to {{0, 0}, {0, 0}}.
+raft_tree_reduce(reduce, []) ->
+    {{0, 0}, {0, 0}};
+raft_tree_reduce(reduce, Entries) ->
+    {LowIndex, LowTerm, _} = lists:min(Entries),
+    {HighIndex, HighTerm, _} = lists:max(Entries),
+    {{LowIndex, LowTerm}, {HighIndex, HighTerm}};
+raft_tree_reduce(rereduce, Reductions) ->
+    {Firsts, Lasts} = lists:unzip(Reductions),
+    {lists:min(Firsts), lists:max(Lasts)}.
+
 set_update_seq(#st{header = Header} = St, UpdateSeq) ->
     {ok, St#st{
         header = couch_bt_engine_header:set(Header, [
@@ -894,6 +958,14 @@
         {reduce, fun ?MODULE:purge_tree_reduce/2}
     ]),
 
+    % Raft log tree; its btree state is persisted in the header as raft_tree_state.
+    RaftTreeState = couch_bt_engine_header:raft_tree_state(Header),
+    {ok, RaftTree} = couch_btree:open(RaftTreeState, Fd, [
+        {split, fun ?MODULE:raft_tree_split/1},
+        {join, fun ?MODULE:raft_tree_join/2},
+        {reduce, fun ?MODULE:raft_tree_reduce/2}
+    ]),
+
     ok = couch_file:set_db_pid(Fd, self()),
 
     St = #st{
@@ -907,7 +978,8 @@
         local_tree = LocalTree,
         compression = Compression,
         purge_tree = PurgeTree,
-        purge_seq_tree = PurgeSeqTree
+        purge_seq_tree = PurgeSeqTree,
+        raft_tree = RaftTree
     },
 
     % If this is a new database we've just created a
@@ -927,7 +999,8 @@
         {id_tree_state, couch_btree:get_state(St#st.id_tree)},
         {local_tree_state, couch_btree:get_state(St#st.local_tree)},
         {purge_tree_state, couch_btree:get_state(St#st.purge_tree)},
-        {purge_seq_tree_state, couch_btree:get_state(St#st.purge_seq_tree)}
+        {purge_seq_tree_state, couch_btree:get_state(St#st.purge_seq_tree)},
+        {raft_tree_state, couch_btree:get_state(St#st.raft_tree)}
     ]).
 
 increment_update_seq(#st{header = Header} = St) ->
@@ -1097,7 +1170,8 @@
         St#st.seq_tree,
         St#st.local_tree,
         St#st.purge_tree,
-        St#st.purge_seq_tree
+        St#st.purge_seq_tree,
+        St#st.raft_tree
     ],
     lists:foldl(
         fun(T, Acc) ->
@@ -1171,12 +1245,14 @@
 finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) ->
     #st{
         filepath = FilePath,
-        local_tree = OldLocal
+        local_tree = OldLocal,
+        raft_tree = OldRaft
     } = OldSt,
     #st{
         filepath = CompactDataPath,
         header = Header,
-        local_tree = NewLocal1
+        local_tree = NewLocal1,
+        raft_tree = NewRaft1
     } = NewSt1,
 
     % suck up all the local docs into memory and write them to the new db
@@ -1186,13 +1262,19 @@
     {ok, _, LocalDocs} = couch_btree:foldl(OldLocal, LoadFun, []),
     {ok, NewLocal2} = couch_btree:add(NewLocal1, LocalDocs),
 
+    % do the same for the raft log. It is loaded entirely into memory, which
+    % assumes the log stays short (committed entries are expected to be discarded).
+    {ok, _, RaftLog} = couch_btree:foldl(OldRaft, LoadFun, []),
+    {ok, NewRaft2} = couch_btree:add(NewRaft1, RaftLog),
+
     {ok, NewSt2} = commit_data(NewSt1#st{
         header = couch_bt_engine_header:set(Header, [
             {compacted_seq, get_update_seq(OldSt)},
             {revs_limit, get_revs_limit(OldSt)},
             {purge_infos_limit, get_purge_infos_limit(OldSt)}
         ]),
-        local_tree = NewLocal2
+        local_tree = NewLocal2,
+        raft_tree = NewRaft2
     }),
 
     % Rename our *.compact.data file to *.compact so that if we
diff --git a/src/couch/src/couch_bt_engine.hrl b/src/couch/src/couch_bt_engine.hrl
index e3c1d49..0d347e9 100644
--- a/src/couch/src/couch_bt_engine.hrl
+++ b/src/couch/src/couch_bt_engine.hrl
@@ -23,5 +23,7 @@
     local_tree,
     compression,
     purge_tree,
-    purge_seq_tree
+    purge_seq_tree,
+    % couch_btree holding the raft log, keyed by raft index
+    raft_tree
 }).
diff --git a/src/couch/src/couch_bt_engine_header.erl b/src/couch/src/couch_bt_engine_header.erl
index e28f077..9e663b0 100644
--- a/src/couch/src/couch_bt_engine_header.erl
+++ b/src/couch/src/couch_bt_engine_header.erl
@@ -34,6 +34,7 @@
     purge_tree_state/1,
     purge_seq_tree_state/1,
     purge_infos_limit/1,
+    raft_tree_state/1,
     security_ptr/1,
     revs_limit/1,
     uuid/1,
@@ -69,7 +70,9 @@
     epochs,
     compacted_seq,
     purge_infos_limit = 1000,
-    props_ptr
+    props_ptr,
+    % couch_btree state of the raft log tree (defaults to nil)
+    raft_tree_state = nil
 }).
 
 -define(PARTITION_DISK_VERSION, 8).
@@ -177,6 +179,10 @@
 purge_infos_limit(Header) ->
     get_field(Header, purge_infos_limit).
 
+% Stored couch_btree state of the raft log tree.
+raft_tree_state(Header) ->
+    get_field(Header, raft_tree_state).
+
 get_field(Header, Field) ->
     get_field(Header, Field, undefined).
 
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 70ba1c2..274526a 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -114,6 +114,12 @@
     fold_purge_infos/4,
     fold_purge_infos/5,
 
+    % raft log access; inserts and discards go through the db updater
+    raft_insert/2,
+    raft_lookup/2,
+    raft_discard/2,
+    raft_last/1,
+
     calculate_start_seq/3,
     owner_of/2,
 
@@ -1813,6 +1818,24 @@
 fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts) ->
     couch_db_engine:fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts).
 
+% Writes call check_is_admin/1 and are serialized through the db updater
+% (main_pid); reads go straight to the engine through this Db handle.
+raft_insert(#db{} = Db, Entries) ->
+    check_is_admin(Db),
+    #db{main_pid = UpdaterPid} = Db,
+    gen_server:call(UpdaterPid, {raft_insert, Entries}, infinity).
+
+raft_lookup(Db, Indexes) ->
+    couch_db_engine:raft_lookup(Db, Indexes).
+
+raft_discard(#db{} = Db, UpTo) ->
+    check_is_admin(Db),
+    #db{main_pid = UpdaterPid} = Db,
+    gen_server:call(UpdaterPid, {raft_discard, UpTo}, infinity).
+
+raft_last(Db) ->
+    couch_db_engine:raft_last(Db).
+
 count_changes_since(Db, SinceSeq) ->
     couch_db_engine:count_changes_since(Db, SinceSeq).
 
diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl
index 9e46b81..2969df9 100644
--- a/src/couch/src/couch_db_engine.erl
+++ b/src/couch/src/couch_db_engine.erl
@@ -704,6 +704,12 @@
     read_doc_body/2,
     load_purge_infos/2,
 
+    % raft log storage, delegated to the engine module
+    raft_lookup/2,
+    raft_insert/2,
+    raft_discard/2,
+    raft_last/1,
+
     serialize_doc/2,
     write_doc_body/2,
     write_doc_infos/3,
@@ -927,6 +932,26 @@
     ),
     {ok, Db#db{engine = {Engine, NewSt}}}.
 
+% Raft log operations are delegated to the engine module; mutating calls
+% return the Db with the updated engine state.
+raft_insert(#db{} = Db, Entries) ->
+    #db{engine = {Engine, St0}} = Db,
+    {ok, St1} = Engine:raft_insert(St0, Entries),
+    {ok, Db#db{engine = {Engine, St1}}}.
+
+raft_lookup(#db{} = Db, Indexes) ->
+    #db{engine = {Engine, St}} = Db,
+    Engine:raft_lookup(St, Indexes).
+
+raft_discard(#db{} = Db, UpTo) ->
+    #db{engine = {Engine, St0}} = Db,
+    {ok, St1} = Engine:raft_discard(St0, UpTo),
+    {ok, Db#db{engine = {Engine, St1}}}.
+
+raft_last(#db{} = Db) ->
+    #db{engine = {Engine, St}} = Db,
+    Engine:raft_last(St).
+
 commit_data(#db{} = Db) ->
     #db{engine = {Engine, EngineState}} = Db,
     {ok, NewSt} = Engine:commit_data(EngineState),
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 17a1e91..2a449e8 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -118,6 +118,17 @@
         end,
     {ok, NewDb, Replies} = purge_docs(Db, PurgeReqs),
     {reply, {ok, Replies}, NewDb, idle_limit()};
+% Raft log writes are committed immediately, then couch_server is notified.
+handle_call({raft_insert, Entries}, _From, Db0) ->
+    {ok, Db1} = couch_db_engine:raft_insert(Db0, Entries),
+    Db2 = commit_data(Db1),
+    ok = couch_server:db_updated(Db2),
+    {reply, ok, Db2, idle_limit()};
+handle_call({raft_discard, UpTo}, _From, Db0) ->
+    {ok, Db1} = couch_db_engine:raft_discard(Db0, UpTo),
+    Db2 = commit_data(Db1),
+    ok = couch_server:db_updated(Db2),
+    {reply, ok, Db2, idle_limit()};
 handle_call(Msg, From, Db) ->
     case couch_db_engine:handle_db_updater_call(Msg, From, Db) of
         {reply, Resp, NewDb} ->
diff --git a/src/couch/src/couch_raft.erl b/src/couch/src/couch_raft.erl
new file mode 100644
index 0000000..f398b4f
--- /dev/null
+++ b/src/couch/src/couch_raft.erl
@@ -0,0 +1,412 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+% An in-memory implementation of the Raft consensus algorithm as a gen_statem.
+% Every node in Cohort runs one of these processes, registered locally under
+% the same Name, and members reach each other by casting to {Name, Node}. The
+% replicated state machine is a SHA-256 hash chain over the committed values;
+% clients receive the hash produced by applying their value.
+
+-module(couch_raft).
+-behaviour(gen_statem).
+
+-define(ELECTION_DELAY, 150).
+-define(ELECTION_SPLAY, 150).
+-define(LEADER_HEARTBEAT, 75).
+-define(CLIENT_TIMEOUT, 5_000).
+
+% maximum number of entries to send in one go.
+-define(BATCH_SIZE, 10).
+
+% public api
+
+-export([
+    start/2,
+    start_link/2,
+    stop/1,
+    call/2
+]).
+
+% mandatory gen_statem callbacks
+
+-export([
+    init/1,
+    callback_mode/0,
+    handle_event/4
+]).
+
+%% public api
+
+start(Name, Cohort) ->
+    gen_statem:start({local, Name}, ?MODULE, new(Name, Cohort), []).
+
+start_link(Name, Cohort) ->
+    gen_statem:start_link({local, Name}, ?MODULE, new(Name, Cohort), []).
+
+new(Name, Cohort) ->
+    Peers = peers(Cohort),
+    #{
+        name => Name,
+        cohort => Cohort,
+        term => 0,
+        votedFor => undefined,
+        % nodes that granted this node a vote in its current election
+        votesGranted => [],
+        nextIndex => maps:from_list([{Peer, 1} || Peer <- Peers]),
+        matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers]),
+        log => couch_raft_log:new(),
+        commitIndex => 0,
+        froms => #{},
+        lastApplied => 0,
+        machine => <<0>>
+    }.
+
+stop(ServerRef) ->
+    gen_statem:stop(ServerRef).
+
+% Submit Value through the leader at ServerRef. Replies with the state machine
+% hash once the entry is committed and applied, {error, not_leader} if
+% ServerRef is not the leader, or {error, deposed} if it loses leadership first.
+call(ServerRef, Value) ->
+    gen_statem:call(ServerRef, #{type => 'ClientRequest', value => Value}, ?CLIENT_TIMEOUT).
+
+init(Data) ->
+    {ok, follower, Data}.
+
+callback_mode() ->
+    [handle_event_function, state_enter].
+
+%% erlfmt-ignore
+handle_event(cast, #{term := FutureTerm} = Msg, _State, #{term := CurrentTerm} = Data) when FutureTerm > CurrentTerm ->
+    couch_log:notice("~p received message from future term ~B, moving to that term, becoming follower and clearing votedFor", [node(), FutureTerm]),
+    % votes collected so far belong to the old term, so drop them as well.
+    {next_state, follower, Data#{term => FutureTerm, votedFor => undefined, votesGranted => []}, {next_event, cast, Msg}};
+
+handle_event(enter, _OldState, follower, Data) ->
+    #{term := Term, froms := Froms} = Data,
+    couch_log:notice("~p became follower in term ~B", [node(), Term]),
+    Replies = [{reply, From, {error, deposed}} || From <- maps:values(Froms)],
+    % votedFor is deliberately kept: a candidate stepping down within the same
+    % term must remember its vote, or it could vote twice in that term. It is
+    % cleared only when the term advances.
+    {keep_state, Data#{froms => #{}}, [restart_election_timeout() | Replies]};
+
+handle_event(enter, _OldState, candidate, Data) ->
+    #{term := Term} = Data,
+    couch_log:notice("~p became candidate in term ~B", [node(), Term]),
+    {keep_state, start_election(Data), restart_election_timeout()};
+
+handle_event(enter, _OldState, leader, Data) ->
+    #{log := Log, cohort := Cohort, term := Term} = Data,
+    couch_log:notice("~p became leader in term ~B", [node(), Term]),
+    Peers = peers(Cohort),
+    NextIndex = couch_raft_log:index(couch_raft_log:last(Log)) + 1,
+    {keep_state, Data#{
+        nextIndex => maps:from_list([{Peer, NextIndex} || Peer <- Peers]),
+        matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers])
+    }, restart_heartbeat_timeout()};
+
+handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, State, #{term := CurrentTerm} = Data)
+  when Term =< CurrentTerm ->
+    #{
+        source := MSource,
+        lastLogIndex := MLastLogIndex,
+        lastLogTerm := MLastLogTerm
+    } = Msg,
+    #{
+        log := Log,
+        votedFor := VotedFor
+    } = Data,
+    LastEntry = couch_raft_log:last(Log),
+    LastTerm = couch_raft_log:term(LastEntry),
+    LogOk = MLastLogTerm > LastTerm orelse (MLastLogTerm == LastTerm andalso MLastLogIndex >= couch_raft_log:index(LastEntry)),
+    Grant = Term == CurrentTerm andalso LogOk andalso (VotedFor == undefined orelse VotedFor == MSource),
+    couch_log:notice("~p received RequestVoteRequest from ~p in term ~B when in term ~B (Grant:~p, LogOk:~p, VotedFor:~p)", [node(), MSource, Term, CurrentTerm, Grant, LogOk, VotedFor]),
+    Reply = #{
+        type => 'RequestVoteResponse',
+        term => CurrentTerm,
+        voteGranted => Grant,
+        source => node()
+    },
+    cast(MSource, Reply, Data),
+    if
+        Grant ->
+            {keep_state, Data#{votedFor => MSource}, restart_election_timeout()};
+        State == leader ->
+            % an election timer would replace the leader's heartbeat timer
+            keep_state_and_data;
+        true ->
+            {keep_state_and_data, restart_election_timeout()}
+    end;
+
+handle_event(cast, #{type := 'RequestVoteResponse', term := PastTerm}, _State, #{term := CurrentTerm}) when PastTerm < CurrentTerm ->
+    couch_log:notice("~p ignored RequestVoteResponse from past term ~B", [node(), PastTerm]),
+    keep_state_and_data;
+
+handle_event(cast, #{type := 'RequestVoteResponse', term := Term} = Msg, candidate, #{term := Term} = Data) ->
+    #{source := MSource, voteGranted := MVoteGranted} = Msg,
+    #{cohort := Cohort, votesGranted := VotesGranted0} = Data,
+    VotesGranted1 = if MVoteGranted -> lists:usort([MSource | VotesGranted0]); true -> VotesGranted0 end,
+    couch_log:notice("~p received RequestVoteResponse from ~p in current term ~B (VotesGranted:~p)", [node(), MSource, Term, VotesGranted1]),
+    if
+        length(VotesGranted1) >= length(Cohort) div 2 + 1 ->
+            couch_log:notice("~p has enough votes to be leader in term ~B", [node(), Term]),
+            {next_state, leader, Data#{votesGranted => VotesGranted1}};
+        true ->
+            {keep_state, Data#{votesGranted => VotesGranted1}}
+    end;
+
+handle_event(cast, #{type := 'RequestVoteResponse', term := Term, source := MSource}, State, #{term := Term}) ->
+    % Only a candidate counts votes; a follower or leader in the same term must
+    % never be promoted by a late response.
+    couch_log:notice("~p ignored RequestVoteResponse from ~p in state ~p, term ~B", [node(), MSource, State, Term]),
+    keep_state_and_data;
+
+handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State, #{term := CurrentTerm} = Data)
+  when Term =< CurrentTerm ->
+    #{
+        source := MSource,
+        prevLogIndex := MPrevLogIndex,
+        prevLogTerm := MPrevLogTerm
+    } = Msg,
+    #{
+        log := Log
+    } = Data,
+    LogOk = MPrevLogIndex == 0 orelse (MPrevLogIndex > 0 andalso MPrevLogIndex =< couch_raft_log:index(couch_raft_log:last(Log)) andalso MPrevLogTerm == couch_raft_log:term(couch_raft_log:nth(MPrevLogIndex,Log))),
+    if
+        Term < CurrentTerm orelse (Term == CurrentTerm andalso State == follower andalso not LogOk) ->
+            Reply = #{
+                type => 'AppendEntriesResponse',
+                term => CurrentTerm,
+                success => false,
+                matchIndex => 0,
+                source => node()
+            },
+            cast(MSource, Reply, Data),
+            if
+                State == leader ->
+                    keep_state_and_data;
+                true ->
+                    {keep_state_and_data, restart_election_timeout()}
+            end;
+        Term == CurrentTerm andalso State == candidate ->
+            {next_state, follower, Data, {next_event, cast, Msg}};
+        Term == CurrentTerm andalso State == follower andalso LogOk ->
+            accept_append_entries(Msg, Data)
+    end;
+
+handle_event(cast, #{type := 'AppendEntriesResponse', term := PastTerm}, _State, #{term := CurrentTerm}) when PastTerm < CurrentTerm ->
+    couch_log:notice("~p ignored AppendEntriesResponse from past term ~B", [node(), PastTerm]),
+    keep_state_and_data;
+
+handle_event(cast, #{type := 'AppendEntriesResponse', term := Term} = Msg, _State, #{term := Term} = Data) ->
+    #{success := MSuccess, matchIndex := MMatchIndex, source := MSource} = Msg,
+    #{nextIndex := NextIndex, matchIndex := MatchIndex} = Data,
+    couch_log:debug("~p received AppendEntriesResponse from ~p in current term ~B (Success:~p)", [node(), MSource, Term, MSuccess]),
+    SourceNextIndex = maps:get(MSource, NextIndex),
+    if
+        MSuccess ->
+            {keep_state, Data#{
+                nextIndex => NextIndex#{MSource => MMatchIndex + 1},
+                matchIndex => MatchIndex#{MSource => MMatchIndex}
+            }};
+        true ->
+            {keep_state, Data#{
+                nextIndex => NextIndex#{MSource => max(SourceNextIndex - 1, 1)}
+            }}
+    end;
+
+handle_event({call, From}, #{type := 'ClientRequest'} = Msg, leader, Data) ->
+    #{value := Value} = Msg,
+    #{term := Term, log := Log, froms := Froms} = Data,
+    EntryIndex = couch_raft_log:index(couch_raft_log:last(Log)) + 1,
+    Entry = {EntryIndex, Term, Value},
+    {keep_state, Data#{log => couch_raft_log:append(Log, [Entry]), froms => Froms#{EntryIndex => From}}};
+
+handle_event({call, From}, #{type := 'ClientRequest'}, _State, _Data) ->
+    {keep_state_and_data, {reply, From, {error, not_leader}}};
+
+handle_event(state_timeout, new_election, State, Data) when State == follower; State == candidate ->
+    #{term := Term} = Data,
+    couch_log:notice("~p election timeout in state ~p, term ~B", [node(), State, Term]),
+    % The candidate enter call starts the election and arms the timer, so it
+    % must not also be started here (that bumped the term twice). repeat_state
+    % re-runs the enter call for a candidate whose election timed out.
+    case State of
+        follower -> {next_state, candidate, Data};
+        candidate -> {repeat_state, Data}
+    end;
+
+handle_event(state_timeout, heartbeat, leader, Data) ->
+    #{term := Term} = Data,
+    couch_log:debug("~p leader sending a heartbeat in term ~B", [node(), Term]),
+    ok = send_append_entries(Data),
+    {keep_state, advance_commit_index(Data), restart_heartbeat_timeout()};
+
+handle_event(EventType, EventContent, State, Data) ->
+    {stop, {unknown_event, EventType, EventContent, State, Data}}.
+
+% Follower side of an AppendEntriesRequest from the current term whose
+% prevLogIndex/prevLogTerm match our log. Every entry in the batch is compared
+% with the local log, not just the first, so a stale suffix is never reported
+% through matchIndex nor applied through commitIndex. The log is repaired by
+% truncating at the first conflict and then appending whatever is missing,
+% re-processing the request after each step.
+accept_append_entries(Msg, Data) ->
+    #{
+        source := MSource,
+        prevLogIndex := MPrevLogIndex,
+        entries := MEntries,
+        commitIndex := MCommitIndex
+    } = Msg,
+    #{term := CurrentTerm, log := Log} = Data,
+    LastLogIndex = couch_raft_log:index(couch_raft_log:last(Log)),
+    MatchIndex = MPrevLogIndex + length(MEntries),
+    case first_conflict(MEntries, LastLogIndex, Log) of
+        {conflict, ConflictIndex} ->
+            couch_log:notice("~p received conflicting entry:~p, deleting it", [node(), MEntries]),
+            Truncated = couch_raft_log:sublist(Log, 1, ConflictIndex - 1),
+            {keep_state, Data#{log => Truncated}, [{next_event, cast, Msg}, restart_election_timeout()]};
+        none when LastLogIndex < MatchIndex ->
+            couch_log:notice("~p received new entries:~p, appending it to log", [node(), MEntries]),
+            Missing = [Entry || Entry <- MEntries, couch_raft_log:index(Entry) > LastLogIndex],
+            {keep_state, Data#{log => couch_raft_log:append(Log, Missing)}, [{next_event, cast, Msg}, restart_election_timeout()]};
+        none ->
+            case MEntries of
+                [] ->
+                    couch_log:debug("~p received heartbeat and everything matches, sending matchIndex:~p", [node(), MatchIndex]);
+                _ ->
+                    couch_log:notice("~p received entry:~p that's already applied, sending matchIndex:~p", [node(), MEntries, MatchIndex])
+            end,
+            Reply = #{
+                type => 'AppendEntriesResponse',
+                term => CurrentTerm,
+                success => true,
+                matchIndex => MatchIndex,
+                source => node()
+            },
+            cast(MSource, Reply, Data),
+            {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), restart_election_timeout()}
+    end.
+
+% Returns {conflict, Index} for the first entry in Entries that is already in
+% Log (Index =< LastLogIndex) with a different term, otherwise none.
+first_conflict([], _LastLogIndex, _Log) ->
+    none;
+first_conflict([Entry | Rest], LastLogIndex, Log) ->
+    Index = couch_raft_log:index(Entry),
+    if
+        Index > LastLogIndex ->
+            % entries are a contiguous slice of the leader's log, so none of
+            % the remaining ones overlap the local log either
+            none;
+        true ->
+            case couch_raft_log:term(couch_raft_log:nth(Index, Log)) == couch_raft_log:term(Entry) of
+                true -> first_conflict(Rest, LastLogIndex, Log);
+                false -> {conflict, Index}
+            end
+    end.
+
+send_append_entries(#{cohort := Cohort} = Data) ->
+    send_append_entries(peers(Cohort), Data).
+
+send_append_entries([], _Data) ->
+    ok;
+send_append_entries([Peer | Rest], Data) ->
+    #{term := Term, nextIndex := NextIndex, log := Log, commitIndex := CommitIndex} = Data,
+    PrevLogIndex = maps:get(Peer, NextIndex) - 1,
+    PrevLogTerm = if PrevLogIndex > 0 -> couch_raft_log:term(couch_raft_log:nth(PrevLogIndex, Log)); true -> 0 end,
+    Entries = couch_raft_log:sublist(Log, PrevLogIndex + 1, ?BATCH_SIZE),
+    % The follower can only vouch for entries up to the end of this batch, so
+    % never advertise a commitIndex beyond it.
+    LastEntry = PrevLogIndex + length(Entries),
+    Msg = #{
+        type => 'AppendEntriesRequest',
+        term => Term,
+        source => node(),
+        prevLogIndex => PrevLogIndex,
+        prevLogTerm => PrevLogTerm,
+        entries => Entries,
+        commitIndex => min(CommitIndex, LastEntry)
+    },
+    cast(Peer, Msg, Data),
+    send_append_entries(Rest, Data).
+
+% Move commitIndex to the highest index stored on a majority of the cohort (the
+% leader counts its own last index). As required by the Raft paper (section
+% 5.4.2), a leader only commits by counting replicas an entry from its current
+% term; earlier entries are then committed indirectly.
+advance_commit_index(Data) ->
+    #{matchIndex := MatchIndex, log := Log, cohort := Cohort, term := Term, commitIndex := CommitIndex} = Data,
+    LastIndex = couch_raft_log:index(couch_raft_log:last(Log)),
+    % highest first, so the Quorum-th index is held by at least Quorum members
+    Indexes = lists:reverse(lists:sort([LastIndex | maps:values(MatchIndex)])),
+    Quorum = length(Cohort) div 2 + 1,
+    NewCommitIndex = lists:nth(Quorum, Indexes),
+    case NewCommitIndex > CommitIndex andalso couch_raft_log:term(couch_raft_log:nth(NewCommitIndex, Log)) == Term of
+        true ->
+            update_state_machine(Data#{commitIndex => NewCommitIndex});
+        false ->
+            Data
+    end.
+
+% Apply entries lastApplied+1 .. min(commitIndex, last log index) to the hash
+% chain, replying to any client waiting on them. commitIndex can be behind
+% lastApplied (e.g. a new leader advertising an older commitIndex); there is
+% nothing to apply then.
+update_state_machine(#{lastApplied := LastApplied, commitIndex := CommitIndex} = Data) when LastApplied >= CommitIndex ->
+    Data;
+update_state_machine(#{lastApplied := LastApplied, commitIndex := CommitIndex} = Data) ->
+    #{log := Log, froms := Froms0, machine := Machine0} = Data,
+    From = LastApplied + 1,
+    To = min(couch_raft_log:index(couch_raft_log:last(Log)), CommitIndex),
+    Fun = fun(Index, {Froms, Machine}) ->
+        Value = couch_raft_log:value(couch_raft_log:nth(Index, Log)),
+        Result = crypto:hash(sha256, <<Machine/binary, Value/binary>>),
+        case maps:is_key(Index, Froms) of
+            true ->
+                gen_statem:reply(maps:get(Index, Froms), Result),
+                {maps:remove(Index, Froms), Result};
+            false ->
+                {Froms, Result}
+        end
+    end,
+    {Froms1, Machine1} = lists:foldl(Fun, {Froms0, Machine0}, lists:seq(From, To)),
+    Data#{froms => Froms1, machine => Machine1, lastApplied => To}.
+
+start_election(Data) ->
+    #{term := Term, cohort := Cohort, log := Log} = Data,
+    ElectionTerm = Term + 1,
+    couch_log:notice("~p starting election in term ~B", [node(), ElectionTerm]),
+    RequestVote = #{
+        type => 'RequestVoteRequest',
+        term => ElectionTerm,
+        lastLogIndex => couch_raft_log:index(couch_raft_log:last(Log)),
+        lastLogTerm => couch_raft_log:term(couch_raft_log:last(Log)),
+        source => node()
+    },
+    lists:foreach(fun(Peer) -> cast(Peer, RequestVote, Data) end, peers(Cohort)),
+    Data#{term => ElectionTerm, votedFor => node(), votesGranted => [node()]}.
+
+cast(Node, Msg, #{name := Name}) ->
+    gen_statem:cast({Name, Node}, Msg).
+
+restart_election_timeout() ->
+    {state_timeout, ?ELECTION_DELAY + rand:uniform(?ELECTION_SPLAY), new_election}.
+
+restart_heartbeat_timeout() ->
+    {state_timeout, ?LEADER_HEARTBEAT, heartbeat}.
+
+peers(Cohort) ->
+    Cohort -- [node()].
diff --git a/src/couch/src/couch_raft_log.erl b/src/couch/src/couch_raft_log.erl
new file mode 100644
index 0000000..9872124
--- /dev/null
+++ b/src/couch/src/couch_raft_log.erl
@@ -0,0 +1,62 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+% In-memory raft log: a plain list of {Index, Term, Value} entries. couch_raft
+% relies on the entry at 1-based list position N having index N.
+
+-module(couch_raft_log).
+
+-export([
+    new/0,
+    append/2,
+    sublist/3,
+    nth/2,
+    last/1,
+    index/1,
+    term/1,
+    value/1
+]).
+
+% An empty log.
+new() ->
+    [].
+
+% Add Items (already-built entries) after the current last entry.
+append(Log, Items) ->
+    Log ++ Items.
+
+% Up to Len entries starting at 1-based position Start.
+sublist(Log, Start, Len) ->
+    lists:sublist(Log, Start, Len).
+
+% The entry at 1-based position N.
+nth(N, Log) ->
+    lists:nth(N, Log).
+
+% The last entry, or the {0, 0, undefined} sentinel when the log is empty.
+last(Log) ->
+    case Log of
+        [] -> {0, 0, undefined};
+        _ -> lists:last(Log)
+    end.
+
+% Entry accessors.
+index(Entry) ->
+    element(1, Entry).
+
+term(Entry) ->
+    element(2, Entry).
+
+value(Entry) ->
+    element(3, Entry).
diff --git a/src/couch/test/eunit/couch_raft_SUITE.erl b/src/couch/test/eunit/couch_raft_SUITE.erl
new file mode 100644
index 0000000..1c3f8eb
--- /dev/null
+++ b/src/couch/test/eunit/couch_raft_SUITE.erl
@@ -0,0 +1,72 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+-module(couch_raft_SUITE).
+
+-behaviour(ct_suite).
+
+-export([all/0]).
+-export([three_nodes/1]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+all() ->
+    [three_nodes].
+
+three_nodes(Config) when is_list(Config) ->
+    N = 3,
+    % peers need couch_raft(_log) and couch_log on their code path. NOTE(review):
+    % couch_log presumably also has to be running on each peer - TODO confirm.
+    Args = [
+        "-pa", filename:dirname(code:which(couch_raft)),
+        "-pa", filename:dirname(code:which(couch_log))
+    ],
+    Peers = [?CT_PEER(#{wait_boot => {self(), tag}, args => Args}) || _ <- lists:seq(1, N)],
+    Cohort = [receive {tag, {started, Node, Peer}} -> Node end || {ok, Peer} <- Peers],
+
+    Rafts = [erpc:call(Node, couch_raft, start, [foo, Cohort]) || Node <- Cohort],
+
+    % wait for leader election
+    timer:sleep(500),
+
+    % verify only one leader elected
+    [{leader, FirstLeader}] = lists:filter(fun({State, _Pid}) -> State == leader end,
+    [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Rafts]),
+
+    % make a series of calls
+    Hash1 = crypto:hash(sha256, <<0, 1>>),
+    ?assertEqual(Hash1, couch_raft:call(FirstLeader, <<1>>)),
+
+    Hash2 = crypto:hash(sha256, <<Hash1/binary, 2>>),
+    ?assertEqual(Hash2, couch_raft:call(FirstLeader, <<2>>)),
+
+    Hash3 = crypto:hash(sha256, <<Hash2/binary, 3>>),
+    ?assertEqual(Hash3, couch_raft:call(FirstLeader, <<3>>)),
+
+    % force a re-election
+    couch_raft:stop(FirstLeader),
+    timer:sleep(500),
+
+    % verify new leader elected
+    [{leader, SecondLeader}] = lists:filter(fun({State, _Pid}) -> State == leader end,
+        [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Rafts, Pid /= FirstLeader]),
+    ?assertNotEqual(FirstLeader, SecondLeader),
+
+    % make another call
+    Hash4 = crypto:hash(sha256, <<Hash3/binary, 4>>),
+    ?assertEqual(Hash4, couch_raft:call(SecondLeader, <<4>>)),
+
+    [couch_raft:stop(Pid) || {ok, Pid} <- Rafts, Pid /= FirstLeader],
+    [peer:stop(Peer) || {ok, Peer} <- Peers].