Merge remote-tracking branch 'github/pr/4'
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..f6cd2bc
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index e7d2ee0..f5055e0 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -25,6 +25,8 @@
-export([init/1, terminate/2, code_change/3]).
-export([handle_call/3, handle_cast/2, handle_info/2]).
+-export([details/1]).
+
-include_lib("couch/include/couch_db.hrl").
-include("couch_replicator_api_wrap.hrl").
-include("couch_replicator.hrl").
@@ -210,26 +212,41 @@
true ->
cancel_replication(RepId);
false ->
- {BaseId, Ext} = RepId,
- case lists:keysearch(
- BaseId ++ Ext, 1, supervisor:which_children(couch_replicator_job_sup)) of
- {value, {_, Pid, _, _}} when is_pid(Pid) ->
- case (catch gen_server:call(Pid, get_details, infinity)) of
+ case find_replicator(RepId) of
+ {ok, Pid} ->
+ case details(Pid) of
{ok, #rep{user_ctx = #user_ctx{name = Name}}} ->
cancel_replication(RepId);
{ok, _} ->
throw({unauthorized,
<<"Can't cancel a replication triggered by another user">>});
- {'EXIT', {noproc, {gen_server, call, _}}} ->
- {error, not_found};
Error ->
- throw(Error)
+ Error
end;
- _ ->
- {error, not_found}
+ Error ->
+ Error
end
end.
+find_replicator({BaseId, Ext} = _RepId) ->
+ case lists:keysearch(
+ BaseId ++ Ext, 1, supervisor:which_children(couch_replicator_job_sup)) of
+ {value, {_, Pid, _, _}} when is_pid(Pid) ->
+ {ok, Pid};
+ _ ->
+ {error, not_found}
+ end.
+
+details(Pid) ->
+ case (catch gen_server:call(Pid, get_details)) of
+ {ok, Rep} ->
+ {ok, Rep};
+ {'EXIT', {noproc, {gen_server, call, _}}} ->
+ {error, not_found};
+ Error ->
+ throw(Error)
+ end.
+
init(InitArgs) ->
{ok, InitArgs, 0}.
@@ -278,6 +295,7 @@
{type, replication},
{user, UserCtx#user_ctx.name},
{replication_id, ?l2b(BaseId ++ Ext)},
+ {database, Rep#rep.db_name},
{doc_id, Rep#rep.doc_id},
{source, ?l2b(SourceName)},
{target, ?l2b(TargetName)},
@@ -458,13 +476,20 @@
{noreply, State#rep_state{target = NewTarget}};
handle_cast(checkpoint, State) ->
- case do_checkpoint(State) of
- {ok, NewState} ->
- couch_stats:increment_counter([couch_replicator, checkpoints, success]),
- {noreply, NewState#rep_state{timer = start_timer(State)}};
- Error ->
- couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
- {stop, Error, State}
+ #rep_state{rep_details = #rep{} = Rep} = State,
+ case couch_replicator_manager:continue(Rep) of
+ {true, _} ->
+ case do_checkpoint(State) of
+ {ok, NewState} ->
+ couch_stats:increment_counter([couch_replicator, checkpoints, success]),
+ {noreply, NewState#rep_state{timer = start_timer(State)}};
+ Error ->
+ couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
+ {stop, Error, State}
+ end;
+ {false, Owner} ->
+ couch_replicator_manager:replication_usurped(Rep, Owner),
+ {stop, shutdown, State}
end;
handle_cast({report_seq, Seq},
diff --git a/src/couch_replicator.hrl b/src/couch_replicator.hrl
index dbb1793..d3485c0 100644
--- a/src/couch_replicator.hrl
+++ b/src/couch_replicator.hrl
@@ -20,5 +20,6 @@
user_ctx,
type = db,
view = nil,
- doc_id
+ doc_id,
+ db_name = null
}).
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index b8644da..ab5590b 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -83,9 +83,20 @@
ok
end)
end,
- send_req(Db, [{method, head}],
- fun(200, _, _) ->
- {ok, Db};
+ send_req(Db, [{method, get}],
+ fun(200, _, {Props}) ->
+ UpdateSeq = get_value(<<"update_seq">>, Props),
+ InstanceStart = get_value(<<"instance_start_time">>, Props),
+ case {UpdateSeq, InstanceStart} of
+ {undefined, _} ->
+ throw({db_not_found, ?l2b(db_uri(Db))});
+ {_, undefined} ->
+ throw({db_not_found, ?l2b(db_uri(Db))});
+ _ ->
+ {ok, Db}
+ end;
+ (200, _, _Body) ->
+ throw({db_not_found, ?l2b(db_uri(Db))});
(401, _, _) ->
throw({unauthorized, ?l2b(db_uri(Db))});
(_, _, _) ->
@@ -113,7 +124,7 @@
couch_db:create(DbName, Options)
end,
case couch_db:open(DbName, Options) of
- {error, illegal_database_name, _} ->
+ {error, {illegal_database_name, _}} ->
throw({db_not_found, DbName});
{not_found, _Reason} ->
throw({db_not_found, DbName});
@@ -230,7 +241,7 @@
(200, Headers, StreamDataFun) ->
remote_open_doc_revs_streamer_start(Self),
{<<"--">>, _, _} = couch_httpd:parse_multipart_request(
- get_value("Content-Type", Headers),
+ header_value("Content-Type", Headers),
StreamDataFun,
fun mp_parse_mixed/1
);
@@ -663,7 +674,7 @@
{started_open_doc_revs, NewRef} ->
restart_remote_open_doc_revs(Ref, NewRef);
{headers, Ref, Headers} ->
- case get_value("content-type", Headers) of
+ case header_value("content-type", Headers) of
{"multipart/related", _} = ContentType ->
case couch_doc:doc_from_multi_part_stream(
ContentType,
@@ -936,3 +947,15 @@
receive {data, Ref, Data} ->
{ok, Data, {LenLeft - iolist_size(Data), Id}}
end.
+
+header_value(Key, Headers) ->
+ header_value(Key, Headers, undefined).
+
+header_value(Key, Headers, Default) ->
+ Headers1 = [{string:to_lower(K), V} || {K, V} <- Headers],
+ case lists:keyfind(string:to_lower(Key), 1, Headers1) of
+ {_, Value} ->
+ Value;
+ _ ->
+ Default
+ end.
diff --git a/src/couch_replicator_api_wrap.hrl b/src/couch_replicator_api_wrap.hrl
index 1a6f27a..eee04da 100644
--- a/src/couch_replicator_api_wrap.hrl
+++ b/src/couch_replicator_api_wrap.hrl
@@ -17,7 +17,7 @@
oauth = nil,
headers = [
{"Accept", "application/json"},
- {"User-Agent", "CouchDB/" ++ couch_server:get_version()}
+ {"User-Agent", "CouchDB-Replicator/" ++ couch_server:get_version()}
],
timeout, % milliseconds
ibrowse_options = [],
diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl
index 9a6213a..8b34e0e 100644
--- a/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator_httpc.erl
@@ -27,6 +27,17 @@
-define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
-define(MAX_WAIT, 5 * 60 * 1000).
+-define(STREAM_STATUS, ibrowse_stream_status).
+
+
+% This limit is for the number of messages we're willing to discard
+% from an HTTP stream in clean_mailbox/1 before killing the worker
+% and returning. The original intent for clean_mailbox was to remove
+% a single message or two if the changes feed returned before fully
+% consuming the request. This threshold gives us confidence we'll
+% continue to properly close changes feeds while avoiding any case
+% where we may end up processing an unbounded number of messages.
+-define(MAX_DISCARDED_MESSAGES, 16).
setup(#httpdb{httpc_pool = nil, url = Url, http_connections = MaxConns} = Db) ->
@@ -35,6 +46,7 @@
send_req(HttpDb, Params1, Callback) ->
+ put(?STREAM_STATUS, init),
couch_stats:increment_counter([couch_replicator, requests]),
Params2 = ?replace(Params1, qs,
[{K, ?b2l(iolist_to_binary(V))} || {K, V} <- get_value(qs, Params1, [])]),
@@ -94,6 +106,14 @@
process_response({error, sel_conn_closed}, _Worker, HttpDb, Params, _Cb) ->
throw({retry, HttpDb, Params});
+%% This clause handles unexpected connection closing during pipelined requests.
+%% For example, if the server responds to a request, sets the Connection: close
+%% header and closes the socket, ibrowse will detect that error when it sends
+%% the next request.
+process_response({error, connection_closing}, Worker, HttpDb, Params, _Cb)->
+ ibrowse_http_client:stop(Worker),
+ throw({retry, HttpDb, Params});
+
process_response({error, {'EXIT',{normal,_}}}, _Worker, HttpDb, Params, _Cb) ->
% ibrowse worker terminated because remote peer closed the socket
% -> not an error
@@ -132,6 +152,7 @@
StreamDataFun = fun() ->
stream_data_self(HttpDb, Params, Worker, ReqId, Callback)
end,
+ put(?STREAM_STATUS, {streaming, Worker}),
ibrowse:stream_next(ReqId),
try
Ret = Callback(Ok, Headers, StreamDataFun),
@@ -166,16 +187,50 @@
% on the ibrowse_req_id format. This just drops all
% messages for the given ReqId on the floor since we're
% no longer in the HTTP request.
-clean_mailbox({ibrowse_req_id, ReqId}) ->
- receive
- {ibrowse_async_response, ReqId, _} ->
- clean_mailbox({ibrowse_req_id, ReqId});
- {ibrowse_async_response_end, ReqId} ->
- clean_mailbox({ibrowse_req_id, ReqId})
- after 0 ->
- ok
+
+clean_mailbox(ReqId) ->
+ clean_mailbox(ReqId, ?MAX_DISCARDED_MESSAGES).
+
+
+clean_mailbox(_ReqId, 0) ->
+ case get(?STREAM_STATUS) of
+ {streaming, Worker} ->
+ % We kill workers that continue to stream us
+ % messages after we give up but do *not* exit
+ % ourselves. This is because we may be running
+ % as an exception unwinds and we don't want to
+ % change any of that subtle logic.
+ exit(Worker, {timeout, ibrowse_stream_cleanup});
+ _ ->
+ ok
+ end,
+ ok;
+clean_mailbox({ibrowse_req_id, ReqId}, Count) when Count > 0 ->
+ case get(?STREAM_STATUS) of
+ {streaming, Worker} ->
+ ibrowse:stream_next(ReqId),
+ receive
+ {ibrowse_async_response, ReqId, _} ->
+ clean_mailbox({ibrowse_req_id, ReqId}, Count - 1);
+ {ibrowse_async_response_end, ReqId} ->
+ put(?STREAM_STATUS, ended),
+ ok
+ after 30000 ->
+ exit(Worker, {timeout, ibrowse_stream_cleanup}),
+ exit({timeout, ibrowse_stream_cleanup})
+ end;
+ Status when Status == init; Status == ended ->
+ receive
+ {ibrowse_async_response, ReqId, _} ->
+ clean_mailbox({ibrowse_req_id, ReqId}, Count - 1);
+ {ibrowse_async_response_end, ReqId} ->
+ put(?STREAM_STATUS, ended),
+ ok
+ after 0 ->
+ ok
+ end
end;
-clean_mailbox(_) ->
+clean_mailbox(_, Count) when Count > 0 ->
ok.
@@ -222,6 +277,7 @@
ibrowse:stream_next(ReqId),
{Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId, Cb) end};
{Data, ibrowse_async_response_end} ->
+ put(?STREAM_STATUS, ended),
{Data, fun() -> throw({maybe_retry_req, more_data_expected}) end}
end.
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index d6c88c1..c6f7960 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -12,11 +12,12 @@
-module(couch_replicator_manager).
-behaviour(gen_server).
--vsn(1).
+-vsn(2).
-behaviour(config_listener).
% public API
-export([replication_started/1, replication_completed/2, replication_error/2]).
+-export([continue/1, replication_usurped/2]).
-export([before_doc_update/2, after_doc_read/2]).
@@ -28,10 +29,13 @@
-export([changes_reader/3, changes_reader_cb/3]).
% config_listener callback
--export([handle_config_change/5]).
+-export([handle_config_change/5, handle_config_terminate/3]).
-export([handle_db_event/3]).
+%% exported but private
+-export([start_replication/2]).
+
-include_lib("couch/include/couch_db.hrl").
-include_lib("mem3/include/mem3.hrl").
-include("couch_replicator.hrl").
@@ -49,7 +53,6 @@
-define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
-record(rep_state, {
- dbname,
rep,
starting,
retries_left,
@@ -65,7 +68,8 @@
event_listener = nil,
scan_pid = nil,
rep_start_pids = [],
- max_retries
+ max_retries,
+ live = []
}).
start_link() ->
@@ -76,7 +80,7 @@
case rep_state(RepId) of
nil ->
ok;
- #rep_state{dbname = DbName, rep = #rep{doc_id = DocId}} ->
+ #rep_state{rep = #rep{db_name = DbName, doc_id = DocId}} ->
update_rep_doc(DbName, DocId, [
{<<"_replication_state">>, <<"triggered">>},
{<<"_replication_state_reason">>, undefined},
@@ -92,7 +96,7 @@
case rep_state(RepId) of
nil ->
ok;
- #rep_state{dbname = DbName, rep = #rep{doc_id = DocId}} ->
+ #rep_state{rep = #rep{db_name = DbName, doc_id = DocId}} ->
update_rep_doc(DbName, DocId, [
{<<"_replication_state">>, <<"completed">>},
{<<"_replication_state_reason">>, undefined},
@@ -103,11 +107,22 @@
end.
+replication_usurped(#rep{id = RepId}, By) ->
+ case rep_state(RepId) of
+ nil ->
+ ok;
+ #rep_state{rep = #rep{doc_id = DocId}} ->
+ ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity),
+ couch_log:notice("Replication `~s` usurped by ~s (triggered by document `~s`)",
+ [pp_rep_id(RepId), By, DocId])
+ end.
+
+
replication_error(#rep{id = {BaseId, _} = RepId}, Error) ->
case rep_state(RepId) of
nil ->
ok;
- #rep_state{dbname = DbName, rep = #rep{doc_id = DocId}} ->
+ #rep_state{rep = #rep{db_name = DbName, doc_id = DocId}} ->
update_rep_doc(DbName, DocId, [
{<<"_replication_state">>, <<"error">>},
{<<"_replication_state_reason">>, to_binary(error_reason(Error))},
@@ -115,6 +130,12 @@
ok = gen_server:call(?MODULE, {rep_error, RepId, Error}, infinity)
end.
+continue(#rep{doc_id = null}) ->
+ {true, no_owner};
+continue(#rep{id = RepId}) ->
+ Owner = gen_server:call(?MODULE, {owner, RepId}, infinity),
+ {node() == Owner, Owner}.
+
handle_config_change("replicator", "db", _, _, S) ->
ok = gen_server:call(S, rep_db_changed),
@@ -125,10 +146,17 @@
handle_config_change(_, _, _, _, S) ->
{ok, S}.
+handle_config_terminate(_, stop, _) -> ok;
+handle_config_terminate(Self, _, _) ->
+ spawn(fun() ->
+ timer:sleep(5000),
+ config:listen_for_changes(?MODULE, Self)
+ end).
init(_) ->
process_flag(trap_exit, true),
net_kernel:monitor_nodes(true),
+ Live = [node() | nodes()],
?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, public]),
?REP_TO_STATE = ets:new(?REP_TO_STATE, [named_table, set, public]),
?DB_TO_SEQ = ets:new(?DB_TO_SEQ, [named_table, set, public]),
@@ -144,9 +172,17 @@
scan_pid = ScanPid,
max_retries = retries_value(
config:get("replicator", "max_replication_retry_count", "10")),
- rep_start_pids = [Pid]
+ rep_start_pids = [Pid],
+ live = Live
}}.
+handle_call({owner, RepId}, _From, State) ->
+ case rep_state(RepId) of
+ nil ->
+ {reply, nonode, State};
+ #rep_state{rep = #rep{db_name = DbName, doc_id = DocId}} ->
+ {reply, owner(DbName, DocId, State#state.live), State}
+ end;
handle_call({rep_db_update, DbName, {ChangeProps} = Change}, _From, State) ->
NewState = try
@@ -212,11 +248,15 @@
couch_log:error("Replication manager received unexpected cast ~p", [Msg]),
{stop, {error, {unexpected_cast, Msg}}, State}.
-handle_info({nodeup, _Node}, State) ->
- {noreply, rescan(State)};
+handle_info({nodeup, Node}, State) ->
+ couch_log:notice("Rescanning replicator dbs as ~s came up.", [Node]),
+ Live = lists:usort([Node | State#state.live]),
+ {noreply, rescan(State#state{live=Live})};
-handle_info({nodedown, _Node}, State) ->
- {noreply, rescan(State)};
+handle_info({nodedown, Node}, State) ->
+ couch_log:notice("Rescanning replicator dbs ~s went down.", [Node]),
+ Live = State#state.live -- [Node],
+ {noreply, rescan(State#state{live=Live})};
handle_info({'EXIT', From, normal}, #state{scan_pid = From} = State) ->
couch_log:debug("Background scan has completed.", []),
@@ -250,14 +290,6 @@
% From a db monitor created by a replication process. Ignore.
{noreply, State};
-handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, State) ->
- erlang:send_after(5000, self(), restart_config_listener),
- {noreply, State};
-
-handle_info(restart_config_listener, State) ->
- ok = config:listen_for_changes(?MODULE, self()),
- {noreply, State};
-
handle_info(shutdown, State) ->
{stop, shutdown, State};
@@ -285,6 +317,8 @@
couch_event:stop_listener(Listener).
+code_change(1, State, _Extra) ->
+ {ok, erlang:append_element(State, [node() | nodes()])};
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -380,13 +414,15 @@
process_update(State, DbName, {Change}) ->
{RepProps} = JsonRepDoc = get_json_value(doc, Change),
DocId = get_json_value(<<"_id">>, RepProps),
- case {owner(DbName, DocId), get_json_value(deleted, Change, false)} of
+ case {owner(DbName, DocId, State#state.live), get_json_value(deleted, Change, false)} of
{_, true} ->
rep_doc_deleted(DbName, DocId),
State;
- {false, false} ->
+ {Owner, false} when Owner /= node() ->
+ couch_log:notice("Not starting '~s' as owner is ~s.", [DocId, Owner]),
State;
- {true, false} ->
+ {_Owner, false} ->
+ couch_log:notice("Maybe starting '~s' as I'm the owner", [DocId]),
case get_json_value(<<"_replication_state">>, RepProps) of
undefined ->
maybe_start_replication(State, DbName, DocId, JsonRepDoc);
@@ -405,13 +441,12 @@
end
end.
-owner(<<"shards/", _/binary>> = DbName, DocId) ->
- Live = [node()|nodes()],
+owner(<<"shards/", _/binary>> = DbName, DocId, Live) ->
Nodes = lists:sort([N || #shard{node=N} <- mem3:shards(mem3:dbname(DbName), DocId),
lists:member(N, Live)]),
- node() =:= hd(mem3_util:rotate_list({DbName, DocId}, Nodes));
-owner(_DbName, _DocId) ->
- true.
+ hd(mem3_util:rotate_list({DbName, DocId}, Nodes));
+owner(_DbName, _DocId, _Live) ->
+ node().
rep_db_update_error(Error, DbName, DocId) ->
case Error of
@@ -439,11 +474,11 @@
maybe_start_replication(State, DbName, DocId, RepDoc) ->
- #rep{id = {BaseId, _} = RepId} = Rep = parse_rep_doc(RepDoc),
+ #rep{id = {BaseId, _} = RepId} = Rep0 = parse_rep_doc(RepDoc),
+ Rep = Rep0#rep{db_name = DbName},
case rep_state(RepId) of
nil ->
RepState = #rep_state{
- dbname = DbName,
rep = Rep,
starting = true,
retries_left = State#state.max_retries,
@@ -453,16 +488,23 @@
true = ets:insert(?DOC_TO_REP, {{DbName, DocId}, RepId}),
couch_log:notice("Attempting to start replication `~s` (document `~s`).",
[pp_rep_id(RepId), DocId]),
- Pid = spawn_link(fun() -> start_replication(Rep, 0) end),
+ StartDelaySecs = erlang:max(0,
+ config:get_integer("replicator", "start_delay", 10)),
+ StartSplaySecs = erlang:max(1,
+ config:get_integer("replicator", "start_splay", 50)),
+ DelaySecs = StartDelaySecs + random:uniform(StartSplaySecs),
+ couch_log:notice("Delaying replication `~s` start by ~p seconds.",
+ [pp_rep_id(RepId), DelaySecs]),
+ Pid = spawn_link(?MODULE, start_replication, [Rep, DelaySecs]),
State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
#rep_state{rep = #rep{doc_id = DocId}} ->
State;
- #rep_state{starting = false, dbname = DbName, rep = #rep{doc_id = OtherDocId}} ->
+ #rep_state{starting = false, rep = #rep{db_name = DbName, doc_id = OtherDocId}} ->
couch_log:notice("The replication specified by the document `~s` was already"
" triggered by the document `~s`", [DocId, OtherDocId]),
maybe_tag_rep_doc(DbName, DocId, RepDoc, ?l2b(BaseId)),
State;
- #rep_state{starting = true, dbname = DbName, rep = #rep{doc_id = OtherDocId}} ->
+ #rep_state{starting = true, rep = #rep{db_name = DbName, doc_id = OtherDocId}} ->
couch_log:notice("The replication specified by the document `~s` is already"
" being triggered by the document `~s`", [DocId, OtherDocId]),
maybe_tag_rep_doc(DbName, DocId, RepDoc, ?l2b(BaseId)),
@@ -549,8 +591,7 @@
maybe_retry_replication(#rep_state{retries_left = 0} = RepState, Error, State) ->
#rep_state{
- dbname = DbName,
- rep = #rep{id = RepId, doc_id = DocId},
+ rep = #rep{id = RepId, doc_id = DocId, db_name = DbName},
max_retries = MaxRetries
} = RepState,
couch_replicator:cancel_replication(RepId),
@@ -570,7 +611,7 @@
couch_log:error("Error in replication `~s` (triggered by document `~s`): ~s"
"~nRestarting replication in ~p seconds.",
[pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), Wait]),
- Pid = spawn_link(fun() -> start_replication(Rep, Wait) end),
+ Pid = spawn_link(?MODULE, start_replication, [Rep, Wait]),
State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}.
@@ -598,11 +639,14 @@
ets:delete(?DB_TO_SEQ,DbName).
-update_rep_doc(RepDbName, RepDocId, KVs) when is_binary(RepDocId) ->
+update_rep_doc(RepDbName, RepDocId, KVs) ->
+ update_rep_doc(RepDbName, RepDocId, KVs, 1).
+
+update_rep_doc(RepDbName, RepDocId, KVs, Wait) when is_binary(RepDocId) ->
try
case open_rep_doc(RepDbName, RepDocId) of
{ok, LastRepDoc} ->
- update_rep_doc(RepDbName, LastRepDoc, KVs);
+ update_rep_doc(RepDbName, LastRepDoc, KVs, Wait * 2);
_ ->
ok
end
@@ -610,10 +654,10 @@
throw:conflict ->
Msg = "Conflict when updating replication document `~s`. Retrying.",
couch_log:error(Msg, [RepDocId]),
- ok = timer:sleep(5),
- update_rep_doc(RepDbName, RepDocId, KVs)
+ ok = timer:sleep(random:uniform(erlang:min(128, Wait)) * 100),
+ update_rep_doc(RepDbName, RepDocId, KVs, Wait * 2)
end;
-update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
+update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) ->
NewRepDocBody = lists:foldl(
fun({K, undefined}, Body) ->
lists:keydelete(K, 1, Body);
diff --git a/src/couch_replicator_utils.erl b/src/couch_replicator_utils.erl
index 30afb39..9fc42df 100644
--- a/src/couch_replicator_utils.erl
+++ b/src/couch_replicator_utils.erl
@@ -130,7 +130,7 @@
[filter_code(Filter, Source, UserCtx),
get_value(query_params, Options, {[]})]
end,
- couch_util:to_hex(couch_util:md5(term_to_binary(Base2))).
+ couch_util:to_hex(couch_crypto:hash(md5, term_to_binary(Base2))).
filter_code(Filter, Source, UserCtx) ->
@@ -345,14 +345,14 @@
config:get("replicator", "ssl_certificate_max_depth", "3")
),
VerifyCerts = config:get("replicator", "verify_ssl_certificates"),
- CertFile = config:get("replicator", "cert_file", nil),
- KeyFile = config:get("replicator", "key_file", nil),
- Password = config:get("replicator", "password", nil),
+ CertFile = config:get("replicator", "cert_file", undefined),
+ KeyFile = config:get("replicator", "key_file", undefined),
+ Password = config:get("replicator", "password", undefined),
SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts =:= "true")],
- SslOpts1 = case CertFile /= nil andalso KeyFile /= nil of
+ SslOpts1 = case CertFile /= undefined andalso KeyFile /= undefined of
true ->
case Password of
- nil ->
+ undefined ->
[{certfile, CertFile}, {keyfile, KeyFile}] ++ SslOpts;
_ ->
[{certfile, CertFile}, {keyfile, KeyFile},
diff --git a/test/couch_replicator_compact_tests.erl b/test/couch_replicator_compact_tests.erl
index 8378567..459e42a 100644
--- a/test/couch_replicator_compact_tests.erl
+++ b/test/couch_replicator_compact_tests.erl
@@ -23,11 +23,9 @@
-define(TIMEOUT_WRITER, 3000).
-define(TIMEOUT_EUNIT, ?TIMEOUT div 1000 + 5).
--ifdef(run_broken_tests).
-
setup() ->
DbName = ?tempdb(),
- {ok, Db} = couch_db:create(DbName, [?ADMIN_USER]),
+ {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
ok = couch_db:close(Db),
DbName.
@@ -36,22 +34,22 @@
setup(remote) ->
{remote, setup()};
setup({A, B}) ->
- ok = test_util:start_couch(),
+ Ctx = test_util:start_couch([couch_replicator]),
Source = setup(A),
Target = setup(B),
- {Source, Target}.
+ {Ctx, {Source, Target}}.
teardown({remote, DbName}) ->
teardown(DbName);
teardown(DbName) ->
- ok = couch_server:delete(DbName, [?ADMIN_USER]),
+ ok = couch_server:delete(DbName, [?ADMIN_CTX]),
ok.
-teardown(_, {Source, Target}) ->
+teardown(_, {Ctx, {Source, Target}}) ->
teardown(Source),
teardown(Target),
-
- ok = test_util:stop_couch().
+ ok = application:stop(couch_replicator),
+ ok = test_util:stop_couch(Ctx).
compact_test_() ->
Pairs = [{local, local}, {local, remote},
@@ -67,7 +65,7 @@
}.
-should_populate_replicate_compact({From, To}, {Source, Target}) ->
+should_populate_replicate_compact({From, To}, {_Ctx, {Source, Target}}) ->
{ok, RepPid, RepId} = replicate(Source, Target),
{lists:flatten(io_lib:format("~p -> ~p", [From, To])),
{inorder, [
@@ -110,6 +108,7 @@
end,
FullRepId = ?l2b(BaseId ++ Ext),
Pid = ?l2b(pid_to_list(RepPid)),
+ ok = wait_for_replicator(RepPid),
[RepTask] = couch_task_status:all(),
?assertEqual(Pid, couch_util:get_value(pid, RepTask)),
?assertEqual(FullRepId, couch_util:get_value(replication_id, RepTask)),
@@ -123,9 +122,15 @@
?assert(is_integer(couch_util:get_value(missing_revisions_found, RepTask))),
?assert(is_integer(couch_util:get_value(checkpointed_source_seq, RepTask))),
?assert(is_integer(couch_util:get_value(source_seq, RepTask))),
- Progress = couch_util:get_value(progress, RepTask),
- ?assert(is_integer(Progress)),
- ?assert(Progress =< 100).
+ Pending = couch_util:get_value(changes_pending, RepTask),
+ ?assert(is_integer(Pending)).
+
+wait_for_replicator(Pid) ->
+ %% The replicator is started asynchronously, so it may not be
+ %% listed in couch_task_status yet when the caller gets its pid.
+ %% Querying couch_replicator:details/1 ensures that do_init has
+ %% happened before the caller inspects the task list.
+ ?assertMatch({ok, _}, couch_replicator:details(Pid)),
+ ok.
should_cancel_replication(RepId, RepPid) ->
?_assertNot(begin
@@ -146,12 +151,12 @@
compact_db("source", SourceDb),
?assert(is_process_alive(RepPid)),
?assert(is_process_alive(SourceDb#db.main_pid)),
- check_ref_counter("source", SourceDb),
+ wait_for_compaction("source", SourceDb),
compact_db("target", TargetDb),
?assert(is_process_alive(RepPid)),
?assert(is_process_alive(TargetDb#db.main_pid)),
- check_ref_counter("target", TargetDb),
+ wait_for_compaction("target", TargetDb),
{ok, SourceDb2} = reopen_db(SourceDb),
{ok, TargetDb2} = reopen_db(TargetDb),
@@ -163,14 +168,14 @@
?assert(is_process_alive(RepPid)),
?assert(is_process_alive(SourceDb2#db.main_pid)),
pause_writer(Writer),
- check_ref_counter("source", SourceDb2),
+ wait_for_compaction("source", SourceDb2),
resume_writer(Writer),
compact_db("target", TargetDb2),
?assert(is_process_alive(RepPid)),
?assert(is_process_alive(TargetDb2#db.main_pid)),
pause_writer(Writer),
- check_ref_counter("target", TargetDb2),
+ wait_for_compaction("target", TargetDb2),
resume_writer(Writer)
end, lists:seq(1, Rounds)),
stop_writer(Writer)
@@ -275,22 +280,17 @@
end,
ok = couch_db:close(Db).
-check_ref_counter(Type, #db{name = Name, fd_ref_counter = OldRefCounter}) ->
- MonRef = erlang:monitor(process, OldRefCounter),
- receive
- {'DOWN', MonRef, process, OldRefCounter, _} ->
- ok
- after ?TIMEOUT ->
+%% Calls couch_db:wait_for_compaction/1 on Db and returns ok when it does.
+%% On {error, Reason} the test is aborted with an assertion_failed error.
+%% Type is a label ("source" or "target" at the call sites) that is only
+%% used to build the failure message.
+wait_for_compaction(Type, Db) ->
+ case couch_db:wait_for_compaction(Db) of
+ ok ->
+ ok;
+ {error, Reason} ->
erlang:error(
{assertion_failed,
[{module, ?MODULE}, {line, ?LINE},
- {reason, lists:concat(["Old ", Type,
- " database ref counter didn't"
- " terminate"])}]})
- end,
- {ok, #db{fd_ref_counter = NewRefCounter} = Db} = couch_db:open_int(Name, []),
- ok = couch_db:close(Db),
- ?assertNotEqual(OldRefCounter, NewRefCounter).
+ %% NOTE(review): lists:concat/1 only accepts atoms, numbers and
+ %% strings; confirm Reason can never be any other kind of term.
+ {reason, lists:concat(["Compaction of ", Type,
+ " database failed with: ", Reason])}]})
+ end.
db_url(DbName) ->
iolist_to_binary([
@@ -311,7 +311,7 @@
{<<"target">>, Target},
{<<"continuous">>, true}
]},
- {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_ROLE),
+ {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
{ok, Pid} = couch_replicator:async_replicate(Rep),
{ok, Pid, Rep#rep.id}.
@@ -437,5 +437,3 @@
after 0 ->
ok
end.
-
--endif.
diff --git a/test/couch_replicator_large_atts_tests.erl b/test/couch_replicator_large_atts_tests.erl
index e8c0d1a..67f4d75 100644
--- a/test/couch_replicator_large_atts_tests.erl
+++ b/test/couch_replicator_large_atts_tests.erl
@@ -15,6 +15,12 @@
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
+-import(couch_replicator_test_helper, [
+ db_url/1,
+ replicate/2,
+ compare_dbs/2
+]).
+
-define(ATT_SIZE_1, 2 * 1024 * 1024).
-define(ATT_SIZE_2, round(6.6 * 1024 * 1024)).
-define(DOCS_COUNT, 11).
@@ -24,7 +30,7 @@
setup() ->
DbName = ?tempdb(),
- {ok, Db} = couch_db:create(DbName, [?ADMIN_USER]),
+ {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
ok = couch_db:close(Db),
DbName.
@@ -33,23 +39,24 @@
setup(remote) ->
{remote, setup()};
setup({A, B}) ->
- ok = test_util:start_couch(),
+ Ctx = test_util:start_couch([couch_replicator]),
config:set("attachments", "compressible_types", "text/*", false),
Source = setup(A),
Target = setup(B),
- {Source, Target}.
+ {Ctx, {Source, Target}}.
teardown({remote, DbName}) ->
teardown(DbName);
teardown(DbName) ->
- ok = couch_server:delete(DbName, [?ADMIN_USER]),
+ ok = couch_server:delete(DbName, [?ADMIN_CTX]),
ok.
-teardown(_, {Source, Target}) ->
+teardown(_, {Ctx, {Source, Target}}) ->
teardown(Source),
teardown(Target),
- ok = test_util:stop_couch().
+ ok = application:stop(couch_replicator),
+ ok = test_util:stop_couch(Ctx).
large_atts_test_() ->
Pairs = [{local, local}, {local, remote},
@@ -65,7 +72,7 @@
}.
-should_populate_replicate_compact({From, To}, {Source, Target}) ->
+should_populate_replicate_compact({From, To}, {_Ctx, {Source, Target}}) ->
{lists:flatten(io_lib:format("~p -> ~p", [From, To])),
{inorder, [should_populate_source(Source),
should_replicate(Source, Target),
@@ -109,99 +116,10 @@
{ok, _} = couch_db:update_docs(Db, Docs, []),
couch_db:close(Db).
-compare_dbs(Source, Target) ->
- {ok, SourceDb} = couch_db:open_int(Source, []),
- {ok, TargetDb} = couch_db:open_int(Target, []),
-
- Fun = fun(FullDocInfo, _, Acc) ->
- {ok, DocSource} = couch_db:open_doc(SourceDb, FullDocInfo),
- Id = DocSource#doc.id,
-
- {ok, DocTarget} = couch_db:open_doc(TargetDb, Id),
- ?assertEqual(DocSource#doc.body, DocTarget#doc.body),
-
- #doc{atts = SourceAtts} = DocSource,
- #doc{atts = TargetAtts} = DocTarget,
- ?assertEqual(lists:sort([N || #att{name = N} <- SourceAtts]),
- lists:sort([N || #att{name = N} <- TargetAtts])),
-
- FunCompareAtts = fun(#att{name = AttName} = Att) ->
- {ok, AttTarget} = find_att(TargetAtts, AttName),
- SourceMd5 = att_md5(Att),
- TargetMd5 = att_md5(AttTarget),
- case AttName of
- <<"att1">> ->
- ?assertEqual(gzip, Att#att.encoding),
- ?assertEqual(gzip, AttTarget#att.encoding),
- DecSourceMd5 = att_decoded_md5(Att),
- DecTargetMd5 = att_decoded_md5(AttTarget),
- ?assertEqual(DecSourceMd5, DecTargetMd5);
- _ ->
- ?assertEqual(identity, Att#att.encoding),
- ?assertEqual(identity, AttTarget#att.encoding)
- end,
- ?assertEqual(SourceMd5, TargetMd5),
- ?assert(is_integer(Att#att.disk_len)),
- ?assert(is_integer(Att#att.att_len)),
- ?assert(is_integer(AttTarget#att.disk_len)),
- ?assert(is_integer(AttTarget#att.att_len)),
- ?assertEqual(Att#att.disk_len, AttTarget#att.disk_len),
- ?assertEqual(Att#att.att_len, AttTarget#att.att_len),
- ?assertEqual(Att#att.type, AttTarget#att.type),
- ?assertEqual(Att#att.md5, AttTarget#att.md5)
- end,
-
- lists:foreach(FunCompareAtts, SourceAtts),
-
- {ok, Acc}
- end,
-
- {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []),
- ok = couch_db:close(SourceDb),
- ok = couch_db:close(TargetDb).
-
att(Name, Size, Type) ->
- #att{
- name = Name,
- type = Type,
- att_len = Size,
- data = fun(Count) -> crypto:rand_bytes(Count) end
- }.
-
-find_att([], _Name) ->
- nil;
-find_att([#att{name = Name} = Att | _], Name) ->
- {ok, Att};
-find_att([_ | Rest], Name) ->
- find_att(Rest, Name).
-
-att_md5(Att) ->
- Md50 = couch_doc:att_foldl(
- Att,
- fun(Chunk, Acc) -> couch_util:md5_update(Acc, Chunk) end,
- couch_util:md5_init()),
- couch_util:md5_final(Md50).
-
-att_decoded_md5(Att) ->
- Md50 = couch_doc:att_foldl_decode(
- Att,
- fun(Chunk, Acc) -> couch_util:md5_update(Acc, Chunk) end,
- couch_util:md5_init()),
- couch_util:md5_final(Md50).
-
-db_url(DbName) ->
- iolist_to_binary([
- "http://", config:get("httpd", "bind_address", "127.0.0.1"),
- ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
- "/", DbName
+ couch_att:new([
+ {name, Name},
+ {type, Type},
+ {att_len, Size},
+ {data, fun(Count) -> crypto:rand_bytes(Count) end}
]).
-
-replicate(Source, Target) ->
- RepObject = {[{<<"source">>, Source}, {<<"target">>, Target}]},
- {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_ROLE),
- {ok, Pid} = couch_replicator:async_replicate(Rep),
- MonRef = erlang:monitor(process, Pid),
- receive
- {'DOWN', MonRef, process, Pid, _} ->
- ok
- end.
diff --git a/test/couch_replicator_many_leaves_tests.erl b/test/couch_replicator_many_leaves_tests.erl
index 04d21ac..40ca611 100644
--- a/test/couch_replicator_many_leaves_tests.erl
+++ b/test/couch_replicator_many_leaves_tests.erl
@@ -29,7 +29,7 @@
setup() ->
DbName = ?tempdb(),
- {ok, Db} = couch_db:create(DbName, [?ADMIN_USER]),
+ {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
ok = couch_db:close(Db),
DbName.
@@ -38,22 +38,22 @@
setup(remote) ->
{remote, setup()};
setup({A, B}) ->
- ok = test_util:start_couch(),
+ Ctx = test_util:start_couch([couch_replicator]),
Source = setup(A),
Target = setup(B),
- {Source, Target}.
+ {Ctx, {Source, Target}}.
teardown({remote, DbName}) ->
teardown(DbName);
teardown(DbName) ->
- ok = couch_server:delete(DbName, [?ADMIN_USER]),
+ ok = couch_server:delete(DbName, [?ADMIN_CTX]),
ok.
-teardown(_, {Source, Target}) ->
+teardown(_, {Ctx, {Source, Target}}) ->
teardown(Source),
teardown(Target),
-
- ok = test_util:stop_couch().
+ ok = application:stop(couch_replicator),
+ ok = test_util:stop_couch(Ctx).
docs_with_many_leaves_test_() ->
Pairs = [{local, local}, {local, remote},
@@ -69,7 +69,7 @@
}.
-should_populate_replicate_compact({From, To}, {Source, Target}) ->
+should_populate_replicate_compact({From, To}, {_Ctx, {Source, Target}}) ->
{lists:flatten(io_lib:format("~p -> ~p", [From, To])),
{inorder, [
should_populate_source(Source),
@@ -137,7 +137,7 @@
add_doc_siblings(Db, DocId, NumLeaves, AccDocs, AccRevs) ->
Value = ?l2b(?i2l(NumLeaves)),
- Rev = couch_util:md5(Value),
+ Rev = couch_crypto:hash(md5, Value),
Doc = #doc{
id = DocId,
revs = {1, [Rev]},
@@ -183,13 +183,13 @@
fun(#doc{atts = Atts, revs = {Pos, [Rev | _]}} = Doc, Acc) ->
NewAtts = lists:foldl(fun(I, AttAcc) ->
AttData = crypto:rand_bytes(100),
- NewAtt = #att{
- name = ?io2b(["att_", ?i2l(I), "_",
- couch_doc:rev_to_str({Pos, Rev})]),
- type = <<"application/foobar">>,
- att_len = byte_size(AttData),
- data = AttData
- },
+ NewAtt = couch_att:new([
+ {name, ?io2b(["att_", ?i2l(I), "_",
+ couch_doc:rev_to_str({Pos, Rev})])},
+ {type, <<"application/foobar">>},
+ {att_len, byte_size(AttData)},
+ {data, AttData}
+ ]),
[NewAtt | AttAcc]
end, [], lists:seq(1, NumAtts)),
[Doc#doc{atts = Atts ++ NewAtts} | Acc]
@@ -212,7 +212,7 @@
{<<"source">>, Source},
{<<"target">>, Target}
]},
- {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_ROLE),
+ {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
{ok, Pid} = couch_replicator:async_replicate(Rep),
MonRef = erlang:monitor(process, Pid),
receive
diff --git a/test/couch_replicator_missing_stubs_tests.erl b/test/couch_replicator_missing_stubs_tests.erl
index 0bf2f6a..034ca42 100644
--- a/test/couch_replicator_missing_stubs_tests.erl
+++ b/test/couch_replicator_missing_stubs_tests.erl
@@ -15,16 +15,19 @@
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
+-import(couch_replicator_test_helper, [
+ db_url/1,
+ replicate/2,
+ compare_dbs/2
+]).
+
-define(REVS_LIMIT, 3).
-define(TIMEOUT_STOP, 1000).
-define(TIMEOUT_EUNIT, 30).
-
--ifdef(run_broken_tests).
-
setup() ->
DbName = ?tempdb(),
- {ok, Db} = couch_db:create(DbName, [?ADMIN_USER]),
+ {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
ok = couch_db:close(Db),
DbName.
@@ -33,22 +36,22 @@
setup(remote) ->
{remote, setup()};
setup({A, B}) ->
- ok = test_util:start_couch(),
+ Ctx = test_util:start_couch([couch_replicator]),
Source = setup(A),
Target = setup(B),
- {Source, Target}.
+ {Ctx, {Source, Target}}.
teardown({remote, DbName}) ->
teardown(DbName);
teardown(DbName) ->
- ok = couch_server:delete(DbName, [?ADMIN_USER]),
+ ok = couch_server:delete(DbName, [?ADMIN_CTX]),
ok.
-teardown(_, {Source, Target}) ->
+teardown(_, {Ctx, {Source, Target}}) ->
teardown(Source),
teardown(Target),
-
- ok = test_util:stop_couch().
+ ok = application:stop(couch_replicator),
+ ok = test_util:stop_couch(Ctx).
missing_stubs_test_() ->
Pairs = [{local, local}, {local, remote},
@@ -64,7 +67,7 @@
}.
-should_replicate_docs_with_missed_att_stubs({From, To}, {Source, Target}) ->
+should_replicate_docs_with_missed_att_stubs({From, To}, {_Ctx, {Source, Target}}) ->
{lists:flatten(io_lib:format("~p -> ~p", [From, To])),
{inorder, [
should_populate_source(Source),
@@ -92,7 +95,7 @@
should_set_target_revs_limit(Target, RevsLimit);
should_set_target_revs_limit(Target, RevsLimit) ->
?_test(begin
- {ok, Db} = couch_db:open_int(Target, [?ADMIN_USER]),
+ {ok, Db} = couch_db:open_int(Target, [?ADMIN_CTX]),
?assertEqual(ok, couch_db:set_revs_limit(Db, RevsLimit)),
ok = couch_db:close(Db)
end).
@@ -116,12 +119,12 @@
Doc = #doc{
id = <<"doc1">>,
atts = [
- #att{
- name = <<"doc1_att1">>,
- type = <<"application/foobar">>,
- att_len = byte_size(AttData),
- data = AttData
- }
+ couch_att:new([
+ {name, <<"doc1_att1">>},
+ {type, <<"application/foobar">>},
+ {att_len, byte_size(AttData)},
+ {data, AttData}
+ ])
]
},
{ok, _} = couch_db:update_doc(Db, Doc, []),
@@ -153,102 +156,3 @@
lists:seq(1, Times)),
ok = couch_db:close(Db),
{ok, {DbName, Times}}.
-
-compare_dbs(Source, Target) ->
- {ok, SourceDb} = couch_db:open_int(Source, []),
- {ok, TargetDb} = couch_db:open_int(Target, []),
-
- Fun = fun(FullDocInfo, _, Acc) ->
- {ok, DocSource} = couch_db:open_doc(SourceDb, FullDocInfo,
- [conflicts, deleted_conflicts]),
- Id = DocSource#doc.id,
-
- {ok, DocTarget} = couch_db:open_doc(TargetDb, Id,
- [conflicts, deleted_conflicts]),
- ?assertEqual(DocSource#doc.body, DocTarget#doc.body),
-
- ?assertEqual(couch_doc:to_json_obj(DocSource, []),
- couch_doc:to_json_obj(DocTarget, [])),
-
- #doc{atts = SourceAtts} = DocSource,
- #doc{atts = TargetAtts} = DocTarget,
- ?assertEqual(lists:sort([N || #att{name = N} <- SourceAtts]),
- lists:sort([N || #att{name = N} <- TargetAtts])),
-
- lists:foreach(
- fun(#att{name = AttName} = Att) ->
- {ok, AttTarget} = find_att(TargetAtts, AttName),
- SourceMd5 = att_md5(Att),
- TargetMd5 = att_md5(AttTarget),
- case AttName of
- <<"att1">> ->
- ?assertEqual(gzip, Att#att.encoding),
- ?assertEqual(gzip, AttTarget#att.encoding),
- DecSourceMd5 = att_decoded_md5(Att),
- DecTargetMd5 = att_decoded_md5(AttTarget),
- ?assertEqual(DecSourceMd5, DecTargetMd5);
- _ ->
- ?assertEqual(identity, Att#att.encoding),
- ?assertEqual(identity, AttTarget#att.encoding)
- end,
- ?assertEqual(SourceMd5, TargetMd5),
- ?assert(is_integer(Att#att.disk_len)),
- ?assert(is_integer(Att#att.att_len)),
- ?assert(is_integer(AttTarget#att.disk_len)),
- ?assert(is_integer(AttTarget#att.att_len)),
- ?assertEqual(Att#att.disk_len, AttTarget#att.disk_len),
- ?assertEqual(Att#att.att_len, AttTarget#att.att_len),
- ?assertEqual(Att#att.type, AttTarget#att.type),
- ?assertEqual(Att#att.md5, AttTarget#att.md5)
- end,
- SourceAtts),
- {ok, Acc}
- end,
-
- {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []),
- ok = couch_db:close(SourceDb),
- ok = couch_db:close(TargetDb).
-
-find_att([], _Name) ->
- nil;
-find_att([#att{name = Name} = Att | _], Name) ->
- {ok, Att};
-find_att([_ | Rest], Name) ->
- find_att(Rest, Name).
-
-att_md5(Att) ->
- Md50 = couch_doc:att_foldl(
- Att,
- fun(Chunk, Acc) -> couch_util:md5_update(Acc, Chunk) end,
- couch_util:md5_init()),
- couch_util:md5_final(Md50).
-
-att_decoded_md5(Att) ->
- Md50 = couch_doc:att_foldl_decode(
- Att,
- fun(Chunk, Acc) -> couch_util:md5_update(Acc, Chunk) end,
- couch_util:md5_init()),
- couch_util:md5_final(Md50).
-
-db_url(DbName) ->
- iolist_to_binary([
- "http://", config:get("httpd", "bind_address", "127.0.0.1"),
- ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
- "/", DbName
- ]).
-
-replicate(Source, Target) ->
- RepObject = {[
- {<<"source">>, Source},
- {<<"target">>, Target}
- ]},
- {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_ROLE),
- {ok, Pid} = couch_replicator:async_replicate(Rep),
- MonRef = erlang:monitor(process, Pid),
- receive
- {'DOWN', MonRef, process, Pid, _} ->
- ok
- end.
-
--endif.
-
diff --git a/test/couch_replicator_test_helper.erl b/test/couch_replicator_test_helper.erl
new file mode 100644
index 0000000..5b9d366
--- /dev/null
+++ b/test/couch_replicator_test_helper.erl
@@ -0,0 +1,107 @@
+-module(couch_replicator_test_helper).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-export([compare_dbs/2, db_url/1, replicate/2]).
+
+%% Asserts that every document enumerated from Source has a matching
+%% document in Target: equal body, equal set of attachment names and, for
+%% each attachment, the expected encoding plus equal lengths, type and MD5
+%% digests. Source and Target are passed to couch_db:open_int/2; both
+%% databases are closed again before returning.
+compare_dbs(Source, Target) ->
+    {ok, SourceDb} = couch_db:open_int(Source, []),
+    {ok, TargetDb} = couch_db:open_int(Target, []),
+
+    Fun = fun(FullDocInfo, _, Acc) ->
+        {ok, DocSource} = couch_db:open_doc(SourceDb, FullDocInfo),
+        Id = DocSource#doc.id,
+
+        {ok, DocTarget} = couch_db:open_doc(TargetDb, Id),
+        ?assertEqual(DocSource#doc.body, DocTarget#doc.body),
+
+        #doc{atts = SourceAtts} = DocSource,
+        #doc{atts = TargetAtts} = DocTarget,
+        ?assertEqual(lists:sort([couch_att:fetch(name, Att) || Att <- SourceAtts]),
+                     lists:sort([couch_att:fetch(name, Att) || Att <- TargetAtts])),
+
+        FunCompareAtts = fun(Att) ->
+            AttName = couch_att:fetch(name, Att),
+            {ok, AttTarget} = find_att(TargetAtts, AttName),
+            SourceMd5 = att_md5(Att),
+            TargetMd5 = att_md5(AttTarget),
+            case AttName of
+                <<"att1">> ->
+                    %% <<"att1">> is expected to be stored gzip-encoded on
+                    %% both sides, so the decoded digests are compared too.
+                    ?assertEqual(gzip, couch_att:fetch(encoding, Att)),
+                    ?assertEqual(gzip, couch_att:fetch(encoding, AttTarget)),
+                    DecSourceMd5 = att_decoded_md5(Att),
+                    DecTargetMd5 = att_decoded_md5(AttTarget),
+                    ?assertEqual(DecSourceMd5, DecTargetMd5);
+                _ ->
+                    %% Check the source attachment first, then the target
+                    %% one; both must be stored with identity encoding.
+                    ?assertEqual(identity, couch_att:fetch(encoding, Att)),
+                    ?assertEqual(identity, couch_att:fetch(encoding, AttTarget))
+            end,
+            ?assertEqual(SourceMd5, TargetMd5),
+            ?assert(is_integer(couch_att:fetch(disk_len, Att))),
+            ?assert(is_integer(couch_att:fetch(att_len, Att))),
+            ?assert(is_integer(couch_att:fetch(disk_len, AttTarget))),
+            ?assert(is_integer(couch_att:fetch(att_len, AttTarget))),
+            ?assertEqual(couch_att:fetch(disk_len, Att),
+                         couch_att:fetch(disk_len, AttTarget)),
+            ?assertEqual(couch_att:fetch(att_len, Att),
+                         couch_att:fetch(att_len, AttTarget)),
+            ?assertEqual(couch_att:fetch(type, Att),
+                         couch_att:fetch(type, AttTarget)),
+            ?assertEqual(couch_att:fetch(md5, Att),
+                         couch_att:fetch(md5, AttTarget))
+        end,
+
+        lists:foreach(FunCompareAtts, SourceAtts),
+
+        {ok, Acc}
+    end,
+
+    {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []),
+    ok = couch_db:close(SourceDb),
+    ok = couch_db:close(TargetDb).
+
+%% Returns {ok, Att} for the first attachment in the list whose name, as
+%% reported by couch_att:fetch(name, Att), is exactly Name; returns nil when
+%% the list is exhausted without a match.
+find_att([], _Name) ->
+    nil;
+find_att([Att | Rest], Name) ->
+    case couch_att:fetch(name, Att) =:= Name of
+        true ->
+            {ok, Att};
+        false ->
+            find_att(Rest, Name)
+    end.
+
+
+%% Folds over the attachment's chunks with couch_att:foldl/3, feeding each
+%% chunk into an MD5 hash context, and returns the finalized digest.
+att_md5(Att) ->
+    Step = fun(Chunk, HashCtx) ->
+        couch_crypto:hash_update(md5, HashCtx, Chunk)
+    end,
+    InitCtx = couch_crypto:hash_init(md5),
+    FinalCtx = couch_att:foldl(Att, Step, InitCtx),
+    couch_crypto:hash_final(md5, FinalCtx).
+
+%% Same as an MD5 over the attachment, but folds with
+%% couch_att:foldl_decode/3 instead of couch_att:foldl/3 before returning
+%% the finalized digest.
+att_decoded_md5(Att) ->
+    Step = fun(Chunk, HashCtx) ->
+        couch_crypto:hash_update(md5, HashCtx, Chunk)
+    end,
+    InitCtx = couch_crypto:hash_init(md5),
+    FinalCtx = couch_att:foldl_decode(Att, Step, InitCtx),
+    couch_crypto:hash_final(md5, FinalCtx).
+
+%% Builds the HTTP URL of DbName as a binary, using the configured
+%% "httpd"/"bind_address" (default "127.0.0.1") and the port reported by
+%% mochiweb_socket_server for couch_httpd.
+db_url(DbName) ->
+    Host = config:get("httpd", "bind_address", "127.0.0.1"),
+    Port = mochiweb_socket_server:get(couch_httpd, port),
+    iolist_to_binary(["http://", Host, ":", integer_to_list(Port), "/", DbName]).
+
+%% Parses a replication document for Source -> Target as ?ADMIN_USER, starts
+%% it with couch_replicator:async_replicate/1 and returns ok once the
+%% replication process has gone down. There is no `after` clause, so the
+%% caller waits for as long as that process stays alive.
+replicate(Source, Target) ->
+    RepDoc = {[{<<"source">>, Source}, {<<"target">>, Target}]},
+    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepDoc, ?ADMIN_USER),
+    {ok, RepPid} = couch_replicator:async_replicate(Rep),
+    Ref = erlang:monitor(process, RepPid),
+    receive
+        {'DOWN', Ref, process, RepPid, _Reason} ->
+            ok
+    end.
diff --git a/test/couch_replicator_use_checkpoints_tests.erl b/test/couch_replicator_use_checkpoints_tests.erl
index 5b5668d..7b7a55a 100644
--- a/test/couch_replicator_use_checkpoints_tests.erl
+++ b/test/couch_replicator_use_checkpoints_tests.erl
@@ -43,7 +43,7 @@
setup() ->
DbName = ?tempdb(),
- {ok, Db} = couch_db:create(DbName, [?ADMIN_USER]),
+ {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
ok = couch_db:close(Db),
DbName.
@@ -52,24 +52,25 @@
setup(remote) ->
{remote, setup()};
setup({_, Fun, {A, B}}) ->
- ok = test_util:start_couch(),
+ Ctx = test_util:start_couch([couch_replicator]),
{ok, Listener} = couch_replicator_notifier:start_link(Fun),
Source = setup(A),
Target = setup(B),
- {Source, Target, Listener}.
+ {Ctx, {Source, Target, Listener}}.
teardown({remote, DbName}) ->
teardown(DbName);
teardown(DbName) ->
- ok = couch_server:delete(DbName, [?ADMIN_USER]),
+ ok = couch_server:delete(DbName, [?ADMIN_CTX]),
ok.
-teardown(_, {Source, Target, Listener}) ->
+teardown(_, {Ctx, {Source, Target, Listener}}) ->
teardown(Source),
teardown(Target),
couch_replicator_notifier:stop(Listener),
- ok = test_util:stop_couch().
+ ok = application:stop(couch_replicator),
+ ok = test_util:stop_couch(Ctx).
use_checkpoints_test_() ->
{
@@ -95,7 +96,7 @@
}
}.
-should_test_checkpoints({UseCheckpoints, _, {From, To}}, {Source, Target, _}) ->
+should_test_checkpoints({UseCheckpoints, _, {From, To}}, {_Ctx, {Source, Target, _}}) ->
should_test_checkpoints(UseCheckpoints, {From, To}, {Source, Target}).
should_test_checkpoints(UseCheckpoints, {From, To}, {Source, Target}) ->
{lists:flatten(io_lib:format("~p -> ~p", [From, To])),
@@ -180,7 +181,7 @@
{<<"target">>, Target},
{<<"use_checkpoints">>, UseCheckpoints}
]},
- {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_ROLE),
+ {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
{ok, Pid} = couch_replicator:async_replicate(Rep),
MonRef = erlang:monitor(process, Pid),
receive