% blob: 6537c8b20d2eee029b99e8f739f73ecf16156f18 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_replication_manager).
-behaviour(gen_server).
-export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]).
-export([code_change/3, terminate/2]).
-include("couch_db.hrl").
% Name of the ETS table that maps a replication document id to
% {RepId, MaxRetries} (filled in by maybe_start_replication/3).
-define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id).
% Name of the ETS table that maps a base replication id to {DocId, Flag}.
% Flag is inserted as true when a start is attempted and is set to false by
% the {triggered, RepId} call once the replication has been started.
-define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id).
% Delay, in seconds, before the first retry of a failed replication start.
% keep_retrying/7 doubles the delay after each further failure.
-define(INITIAL_WAIT, 5).
% gen_server state of the replication manager.
-record(state, {
% pid of the process spawned by changes_feed_loop/0; reset to nil when that
% process exits with reason normal
changes_feed_loop = nil,
% value returned by db_update_notifier/0
db_notifier = nil,
% name of the replicator database as returned by changes_feed_loop/0; reset
% to nil together with changes_feed_loop
rep_db_name = nil,
% pids of the processes spawned by maybe_start_replication/3 that have not
% yet exited normally
rep_start_pids = [],
% retry limit handed to newly spawned replication start processes; read from
% the "replicator" / "max_replication_retry_count" configuration value
max_retries
}).
-import(couch_util, [
get_value/2,
get_value/3
]).
% Starts the replication manager as a gen_server registered locally under the
% module name. Returns whatever gen_server:start_link/4 returns.
start_link() ->
    ServerName = {local, ?MODULE},
    InitArgs = [],
    Options = [],
    gen_server:start_link(ServerName, ?MODULE, InitArgs, Options).
% gen_server callback. Traps exits, creates the two named ETS tables,
% subscribes to the relevant configuration changes, starts the changes feed
% loop and the database update notifier and reads the initial retry limit.
% Returns {ok, #state{}}.
init(_Args) ->
    process_flag(trap_exit, true),
    % Protected (not private): keep_retrying/7 runs in a spawned process and
    % reads this table.
    _ = ets:new(?DOC_ID_TO_REP_ID, [named_table, set, protected]),
    _ = ets:new(?REP_ID_TO_DOC_ID, [named_table, set, private]),
    Manager = self(),
    % Forwards the two configuration values this server cares about as casts.
    OnConfigChange =
        fun("replicator", "db", NewDbName) ->
                ok = gen_server:cast(
                    Manager, {rep_db_changed, ?l2b(NewDbName)});
           ("replicator", "max_replication_retry_count", NewLimit) ->
                Limit = list_to_integer(NewLimit),
                ok = gen_server:cast(Manager, {set_max_retries, Limit})
        end,
    ok = couch_config:register(OnConfigChange),
    {FeedLoop, RepDbName} = changes_feed_loop(),
    Notifier = db_update_notifier(),
    RetryLimit = list_to_integer(
        couch_config:get("replicator", "max_replication_retry_count", "10")),
    InitialState = #state{
        changes_feed_loop = FeedLoop,
        rep_db_name = RepDbName,
        db_notifier = Notifier,
        max_retries = RetryLimit
    },
    {ok, InitialState}.
% gen_server callback for synchronous requests.
%
% {rep_db_update, Change}          - a change from the replicator database's
%                                    changes feed; handled by process_update/2.
% {triggered, RepId}               - sent by a replication start process once
%                                    the replication has been started; clears
%                                    the "being triggered" flag.
% {restart_failure, RepDoc, Error} - sent by keep_retrying/7 when no retries
%                                    are left; marks the document as failed
%                                    and forgets the replication.
% Any other request is logged and stops the server.
handle_call({rep_db_update, Change}, _From, State) ->
    NewState = process_update(State, Change),
    {reply, ok, NewState};

handle_call({triggered, {BaseRepId, _Ext}}, _From, State) ->
    % NOTE(review): this strict match crashes the server if the entry has been
    % removed in the meantime (stop_replication/1 deletes it) - confirm that
    % this cannot happen while a start process is still retrying.
    [{BaseRepId, {RepDocId, true}}] = ets:lookup(?REP_ID_TO_DOC_ID, BaseRepId),
    Triggered = {BaseRepId, {RepDocId, false}},
    true = ets:insert(?REP_ID_TO_DOC_ID, Triggered),
    {reply, ok, State};

