% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
error_reporting_test_() ->
fun couch_replicator_test_helper:test_setup/0,
fun couch_replicator_test_helper:test_teardown/1,
?TDEF_FE(t_fail_bulk_get, 15),
?TDEF_FE(t_fail_open_docs_get, 15),
t_fail_bulk_docs({_Ctx, {Source, Target}}) ->
populate_db(Source, 1, 5),
{ok, RepId} = replicate(Source, Target),
wait_target_in_sync(Source, Target),
{ok, Listener} = rep_result_listener(RepId),
mock_fail_req(post, "/_bulk_docs", {ok, "403", [], [<<"{\"x\":\"y\"}">>]}),
populate_db(Source, 6, 6),
{error, Result} = wait_rep_result(RepId),
?assertEqual({bulk_docs_failed, 403, {[{<<"x">>, <<"y">>}]}}, Result),
t_fail_changes_reader({_Ctx, {Source, Target}}) ->
populate_db(Source, 1, 5),
{ok, RepId} = replicate(Source, Target),
wait_target_in_sync(Source, Target),
{ok, Listener} = rep_result_listener(RepId),
mock_fail_req(get, "/_changes", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}),
populate_db(Source, 6, 6),
{error, Result} = wait_rep_result(RepId),
?assertEqual({changes_req_failed, 418, {[{<<"x">>, <<"y">>}]}}, Result),
t_fail_doc_put_4xx_well_formed_json_error({_Ctx, {Source, Target}}) ->
populate_db(Source, 1, 5),
{ok, RepId} = replicate(Source, Target),
wait_target_in_sync(Source, Target),
{ok, Listener} = rep_result_listener(RepId),
ErrBody = [<<"{\"error\":\"x\", \"reason\":\"y\"}">>],
mock_fail_req(put, "/6", {ok, "400", [], ErrBody}),
populate_db(Source, 6, 6, _WithAttachments = true),
{error, Result} = wait_rep_result(RepId),
?assertEqual({doc_write_failed, {<<"x">>, <<"y">>}}, Result),
t_fail_doc_put_4xx_unexpected_json_error({_Ctx, {Source, Target}}) ->
populate_db(Source, 1, 5),
{ok, RepId} = replicate(Source, Target),
wait_target_in_sync(Source, Target),
{ok, Listener} = rep_result_listener(RepId),
ErrBody = [<<"{\"a\":\"b\"}">>],
mock_fail_req(put, "/6", {ok, "400", [], ErrBody}),
populate_db(Source, 6, 6, _WithAttachments = true),
{error, Result} = wait_rep_result(RepId),
?assertEqual({doc_write_failed, {400, [{<<"a">>, <<"b">>}]}}, Result),
t_fail_doc_put_4xx_invalid_json_error({_Ctx, {Source, Target}}) ->
populate_db(Source, 1, 5),
{ok, RepId} = replicate(Source, Target),
wait_target_in_sync(Source, Target),
{ok, Listener} = rep_result_listener(RepId),
mock_fail_req(put, "/6", {ok, "400", [], [<<"potato">>]}),
populate_db(Source, 6, 6, _WithAttachments = true),
{error, Result} = wait_rep_result(RepId),
?assertMatch({doc_write_failed, {invalid_json, _}}, Result),
t_skip_doc_put_401_errors({_Ctx, {Source, Target}}) ->
populate_db(Source, 1, 5),
populate_db(Source, 6, 6, _WithAttachments = true),
ErrBody = [<<"{\"error\":\"unauthorized\", \"reason\":\"vdu\"}">>],
mock_fail_req(put, "/6", {ok, "401", [], ErrBody}),
{ok, RepId} = replicate(Source, Target, #{continuous => false}),
{ok, Listener} = rep_result_listener(RepId),
Res = wait_rep_result(RepId),
% Replication job should succeed
?assertMatch({ok, {[_ | _]}}, Res),
{ok, {Props}} = Res,
History = proplists:get_value(<<"history">>, Props),
?assertMatch([{[_ | _]}], History),
[{HistProps}] = History,
DocsWritten = proplists:get_value(<<"docs_written">>, HistProps),
DocWriteFailures = proplists:get_value(<<"doc_write_failures">>, HistProps),
?assertEqual(5, DocsWritten),
?assertEqual(1, DocWriteFailures),
t_skip_doc_put_403_errors({_Ctx, {Source, Target}}) ->
populate_db(Source, 1, 5),
populate_db(Source, 6, 6, _WithAttachments = true),
ErrBody = [<<"{\"error\":\"forbidden\", \"reason\":\"vdu\"}">>],
mock_fail_req(put, "/6", {ok, "403", [], ErrBody}),
{ok, RepId} = replicate(Source, Target, #{continuous => false}),
{ok, Listener} = rep_result_listener(RepId),
Res = wait_rep_result(RepId),
% Replication job should succeed
?assertMatch({ok, {[_ | _]}}, Res),
{ok, {Props}} = Res,
History = proplists:get_value(<<"history">>, Props),
?assertMatch([{[_ | _]}], History),
[{HistProps}] = History,
DocsWritten = proplists:get_value(<<"docs_written">>, HistProps),
DocWriteFailures = proplists:get_value(<<"doc_write_failures">>, HistProps),
?assertEqual(5, DocsWritten),
?assertEqual(1, DocWriteFailures),
t_skip_doc_put_413_errors({_Ctx, {Source, Target}}) ->
populate_db(Source, 1, 5),
populate_db(Source, 6, 6, _WithAttachments = true),
ErrBody = [<<"{\"error\":\"too_large\", \"reason\":\"too_large\"}">>],
mock_fail_req(put, "/6", {ok, "413", [], ErrBody}),
{ok, RepId} = replicate(Source, Target, #{continuous => false}),
{ok, Listener} = rep_result_listener(RepId),
Res = wait_rep_result(RepId),
% Replication job should succeed
?assertMatch({ok, {[_ | _]}}, Res),
{ok, {Props}} = Res,
History = proplists:get_value(<<"history">>, Props),
?assertMatch([{[_ | _]}], History),
[{HistProps}] = History,
DocsWritten = proplists:get_value(<<"docs_written">>, HistProps),
DocWriteFailures = proplists:get_value(<<"doc_write_failures">>, HistProps),
?assertEqual(5, DocsWritten),
?assertEqual(1, DocWriteFailures),
t_skip_doc_put_415_errors({_Ctx, {Source, Target}}) ->
populate_db(Source, 1, 5),
populate_db(Source, 6, 6, _WithAttachments = true),
ErrBody = [<<"{\"error\":\"unsupported_media_type\", \"reason\":\"bad_media\"}">>],
mock_fail_req(put, "/6", {ok, "415", [], ErrBody}),
{ok, RepId} = replicate(Source, Target, #{continuous => false}),
{ok, Listener} = rep_result_listener(RepId),
Res = wait_rep_result(RepId),
% Replication job should succeed
?assertMatch({ok, {[_ | _]}}, Res),
{ok, {Props}} = Res,
History = proplists:get_value(<<"history">>, Props),
?assertMatch([{[_ | _]}], History),
[{HistProps}] = History,
DocsWritten = proplists:get_value(<<"docs_written">>, HistProps),
DocWriteFailures = proplists:get_value(<<"doc_write_failures">>, HistProps),
?assertEqual(5, DocsWritten),
?assertEqual(1, DocWriteFailures),
t_skip_doc_put_invalid_attachment_name({_Ctx, {Source, Target}}) ->
populate_db(Source, 1, 5),
populate_db(Source, 6, 6, _WithAttachments = true),
ErrBody = [
<<"{\"error\":\"bad_request\", \"reason\":\"Attachment name '_foo' starts with prohibited character '_'\"}">>
mock_fail_req(put, "/6", {ok, "400", [], ErrBody}),
{ok, RepId} = replicate(Source, Target, #{continuous => false}),
{ok, Listener} = rep_result_listener(RepId),
Res = wait_rep_result(RepId),
% Replication job should succeed
?assertMatch({ok, {[_ | _]}}, Res),
{ok, {Props}} = Res,
History = proplists:get_value(<<"history">>, Props),
?assertMatch([{[_ | _]}], History),
[{HistProps}] = History,
DocsWritten = proplists:get_value(<<"docs_written">>, HistProps),
DocWriteFailures = proplists:get_value(<<"doc_write_failures">>, HistProps),
?assertEqual(5, DocsWritten),
?assertEqual(1, DocWriteFailures),
t_fail_revs_diff({_Ctx, {Source, Target}}) ->
populate_db(Source, 1, 5),
{ok, RepId} = replicate(Source, Target),
wait_target_in_sync(Source, Target),
{ok, Listener} = rep_result_listener(RepId),
mock_fail_req(post, "/_revs_diff", {ok, "407", [], [<<"{\"x\":\"y\"}">>]}),
populate_db(Source, 6, 6),
{error, Result} = wait_rep_result(RepId),
?assertEqual({revs_diff_failed, 407, {[{<<"x">>, <<"y">>}]}}, Result),
t_fail_bulk_get({_Ctx, {Source, Target}}) ->
% For _bulk_get the expectation is that the replication job will fallback
% to a plain GET so the shape of the test is a bit different than the other
% tests here.
meck:new(couch_replicator_api_wrap, [passthrough]),
populate_db(Source, 1, 5),
{ok, _} = replicate(Source, Target),
wait_target_in_sync(Source, Target),
% Tolerate a 500 error
mock_fail_req(post, "/_bulk_get", {ok, "501", [], [<<"not_implemented">>]}),
populate_db(Source, 6, 6),
wait_target_in_sync(Source, Target),
% Check that there was a fallback to a plain GET
?assertEqual(1, meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6)),
% Tolerate a 400 error
mock_fail_req(post, "/_bulk_get", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}),
populate_db(Source, 7, 7),
wait_target_in_sync(Source, Target),
% Check that there was a falback to a plain GET
?assertEqual(1, meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6)).
t_fail_open_docs_get({_Ctx, {Source, Target}}) ->
populate_db(Source, 1, 5),
Opts = #{
% We're testing the case of individual doc rev GETs
use_bulk_get => false,
% Perform at least one retry before giving up (for extra coverage)
retries_per_request => 2
{ok, RepId} = replicate(Source, Target, Opts),
wait_target_in_sync(Source, Target),
{ok, Listener} = rep_result_listener(RepId),
% Break open_doc_revs on the server side and see what happens
meck:new(fabric_doc_open_revs, [passthrough]),
meck:expect(fabric_doc_open_revs, go, fun
(Src, <<"6">>, _, _) when Src =:= Source ->
% This is a random error, no particular reason for a 404
meck:exception(throw, not_found);
(ArgDb, ArgDocId, ArgRevs, ArgOpts) ->
meck:passthrough([ArgDb, ArgDocId, ArgRevs, ArgOpts])
populate_db(Source, 6, 6),
{error, Result} = wait_rep_result(RepId),
?assertMatch({worker_died, _, {process_died, _, open_doc_revs_failed}}, Result),
?assert(meck:num_calls(fabric_doc_open_revs, go, 4) >= 2),
t_fail_changes_queue({_Ctx, {Source, Target}}) ->
populate_db(Source, 1, 5),
{ok, RepId} = replicate(Source, Target),
wait_target_in_sync(Source, Target),
RepPid = couch_replicator_test_helper:get_pid(RepId),
State = sys:get_state(RepPid),
ChangesQueue = element(20, State),
{ok, Listener} = rep_result_listener(RepId),
exit(ChangesQueue, boom),
{error, Result} = wait_rep_result(RepId),
?assertEqual({changes_queue_died, boom}, Result),
t_fail_changes_manager({_Ctx, {Source, Target}}) ->
populate_db(Source, 1, 5),
{ok, RepId} = replicate(Source, Target),
wait_target_in_sync(Source, Target),
RepPid = couch_replicator_test_helper:get_pid(RepId),
State = sys:get_state(RepPid),
ChangesManager = element(21, State),
{ok, Listener} = rep_result_listener(RepId),
exit(ChangesManager, bam),
{error, Result} = wait_rep_result(RepId),
?assertEqual({changes_manager_died, bam}, Result),
t_fail_changes_reader_proc({_Ctx, {Source, Target}}) ->
populate_db(Source, 1, 5),
{ok, RepId} = replicate(Source, Target),
wait_target_in_sync(Source, Target),
RepPid = couch_replicator_test_helper:get_pid(RepId),
State = sys:get_state(RepPid),
ChangesReader = element(22, State),
{ok, Listener} = rep_result_listener(RepId),
exit(ChangesReader, kapow),
{error, Result} = wait_rep_result(RepId),
?assertEqual({changes_reader_died, kapow}, Result),
t_dont_start_duplicate_job({_Ctx, {Source, Target}}) ->
meck:new(couch_replicator_pg, [passthrough]),
Pid = pid_from_another_node(),
meck:expect(couch_replicator_pg, should_start, fun(_, _) -> {no, Pid} end),
Rep = make_rep(Source, Target, #{continuous => true}),
ExpectErr = {error, {already_started, Pid}},
?assertEqual(ExpectErr, couch_replicator_scheduler_job:start_link(Rep)).
t_can_start_multiple_jobs({_Ctx, {Source, Target1}}) ->
Target2 = couch_replicator_test_helper:setup_db(),
populate_db(Source, 1, 5),
{ok, RepId1} = replicate(Source, Target1),
{ok, RepId2} = replicate(Source, Target2),
RepPid1 = couch_replicator_test_helper:get_pid(RepId1),
RepPid2 = couch_replicator_test_helper:get_pid(RepId2),
wait_target_in_sync(Source, Target1),
wait_target_in_sync(Source, Target2),
exit(RepPid1, kill),
exit(RepPid2, kill),
t_stop_duplicate_job({_Ctx, {Source, Target}}) ->
{ok, RepId} = replicate(Source, Target),
wait_target_in_sync(Source, Target),
RepPid = couch_replicator_test_helper:get_pid(RepId),
{ok, Listener} = rep_result_listener(RepId),
Pid = pid_from_another_node(),
meck:expect(couch_replicator_pg, should_run, fun(_, _) -> {no, Pid} end),
RepPid ! {'$gen_cast', checkpoint},
{error, Result} = wait_rep_result(RepId),
?assertEqual(duplicate_job, Result),
pid_from_another_node() ->
% Use a Pid serialized from a node named A@1
% (A@1)1> term_to_binary(self()).
Bin = <<131, 88, 100, 0, 3, 65, 64, 49, 0, 0, 0, 89, 0, 0, 0, 0, 99, 137, 147, 218>>,
Pid = binary_to_term(Bin),
?assertEqual('A@1', node(Pid)),
mock_fail_req(Method, Path, Return) ->
fun(W, Url, Headers, Meth, Body, Opts, TOut) ->
Args = [W, Url, Headers, Meth, Body, Opts, TOut],
#{path := UPath} = uri_string:parse(Url),
case {lists:suffix(Path, UPath), Method == Meth} of
{true, true} ->
_ = meck:passthrough(Args),
{_, _} ->
rep_result_listener(RepId) ->
ReplyTo = self(),
{ok, _Listener} = couch_replicator_notifier:start_link(
({_, RepId2, _} = Ev) when RepId2 =:= RepId ->
ReplyTo ! Ev;
(_) ->
wait_rep_result(RepId) ->
{finished, RepId, RepResult} -> {ok, RepResult};
{error, RepId, Reason} -> {error, Reason}
populate_db(DbName, Start, End) ->
populate_db(DbName, Start, End, false).
populate_db(DbName, Start, End, WithAttachments) ->
Docs = lists:foldl(
fun(DocIdCounter, Acc) ->
Id = integer_to_binary(DocIdCounter),
Atts =
case WithAttachments of
true -> [att(<<"att1">>, 1024, <<"app/binary">>)];
false -> []
Doc = #doc{id = Id, body = {[]}, atts = Atts},
[Doc | Acc]
lists:seq(Start, End)
{ok, [_ | _]} = fabric:update_docs(DbName, Docs, [?ADMIN_CTX]).
att(Name, Size, Type) ->
{name, Name},
{type, Type},
{att_len, Size},
{data, fun(Count) -> crypto:strong_rand_bytes(Count) end}
wait_target_in_sync(Source, Target) ->
{ok, SourceDocCount} = fabric:get_doc_count(Source),
wait_target_in_sync_loop(SourceDocCount, Target, 300).
wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
{assertion_failed, [
{module, ?MODULE},
{line, ?LINE},
{reason, "Could not get source and target databases in sync"}
wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
{ok, TargetDocCount} = fabric:get_doc_count(TargetName),
case TargetDocCount == DocCount of
true ->
false ->
ok = timer:sleep(500),
wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1)
replicate(Source, Target) ->
replicate(Source, Target, #{}).
replicate(Source, Target, #{} = Opts) ->
Rep = make_rep(Source, Target, Opts),
ok = couch_replicator_scheduler:add_job(Rep),
make_rep(Source, Target, #{} = OverrideOpts) ->
Opts0 = #{
source => url(Source),
target => url(Target),
continuous => true,
worker_processes => 1,
retries_per_request => 1,
% Low connection timeout so _changes feed gets restarted quicker
connection_timeout => 3000
RepMap = maps:merge(Opts0, OverrideOpts),
% parse_rep_doc accepts {[...]} ejson format
RepEJson = couch_util:json_decode(couch_util:json_encode(RepMap)),
{ok, Rep} = couch_replicator_parse:parse_rep_doc(RepEJson, ?ADMIN_USER),
url(DbName) ->