| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(couch_rep). |
| -behaviour(gen_server). |
| -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, |
| code_change/3]). |
| |
| -export([replicate/2, checkpoint/1]). |
| -export([ensure_rep_db_exists/0, make_replication_id/2]). |
| -export([start_replication/3, end_replication/1, get_result/4]). |
| -export([update_rep_doc/2]). |
| |
| -include("couch_db.hrl"). |
| -include("couch_js_functions.hrl"). |
| -include("../ibrowse/ibrowse.hrl"). |
| |
| -define(REP_ID_VERSION, 2). |
| |
%% Replicator process state.
-record(state, {
    % pids of the pipeline processes started in do_init/1
    changes_feed,
    missing_revs,
    reader,
    writer,

    % endpoint handles: #http_db{} for remote dbs, #db{} for local ones
    source,
    target,
    % options taken from the replication request body
    continuous,
    create_target,
    % the argument list given to init/1; reused by do_checkpoint/1 to
    % restart the replication from its last checkpoint
    init_args,
    % timer ref of the pending checkpoint, nil when none is scheduled
    checkpoint_scheduled = nil,

    % sequence this run resumed from and the history entries found at init
    start_seq,
    history,
    % uuid identifying this run, written into every checkpoint
    session_id,
    % _local checkpoint documents on source and target
    source_log,
    target_log,
    % RFC 1123 date string of when this run started
    rep_starttime,
    % instance_start_time of source/target seen at init; a mismatch when
    % checkpointing makes do_checkpoint/1 restart the replication
    src_starttime,
    tgt_starttime,
    % body of the last checkpoint written, nil if none yet
    checkpoint_history = nil,

    % gen_server From tags of get_result callers, newest first
    listeners = [],
    % true when the writer finished before anyone asked for the result
    complete = false,
    % highest source sequence reported as processed by the workers
    committed_seq = 0,

    % private ets table holding the replication counters
    stats = nil,
    % {Props} of the replication request / replicator-db document
    rep_doc = nil,
    % compaction notifiers for local dbs (nil for remote ones)
    source_db_update_notifier = nil,
    target_db_update_notifier = nil
}).
| |
%% replicate/2 serves two callers:
%%  * the shell: Source and Target are database names or URLs given as
%%    strings or binaries; a minimal request body is built and the
%%    replication runs with an empty #user_ctx{};
%%  * the _replicate HTTP handler: the first argument is the decoded
%%    `{Props}' request body. If "cancel" is true the matching replication
%%    is stopped, otherwise it is started (or joined) and its result
%%    awaited.
replicate(Source, Target) when is_list(Source) ->
    replicate(?l2b(Source), Target);
replicate(Source, Target) when is_binary(Source), is_list(Target) ->
    replicate(Source, ?l2b(Target));
replicate(Source, Target) when is_binary(Source), is_binary(Target) ->
    ShellRequest = {[{<<"source">>, Source}, {<<"target">>, Target}]},
    replicate(ShellRequest, #user_ctx{});
replicate({Props} = PostBody, UserCtx) ->
    RepId = make_replication_id(PostBody, UserCtx),
    case couch_util:get_value(<<"cancel">>, Props, false) of
        false ->
            ServerPid = start_replication(PostBody, RepId, UserCtx),
            get_result(ServerPid, RepId, PostBody, UserCtx);
        true ->
            end_replication(RepId)
    end.
| |
%% Cancel the replication identified by {BaseId, Extension}: terminate its
%% child under couch_rep_sup and delete the child spec.
%% Returns {ok, {cancelled, BaseIdBinary}}, {error, not_found} when no such
%% child is running, or {error, Reason} from supervisor:delete_child/2.
end_replication({BaseId, Extension}) ->
    ChildId = BaseId ++ Extension,
    case supervisor:terminate_child(couch_rep_sup, ChildId) of
        ok ->
            delete_rep_child(ChildId, BaseId);
        {error, not_found} = NotFound ->
            NotFound
    end.

%% Remove the terminated child's spec. A spec that is already gone still
%% counts as a successful cancellation.
delete_rep_child(ChildId, BaseId) ->
    case supervisor:delete_child(couch_rep_sup, ChildId) of
        {error, Reason} when Reason =/= not_found ->
            {error, Reason};
        Deleted when Deleted =:= ok; Deleted =:= {error, not_found} ->
            {ok, {cancelled, ?l2b(BaseId)}}
    end.
| |
%% Build a temporary worker child spec for this replication (child id is
%% BaseId ++ Extension) and start it via start_replication_server/1, which
%% returns the replicator pid.
start_replication(RepDoc, {BaseId, Extension}, UserCtx) ->
    ChildId = BaseId ++ Extension,
    InitArgs = [BaseId, RepDoc, UserCtx],
    StartFunc = {gen_server, start_link, [?MODULE, InitArgs, []]},
    ChildSpec = {ChildId, StartFunc, temporary, 1, worker, [?MODULE]},
    start_replication_server(ChildSpec).
| |
%% Ask the replicator process Pid to record a checkpoint asynchronously.
%% Always returns ok (gen_server:cast/2 does not fail).
checkpoint(Pid) ->
    Request = do_checkpoint,
    gen_server:cast(Pid, Request).
| |
%% Obtain the outcome of the replication served by Server.
%% Continuous replications return at once with {ok, {continuous, Id}}.
%% One-shot replications block until the replicator replies. A `retry'
%% reply, or the server being gone (noproc) or exiting normally while the
%% call was in flight, re-issues the whole replication request.
get_result(Server, {BaseId, _Extension}, {Props} = PostBody, UserCtx) ->
    case couch_util:get_value(<<"continuous">>, Props, false) of
        true ->
            {ok, {continuous, ?l2b(BaseId)}};
        false ->
            Call = {gen_server, call, [Server, get_result, infinity]},
            try gen_server:call(Server, get_result, infinity) of
                retry ->
                    replicate(PostBody, UserCtx);
                Result ->
                    Result
            catch
                %% noproc: the replication had just finished;
                %% normal: the call arrived while it was terminating.
                exit:{ExitReason, Call} when ExitReason =:= noproc;
                                            ExitReason =:= normal ->
                    replicate(PostBody, UserCtx)
            end
    end.
| |
%% gen_server callback. Delegates to do_init/1. Anything thrown there
%% (e.g. {db_not_found, _} or {unauthorized, _} from open_db) becomes
%% {stop, Thrown} so the starter sees it as an init error.
init(InitArgs) ->
    try do_init(InitArgs)
    catch
        throw:Thrown -> {stop, Thrown}
    end.