handle_call({restart_failure, {RepDocProps} = RepDoc, LastError}, _From, State) ->
    RepDocId = get_value(<<"_id">>, RepDocProps),
    [{RepDocId, {{BaseRepId, _Ext} = RepId, Retries}}] =
        ets:lookup(?DOC_ID_TO_REP_ID, RepDocId),
    ?LOG_ERROR("Failed to start replication `~s` after ~p attempts using "
        "the document `~s`. Last error reason was: ~p",
        [pp_rep_id(RepId), Retries, RepDocId, LastError]),
    ErrorFields = [
        {<<"_replication_state">>, <<"error">>},
        {<<"_replication_id">>, ?l2b(BaseRepId)}
    ],
    couch_rep:update_rep_doc(RepDoc, ErrorFields),
    true = ets:delete(?REP_ID_TO_DOC_ID, BaseRepId),
    true = ets:delete(?DOC_ID_TO_REP_ID, RepDocId),
    {reply, ok, State};

handle_call(Request, Caller, State) ->
    ?LOG_ERROR("Replication manager received unexpected call ~p from ~p",
        [Request, Caller]),
    StopReason = {error, {unexpected_call, Request}},
    {stop, StopReason, State}.
% gen_server callback for asynchronous requests.
%
% {rep_db_changed, Name} / {rep_db_created, Name} - restart the manager via
%     restart/1 unless Name is the database already being followed.
% {set_max_retries, N} - store the new retry limit in the state.
% Any other cast is logged and stops the server.
handle_cast({rep_db_changed, NewName}, #state{rep_db_name = Current} = State) ->
    case NewName of
        Current ->
            {noreply, State};
        _Different ->
            {noreply, restart(State)}
    end;
handle_cast({rep_db_created, NewName}, #state{rep_db_name = Current} = State) ->
    case NewName of
        Current ->
            {noreply, State};
        _Different ->
            {noreply, restart(State)}
    end;
handle_cast({set_max_retries, NewLimit}, State) ->
    Updated = State#state{max_retries = NewLimit},
    {noreply, Updated};
handle_cast(Request, State) ->
    ?LOG_ERROR("Replication manager received unexpected cast ~p", [Request]),
    StopReason = {error, {unexpected_cast, Request}},
    {stop, StopReason, State}.
% gen_server callback for plain messages. Because init/1 traps exits, the
% exits of linked processes arrive here as {'EXIT', Pid, Reason} messages.
% Clause order matters: the changes feed loop and the notifier are checked
% before the generic "normal exit" clause.
handle_info({'EXIT', Pid, normal}, #state{changes_feed_loop = Pid} = State) ->
    % replicator DB deleted
    Cleared = State#state{changes_feed_loop = nil, rep_db_name = nil},
    {noreply, Cleared};
handle_info({'EXIT', Pid, Why}, #state{db_notifier = Pid} = State) ->
    ?LOG_ERROR("Database update notifier died. Reason: ~p", [Why]),
    {stop, {db_update_notifier_died, Why}, State};
handle_info({'EXIT', Pid, normal}, #state{rep_start_pids = Starters} = State) ->
    % one of the replication start processes terminated successfully
    StillRunning = lists:delete(Pid, Starters),
    {noreply, State#state{rep_start_pids = StillRunning}};
handle_info(Other, State) ->
    % Anything else, including an abnormal exit of a linked process other
    % than the notifier, stops the server.
    ?LOG_ERROR("Replication manager received unexpected message ~p", [Other]),
    {stop, {unexpected_msg, Other}, State}.
% gen_server callback. Ends every tracked replication, unlinks and kills the
% changes feed loop and the replication start processes, deletes both ETS
% tables and stops the database update notifier.
terminate(_Reason, #state{rep_start_pids = Starters,
                          changes_feed_loop = FeedLoop,
                          db_notifier = Notifier}) ->
    stop_all_replications(),
    % FeedLoop may be nil; the catches make the calls harmless in that case.
    StopLinked =
        fun(Pid) ->
            catch unlink(Pid),
            catch exit(Pid, stop)
        end,
    lists:foreach(StopLinked, [FeedLoop | Starters]),
    true = ets:delete(?REP_ID_TO_DOC_ID),
    true = ets:delete(?DOC_ID_TO_REP_ID),
    couch_db_update_notifier:stop(Notifier).
% gen_server callback for hot code upgrades. The state needs no conversion,
% so it is handed back unchanged.
code_change(_FromVersion, CurrentState, _ExtraArgs) ->
    Unchanged = CurrentState,
    {ok, Unchanged}.
% Makes sure the replicator database exists and spawns a linked process that
% follows its continuous changes feed. Every change whose document id passes
% has_valid_rep_id/1 is forwarded to this server as a synchronous
% {rep_db_update, Change} call.
% Returns {LoopPid, RepDbName}.
changes_feed_loop() ->
    {ok, RepDb} = couch_rep:ensure_rep_db_exists(),
    Manager = self(),
    % Callback invoked by the changes feed for each event.
    ForwardChange =
        fun({change, Change, _}, _) ->
                case has_valid_rep_id(Change) of
                    true ->
                        ok = gen_server:call(
                            Manager, {rep_db_update, Change}, infinity);
                    false ->
                        ok
                end;
           (_, _) ->
                ok
        end,
    FeedArgs = #changes_args{
        include_docs = true,
        feed = "continuous",
        timeout = infinity,
        db_open_options = [sys_db]
    },
    LoopPid = spawn_link(
        fun() ->
            FeedFun = couch_changes:handle_changes(
                FeedArgs, {json_req, null}, RepDb),
            FeedFun(ForwardChange)
        end),
    couch_db:close(RepDb),
    RepDbName = couch_db:name(RepDb),
    {LoopPid, RepDbName}.
% Tells whether a change (or a document id) belongs to a replication document.
% Given a change object, its <<"id">> member is looked up and checked; ids
% starting with the design document prefix yield false, anything else true.
has_valid_rep_id({ChangeProps}) ->
    DocId = get_value(<<"id">>, ChangeProps),
    has_valid_rep_id(DocId);
has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _/binary>>) ->
    false;
has_valid_rep_id(_Other) ->
    true.
% Starts a linked database update notifier. When a database is created whose
% name equals the configured replicator database name, this server is sent a
% {rep_db_created, DbName} cast. Returns the notifier.
db_update_notifier() ->
    Manager = self(),
    OnDbEvent =
        fun({created, DbName}) ->
                Configured = ?l2b(
                    couch_config:get("replicator", "db", "_replicator")),
                case Configured =:= DbName of
                    true ->
                        ok = gen_server:cast(
                            Manager, {rep_db_created, DbName});
                    false ->
                        ok
                end;
           (_OtherEvent) ->
                % no need to handle the 'deleted' event - the changes feed
                % loop dies when the database is deleted
                ok
        end,
    {ok, Notifier} = couch_db_update_notifier:start_link(OnDbEvent),
    Notifier.
% Ends every tracked replication, unlinks and kills the current changes feed
% loop and all replication start processes, then starts a fresh changes feed
% loop. Returns the state updated with the new loop pid and database name and
% an empty list of start processes.
restart(#state{changes_feed_loop = OldLoop, rep_start_pids = Starters} = State) ->
    stop_all_replications(),
    % OldLoop may be nil; the catches make the calls harmless in that case.
    StopLinked =
        fun(Pid) ->
            catch unlink(Pid),
            catch exit(Pid, rep_db_changed)
        end,
    lists:foreach(StopLinked, [OldLoop | Starters]),
    {FreshLoop, RepDbName} = changes_feed_loop(),
    State#state{
        changes_feed_loop = FreshLoop,
        rep_db_name = RepDbName,
        rep_start_pids = []
    }.
% Reacts to one change from the replicator database and returns the possibly
% updated state.
%   deleted document                  -> the replication is stopped
%   state "completed"                 -> the replication is stopped
%   state "error"                     -> the replication is stopped
%   state "triggered" or no state yet -> the replication is started if needed
% NOTE(review): any other _replication_state value raises case_clause inside
% the server - confirm such values cannot reach this function.
process_update(State, {ChangeProps}) ->
    RepDoc = get_value(doc, ChangeProps),
    {DocProps} = RepDoc,
    RepDocId = get_value(<<"_id">>, DocProps),
    IsDeleted = get_value(<<"deleted">>, ChangeProps, false),
    case IsDeleted of
        false ->
            RepState = get_value(<<"_replication_state">>, DocProps),
            case RepState of
                undefined ->
                    maybe_start_replication(State, RepDocId, RepDoc);
                <<"triggered">> ->
                    maybe_start_replication(State, RepDocId, RepDoc);
                <<"completed">> ->
                    replication_complete(RepDocId),
                    State;
                <<"error">> ->
                    stop_replication(RepDocId),
                    State
            end;
        true ->
            rep_doc_deleted(RepDocId),
            State
    end.
% Builds the #user_ctx{} record for a replication document. Without a
% <<"user_ctx">> member the default record is returned; otherwise name and
% roles are taken from it, defaulting to null and [] respectively.
rep_user_ctx({RepDocProps}) ->
    CtxField = get_value(<<"user_ctx">>, RepDocProps),
    case CtxField of
        {CtxProps} ->
            Name = get_value(<<"name">>, CtxProps, null),
            Roles = get_value(<<"roles">>, CtxProps, []),
            #user_ctx{name = Name, roles = Roles};
        undefined ->
            #user_ctx{}
    end.
% Starts the replication described by a replication document unless a
% replication with the same base id is already tracked.
%   not tracked                   -> both ETS tables are filled in and a linked
%                                    start process is spawned and remembered
%   tracked for this document     -> nothing to do
%   tracked for another document  -> the document is only tagged with the
%                                    replication id
% Returns the (possibly updated) state.
maybe_start_replication(#state{max_retries = RetryLimit,
                               rep_start_pids = Starters} = State,
                        DocId, JsonRepDoc) ->
    UserCtx = rep_user_ctx(JsonRepDoc),
    RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx),
    {BaseId, _Ext} = RepId,
    Tracked = ets:lookup(?REP_ID_TO_DOC_ID, BaseId),
    case Tracked of
        [] ->
            % true = "still being triggered"; cleared by {triggered, RepId}
            true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}),
            true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, {RepId, RetryLimit}}),
            Manager = self(),
            Starter = spawn_link(
                fun() ->
                    start_replication(
                        Manager, JsonRepDoc, RepId, UserCtx, RetryLimit)
                end),
            State#state{rep_start_pids = [Starter | Starters]};
        [{BaseId, {DocId, _}}] ->
            State;
        [{BaseId, {OwnerDocId, false}}] ->
            ?LOG_INFO("The replication specified by the document `~s` was already"
                " triggered by the document `~s`", [DocId, OwnerDocId]),
            maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
            State;
        [{BaseId, {OwnerDocId, true}}] ->
            ?LOG_INFO("The replication specified by the document `~s` is already"
                " being triggered by the document `~s`", [DocId, OwnerDocId]),
            maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
            State
    end.
