Preserve replication job stats when jobs are re-created

Previously we made sure replication job statistics were preserved when
the jobs were started and stopped by the scheduler. However, if a db
node restarted or user re-created the job, replication stats would be
reset to 0.

Some statistics like `docs_read` and `docs_written` are perhaps not as
critical. However `doc_write_failures` is. That is the indicator that
some replication docs have not replicated to the target. Not
preserving that statistic meant users could perceive there was a data
loss during replication -- data was replicated successfully according
to the replication job with no write failures, user deletes source
database, then some times later noticed some of their data is missing.

These statistics were already logged in the checkpoint history and we
just had to initialize a stats object from them when a replication job
starts. In that initialization code we pick the highest values from
either the running scheduler or the checkpointed log. The reason is
that the running stats could be higher if say job was stopped suddenly
and failed to checkpoint but scheduler retained the data.

Fixes: #2414
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 12d3e55..0b33419 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -565,7 +565,7 @@
         options = Options,
         type = Type, view = View,
         start_time = StartTime,
-        stats = Stats
+        stats = ArgStats0
     } = Rep,
     % Adjust minimum number of http source connections to 2 to avoid deadlock
     Src = adjust_maxconn(Src0, BaseId),
@@ -580,6 +580,14 @@
     [SourceLog, TargetLog] = find_and_migrate_logs([Source, Target], Rep),
 
     {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog),
+
+    ArgStats1 = couch_replicator_stats:new(ArgStats0),
+    HistoryStats = case History of
+        [{[_ | _] = HProps} | _] -> couch_replicator_stats:new(HProps);
+        _ -> couch_replicator_stats:new()
+    end,
+    Stats = couch_replicator_stats:max_stats(ArgStats1, HistoryStats),
+
     StartSeq1 = get_value(since_seq, Options, StartSeq0),
     StartSeq = {0, StartSeq1},
 
@@ -609,7 +617,7 @@
                                         ?DEFAULT_CHECKPOINT_INTERVAL),
         type = Type,
         view = View,
-        stats = couch_replicator_stats:new(Stats)
+        stats = Stats
     },
     State#rep_state{timer = start_timer(State)}.
 
diff --git a/src/couch_replicator/src/couch_replicator_stats.erl b/src/couch_replicator/src/couch_replicator_stats.erl
index cd62949..37848b3 100644
--- a/src/couch_replicator/src/couch_replicator_stats.erl
+++ b/src/couch_replicator/src/couch_replicator_stats.erl
@@ -17,7 +17,8 @@
     new/1,
     get/2,
     increment/2,
