% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_replicator_doc_processor_worker).
-export([spawn_worker/4]).
-include("couch_replicator.hrl").
-import(couch_replicator_utils, [
pp_rep_id/1
]).
-define(WORKER_TIMEOUT_MSEC, 61000).
% Spawn a worker which attempts to calculate the replication id and then add
% a replication job to the scheduler. This function creates a monitor on the
% worker; the worker will exit with a #doc_worker_result{} record within the
% ?WORKER_TIMEOUT_MSEC timeout period. A timeout is considered a
% `temporary_error`. The result is sent as the `Reason` in the
% {'DOWN', ...} message.
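%
% A minimal caller-side sketch (illustrative only; handle_result/1 is a
% hypothetical caller function): since spawn_monitor/1 runs in the calling
% process, the caller owns the monitor and can collect the result with:
%
%     Pid = spawn_worker({DbName, DocId}, Rep, WaitSec, WRef),
%     receive
%         {'DOWN', _MRef, process, Pid, #doc_worker_result{result = Res}} ->
%             handle_result(Res)
%     end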
-spec spawn_worker(db_doc_id(), #rep{}, seconds(), reference()) -> pid().
spawn_worker(Id, Rep, WaitSec, WRef) ->
{Pid, _Ref} = spawn_monitor(fun() -> worker_fun(Id, Rep, WaitSec, WRef) end),
Pid.
% Private functions
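% Body of the spawned worker: sleep for WaitSec seconds, then run
% maybe_start_replication/3 in an inner monitored process so the whole
% attempt is bounded by ?WORKER_TIMEOUT_MSEC. The outcome (or a timeout,
% reported as a `temporary_error`) becomes this process's exit reason,
% wrapped in a #doc_worker_result{} record.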
-spec worker_fun(db_doc_id(), #rep{}, seconds(), reference()) -> no_return().
worker_fun(Id, Rep, WaitSec, WRef) ->
timer:sleep(WaitSec * 1000),
Fun = fun() ->
try maybe_start_replication(Id, Rep, WRef) of
Res ->
exit(Res)
catch
throw:{filter_fetch_error, Reason} ->
exit({temporary_error, Reason});
_Tag:Reason ->
exit({temporary_error, Reason})
end
end,
{Pid, Ref} = spawn_monitor(Fun),
receive
{'DOWN', Ref, _, Pid, Result} ->
exit(#doc_worker_result{id = Id, wref = WRef, result = Result})
after ?WORKER_TIMEOUT_MSEC ->
erlang:demonitor(Ref, [flush]),
exit(Pid, kill),
{DbName, DocId} = Id,
TimeoutSec = round(?WORKER_TIMEOUT_MSEC / 1000),
Msg = io_lib:format("Replication for db ~p doc ~p failed to start due "
"to timeout after ~B seconds", [DbName, DocId, TimeoutSec]),
Result = {temporary_error, couch_util:to_binary(Msg)},
exit(#doc_worker_result{id = Id, wref = WRef, result = Result})
end.
% Try to start a replication. Used by a worker. This function should return
% rep_start_result(); it also throws {filter_fetch_error, Reason} if the
% filter cannot be fetched. It can block for an indeterminate amount of time
% while fetching the filter.
-spec maybe_start_replication(db_doc_id(), #rep{}, reference()) ->
rep_start_result().
maybe_start_replication(Id, RepWithoutId, WRef) ->
Rep = couch_replicator_docs:update_rep_id(RepWithoutId),
case maybe_add_job_to_scheduler(Id, Rep, WRef) of
ignore ->
ignore;
{ok, RepId} ->
{ok, RepId};
{temporary_error, Reason} ->
{temporary_error, Reason};
{permanent_failure, Reason} ->
{DbName, DocId} = Id,
StartTime = Rep#rep.start_time,
couch_replicator_docs:update_failed(DbName, DocId, Reason, StartTime),
{permanent_failure, Reason}
end.
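% Check the scheduler for an existing job with this replication id and decide
% what to do with the change:
%  * no existing job: add one, but only if this worker is still the current
%    worker for the document (see the race condition note below);
%  * a job started from the same doc id: nothing to add, return {ok, RepId};
%  * a transient job (doc_id = null) started via the `_replicate` endpoint:
%    return a temporary error;
%  * a job triggered by a different db/doc: return a permanent failure.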
-spec maybe_add_job_to_scheduler(db_doc_id(), #rep{}, reference()) ->
rep_start_result().
maybe_add_job_to_scheduler({DbName, DocId}, Rep, WRef) ->
RepId = Rep#rep.id,
case couch_replicator_scheduler:rep_state(RepId) of
nil ->
% Before adding a job, check that this worker is still the current
% worker. This handles a race condition where a worker which was
% sleeping and then checking a replication filter could inadvertently
% re-add a replication which was already deleted.
case couch_replicator_doc_processor:get_worker_ref({DbName, DocId}) of
WRef ->
ok = couch_replicator_scheduler:add_job(Rep),
{ok, RepId};
_NilOrOtherWRef ->
ignore
end;
#rep{doc_id = DocId} ->
{ok, RepId};
#rep{doc_id = null} ->
Msg = io_lib:format("Replication `~s` specified by document `~s`"
" already running as a transient replication, started via"
" `_replicate` API endpoint", [pp_rep_id(RepId), DocId]),
{temporary_error, couch_util:to_binary(Msg)};
#rep{db_name = OtherDb, doc_id = OtherDocId} ->
Msg = io_lib:format("Replication `~s` specified by document `~s`"
" already started, triggered by document `~s` from db `~s`",
[pp_rep_id(RepId), DocId, OtherDocId, mem3:dbname(OtherDb)]),
{permanent_failure, couch_util:to_binary(Msg)}
end.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-define(DB, <<"db">>).
-define(DOC1, <<"doc1">>).
-define(R1, {"0b7831e9a41f9322a8600ccfa02245f2", ""}).
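% Expected replication id (a {Base, Extension} pair) computed for the
% document returned by change/0.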
doc_processor_worker_test_() ->
{
foreach,
fun setup/0,
fun teardown/1,
[
t_should_add_job(),
t_already_running_same_docid(),
t_already_running_transient(),
t_already_running_other_db_other_doc(),
t_spawn_worker(),
t_ignore_if_doc_deleted(),
t_ignore_if_worker_ref_does_not_match()
]
}.
% Should add a new replication job when there is no existing one.
t_should_add_job() ->
?_test(begin
Id = {?DB, ?DOC1},
Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
?assert(added_job())
end).
% Replication is already running from the same doc id. Do not add a
% duplicate job.
t_already_running_same_docid() ->
?_test(begin
Id = {?DB, ?DOC1},
mock_already_running(?DB, ?DOC1),
Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
?assert(did_not_add_job())
end).
% There is a transient replication with the same replication id running.
% Report a temporary error.
t_already_running_transient() ->
?_test(begin
Id = {?DB, ?DOC1},
mock_already_running(null, null),
Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
?assertMatch({temporary_error, _}, maybe_start_replication(Id, Rep, nil)),
?assert(did_not_add_job())
end).
% There is a duplicate replication, potentially from a different db and doc.
% Write a permanent failure to the doc.
t_already_running_other_db_other_doc() ->
?_test(begin
Id = {?DB, ?DOC1},
mock_already_running(<<"otherdb">>, <<"otherdoc">>),
Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
?assertMatch({permanent_failure, _}, maybe_start_replication(Id, Rep, nil)),
?assert(did_not_add_job()),
?assertEqual(1, meck:num_calls(couch_replicator_docs, update_failed, '_'))
end).
% Should spawn worker
t_spawn_worker() ->
?_test(begin
Id = {?DB, ?DOC1},
Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
WRef = make_ref(),
meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, WRef),
Pid = spawn_worker(Id, Rep, 0, WRef),
Res = receive {'DOWN', _Ref, process, Pid, Reason} -> Reason
after 1000 -> timeout end,
Expect = #doc_worker_result{id = Id, wref = WRef, result = {ok, ?R1}},
?assertEqual(Expect, Res),
?assert(added_job())
end).
% Should not add a job if, by the time the worker fetched the filter and
% computed the replication id, the replication doc had been deleted.
t_ignore_if_doc_deleted() ->
?_test(begin
Id = {?DB, ?DOC1},
Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, nil),
?assertEqual(ignore, maybe_start_replication(Id, Rep, make_ref())),
?assertNot(added_job())
end).
% Should not add a job if, by the time the worker fetched the filter and
% built the replication id, another worker had been spawned.
t_ignore_if_worker_ref_does_not_match() ->
?_test(begin
Id = {?DB, ?DOC1},
Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, make_ref()),
?assertEqual(ignore, maybe_start_replication(Id, Rep, make_ref())),
?assertNot(added_job())
end).
% Test helper functions
setup() ->
meck:expect(couch_replicator_scheduler, add_job, 1, ok),
meck:expect(config, get, fun(_, _, Default) -> Default end),
meck:expect(couch_server, get_uuid, 0, this_is_snek),
meck:expect(couch_replicator_docs, update_failed, 4, ok),
meck:expect(couch_replicator_scheduler, rep_state, 1, nil),
meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, nil),
ok.
teardown(_) ->
meck:unload().
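% Make the scheduler report an already-running #rep{} with the given db name
% and doc id for whatever replication id is looked up.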
mock_already_running(DbName, DocId) ->
meck:expect(couch_replicator_scheduler, rep_state,
fun(RepId) -> #rep{id = RepId, doc_id = DocId, db_name = DbName} end).
added_job() ->
1 == meck:num_calls(couch_replicator_scheduler, add_job, '_').
did_not_add_job() ->
0 == meck:num_calls(couch_replicator_scheduler, add_job, '_').
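% Minimal replication document body, in EJSON form, used to build the #rep{}
% record in the tests above.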
change() ->
{[
{<<"_id">>, ?DOC1},
{<<"source">>, <<"src">>},
{<<"target">>, <<"tgt">>}
]}.
-endif.