| |
%% Set up one replication run. Called via init/1, which turns any throw
%% into {stop, Error}. InitArgs is [RepId, {PostProps}, UserCtx], where
%% {PostProps} is the replication request body or replicator-db document.
%% The steps are:
%%  1. open both endpoints;
%%  2. mark the replicator doc as triggered;
%%  3. load the checkpoint logs to find the sequence to resume from;
%%  4. start the changes_feed -> missing_revs -> reader -> writer
%%     processes;
%%  5. return the initial #state{}.
do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) ->
    % 'EXIT' messages from the linked worker processes are handled in
    % handle_info/2 instead of killing this process.
    process_flag(trap_exit, true),

    SourceProps = couch_util:get_value(<<"source">>, PostProps),
    TargetProps = couch_util:get_value(<<"target">>, PostProps),

    Continuous = couch_util:get_value(<<"continuous">>, PostProps, false),
    CreateTarget = couch_util:get_value(<<"create_target">>, PostProps, false),

    ProxyParams = parse_proxy_params(
        couch_util:get_value(<<"proxy">>, PostProps, [])),
    % open_db may throw {db_not_found, _} / {unauthorized, _}; only the
    % target is ever created.
    Source = open_db(SourceProps, UserCtx, ProxyParams),
    Target = open_db(TargetProps, UserCtx, ProxyParams, CreateTarget),

    % The instance_start_time values read from here are compared in
    % do_checkpoint/1 to detect a db that changed instance since init.
    SourceInfo = dbinfo(Source),
    TargetInfo = dbinfo(Target),

    maybe_set_triggered(RepDoc, RepId),

    [SourceLog, TargetLog] = find_replication_logs(
        [Source, Target], RepId, {PostProps}, UserCtx),
    {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),

    % Each stage is given the pid of the stage before it.
    {ok, ChangesFeed} =
        couch_rep_changes_feed:start_link(self(), Source, StartSeq, PostProps),
    {ok, MissingRevs} =
        couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps),
    {ok, Reader} =
        couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps),
    {ok, Writer} =
        couch_rep_writer:start_link(self(), Target, Reader, PostProps),

    % Counters bumped by {update_stats, Key, N} messages and read when a
    % checkpoint history entry is written.
    Stats = ets:new(replication_stats, [set, private]),
    ets:insert(Stats, {total_revs,0}),
    ets:insert(Stats, {missing_revs, 0}),
    ets:insert(Stats, {docs_read, 0}),
    ets:insert(Stats, {docs_written, 0}),
    ets:insert(Stats, {doc_write_failures, 0}),

    % The first 6 characters of RepId label the task in the status list.
    {ShortId, _} = lists:split(6, RepId),
    couch_task_status:add_task("Replication", io_lib:format("~s: ~s -> ~s",
        [ShortId, dbname(Source), dbname(Target)]), "Starting"),

    State = #state{
        changes_feed = ChangesFeed,
        missing_revs = MissingRevs,
        reader = Reader,
        writer = Writer,

        source = Source,
        target = Target,
        continuous = Continuous,
        create_target = CreateTarget,
        init_args = InitArgs,
        stats = Stats,
        checkpoint_scheduled = nil,

        start_seq = StartSeq,
        history = History,
        session_id = couch_uuids:random(),
        source_log = SourceLog,
        target_log = TargetLog,
        rep_starttime = httpd_util:rfc1123_date(),
        src_starttime = couch_util:get_value(instance_start_time, SourceInfo),
        tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo),
        rep_doc = RepDoc,
        % compaction notifiers exist only for local dbs (nil otherwise)
        source_db_update_notifier = source_db_update_notifier(Source),
        target_db_update_notifier = target_db_update_notifier(Target)
    },
    {ok, State}.