-    sum_stats/2
+    sum_stats/2,
+    max_stats/2
 ]).
 
 -export([
@@ -64,14 +65,29 @@
 sum_stats(S1, S2) ->
     orddict:merge(fun(_, V1, V2) -> V1+V2 end, S1, S2).
 
+max_stats(S1, S2) ->
+    orddict:merge(fun(_, V1, V2) -> max(V1, V2) end, S1, S2).
 
-% Handle initializing from a status object which uses same values but different
-% field names.
-fmap({revisions_checked, V})       -> {true, {missing_checked, V}};
-fmap({missing_revisions_found, V}) -> {true, {missing_found, V}};
-fmap({missing_checked, _})         -> true;
-fmap({missing_found, _})           -> true;
-fmap({docs_read, _})               -> true;
-fmap({docs_written, _})            -> true;
-fmap({doc_write_failures, _})      -> true;
-fmap({_, _})                       -> false.
+
+% Handle initializing from a status object, which uses same values but
+% different field names, as well as from ejson props from the checkpoint
+% history
+%
+fmap({missing_found, _})             -> true;
+fmap({missing_revisions_found, V})   -> {true, {missing_found, V}};
+fmap({<<"missing_found">>, V})       -> {true, {missing_found, V}};
+
+fmap({missing_checked, _})           -> true;
+fmap({revisions_checked, V})         -> {true, {missing_checked, V}};
+fmap({<<"missing_checked">>, V})     -> {true, {missing_checked, V}};
+
+fmap({docs_read, _})                 -> true;
+fmap({<<"docs_read">>, V})           -> {true, {docs_read, V}};
+
+fmap({docs_written, _})              -> true;
+fmap({<<"docs_written">>, V})        -> {true, {docs_written, V}};
+
+fmap({doc_write_failures, _})        -> true;
+fmap({<<"doc_write_failures">>, V})  -> {true, {doc_write_failures, V}};
+
+fmap({_, _})                         -> false.
diff --git a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
index 9dd86b3..037f371 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
@@ -18,48 +18,93 @@
 
 -define(DELAY, 500).
 -define(TIMEOUT, 60000).
--define(i2l(I), integer_to_list(I)).
--define(io2b(Io), iolist_to_binary(Io)).
+
+
+setup_all() ->
+    test_util:start_couch([couch_replicator, chttpd, mem3, fabric]).
+
+
+teardown_all(Ctx) ->
+    ok = test_util:stop_couch(Ctx).
 
 
 setup() ->
-    Ctx = test_util:start_couch([couch_replicator, chttpd, mem3, fabric]),
     Source = setup_db(),
     Target = setup_db(),
-    {Ctx, {Source, Target}}.
+    {Source, Target}.
 
 
-teardown({Ctx, {Source, Target}}) ->
+teardown({Source, Target}) ->
     teardown_db(Source),
     teardown_db(Target),
-    ok = application:stop(couch_replicator),
-    ok = test_util:stop_couch(Ctx).
+    ok.
 
 
 stats_retained_test_() ->
     {
         setup,
-        fun setup/0,
-        fun teardown/1,
-        fun t_stats_retained/1
+        fun setup_all/0,
+        fun teardown_all/1,
+        {
+            foreach,
+            fun setup/0,
+            fun teardown/1,
+            [
+                fun t_stats_retained_by_scheduler/1,
+                fun t_stats_retained_on_job_removal/1
+            ]
+        }
     }.
 
 
-t_stats_retained({_Ctx, {Source, Target}}) ->
+t_stats_retained_by_scheduler({Source, Target}) ->
     ?_test(begin
-        populate_db(Source, 42),
+        {ok, _} = add_vdu(Target),
+        populate_db_reject_even_docs(Source, 1, 10),
         {ok, RepPid, RepId} = replicate(Source, Target),
+        wait_target_in_sync(6, Target),
 
-        wait_target_in_sync(Source, Target),
-        check_active_tasks(42, 42),
-        check_scheduler_jobs(42, 42),
+        check_active_tasks(10, 5, 5),
+        check_scheduler_jobs(10, 5, 5),
 
         stop_job(RepPid),
-        check_scheduler_jobs(42, 42),
+        check_scheduler_jobs(10, 5, 5),
 
         start_job(),
-        check_active_tasks(42, 42),
-        check_scheduler_jobs(42, 42),
+        check_active_tasks(10, 5, 5),
+        check_scheduler_jobs(10, 5, 5),
+        couch_replicator_scheduler:remove_job(RepId)
+    end).
+
+
+t_stats_retained_on_job_removal({Source, Target}) ->
+    ?_test(begin
+        {ok, _} = add_vdu(Target),
+        populate_db_reject_even_docs(Source, 1, 10),
+        {ok, _, RepId} = replicate(Source, Target),
+        wait_target_in_sync(6, Target),  % 5 + 1 vdu
+
+        check_active_tasks(10, 5, 5),
+        check_scheduler_jobs(10, 5, 5),
+
+        couch_replicator_scheduler:remove_job(RepId),
+
+        populate_db_reject_even_docs(Source, 11, 20),
+        {ok, _, RepId} = replicate(Source, Target),
+        wait_target_in_sync(11, Target),  % 6 + 5
+
+        check_scheduler_jobs(20, 10, 10),
+        check_active_tasks(20, 10, 10),
+
+        couch_replicator_scheduler:remove_job(RepId),
+
+        populate_db_reject_even_docs(Source, 21, 30),
+        {ok, _, RepId} = replicate(Source, Target),
+        wait_target_in_sync(16, Target),  % 11 + 5
+
+        check_scheduler_jobs(30, 15, 15),
+        check_active_tasks(30, 15, 15),
+
         couch_replicator_scheduler:remove_job(RepId)
     end).
 
@@ -92,14 +137,16 @@
     couch_replicator_scheduler:reschedule().
 
 
-check_active_tasks(DocsRead, DocsWritten) ->
+check_active_tasks(DocsRead, DocsWritten, DocsFailed) ->
     RepTask = wait_for_task_status(),
     ?assertNotEqual(timeout, RepTask),
     ?assertEqual(DocsRead, couch_util:get_value(docs_read, RepTask)),
-    ?assertEqual(DocsWritten, couch_util:get_value(docs_written, RepTask)).
+    ?assertEqual(DocsWritten, couch_util:get_value(docs_written, RepTask)),
+    ?assertEqual(DocsFailed, couch_util:get_value(doc_write_failures,
+        RepTask)).
 
 
-check_scheduler_jobs(DocsRead, DocsWritten) ->
+check_scheduler_jobs(DocsRead, DocsWritten, DocFailed) ->
     Info = wait_scheduler_info(),
     ?assert(maps:is_key(<<"changes_pending">>, Info)),
     ?assert(maps:is_key(<<"doc_write_failures">>, Info)),
@@ -110,7 +157,8 @@
     ?assert(maps:is_key(<<"source_seq">>, Info)),
     ?assert(maps:is_key(<<"revisions_checked">>, Info)),
     ?assertMatch(#{<<"docs_read">> := DocsRead}, Info),
-    ?assertMatch(#{<<"docs_written">> := DocsWritten}, Info).
+    ?assertMatch(#{<<"docs_written">> := DocsWritten}, Info),
+    ?assertMatch(#{<<"doc_write_failures">> := DocFailed}, Info).
 
 
 replication_tasks() ->
@@ -138,25 +186,31 @@
     end).
 
 
-populate_db(DbName, DocCount) ->
+populate_db_reject_even_docs(DbName, Start, End) ->
+    BodyFun = fun(Id) ->
+        case Id rem 2 == 0 of
+            true -> {[{<<"nope">>, true}]};
+            false -> {[]}
+        end
+    end,
+    populate_db(DbName, Start, End, BodyFun).
+
+
+populate_db(DbName, Start, End, BodyFun) when is_function(BodyFun, 1) ->
     {ok, Db} = couch_db:open_int(DbName, []),
     Docs = lists:foldl(
         fun(DocIdCounter, Acc) ->
-            Id = ?io2b(["doc", ?i2l(DocIdCounter)]),
-            Doc = #doc{id = Id, body = {[]}},
+            Id = integer_to_binary(DocIdCounter),
+            Doc = #doc{id = Id, body = BodyFun(DocIdCounter)},
             [Doc | Acc]
         end,
-        [], lists:seq(1, DocCount)),
+        [], lists:seq(Start, End)),
     {ok, _} = couch_db:update_docs(Db, Docs, []),
     ok = couch_db:close(Db).
 
 
-wait_target_in_sync(Source, Target) ->
-    {ok, SourceDb} = couch_db:open_int(Source, []),
-    {ok, SourceInfo} = couch_db:get_db_info(SourceDb),
-    ok = couch_db:close(SourceDb),
-    SourceDocCount = couch_util:get_value(doc_count, SourceInfo),
-    wait_target_in_sync_loop(SourceDocCount, Target, 300).
+wait_target_in_sync(DocCount, Target) when is_integer(DocCount) ->
+    wait_target_in_sync_loop(DocCount, Target, 300).
 
 
 wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
@@ -170,7 +224,7 @@
     {ok, TargetInfo} = couch_db:get_db_info(Target),
     ok = couch_db:close(Target),
     TargetDocCount = couch_util:get_value(doc_count, TargetInfo),
-    case TargetDocCount == DocCount of
+    case TargetDocCount  == DocCount of
         true ->
             true;
         false ->
@@ -201,3 +255,28 @@
     {ok, 200, _, Body} = test_request:get(Url, []),
     Json = jiffy:decode(Body, [return_maps]),
     maps:get(<<"jobs">>, Json).
+
+
+vdu() ->
+    <<"function(newDoc, oldDoc, userCtx) {
+        if(newDoc.nope === true) {
+            throw({forbidden: 'nope'});
+        } else {
+            return;
+        }
+    }">>.
+
+
+add_vdu(DbName) ->
+    DocProps = [
+        {<<"_id">>, <<"_design/vdu">>},
+        {<<"language">>, <<"javascript">>},
+        {<<"validate_doc_update">>, vdu()}
+    ],
+    Doc = couch_doc:from_json_obj({DocProps}, []),
+    {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]),
+    try
+        {ok, _Rev} = couch_db:update_doc(Db, Doc, [])
+    after
+        couch_db:close(Db)
+    end.
diff --git a/test/elixir/test/replication_test.exs b/test/elixir/test/replication_test.exs
index 73ceca6..bdd683e 100644
--- a/test/elixir/test/replication_test.exs
+++ b/test/elixir/test/replication_test.exs
@@ -75,8 +75,8 @@
     assert is_list(result["history"])
     assert length(result["history"]) == 2
     history = Enum.at(result["history"], 0)
-    assert history["docs_written"] == 1
-    assert history["docs_read"] == 1
+    assert history["docs_written"] == 2
+    assert history["docs_read"] == 2
     assert history["doc_write_failures"] == 0
 
     query = %{
@@ -352,10 +352,10 @@
     assert history["session_id"] == result["session_id"]
     assert is_binary(history["start_time"])
     assert is_binary(history["end_time"])
-    assert history["missing_checked"] == 6
-    assert history["missing_found"] == 6
-    assert history["docs_read"] == 6
-    assert history["docs_written"] == 6
+    assert history["missing_checked"] == 27
+    assert history["missing_found"] == 27
+    assert history["docs_read"] == 27
+    assert history["docs_written"] == 27
     assert history["doc_write_failures"] == 0
 
     copy = Couch.get!("/#{tgt_db_name}/#{new_doc["_id"]}").body
@@ -414,10 +414,10 @@
     assert is_list(result["history"])
     assert length(result["history"]) == 3
     history = Enum.at(result["history"], 0)
-    assert history["missing_checked"] == 1
-    assert history["missing_found"] == 1
-    assert history["docs_read"] == 1
-    assert history["docs_written"] == 1
+    assert history["missing_checked"] == 28
+    assert history["missing_found"] == 28
+    assert history["docs_read"] == 28
+    assert history["docs_written"] == 28
     assert history["doc_write_failures"] == 0
 
     resp = Couch.get("/#{tgt_db_name}/#{del_doc["_id"]}")
@@ -446,10 +446,10 @@
     assert is_list(result["history"])
     assert length(result["history"]) == 4
     history = Enum.at(result["history"], 0)
-    assert history["missing_checked"] == 1
-    assert history["missing_found"] == 1
-    assert history["docs_read"] == 1
-    assert history["docs_written"] == 1
+    assert history["missing_checked"] == 29
+    assert history["missing_found"] == 29
+    assert history["docs_read"] == 29
+    assert history["docs_written"] == 29
     assert history["doc_write_failures"] == 0
 
     copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body
@@ -473,10 +473,10 @@
     assert is_list(result["history"])
     assert length(result["history"]) == 5
     history = Enum.at(result["history"], 0)
-    assert history["missing_checked"] == 1
-    assert history["missing_found"] == 1
-    assert history["docs_read"] == 1
-    assert history["docs_written"] == 1
+    assert history["missing_checked"] == 30
+    assert history["missing_found"] == 30
+    assert history["docs_read"] == 30
+    assert history["docs_written"] == 30
     assert history["doc_write_failures"] == 0
 
     copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body
@@ -502,10 +502,10 @@
     assert is_list(result["history"])
     assert length(result["history"]) == 6
     history = Enum.at(result["history"], 0)
-    assert history["missing_checked"] == 1
-    assert history["missing_found"] == 1
-    assert history["docs_read"] == 1
-    assert history["docs_written"] == 1
+    assert history["missing_checked"] == 31
+    assert history["missing_found"] == 31
+    assert history["docs_read"] == 31
+    assert history["docs_written"] == 31
     assert history["doc_write_failures"] == 0
 
     copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body
@@ -534,10 +534,10 @@
     assert is_list(result["history"])
     assert length(result["history"]) == 7
     history = Enum.at(result["history"], 0)
-    assert history["missing_checked"] == 3
-    assert history["missing_found"] == 1
-    assert history["docs_read"] == 1
-    assert history["docs_written"] == 1
+    assert history["missing_checked"] == 34
+    assert history["missing_found"] == 32
+    assert history["docs_read"] == 32
+    assert history["docs_written"] == 32
     assert history["doc_write_failures"] == 0
 
     docs = [
@@ -559,10 +559,10 @@
     assert is_list(result["history"])
     assert length(result["history"]) == 8
     history = Enum.at(result["history"], 0)
-    assert history["missing_checked"] == 2
-    assert history["missing_found"] == 0
-    assert history["docs_read"] == 0
-    assert history["docs_written"] == 0
+    assert history["missing_checked"] == 36
+    assert history["missing_found"] == 32
+    assert history["docs_read"] == 32
+    assert history["docs_written"] == 32
     assert history["doc_write_failures"] == 0
 
     # Test nothing to replicate
@@ -822,10 +822,10 @@
     assert length(result["history"]) == 2
     history = Enum.at(result["history"], 0)
 
-    assert history["missing_checked"] == 3
-    assert history["missing_found"] == 3
-    assert history["docs_read"] == 3
-    assert history["docs_written"] == 3
+    assert history["missing_checked"] == 19
+    assert history["missing_found"] == 19
+    assert history["docs_read"] == 19
+    assert history["docs_written"] == 19
     assert history["doc_write_failures"] == 0
   end
 
@@ -1185,8 +1185,8 @@
 
     result = replicate(repl_src, repl_tgt, body: repl_body)
     assert result["ok"]
-    assert result["docs_read"] == 1
-    assert result["docs_written"] == 1
+    assert result["docs_read"] == 2
+    assert result["docs_written"] == 2
     assert result["doc_write_failures"] == 0
 
     retry_until(fn ->