% blob: 2cdc5da4b6fc7022fc41eb0ecf707dfe2e577802 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_replicator_retain_stats_between_job_runs).
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_replicator/src/couch_replicator.hrl").
-include_lib("fabric/test/fabric2_test.hrl").
-define(DELAY, 500).
% EUnit test generator for this module.
%
% The outer `setup` fixture calls couch_replicator_test_helper:start_couch/0
% once and hands its result to stop_couch/1 when all tests are done. The inner
% `foreach` fixture wraps every listed test case in its own setup/0 and
% teardown/1 pair. The 60 is passed straight to ?TDEF_FE (presumably the
% per-test timeout in seconds; confirm against fabric2_test.hrl).
stats_retained_test_() ->
    TestCases = [
        ?TDEF_FE(t_stats_retained_on_job_removal, 60)
    ],
    PerTestFixture = {foreach, fun setup/0, fun teardown/1, TestCases},
    {
        setup,
        fun couch_replicator_test_helper:start_couch/0,
        fun couch_replicator_test_helper:stop_couch/1,
        PerTestFixture
    }.
% Per-test fixture setup.
%
% Creates two databases through couch_replicator_test_helper:create_db/0 and
% then overrides two "replicator" config values for the duration of the test.
% Returns {Source, Target}, which is what teardown/1 and the test case receive.
setup() ->
    Source = couch_replicator_test_helper:create_db(),
    Target = couch_replicator_test_helper:create_db(),
    Overrides = [
        {"stats_update_interval_sec", "0"},
        {"checkpoint_interval", "1000"}
    ],
    % The trailing `false` is presumably the "persist" flag of config:set/4.
    lists:foreach(
        fun({Key, Value}) ->
            config:set("replicator", Key, Value, false)
        end,
        Overrides
    ),
    {Source, Target}.
% Per-test fixture cleanup: removes the config overrides installed by setup/0
% and then deletes the source and target databases, in that order.
teardown({Source, Target}) ->
    OverriddenKeys = ["stats_update_interval_sec", "checkpoint_interval"],
    lists:foreach(
        fun(Key) ->
            config:delete("replicator", Key, false)
        end,
        OverriddenKeys
    ),
    couch_replicator_test_helper:delete_db(Source),
    couch_replicator_test_helper:delete_db(Target).
% Checks that the scheduler job statistics keep accumulating when the same
% replication is cancelled and started again.
%
% Each round adds ten documents to the source; the even-numbered ones carry
% `"nope": true` and are refused by the validate_doc_update function installed
% on the target, so every round contributes 10 reads, 5 writes and 5 write
% failures. RepId is bound in the first round; matching against it in the
% later rounds asserts that the restarted job has the same replication id.
t_stats_retained_on_job_removal({Source, Target}) ->
    {ok, _} = add_vdu(Target),

    % Round 1: docs 1..10. Target ends up with 5 docs + the vdu design doc.
    populate_db_reject_even_docs(Source, 1, 10),
    {ok, FirstPid, RepId} = replicate(Source, Target),
    wait_target_in_sync(6, Target),
    check_scheduler_jobs(10, 5, 5),
    cancel(RepId, FirstPid),

    % Round 2: docs 11..20. Target grows from 6 to 6 + 5 docs.
    populate_db_reject_even_docs(Source, 11, 20),
    {ok, SecondPid, RepId} = replicate(Source, Target),
    wait_target_in_sync(11, Target),
    check_scheduler_jobs(20, 10, 10),
    cancel(RepId, SecondPid),

    % Round 3: docs 21..30. Target grows from 11 to 11 + 5 docs.
    populate_db_reject_even_docs(Source, 21, 30),
    {ok, ThirdPid, RepId} = replicate(Source, Target),
    wait_target_in_sync(16, Target),
    check_scheduler_jobs(30, 15, 15),
    cancel(RepId, ThirdPid).
