blob: 6b4f95c2523542aede6b08fec2719223a352a528 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_replicator_error_reporting_tests).
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_replicator/src/couch_replicator.hrl").
setup_all() ->
test_util:start_couch([couch_replicator, chttpd, mem3, fabric]).
teardown_all(Ctx) ->
ok = test_util:stop_couch(Ctx).
setup() ->
meck:unload(),
Source = setup_db(),
Target = setup_db(),
{Source, Target}.
teardown({Source, Target}) ->
meck:unload(),
teardown_db(Source),
teardown_db(Target),
ok.
error_reporting_test_() ->
{
setup,
fun setup_all/0,
fun teardown_all/1,
{
foreach,
fun setup/0,
fun teardown/1,
[
fun t_fail_bulk_docs/1,
fun t_fail_changes_reader/1,
fun t_fail_revs_diff/1,
fun t_fail_changes_queue/1,
fun t_fail_changes_manager/1,
fun t_fail_changes_reader_proc/1
]
}
}.
t_fail_bulk_docs({Source, Target}) ->
?_test(begin
populate_db(Source, 1, 5),
{ok, RepId} = replicate(Source, Target),
wait_target_in_sync(Source, Target),
{ok, Listener} = rep_result_listener(RepId),
mock_fail_req("/_bulk_docs", {ok, "403", [], [<<"{\"x\":\"y\"}">>]}),
populate_db(Source, 6, 6),
{error, Result} = wait_rep_result(RepId),
?assertEqual({bulk_docs_failed, 403, {[{<<"x">>, <<"y">>}]}}, Result),
couch_replicator_notifier:stop(Listener)
end).
t_fail_changes_reader({Source, Target}) ->
?_test(begin
populate_db(Source, 1, 5),
{ok, RepId} = replicate(Source, Target),
wait_target_in_sync(Source, Target),
{ok, Listener} = rep_result_listener(RepId),
mock_fail_req("/_changes", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}),
populate_db(Source, 6, 6),
{error, Result} = wait_rep_result(RepId),
?assertEqual({changes_req_failed, 418, {[{<<"x">>, <<"y">>}]}}, Result),
couch_replicator_notifier:stop(Listener)
end).
t_fail_revs_diff({Source, Target}) ->
?_test(begin
populate_db(Source, 1, 5),
{ok, RepId} = replicate(Source, Target),
wait_target_in_sync(Source, Target),
{ok, Listener} = rep_result_listener(RepId),
mock_fail_req("/_revs_diff", {ok, "407", [], [<<"{\"x\":\"y\"}">>]}),
populate_db(Source, 6, 6),
{error, Result} = wait_rep_result(RepId),
?assertEqual({revs_diff_failed, 407, {[{<<"x">>, <<"y">>}]}}, Result),
couch_replicator_notifier:stop(Listener)
end).
t_fail_changes_queue({Source, Target}) ->
?_test(begin
populate_db(Source, 1, 5),
{ok, RepId} = replicate(Source, Target),
wait_target_in_sync(Source, Target),
RepPid = couch_replicator_test_helper:get_pid(RepId),
State = sys:get_state(RepPid),
ChangesQueue = element(20, State),
?assert(is_process_alive(ChangesQueue)),
{ok, Listener} = rep_result_listener(RepId),
exit(ChangesQueue, boom),
{error, Result} = wait_rep_result(RepId),
?assertEqual({changes_queue_died, boom}, Result),
couch_replicator_notifier:stop(Listener)
end).
t_fail_changes_manager({Source, Target}) ->
?_test(begin
populate_db(Source, 1, 5),
{ok, RepId} = replicate(Source, Target),
wait_target_in_sync(Source, Target),
RepPid = couch_replicator_test_helper:get_pid(RepId),
State = sys:get_state(RepPid),
ChangesManager = element(21, State),
?assert(is_process_alive(ChangesManager)),
{ok, Listener} = rep_result_listener(RepId),
exit(ChangesManager, bam),
{error, Result} = wait_rep_result(RepId),
?assertEqual({changes_manager_died, bam}, Result),
couch_replicator_notifier:stop(Listener)
end).
t_fail_changes_reader_proc({Source, Target}) ->
?_test(begin
populate_db(Source, 1, 5),
{ok, RepId} = replicate(Source, Target),
wait_target_in_sync(Source, Target),
RepPid = couch_replicator_test_helper:get_pid(RepId),
State = sys:get_state(RepPid),
ChangesReader = element(22, State),
?assert(is_process_alive(ChangesReader)),
{ok, Listener} = rep_result_listener(RepId),
exit(ChangesReader, kapow),
{error, Result} = wait_rep_result(RepId),
?assertEqual({changes_reader_died, kapow}, Result),
couch_replicator_notifier:stop(Listener)
end).
mock_fail_req(Path, Return) ->
meck:expect(ibrowse, send_req_direct,
fun(W, Url, Headers, Meth, Body, Opts, TOut) ->
Args = [W, Url, Headers, Meth, Body, Opts, TOut],
{ok, {_, _, _, _, UPath, _}} = http_uri:parse(Url),
case lists:suffix(Path, UPath) of
true -> Return;
false -> meck:passthrough(Args)
end
end).
rep_result_listener(RepId) ->
ReplyTo = self(),
{ok, _Listener} = couch_replicator_notifier:start_link(
fun({_, RepId2, _} = Ev) when RepId2 =:= RepId ->
ReplyTo ! Ev;
(_) ->
ok
end).
wait_rep_result(RepId) ->
receive
{finished, RepId, RepResult} -> {ok, RepResult};
{error, RepId, Reason} -> {error, Reason}
end.
setup_db() ->
DbName = ?tempdb(),
{ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
ok = couch_db:close(Db),
DbName.
teardown_db(DbName) ->
ok = couch_server:delete(DbName, [?ADMIN_CTX]).
populate_db(DbName, Start, End) ->
{ok, Db} = couch_db:open_int(DbName, []),
Docs = lists:foldl(
fun(DocIdCounter, Acc) ->
Id = integer_to_binary(DocIdCounter),
Doc = #doc{id = Id, body = {[]}},
[Doc | Acc]
end,
[], lists:seq(Start, End)),
{ok, _} = couch_db:update_docs(Db, Docs, []),
ok = couch_db:close(Db).
wait_target_in_sync(Source, Target) ->
{ok, SourceDb} = couch_db:open_int(Source, []),
{ok, SourceInfo} = couch_db:get_db_info(SourceDb),
ok = couch_db:close(SourceDb),
SourceDocCount = couch_util:get_value(doc_count, SourceInfo),
wait_target_in_sync_loop(SourceDocCount, Target, 300).
wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
erlang:error({assertion_failed, [
{module, ?MODULE}, {line, ?LINE},
{reason, "Could not get source and target databases in sync"}
]});
wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
{ok, Target} = couch_db:open_int(TargetName, []),
{ok, TargetInfo} = couch_db:get_db_info(Target),
ok = couch_db:close(Target),
TargetDocCount = couch_util:get_value(doc_count, TargetInfo),
case TargetDocCount == DocCount of
true ->
true;
false ->
ok = timer:sleep(500),
wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1)
end.
replicate(Source, Target) ->
SrcUrl = couch_replicator_test_helper:db_url(Source),
TgtUrl = couch_replicator_test_helper:db_url(Target),
RepObject = {[
{<<"source">>, SrcUrl},
{<<"target">>, TgtUrl},
{<<"continuous">>, true},
{<<"worker_processes">>, 1},
{<<"retries_per_request">>, 1},
% Low connection timeout so _changes feed gets restarted quicker
{<<"connection_timeout">>, 3000}
]},
{ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
ok = couch_replicator_scheduler:add_job(Rep),
couch_replicator_scheduler:reschedule(),
{ok, Rep#rep.id}.