| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(couch_replicator_scheduler_job). |
| |
| -behaviour(gen_server). |
| |
| -export([ |
| start_link/1 |
| ]). |
| |
| -export([ |
| init/1, |
| terminate/2, |
| handle_call/3, |
| handle_info/2, |
| handle_cast/2, |
| code_change/3, |
| format_status/2, |
| sum_stats/2, |
| report_seq_done/3 |
| ]). |
| |
| -include_lib("couch/include/couch_db.hrl"). |
| -include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). |
| -include("couch_replicator.hrl"). |
| |
| -import(couch_util, [ |
| get_value/2, |
| get_value/3, |
| to_binary/1 |
| ]). |
| |
| -import(couch_replicator_utils, [ |
| pp_rep_id/1 |
| ]). |
| |
% Sequence value used when no earlier sequence is known (default for
% highest_seq_done and for values missing from replication logs).
-define(LOWEST_SEQ, 0).
% Default delay between checkpoints. The value is handed to
% timer:apply_after/4 in start_timer/1, so it is in milliseconds.
-define(DEFAULT_CHECKPOINT_INTERVAL, 30000).
% Default upper bound of the random startup delay. The resulting value is
% passed to timer:sleep/1 in do_init/1, so it is in milliseconds.
-define(STARTUP_JITTER_DEFAULT, 5000).
| |
% State of a running replication job. It is built by init_state/1 and then
% threaded through the gen_server callbacks of this module.
-record(rep_state, {
    % the #rep{} record the job was started with
    rep_details,
    % endpoint URIs as returned by couch_replicator_api_wrap:db_uri/1
    source_name,
    target_name,
    % endpoint handles returned by couch_replicator_api_wrap:db_open
    source,
    target,
    % history entries recovered from the replication logs
    history,
    % EJSON body of the latest checkpoint; reported on normal termination
    checkpoint_history,
    % the *_seq fields below hold {Ts, Seq} tuples
    start_seq,
    committed_seq,
    current_through_seq,
    % ordset of batch sequences reported via report_seq and not yet done
    seqs_in_progress = [],
    highest_seq_done = {0, ?LOWEST_SEQ},
    % checkpoint (_local) documents of the source and the target
    source_log,
    target_log,
    rep_starttime,
    % instance_start_time values read from the endpoints' db info
    src_starttime,
    tgt_starttime,
    % checkpoint timer
    timer,
    changes_queue,
    changes_manager,
    changes_reader,
    % pids of the linked worker processes
    workers,
    stats = couch_replicator_stats:new(),
    session_id,
    % update_seq of the source as read when the job was initialized
    source_seq = nil,
    use_checkpoints = true,
    checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
    type = db,
    view = nil
}).
| |
% Starts a replication job process for Rep, unless couch_replicator_pg
% reports that the job should not be started on this node.
%
% Returns {ok, Pid} on success (the pid is also joined to the job's pg
% group), {error, {already_started, OtherPid}} when another process already
% runs the job, or {error, Reason} when the gen_server could not be started.
start_link(#rep{id = {BaseId, Ext} = RepId, source = Src, target = Tgt} = Rep) ->
    JobName = BaseId ++ Ext,
    SrcUri = couch_replicator_api_wrap:db_uri(Src),
    TgtUri = couch_replicator_api_wrap:db_uri(Tgt),
    case couch_replicator_pg:should_start(RepId, node()) of
        {no, OtherPid} ->
            {error, {already_started, OtherPid}};
        yes ->
            StartResult = gen_server:start_link(?MODULE, Rep, []),
            case StartResult of
                {ok, Pid} ->
                    couch_replicator_pg:join(RepId, Pid),
                    StartResult;
                {error, _Reason} ->
                    couch_log:warning(
                        "failed to start replication `~s` (`~s` -> `~s`)",
                        [JobName, SrcUri, TgtUri]
                    ),
                    StartResult
            end
    end.
| |
% gen_server callback. Keeps the start arguments as the temporary state and
% asks for an immediate timeout, so the actual initialization runs later in
% handle_info(timeout, ...).
init(Rep) ->
    ImmediateTimeout = 0,
    {ok, Rep, ImmediateTimeout}.
| |
% Deferred initialization of the job, called from handle_info(timeout, ...).
%
% Enables exit trapping, sleeps for a random startup delay, builds the
% initial #rep_state{} and then starts, in this order: the changes queue,
% the changes reader, the changes manager and the workers. Finally the job
% is registered with couch_task_status.
%
% Returns {ok, #rep_state{}}. Failures surface as exceptions.
do_init(#rep{id = {BaseId, Ext}, options = Opts, user_ctx = UserCtx} = Rep) ->
    process_flag(trap_exit, true),

    % Random delay before doing any work, see startup_jitter/0.
    timer:sleep(startup_jitter()),

    InitialState = init_state(Rep),
    #rep_state{
        source = Source,
        target = Target,
        source_name = SourceName,
        target_name = TargetName,
        start_seq = {_Ts, StartSeq},
        highest_seq_done = {_, HighestSeq},
        checkpoint_interval = CheckpointInterval
    } = InitialState,

    WorkerCount = get_value(worker_processes, Opts),
    BatchSize = get_value(worker_batch_size, Opts),
    QueueLimits = [
        {max_items, BatchSize * WorkerCount * 2},
        {max_size, 100 * 1024 * WorkerCount}
    ],
    {ok, Queue} = couch_work_queue:new(QueueLimits),

    % The _changes reader pushes the changes of the source db into the queue.
    {ok, Reader} = couch_replicator_changes_reader:start_link(
        StartSeq, Source, Queue, Opts
    ),

    % The changes manager dequeues batches from the queue and delivers them
    % to the workers.
    Self = self(),
    Manager = spawn_changes_manager(Self, Queue, BatchSize),

    % Each worker asks the changes manager for a batch of _changes rows,
    % checks which revisions are missing on the target and copies those
    % from the source to the target.
    StartWorker = fun() ->
        couch_stats:increment_counter([couch_replicator, workers_started]),
        {ok, WorkerPid} = couch_replicator_worker:start_link(
            Self, Source, Target, Manager, Opts
        ),
        WorkerPid
    end,
    WorkerPids = [StartWorker() || _ <- lists:seq(1, WorkerCount)],

    TaskProps = [
        {type, replication},
        {user, UserCtx#user_ctx.name},
        {replication_id, ?l2b(BaseId ++ Ext)},
        {database, Rep#rep.db_name},
        {doc_id, Rep#rep.doc_id},
        {source, ?l2b(SourceName)},
        {target, ?l2b(TargetName)},
        {continuous, get_value(continuous, Opts, false)},
        {source_seq, seq_encode(HighestSeq)},
        {checkpoint_interval, CheckpointInterval}
    ],
    couch_task_status:add_task(TaskProps ++ rep_stats(InitialState)),
    couch_task_status:set_update_frequency(1000),

    % Historical note (until OTP R14B03): restarting a temporary supervised
    % child always reuses the original #rep{} from the child spec, so the
    % same tuning parameters are used again. Deleting the child spec (see
    % cancel_replication/1) and starting the replication again works around
    % that, but is not immune to race conditions.

    log_replication_start(InitialState),
    couch_log:debug("Worker pids are: ~p", [WorkerPids]),

    doc_update_triggered(Rep),

    FinalState = InitialState#rep_state{
        changes_queue = Queue,
        changes_manager = Manager,
        changes_reader = Reader,
        workers = WorkerPids
    },
    {ok, FinalState}.
| |
% Handles a worker's report that the batch ending at Seq is finished.
%
% The caller is answered right away; afterwards the in-progress set, the
% highest finished sequence and the "through" sequence are updated, the
% worker's statistics are added to the job statistics and the task status
% is refreshed.
handle_call({report_seq_done, Seq, StatsInc}, From, State) ->
    #rep_state{
        seqs_in_progress = InProgressBefore,
        highest_seq_done = HighestBefore,
        current_through_seq = ThroughBefore,
        stats = StatsBefore
    } = State,
    % Do not keep the worker waiting for the bookkeeping below.
    gen_server:reply(From, ok),
    {ThroughCandidate, InProgressAfter} =
        case InProgressBefore of
            [] ->
                {Seq, []};
            [Seq | Remaining] ->
                % Oldest batch finished: everything up to Seq is done.
                {Seq, Remaining};
            [_ | _] ->
                % A later batch finished first: keep the through seq as is.
                {ThroughBefore, ordsets:del_element(Seq, InProgressBefore)}
        end,
    HighestAfter = erlang:max(HighestBefore, Seq),
    ThroughAfter =
        case InProgressAfter of
            [] -> erlang:max(ThroughCandidate, HighestAfter);
            _ -> ThroughCandidate
        end,
    couch_log:debug(
        "Worker reported seq ~p, through seq was ~p, "
        "new through seq is ~p, highest seq done was ~p, "
        "new highest seq done is ~p~n"
        "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
        [
            Seq,
            ThroughBefore,
            ThroughAfter,
            HighestBefore,
            HighestAfter,
            InProgressBefore,
            InProgressAfter
        ]
    ),
    Updated = State#rep_state{
        stats = couch_replicator_utils:sum_stats(StatsBefore, StatsInc),
        current_through_seq = ThroughAfter,
        seqs_in_progress = InProgressAfter,
        highest_seq_done = HighestAfter
    },
    update_task(Updated),
    {noreply, Updated}.
| |
% Asynchronous requests:
%   {sum_stats, Stats} - add Stats to the job statistics
%   checkpoint         - periodic checkpoint triggered by the timer; the job
%                        stops if it should no longer run in this process or
%                        if the checkpoint fails
%   {report_seq, Seq}  - a batch ending at Seq was handed to a worker
handle_cast({sum_stats, Increment}, State) ->
    Summed = couch_replicator_utils:sum_stats(State#rep_state.stats, Increment),
    {noreply, State#rep_state{stats = Summed}};
handle_cast(checkpoint, #rep_state{rep_details = Rep} = State) ->
    case couch_replicator_pg:should_run(Rep#rep.id, self()) of
        {no, OtherPid} ->
            {stop, {shutdown, {duplicate_job, OtherPid}}, State};
        yes ->
            CheckpointResult = do_checkpoint(State),
            Outcome =
                case CheckpointResult of
                    {ok, _} -> success;
                    _ -> failure
                end,
            couch_stats:increment_counter([couch_replicator, checkpoints, Outcome]),
            case CheckpointResult of
                {ok, Checkpointed} ->
                    % The timer is one-shot, arm it again.
                    {noreply, Checkpointed#rep_state{timer = start_timer(State)}};
                Failure ->
                    {stop, Failure, State}
            end
    end;
handle_cast({report_seq, Seq}, #rep_state{seqs_in_progress = InProgress} = State) ->
    {noreply, State#rep_state{seqs_in_progress = ordsets:add_element(Seq, InProgress)}}.
| |
% Handles plain messages: the shutdown request, exits of the linked helper
% processes (changes reader, changes manager, changes queue, workers), the
% init timeout requested by init/1 and stray {Ref, Tuple} replies.
handle_info(shutdown, State) ->
    {stop, shutdown, State};
handle_info({'EXIT', Pid, ExitReason}, State) when
    ExitReason =:= max_backoff; ExitReason =:= {shutdown, max_backoff}
->
    couch_log:error("Max backoff reached child process ~p", [Pid]),
    {stop, {shutdown, max_backoff}, State};
handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader = Pid} = State) ->
    {noreply, State};
handle_info({'EXIT', Pid, Cause}, #rep_state{changes_reader = Pid} = State) ->
    couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
    StopCause =
        case Cause of
            {changes_req_failed, _, _} ->
                Cause;
            {http_request_failed, _, _, {error, {code, Code}}} ->
                {changes_req_failed, Code};
            {http_request_failed, _, _, {error, Err}} ->
                {changes_req_failed, Err};
            _ ->
                {changes_reader_died, Cause}
        end,
    couch_log:error("ChangesReader process died with reason: ~p", [StopCause]),
    {stop, {shutdown, StopCause}, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
    {noreply, State};
handle_info({'EXIT', Pid, Cause}, #rep_state{changes_manager = Pid} = State) ->
    couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
    couch_log:error("ChangesManager process died with reason: ~p", [Cause]),
    {stop, {shutdown, {changes_manager_died, Cause}}, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue = Pid} = State) ->
    {noreply, State};
handle_info({'EXIT', Pid, Cause}, #rep_state{changes_queue = Pid} = State) ->
    couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
    couch_log:error("ChangesQueue process died with reason: ~p", [Cause]),
    {stop, {shutdown, {changes_queue_died, Cause}}, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
    case lists:member(Pid, Workers) of
        false ->
            % A normal exit of a process that is not one of our workers is
            % only logged, the job keeps running.
            couch_log:error("unknown pid bit the dust ~p ~n", [Pid]),
            {noreply, State#rep_state{workers = Workers}};
        true ->
            case lists:delete(Pid, Workers) of
                [] ->
                    % Last worker finished: stop the changes manager and
                    % write the final checkpoint.
                    catch unlink(State#rep_state.changes_manager),
                    catch exit(State#rep_state.changes_manager, kill),
                    do_last_checkpoint(State);
                StillRunning ->
                    {noreply, State#rep_state{workers = StillRunning}}
            end
    end;
handle_info({'EXIT', Pid, Cause}, #rep_state{workers = Workers} = State) ->
    Stopped = cancel_timer(State),
    case lists:member(Pid, Workers) of
        true ->
            couch_stats:increment_counter([couch_replicator, worker_deaths]),
            StopCause =
                case Cause of
                    {shutdown, _} ->
                        Cause;
                    _ ->
                        couch_log:error("Worker ~p died with reason: ~p", [Pid, Cause]),
                        {worker_died, Pid, Cause}
                end,
            {stop, StopCause, Stopped};
        false ->
            {stop, {unknown_process_died, Pid, Cause}, Stopped}
    end;
handle_info(timeout, InitArgs) ->
    % Timeout requested by init/1: run the real initialization now.
    try do_init(InitArgs) of
        {ok, State} ->
            {noreply, State}
    catch
        exit:{http_request_failed, _, _, max_backoff} ->
            {stop, {shutdown, max_backoff}, {error, InitArgs}};
        Class:Error:Stack ->
            % The job has no real state yet because it failed to
            % initialize. The tuple returned as state only carries details
            % about the failure over to terminate/2.
            FailureState = {error, Class, lists:sublist(Stack, 2), InitArgs},
            {stop, {shutdown, {error, replication_start_error(Error)}}, FailureState}
    end;
handle_info({Ref, Tuple}, State) when is_reference(Ref), is_tuple(Tuple) ->
    % Ignore responses from timed-out or retried ibrowse calls. Aliases in
    % Erlang 24 should help with this problem, so this clause should be
    % revisited once the minimum Erlang version is >= 24.
    {noreply, State}.
| |
% gen_server callback. The clauses, in order, cover:
%   - normal completion of the replication
%   - shutdown requested from outside (a last checkpoint is attempted)
%   - failures during initialization, where the state is an
%     {error, ...} tuple produced by handle_info(timeout, ...)
%   - max backoff and duplicate job stops of an initialized job
%   - any other {shutdown, Reason}, which is unwrapped and handled again
%   - every other reason, reported as a replication failure
terminate(
    normal,
    #rep_state{
        rep_details = #rep{id = RepId} = Rep,
        checkpoint_history = CheckpointHistory
    } = State
) ->
    terminate_cleanup(State),
    couch_replicator_notifier:notify({finished, RepId, CheckpointHistory}),
    doc_update_completed(Rep, rep_stats(State));
terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
    % Replication stopped via _scheduler_sup:terminate_child/1, which can
    % occur during regular scheduler operation or when the job is removed
    % from the scheduler.
    FinalState =
        case do_checkpoint(State) of
            {ok, Checkpointed} ->
                Checkpointed;
            Failure ->
                couch_log:error(
                    "~p : Failed last checkpoint. Job: ~p Error: ~p",
                    [?MODULE, RepId, Failure]
                ),
                State
        end,
    couch_replicator_notifier:notify({stopped, RepId, <<"stopped">>}),
    terminate_cleanup(FinalState);
terminate({shutdown, max_backoff}, {error, InitArgs}) ->
    #rep{id = {BaseId, Ext} = RepId} = InitArgs,
    couch_replicator_pg:leave(RepId, self()),
    couch_stats:increment_counter([couch_replicator, failed_starts]),
    couch_log:warning("Replication `~s` reached max backoff ", [BaseId ++ Ext]),
    couch_replicator_notifier:notify({error, RepId, max_backoff});
terminate({shutdown, {error, Error}}, {error, Class, Stack, InitArgs}) ->
    #rep{
        id = {BaseId, Ext} = RepId,
        source = RawSource,
        target = RawTarget,
        doc_id = DocId,
        db_name = DbName
    } = InitArgs,
    couch_replicator_pg:leave(RepId, self()),
    SourceUri = couch_replicator_api_wrap:db_uri(RawSource),
    TargetUri = couch_replicator_api_wrap:db_uri(RawTarget),
    JobName = BaseId ++ Ext,
    couch_log:error(
        "~p:~p: Replication ~s failed to start ~p -> ~p doc ~p:~p stack:~p",
        [Class, Error, JobName, SourceUri, TargetUri, DbName, DocId, Stack]
    ),
    couch_stats:increment_counter([couch_replicator, failed_starts]),
    couch_replicator_notifier:notify({error, RepId, Error});
terminate({shutdown, max_backoff}, State) ->
    #rep_state{
        source_name = SourceName,
        target_name = TargetName,
        rep_details = #rep{id = {BaseId, Ext} = RepId}
    } = State,
    couch_log:error(
        "Replication `~s` (`~s` -> `~s`) reached max backoff",
        [BaseId ++ Ext, SourceName, TargetName]
    ),
    terminate_cleanup(State),
    couch_replicator_notifier:notify({error, RepId, max_backoff});
terminate({shutdown, {duplicate_job, OtherPid}}, State) ->
    #rep_state{
        source_name = SourceName,
        target_name = TargetName,
        rep_details = #rep{id = {BaseId, Ext} = RepId}
    } = State,
    couch_log:error(
        "Replication `~s` (`~s` -> `~s`) with pid ~p was usurped by ~p on node ~p",
        [BaseId ++ Ext, SourceName, TargetName, self(), OtherPid, node(OtherPid)]
    ),
    terminate_cleanup(State),
    couch_replicator_notifier:notify({error, RepId, duplicate_job});
terminate({shutdown, Inner}, State) ->
    % Unwrap the reason so the report does not carry an extra
    % {shutdown, ...} tuple around the message.
    terminate(Inner, State);
terminate(Reason, State) ->
    #rep_state{
        source_name = SourceName,
        target_name = TargetName,
        rep_details = #rep{id = {BaseId, Ext} = RepId}
    } = State,
    couch_log:error(
        "Replication `~s` (`~s` -> `~s`) failed: ~s",
        [BaseId ++ Ext, SourceName, TargetName, to_binary(Reason)]
    ),
    terminate_cleanup(State),
    couch_replicator_notifier:notify({error, RepId, Reason}).
| |
% Common cleanup of an initialized job: leaves the job's pg group, pushes a
% last task status update and closes both endpoints.
terminate_cleanup(
    #rep_state{
        rep_details = #rep{id = RepId},
        source = Source,
        target = Target
    } = State
) ->
    couch_replicator_pg:leave(RepId, self()),
    update_task(State),
    couch_replicator_api_wrap:db_close(Source),
    couch_replicator_api_wrap:db_close(Target).
| |
% gen_server callback. The state is kept unchanged across code upgrades;
% only an initialized #rep_state{} is accepted.
code_change(_FromVsn, State = #rep_state{}, _Extra) ->
    {ok, State}.
| |
% gen_server callback. Returns a property list describing the job. The
% state is passed through state_strip_creds/1 first, so passwords and
% authorization headers are not exposed.
format_status(_Opt, [_PDict, State]) ->
    Sanitized = state_strip_creds(State),
    #rep_state{
        source = Source,
        target = Target,
        rep_details = Rep,
        start_seq = StartSeq,
        source_seq = SourceSeq,
        committed_seq = CommittedSeq,
        current_through_seq = ThroughSeq,
        highest_seq_done = HighestSeqDone,
        session_id = SessionId
    } = Sanitized,
    #rep{id = RepId, options = Options, doc_id = DocId, db_name = DbName} = Rep,
    SourceUri = couch_replicator_api_wrap:db_uri(Source),
    TargetUri = couch_replicator_api_wrap:db_uri(Target),
    [
        {rep_id, RepId},
        {source, SourceUri},
        {target, TargetUri},
        {db_name, DbName},
        {doc_id, DocId},
        {options, Options},
        {session_id, SessionId},
        {start_seq, StartSeq},
        {source_seq, SourceSeq},
        {committed_seq, CommittedSeq},
        {current_through_seq, ThroughSeq},
        {highest_seq_done, HighestSeqDone}
    ].
| |
% Asynchronously asks the job process Pid to add Stats to its statistics.
sum_stats(Pid, Stats) when is_pid(Pid) ->
    Request = {sum_stats, Stats},
    gen_server:cast(Pid, Request).
| |
% Tells the job process Pid that the batch ending at ReportSeq is done and
% hands over the statistics collected for it. Blocks without a timeout
% until the job process replies.
report_seq_done(Pid, ReportSeq, Stats) when is_pid(Pid) ->
    Request = {report_seq_done, ReportSeq, Stats},
    gen_server:call(Pid, Request, infinity).
| |
% Picks a random startup delay. The upper bound comes from the
% "replicator" / "startup_jitter" config setting and is never below 1.
startup_jitter() ->
    Configured = config:get_integer(
        "replicator", "startup_jitter", ?STARTUP_JITTER_DEFAULT
    ),
    UpperBound = erlang:max(1, Configured),
    couch_rand:uniform(UpperBound).
| |
% Replaces the value of every header whose name is "authorization"
% (compared case-insensitively) with "****". All other headers are kept.
% The result is the reversed accumulator followed by the processed headers
% in their original order.
headers_strip_creds(Headers, Acc) ->
    Mask = fun({Key, Value}) ->
        case string:to_lower(Key) of
            "authorization" -> {Key, "****"};
            _ -> {Key, Value}
        end
    end,
    lists:reverse(Acc, lists:map(Mask, Headers)).
| |
% Returns the #httpdb{} endpoint with the password removed from its URL and
% the authorization headers masked. Anything that is not an #httpdb{}
% record is returned unchanged.
httpdb_strip_creds(#httpdb{} = HttpDb) ->
    #httpdb{url = Url, headers = Headers} = HttpDb,
    SafeUrl = couch_util:url_strip_password(Url),
    SafeHeaders = headers_strip_creds(Headers, []),
    HttpDb#httpdb{url = SafeUrl, headers = SafeHeaders};
httpdb_strip_creds(Other) ->
    Other.
| |
% Strips the credentials from both endpoints of a #rep{} record.
rep_strip_creds(#rep{source = Source, target = Target} = Rep) ->
    SafeSource = httpdb_strip_creds(Source),
    SafeTarget = httpdb_strip_creds(Target),
    Rep#rep{source = SafeSource, target = SafeTarget}.
| |
% Strips the credentials from a #rep_state{}. The endpoints are stored
% twice, at the top level of the state and inside the nested #rep{} record
% in rep_details, so both copies are cleaned.
state_strip_creds(#rep_state{rep_details = Rep, source = Source, target = Target} = State) ->
    SafeRep = rep_strip_creds(Rep),
    SafeSource = httpdb_strip_creds(Source),
    SafeTarget = httpdb_strip_creds(Target),
    State#rep_state{
        rep_details = SafeRep,
        source = SafeSource,
        target = SafeTarget
    }.
| |
% Raises the number of HTTP connections of a source endpoint from 1 to 2
% and logs a notice. Any other endpoint is returned unchanged.
adjust_maxconn(#httpdb{http_connections = 1} = Src, RepId) ->
    couch_log:notice(
        "Adjusting minimum number of HTTP source connections to 2 for ~p",
        [RepId]
    ),
    Src#httpdb{http_connections = 2};
adjust_maxconn(Src, _RepId) ->
    Src.
| |
% Marks the replication document as triggered (only when the doc processor
% has document updates enabled) and logs a notice. Jobs whose db_name is
% null are skipped.
-spec doc_update_triggered(#rep{}) -> ok.
doc_update_triggered(#rep{db_name = null}) ->
    ok;
doc_update_triggered(#rep{id = RepId, doc_id = DocId} = Rep) ->
    UpdatesEnabled = couch_replicator_doc_processor:update_docs(),
    case UpdatesEnabled of
        false -> ok;
        true -> couch_replicator_docs:update_triggered(Rep, RepId)
    end,
    couch_log:notice(
        "Document `~s` triggered replication `~s`",
        [DocId, pp_rep_id(RepId)]
    ),
    ok.
| |
% Writes the completion of the replication, together with its statistics
% and start time, to the replication document and logs a notice. Jobs whose
% db_name is null are skipped.
-spec doc_update_completed(#rep{}, list()) -> ok.
doc_update_completed(#rep{db_name = null}, _Stats) ->
    ok;
doc_update_completed(#rep{} = Rep, Stats0) ->
    #rep{
        id = RepId,
        doc_id = DocId,
        db_name = DbName,
        start_time = StartTime
    } = Rep,
    StartTimeProp = {start_time, couch_replicator_utils:iso8601(StartTime)},
    Stats = Stats0 ++ [StartTimeProp],
    couch_replicator_docs:update_doc_completed(DbName, DocId, Stats),
    couch_log:notice(
        "Replication `~s` completed (triggered by `~s`)",
        [pp_rep_id(RepId), DocId]
    ),
    ok.
| |
% Writes the final checkpoint once all workers have finished and returns
% the gen_server stop tuple for the job.
%
% Only states without sequences in progress are accepted. When no sequence
% was ever completed (highest_seq_done still holds ?LOWEST_SEQ) there is
% nothing to record and the job stops normally. Otherwise the highest
% completed sequence is checkpointed; a checkpoint failure becomes the stop
% reason.
%
% The checkpoint timer is cancelled on every path, including the failure
% path, so that no timer stays armed for a process that is stopping.
do_last_checkpoint(
    #rep_state{
        seqs_in_progress = [],
        highest_seq_done = {_Ts, ?LOWEST_SEQ}
    } = State
) ->
    {stop, normal, cancel_timer(State)};
do_last_checkpoint(
    #rep_state{
        seqs_in_progress = [],
        highest_seq_done = Seq
    } = State
) ->
    case do_checkpoint(State#rep_state{current_through_seq = Seq}) of
        {ok, NewState} ->
            couch_stats:increment_counter([couch_replicator, checkpoints, success]),
            {stop, normal, cancel_timer(NewState)};
        Error ->
            couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
            {stop, Error, cancel_timer(State)}
    end.
| |
% Schedules a single `checkpoint` cast to this process after the job's
% checkpoint interval. Returns the timer reference, or nil (after logging
% the error) when the timer could not be created.
start_timer(State) ->
    Interval = State#rep_state.checkpoint_interval,
    Scheduled = timer:apply_after(Interval, gen_server, cast, [self(), checkpoint]),
    case Scheduled of
        {ok, TRef} ->
            TRef;
        Failure ->
            couch_log:error("Replicator, error scheduling checkpoint: ~p", [Failure]),
            nil
    end.
| |
% Cancels the checkpoint timer, if there is one, and clears it in the
% state. Crashes if timer:cancel/1 does not confirm the cancellation.
cancel_timer(#rep_state{timer = TRef} = State) ->
    case TRef of
        nil ->
            State;
        _ ->
            {ok, cancel} = timer:cancel(TRef),
            State#rep_state{timer = nil}
    end.
| |
% Builds the initial #rep_state{} for Rep.
%
% Opens the source and the target (creating the target if requested), reads
% their db info, loads the replication logs from both sides and derives the
% start sequence and the history from them. The returned state already has
% the checkpoint timer running.
init_state(Rep) ->
    #rep{
        id = {BaseId, _Ext},
        source = RawSource,
        target = RawTarget,
        options = Opts,
        type = Type,
        view = View,
        start_time = StartTime,
        stats = GivenStats
    } = Rep,
    % Adjust minimum number of http source connections to 2 to avoid deadlock
    AdjustedSource = adjust_maxconn(RawSource, BaseId),
    {ok, Source} = couch_replicator_api_wrap:db_open(AdjustedSource),
    {CreateTargetParams} = get_value(create_target_params, Opts, {[]}),
    {ok, Target} = couch_replicator_api_wrap:db_open(
        RawTarget,
        get_value(create_target, Opts, false),
        CreateTargetParams
    ),

    {ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source),
    {ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target),

    [SourceLog, TargetLog] = find_and_migrate_logs([Source, Target], Rep),

    {LoggedSeq, History} = compare_replication_logs(SourceLog, TargetLog),

    % Use the larger of the stats passed in with the job and the stats of
    % the most recent history entry.
    PassedStats = couch_replicator_stats:new(GivenStats),
    HistoryStats =
        case History of
            [{[_ | _] = LatestEntry} | _] ->
                couch_replicator_stats:new(LatestEntry);
            _ ->
                couch_replicator_stats:new()
        end,
    Stats = couch_replicator_stats:max_stats(PassedStats, HistoryStats),

    % An explicit since_seq option overrides the sequence found in the logs.
    StartSeq = {0, get_value(since_seq, Opts, LoggedSeq)},

    SourceSeq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ),

    #doc{body = {CheckpointHistory}} = SourceLog,
    State = #rep_state{
        rep_details = Rep,
        source_name = couch_replicator_api_wrap:db_uri(Source),
        target_name = couch_replicator_api_wrap:db_uri(Target),
        source = Source,
        target = Target,
        history = History,
        checkpoint_history = {[{<<"no_changes">>, true} | CheckpointHistory]},
        start_seq = StartSeq,
        current_through_seq = StartSeq,
        committed_seq = StartSeq,
        source_log = SourceLog,
        target_log = TargetLog,
        rep_starttime = StartTime,
        src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
        tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
        session_id = couch_uuids:random(),
        source_seq = SourceSeq,
        use_checkpoints = get_value(use_checkpoints, Opts, true),
        checkpoint_interval = get_value(
            checkpoint_interval,
            Opts,
            ?DEFAULT_CHECKPOINT_INTERVAL
        ),
        type = Type,
        view = View,
        stats = Stats
    },
    State#rep_state{timer = start_timer(State)}.
| |
% Loads the replication log document from every db in DbList, in the same
% order. Logs stored under the id of an older replication id version are
% migrated to the current id, see fold_replication_logs/6.
find_and_migrate_logs(DbList, #rep{id = {BaseId, _}} = Rep) ->
    CurrentLogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
    fold_replication_logs(
        DbList, ?REP_ID_VERSION, CurrentLogId, CurrentLogId, Rep, []
    ).
| |
% Collects one replication log per db, in the order of the db list.
%
% For every db the log is looked up under LogId. When it is missing, the
% ids of older replication id versions are tried, down to version 1. A log
% found under an old id is copied to NewId (and saved when checkpoints are
% enabled). When nothing is found an empty #doc{} with NewId is used.
fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) ->
    lists:reverse(Acc);
fold_replication_logs([Db | Remaining] = AllDbs, Vsn, LogId, NewId, Rep, Acc) ->
    Lookup = couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body]),
    case Lookup of
        {error, <<"not_found">>} when Vsn > 1 ->
            % Retry the same db with the id of the previous version.
            PrevVsn = Vsn - 1,
            PrevRepId = couch_replicator_utils:replication_id(Rep, PrevVsn),
            PrevLogId = ?l2b(?LOCAL_DOC_PREFIX ++ PrevRepId),
            fold_replication_logs(AllDbs, PrevVsn, PrevLogId, NewId, Rep, Acc);
        {error, <<"not_found">>} ->
            EmptyLog = #doc{id = NewId},
            fold_replication_logs(
                Remaining, ?REP_ID_VERSION, NewId, NewId, Rep, [EmptyLog | Acc]
            );
        {ok, Found} when LogId =:= NewId ->
            fold_replication_logs(
                Remaining, ?REP_ID_VERSION, NewId, NewId, Rep, [Found | Acc]
            );
        {ok, Found} ->
            % Found under an old id: carry the body over to the new id.
            Migrated = #doc{id = NewId, body = Found#doc.body},
            maybe_save_migrated_log(Rep, Db, Migrated, Found#doc.id),
            fold_replication_logs(
                Remaining, ?REP_ID_VERSION, NewId, NewId, Rep, [Migrated | Acc]
            )
    end.
| |
% Saves a migrated replication log to Db and logs a notice, unless the
% replication has the use_checkpoints option set to false.
maybe_save_migrated_log(Rep, Db, #doc{} = Doc, OldId) ->
    UseCheckpoints = get_value(use_checkpoints, Rep#rep.options, true),
    case UseCheckpoints of
        false ->
            ok;
        true ->
            update_checkpoint(Db, Doc),
            couch_log:notice(
                "Migrated replication checkpoint. Db:~p ~p -> ~p",
                [httpdb_strip_creds(Db), OldId, Doc#doc.id]
            )
    end.
| |
% Spawns the linked changes manager process. Its batch counter starts at 1.
spawn_changes_manager(Parent, ChangesQueue, BatchSize) ->
    Loop = fun() ->
        changes_manager_loop_open(Parent, ChangesQueue, BatchSize, 1)
    end,
    spawn_link(Loop).
| |
% Receive loop of the changes manager.
%
% For each {get_changes, From} request a batch of up to BatchSize items is
% taken from the queue. If the queue is closed, From gets {closed, self()}.
% Otherwise the report sequence {Ts, Seq} is taken from the last item of
% the batch, announced to Parent with a report_seq cast and sent to From
% together with the #doc_info{} items of the batch. Ts is incremented after
% every request.
changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) ->
    receive
        {get_changes, From} ->
            case couch_work_queue:dequeue(ChangesQueue, BatchSize) of
                closed ->
                    From ! {closed, self()};
                {ok, Batch} ->
                    LastSeq =
                        case lists:last(Batch) of
                            {last_seq, Seq} -> Seq;
                            #doc_info{high_seq = Seq} -> Seq
                        end,
                    ReportSeq = {Ts, LastSeq},
                    % last_seq markers only carry a sequence, they are not
                    % handed to the workers.
                    IsDocInfo = fun
                        (#doc_info{}) -> true;
                        ({last_seq, _Seq}) -> false
                    end,
                    Changes = lists:filter(IsDocInfo, Batch),
                    ok = gen_server:cast(Parent, {report_seq, ReportSeq}),
                    From ! {changes, self(), Changes, ReportSeq}
            end,
            changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts + 1)
    end.
| |
% Records a checkpoint for the job.
%
% Returns {ok, NewState} on success, or {checkpoint_commit_failure, Reason}
% (Reason is a binary) when committing or updating the checkpoint documents
% failed or when an endpoint's instance_start_time changed.

% Checkpoints are disabled: only note that in checkpoint_history.
do_checkpoint(#rep_state{use_checkpoints = false} = State) ->
    NewState = State#rep_state{checkpoint_history = {[{<<"use_checkpoints">>, false}]}},
    {ok, NewState};
% Nothing new since the last committed sequence: only refresh the task.
do_checkpoint(#rep_state{current_through_seq = Seq, committed_seq = Seq} = State) ->
    update_task(State),
    {ok, State};
do_checkpoint(State) ->
    #rep_state{
        source_name = SourceName,
        target_name = TargetName,
        source = Source,
        target = Target,
        history = OldHistory,
        start_seq = {_, StartSeq},
        current_through_seq = {_Ts, NewSeq} = NewTsSeq,
        source_log = SourceLog,
        target_log = TargetLog,
        rep_starttime = ReplicationStartTime,
        src_starttime = SrcInstanceStartTime,
        tgt_starttime = TgtInstanceStartTime,
        stats = Stats,
        rep_details = #rep{options = Options},
        session_id = SessionId
    } = State,
    % Commit both endpoints first. On success commit_to_both/2 returns the
    % pair of values from the source and target full commits, which are
    % compared with the instance start times stored in the state.
    case commit_to_both(Source, Target) of
        {source_error, Reason} ->
            {checkpoint_commit_failure,
                <<"Failure on source commit: ", (to_binary(Reason))/binary>>};
        {target_error, Reason} ->
            {checkpoint_commit_failure,
                <<"Failure on target commit: ", (to_binary(Reason))/binary>>};
        {<<S/binary>>, <<T/binary>>} when
            % Handle upgrades from 3.2 to 3.3 better by expecting 0 to be
            % returned if endpoints are upgraded while the replication job is running.
            % TODO: Remove the `0` special case in a future release (4.x or 5.x)
            (S =:= SrcInstanceStartTime orelse S =:= <<"0">> orelse
                SrcInstanceStartTime =:= <<"0">>) andalso
                (T =:= TgtInstanceStartTime orelse T =:= <<"0">> orelse
                    TgtInstanceStartTime =:= <<"0">>)
        ->
            couch_log:notice(
                "recording a checkpoint for `~s` -> `~s` at source update_seq ~p",
                [SourceName, TargetName, NewSeq]
            ),
            LocalStartTime = calendar:now_to_local_time(ReplicationStartTime),
            StartTime = ?l2b(httpd_util:rfc1123_date(LocalStartTime)),
            EndTime = ?l2b(httpd_util:rfc1123_date()),
            % Entry describing this session; it goes to the front of the
            % "history" list of the checkpoint document.
            NewHistoryEntry =
                {[
                    {<<"session_id">>, SessionId},
                    {<<"start_time">>, StartTime},
                    {<<"end_time">>, EndTime},
                    {<<"start_last_seq">>, StartSeq},
                    {<<"end_last_seq">>, NewSeq},
                    {<<"recorded_seq">>, NewSeq},
                    {<<"missing_checked">>, couch_replicator_stats:missing_checked(Stats)},
                    {<<"missing_found">>, couch_replicator_stats:missing_found(Stats)},
                    {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
                    {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
                    {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)},
                    {<<"bulk_get_docs">>, couch_replicator_stats:bulk_get_docs(Stats)},
                    {<<"bulk_get_attempts">>, couch_replicator_stats:bulk_get_attempts(Stats)}
                ]},
            % Top-level properties of the checkpoint document.
            BaseHistory =
                [
                    {<<"session_id">>, SessionId},
                    {<<"source_last_seq">>, NewSeq},
                    {<<"replication_id_version">>, ?REP_ID_VERSION}
                ] ++
                    case get_value(doc_ids, Options) of
                        undefined ->
                            [];
                        _DocIds ->
                            % backwards compatibility with the result of a replication by
                            % doc IDs in versions 0.11.x and 1.0.x
                            % TODO: deprecate (use same history format, simplify code)
                            [
                                {<<"start_time">>, StartTime},
                                {<<"end_time">>, EndTime},
                                {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
                                {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
                                {<<"doc_write_failures">>,
                                    couch_replicator_stats:doc_write_failures(Stats)}
                            ]
                    end,
            % limit history to 50 entries
            NewRepHistory = {
                BaseHistory ++
                    [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}]
            },

            % Write the same body to both checkpoint documents, source
            % first. update_checkpoint/3 throws checkpoint_commit_failure
            % on error, which is turned into the return value here.
            try
                {SrcRevPos, SrcRevId} = update_checkpoint(
                    Source, SourceLog#doc{body = NewRepHistory}, source
                ),
                {TgtRevPos, TgtRevId} = update_checkpoint(
                    Target, TargetLog#doc{body = NewRepHistory}, target
                ),
                NewState = State#rep_state{
                    checkpoint_history = NewRepHistory,
                    committed_seq = NewTsSeq,
                    source_log = SourceLog#doc{revs = {SrcRevPos, [SrcRevId]}},
                    target_log = TargetLog#doc{revs = {TgtRevPos, [TgtRevId]}}
                },
                update_task(NewState),
                {ok, NewState}
            catch
                throw:{checkpoint_commit_failure, _} = Failure ->
                    Failure
            end;
        % The clauses below are reached when the guard above did not hold.
        % SrcInstanceStartTime and TgtInstanceStartTime are already bound,
        % so each pattern only matches if that side is unchanged.
        {SrcInstanceStartTime, _NewTgtInstanceStartTime} ->
            {checkpoint_commit_failure, <<
                "instance_start_time on target database has changed since last checkpoint."
            >>};
        {_NewSrcInstanceStartTime, TgtInstanceStartTime} ->
            {checkpoint_commit_failure, <<
                "instance_start_time on source database has changed since last checkpoint."
            >>};
        {_NewSrcInstanceStartTime, _NewTgtInstanceStartTime} ->
            {checkpoint_commit_failure, <<
                "instance_start_time on source and target database has changed since last checkpoint."
            >>}
    end.
| |
% Same as update_checkpoint/2, but a checkpoint_commit_failure is thrown
% again with a message that names the kind of db (DbType) whose checkpoint
% document could not be updated.
update_checkpoint(Db, Doc, DbType) ->
    try
        update_checkpoint(Db, Doc)
    catch
        throw:{checkpoint_commit_failure, Reason} ->
            Prefix = <<"Error updating the ", (to_binary(DbType))/binary, " checkpoint document: ">>,
            Message = <<Prefix/binary, (to_binary(Reason))/binary>>,
            throw({checkpoint_commit_failure, Message})
    end.
| |
% Writes the checkpoint document Doc to Db and returns its new
% {Pos, RevId}. Throws {checkpoint_commit_failure, Reason} when the update
% fails.
update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) ->
    try couch_replicator_api_wrap:update_doc(Db, Doc, [delay_commit]) of
        {ok, PosRevId} ->
            PosRevId;
        {error, Reason} ->
            throw({checkpoint_commit_failure, Reason})
    catch
        throw:conflict ->
            Reopened = (catch couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body])),
            case Reopened of
                {ok, #doc{body = StoredBody, revs = {Pos, [RevId | _]}}} when
                    StoredBody =:= LogBody
                ->
                    % An earlier attempt did update the checkpoint doc, but
                    % a connection error (a timeout for example) hid the
                    % success response. The request was retried and ended
                    % in a conflict because the revision that was sent is
                    % no longer the current one. The stored body being
                    % equal to the body just sent confirms this.
                    {Pos, RevId};
                _ ->
                    throw({checkpoint_commit_failure, conflict})
            end
    end.
| |
% Runs ensure_full_commit on both endpoints, the source in a linked helper
% process and the target in the calling process.
%
% Returns {SourceStartTime, TargetStartTime} when both commits returned
% {ok, _}; otherwise {target_error, Error} if the target failed (checked
% first) or {source_error, Error} if the source failed.
commit_to_both(Source, Target) ->
    Caller = self(),
    % Commit the source asynchronously.
    Committer = spawn_link(fun() ->
        SrcCommit = (catch couch_replicator_api_wrap:ensure_full_commit(Source)),
        Caller ! {self(), SrcCommit}
    end),

    % Commit the target synchronously in the meantime.
    TargetResult = (catch couch_replicator_api_wrap:ensure_full_commit(Target)),

    SourceResult =
        receive
            {Committer, Reply} ->
                % Drop the link and a possibly pending exit message of the
                % helper so it does not reach handle_info/2.
                unlink(Committer),
                receive
                    {'EXIT', Committer, _} -> ok
                after 0 -> ok
                end,
                Reply;
            {'EXIT', Committer, Reason} ->
                {error, Reason}
        end,
    case {TargetResult, SourceResult} of
        {{ok, TargetStartTime}, {ok, SourceStartTime}} ->
            {SourceStartTime, TargetStartTime};
        {{ok, _}, SourceError} ->
            {source_error, SourceError};
        {TargetError, _} ->
            {target_error, TargetError}
    end.
| |
% Compares the replication log documents read from source and target and
% returns {StartSeq, History}.
%
% When both logs carry the same "session_id", the source log's
% "source_last_seq" (default ?LOWEST_SEQ) and "history" (default []) are
% returned. Otherwise both histories are handed to compare_rep_history/2.
compare_replication_logs(SrcDoc, TgtDoc) ->
    #doc{body = {SrcProps}} = SrcDoc,
    #doc{body = {TgtProps}} = TgtDoc,
    SrcSession = get_value(<<"session_id">>, SrcProps),
    TgtSession = get_value(<<"session_id">>, TgtProps),
    if
        SrcSession == TgtSession ->
            % identical session ids mean the recorded history is valid
            LastSeq = get_value(<<"source_last_seq">>, SrcProps, ?LOWEST_SEQ),
            History = get_value(<<"history">>, SrcProps, []),
            {LastSeq, History};
        true ->
            SrcHistory = get_value(<<"history">>, SrcProps, []),
            TgtHistory = get_value(<<"history">>, TgtProps, []),
            couch_log:notice(
                "Replication records differ. "
                "Scanning histories to find a common ancestor.",
                []
            ),
            couch_log:debug(
                "Record on source:~p~nRecord on target:~p~n",
                [SrcProps, TgtProps]
            ),
            compare_rep_history(SrcHistory, TgtHistory)
    end.
| |
% Walks the source and target histories in lock step looking for a session
% id present in both.
%
% Returns {RecordedSeq, RemainingHistory} for the first match, where
% RecordedSeq is the entry's "recorded_seq" (default ?LOWEST_SEQ), or
% {?LOWEST_SEQ, []} once either history is exhausted.
compare_rep_history(S, T) when S =:= []; T =:= [] ->
    couch_log:notice("no common ancestry -- performing full replication", []),
    {?LOWEST_SEQ, []};
compare_rep_history([{S} | SourceRest], [{T} | TargetRest] = Target) ->
    Found = fun(Props, Remaining) ->
        RecordSeqNum = get_value(<<"recorded_seq">>, Props, ?LOWEST_SEQ),
        couch_log:notice(
            "found a common replication record with source_seq ~p",
            [RecordSeqNum]
        ),
        {RecordSeqNum, Remaining}
    end,
    % Is the head of the source history known anywhere on the target?
    SourceKnown = has_session_id(get_value(<<"session_id">>, S), Target),
    % Only consulted when the check above failed: is the head of the target
    % history known in the rest of the source history?
    TargetKnown =
        not SourceKnown andalso
            has_session_id(get_value(<<"session_id">>, T), SourceRest),
    if
        SourceKnown -> Found(S, SourceRest);
        TargetKnown -> Found(T, TargetRest);
        true -> compare_rep_history(SourceRest, TargetRest)
    end.
| |
% Returns true when some entry of the history list (each entry being a
% {Props} tuple) has a "session_id" exactly equal to SessionId, false
% otherwise. Entries without a "session_id" are treated as having the id nil.
has_session_id(SessionId, History) ->
    lists:any(
        fun({Props}) ->
            get_value(<<"session_id">>, Props, nil) =:= SessionId
        end,
        History
    ).
| |
% Returns the pending changes count, cached in the process dictionary under
% pending_count_state as {Timestamp, Count}.
%
% The cached count is reused until it is older than the job's
% connection_timeout option (multiplied by 1000 to compare against the
% microsecond result of timer:now_diff/2); then, or when nothing is cached
% yet, a fresh value is fetched with get_pending_count_int/1 and stored.
get_pending_count(St) ->
    Options = (St#rep_state.rep_details)#rep.options,
    MaxAgeMicro = get_value(connection_timeout, Options) * 1000,
    Refresh = fun() ->
        Fresh = get_pending_count_int(St),
        % timestamp is taken after the fetch has completed
        put(pending_count_state, {os:timestamp(), Fresh}),
        Fresh
    end,
    case get(pending_count_state) of
        undefined ->
            Refresh();
        {CachedAt, Cached} ->
            Age = timer:now_diff(os:timestamp(), CachedAt),
            if
                Age > MaxAgeMicro -> Refresh();
                true -> Cached
            end
    end.
| |
% Asks the source for the number of changes pending after the highest
% sequence done so far.
%
% For an #httpdb{} source the request is made with retries set to 3 and any
% outcome other than {ok, Pending} (including a caught exception) yields
% null. For any other source the call must return {ok, Pending}.
get_pending_count_int(#rep_state{source = #httpdb{} = HttpDb} = St) ->
    {_, HighestSeq} = St#rep_state.highest_seq_done,
    Limited = HttpDb#httpdb{retries = 3},
    Reply = (catch couch_replicator_api_wrap:get_pending_count(Limited, HighestSeq)),
    case Reply of
        {ok, Count} -> Count;
        _Other -> null
    end;
get_pending_count_int(#rep_state{source = OtherDb} = St) ->
    {_, HighestSeq} = St#rep_state.highest_seq_done,
    {ok, Count} = couch_replicator_api_wrap:get_pending_count(OtherDb, HighestSeq),
    Count.
| |
% Local shorthand for couch_replicator_utils:seq_encode/1.
seq_encode(SeqValue) ->
    couch_replicator_utils:seq_encode(SeqValue).
| |
% Publishes the job's current progress: the rep_stats/1 properties followed
% by the encoded source_seq (highest sequence done) and through_seq (current
% through sequence). The list is sent to the scheduler for this job id and
% to couch_task_status.
update_task(State) ->
    #rep_state{
        rep_details = #rep{id = JobId},
        current_through_seq = {_, ThroughSeq},
        highest_seq_done = {_, HighestSeq}
    } = State,
    BaseStats = rep_stats(State),
    SeqProps = [
        {source_seq, seq_encode(HighestSeq)},
        {through_seq, seq_encode(ThroughSeq)}
    ],
    Status = BaseStats ++ SeqProps,
    couch_replicator_scheduler:update_job_stats(JobId, Status),
    couch_task_status:update(Status).
| |
% Builds the property list of replication statistics for State: the counters
% read from the stats field, the pending changes count from
% get_pending_count/1, and the encoded committed sequence.
rep_stats(State) ->
    #rep_state{stats = Counters, committed_seq = {_, Committed}} = State,
    Checked = couch_replicator_stats:missing_checked(Counters),
    Found = couch_replicator_stats:missing_found(Counters),
    DocsRead = couch_replicator_stats:docs_read(Counters),
    DocsWritten = couch_replicator_stats:docs_written(Counters),
    Pending = get_pending_count(State),
    WriteFailures = couch_replicator_stats:doc_write_failures(Counters),
    BulkGetDocs = couch_replicator_stats:bulk_get_docs(Counters),
    BulkGetAttempts = couch_replicator_stats:bulk_get_attempts(Counters),
    [
        {revisions_checked, Checked},
        {missing_revisions_found, Found},
        {docs_read, DocsRead},
        {docs_written, DocsWritten},
        {changes_pending, Pending},
        {doc_write_failures, WriteFailures},
        {bulk_get_docs, BulkGetDocs},
        {bulk_get_attempts, BulkGetAttempts},
        {checkpointed_source_seq, seq_encode(Committed)}
    ].
| |
% Translates selected replication start-up errors into terms that carry a
% readable binary message:
%   {unauthorized, DbUri}  -> {unauthorized, Message}
%   {db_not_found, DbUri}  -> {db_not_found, Message}
%   http_request_failed with an nxdomain connection failure
%                          -> {nxdomain, Message}
%   http_request_failed with an integer {code, Code}
%                          -> {http_error_code, Code, <<"METHOD URL">>}
% URLs taken from http_request_failed are passed through
% couch_util:url_strip_password/1 first. Any other term is returned as is.
replication_start_error({unauthorized, Uri}) ->
    Msg = <<"unauthorized to access or create database ", Uri/binary>>,
    {unauthorized, Msg};
replication_start_error({db_not_found, Uri}) ->
    Msg = <<"could not open ", Uri/binary>>,
    {db_not_found, Msg};
replication_start_error({http_request_failed, Method, RawUrl, Failure} = Error) ->
    case Failure of
        {error, {error, {conn_failed, {error, nxdomain}}}} ->
            SafeUrl = ?l2b(couch_util:url_strip_password(RawUrl)),
            {nxdomain, <<"could not resolve ", SafeUrl/binary>>};
        {error, {code, Code}} when is_integer(Code) ->
            SafeUrl = ?l2b(couch_util:url_strip_password(RawUrl)),
            Verb = ?l2b(Method),
            {http_error_code, Code, <<Verb/binary, " ", SafeUrl/binary>>};
        _ ->
            Error
    end;
replication_start_error(Error) ->
    Error.
| |
% Emits a notice-level log line announcing that a replication job starts.
%
% The line contains the job id (base id concatenated with its extension),
% the source and target names, where the job came from, the
% worker_processes and worker_batch_size options, and the session id.
% When db_name is a binary the job is reported as coming from document
% DocId in the database mem3:dbname/1 derives from it; otherwise it is
% reported as coming from the _replicate endpoint.
log_replication_start(#rep_state{rep_details = Rep} = RepState) ->
    #rep{
        id = {BaseId, Ext},
        doc_id = DocId,
        db_name = DbName,
        options = Options
    } = Rep,
    Id = BaseId ++ Ext,
    Workers = get_value(worker_processes, Options),
    BatchSize = get_value(worker_batch_size, Options),
    #rep_state{
        % credentials already stripped
        source_name = Source,
        % credentials already stripped
        target_name = Target,
        session_id = Sid
    } = RepState,
    From =
        case DbName of
            ShardName when is_binary(ShardName) ->
                io_lib:format("from doc ~s:~s", [mem3:dbname(ShardName), DocId]);
            _ ->
                "from _replicate endpoint"
        end,
    % The label matches the option key read above; it was previously
    % misspelled as "worker_procesess".
    Msg =
        "Starting replication ~s (~s -> ~s) ~s worker_processes:~p"
        " worker_batch_size:~p session_id:~s",
    couch_log:notice(Msg, [Id, Source, Target, From, Workers, BatchSize, Sid]).
| |
| -ifdef(TEST). |
| |
| -include_lib("couch/include/couch_eunit.hrl"). |
| |
% Checks each translating clause of replication_start_error/1 against its
% expected message.
replication_start_error_test() ->
    DbUri = <<"http://x/y">>,
    Url = "http://x/y",
    NxDomain = {error, {error, {conn_failed, {error, nxdomain}}}},
    ?assertEqual(
        {unauthorized, <<"unauthorized to access or create database http://x/y">>},
        replication_start_error({unauthorized, DbUri})
    ),
    ?assertEqual(
        {db_not_found, <<"could not open http://x/y">>},
        replication_start_error({db_not_found, DbUri})
    ),
    ?assertEqual(
        {nxdomain, <<"could not resolve http://x/y">>},
        replication_start_error({http_request_failed, "GET", Url, NxDomain})
    ),
    ?assertEqual(
        {http_error_code, 503, <<"GET http://x/y">>},
        replication_start_error({http_request_failed, "GET", Url, {error, {code, 503}}})
    ).
| |
% EUnit generator: runs the format_status case with config mocked before each
% test and all meck mocks unloaded afterwards.
format_status_test_() ->
    Setup = fun meck_config/0,
    Teardown = fun(_) -> meck:unload() end,
    Cases = [
        ?TDEF_FE(t_scheduler_job_format_status)
    ],
    {foreach, Setup, Teardown, Cases}.
| |
% Mocks config:get/3 so that every lookup returns the supplied default.
meck_config() ->
    ReturnDefault = fun(_Section, _Key, Default) -> Default end,
    meck:expect(config, get, ReturnDefault).
| |
% Builds a job state whose endpoints embed credentials and verifies the
% property list returned by format_status/2: source and target are shown
% without the user:password part, the remaining fields are passed through.
t_scheduler_job_format_status(_) ->
    ParsedSource = couch_replicator_parse:parse_rep_db(<<"http://u:p@h1/d1">>, [], []),
    ParsedTarget = couch_replicator_parse:parse_rep_db(<<"http://u:p@h2/d2">>, [], []),
    Rep = #rep{
        id = {"base", "+ext"},
        source = ParsedSource,
        target = ParsedTarget,
        options = [{create_target, true}],
        doc_id = <<"mydoc">>,
        db_name = <<"mydb">>
    },
    State = #rep_state{
        rep_details = Rep,
        source = ParsedSource,
        target = ParsedTarget,
        session_id = <<"a">>,
        start_seq = <<"1">>,
        source_seq = <<"2">>,
        committed_seq = <<"3">>,
        current_through_seq = <<"4">>,
        highest_seq_done = <<"5">>
    },
    Format = format_status(opts_ignored, [pdict, State]),
    Expected = [
        {source, "http://h1/d1/"},
        {target, "http://h2/d2/"},
        {rep_id, {"base", "+ext"}},
        {options, [{create_target, true}]},
        {doc_id, <<"mydoc">>},
        {db_name, <<"mydb">>},
        {session_id, <<"a">>},
        {start_seq, <<"1">>},
        {source_seq, <<"2">>},
        {committed_seq, <<"3">>},
        {current_through_seq, <<"4">>},
        {highest_seq_done, <<"5">>}
    ],
    lists:foreach(
        fun({Key, Want}) ->
            ?assertEqual(Want, proplists:get_value(Key, Format))
        end,
        Expected
    ).
| |
| -endif. |