% Public API
% Exported for code reloading
-import(couch_util, [
% Spawns a process linked to the caller that reads the source's changes,
% starting at StartSeq, into ChangesQueue, and returns `{ok, Pid}`.
%
% For an `#httpdb{}` source the retry budget is moved out of the record:
% `Db#httpdb.retries` is stored in the process dictionary as `retries_left`
% (read_changes/6 decrements it on each failed request), and the reader is
% started with `retries = 0`. NOTE(review): presumably this keeps the HTTP
% layer from retrying on its own as well; confirm.
% `last_seq` is seeded with StartSeq, so a retry that happens before any
% change has been recorded resumes from the original position.
% Any other Db is passed to read_changes/6 unchanged, and the process
% dictionary is not seeded.
%
% read_changes/6 is called through `?MODULE:` so the spawned process uses the
% newest loaded version of the module. The final argument `1` is the initial
% Ts value.
start_link(StartSeq, #httpdb{} = Db, ChangesQueue, Options) ->
    Parent = self(),
    {ok, spawn_link(fun() ->
        put(last_seq, StartSeq),
        put(retries_left, Db#httpdb.retries),
        ?MODULE:read_changes(Parent, StartSeq, Db#httpdb{retries = 0},
            ChangesQueue, Options, 1)
    end)};
start_link(StartSeq, Db, ChangesQueue, Options) ->
    Parent = self(),
    {ok, spawn_link(fun() ->
        ?MODULE:read_changes(Parent, StartSeq, Db, ChangesQueue, Options, 1)
    end)}.
% Issues a _changes request against Db starting at StartSeq. Each item is
% handed to process_change/2 together with the context tuple
% {Parent, Db, ChangesQueue, Continuous, Ts}, where Continuous is the
% `continuous` entry of Options.
%
% Two exceptions restart the read:
%  - a thrown `recurse` restarts from the `last_seq` value kept in the process
%    dictionary, with Ts incremented by one;
%  - exit:{http_request_failed, _, _, _} spends one unit of the `retries_left`
%    budget (also kept in the process dictionary) and re-issues the request
%    from `last_seq`. If `last_seq` has not moved past StartSeq, the reader
%    first sleeps for Db#httpdb.wait milliseconds and doubles that wait for
%    the next attempt.
%
% NOTE(review): as shown, this block does not parse. The `try` / `catch`
% keywords, the expression after the trailing comma on the changes_since call,
% the call consuming the bare metric-name list, the value of the non-backoff
% Db2 branch, the body of the final `_ ->` branch and the closing `end`s all
% appear to be missing. Restore them from upstream before relying on this code.
read_changes(Parent, StartSeq, Db, ChangesQueue, Options, Ts) ->
Continuous = couch_util:get_value(continuous, Options),
% NOTE(review): a `try` is expected here; the clauses below use catch-clause
% syntax (Class:Pattern ->).
couch_replicator_api_wrap:changes_since(Db, all_docs, StartSeq,
fun(Item) ->
process_change(Item, {Parent, Db, ChangesQueue, Continuous, Ts})
end, Options),
% NOTE(review): the trailing comma above implies another expression followed,
% and the `catch` keyword is missing before the clauses below.
throw:recurse ->
% Restart the feed from the last recorded sequence, bumping Ts.
LS = get(last_seq),
read_changes(Parent, LS, Db, ChangesQueue, Options, Ts+1);
exit:{http_request_failed, _, _, _} = Error ->
% NOTE(review): bare list; presumably the argument of a dropped
% metrics-increment call for read failures. Confirm against upstream.
[couch_replicator, changes_read_failures]
case get(retries_left) of
N when N > 0 ->
% Spend one retry from the budget kept in the process dictionary.
put(retries_left, N - 1),
LastSeq = get(last_seq),
Db2 = case LastSeq of
StartSeq ->
% No progress since this request started: log, sleep for the current
% wait, then retry with the wait doubled.
couch_log:notice("Retrying _changes request to source database ~s"
" with since=~p in ~p seconds",
[couch_replicator_api_wrap:db_uri(Db), LastSeq, Db#httpdb.wait / 1000]),
ok = timer:sleep(Db#httpdb.wait),
Db#httpdb{wait = 2 * Db#httpdb.wait};
_ ->
% Some progress was made: retry without sleeping.
% NOTE(review): this branch's result value (the Db2 to use) and the
% `end,` closing the Db2 case are missing, so the call below currently
% reads as part of this branch and Db2 is unbound.
couch_log:notice("Retrying _changes request to source database ~s"
" with since=~p", [couch_replicator_api_wrap:db_uri(Db), LastSeq]),
read_changes(Parent, LastSeq, Db2, ChangesQueue, Options, Ts);
% Retry budget exhausted.
% NOTE(review): this branch's body (presumably re-raising Error, which is
% otherwise unused) and the closing `end`s / final `.` are missing.
_ ->
process_change(#doc_info{id = <<>>} = DocInfo, {_, Db, _, _, _}) ->
% Previous CouchDB releases had a bug which allowed a doc with an empty ID
% to be inserted into databases. Such doc is impossible to GET.
couch_log:error("Replicator: ignoring document with empty ID in "
"source database `~s` (_changes sequence ~p)",
[couch_replicator_api_wrap:db_uri(Db), DocInfo#doc_info.high_seq]);
process_change(#doc_info{} = DocInfo, {_, _, ChangesQueue, _, _}) ->
ok = couch_work_queue:queue(ChangesQueue, DocInfo),
put(last_seq, DocInfo#doc_info.high_seq);
process_change({last_seq, LS}, {Parent, _, _, true = _Continuous, Ts}) ->
% LS should never be undefined, but it doesn't hurt to be defensive inside
% the replicator.
Seq = case LS of undefined -> get(last_seq); _ -> LS end,
OldSeq = get(last_seq),
if Seq == OldSeq -> ok; true ->
Msg = {report_seq_done, {Ts, Seq}, couch_replicator_stats:new()},
ok = gen_server:call(Parent, Msg, infinity)
put(last_seq, Seq),
process_change({last_seq, _}, _) ->
% This clause is unreachable today, but let's plan ahead for the future
% where we checkpoint against last_seq instead of the sequence of the last
% change. The two can differ substantially in the case of a restrictive
% filter.