% Waits until the single scheduler job reports DocsRead documents read, then
% asserts that its info map contains every expected statistics key and that
% the read / written / failed counters equal the given values.
check_scheduler_jobs(DocsRead, DocsWritten, DocFailed) ->
    Info = wait_scheduler_info(DocsRead),
    RequiredKeys = [
        <<"changes_pending">>,
        <<"doc_write_failures">>,
        <<"docs_read">>,
        <<"docs_written">>,
        <<"missing_revisions_found">>,
        <<"checkpointed_source_seq">>,
        <<"source_seq">>,
        <<"revisions_checked">>
    ],
    % Presence checks run in list order and stop at the first missing key.
    lists:foreach(
        fun(Key) ->
            ?assert(maps:is_key(Key, Info))
        end,
        RequiredKeys
    ),
    ?assertMatch(#{<<"docs_read">> := DocsRead}, Info),
    ?assertMatch(#{<<"docs_written">> := DocsWritten}, Info),
    ?assertMatch(#{<<"doc_write_failures">> := DocFailed}, Info).
% Polls couch_replicator_test_helper:scheduler_jobs/0 through test_util:wait/1
% until exactly one job is listed whose "info" map has "docs_read" equal to
% DocsRead, and returns that info map.
%
% The poll answers `wait` while there is no job yet, while the job's info is
% still `null`, or while the info map does not (yet) carry the wanted
% "docs_read" value. What happens when test_util:wait/1 gives up is decided by
% that helper and is not visible here.
wait_scheduler_info(DocsRead) ->
    Poll = fun() ->
        case couch_replicator_test_helper:scheduler_jobs() of
            [] ->
                wait;
            [#{<<"info">> := null}] ->
                wait;
            [#{<<"info">> := JobInfo}] ->
                % DocsRead is already bound, so this is an equality check.
                case JobInfo of
                    #{<<"docs_read">> := DocsRead} -> JobInfo;
                    #{} -> wait
                end
        end
    end,
    test_util:wait(Poll).
% Creates documents with ids Start..End in DbName. Documents with an even id
% get the body {"nope": true}; odd ids get an empty body. The "nope" flag is
% what the validate_doc_update function returned by vdu/0 rejects.
populate_db_reject_even_docs(DbName, Start, End) ->
    MakeBody = fun(Id) ->
        case Id rem 2 of
            0 -> {[{<<"nope">>, true}]};
            _ -> {[]}
        end
    end,
    populate_db(DbName, Start, End, MakeBody).
% Builds one #doc{} per integer in Start..End and writes them to DbName with
% couch_replicator_test_helper:create_docs/2.
%
% The document id is the integer rendered as a binary and the body is
% BodyFun(Integer). BodyFun is invoked in ascending order; the resulting list
% is handed over in descending id order.
populate_db(DbName, Start, End, BodyFun) when is_function(BodyFun, 1) ->
    Ascending = [
        #doc{id = integer_to_binary(N), body = BodyFun(N)}
     || N <- lists:seq(Start, End)
    ],
    Docs = lists:reverse(Ascending),
    couch_replicator_test_helper:create_docs(DbName, Docs).
% Blocks until the Target database reports exactly DocCount documents.
%
% Delegates to wait_target_in_sync_loop/3 with a budget of 300 attempts; that
% loop sleeps ?DELAY (500) ms between attempts and raises an assertion_failed
% error once the budget is used up.
% NOTE(review): 300 attempts x 500 ms is 150 s, which is more than the 60
% passed to ?TDEF_FE for the only test case; confirm which limit is meant to
% fire first.
wait_target_in_sync(DocCount, Target) when is_integer(DocCount) ->
wait_target_in_sync_loop(DocCount, Target, 300).
% Retry loop behind wait_target_in_sync/2.
%
% Opens TargetName with admin context, reads its db info and compares the
% reported doc_count with DocCount. Returns `true` on a match; otherwise
% sleeps ?DELAY ms and tries again with one retry fewer. When no retries are
% left it raises an `assertion_failed` error.
wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
    Details = [
        {module, ?MODULE},
        {line, ?LINE},
        {reason, "Could not get source and target databases in sync"}
    ],
    erlang:error({assertion_failed, Details});
wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
    {ok, TargetDb} = fabric2_db:open(TargetName, [?ADMIN_CTX]),
    {ok, DbInfo} = fabric2_db:get_db_info(TargetDb),
    case couch_util:get_value(doc_count, DbInfo) of
        Current when Current == DocCount ->
            true;
        _NotYet ->
            ok = timer:sleep(?DELAY),
            wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1)
    end.
% Thin wrapper around couch_replicator_test_helper:replicate_continuous/2
% (presumably starts a continuous Source -> Target replication). Callers in
% this module match the result against {ok, Pid, RepId}.
replicate(Source, Target) ->
couch_replicator_test_helper:replicate_continuous(Source, Target).
% Thin wrapper around couch_replicator_test_helper:cancel/2. It is called with
% the RepId and Pid obtained from replicate/2 to end a replication run before
% the next one is started.
cancel(RepId, Pid) ->
couch_replicator_test_helper:cancel(RepId, Pid).
% Returns, as a binary, the JavaScript source of a validate_doc_update
% function that throws {forbidden: 'nope'} for any document whose `nope`
% field is strictly `true` and accepts everything else.
vdu() ->
    Lines = [
        "function(newDoc, oldDoc, userCtx) {",
        " if(newDoc.nope === true) {",
        " throw({forbidden: 'nope'});",
        " } else {",
        " return;",
        " }",
        " }"
    ],
    % Newlines go between the lines only; there is no trailing newline.
    iolist_to_binary(lists:join($\n, Lines)).
% Stores the design document "_design/vdu" in DbName. Its
% validate_doc_update field holds the JavaScript returned by vdu/0. The
% database is opened with admin context for the write.
% Returns the {ok, _} result of fabric2_db:update_doc/3; anything else is a
% badmatch.
add_vdu(DbName) ->
    DesignDoc = couch_doc:from_json_obj(
        {[
            {<<"_id">>, <<"_design/vdu">>},
            {<<"language">>, <<"javascript">>},
            {<<"validate_doc_update">>, vdu()}
        ]},
        []
    ),
    {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]),
    {ok, _} = fabric2_db:update_doc(Db, DesignDoc, []).