% Writes RepId into the document's <<"_replication_id">> member unless the
% document already carries exactly that value. Returns ok when nothing had to
% be written, otherwise the result of couch_rep:update_rep_doc/2.
maybe_tag_rep_doc({DocProps} = JsonRepDoc, RepId) ->
    CurrentTag = get_value(<<"_replication_id">>, DocProps),
    case CurrentTag =:= RepId of
        true ->
            ok;
        false ->
            NewFields = [{<<"_replication_id">>, RepId}],
            couch_rep:update_rep_doc(JsonRepDoc, NewFields)
    end.
% Body of a replication start process. Tries to start the replication once.
% On success the manager is told that it was triggered and the process then
% continues in couch_rep:get_result/4. On failure the document is marked with
% the error state and the start is retried through keep_retrying/7, beginning
% with a delay of ?INITIAL_WAIT seconds.
start_replication(Server, {DocProps} = RepDoc, RepId, UserCtx, MaxRetries) ->
    Outcome = (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)),
    case Outcome of
        RepPid when is_pid(RepPid) ->
            ?LOG_INFO("Document `~s` triggered replication `~s`",
                [get_value(<<"_id">>, DocProps), pp_rep_id(RepId)]),
            ok = gen_server:call(Server, {triggered, RepId}, infinity),
            couch_rep:get_result(RepPid, RepId, RepDoc, UserCtx);
        Failure ->
            ErrorFields = [
                {<<"_replication_state">>, <<"error">>},
                {<<"_replication_id">>, ?l2b(element(1, RepId))}
            ],
            couch_rep:update_rep_doc(RepDoc, ErrorFields),
            keep_retrying(
                Server, RepId, RepDoc, UserCtx, Failure,
                ?INITIAL_WAIT, MaxRetries)
    end.
