% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_replicator_scheduler_job).
-behaviour(gen_server).
-export([
start_link/1
]).
-export([
init/1,
terminate/2,
handle_call/3,
handle_info/2,
handle_cast/2,
format_status/2,
sum_stats/2,
report_seq_done/3
]).
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
-include("couch_replicator.hrl").
-import(couch_util, [
get_value/2,
get_value/3,
to_binary/1
]).
-import(couch_replicator_utils, [
pp_rep_id/1
]).
-define(LOWEST_SEQ, 0).
-define(DEFAULT_CHECKPOINT_INTERVAL, 30000).
-define(STARTUP_JITTER_DEFAULT, 5000).
-record(rep_state, {
rep_details,
source_name,
target_name,
source,
target,
history,
checkpoint_history,
start_seq,
committed_seq,
current_through_seq,
seqs_in_progress = [],
highest_seq_done = {0, ?LOWEST_SEQ},
source_log,
target_log,
rep_starttime,
src_starttime,
tgt_starttime,
% checkpoint timer
timer,
changes_queue,
changes_manager,
changes_reader,
workers,
stats = couch_replicator_stats:new(),
session_id,
source_seq = nil,
use_checkpoints = true,
checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
type = db,
view = nil,
certificate_checker
}).
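% Entry point used by the scheduler. Ask couch_replicator_pg whether this
% node should run the job; if so, start the gen_server and join the process
% group so only one copy of the replication runs in the cluster, otherwise
% return {error, {already_started, OtherPid}}.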
start_link(#rep{id = Id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
RepChildId = BaseId ++ Ext,
Source = couch_replicator_api_wrap:db_uri(Src),
Target = couch_replicator_api_wrap:db_uri(Tgt),
case couch_replicator_pg:should_start(Id, node()) of
yes ->
case gen_server:start_link(?MODULE, Rep, []) of
{ok, Pid} ->
couch_replicator_pg:join(Id, Pid),
{ok, Pid};
{error, Reason} ->
couch_log:warning(
"failed to start replication `~s` (`~s` -> `~s`)",
[RepChildId, Source, Target]
),
{error, Reason}
end;
{no, OtherPid} ->
{error, {already_started, OtherPid}}
end.
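% Return {ok, InitArgs, 0} so the newly started gen_server immediately gets
% a timeout message and finishes initialization in do_init/1 (see
% handle_info(timeout, ...)). This keeps start_link/1 from blocking on the
% startup jitter sleep and on opening the endpoints.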
init(InitArgs) ->
{ok, InitArgs, 0}.
do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx = UserCtx} = Rep) ->
process_flag(trap_exit, true),
timer:sleep(startup_jitter()),
#rep_state{
source = Source,
target = Target,
source_name = SourceName,
target_name = TargetName,
start_seq = {_Ts, StartSeq},
highest_seq_done = {_, HighestSeq},
checkpoint_interval = CheckpointInterval
} = State = init_state(Rep),
NumWorkers = get_value(worker_processes, Options),
BatchSize = get_value(worker_batch_size, Options),
{ok, ChangesQueue} = couch_work_queue:new([
{max_items, BatchSize * NumWorkers * 2},
{max_size, 100 * 1024 * NumWorkers}
]),
% This starts the _changes reader process. It adds the changes from
% the source db to the ChangesQueue.
{ok, ChangesReader} = couch_replicator_changes_reader:start_link(
StartSeq, Source, ChangesQueue, Options
),
% Changes manager - responsible for dequeueing batches from the changes
% queue and delivering them to the worker processes.
ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize),
% This starts the worker processes. Each worker asks the changes manager for
% a batch of _changes rows to process, checks which revs are missing on the
% target, and copies the missing ones from the source to the target.
Workers = lists:map(
fun(_) ->
couch_stats:increment_counter([couch_replicator, workers_started]),
{ok, Pid} = couch_replicator_worker:start_link(
self(), Source, Target, ChangesManager, Options
),
Pid
end,
lists:seq(1, NumWorkers)
),
couch_task_status:add_task(
[
{type, replication},
{user, UserCtx#user_ctx.name},
{replication_id, ?l2b(BaseId ++ Ext)},
{database, Rep#rep.db_name},
{doc_id, Rep#rep.doc_id},
{source, ?l2b(SourceName)},
{target, ?l2b(TargetName)},
{continuous, get_value(continuous, Options, false)},
{source_seq, seq_encode(HighestSeq)},
{checkpoint_interval, CheckpointInterval}
] ++ rep_stats(State)
),
couch_task_status:set_update_frequency(1000),
% Until OTP R14B03:
%
% Restarting a temporary supervised child implies that the original arguments
% (#rep{} record) specified in the MFA component of the supervisor
% child spec will always be used whenever the child is restarted.
% This implies the same replication performance tuning parameters will
% always be used. The solution is to delete the child spec (see
% cancel_replication/1) and then start the replication again, but this is
% unfortunately not immune to race conditions.
log_replication_start(State),
CertificateCheckerPid = verify_ssl_certificates_log(Rep),
couch_log:debug("Worker pids are: ~p", [Workers]),
doc_update_triggered(Rep),
{ok, State#rep_state{
changes_queue = ChangesQueue,
changes_manager = ChangesManager,
changes_reader = ChangesReader,
certificate_checker = CertificateCheckerPid,
workers = Workers
}}.
handle_call({report_seq_done, Seq, StatsInc}, From, State) ->
#rep_state{
seqs_in_progress = SeqsInProgress,
highest_seq_done = HighestDone,
current_through_seq = ThroughSeq,
stats = Stats
} = State,
gen_server:reply(From, ok),
{NewThroughSeq0, NewSeqsInProgress} =
case SeqsInProgress of
[] ->
{Seq, []};
[Seq | Rest] ->
{Seq, Rest};
[_ | _] ->
{ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
end,
NewHighestDone = lists:max([HighestDone, Seq]),
NewThroughSeq =
case NewSeqsInProgress of
[] ->
lists:max([NewThroughSeq0, NewHighestDone]);
_ ->
NewThroughSeq0
end,
couch_log:debug(
"Worker reported seq ~p, through seq was ~p, "
"new through seq is ~p, highest seq done was ~p, "
"new highest seq done is ~p~n"
"Seqs in progress were: ~p~nSeqs in progress are now: ~p",
[
Seq,
ThroughSeq,
NewThroughSeq,
HighestDone,
NewHighestDone,
SeqsInProgress,
NewSeqsInProgress
]
),
NewState = State#rep_state{
stats = couch_replicator_utils:sum_stats(Stats, StatsInc),
current_through_seq = NewThroughSeq,
seqs_in_progress = NewSeqsInProgress,
highest_seq_done = NewHighestDone
},
update_task(NewState),
{noreply, NewState}.
handle_cast({sum_stats, Stats}, State) ->
NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
{noreply, State#rep_state{stats = NewStats}};
handle_cast(checkpoint, #rep_state{rep_details = Rep} = State) ->
case couch_replicator_pg:should_run(Rep#rep.id, self()) of
yes ->
case do_checkpoint(State) of
{ok, NewState} ->
couch_stats:increment_counter([couch_replicator, checkpoints, success]),
{noreply, NewState#rep_state{timer = start_timer(State)}};
Error ->
couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
{stop, Error, State}
end;
{no, OtherPid} ->
{stop, {shutdown, {duplicate_job, OtherPid}}, State}
end;
handle_cast(
{report_seq, Seq},
#rep_state{seqs_in_progress = SeqsInProgress} = State
) ->
NewSeqsInProgress = ordsets:add_element(Seq, SeqsInProgress),
{noreply, State#rep_state{seqs_in_progress = NewSeqsInProgress}}.
handle_info(shutdown, St) ->
{stop, shutdown, St};
handle_info({'EXIT', Pid, max_backoff}, State) ->
couch_log:error("Max backoff reached child process ~p", [Pid]),
{stop, {shutdown, max_backoff}, State};
handle_info({'EXIT', Pid, {shutdown, max_backoff}}, State) ->
couch_log:error("Max backoff reached child process ~p", [Pid]),
{stop, {shutdown, max_backoff}, State};
handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader = Pid} = State) ->
{noreply, State};
handle_info({'EXIT', Pid, _Reason}, #rep_state{certificate_checker = Pid} = State) ->
{noreply, State};
handle_info({'EXIT', Pid, Reason0}, #rep_state{changes_reader = Pid} = State) ->
couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
Reason =
case Reason0 of
{changes_req_failed, _, _} = HttpFail ->
HttpFail;
{http_request_failed, _, _, {error, {code, Code}}} ->
{changes_req_failed, Code};
{http_request_failed, _, _, {error, Err}} ->
{changes_req_failed, Err};
Other ->
{changes_reader_died, Other}
end,
couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
{stop, {shutdown, Reason}, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
{noreply, State};
handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) ->
couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
{stop, {shutdown, {changes_manager_died, Reason}}, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue = Pid} = State) ->
{noreply, State};
handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue = Pid} = State) ->
couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
{stop, {shutdown, {changes_queue_died, Reason}}, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
case Workers -- [Pid] of
Workers ->
couch_log:error("unknown pid bit the dust ~p ~n", [Pid]),
{noreply, State#rep_state{workers = Workers}};
%% not clear why a stop was here before
%%{stop, {unknown_process_died, Pid, normal}, State};
[] ->
catch unlink(State#rep_state.changes_manager),
catch exit(State#rep_state.changes_manager, kill),
do_last_checkpoint(State);
Workers2 ->
{noreply, State#rep_state{workers = Workers2}}
end;
handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
State2 = cancel_timer(State),
case lists:member(Pid, Workers) of
false ->
{stop, {unknown_process_died, Pid, Reason}, State2};
true ->
couch_stats:increment_counter([couch_replicator, worker_deaths]),
StopReason =
case Reason of
{shutdown, _} = Err ->
Err;
Other ->
couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
{worker_died, Pid, Other}
end,
{stop, StopReason, State2}
end;
handle_info(timeout, InitArgs) ->
try do_init(InitArgs) of
{ok, State} ->
{noreply, State}
catch
exit:{http_request_failed, _, _, max_backoff} ->
{stop, {shutdown, max_backoff}, {error, InitArgs}};
Class:Error:Stack ->
ShutdownReason = {error, replication_start_error(Error)},
StackTop2 = lists:sublist(Stack, 2),
% Shutdown state is a hack as it is not really the state of the
% gen_server (it failed to initialize, so it doesn't have one).
% Shutdown state is used to pass extra info about why start failed.
ShutdownState = {error, Class, StackTop2, InitArgs},
{stop, {shutdown, ShutdownReason}, ShutdownState}
end;
handle_info({Ref, Tuple}, State) when is_reference(Ref), is_tuple(Tuple) ->
% Ignore responses from timed-out or retried ibrowse calls. Aliases in
% Erlang 24 should help with this problem, so we should revisit this clause
% when we update our minimum Erlang version to >= 24.
{noreply, State}.
terminate(
normal,
#rep_state{
rep_details = #rep{id = RepId} = Rep,
checkpoint_history = CheckpointHistory
} = State
) ->
terminate_cleanup(State),
couch_replicator_notifier:notify({finished, RepId, CheckpointHistory}),
doc_update_completed(Rep, rep_stats(State));
terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
% Replication stopped via _scheduler_sup:terminate_child/1, which can
% occur during regular scheduler operation or when the job is removed from
% the scheduler.
State1 =
case do_checkpoint(State) of
{ok, NewState} ->
NewState;
Error ->
LogMsg = "~p : Failed last checkpoint. Job: ~p Error: ~p",
couch_log:error(LogMsg, [?MODULE, RepId, Error]),
State
end,
couch_replicator_notifier:notify({stopped, RepId, <<"stopped">>}),
terminate_cleanup(State1);
terminate({shutdown, max_backoff}, {error, InitArgs}) ->
#rep{id = {BaseId, Ext} = RepId} = InitArgs,
couch_replicator_pg:leave(RepId, self()),
couch_stats:increment_counter([couch_replicator, failed_starts]),
couch_log:warning("Replication `~s` reached max backoff ", [BaseId ++ Ext]),
couch_replicator_notifier:notify({error, RepId, max_backoff});
terminate({shutdown, {error, Error}}, {error, Class, Stack, InitArgs}) ->
#rep{
id = {BaseId, Ext} = RepId,
source = Source0,
target = Target0,
doc_id = DocId,
db_name = DbName
} = InitArgs,
couch_replicator_pg:leave(RepId, self()),
Source = couch_replicator_api_wrap:db_uri(Source0),
Target = couch_replicator_api_wrap:db_uri(Target0),
RepIdStr = BaseId ++ Ext,
Msg = "~p:~p: Replication ~s failed to start ~p -> ~p doc ~p:~p stack:~p",
couch_log:error(Msg, [
Class,
Error,
RepIdStr,
Source,
Target,
DbName,
DocId,
Stack
]),
couch_stats:increment_counter([couch_replicator, failed_starts]),
couch_replicator_notifier:notify({error, RepId, Error});
terminate({shutdown, max_backoff}, State) ->
#rep_state{
source_name = Source,
target_name = Target,
rep_details = #rep{id = {BaseId, Ext} = RepId}
} = State,
couch_log:error(
"Replication `~s` (`~s` -> `~s`) reached max backoff",
[BaseId ++ Ext, Source, Target]
),
terminate_cleanup(State),
couch_replicator_notifier:notify({error, RepId, max_backoff});
terminate({shutdown, {duplicate_job, OtherPid}}, State) ->
#rep_state{
source_name = Source,
target_name = Target,
rep_details = #rep{id = {BaseId, Ext} = RepId}
} = State,
couch_log:error(
"Replication `~s` (`~s` -> `~s`) with pid ~p was usurped by ~p on node ~p",
[BaseId ++ Ext, Source, Target, self(), OtherPid, node(OtherPid)]
),
terminate_cleanup(State),
couch_replicator_notifier:notify({error, RepId, duplicate_job});
terminate({shutdown, Reason}, State) ->
% Unwrap so when reporting we don't have an extra {shutdown, ...} tuple
% wrapped around the message
terminate(Reason, State);
terminate(Reason, State) ->
#rep_state{
source_name = Source,
target_name = Target,
rep_details = #rep{id = {BaseId, Ext} = RepId}
} = State,
couch_log:error(
"Replication `~s` (`~s` -> `~s`) failed: ~s",
[BaseId ++ Ext, Source, Target, to_binary(Reason)]
),
terminate_cleanup(State),
couch_replicator_notifier:notify({error, RepId, Reason}).
terminate_cleanup(#rep_state{rep_details = #rep{id = RepId}} = State) ->
couch_replicator_pg:leave(RepId, self()),
update_task(State),
couch_replicator_api_wrap:db_close(State#rep_state.source),
couch_replicator_api_wrap:db_close(State#rep_state.target).
format_status(_Opt, [_PDict, State]) ->
#rep_state{
source = Source,
target = Target,
rep_details = RepDetails,
start_seq = StartSeq,
source_seq = SourceSeq,
committed_seq = CommitedSeq,
current_through_seq = ThroughSeq,
highest_seq_done = HighestSeqDone,
session_id = SessionId
} = state_strip_creds(State),
#rep{
id = RepId,
options = Options,
doc_id = DocId,
db_name = DbName
} = RepDetails,
[
{rep_id, RepId},
{source, couch_replicator_api_wrap:db_uri(Source)},
{target, couch_replicator_api_wrap:db_uri(Target)},
{db_name, DbName},
{doc_id, DocId},
{options, Options},
{session_id, SessionId},
{start_seq, StartSeq},
{source_seq, SourceSeq},
{committed_seq, CommitedSeq},
{current_through_seq, ThroughSeq},
{highest_seq_done, HighestSeqDone}
].
sum_stats(Pid, Stats) when is_pid(Pid) ->
gen_server:cast(Pid, {sum_stats, Stats}).
report_seq_done(Pid, ReportSeq, Stats) when is_pid(Pid) ->
gen_server:call(Pid, {report_seq_done, ReportSeq, Stats}, infinity).
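% Random startup delay, in milliseconds, bounded by the "replicator"
% section's "startup_jitter" config value. Used in do_init/1 to spread out
% the start of many jobs scheduled at the same time.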
startup_jitter() ->
Jitter = config:get_integer(
"replicator",
"startup_jitter",
?STARTUP_JITTER_DEFAULT
),
couch_rand:uniform(erlang:max(1, Jitter)).
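% The *_strip_creds helpers redact the Authorization header and any password
% embedded in endpoint URLs so credentials do not end up in logs or in
% format_status/2 output.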
headers_strip_creds([], Acc) ->
lists:reverse(Acc);
headers_strip_creds([{Key, Value0} | Rest], Acc) ->
Value =
case string:to_lower(Key) of
"authorization" ->
"****";
_ ->
Value0
end,
headers_strip_creds(Rest, [{Key, Value} | Acc]).
httpdb_strip_creds(#httpdb{url = Url, headers = Headers} = HttpDb) ->
HttpDb#httpdb{
url = couch_util:url_strip_password(Url),
headers = headers_strip_creds(Headers, [])
};
httpdb_strip_creds(LocalDb) ->
LocalDb.
rep_strip_creds(#rep{source = Source, target = Target} = Rep) ->
Rep#rep{
source = httpdb_strip_creds(Source),
target = httpdb_strip_creds(Target)
}.
state_strip_creds(#rep_state{rep_details = Rep, source = Source, target = Target} = State) ->
% #rep_state contains the source and target at the top level and also
% inside the nested #rep{} record held in the rep_details field
State#rep_state{
rep_details = rep_strip_creds(Rep),
source = httpdb_strip_creds(Source),
target = httpdb_strip_creds(Target)
}.
adjust_maxconn(Src = #httpdb{http_connections = 1}, RepId) ->
Msg = "Adjusting minimum number of HTTP source connections to 2 for ~p",
couch_log:notice(Msg, [RepId]),
Src#httpdb{http_connections = 2};
adjust_maxconn(Src, _RepId) ->
Src.
-spec doc_update_triggered(#rep{}) -> ok.
doc_update_triggered(#rep{db_name = null}) ->
ok;
doc_update_triggered(#rep{id = RepId, doc_id = DocId} = Rep) ->
case couch_replicator_doc_processor:update_docs() of
true ->
couch_replicator_docs:update_triggered(Rep, RepId);
false ->
ok
end,
couch_log:notice(
"Document `~s` triggered replication `~s`",
[DocId, pp_rep_id(RepId)]
),
ok.
-spec doc_update_completed(#rep{}, list()) -> ok.
doc_update_completed(#rep{db_name = null}, _Stats) ->
ok;
doc_update_completed(
#rep{
id = RepId,
doc_id = DocId,
db_name = DbName,
start_time = StartTime
},
Stats0
) ->
Stats = Stats0 ++ [{start_time, couch_replicator_utils:iso8601(StartTime)}],
couch_replicator_docs:update_doc_completed(DbName, DocId, Stats),
couch_log:notice(
"Replication `~s` completed (triggered by `~s`)",
[pp_rep_id(RepId), DocId]
),
ok.
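% Called once the last worker has exited normally. If no sequence was
% completed there is nothing to record and the job stops; otherwise record a
% final checkpoint at the highest completed sequence before stopping.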
do_last_checkpoint(
#rep_state{
seqs_in_progress = [],
highest_seq_done = {_Ts, ?LOWEST_SEQ}
} = State
) ->
{stop, normal, cancel_timer(State)};
do_last_checkpoint(
#rep_state{
seqs_in_progress = [],
highest_seq_done = Seq
} = State
) ->
case do_checkpoint(State#rep_state{current_through_seq = Seq}) of
{ok, NewState} ->
couch_stats:increment_counter([couch_replicator, checkpoints, success]),
{stop, normal, cancel_timer(NewState)};
Error ->
couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
{stop, Error, State}
end.
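% Schedule the next checkpoint cast after checkpoint_interval milliseconds.
% Returns the timer reference, or nil if the timer could not be created.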
start_timer(State) ->
After = State#rep_state.checkpoint_interval,
case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of
{ok, Ref} ->
Ref;
Error ->
couch_log:error("Replicator, error scheduling checkpoint: ~p", [Error]),
nil
end.
cancel_timer(#rep_state{timer = nil} = State) ->
State;
cancel_timer(#rep_state{timer = Timer} = State) ->
{ok, cancel} = timer:cancel(Timer),
State#rep_state{timer = nil}.
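% Open the source and target, find and compare the replication checkpoint
% logs on both sides to determine the sequence to start from, and build the
% initial #rep_state{} with the first checkpoint timer already running.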
init_state(Rep) ->
#rep{
id = {BaseId, _Ext},
source = Src0,
target = Tgt,
options = Options,
type = Type,
view = View,
start_time = StartTime,
stats = ArgStats0
} = Rep,
% Adjust minimum number of http source connections to 2 to avoid deadlock
Src = adjust_maxconn(Src0, BaseId),
{ok, Source} = couch_replicator_api_wrap:db_open(Src),
{CreateTargetParams} = get_value(create_target_params, Options, {[]}),
{ok, Target} = couch_replicator_api_wrap:db_open(
Tgt,
get_value(create_target, Options, false),
CreateTargetParams
),
{ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source),
{ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target),
[SourceLog, TargetLog] = find_and_migrate_logs([Source, Target], Rep),
{StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog),
ArgStats1 = couch_replicator_stats:new(ArgStats0),
HistoryStats =
case History of
[{[_ | _] = HProps} | _] -> couch_replicator_stats:new(HProps);
_ -> couch_replicator_stats:new()
end,
Stats = couch_replicator_stats:max_stats(ArgStats1, HistoryStats),
StartSeq1 = get_value(since_seq, Options, StartSeq0),
StartSeq = {0, StartSeq1},
SourceSeq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ),
#doc{body = {CheckpointHistory}} = SourceLog,
State = #rep_state{
rep_details = Rep,
source_name = couch_replicator_api_wrap:db_uri(Source),
target_name = couch_replicator_api_wrap:db_uri(Target),
source = Source,
target = Target,
history = History,
checkpoint_history = {[{<<"no_changes">>, true} | CheckpointHistory]},
start_seq = StartSeq,
current_through_seq = StartSeq,
committed_seq = StartSeq,
source_log = SourceLog,
target_log = TargetLog,
rep_starttime = StartTime,
src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
session_id = couch_uuids:random(),
source_seq = SourceSeq,
use_checkpoints = get_value(use_checkpoints, Options, true),
checkpoint_interval = get_value(
checkpoint_interval,
Options,
?DEFAULT_CHECKPOINT_INTERVAL
),
type = Type,
view = View,
stats = Stats
},
State#rep_state{timer = start_timer(State)}.
find_and_migrate_logs(DbList, #rep{id = {BaseId, _}} = Rep) ->
LogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, Rep, []).
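% Fetch the checkpoint document from each endpoint. On not_found fall back
% to replication ids from older id versions; a log found under an old id is
% carried over to the current id (and saved back when checkpoints are
% enabled), and a missing log becomes a new stub #doc{}.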
fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) ->
lists:reverse(Acc);
fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
case couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body]) of
{error, <<"not_found">>} when Vsn > 1 ->
OldRepId = couch_replicator_utils:replication_id(Rep, Vsn - 1),
fold_replication_logs(
Dbs,
Vsn - 1,
?l2b(?LOCAL_DOC_PREFIX ++ OldRepId),
NewId,
Rep,
Acc
);
{error, <<"not_found">>} ->
fold_replication_logs(
Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [#doc{id = NewId} | Acc]
);
{ok, Doc} when LogId =:= NewId ->
fold_replication_logs(
Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Doc | Acc]
);
{ok, Doc} ->
MigratedLog = #doc{id = NewId, body = Doc#doc.body},
maybe_save_migrated_log(Rep, Db, MigratedLog, Doc#doc.id),
fold_replication_logs(
Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [MigratedLog | Acc]
)
end.
maybe_save_migrated_log(Rep, Db, #doc{} = Doc, OldId) ->
case get_value(use_checkpoints, Rep#rep.options, true) of
true ->
update_checkpoint(Db, Doc),
Msg = "Migrated replication checkpoint. Db:~p ~p -> ~p",
couch_log:notice(Msg, [httpdb_strip_creds(Db), OldId, Doc#doc.id]);
false ->
ok
end.
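% The changes manager serves {get_changes, From} requests from workers: it
% dequeues a batch from the changes queue, reports the batch's ending
% sequence to the job via {report_seq, ...} and sends the batch (without the
% last_seq marker) back to the requesting worker.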
spawn_changes_manager(Parent, ChangesQueue, BatchSize) ->
spawn_link(fun() ->
changes_manager_loop_open(Parent, ChangesQueue, BatchSize, 1)
end).
changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) ->
receive
{get_changes, From} ->
case couch_work_queue:dequeue(ChangesQueue, BatchSize) of
closed ->
From ! {closed, self()};
{ok, ChangesOrLastSeqs} ->
ReportSeq =
case lists:last(ChangesOrLastSeqs) of
{last_seq, Seq} ->
{Ts, Seq};
#doc_info{high_seq = Seq} ->
{Ts, Seq}
end,
Changes = lists:filter(
fun
(#doc_info{}) ->
true;
({last_seq, _Seq}) ->
false
end,
ChangesOrLastSeqs
),
ok = gen_server:cast(Parent, {report_seq, ReportSeq}),
From ! {changes, self(), Changes, ReportSeq}
end,
changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts + 1)
end.
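% Checkpointing: do nothing when checkpoints are disabled or when the
% current through seq is already committed. Otherwise commit both endpoints,
% make sure instance_start_time did not change on either side since the last
% checkpoint, and write a new history entry to both checkpoint documents.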
do_checkpoint(#rep_state{use_checkpoints = false} = State) ->
NewState = State#rep_state{checkpoint_history = {[{<<"use_checkpoints">>, false}]}},
{ok, NewState};
do_checkpoint(#rep_state{current_through_seq = Seq, committed_seq = Seq} = State) ->
update_task(State),
{ok, State};
do_checkpoint(State) ->
#rep_state{
source_name = SourceName,
target_name = TargetName,
source = Source,
target = Target,
history = OldHistory,
start_seq = {_, StartSeq},
current_through_seq = {_Ts, NewSeq} = NewTsSeq,
source_log = SourceLog,
target_log = TargetLog,
rep_starttime = ReplicationStartTime,
src_starttime = SrcInstanceStartTime,
tgt_starttime = TgtInstanceStartTime,
stats = Stats,
rep_details = #rep{options = Options},
session_id = SessionId
} = State,
case commit_to_both(Source, Target) of
{source_error, Reason} ->
{checkpoint_commit_failure,
<<"Failure on source commit: ", (to_binary(Reason))/binary>>};
{target_error, Reason} ->
{checkpoint_commit_failure,
<<"Failure on target commit: ", (to_binary(Reason))/binary>>};
{<<S/binary>>, <<T/binary>>} when
% Handle upgrades from 3.2 to 3.3 better by expecting 0 to be
% returned if endpoints are upgraded while the replication job is running.
% TODO: Remove the `0` special case in a future release (4.x or 5.x)
(S =:= SrcInstanceStartTime orelse S =:= <<"0">> orelse
SrcInstanceStartTime =:= <<"0">>) andalso
(T =:= TgtInstanceStartTime orelse T =:= <<"0">> orelse
TgtInstanceStartTime =:= <<"0">>)
->
couch_log:notice(
"recording a checkpoint for `~s` -> `~s` at source update_seq ~p",
[SourceName, TargetName, NewSeq]
),
LocalStartTime = calendar:now_to_local_time(ReplicationStartTime),
StartTime = ?l2b(httpd_util:rfc1123_date(LocalStartTime)),
EndTime = ?l2b(httpd_util:rfc1123_date()),
NewHistoryEntry =
{[
{<<"session_id">>, SessionId},
{<<"start_time">>, StartTime},
{<<"end_time">>, EndTime},
{<<"start_last_seq">>, StartSeq},
{<<"end_last_seq">>, NewSeq},
{<<"recorded_seq">>, NewSeq},
{<<"missing_checked">>, couch_replicator_stats:missing_checked(Stats)},
{<<"missing_found">>, couch_replicator_stats:missing_found(Stats)},
{<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
{<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
{<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)},
{<<"bulk_get_docs">>, couch_replicator_stats:bulk_get_docs(Stats)},
{<<"bulk_get_attempts">>, couch_replicator_stats:bulk_get_attempts(Stats)}
]},
BaseHistory =
[
{<<"session_id">>, SessionId},
{<<"source_last_seq">>, NewSeq},
{<<"replication_id_version">>, ?REP_ID_VERSION}
] ++
case get_value(doc_ids, Options) of
undefined ->
[];
_DocIds ->
% backwards compatibility with the result of a replication by
% doc IDs in versions 0.11.x and 1.0.x
% TODO: deprecate (use same history format, simplify code)
[
{<<"start_time">>, StartTime},
{<<"end_time">>, EndTime},
{<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
{<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
{<<"doc_write_failures">>,
couch_replicator_stats:doc_write_failures(Stats)}
]
end,
% limit history to 50 entries
NewRepHistory = {
BaseHistory ++
[{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}]
},
try
{SrcRevPos, SrcRevId} = update_checkpoint(
Source, SourceLog#doc{body = NewRepHistory}, source
),
{TgtRevPos, TgtRevId} = update_checkpoint(
Target, TargetLog#doc{body = NewRepHistory}, target
),
NewState = State#rep_state{
checkpoint_history = NewRepHistory,
committed_seq = NewTsSeq,
source_log = SourceLog#doc{revs = {SrcRevPos, [SrcRevId]}},
target_log = TargetLog#doc{revs = {TgtRevPos, [TgtRevId]}}
},
update_task(NewState),
{ok, NewState}
catch
throw:{checkpoint_commit_failure, _} = Failure ->
Failure
end;
{SrcInstanceStartTime, _NewTgtInstanceStartTime} ->
{checkpoint_commit_failure, <<
"instance_start_time on target database has changed since last checkpoint."
>>};
{_NewSrcInstanceStartTime, TgtInstanceStartTime} ->
{checkpoint_commit_failure, <<
"instance_start_time on source database has changed since last checkpoint."
>>};
{_NewSrcInstanceStartTime, _NewTgtInstanceStartTime} ->
{checkpoint_commit_failure, <<
"instance_start_time on source and target database has changed since last checkpoint."
>>}
end.
update_checkpoint(Db, Doc, DbType) ->
try
update_checkpoint(Db, Doc)
catch
throw:{checkpoint_commit_failure, Reason} ->
throw(
{checkpoint_commit_failure,
<<"Error updating the ", (to_binary(DbType))/binary, " checkpoint document: ",
(to_binary(Reason))/binary>>}
)
end.
update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) ->
try
case couch_replicator_api_wrap:update_doc(Db, Doc, [delay_commit]) of
{ok, PosRevId} ->
PosRevId;
{error, Reason} ->
throw({checkpoint_commit_failure, Reason})
end
catch
throw:conflict ->
case (catch couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body])) of
{ok, #doc{body = LogBody, revs = {Pos, [RevId | _]}}} ->
% This means that we were able to successfully update the
% checkpoint doc in a previous attempt but we got a connection
% error (e.g. a timeout) before receiving the success response.
% Therefore the request was retried and we got a conflict, as the
% revision we sent is not the current one.
% We confirm this by verifying that the doc body we just read is
% the same one we just sent.
{Pos, RevId};
_ ->
throw({checkpoint_commit_failure, conflict})
end
end.
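% Ensure a full commit on both endpoints: the source commit runs in a linked
% helper process while the target commit runs inline. Returns
% {SourceStartTime, TargetStartTime} on success, or a {source_error, _} /
% {target_error, _} tuple otherwise.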
commit_to_both(Source, Target) ->
% commit the src async
ParentPid = self(),
SrcCommitPid = spawn_link(
fun() ->
Result = (catch couch_replicator_api_wrap:ensure_full_commit(Source)),
ParentPid ! {self(), Result}
end
),
% commit tgt sync
TargetResult = (catch couch_replicator_api_wrap:ensure_full_commit(Target)),
SourceResult =
receive
{SrcCommitPid, Result} ->
unlink(SrcCommitPid),
receive
{'EXIT', SrcCommitPid, _} -> ok
after 0 -> ok
end,
Result;
{'EXIT', SrcCommitPid, Reason} ->
{error, Reason}
end,
case TargetResult of
{ok, TargetStartTime} ->
case SourceResult of
{ok, SourceStartTime} ->
{SourceStartTime, TargetStartTime};
SourceError ->
{source_error, SourceError}
end;
TargetError ->
{target_error, TargetError}
end.
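% If both checkpoint documents carry the same session_id the previous
% checkpoint is still valid and replication resumes from source_last_seq.
% Otherwise scan both histories for a common session to resume from, falling
% back to a full replication when there is no common ancestor.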
compare_replication_logs(SrcDoc, TgtDoc) ->
#doc{body = {RepRecProps}} = SrcDoc,
#doc{body = {RepRecPropsTgt}} = TgtDoc,
case
get_value(<<"session_id">>, RepRecProps) ==
get_value(<<"session_id">>, RepRecPropsTgt)
of
true ->
% if the records have the same session id,
% then we have a valid replication history
OldSeqNum = get_value(<<"source_last_seq">>, RepRecProps, ?LOWEST_SEQ),
OldHistory = get_value(<<"history">>, RepRecProps, []),
{OldSeqNum, OldHistory};
false ->
SourceHistory = get_value(<<"history">>, RepRecProps, []),
TargetHistory = get_value(<<"history">>, RepRecPropsTgt, []),
couch_log:notice(
"Replication records differ. "
"Scanning histories to find a common ancestor.",
[]
),
couch_log:debug(
"Record on source:~p~nRecord on target:~p~n",
[RepRecProps, RepRecPropsTgt]
),
compare_rep_history(SourceHistory, TargetHistory)
end.
compare_rep_history(S, T) when S =:= [] orelse T =:= [] ->
couch_log:notice("no common ancestry -- performing full replication", []),
{?LOWEST_SEQ, []};
compare_rep_history([{S} | SourceRest], [{T} | TargetRest] = Target) ->
SourceId = get_value(<<"session_id">>, S),
case has_session_id(SourceId, Target) of
true ->
RecordSeqNum = get_value(<<"recorded_seq">>, S, ?LOWEST_SEQ),
couch_log:notice(
"found a common replication record with source_seq ~p",
[RecordSeqNum]
),
{RecordSeqNum, SourceRest};
false ->
TargetId = get_value(<<"session_id">>, T),
case has_session_id(TargetId, SourceRest) of
true ->
RecordSeqNum = get_value(<<"recorded_seq">>, T, ?LOWEST_SEQ),
couch_log:notice(
"found a common replication record with source_seq ~p",
[RecordSeqNum]
),
{RecordSeqNum, TargetRest};
false ->
compare_rep_history(SourceRest, TargetRest)
end
end.
has_session_id(_SessionId, []) ->
false;
has_session_id(SessionId, [{Props} | Rest]) ->
case get_value(<<"session_id">>, Props, nil) of
SessionId ->
true;
_Else ->
has_session_id(SessionId, Rest)
end.
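% Cache the pending changes count in the process dictionary and refresh it
% at most once per connection_timeout, so frequent update_task/1 calls do
% not turn into a request to the source on every stats update.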
get_pending_count(St) ->
Rep = St#rep_state.rep_details,
Timeout = get_value(connection_timeout, Rep#rep.options),
TimeoutMicro = Timeout * 1000,
case get(pending_count_state) of
{LastUpdate, PendingCount} ->
case timer:now_diff(os:timestamp(), LastUpdate) > TimeoutMicro of
true ->
NewPendingCount = get_pending_count_int(St),
put(pending_count_state, {os:timestamp(), NewPendingCount}),
NewPendingCount;
false ->
PendingCount
end;
undefined ->
NewPendingCount = get_pending_count_int(St),
put(pending_count_state, {os:timestamp(), NewPendingCount}),
NewPendingCount
end.
get_pending_count_int(#rep_state{source = #httpdb{} = Db0} = St) ->
{_, Seq} = St#rep_state.highest_seq_done,
Db = Db0#httpdb{retries = 3},
case (catch couch_replicator_api_wrap:get_pending_count(Db, Seq)) of
{ok, Pending} ->
Pending;
_ ->
null
end;
get_pending_count_int(#rep_state{source = Db} = St) ->
{_, Seq} = St#rep_state.highest_seq_done,
{ok, Pending} = couch_replicator_api_wrap:get_pending_count(Db, Seq),
Pending.
seq_encode(Seq) ->
couch_replicator_utils:seq_encode(Seq).
update_task(State) ->
#rep_state{
rep_details = #rep{id = JobId},
current_through_seq = {_, ThroughSeq},
highest_seq_done = {_, HighestSeq}
} = State,
Status =
rep_stats(State) ++
[
{source_seq, seq_encode(HighestSeq)},
{through_seq, seq_encode(ThroughSeq)}
],
couch_replicator_scheduler:update_job_stats(JobId, Status),
couch_task_status:update(Status).
rep_stats(State) ->
#rep_state{
committed_seq = {_, CommittedSeq},
stats = Stats
} = State,
[
{revisions_checked, couch_replicator_stats:missing_checked(Stats)},
{missing_revisions_found, couch_replicator_stats:missing_found(Stats)},
{docs_read, couch_replicator_stats:docs_read(Stats)},
{docs_written, couch_replicator_stats:docs_written(Stats)},
{changes_pending, get_pending_count(State)},
{doc_write_failures, couch_replicator_stats:doc_write_failures(Stats)},
{bulk_get_docs, couch_replicator_stats:bulk_get_docs(Stats)},
{bulk_get_attempts, couch_replicator_stats:bulk_get_attempts(Stats)},
{checkpointed_source_seq, seq_encode(CommittedSeq)}
].
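% Normalize common startup failures (unauthorized, db_not_found, DNS
% nxdomain, HTTP error codes) into short error terms with passwords stripped
% from URLs; anything else is passed through unchanged.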
replication_start_error({unauthorized, DbUri}) ->
{unauthorized, <<"unauthorized to access or create database ", DbUri/binary>>};
replication_start_error({db_not_found, DbUri}) ->
{db_not_found, <<"could not open ", DbUri/binary>>};
replication_start_error(
{http_request_failed, _Method, Url0, {error, {error, {conn_failed, {error, nxdomain}}}}}
) ->
Url = ?l2b(couch_util:url_strip_password(Url0)),
{nxdomain, <<"could not resolve ", Url/binary>>};
replication_start_error({http_request_failed, Method0, Url0, {error, {code, Code}}}) when
is_integer(Code)
->
Url = ?l2b(couch_util:url_strip_password(Url0)),
Method = ?l2b(Method0),
{http_error_code, Code, <<Method/binary, " ", Url/binary>>};
replication_start_error(Error) ->
Error.
log_replication_start(#rep_state{rep_details = Rep} = RepState) ->
#rep{
id = {BaseId, Ext},
doc_id = DocId,
db_name = DbName,
options = Options
} = Rep,
Id = BaseId ++ Ext,
Workers = get_value(worker_processes, Options),
BatchSize = get_value(worker_batch_size, Options),
#rep_state{
% credentials already stripped
source_name = Source,
% credentials already stripped
target_name = Target,
session_id = Sid
} = RepState,
From =
case DbName of
ShardName when is_binary(ShardName) ->
io_lib:format("from doc ~s:~s", [mem3:dbname(ShardName), DocId]);
_ ->
"from _replicate endpoint"
end,
Msg =
"Starting replication ~s (~s -> ~s) ~s worker_procesess:~p"
" worker_batch_size:~p session_id:~s",
couch_log:notice(Msg, [Id, Source, Target, From, Workers, BatchSize, Sid]).
verify_ssl_certificates_log(#rep{} = Rep) ->
case config:get_boolean("replicator", "verify_ssl_certificates_log", false) of
true ->
spawn_link(couch_replicator_utils, verify_ssl_certificates_log, [Rep]);
false ->
undefined
end.
-ifdef(TEST).
-include_lib("couch/include/couch_eunit.hrl").
replication_start_error_test() ->
?assertEqual(
{unauthorized, <<
"unauthorized to access or create database"
" http://x/y"
>>},
replication_start_error({unauthorized, <<"http://x/y">>})
),
?assertEqual(
{db_not_found, <<"could not open http://x/y">>},
replication_start_error({db_not_found, <<"http://x/y">>})
),
?assertEqual(
{nxdomain, <<"could not resolve http://x/y">>},
replication_start_error(
{http_request_failed, "GET", "http://x/y",
{error, {error, {conn_failed, {error, nxdomain}}}}}
)
),
?assertEqual(
{http_error_code, 503, <<"GET http://x/y">>},
replication_start_error({http_request_failed, "GET", "http://x/y", {error, {code, 503}}})
).
format_status_test_() ->
{
foreach,
fun meck_config/0,
fun(_) -> meck:unload() end,
[
?TDEF_FE(t_scheduler_job_format_status)
]
}.
meck_config() ->
test_util:mock(config).
t_scheduler_job_format_status(_) ->
Source = <<"http://u:p@h1/d1">>,
Target = <<"http://u:p@h2/d2">>,
Rep = #rep{
id = {"base", "+ext"},
source = couch_replicator_parse:parse_rep_db(Source, [], []),
target = couch_replicator_parse:parse_rep_db(Target, [], []),
options = [{create_target, true}],
doc_id = <<"mydoc">>,
db_name = <<"mydb">>
},
State = #rep_state{
rep_details = Rep,
source = Rep#rep.source,
target = Rep#rep.target,
session_id = <<"a">>,
start_seq = <<"1">>,
source_seq = <<"2">>,
committed_seq = <<"3">>,
current_through_seq = <<"4">>,
highest_seq_done = <<"5">>
},
Format = format_status(opts_ignored, [pdict, State]),
?assertEqual("http://h1/d1/", proplists:get_value(source, Format)),
?assertEqual("http://h2/d2/", proplists:get_value(target, Format)),
?assertEqual({"base", "+ext"}, proplists:get_value(rep_id, Format)),
?assertEqual([{create_target, true}], proplists:get_value(options, Format)),
?assertEqual(<<"mydoc">>, proplists:get_value(doc_id, Format)),
?assertEqual(<<"mydb">>, proplists:get_value(db_name, Format)),
?assertEqual(<<"a">>, proplists:get_value(session_id, Format)),
?assertEqual(<<"1">>, proplists:get_value(start_seq, Format)),
?assertEqual(<<"2">>, proplists:get_value(source_seq, Format)),
?assertEqual(<<"3">>, proplists:get_value(committed_seq, Format)),
?assertEqual(<<"4">>, proplists:get_value(current_through_seq, Format)),
?assertEqual(<<"5">>, proplists:get_value(highest_seq_done, Format)).
-endif.