| |
%% gen_server callback.
%% get_result: if the replication already completed and nobody was
%% waiting, stop now; terminate/2 then replies to this caller. Otherwise
%% queue the caller (newest first) to be answered when it finishes.
%% get_source_db / get_target_db: return the current db handles.
handle_call(get_result, From, #state{complete = true, listeners = []} = State) ->
    {stop, normal, State#state{listeners = [From]}};
handle_call(get_result, From, #state{listeners = Waiting} = State) ->
    {noreply, State#state{listeners = [From | Waiting]}};
handle_call(get_source_db, _From, State) ->
    {reply, {ok, State#state.source}, State};
handle_call(get_target_db, _From, State) ->
    {reply, {ok, State#state.target}, State}.
| |
%% gen_server callback.
%% reopen_source_db / reopen_target_db come from the compaction notifiers
%% and swap in a freshly reopened handle. do_checkpoint comes from the
%% timer set by schedule_checkpoint/1. Any other cast is ignored.
handle_cast(reopen_source_db, State) ->
    {ok, Reopened} = couch_db:reopen(State#state.source),
    {noreply, State#state{source = Reopened}};
handle_cast(reopen_target_db, State) ->
    {ok, Reopened} = couch_db:reopen(State#state.target),
    {noreply, State#state{target = Reopened}};
handle_cast(do_checkpoint, State) ->
    Checkpointed = do_checkpoint(State),
    {noreply, Checkpointed};
handle_cast(_Ignored, State) ->
    {noreply, State}.
| |
%% gen_server callback: progress reports from the workers, monitor and
%% link notifications. Clause order matters for the 'EXIT' messages.

% Progress report from missing_revs: advance committed_seq and make sure
% a checkpoint is scheduled.
handle_info({missing_revs_checkpoint, SourceSeq}, State) ->
    couch_task_status:update("MR Processed source update #~p", [SourceSeq]),
    {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})};

% Progress report from the writer. It is only used when it moves
% committed_seq forward; it is also forwarded to missing_revs.
handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State)
        when SourceSeq > N ->
    MissingRevs = State#state.missing_revs,
    ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}),
    couch_task_status:update("W Processed source update #~p", [SourceSeq]),
    {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})};
handle_info({writer_checkpoint, _}, State) ->
    % committed_seq is already at or beyond this sequence
    {noreply, State};

% Increment one of the counters in the stats table.
handle_info({update_stats, Key, N}, State) ->
    ets:update_counter(State#state.stats, Key, N),
    {noreply, State};

% Any monitor notification. The only monitor set up in this module is
% the local db monitor in open_db/4.
handle_info({'DOWN', _, _, _, _}, State) ->
    ?LOG_INFO("replication terminating because local DB is shutting down", []),
    timer:cancel(State#state.checkpoint_scheduled),
    {stop, shutdown, State};

% The writer finished normally: the replication is complete. If nobody
% is waiting yet, remember that; the next get_result call stops the
% server. Otherwise stop now so terminate/2 answers the waiting callers.
handle_info({'EXIT', Writer, normal}, #state{writer=Writer} = State) ->
    case State#state.listeners of
    [] ->
        {noreply, State#state{complete = true}};
    _Else ->
        {stop, normal, State}
    end;

% Other linked processes exiting normally (e.g. the commit helper spawned
% by commit_to_both/3) are ignored.
handle_info({'EXIT', _, normal}, State) ->
    {noreply, State};
% A worker reported a source/target error: stop with `shutdown'.
handle_info({'EXIT', _Pid, {Err, Reason}}, State) when Err == source_error;
        Err == target_error ->
    ?LOG_INFO("replication terminating due to ~p: ~p", [Err, Reason]),
    timer:cancel(State#state.checkpoint_scheduled),
    {stop, shutdown, State};
% Any other abnormal exit of a linked process stops the replicator with
% the same reason.
handle_info({'EXIT', _Pid, Reason}, State) ->
    {stop, Reason, State}.
| |
%% gen_server callback.
%%  * normal: a one-shot replication finished. A pending checkpoint timer
%%    is cancelled and the checkpoint is taken now. Then do_terminate/1
%%    answers the listeners and the replicator doc is marked "completed".
%%  * shutdown: the replication was stopped (a db went down, a
%%    source/target error, or a continuous replication was stopped).
%%    Listeners get {ok, stopped` and the replicator doc is left as is.
%%  * anything else: listeners get {error, Reason} and the replicator doc
%%    is marked "error".
terminate(normal, #state{checkpoint_scheduled=nil} = State) ->
    do_terminate(State),
    update_rep_doc(
        State#state.rep_doc, [{<<"_replication_state">>, <<"completed">>}]);

terminate(normal, State) ->
    % a checkpoint was pending: record it before finishing
    timer:cancel(State#state.checkpoint_scheduled),
    do_terminate(do_checkpoint(State)),
    update_rep_doc(
        State#state.rep_doc, [{<<"_replication_state">>, <<"completed">>}]);

terminate(shutdown, #state{listeners = Listeners} = State) ->
    % continuous replication stopped
    [gen_server:reply(L, {ok, stopped}) || L <- Listeners],
    terminate_cleanup(State);

terminate(Reason, #state{listeners = Listeners} = State) ->
    [gen_server:reply(L, {error, Reason}) || L <- Listeners],
    terminate_cleanup(State),
    update_rep_doc(
        State#state.rep_doc, [{<<"_replication_state">>, <<"error">>}]).
| |
%% gen_server callback: the state needs no migration between versions.
code_change(_PreviousVsn, CurrentState, _ExtraData) ->
    {ok, CurrentState}.
| |
| % internal funs |
| |
%% Start the replicator described by the child spec Replicator (whose id
%% is the replication id) under couch_rep_sup and return its pid.
%% If a replicator with the same id is already running, its pid is
%% returned instead, so identical concurrent requests share one process.
%% db_not_found / unauthorized init errors are rethrown as
%% {db_not_found | unauthorized, Message} with a readable binary message.
start_replication_server(Replicator) ->
    RepId = element(1, Replicator),
    case supervisor:start_child(couch_rep_sup, Replicator) of
    {ok, Pid} ->
        ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]),
        Pid;
    {error, already_present} ->
        % A child spec with this id is registered but not running:
        % restart it in place.
        case supervisor:restart_child(couch_rep_sup, RepId) of
        {ok, Pid} ->
            ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]),
            Pid;
        {error, running} ->
            %% this error occurs if multiple replicators are racing
            %% each other to start and somebody else won. Just grab
            %% the Pid by calling start_child again.
            {error, {already_started, Pid}} =
                supervisor:start_child(couch_rep_sup, Replicator),
            ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]),
            Pid;
        {error, {db_not_found, DbUrl}} ->
            throw({db_not_found, <<"could not open ", DbUrl/binary>>});
        {error, {unauthorized, DbUrl}} ->
            throw({unauthorized,
                <<"unauthorized to access database ", DbUrl/binary>>});
        {error, {'EXIT', {badarg,
            [{erlang, apply, [gen_server, start_link, undefined]} | _]}}} ->
            % Clause to deal with a change in the supervisor module introduced
            % in R14B02. For more details consult the thread at:
            % http://erlang.org/pipermail/erlang-bugs/2011-March/002273.html
            _ = supervisor:delete_child(couch_rep_sup, RepId),
            start_replication_server(Replicator)
        end;
    {error, {already_started, Pid}} ->
        ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]),
        Pid;
    % init errors as returned by start_child/2. Note the extra wrapping
    % tuple compared with the restart_child/2 patterns above.
    {error, {{db_not_found, DbUrl}, _}} ->
        throw({db_not_found, <<"could not open ", DbUrl/binary>>});
    {error, {{unauthorized, DbUrl}, _}} ->
        throw({unauthorized,
            <<"unauthorized to access database ", DbUrl/binary>>})
    end.
| |
%% Work out {StartSeq, History} from the source and target checkpoint
%% docs. Equal session ids mean both sides hold the same latest checkpoint,
%% so resume from its source_last_seq. Otherwise search both histories for
%% a shared session with compare_rep_history/2.
compare_replication_logs(SrcDoc, TgtDoc) ->
    #doc{body = {SrcProps}} = SrcDoc,
    #doc{body = {TgtProps}} = TgtDoc,
    SameSession = couch_util:get_value(<<"session_id">>, SrcProps) ==
        couch_util:get_value(<<"session_id">>, TgtProps),
    case SameSession of
        false ->
            ?LOG_INFO("Replication records differ. "
                "Scanning histories to find a common ancestor.", []),
            ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
                [SrcProps, TgtProps]),
            compare_rep_history(
                couch_util:get_value(<<"history">>, SrcProps, []),
                couch_util:get_value(<<"history">>, TgtProps, []));
        true ->
            LastSeq = couch_util:get_value(<<"source_last_seq">>, SrcProps, 0),
            {LastSeq, couch_util:get_value(<<"history">>, SrcProps, [])}
    end.
| |
%% Walk the source and target histories (newest first, entries are
%% {Props}) in lock step, looking for a session recorded on both sides.
%% Returns {RecordedSeq, RemainingHistory} for the first shared session,
%% or {0, []} (full replication) when either history runs out.
compare_rep_history(SrcHist, TgtHist) when SrcHist =:= []; TgtHist =:= [] ->
    ?LOG_INFO("no common ancestry -- performing full replication", []),
    {0, []};
compare_rep_history([{SrcEntry} | SrcRest], [{TgtEntry} | TgtRest] = TgtHist) ->
    SrcSession = couch_util:get_value(<<"session_id">>, SrcEntry),
    case has_session_id(SrcSession, TgtHist) of
        true ->
            common_ancestor(SrcEntry, SrcRest);
        false ->
            TgtSession = couch_util:get_value(<<"session_id">>, TgtEntry),
            case has_session_id(TgtSession, SrcRest) of
                true -> common_ancestor(TgtEntry, TgtRest);
                false -> compare_rep_history(SrcRest, TgtRest)
            end
    end.

%% Log and return the recorded_seq of the shared history entry together
%% with the history that precedes it.
common_ancestor(EntryProps, OlderHistory) ->
    RecordedSeq = couch_util:get_value(<<"recorded_seq">>, EntryProps, 0),
    ?LOG_INFO("found a common replication record with source_seq ~p",
        [RecordedSeq]),
    {RecordedSeq, OlderHistory}.
| |
%% Release a database handle. Remote (#http_db{}) handles need no
%% closing; local handles are closed with couch_db:close/1.
close_db(Db) when is_record(Db, http_db) ->
    ok;
close_db(LocalDb) ->
    couch_db:close(LocalDb).
| |
%% Display name of a db handle: the URL with any password stripped for
%% remote dbs, the database name for local ones.
dbname(Db) when is_record(Db, http_db) ->
    couch_util:url_strip_password(Db#http_db.url);
dbname(Db) when is_record(Db, db) ->
    Db#db.name.
| |
%% Database info proplist. For remote dbs the JSON object returned by a
%% GET on the db is converted, and its keys go through
%% couch_util:to_existing_atom/1. Local dbs use couch_db:get_db_info/1.
dbinfo(#http_db{} = Db) ->
    {RemoteProps} = couch_rep_httpc:request(Db),
    [{couch_util:to_existing_atom(Key), Value} || {Key, Value} <- RemoteProps];
dbinfo(LocalDb) ->
    {ok, LocalInfo} = couch_db:get_db_info(LocalDb),
    LocalInfo.
| |
%% Finish a replication that ended normally (called from terminate/2).
%% The caller that asked first gets {ok, History}. Listeners are
%% prepended as they arrive, so that caller is the last element.
%% Callers that joined later get the same reply when the local source has
%% not changed past committed_seq. Otherwise they get `retry', which
%% makes get_result/4 start another pass. Continuous replications have no
%% listeners to answer. Finally the db handles, notifiers and stats table
%% are released.
do_terminate(State) ->
    #state{
        checkpoint_history = CheckpointHistory,
        committed_seq = NewSeq,
        listeners = Listeners,
        source = Source,
        continuous = Continuous,
        source_log = #doc{body={OldHistory}}
    } = State,

    NewRepHistory = case CheckpointHistory of
    nil ->
        % no checkpoint was written during this run
        {[{<<"no_changes">>, true} | OldHistory]};
    _Else ->
        CheckpointHistory
    end,

    %% reply to original requester
    OtherListeners = case {Continuous, lists:reverse(Listeners)} of
    {true, _} ->
        []; % continuous replications have no listeners
    {_, []} ->
        % Nobody to answer. This used to crash with a badmatch, which
        % skipped terminate_cleanup/1 below.
        [];
    {_, [Original | Rest]} ->
        gen_server:reply(Original, {ok, NewRepHistory}),
        Rest
    end,

    %% maybe trigger another replication. If this replicator uses a local
    %% source Db, changes to that Db since we started will not be included in
    %% this pass.
    case up_to_date(Source, NewSeq) of
    true ->
        [gen_server:reply(R, {ok, NewRepHistory}) || R <- OtherListeners];
    false ->
        [gen_server:reply(R, retry) || R <- OtherListeners]
    end,
    couch_task_status:update("Finishing"),
    terminate_cleanup(State).
| |
%% Release everything the replicator holds. The source and target handles
%% are closed, the compaction notifiers stopped, and the stats ets table
%% deleted, in that order.
terminate_cleanup(State) ->
    Dbs = [State#state.source, State#state.target],
    Notifiers = [State#state.source_db_update_notifier,
                 State#state.target_db_update_notifier],
    lists:foreach(fun close_db/1, Dbs),
    lists:foreach(fun stop_db_update_notifier/1, Notifiers),
    ets:delete(State#state.stats).
| |
%% Stop a compaction notifier. nil means none was started (remote db).
stop_db_update_notifier(Notifier) ->
    case Notifier of
        nil -> ok;
        _Running -> couch_db_update_notifier:stop(Notifier)
    end.
| |
%% True if any history entry ({Props}) has the given session_id. Entries
%% without one are treated as having session id nil. Stops scanning at the
%% first match.
has_session_id(SessionId, History) ->
    lists:any(
        fun({EntryProps}) ->
            couch_util:get_value(<<"session_id">>, EntryProps, nil) =:= SessionId
        end,
        History).
| |
%% Build the replication id extension. For every option in Options (binary
%% names) whose value in Props is true, append "+" followed by the option
%% name, in order (e.g. "+continuous+create_target"). A value other than
%% true/false raises a case_clause error.
maybe_append_options(Options, {Props}) ->
    Suffixes = [case couch_util:get_value(Opt, Props, false) of
                    true -> "+" ++ ?b2l(Opt);
                    false -> ""
                end || Opt <- Options],
    lists:append(Suffixes).
| |
%% Replication id as {BaseId, Extension}. BaseId is the hex hash produced
%% by the current id version (?REP_ID_VERSION). Extension flags the
%% continuous / create_target options (see maybe_append_options/2).
make_replication_id(RepProps, UserCtx) ->
    {make_replication_id(RepProps, UserCtx, ?REP_ID_VERSION),
     maybe_append_options([<<"continuous">>, <<"create_target">>], RepProps)}.
| |
% Versioned clauses for generating replication ids.
% If a change is made to how replications are identified, add a new
% clause and increase ?REP_ID_VERSION at the top of the module.
% Version 2 also hashes the local couch_httpd port. Older versions are
% still used by fold_replication_logs/7 to locate old checkpoint docs.
make_replication_id({Props}, UserCtx, 2) ->
    {ok, HostName} = inet:gethostname(),
    Port = mochiweb_socket_server:get(couch_httpd, port),
    [Src, Tgt] = rep_endpoints(Props, UserCtx),
    maybe_append_filters({Props}, [HostName, Port, Src, Tgt], UserCtx);
make_replication_id({Props}, UserCtx, 1) ->
    {ok, HostName} = inet:gethostname(),
    [Src, Tgt] = rep_endpoints(Props, UserCtx),
    maybe_append_filters({Props}, [HostName, Src, Tgt], UserCtx).

%% Normalised [SourceEndpoint, TargetEndpoint] of a replication request.
rep_endpoints(Props, UserCtx) ->
    Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)),
    Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)),
    [Src, Tgt].
| |
%% Hash Base together with whatever narrows the replicated set and return
%% the hex MD5. When "filter" is present, the terms are the filter code
%% and query_params. Otherwise, when "doc_ids" is present, they are the
%% doc ids. Otherwise nothing extra is hashed.
maybe_append_filters({Props}, Base, UserCtx) ->
    Terms = lists:append(Base, filter_terms(Props, UserCtx)),
    couch_util:to_hex(couch_util:md5(term_to_binary(Terms))).

%% Extra id terms contributed by the filter / doc_ids options.
filter_terms(Props, UserCtx) ->
    Filter = couch_util:get_value(<<"filter">>, Props),
    DocIds = couch_util:get_value(<<"doc_ids">>, Props),
    if
        Filter =/= undefined ->
            [filter_code(Filter, Props, UserCtx),
             couch_util:get_value(<<"query_params">>, Props, {[]})];
        DocIds =/= undefined ->
            [DocIds];
        true ->
            []
    end.
| |
%% Return the source code of the filter named by Filter
%% ("ddocname/filtername") from the source db's design doc. The code is
%% hashed into the replication id by maybe_append_filters/3. The source is
%% opened only for this lookup and is always closed again.
%% NOTE(review): in an Erlang string "\s" is a plain space, so the regex
%% below is effectively "^ *(.*?) *$". It trims only spaces (not tabs or
%% newlines), and since "." does not match newlines it leaves multi-line
%% code unchanged. Fixing it would change the replication ids (and thus
%% the checkpoint doc ids) of existing filtered replications.
filter_code(Filter, Props, UserCtx) ->
    % crashes with badmatch if Filter has no "/"
    {match, [DDocName, FilterName]} =
        re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]),
    ProxyParams = parse_proxy_params(
        couch_util:get_value(<<"proxy">>, Props, [])),
    Source = open_db(
        couch_util:get_value(<<"source">>, Props), UserCtx, ProxyParams),
    try
        {ok, DDoc} = open_doc(Source, <<"_design/", DDocName/binary>>),
        Code = couch_util:get_nested_json_value(
            DDoc#doc.body, [<<"filters">>, FilterName]),
        re:replace(Code, "^\s*(.*?)\s*$", "\\1", [{return, binary}])
    after
        close_db(Source)
    end.
| |
%% Return Url as a string ending in "/". A "/" is appended after a final
%% non-slash character. The result is always a list, even for binary input.
maybe_add_trailing_slash(Url) ->
    re:replace(Url, "([^/])$", "\\1/", [{return, list}]).
| |
%% Normalise a replication endpoint for hashing into the replication id.
%%  * {Props} with "url" (plus optional "headers"/"auth") becomes
%%    {remote, Url, Headers}, or {remote, Url, Headers, OAuthProps} when
%%    "auth" contains "oauth";
%%  * an http:// or https:// binary becomes {remote, Url, []};
%%  * any other binary is a local db: {local, DbName, UserCtx}.
get_rep_endpoint(_UserCtx, {Props}) ->
    Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)),
    {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
    {Auth} = couch_util:get_value(<<"auth">>, Props, {[]}),
    case couch_util:get_value(<<"oauth">>, Auth) of
        undefined ->
            {remote, Url, headers_to_strings(BinHeaders)};
        {OAuth} ->
            {remote, Url, headers_to_strings(BinHeaders), OAuth}
    end;
get_rep_endpoint(_UserCtx, <<"http://", _/binary>> = Url) ->
    {remote, maybe_add_trailing_slash(Url), []};
get_rep_endpoint(_UserCtx, <<"https://", _/binary>> = Url) ->
    {remote, maybe_add_trailing_slash(Url), []};
get_rep_endpoint(UserCtx, DbName) when is_binary(DbName) ->
    {local, DbName, UserCtx}.

%% Convert {BinName, BinValue} header pairs to string pairs.
headers_to_strings(BinHeaders) ->
    [{?b2l(Name), ?b2l(Value)} || {Name, Value} <- BinHeaders].
| |
%% Load the _local checkpoint doc for RepId from each db in DbList, in
%% order. Ids from older id versions are tried as fallbacks, and a fresh
%% empty #doc{} is used when none exists (see fold_replication_logs/7).
find_replication_logs(DbList, RepId, RepProps, UserCtx) ->
    CurrentDocId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
    fold_replication_logs(
        DbList, ?REP_ID_VERSION, CurrentDocId, CurrentDocId,
        RepProps, UserCtx, []).
| |
% Accumulate the replication logs, one per db in the list, in order.
% Falls back to older log document ids and migrates them:
%  * LogId is the id being tried now; NewId is the current-version id.
%  * If LogId is missing and Vsn > 1, retry the same db with the id
%    produced by the previous id version.
%  * If no version has a log, use a new empty #doc{} with NewId.
%  * A log found under an older id is copied into a #doc{} with NewId. Its
%    body is kept and its revision is not carried over.
% Vsn and LogId are reset to the current version before the next db.
fold_replication_logs([], _Vsn, _LogId, _NewId, _RepProps, _UserCtx, Acc) ->
    lists:reverse(Acc);
fold_replication_logs([Db|Rest]=Dbs, Vsn, LogId, NewId,
        RepProps, UserCtx, Acc) ->
    case open_replication_log(Db, LogId) of
    {error, not_found} when Vsn > 1 ->
        OldRepId = make_replication_id(RepProps, UserCtx, Vsn - 1),
        fold_replication_logs(Dbs, Vsn - 1,
            ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, RepProps, UserCtx, Acc);
    {error, not_found} ->
        fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId,
            RepProps, UserCtx, [#doc{id=NewId}|Acc]);
    {ok, Doc} when LogId =:= NewId ->
        fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId,
            RepProps, UserCtx, [Doc|Acc]);
    {ok, Doc} ->
        % migrate: keep the old body under the current-version id
        MigratedLog = #doc{id=NewId,body=Doc#doc.body},
        fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId,
            RepProps, UserCtx, [MigratedLog|Acc])
    end.
| |
%% Open the checkpoint doc DocId on Db and log whether it was found. Any
%% result other than {ok, Doc} from open_doc/2 becomes {error, not_found}.
open_replication_log(Db, DocId) ->
    Result = open_doc(Db, DocId),
    case Result of
        {ok, _Doc} ->
            ?LOG_DEBUG("found a replication log for ~s", [dbname(Db)]),
            Result;
        _Missing ->
            ?LOG_DEBUG("didn't find a replication log for ~s", [dbname(Db)]),
            {error, not_found}
    end.
| |
%% Open document DocId. For remote dbs the document is fetched over HTTP,
%% and any {"error", "reason"} JSON reply is reported as
%% {error, not_found}. Local dbs delegate to couch_db:open_doc/2.
open_doc(Db, DocId) when is_record(Db, http_db) ->
    Resource = couch_util:encode_doc_id(DocId),
    case couch_rep_httpc:request(Db#http_db{resource = Resource}) of
        {[{<<"error">>, _}, {<<"reason">>, _}]} ->
            {error, not_found};
        Json ->
            {ok, couch_doc:from_json_obj(Json)}
    end;
open_doc(LocalDb, DocId) ->
    couch_db:open_doc(LocalDb, DocId).
| |
%% Open an endpoint without creating it (see open_db/4).
open_db(Props, UserCtx, ProxyParams) ->
    CreateIfMissing = false,
    open_db(Props, UserCtx, ProxyParams, CreateIfMissing).
| |
%% Open a replication endpoint.
%%  * {Props}: a remote db described by "url" and optional "auth" and
%%    "headers". Returns the #http_db{} produced by
%%    couch_rep_httpc:db_exists/2 (CreateTarget is passed through).
%%  * <<"http://...">> / <<"https://...">>: shorthand for
%%    {[{<<"url">>, Url}]}.
%%  * any other binary: a local db name. With CreateTarget the caller must
%%    be a server admin and the db is created first. The opened db is
%%    monitored; handle_info/2 stops the replication on 'DOWN'.
%% Local dbs throw {db_not_found, DbName} or {unauthorized, DbName}.
open_db({Props}, _UserCtx, ProxyParams, CreateTarget) ->
    Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)),
    {AuthProps} = couch_util:get_value(<<"auth">>, Props, {[]}),
    {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
    Headers = [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders],
    DefaultHeaders = (#http_db{})#http_db.headers,
    Db1 = #http_db{
        url = Url,
        auth = AuthProps,
        headers = lists:ukeymerge(1, Headers, DefaultHeaders)
    },
    Db = Db1#http_db{
        options = Db1#http_db.options ++ ProxyParams ++
            couch_rep_httpc:ssl_options(Db1)
    },
    couch_rep_httpc:db_exists(Db, CreateTarget);
open_db(<<"http://",_/binary>>=Url, _, ProxyParams, CreateTarget) ->
    open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget);
open_db(<<"https://",_/binary>>=Url, _, ProxyParams, CreateTarget) ->
    open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget);
open_db(<<DbName/binary>>, UserCtx, _ProxyParams, CreateTarget) ->
    try
        case CreateTarget of
        true ->
            ok = couch_httpd:verify_is_server_admin(UserCtx),
            % create/2 hands back an opened handle that is never used
            % (the db is opened again just below). Close it so this
            % process does not keep an extra reference to the db. Any
            % other result is ignored as before; the open below decides.
            case couch_server:create(DbName, [{user_ctx, UserCtx}]) of
            {ok, CreatedDb} ->
                couch_db:close(CreatedDb);
            _ ->
                ok
            end;
        false ->
            ok
        end,

        case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
        {ok, Db} ->
            couch_db:monitor(Db),
            Db;
        {not_found, no_db_file} ->
            throw({db_not_found, DbName})
        end
    catch throw:{unauthorized, _} ->
        throw({unauthorized, DbName})
    end.
| |
%% Arrange for a checkpoint in 5 seconds unless one is already pending.
%% The timer calls couch_rep:checkpoint/1, which casts do_checkpoint back
%% to this process. If the timer cannot be created, the error is logged
%% and the state is left unchanged.
schedule_checkpoint(State) ->
    case State of
        #state{checkpoint_scheduled = nil} ->
            Self = self(),
            case timer:apply_after(5000, couch_rep, checkpoint, [Self]) of
                {ok, TimerRef} ->
                    State#state{checkpoint_scheduled = TimerRef};
                Error ->
                    ?LOG_ERROR("tried to schedule a checkpoint but got ~p", [Error]),
                    State
            end;
        _AlreadyScheduled ->
            State
    end.
| |
%% Record a checkpoint at committed_seq on both source and target.
%% Both dbs are first fully committed by commit_to_both/3.
%% If their instance start times still equal those recorded at init:
%%  * a new history entry is prepended (history capped at 50 entries);
%%  * the new history is written to the _local log docs on both sides;
%%  * the new revs are kept in the state.
%% A conflict while writing is logged and the state returned unchanged.
%% If either start time differs (presumably the db restarted and may have
%% lost uncommitted writes), the replication restarts from its last
%% recorded checkpoint:
%%  * the worker processes are killed;
%%  * both dbs are closed;
%%  * init/1 is run again with the current listeners kept.
do_checkpoint(State) ->
    #state{
        source = Source,
        target = Target,
        committed_seq = NewSeqNum,
        start_seq = StartSeqNum,
        history = OldHistory,
        session_id = SessionId,
        source_log = SourceLog,
        target_log = TargetLog,
        rep_starttime = ReplicationStartTime,
        src_starttime = SrcInstanceStartTime,
        tgt_starttime = TgtInstanceStartTime,
        stats = Stats,
        rep_doc = {RepDoc}
    } = State,
    % The first clause reuses the already-bound start times, so it only
    % matches when neither db changed instance since init.
    case commit_to_both(Source, Target, NewSeqNum) of
    {SrcInstanceStartTime, TgtInstanceStartTime} ->
        ?LOG_INFO("recording a checkpoint for ~s -> ~s at source update_seq ~p",
            [dbname(Source), dbname(Target), NewSeqNum]),
        EndTime = ?l2b(httpd_util:rfc1123_date()),
        StartTime = ?l2b(ReplicationStartTime),
        DocsRead = ets:lookup_element(Stats, docs_read, 2),
        DocsWritten = ets:lookup_element(Stats, docs_written, 2),
        DocWriteFailures = ets:lookup_element(Stats, doc_write_failures, 2),
        NewHistoryEntry = {[
            {<<"session_id">>, SessionId},
            {<<"start_time">>, StartTime},
            {<<"end_time">>, EndTime},
            {<<"start_last_seq">>, StartSeqNum},
            {<<"end_last_seq">>, NewSeqNum},
            {<<"recorded_seq">>, NewSeqNum},
            {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
            {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
            {<<"docs_read">>, DocsRead},
            {<<"docs_written">>, DocsWritten},
            {<<"doc_write_failures">>, DocWriteFailures}
        ]},
        BaseHistory = [
            {<<"session_id">>, SessionId},
            {<<"source_last_seq">>, NewSeqNum},
            {<<"replication_id_version">>, ?REP_ID_VERSION}
        ] ++ case couch_util:get_value(<<"doc_ids">>, RepDoc) of
        undefined ->
            [];
        DocIds when is_list(DocIds) ->
            % backwards compatibility with the result of a replication by
            % doc IDs in versions 0.11.x and 1.0.x
            [
                {<<"start_time">>, StartTime},
                {<<"end_time">>, EndTime},
                {<<"docs_read">>, DocsRead},
                {<<"docs_written">>, DocsWritten},
                {<<"doc_write_failures">>, DocWriteFailures}
            ]
        end,
        % limit history to 50 entries
        NewRepHistory = {
            BaseHistory ++
            [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}]
        },

        try
            {SrcRevPos,SrcRevId} =
                update_local_doc(Source, SourceLog#doc{body=NewRepHistory}),
            {TgtRevPos,TgtRevId} =
                update_local_doc(Target, TargetLog#doc{body=NewRepHistory}),
            State#state{
                checkpoint_scheduled = nil,
                checkpoint_history = NewRepHistory,
                source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
                target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
            }
        catch throw:conflict ->
            ?LOG_ERROR("checkpoint failure: conflict (are you replicating to "
                "yourself?)", []),
            % NOTE(review): checkpoint_scheduled is not reset to nil here,
            % so schedule_checkpoint/1 will not schedule any further
            % checkpoint for this run. Confirm this is intended.
            State
        end;
    _Else ->
        ?LOG_INFO("rebooting ~s -> ~s from last known replication checkpoint",
            [dbname(Source), dbname(Target)]),
        #state{
            changes_feed = CF,
            missing_revs = MR,
            reader = Reader,
            writer = Writer
        } = State,
        Pids = [Writer, Reader, MR, CF],
        % unlink first so the killed workers' exits are not delivered as
        % 'EXIT' messages to handle_info/2
        [unlink(Pid) || Pid <- Pids],
        [exit(Pid, shutdown) || Pid <- Pids],
        close_db(Target),
        close_db(Source),
        % NOTE(review): the old stats table and db update notifiers are not
        % released before init/1 creates new ones.
        {ok, NewState} = init(State#state.init_args),
        NewState#state{listeners=State#state.listeners}
    end.
| |
%% Make both dbs durably commit the replicated changes and return
%% {SourceInstanceStartTime, TargetInstanceStartTime}. The source is
%% committed (up to RequiredSeq) in a linked helper process while the
%% target is committed in this process. do_checkpoint/1 compares the
%% returned start times with those recorded at init.
commit_to_both(Source, Target, RequiredSeq) ->
    % commit the src async
    ParentPid = self(),
    SrcCommitPid = spawn_link(fun() ->
        ParentPid ! {self(), ensure_full_commit(Source, RequiredSeq)}
    end),

    % commit tgt sync
    TargetStartTime = ensure_full_commit(Target),

    SourceStartTime =
    receive
    {SrcCommitPid, Timestamp} ->
        Timestamp;
    {'EXIT', SrcCommitPid, {http_request_failed, _}} ->
        exit(replication_link_failure);
    {'EXIT', SrcCommitPid, Reason} ->
        % Any other crash of the helper. This process traps exits, so
        % without this clause the receive would wait forever. A helper
        % that finishes normally always delivers its result message
        % before its 'EXIT', so the clause above matches first.
        exit(Reason)
    end,
    {SourceStartTime, TargetStartTime}.
| |
%% Make the target durably commit everything written so far and return
%% its instance_start_time.
%% Remote: POST _ensure_full_commit; the reply must contain "ok": true.
%% Local: a full commit is requested only when the db's current update seq
%% is ahead of its committed seq.
ensure_full_commit(#http_db{headers = Headers} = Target) ->
    Headers1 = [
        {"Content-Length", 0} |
        couch_util:proplist_apply_field(
            {"Content-Type", "application/json"}, Headers)
    ],
    Req = Target#http_db{
        resource = "_ensure_full_commit",
        method = post,
        headers = Headers1
    },
    {ResultProps} = couch_rep_httpc:request(Req),
    true = couch_util:get_value(<<"ok">>, ResultProps),
    couch_util:get_value(<<"instance_start_time">>, ResultProps);
ensure_full_commit(Target) ->
    {ok, NewDb} = couch_db:open_int(Target#db.name, []),
    % Read the update seq from the freshly opened handle. Target is the
    % handle kept in the replicator state since init (or the last reopen),
    % and its cached update_seq presumably does not follow the writer's
    % updates. Comparing it with the current committed seq could then
    % skip a needed full commit.
    UpdateSeq = couch_db:get_update_seq(NewDb),
    CommitSeq = couch_db:get_committed_update_seq(NewDb),
    InstanceStartTime = NewDb#db.instance_start_time,
    couch_db:close(NewDb),
    if UpdateSeq > CommitSeq ->
        ?LOG_DEBUG("target needs a full commit: update ~p commit ~p",
            [UpdateSeq, CommitSeq]),
        {ok, DbStartTime} = couch_db:ensure_full_commit(Target),
        DbStartTime;
    true ->
        ?LOG_DEBUG("target doesn't need a full commit", []),
        InstanceStartTime
    end.
| |
%% Make the source durably commit at least up to RequiredSeq and return
%% its instance_start_time.
%% Remote: POST _ensure_full_commit?seq=RequiredSeq. Returns nil when the
%% reply has no "ok" member.
%% Local: a full commit is requested only when RequiredSeq is beyond the
%% committed seq.
ensure_full_commit(#http_db{headers = Headers} = Source, RequiredSeq) ->
    ReqHeaders = [{"Content-Length", 0} |
        couch_util:proplist_apply_field(
            {"Content-Type", "application/json"}, Headers)],
    Req = Source#http_db{
        resource = "_ensure_full_commit",
        method = post,
        qs = [{seq, RequiredSeq}],
        headers = ReqHeaders
    },
    {Reply} = couch_rep_httpc:request(Req),
    case couch_util:get_value(<<"ok">>, Reply) of
        undefined ->
            nil;
        true ->
            couch_util:get_value(<<"instance_start_time">>, Reply)
    end;
ensure_full_commit(Source, RequiredSeq) ->
    {ok, FreshDb} = couch_db:open_int(Source#db.name, []),
    CommittedSeq = couch_db:get_committed_update_seq(FreshDb),
    StartTime = FreshDb#db.instance_start_time,
    couch_db:close(FreshDb),
    case RequiredSeq > CommittedSeq of
        true ->
            ?LOG_DEBUG("source needs a full commit: required ~p committed ~p",
                [RequiredSeq, CommittedSeq]),
            {ok, DbStartTime} = couch_db:ensure_full_commit(Source),
            DbStartTime;
        false ->
            ?LOG_DEBUG("source doesn't need a full commit", []),
            StartTime
    end.
| |
%% Write a _local checkpoint doc and return its new {RevPos, RevId}.
%% Remote writes send "x-couch-full-commit: false". Local writes pass the
%% delay_commit option.
update_local_doc(#http_db{headers = BaseHeaders} = Db, Doc) ->
    Req = Db#http_db{
        resource = couch_util:encode_doc_id(Doc),
        method = put,
        body = couch_doc:to_json_obj(Doc, [attachments]),
        headers = [{"x-couch-full-commit", "false"} | BaseHeaders]
    },
    {Reply} = couch_rep_httpc:request(Req),
    couch_doc:parse_rev(couch_util:get_value(<<"rev">>, Reply));
update_local_doc(Db, Doc) ->
    {ok, NewRev} = couch_db:update_doc(Db, Doc, [delay_commit]),
    NewRev.
| |
%% True when a freshly opened handle of the local source has update_seq
%% equal to Seq. Remote sources always count as up to date.
up_to_date(Source, _Seq) when is_record(Source, http_db) ->
    true;
up_to_date(Source, Seq) ->
    {ok, Fresh} = couch_db:open_int(Source#db.name, []),
    CurrentSeq = Fresh#db.update_seq,
    couch_db:close(Fresh),
    CurrentSeq == Seq.
| |
%% Turn a proxy URL (string or binary; empty means no proxy) into ibrowse
%% options. Always includes proxy_host and proxy_port. Adds proxy_user and
%% proxy_password when the URL carries both credentials.
parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
    parse_proxy_params(?b2l(ProxyUrl));
parse_proxy_params([]) ->
    [];
parse_proxy_params(ProxyUrl) ->
    #url{host = Host, port = Port, username = User, password = Passwd} =
        ibrowse_lib:parse_url(ProxyUrl),
    Credentials = case is_list(User) andalso is_list(Passwd) of
        true -> [{proxy_user, User}, {proxy_password, Passwd}];
        false -> []
    end,
    [{proxy_host, Host}, {proxy_port, Port} | Credentials].
| |
%% Apply KVs (e.g. {<<"_replication_state">>, State}) to the
%% replicator-db document this replication came from.
%% Requests POSTed to _replicate have no "_id" and are left alone. So is
%% a replicator doc that can no longer be opened.
%% The replicator db handle is now closed even if the update throws
%% (couch_db:update_doc/3 can throw, e.g. on a conflict). Previously the
%% handle leaked on that path. Returns ok.
update_rep_doc({Props} = _RepDoc, KVs) ->
    case couch_util:get_value(<<"_id">>, Props) of
    undefined ->
        % replication triggered by POSTing to _replicate/
        ok;
    RepDocId ->
        % replication triggered by adding a Rep Doc to the replicator DB
        {ok, RepDb} = ensure_rep_db_exists(),
        try
            case couch_db:open_doc(RepDb, RepDocId, []) of
            {ok, LatestRepDoc} ->
                update_rep_doc(RepDb, LatestRepDoc, KVs);
            _ ->
                ok
            end
        after
            couch_db:close(RepDb)
        end,
        ok
    end.
| |
%% Merge KVs into the replicator document's body and save it, but only if
%% something changed.
%% Setting _replication_state to a new value also refreshes
%% _replication_state_time.
%% Returns ok when nothing changed, otherwise the result of
%% couch_db:update_doc/3.
update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
    NewRepDocBody = lists:foldl(fun merge_rep_doc_field/2, RepDocBody, KVs),
    case NewRepDocBody =:= RepDocBody of
        true ->
            ok;
        false ->
            % might not succeed - when the replication doc is deleted right
            % before this update (not an error)
            couch_db:update_doc(RepDb, RepDoc#doc{body = {NewRepDocBody}}, [])
    end.

%% Store one {Key, Value} pair in a doc body proplist. A state change also
%% stamps _replication_state_time.
merge_rep_doc_field({<<"_replication_state">> = Key, NewState} = KV, Body) ->
    case couch_util:get_value(Key, Body) of
        NewState ->
            Body;
        _Changed ->
            StateTime = {<<"_replication_state_time">>, timestamp()},
            lists:keystore(<<"_replication_state_time">>, 1,
                lists:keystore(Key, 1, Body, KV), StateTime)
    end;
merge_rep_doc_field({Key, _Value} = KV, Body) ->
    lists:keystore(Key, 1, Body, KV).
| |
% RFC3339 timestamp of the current local time with its UTC offset, as a
% binary, e.g. <<"2011-03-01T12:34:56+01:00">>.
% Note: doesn't include the time seconds fraction (RFC3339 says it's optional).
% The clock is read once. Previously the local time (via the deprecated
% now/0) and the UTC offset came from two separate clock reads, which
% could disagree across a second or DST boundary. A redundant zone/2 call
% whose result was discarded has also been removed.
timestamp() ->
    UTime = erlang:universaltime(),
    LocalTime = calendar:universal_time_to_local_time(UTime),
    {{Year, Month, Day}, {Hour, Min, Sec}} = LocalTime,
    DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) -
        calendar:datetime_to_gregorian_seconds(UTime),
    iolist_to_binary(
        io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s",
            [Year, Month, Day, Hour, Min, Sec,
                zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60)])).
| |
%% Format a UTC offset as "+HH:MM" or "-HH:MM". The offset is negative
%% when either Hr or Min is negative (both carry the sign, as produced by
%% div/rem in timestamp/0).
zone(Hr, Min) ->
    {Sign, H, M} = case Hr >= 0 andalso Min >= 0 of
        true -> {"+", Hr, Min};
        false -> {"-", abs(Hr), abs(Min)}
    end,
    io_lib:format(Sign ++ "~2..0w:~2..0w", [H, M]).
| |
| |
%% Unless its state already is "triggered", set the replicator document's
%% _replication_state to "triggered" and record the replication id.
%% Requests without a replicator doc are ignored by update_rep_doc/2.
maybe_set_triggered({RepProps} = RepDoc, RepId) ->
    CurrentState = couch_util:get_value(<<"_replication_state">>, RepProps),
    case CurrentState =:= <<"triggered">> of
        true ->
            ok;
        false ->
            Updates = [
                {<<"_replication_state">>, <<"triggered">>},
                {<<"_replication_id">>, ?l2b(RepId)}
            ],
            update_rep_doc(RepDoc, Updates)
    end.
| |
%% Open the replicator database as a sys_db with _admin/_replicator roles,
%% creating it if it cannot be opened. The name comes from config key
%% [replicator] db and defaults to "_replicator". Also makes sure its
%% design doc exists. Returns {ok, Db}.
ensure_rep_db_exists() ->
    DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")),
    RepUserCtx = #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]},
    Opts = [{user_ctx, RepUserCtx}, sys_db],
    {ok, Db} = case couch_db:open(DbName, Opts) of
        {ok, _} = Opened ->
            Opened;
        _NotOpened ->
            couch_db:create(DbName, Opts)
    end,
    ok = ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>),
    {ok, Db}.
| |
%% Create the replicator db's design doc DDocID, with
%% ?REP_DB_DOC_VALIDATE_FUN as its validate_doc_update, when it cannot be
%% opened. Returns ok, and crashes if the design doc cannot be saved.
ensure_rep_ddoc_exists(RepDb, DDocID) ->
    case couch_db:open_doc(RepDb, DDocID, []) of
        {ok, _Existing} ->
            ok;
        _Missing ->
            create_rep_ddoc(RepDb, DDocID)
    end.

%% Save a fresh replicator design doc with id DDocID.
create_rep_ddoc(RepDb, DDocID) ->
    DDocJson = {[
        {<<"_id">>, DDocID},
        {<<"language">>, <<"javascript">>},
        {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
    ]},
    {ok, _Rev} = couch_db:update_doc(RepDb, couch_doc:from_json_obj(DDocJson), []),
    ok.
| |
%% For a local source db, start (via start_link) an update notifier that
%% casts reopen_source_db to this replicator whenever that db is
%% compacted. Remote sources get no notifier (nil).
source_db_update_notifier(#db{name = DbName}) ->
    Replicator = self(),
    OnEvent = fun
        ({compacted, Compacted}) when Compacted =:= DbName ->
            ok = gen_server:cast(Replicator, reopen_source_db);
        (_OtherEvent) ->
            ok
    end,
    {ok, Notifier} = couch_db_update_notifier:start_link(OnEvent),
    Notifier;
source_db_update_notifier(_RemoteDb) ->
    nil.
| |
%% For a local target db, start (via start_link) an update notifier that
%% casts reopen_target_db to this replicator whenever that db is
%% compacted. Remote targets get no notifier (nil).
target_db_update_notifier(#db{name = DbName}) ->
    Replicator = self(),
    OnEvent = fun
        ({compacted, Compacted}) when Compacted =:= DbName ->
            ok = gen_server:cast(Replicator, reopen_target_db);
        (_OtherEvent) ->
            ok
    end,
    {ok, Notifier} = couch_db_update_notifier:start_link(OnEvent),
    Notifier;
target_db_update_notifier(_RemoteDb) ->
    nil.