% Retries starting a replication, sleeping Wait seconds before each attempt
% and doubling the delay after every failure. Once no retries are left the
% manager is told about the failure with {restart_failure, RepDoc, Error}.
% NOTE(review): the number logged on success counts retries only, not the
% initial attempt made by start_replication/5 - confirm that is intended.
keep_retrying(Server, _RepId, RepDoc, _UserCtx, LastError, _Wait, 0) ->
    ok = gen_server:call(Server, {restart_failure, RepDoc, LastError}, infinity);
keep_retrying(Server, RepId, RepDoc, UserCtx, LastError, Wait, RetriesLeft) ->
    {DocProps} = RepDoc,
    RepDocId = get_value(<<"_id">>, DocProps),
    ?LOG_ERROR("Error starting replication `~s` (document `~s`): ~p. "
        "Retrying in ~p seconds", [pp_rep_id(RepId), RepDocId, LastError, Wait]),
    ok = timer:sleep(Wait * 1000),
    Outcome = (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)),
    case Outcome of
        RepPid when is_pid(RepPid) ->
            ok = gen_server:call(Server, {triggered, RepId}, infinity),
            % The retry limit this document started with is stored in ETS.
            [{RepDocId, {RepId, RetryLimit}}] =
                ets:lookup(?DOC_ID_TO_REP_ID, RepDocId),
            ?LOG_INFO("Document `~s` triggered replication `~s` after ~p attempts",
                [RepDocId, pp_rep_id(RepId), RetryLimit - RetriesLeft + 1]),
            couch_rep:get_result(RepPid, RepId, RepDoc, UserCtx);
        Failure ->
            keep_retrying(
                Server, RepId, RepDoc, UserCtx, Failure,
                Wait * 2, RetriesLeft - 1)
    end.
