% blob: ee91b5dc1123301bcc421e355f414305f2a2bab8 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_replicator).
-behaviour(gen_server).
-vsn(1).
% public API
-export([replicate/2]).
% meant to be used only by the replicator database listener
-export([async_replicate/1]).
-export([cancel_replication/1]).
% gen_server callbacks
-export([init/1, terminate/2, code_change/3]).
-export([handle_call/3, handle_cast/2, handle_info/2]).
-export([format_status/2]).
-export([details/1]).
-include_lib("couch/include/couch_db.hrl").
-include("couch_replicator_api_wrap.hrl").
-include("couch_replicator.hrl").
-define(LOWEST_SEQ, 0).
-define(DEFAULT_CHECKPOINT_INTERVAL, 30000).
-import(couch_util, [
get_value/2,
get_value/3,
to_binary/1
]).
-import(couch_replicator_utils, [
start_db_compaction_notifier/2,
stop_db_compaction_notifier/1
]).
% Runtime state of one replication job: the gen_server state once do_init/1
% has run. start_seq, committed_seq, current_through_seq and highest_seq_done
% hold {Ts, Seq} pairs. Ts is the batch counter assigned by the changes
% manager (0 for values that did not come from a batch). Seq is a source
% sequence.
-record(rep_state, {
rep_details, % the #rep{} this job was started with
source_name, % source URI from couch_replicator_api_wrap:db_uri/1
target_name, % target URI from couch_replicator_api_wrap:db_uri/1
source, % source handle from couch_replicator_api_wrap:db_open/2
target, % target handle from couch_replicator_api_wrap:db_open/3
history, % previous history entries, carried into each new checkpoint
checkpoint_history, % body of the most recently written checkpoint
start_seq,
committed_seq, % sequence recorded by the last successful checkpoint
current_through_seq, % sequence up to which batches are known to be done
seqs_in_progress = [], % ordset of handed-out batch seqs not yet reported done
highest_seq_done = {0, ?LOWEST_SEQ},
source_log, % checkpoint #doc{} on the source
target_log, % checkpoint #doc{} on the target
rep_starttime,
src_starttime, % source instance_start_time, re-checked at each checkpoint
tgt_starttime, % target instance_start_time, re-checked at each checkpoint
timer, % checkpoint timer
changes_queue,
changes_manager,
changes_reader,
workers,
stats = couch_replicator_stats:new(),
session_id,
source_db_compaction_notifier = nil,
target_db_compaction_notifier = nil,
source_monitor = nil, % couch_db:monitor/1 result for a local source, else nil
target_monitor = nil, % couch_db:monitor/1 result for a local target, else nil
source_seq = nil,
use_checkpoints = true,
checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL, % milliseconds
type = db,
view = nil
}).
%% Runs or cancels the replication described by PostBody on behalf of Ctx.
%%
%% When the parsed options contain `cancel`, the job is cancelled:
%%   * if an `id` option is present, that id is cancelled through
%%     cancel_replication/2, which checks the caller's user_ctx;
%%   * otherwise the replication id computed from PostBody is cancelled.
%% Otherwise the job is started. Continuous jobs return immediately.
%% One-shot jobs block until the job reports its result through the
%% replicator notifier.
%%
%% The result listener is stopped even if starting or waiting for the job
%% raises, so it never outlives this call.
replicate(PostBody, Ctx) ->
    {ok, #rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep} =
        couch_replicator_utils:parse_rep_doc(PostBody, Ctx),
    case get_value(cancel, Options, false) of
        true ->
            case get_value(id, Options, nil) of
                nil ->
                    cancel_replication(RepId);
                RepId2 ->
                    cancel_replication(RepId2, UserCtx)
            end;
        false ->
            {ok, Listener} = rep_result_listener(RepId),
            try
                do_replication_loop(Rep)
            after
                couch_replicator_notifier:stop(Listener)
            end
    end.
%% Starts the job for Rep.
%%   * Start error from async_replicate/1: returned unchanged.
%%   * Continuous job that started: returns {ok, {continuous, FullIdBinary}}.
%%   * One-shot job that started: blocks in wait_for_result/1.
do_replication_loop(#rep{id = {BaseId, Ext} = Id, options = Options} = Rep) ->
    StartResult = async_replicate(Rep),
    case StartResult of
        {ok, _JobPid} ->
            case get_value(continuous, Options, false) of
                false ->
                    wait_for_result(Id);
                true ->
                    FullId = ?l2b(BaseId ++ Ext),
                    {ok, {continuous, FullId}}
            end;
        _ ->
            StartResult
    end.
% Starts the job for Rep as a child of couch_replicator_job_sup, using
% BaseId ++ Ext as the child id.
% Returns {ok, Pid} in three cases: a new child was started, an existing
% child spec was restarted, or the job was already running. Otherwise
% returns {error, Reason}. Some restart outcomes make it call itself again.
async_replicate(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
RepChildId = BaseId ++ Ext,
% Endpoint URIs, used only in the log messages below.
Source = couch_replicator_api_wrap:db_uri(Src),
Target = couch_replicator_api_wrap:db_uri(Tgt),
% Passed to gen_server:start_link/3 as its start timeout.
Timeout = get_value(connection_timeout, Rep#rep.options),
% temporary: the supervisor never restarts the job on exit; 250 is the
% shutdown timeout in milliseconds.
ChildSpec = {
RepChildId,
{gen_server, start_link, [?MODULE, Rep, [{timeout, Timeout}]]},
temporary,
250,
worker,
[?MODULE]
},
% All these nested cases to attempt starting/restarting a replication child
% are ugly and not 100% race condition free. The following patch submission
% is a solution:
%
% http://erlang.2086793.n4.nabble.com/PATCH-supervisor-atomically-delete-child-spec-when-child-terminates-td3226098.html
%
case supervisor:start_child(couch_replicator_job_sup, ChildSpec) of
{ok, Pid} ->
couch_log:notice("starting new replication `~s` at ~p (`~s` -> `~s`)",
[RepChildId, Pid, Source, Target]),
{ok, Pid};
{error, already_present} ->
% A (stopped) child spec with this id is still registered: restart it.
case supervisor:restart_child(couch_replicator_job_sup, RepChildId) of
{ok, Pid} ->
couch_log:notice("restarting replication `~s` at ~p (`~s` -> `~s`)",
[RepChildId, Pid, Source, Target]),
{ok, Pid};
{error, running} ->
%% this error occurs if multiple replicators are racing
%% each other to start and somebody else won. Just grab
%% the Pid by calling start_child again.
timer:sleep(50 + random:uniform(100)),
async_replicate(Rep);
{error, {'EXIT', {badarg,
[{erlang, apply, [gen_server, start_link, undefined]} | _]}}} ->
% Clause to deal with a change in the supervisor module introduced
% in R14B02. For more details consult the thread at:
% http://erlang.org/pipermail/erlang-bugs/2011-March/002273.html
_ = supervisor:delete_child(couch_replicator_job_sup, RepChildId),
async_replicate(Rep);
{error, _} = Error ->
Error
end;
{error, {already_started, Pid}} ->
couch_log:notice("replication `~s` already running at ~p (`~s` -> `~s`)",
[RepChildId, Pid, Source, Target]),
{ok, Pid};
% Keep only the first element of a 2-tuple reason. The supervisor docs say
% a start failure is reported as a term holding both the error and the
% child spec.
{error, {Error, _}} ->
{error, Error}
end.
%% Starts a replicator notifier that forwards to the calling process every
%% 3-tuple event whose second element is RepId. All other events are ignored.
%% Returns {ok, ListenerPid}.
rep_result_listener(RepId) ->
    Caller = self(),
    Forward = fun
        ({_, EventRepId, _} = Event) when EventRepId =:= RepId ->
            Caller ! Event;
        (_Other) ->
            ok
    end,
    {ok, _} = Started = couch_replicator_notifier:start_link(Forward),
    Started.
%% Blocks, with no timeout, until the notifier listener forwards the outcome
%% of RepId. Returns {ok, Result} for a finished event and {error, Reason} for
%% an error event.
wait_for_result(RepId) ->
    receive
        {error, RepId, Reason} -> {error, Reason};
        {finished, RepId, RepResult} -> {ok, RepResult}
    end.
%% Terminates the job child BaseId ++ Extension and removes its child spec.
%% Returns {ok, {cancelled, FullIdBinary}} on success; a child spec that is
%% already gone also counts as success. Otherwise returns the supervisor's
%% error. Termination errors are also logged.
cancel_replication({BaseId, Extension}) ->
    ChildId = BaseId ++ Extension,
    couch_log:notice("Canceling replication `~s`...", [ChildId]),
    case supervisor:terminate_child(couch_replicator_job_sup, ChildId) of
        ok ->
            couch_log:notice("Replication `~s` canceled.", [ChildId]),
            case supervisor:delete_child(couch_replicator_job_sup, ChildId) of
                Deleted when Deleted =:= ok; Deleted =:= {error, not_found} ->
                    {ok, {cancelled, ?l2b(ChildId)}};
                DeleteError ->
                    DeleteError
            end;
        TerminateError ->
            couch_log:error("Error canceling replication `~s`: ~p",
                [ChildId, TerminateError]),
            TerminateError
    end.
%% Cancels RepId on behalf of a user.
%% Admins (role <<"_admin">>) may cancel any job. Other users may only cancel
%% a running job whose #rep{} user_ctx has the same name; for anyone else it
%% throws {unauthorized, Msg}. Lookup errors such as {error, not_found} are
%% returned unchanged.
cancel_replication(RepId, #user_ctx{name = Name, roles = Roles}) ->
    IsAdmin = lists:member(<<"_admin">>, Roles),
    if
        IsAdmin ->
            cancel_replication(RepId);
        true ->
            Lookup = case find_replicator(RepId) of
                {ok, Pid} -> details(Pid);
                NotFound -> NotFound
            end,
            case Lookup of
                {ok, #rep{user_ctx = #user_ctx{name = Name}}} ->
                    cancel_replication(RepId);
                {ok, _OwnedByOther} ->
                    throw({unauthorized,
                        <<"Can't cancel a replication triggered by another user">>});
                Error ->
                    Error
            end
    end.
%% Looks up the running job child whose id is BaseId ++ Ext.
%% Returns {ok, Pid}, or {error, not_found} when there is no such child or
%% it has no live pid.
find_replicator({BaseId, Ext} = _RepId) ->
    ChildId = BaseId ++ Ext,
    Children = supervisor:which_children(couch_replicator_job_sup),
    case lists:keyfind(ChildId, 1, Children) of
        {_, Pid, _, _} when is_pid(Pid) -> {ok, Pid};
        _ -> {error, not_found}
    end.
%% Asks the job Pid for its #rep{}. Returns {ok, Rep}, or {error, not_found}
%% if the process no longer exists. Any other reply or failure, in its
%% `catch` form, is thrown to the caller.
details(Pid) ->
    Reply = (catch gen_server:call(Pid, get_details)),
    case Reply of
        {'EXIT', {noproc, {gen_server, call, _}}} -> {error, not_found};
        {ok, _Rep} -> Reply;
        _ -> throw(Reply)
    end.
%% gen_server init/1 callback. Keeps the #rep{} args as the initial state and
%% sets a zero timeout, so the real setup happens later: do_init/1 is run
%% from handle_info(timeout, _).
init(Rep) ->
    DeferInitTimeout = 0,
    {ok, Rep, DeferInitTimeout}.
% Performs the real job setup. It runs from handle_info(timeout, InitArgs),
% not from init/1. Steps:
%   1. Open the endpoints and load the checkpoints via init_state/1.
%   2. Start the changes reader, the changes manager and NumWorkers workers.
%   3. Register a couch_task_status task.
%   4. Log the settings in use.
% Returns {ok, #rep_state{}}. Any failure raises; handle_info(timeout, _)
% turns it into a stop.
do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
% Trap exits so deaths of the linked helper processes arrive as 'EXIT'
% messages handled in handle_info/2.
process_flag(trap_exit, true),
random:seed(os:timestamp()),
#rep_state{
source = Source,
target = Target,
source_name = SourceName,
target_name = TargetName,
start_seq = {_Ts, StartSeq},
committed_seq = {_, CommittedSeq},
highest_seq_done = {_, HighestSeq},
checkpoint_interval = CheckpointInterval
} = State = init_state(Rep),
NumWorkers = get_value(worker_processes, Options),
BatchSize = get_value(worker_batch_size, Options),
% Queue bounds: room for two batches per worker, and 100 * 1024 per worker
% for max_size (presumably bytes — TODO confirm against couch_work_queue).
{ok, ChangesQueue} = couch_work_queue:new([
{max_items, BatchSize * NumWorkers * 2},
{max_size, 100 * 1024 * NumWorkers}
]),
% This starts the _changes reader process. It adds the changes from
% the source db to the ChangesQueue.
{ok, ChangesReader} = couch_replicator_changes_reader:start_link(
StartSeq, Source, ChangesQueue, Options
),
% Changes manager - responsible for dequeuing batches from the changes queue
% and delivering them to the worker processes.
ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize),
% This starts the worker processes. They ask the changes manager for a
% batch of _changes rows to process, check which revs are missing in the
% target, and copy the missing ones from the source to the target.
MaxConns = get_value(http_connections, Options),
Workers = lists:map(
fun(_) ->
couch_stats:increment_counter([couch_replicator, workers_started]),
{ok, Pid} = couch_replicator_worker:start_link(
self(), Source, Target, ChangesManager, MaxConns),
Pid
end,
lists:seq(1, NumWorkers)),
% Progress counters start at zero; update_task/1 refreshes them later.
couch_task_status:add_task([
{type, replication},
{user, UserCtx#user_ctx.name},
{replication_id, ?l2b(BaseId ++ Ext)},
{database, Rep#rep.db_name},
{doc_id, Rep#rep.doc_id},
{source, ?l2b(SourceName)},
{target, ?l2b(TargetName)},
{continuous, get_value(continuous, Options, false)},
{revisions_checked, 0},
{missing_revisions_found, 0},
{docs_read, 0},
{docs_written, 0},
{changes_pending, get_pending_count(State)},
{doc_write_failures, 0},
{source_seq, HighestSeq},
{checkpointed_source_seq, CommittedSeq},
{checkpoint_interval, CheckpointInterval}
]),
couch_task_status:set_update_frequency(1000),
% Until OTP R14B03:
%
% Restarting a temporary supervised child implies that the original arguments
% (#rep{} record) specified in the MFA component of the supervisor
% child spec will always be used whenever the child is restarted.
% This implies the same replication performance tuning parameters will
% always be used. The solution is to delete the child spec (see
% cancel_replication/1) and then start the replication again, but this is
% unfortunately not immune to race conditions.
% The start sequence is appended to the log line only when it is not
% ?LOWEST_SEQ.
couch_log:notice("Replication `~p` is using:~n"
"~c~p worker processes~n"
"~ca worker batch size of ~p~n"
"~c~p HTTP connections~n"
"~ca connection timeout of ~p milliseconds~n"
"~c~p retries per request~n"
"~csocket options are: ~s~s",
[BaseId ++ Ext, $\t, NumWorkers, $\t, BatchSize, $\t,
MaxConns, $\t, get_value(connection_timeout, Options),
$\t, get_value(retries, Options),
$\t, io_lib:format("~p", [get_value(socket_options, Options)]),
case StartSeq of
?LOWEST_SEQ ->
"";
_ ->
io_lib:format("~n~csource start sequence ~p", [$\t, StartSeq])
end]),
couch_log:debug("Worker pids are: ~p", [Workers]),
couch_replicator_manager:replication_started(Rep),
{ok, State#rep_state{
changes_queue = ChangesQueue,
changes_manager = ChangesManager,
changes_reader = ChangesReader,
workers = Workers
}
}.
%% Raises an HTTP endpoint configured with exactly one connection to two
%% connections (init_state/1 applies this to the source to avoid a
%% deadlock), logging the change. Any other endpoint is returned unchanged.
adjust_maxconn(#httpdb{http_connections = 1} = HttpDb, RepId) ->
    couch_log:notice(
        "Adjusting minimum number of HTTP source connections to 2 for ~p",
        [RepId]),
    HttpDb#httpdb{http_connections = 2};
adjust_maxconn(Endpoint, _RepId) ->
    Endpoint.
% gen_server handle_info/2 callback. Clause order matters. Several clauses
% match the same message shape and differ only in which #rep_state{} field
% holds the pid or monitor ref.
%
% A `shutdown` message stops the job with reason shutdown.
handle_info(shutdown, St) ->
{stop, shutdown, St};
% A monitored local source or target database went down: stop the job.
handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
couch_log:error("Source database is down. Reason: ~p", [Why]),
{stop, source_db_down, St};
handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
couch_log:error("Target database is down. Reason: ~p", [Why]),
{stop, target_db_down, St};
% Exits of the changes reader, changes manager and changes queue. A normal
% exit is ignored; any other reason is counted, logged and stops the job.
handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
{noreply, State};
handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
{stop, changes_reader_died, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
{noreply, State};
handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) ->
couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
{stop, changes_manager_died, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
{noreply, State};
handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
{stop, changes_queue_died, cancel_timer(State)};
% Some other process exited normally.
handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
case Workers -- [Pid] of
Workers ->
% Pid is not one of our workers: log it and keep running. An earlier
% version stopped the job here with {unknown_process_died, Pid, normal};
% why it did so was never documented.
couch_log:error("unknown pid bit the dust ~p ~n",[Pid]),
{noreply, State#rep_state{workers = Workers}};
[] ->
% The last worker finished: kill the changes manager and write a final
% checkpoint. do_last_checkpoint/1 returns the {stop, ...} result.
catch unlink(State#rep_state.changes_manager),
catch exit(State#rep_state.changes_manager, kill),
do_last_checkpoint(State);
Workers2 ->
{noreply, State#rep_state{workers = Workers2}}
end;
% Any other abnormal exit stops the job. If Pid is a worker the reason is
% worker_died, otherwise unknown_process_died.
handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
State2 = cancel_timer(State),
case lists:member(Pid, Workers) of
false ->
{stop, {unknown_process_died, Pid, Reason}, State2};
true ->
couch_stats:increment_counter([couch_replicator, worker_deaths]),
couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
{stop, {worker_died, Pid, Reason}, State2}
end;
% Deferred initialisation: init/1 returns a 0 timeout with the #rep{} as
% state. Any exception from do_init/1 stops the process with reason
% shutdown. The new state {error, Class, Error, Stack, InitArgs} is then
% reported by terminate/2 as a failed start.
handle_info(timeout, InitArgs) ->
try do_init(InitArgs) of {ok, State} ->
{noreply, State}
catch Class:Error ->
Stack = erlang:get_stacktrace(),
{stop, shutdown, {error, Class, Error, Stack, InitArgs}}
end.
% gen_server handle_call/3 callback.
%
% get_details: replies with the #rep{} this job is running.
handle_call(get_details, _From, #rep_state{rep_details = Rep} = State) ->
{reply, {ok, Rep}, State};
% add_stats: replies ok immediately, then folds Stats into the job's stats.
handle_call({add_stats, Stats}, From, State) ->
gen_server:reply(From, ok),
NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
{noreply, State#rep_state{stats = NewStats}};
% report_seq_done: the batch ending at Seq (a {Ts, Seq} pair, presumably
% reported by a worker) is finished, and StatsInc holds its counters.
% Replies ok immediately, then updates the bookkeeping:
%   * if nothing is in progress, or Seq is the lowest batch in progress
%     (the head of the ordset), the through seq advances to Seq;
%   * otherwise Seq is only removed from the in-progress set;
%   * once nothing is left in progress, the through seq is raised to the
%     highest seq done so far.
handle_call({report_seq_done, Seq, StatsInc}, From,
#rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done = HighestDone,
current_through_seq = ThroughSeq, stats = Stats} = State) ->
gen_server:reply(From, ok),
{NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
[] ->
{Seq, []};
[Seq | Rest] ->
{Seq, Rest};
[_ | _] ->
{ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
end,
NewHighestDone = lists:max([HighestDone, Seq]),
NewThroughSeq = case NewSeqsInProgress of
[] ->
lists:max([NewThroughSeq0, NewHighestDone]);
_ ->
NewThroughSeq0
end,
couch_log:debug("Worker reported seq ~p, through seq was ~p, "
"new through seq is ~p, highest seq done was ~p, "
"new highest seq done is ~p~n"
"Seqs in progress were: ~p~nSeqs in progress are now: ~p",
[Seq, ThroughSeq, NewThroughSeq, HighestDone,
NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
NewState = State#rep_state{
stats = couch_replicator_utils:sum_stats(Stats, StatsInc),
current_through_seq = NewThroughSeq,
seqs_in_progress = NewSeqsInProgress,
highest_seq_done = NewHighestDone
},
update_task(NewState),
{noreply, NewState}.
%% gen_server handle_cast/2 callback.
%%
%% {db_compacted, DbName}: reopens whichever local (#db{}) endpoint has that
%% name.
%%
%% checkpoint: sent by the timer armed in start_timer/1.
%%   * If couch_replicator_manager:continue/1 returns {true, _}, the job
%%     checkpoints and re-arms the timer.
%%   * If it returns {false, Owner}, the usurping owner is reported and the
%%     job shuts down.
%%   * A failed checkpoint stops the job with the failure as its reason.
%%
%% {report_seq, Seq}: the changes manager handed out a batch ending at Seq;
%% Seq is recorded as in progress.
handle_cast({db_compacted, DbName},
        #rep_state{source = #db{name = DbName} = Source} = State) ->
    {ok, ReopenedSource} = couch_db:reopen(Source),
    {noreply, State#rep_state{source = ReopenedSource}};
handle_cast({db_compacted, DbName},
        #rep_state{target = #db{name = DbName} = Target} = State) ->
    {ok, ReopenedTarget} = couch_db:reopen(Target),
    {noreply, State#rep_state{target = ReopenedTarget}};
handle_cast(checkpoint, State) ->
    #rep_state{rep_details = #rep{} = Rep} = State,
    case couch_replicator_manager:continue(Rep) of
        {false, Owner} ->
            couch_replicator_manager:replication_usurped(Rep, Owner),
            {stop, shutdown, State};
        {true, _} ->
            case do_checkpoint(State) of
                {ok, CheckpointedState} ->
                    couch_stats:increment_counter(
                        [couch_replicator, checkpoints, success]),
                    NextTimer = start_timer(State),
                    {noreply, CheckpointedState#rep_state{timer = NextTimer}};
                Error ->
                    couch_stats:increment_counter(
                        [couch_replicator, checkpoints, failure]),
                    {stop, Error, State}
            end
    end;
handle_cast({report_seq, Seq},
        #rep_state{seqs_in_progress = InProgress} = State) ->
    Updated = ordsets:add_element(Seq, InProgress),
    {noreply, State#rep_state{seqs_in_progress = Updated}}.
%% gen_server code_change/3 callback. The #rep_state{} is carried over
%% unchanged; any other state shape does not match.
code_change(_OldVsn, State = #rep_state{}, _Extra) ->
    {ok, State}.
%% Redacts HTTP headers. Every {Key, Value} whose Key lower-cases to
%% "authorization" gets the value "****"; other headers keep their value.
%% The redacted list is appended to the reverse of Acc, so with Acc = []
%% (as httpdb_strip_creds/1 passes) the original order is kept.
headers_strip_creds(Headers, Acc) ->
    Redact = fun({Key, Value}) ->
        case string:to_lower(Key) of
            "authorization" -> {Key, "****"};
            _ -> {Key, Value}
        end
    end,
    lists:reverse(Acc, lists:map(Redact, Headers)).
%% For an #httpdb{}, removes the password from the URL and redacts
%% Authorization headers. Any other endpoint (e.g. a local db) is returned
%% as-is.
httpdb_strip_creds(#httpdb{url = Url, headers = Headers} = HttpDb) ->
    SafeUrl = couch_util:url_strip_password(Url),
    SafeHeaders = headers_strip_creds(Headers, []),
    HttpDb#httpdb{url = SafeUrl, headers = SafeHeaders};
httpdb_strip_creds(NotHttp) ->
    NotHttp.
%% Returns Rep with credentials stripped from its source and target.
rep_strip_creds(#rep{source = Src, target = Tgt} = Rep) ->
    CleanSrc = httpdb_strip_creds(Src),
    CleanTgt = httpdb_strip_creds(Tgt),
    Rep#rep{source = CleanSrc, target = CleanTgt}.
%% Returns State with credentials stripped. #rep_state{} holds the source and
%% target twice: at the top level and inside the nested #rep{} in
%% rep_details, so both copies are cleaned.
state_strip_creds(#rep_state{rep_details = Rep} = State) ->
    CleanRep = rep_strip_creds(Rep),
    State#rep_state{
        rep_details = CleanRep,
        source = httpdb_strip_creds(State#rep_state.source),
        target = httpdb_strip_creds(State#rep_state.target)
    }.
%% gen_server terminate/2 callback. Reports how the job ended.
%%
%% normal: the job finished.
%%   * Resources are released.
%%   * Listeners get {finished, RepId, CheckpointHistory}.
%%   * The manager is told the job completed, with the final stats.
%%
%% shutdown with a #rep_state{}: the job was stopped, e.g. by
%% cancel_replication/1. Listeners get {error, RepId, <<"cancelled">>}.
%%
%% shutdown with {error, Class, Error, Stack, InitArgs}: do_init/1 failed.
%%   * The failure is counted and logged, with credentials stripped from the
%%     args.
%%   * Unauthorized and db-not-found errors are turned into readable messages
%%     before being reported to listeners and the manager.
%%
%% any other reason: the running job failed. It is logged, cleaned up and
%% reported as an error.
terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep,
        checkpoint_history = CheckpointHistory} = State) ->
    terminate_cleanup(State),
    couch_replicator_notifier:notify({finished, RepId, CheckpointHistory}),
    couch_replicator_manager:replication_completed(Rep, rep_stats(State));
terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
    couch_replicator_notifier:notify({error, RepId, <<"cancelled">>}),
    terminate_cleanup(State);
terminate(shutdown, {error, Class, Error, Stack, InitArgs}) ->
    #rep{id = RepId} = InitArgs,
    couch_stats:increment_counter([couch_replicator, failed_starts]),
    couch_log:error("~p:~p: Replication failed to start for args ~p: ~p",
        [Class, Error, rep_strip_creds(InitArgs), Stack]),
    NotifyError = case Error of
        {unauthorized, DbUri} ->
            {unauthorized,
                <<"unauthorized to access or create database ", DbUri/binary>>};
        {db_not_found, DbUri} ->
            {db_not_found, <<"could not open ", DbUri/binary>>};
        _ ->
            Error
    end,
    couch_replicator_notifier:notify({error, RepId, NotifyError}),
    couch_replicator_manager:replication_error(InitArgs, NotifyError);
terminate(Reason, State) ->
    #rep_state{
        source_name = SourceName,
        target_name = TargetName,
        rep_details = #rep{id = {BaseId, Ext} = RepId} = Rep
    } = State,
    JobName = BaseId ++ Ext,
    couch_log:error("Replication `~s` (`~s` -> `~s`) failed: ~s",
        [JobName, SourceName, TargetName, to_binary(Reason)]),
    terminate_cleanup(State),
    couch_replicator_notifier:notify({error, RepId, Reason}),
    couch_replicator_manager:replication_error(Rep, Reason).
%% Releases a job's resources, in this order:
%%   1. pushes a final task-status update;
%%   2. stops both compaction notifiers;
%%   3. closes the source and target handles.
terminate_cleanup(State) ->
    update_task(State),
    #rep_state{
        source_db_compaction_notifier = SourceNotifier,
        target_db_compaction_notifier = TargetNotifier,
        source = Source,
        target = Target
    } = State,
    stop_db_compaction_notifier(SourceNotifier),
    stop_db_compaction_notifier(TargetNotifier),
    couch_replicator_api_wrap:db_close(Source),
    couch_replicator_api_wrap:db_close(Target).
%% gen_server format_status/2 callback. gen_server uses it for
%% sys:get_status/1 and when it writes a crash report.
%%
%% Credentials are stripped from every state shape this module uses:
%%   * #rep_state{} - a running job;
%%   * #rep{} - the init args, before handle_info(timeout, _) has run
%%     do_init/1;
%%   * {error, Class, Error, Stack, #rep{}} - the state after a failed start.
%%
%% All three shapes need a clause. If this callback crashes, OTP's gen_server
%% falls back to the unformatted state, which would expose credentials in
%% the logs.
format_status(_Opt, [_PDict, #rep_state{} = State]) ->
    [{data, [{"State", state_strip_creds(State)}]}];
format_status(_Opt, [_PDict, #rep{} = InitArgs]) ->
    [{data, [{"State", rep_strip_creds(InitArgs)}]}];
format_status(_Opt, [_PDict, {error, Class, Error, Stack, #rep{} = InitArgs}]) ->
    CleanState = {error, Class, Error, Stack, rep_strip_creds(InitArgs)},
    [{data, [{"State", CleanState}]}].
%% Final checkpoint, run once every worker has exited and no batch is still
%% in progress. Returns a gen_server stop tuple:
%%   * {stop, normal, _} when nothing was ever processed (highest seq done is
%%     ?LOWEST_SEQ), or when the checkpoint up to the highest seq done
%%     succeeds;
%%   * {stop, Error, _} when that checkpoint fails.
%% The checkpoint timer is cancelled on every path, including the failure
%% path, as all other stop paths in this module do.
do_last_checkpoint(#rep_state{seqs_in_progress = [],
        highest_seq_done = {_Ts, ?LOWEST_SEQ}} = State) ->
    {stop, normal, cancel_timer(State)};
do_last_checkpoint(#rep_state{seqs_in_progress = [],
        highest_seq_done = Seq} = State) ->
    case do_checkpoint(State#rep_state{current_through_seq = Seq}) of
        {ok, NewState} ->
            couch_stats:increment_counter([couch_replicator, checkpoints, success]),
            {stop, normal, cancel_timer(NewState)};
        Error ->
            couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
            {stop, Error, cancel_timer(State)}
    end.
%% Schedules a `checkpoint` cast to this process after checkpoint_interval
%% milliseconds. Returns the timer reference, or nil (after logging) if the
%% timer could not be created.
start_timer(State) ->
    Interval = State#rep_state.checkpoint_interval,
    CastArgs = [self(), checkpoint],
    case timer:apply_after(Interval, gen_server, cast, CastArgs) of
        {ok, TRef} ->
            TRef;
        Error ->
            couch_log:error("Replicator, error scheduling checkpoint: ~p", [Error]),
            nil
    end.
%% Cancels the pending checkpoint timer, if any, and clears it from the
%% state. A state without a timer (nil) is returned unchanged.
cancel_timer(#rep_state{timer = TRef} = State) ->
    case TRef of
        nil ->
            State;
        _ ->
            {ok, cancel} = timer:cancel(TRef),
            State#rep_state{timer = nil}
    end.
% Builds the initial #rep_state{} for Rep:
%   * opens the source and target; db_open/3 for the target receives the
%     create_target option (default false);
%   * fetches db info for both;
%   * loads the checkpoint documents from both and derives the start
%     sequence from them (the since_seq option overrides it);
%   * starts the compaction notifiers and db monitors;
%   * arms the checkpoint timer.
% Crashes with badmatch if opening a database or reading its info fails.
init_state(Rep) ->
#rep{
id = {BaseId, _Ext},
source = Src0, target = Tgt0,
options = Options, user_ctx = UserCtx,
type = Type, view = View
} = Rep,
% note if fabric should be used
Src1 = maybe_fabric(Rep, Src0),
Tgt = maybe_fabric(Rep, Tgt0),
% Adjust minimum number of http source connections to 2 to avoid deadlock
Src = adjust_maxconn(Src1, BaseId),
{ok, Source} = couch_replicator_api_wrap:db_open(Src, [{user_ctx, UserCtx}]),
{ok, Target} = couch_replicator_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}],
get_value(create_target, Options, false)),
{ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source),
{ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target),
[SourceLog, TargetLog] = find_replication_logs([Source, Target], Rep),
{StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog),
StartSeq1 = get_value(since_seq, Options, StartSeq0),
% Sequences are kept as {Ts, Seq}; 0 marks a value not from a batch.
StartSeq = {0, StartSeq1},
SourceSeq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ),
#doc{body={CheckpointHistory}} = SourceLog,
State = #rep_state{
rep_details = Rep,
source_name = couch_replicator_api_wrap:db_uri(Source),
target_name = couch_replicator_api_wrap:db_uri(Target),
source = Source,
target = Target,
history = History,
% Until a checkpoint is written, the history reported at normal
% termination is the existing source log body tagged with no_changes.
checkpoint_history = {[{<<"no_changes">>, true}| CheckpointHistory]},
start_seq = StartSeq,
current_through_seq = StartSeq,
committed_seq = StartSeq,
source_log = SourceLog,
target_log = TargetLog,
rep_starttime = httpd_util:rfc1123_date(),
% Instance start times are compared again in do_checkpoint/1.
src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
session_id = couch_uuids:random(),
source_db_compaction_notifier =
start_db_compaction_notifier(Source, self()),
target_db_compaction_notifier =
start_db_compaction_notifier(Target, self()),
source_monitor = db_monitor(Source),
target_monitor = db_monitor(Target),
source_seq = SourceSeq,
use_checkpoints = get_value(use_checkpoints, Options, true),
checkpoint_interval = get_value(checkpoint_interval, Options,
?DEFAULT_CHECKPOINT_INTERVAL),
type = Type,
view = View
},
State#rep_state{timer = start_timer(State)}.
%% Decides whether a local endpoint is accessed through fabric.
%%   * A remote #httpdb{} endpoint is returned unchanged.
%%   * A local db name becomes {fabric, DbName} when the replication was
%%     triggered from a clustered database (the #rep{} db_name starts with
%%     "shards/").
%%   * Otherwise the local db name is returned unchanged.
maybe_fabric(#rep{}, #httpdb{} = Remote) ->
    Remote;
maybe_fabric(#rep{db_name = RepDbName}, Local) ->
    case RepDbName of
        <<"shards/", _/binary>> -> {fabric, Local};
        _ -> Local
    end.
%% Loads the checkpoint document "_local/<BaseId>" (or a migrated older one)
%% from each db in DbList. Returns one #doc{} per db, in order.
find_replication_logs(DbList, #rep{id = {BaseId, _}} = Rep) ->
    CurrentLogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
    fold_replication_logs(DbList, ?REP_ID_VERSION,
        CurrentLogId, CurrentLogId, Rep, []).
% Loads the replication log (checkpoint document) for each db, in order.
% If the doc under LogId is missing and Vsn > 1, the id computed by the
% previous replication id version is tried, down to version 1.
% A log found under an older id is returned re-keyed to NewId (only its body
% is kept). If no log is found, an empty #doc{id = NewId} is used.
% Each db restarts from ?REP_ID_VERSION and NewId. Returns the docs in the
% same order as the dbs.
fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) ->
lists:reverse(Acc);
fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
case couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body]) of
{error, <<"not_found">>} when Vsn > 1 ->
% Retry the same db with the id from the previous version.
OldRepId = couch_replicator_utils:replication_id(Rep, Vsn - 1),
fold_replication_logs(Dbs, Vsn - 1,
?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, Rep, Acc);
{error, <<"not_found">>} ->
fold_replication_logs(
Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [#doc{id = NewId} | Acc]);
{ok, Doc} when LogId =:= NewId ->
fold_replication_logs(
Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Doc | Acc]);
{ok, Doc} ->
% Found under an old id: keep the body, store it under the new id.
MigratedLog = #doc{id = NewId, body = Doc#doc.body},
fold_replication_logs(
Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [MigratedLog | Acc])
end.
%% Spawns the linked changes manager process. It hands out batches from
%% ChangesQueue; batch timestamps start at 1. Returns its pid.
spawn_changes_manager(Parent, ChangesQueue, BatchSize) ->
    FirstTs = 1,
    Loop = fun() ->
        changes_manager_loop_open(Parent, ChangesQueue, BatchSize, FirstTs)
    end,
    spawn_link(Loop).
%% Changes manager loop. For each {get_changes, From} request it dequeues up
%% to BatchSize changes:
%%   * queue closed: replies {closed, self()};
%%   * batch available: tags the batch's last high_seq with the counter Ts,
%%     casts {report_seq, {Ts, Seq}} to Parent first, then replies
%%     {changes, self(), Changes, {Ts, Seq}}.
%% Ts is incremented after every request.
changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) ->
    receive
        {get_changes, Requester} ->
            Reply = case couch_work_queue:dequeue(ChangesQueue, BatchSize) of
                closed ->
                    {closed, self()};
                {ok, Changes} ->
                    #doc_info{high_seq = Seq} = lists:last(Changes),
                    ReportSeq = {Ts, Seq},
                    ok = gen_server:cast(Parent, {report_seq, ReportSeq}),
                    {changes, self(), Changes, ReportSeq}
            end,
            Requester ! Reply,
            changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts + 1)
    end.
% Records progress up to current_through_seq in a checkpoint document on
% both the source and the target. Returns {ok, NewState} on success, or when
% there is nothing to do, and {checkpoint_commit_failure, Reason} otherwise.
%
% With use_checkpoints = false nothing is written; only checkpoint_history
% is set.
do_checkpoint(#rep_state{use_checkpoints=false} = State) ->
NewState = State#rep_state{checkpoint_history = {[{<<"use_checkpoints">>, false}]} },
{ok, NewState};
% Nothing new since the last checkpoint: just refresh the task status.
do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=Seq} = State) ->
update_task(State),
{ok, State};
do_checkpoint(State) ->
#rep_state{
source_name=SourceName,
target_name=TargetName,
source = Source,
target = Target,
history = OldHistory,
start_seq = {_, StartSeq},
current_through_seq = {_Ts, NewSeq} = NewTsSeq,
source_log = SourceLog,
target_log = TargetLog,
rep_starttime = ReplicationStartTime,
src_starttime = SrcInstanceStartTime,
tgt_starttime = TgtInstanceStartTime,
stats = Stats,
rep_details = #rep{options = Options},
session_id = SessionId
} = State,
% Commit both endpoints first. The start times they return must match the
% ones recorded in init_state/1. A mismatch (presumably the db instance was
% reopened — see the max_dbs_open hints below) refuses the checkpoint.
case commit_to_both(Source, Target) of
{source_error, Reason} ->
{checkpoint_commit_failure,
<<"Failure on source commit: ", (to_binary(Reason))/binary>>};
{target_error, Reason} ->
{checkpoint_commit_failure,
<<"Failure on target commit: ", (to_binary(Reason))/binary>>};
{SrcInstanceStartTime, TgtInstanceStartTime} ->
couch_log:notice("recording a checkpoint for `~s` -> `~s` at source update_seq ~p",
[SourceName, TargetName, NewSeq]),
StartTime = ?l2b(ReplicationStartTime),
EndTime = ?l2b(httpd_util:rfc1123_date()),
% History entry describing this session, prepended to the old history.
NewHistoryEntry = {[
{<<"session_id">>, SessionId},
{<<"start_time">>, StartTime},
{<<"end_time">>, EndTime},
{<<"start_last_seq">>, StartSeq},
{<<"end_last_seq">>, NewSeq},
{<<"recorded_seq">>, NewSeq},
{<<"missing_checked">>, couch_replicator_stats:missing_checked(Stats)},
{<<"missing_found">>, couch_replicator_stats:missing_found(Stats)},
{<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
{<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
{<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)}
]},
BaseHistory = [
{<<"session_id">>, SessionId},
{<<"source_last_seq">>, NewSeq},
{<<"replication_id_version">>, ?REP_ID_VERSION}
] ++ case get_value(doc_ids, Options) of
undefined ->
[];
_DocIds ->
% backwards compatibility with the result of a replication by
% doc IDs in versions 0.11.x and 1.0.x
% TODO: deprecate (use same history format, simplify code)
[
{<<"start_time">>, StartTime},
{<<"end_time">>, EndTime},
{<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
{<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
{<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)}
]
end,
% limit history to 50 entries
NewRepHistory = {
BaseHistory ++
[{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}]
},
% Write the same body to both checkpoint docs. A failure on either side is
% thrown by update_checkpoint/3 as {checkpoint_commit_failure, _} and
% returned as-is. Neither in-memory log is updated unless both writes
% succeed.
try
{SrcRevPos, SrcRevId} = update_checkpoint(
Source, SourceLog#doc{body = NewRepHistory}, source),
{TgtRevPos, TgtRevId} = update_checkpoint(
Target, TargetLog#doc{body = NewRepHistory}, target),
NewState = State#rep_state{
checkpoint_history = NewRepHistory,
committed_seq = NewTsSeq,
source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
},
update_task(NewState),
{ok, NewState}
catch throw:{checkpoint_commit_failure, _} = Failure ->
Failure
end;
{SrcInstanceStartTime, _NewTgtInstanceStartTime} ->
{checkpoint_commit_failure, <<"Target database out of sync. "
"Try to increase max_dbs_open at the target's server.">>};
{_NewSrcInstanceStartTime, TgtInstanceStartTime} ->
{checkpoint_commit_failure, <<"Source database out of sync. "
"Try to increase max_dbs_open at the source's server.">>};
{_NewSrcInstanceStartTime, _NewTgtInstanceStartTime} ->
{checkpoint_commit_failure, <<"Source and target databases out of "
"sync. Try to increase max_dbs_open at both servers.">>}
end.
%% Saves the checkpoint document Doc to Db and returns its {Pos, RevId}.
%% A {checkpoint_commit_failure, Reason} from update_checkpoint/2 is
%% re-thrown with a message naming which endpoint (DbType) failed.
update_checkpoint(Db, Doc, DbType) ->
    try update_checkpoint(Db, Doc) of
        PosRevId ->
            PosRevId
    catch
        throw:{checkpoint_commit_failure, Reason} ->
            throw({checkpoint_commit_failure,
                <<"Error updating the ", (to_binary(DbType))/binary,
                    " checkpoint document: ", (to_binary(Reason))/binary>>})
    end.
% Saves the checkpoint document Doc to Db with the delay_commit option and
% returns the new {Pos, RevId}. An {error, Reason} result is thrown as
% {checkpoint_commit_failure, Reason}.
% A thrown conflict is accepted only if the stored doc already has exactly
% the body being written. In that case its current {Pos, RevId} is returned.
update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) ->
try
case couch_replicator_api_wrap:update_doc(Db, Doc, [delay_commit]) of
{ok, PosRevId} ->
PosRevId;
{error, Reason} ->
throw({checkpoint_commit_failure, Reason})
end
catch throw:conflict ->
case (catch couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body])) of
{ok, #doc{body = LogBody, revs = {Pos, [RevId | _]}}} ->
% This means that we were able to update successfully the
% checkpoint doc in a previous attempt but we got a connection
% error (timeout for e.g.) before receiving the success response.
% Therefore the request was retried and we got a conflict, as the
% revision we sent is not the current one.
% We confirm this by verifying the doc body we just got is the same
% that we have just sent.
{Pos, RevId};
_ ->
throw({checkpoint_commit_failure, conflict})
end
end.
% Runs ensure_full_commit on both endpoints in parallel: the source in a
% linked helper process, the target in the calling process.
% Returns {SourceStartTime, TargetStartTime} (the values inside the two
% {ok, _} replies) when both succeed. Otherwise returns
% {target_error, TargetResult} if the target failed (checked first), else
% {source_error, SourceResult}.
commit_to_both(Source, Target) ->
% commit the src async
ParentPid = self(),
SrcCommitPid = spawn_link(
fun() ->
Result = (catch couch_replicator_api_wrap:ensure_full_commit(Source)),
ParentPid ! {self(), Result}
end),
% commit tgt sync
TargetResult = (catch couch_replicator_api_wrap:ensure_full_commit(Target)),
SourceResult = receive
{SrcCommitPid, Result} ->
% Drop the link and discard the helper's 'EXIT' message if it is
% already queued (this process traps exits, see do_init/1).
unlink(SrcCommitPid),
receive {'EXIT', SrcCommitPid, _} -> ok after 0 -> ok end,
Result;
{'EXIT', SrcCommitPid, Reason} ->
% The helper died before sending its result.
{error, Reason}
end,
case TargetResult of
{ok, TargetStartTime} ->
case SourceResult of
{ok, SourceStartTime} ->
{SourceStartTime, TargetStartTime};
SourceError ->
{source_error, SourceError}
end;
TargetError ->
{target_error, TargetError}
end.
%% Decides where to resume from the source and target checkpoint docs.
%% Returns {StartSeq, History}.
%%   * Same session_id on both sides: the source log is authoritative, and
%%     its source_last_seq and history are used.
%%   * Otherwise: the two histories are searched for a common session by
%%     compare_rep_history/2.
compare_replication_logs(SrcDoc, TgtDoc) ->
    #doc{body = {SrcProps}} = SrcDoc,
    #doc{body = {TgtProps}} = TgtDoc,
    SrcSession = get_value(<<"session_id">>, SrcProps),
    TgtSession = get_value(<<"session_id">>, TgtProps),
    if
        SrcSession == TgtSession ->
            LastSeq = get_value(<<"source_last_seq">>, SrcProps, ?LOWEST_SEQ),
            {LastSeq, get_value(<<"history">>, SrcProps, [])};
        true ->
            SrcHistory = get_value(<<"history">>, SrcProps, []),
            TgtHistory = get_value(<<"history">>, TgtProps, []),
            couch_log:notice("Replication records differ. "
                "Scanning histories to find a common ancestor.", []),
            couch_log:debug("Record on source:~p~nRecord on target:~p~n",
                [SrcProps, TgtProps]),
            compare_rep_history(SrcHistory, TgtHistory)
    end.
%% Walks the source and target histories in lockstep, looking for a session
%% recorded on both sides. Returns {RecordedSeq, RemainingHistory}.
%%   * Either history empty: returns {?LOWEST_SEQ, []} (full replication).
%%   * The current source entry's session appears anywhere in the target
%%     history: its recorded_seq is returned with the rest of the source
%%     history.
%%   * Otherwise, the current target entry's session appears in the rest of
%%     the source history: its recorded_seq is returned with the rest of the
%%     target history.
%%   * Otherwise both histories are advanced by one entry.
compare_rep_history(SrcHist, TgtHist) when SrcHist =:= []; TgtHist =:= [] ->
    couch_log:notice("no common ancestry -- performing full replication", []),
    {?LOWEST_SEQ, []};
compare_rep_history([{S} | SourceRest], [{T} | TargetRest] = Target) ->
    Found = fun(Props, Rest) ->
        RecordSeqNum = get_value(<<"recorded_seq">>, Props, ?LOWEST_SEQ),
        couch_log:notice("found a common replication record with source_seq ~p",
            [RecordSeqNum]),
        {RecordSeqNum, Rest}
    end,
    case has_session_id(get_value(<<"session_id">>, S), Target) of
        true ->
            Found(S, SourceRest);
        false ->
            case has_session_id(get_value(<<"session_id">>, T), SourceRest) of
                true -> Found(T, TargetRest);
                false -> compare_rep_history(SourceRest, TargetRest)
            end
    end.
%% Returns true if any {Props} entry of History has session_id exactly equal
%% (=:=) to SessionId. Entries without a session_id count as nil.
has_session_id(SessionId, History) ->
    lists:any(
        fun({Props}) ->
            get_value(<<"session_id">>, Props, nil) =:= SessionId
        end,
        History).
%% Monitors a local #db{} endpoint via couch_db:monitor/1. Any other endpoint
%% is not monitored and yields nil.
db_monitor(Db) ->
    case Db of
        #db{} -> couch_db:monitor(Db);
        _ -> nil
    end.
%% Returns the number of changes still pending on the source, as computed by
%% get_pending_count_int/1.
%% The value is cached in the process dictionary under pending_count_state,
%% together with the time it was fetched. It is refetched only once it is
%% older than the replication's connection_timeout, which is in milliseconds
%% and is scaled to microseconds here for timer:now_diff/2.
get_pending_count(St) ->
    Rep = St#rep_state.rep_details,
    MaxAgeMicro = get_value(connection_timeout, Rep#rep.options) * 1000,
    Refresh = fun() ->
        Fresh = get_pending_count_int(St),
        put(pending_count_state, {os:timestamp(), Fresh}),
        Fresh
    end,
    case get(pending_count_state) of
        undefined ->
            Refresh();
        {LastUpdate, Cached} ->
            case timer:now_diff(os:timestamp(), LastUpdate) =< MaxAgeMicro of
                true -> Cached;
                false -> Refresh()
            end
    end.
%% Asks the source how many changes follow the highest seq done.
%%   * HTTP source: queried with 3 retries; any failure yields null.
%%   * Local source: must answer {ok, Pending}; anything else crashes.
get_pending_count_int(#rep_state{source = #httpdb{} = HttpDb} = St) ->
    {_, Seq} = St#rep_state.highest_seq_done,
    Reply = (catch couch_replicator_api_wrap:get_pending_count(
        HttpDb#httpdb{retries = 3}, Seq)),
    case Reply of
        {ok, Pending} -> Pending;
        _ -> null
    end;
get_pending_count_int(#rep_state{source = LocalDb} = St) ->
    {_, Seq} = St#rep_state.highest_seq_done,
    {ok, Pending} = couch_replicator_api_wrap:get_pending_count(LocalDb, Seq),
    Pending.
%% Pushes the job's counters, plus the highest seq done (source_seq) and the
%% through seq, to its couch_task_status entry.
update_task(State) ->
    #rep_state{
        current_through_seq = {_, Through},
        highest_seq_done = {_, Highest}
    } = State,
    Counters = rep_stats(State),
    Progress = [{source_seq, Highest}, {through_seq, Through}],
    couch_task_status:update(Counters ++ Progress).
%% Returns the job's progress as a proplist:
%%   * revisions checked and missing revisions found;
%%   * docs read and docs written;
%%   * pending changes and doc write failures;
%%   * the last checkpointed source seq.
rep_stats(State) ->
    #rep_state{committed_seq = {_, CommittedSeq}, stats = Stats} = State,
    Checked = couch_replicator_stats:missing_checked(Stats),
    Found = couch_replicator_stats:missing_found(Stats),
    Read = couch_replicator_stats:docs_read(Stats),
    Written = couch_replicator_stats:docs_written(Stats),
    Pending = get_pending_count(State),
    WriteFailures = couch_replicator_stats:doc_write_failures(Stats),
    [
        {revisions_checked, Checked},
        {missing_revisions_found, Found},
        {docs_read, Read},
        {docs_written, Written},
        {changes_pending, Pending},
        {doc_write_failures, WriteFailures},
        {checkpointed_source_seq, CommittedSeq}
    ].