% Stops the replication belonging to a deleted replication document, if one
% is tracked, and logs that it was stopped.
rep_doc_deleted(DocId) ->
    Stopped = stop_replication(DocId),
    case Stopped of
        none ->
            ok;
        {ok, RepId} ->
            ?LOG_INFO("Stopped replication `~s` because replication document `~s`"
                " was deleted", [pp_rep_id(RepId), DocId])
    end.
% Stops tracking the replication of a document whose state became
% "completed", if one is tracked, and logs that it finished.
replication_complete(DocId) ->
    Stopped = stop_replication(DocId),
    case Stopped of
        none ->
            ok;
        {ok, RepId} ->
            ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
                [pp_rep_id(RepId), DocId])
    end.
% Ends the replication tracked for DocId and removes it from both ETS tables.
% Returns {ok, RepId} when a replication was tracked, none otherwise.
stop_replication(DocId) ->
    Tracked = ets:lookup(?DOC_ID_TO_REP_ID, DocId),
    case Tracked of
        [] ->
            none;
        [{DocId, {{BaseId, _Ext} = RepId, _MaxRetries}}] ->
            couch_rep:end_replication(RepId),
            true = ets:delete(?REP_ID_TO_DOC_ID, BaseId),
            true = ets:delete(?DOC_ID_TO_REP_ID, DocId),
            {ok, RepId}
    end.
% Ends every replication recorded in the doc-id table and then empties both
% ETS tables. Returns true.
stop_all_replications() ->
    ?LOG_INFO("Stopping all ongoing replications because the replicator"
        " database was deleted or changed", []),
    % The accumulator is not used; the fold only serves to visit every entry.
    EndOne =
        fun({_DocId, {RepId, _MaxRetries}}, _Acc) ->
            couch_rep:end_replication(RepId)
        end,
    ets:foldl(EndOne, ok, ?DOC_ID_TO_REP_ID),
    true = ets:delete_all_objects(?REP_ID_TO_DOC_ID),
    true = ets:delete_all_objects(?DOC_ID_TO_REP_ID).
% Pretty-prints a replication id: the base id followed directly by its
% extension, both given as lists.
pp_rep_id({BaseId, Ext}) ->
    lists:append(BaseId, Ext).