Merge remote-tracking branch 'github/pr/4'
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..f6cd2bc
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,202 @@
+
+                                Apache License
+                          Version 2.0, January 2004
+                       http://www.apache.org/licenses/
+
+  TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+  1. Definitions.
+
+     "License" shall mean the terms and conditions for use, reproduction,
+     and distribution as defined by Sections 1 through 9 of this document.
+
+     "Licensor" shall mean the copyright owner or entity authorized by
+     the copyright owner that is granting the License.
+
+     "Legal Entity" shall mean the union of the acting entity and all
+     other entities that control, are controlled by, or are under common
+     control with that entity. For the purposes of this definition,
+     "control" means (i) the power, direct or indirect, to cause the
+     direction or management of such entity, whether by contract or
+     otherwise, or (ii) ownership of fifty percent (50%) or more of the
+     outstanding shares, or (iii) beneficial ownership of such entity.
+
+     "You" (or "Your") shall mean an individual or Legal Entity
+     exercising permissions granted by this License.
+
+     "Source" form shall mean the preferred form for making modifications,
+     including but not limited to software source code, documentation
+     source, and configuration files.
+
+     "Object" form shall mean any form resulting from mechanical
+     transformation or translation of a Source form, including but
+     not limited to compiled object code, generated documentation,
+     and conversions to other media types.
+
+     "Work" shall mean the work of authorship, whether in Source or
+     Object form, made available under the License, as indicated by a
+     copyright notice that is included in or attached to the work
+     (an example is provided in the Appendix below).
+
+     "Derivative Works" shall mean any work, whether in Source or Object
+     form, that is based on (or derived from) the Work and for which the
+     editorial revisions, annotations, elaborations, or other modifications
+     represent, as a whole, an original work of authorship. For the purposes
+     of this License, Derivative Works shall not include works that remain
+     separable from, or merely link (or bind by name) to the interfaces of,
+     the Work and Derivative Works thereof.
+
+     "Contribution" shall mean any work of authorship, including
+     the original version of the Work and any modifications or additions
+     to that Work or Derivative Works thereof, that is intentionally
+     submitted to Licensor for inclusion in the Work by the copyright owner
+     or by an individual or Legal Entity authorized to submit on behalf of
+     the copyright owner. For the purposes of this definition, "submitted"
+     means any form of electronic, verbal, or written communication sent
+     to the Licensor or its representatives, including but not limited to
+     communication on electronic mailing lists, source code control systems,
+     and issue tracking systems that are managed by, or on behalf of, the
+     Licensor for the purpose of discussing and improving the Work, but
+     excluding communication that is conspicuously marked or otherwise
+     designated in writing by the copyright owner as "Not a Contribution."
+
+     "Contributor" shall mean Licensor and any individual or Legal Entity
+     on behalf of whom a Contribution has been received by Licensor and
+     subsequently incorporated within the Work.
+
+  2. Grant of Copyright License. Subject to the terms and conditions of
+     this License, each Contributor hereby grants to You a perpetual,
+     worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+     copyright license to reproduce, prepare Derivative Works of,
+     publicly display, publicly perform, sublicense, and distribute the
+     Work and such Derivative Works in Source or Object form.
+
+  3. Grant of Patent License. Subject to the terms and conditions of
+     this License, each Contributor hereby grants to You a perpetual,
+     worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+     (except as stated in this section) patent license to make, have made,
+     use, offer to sell, sell, import, and otherwise transfer the Work,
+     where such license applies only to those patent claims licensable
+     by such Contributor that are necessarily infringed by their
+     Contribution(s) alone or by combination of their Contribution(s)
+     with the Work to which such Contribution(s) was submitted. If You
+     institute patent litigation against any entity (including a
+     cross-claim or counterclaim in a lawsuit) alleging that the Work
+     or a Contribution incorporated within the Work constitutes direct
+     or contributory patent infringement, then any patent licenses
+     granted to You under this License for that Work shall terminate
+     as of the date such litigation is filed.
+
+  4. Redistribution. You may reproduce and distribute copies of the
+     Work or Derivative Works thereof in any medium, with or without
+     modifications, and in Source or Object form, provided that You
+     meet the following conditions:
+
+     (a) You must give any other recipients of the Work or
+         Derivative Works a copy of this License; and
+
+     (b) You must cause any modified files to carry prominent notices
+         stating that You changed the files; and
+
+     (c) You must retain, in the Source form of any Derivative Works
+         that You distribute, all copyright, patent, trademark, and
+         attribution notices from the Source form of the Work,
+         excluding those notices that do not pertain to any part of
+         the Derivative Works; and
+
+     (d) If the Work includes a "NOTICE" text file as part of its
+         distribution, then any Derivative Works that You distribute must
+         include a readable copy of the attribution notices contained
+         within such NOTICE file, excluding those notices that do not
+         pertain to any part of the Derivative Works, in at least one
+         of the following places: within a NOTICE text file distributed
+         as part of the Derivative Works; within the Source form or
+         documentation, if provided along with the Derivative Works; or,
+         within a display generated by the Derivative Works, if and
+         wherever such third-party notices normally appear. The contents
+         of the NOTICE file are for informational purposes only and
+         do not modify the License. You may add Your own attribution
+         notices within Derivative Works that You distribute, alongside
+         or as an addendum to the NOTICE text from the Work, provided
+         that such additional attribution notices cannot be construed
+         as modifying the License.
+
+     You may add Your own copyright statement to Your modifications and
+     may provide additional or different license terms and conditions
+     for use, reproduction, or distribution of Your modifications, or
+     for any such Derivative Works as a whole, provided Your use,
+     reproduction, and distribution of the Work otherwise complies with
+     the conditions stated in this License.
+
+  5. Submission of Contributions. Unless You explicitly state otherwise,
+     any Contribution intentionally submitted for inclusion in the Work
+     by You to the Licensor shall be under the terms and conditions of
+     this License, without any additional terms or conditions.
+     Notwithstanding the above, nothing herein shall supersede or modify
+     the terms of any separate license agreement you may have executed
+     with Licensor regarding such Contributions.
+
+  6. Trademarks. This License does not grant permission to use the trade
+     names, trademarks, service marks, or product names of the Licensor,
+     except as required for reasonable and customary use in describing the
+     origin of the Work and reproducing the content of the NOTICE file.
+
+  7. Disclaimer of Warranty. Unless required by applicable law or
+     agreed to in writing, Licensor provides the Work (and each
+     Contributor provides its Contributions) on an "AS IS" BASIS,
+     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+     implied, including, without limitation, any warranties or conditions
+     of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+     PARTICULAR PURPOSE. You are solely responsible for determining the
+     appropriateness of using or redistributing the Work and assume any
+     risks associated with Your exercise of permissions under this License.
+
+  8. Limitation of Liability. In no event and under no legal theory,
+     whether in tort (including negligence), contract, or otherwise,
+     unless required by applicable law (such as deliberate and grossly
+     negligent acts) or agreed to in writing, shall any Contributor be
+     liable to You for damages, including any direct, indirect, special,
+     incidental, or consequential damages of any character arising as a
+     result of this License or out of the use or inability to use the
+     Work (including but not limited to damages for loss of goodwill,
+     work stoppage, computer failure or malfunction, or any and all
+     other commercial damages or losses), even if such Contributor
+     has been advised of the possibility of such damages.
+
+  9. Accepting Warranty or Additional Liability. While redistributing
+     the Work or Derivative Works thereof, You may choose to offer,
+     and charge a fee for, acceptance of support, warranty, indemnity,
+     or other liability obligations and/or rights consistent with this
+     License. However, in accepting such obligations, You may act only
+     on Your own behalf and on Your sole responsibility, not on behalf
+     of any other Contributor, and only if You agree to indemnify,
+     defend, and hold each Contributor harmless for any liability
+     incurred by, or claims asserted against, such Contributor by reason
+     of your accepting any such warranty or additional liability.
+
+  END OF TERMS AND CONDITIONS
+
+  APPENDIX: How to apply the Apache License to your work.
+
+     To apply the Apache License to your work, attach the following
+     boilerplate notice, with the fields enclosed by brackets "[]"
+     replaced with your own identifying information. (Don't include
+     the brackets!)  The text should be enclosed in the appropriate
+     comment syntax for the file format. We also recommend that a
+     file or class name and description of purpose be included on the
+     same "printed page" as the copyright notice for easier
+     identification within third-party archives.
+
+  Copyright [yyyy] [name of copyright owner]
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index e7d2ee0..f5055e0 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -25,6 +25,8 @@
 -export([init/1, terminate/2, code_change/3]).
 -export([handle_call/3, handle_cast/2, handle_info/2]).
 
+-export([details/1]).
+
 -include_lib("couch/include/couch_db.hrl").
 -include("couch_replicator_api_wrap.hrl").
 -include("couch_replicator.hrl").
@@ -210,26 +212,41 @@
     true ->
         cancel_replication(RepId);
     false ->
-        {BaseId, Ext} = RepId,
-        case lists:keysearch(
-            BaseId ++ Ext, 1, supervisor:which_children(couch_replicator_job_sup)) of
-        {value, {_, Pid, _, _}} when is_pid(Pid) ->
-            case (catch gen_server:call(Pid, get_details, infinity)) of
+        case find_replicator(RepId) of
+        {ok, Pid} ->
+            case details(Pid) of
             {ok, #rep{user_ctx = #user_ctx{name = Name}}} ->
                 cancel_replication(RepId);
             {ok, _} ->
                 throw({unauthorized,
                     <<"Can't cancel a replication triggered by another user">>});
-            {'EXIT', {noproc, {gen_server, call, _}}} ->
-                {error, not_found};
             Error ->
-                throw(Error)
+                Error
             end;
-        _ ->
-            {error, not_found}
+        Error ->
+            Error
         end
     end.
 
+find_replicator({BaseId, Ext} = _RepId) ->
+    case lists:keysearch(
+        BaseId ++ Ext, 1, supervisor:which_children(couch_replicator_job_sup)) of
+    {value, {_, Pid, _, _}} when is_pid(Pid) ->
+            {ok, Pid};
+    _ ->
+            {error, not_found}
+    end.
+
+details(Pid) ->
+    case (catch gen_server:call(Pid, get_details)) of
+    {ok, Rep} ->
+        {ok, Rep};
+    {'EXIT', {noproc, {gen_server, call, _}}} ->
+        {error, not_found};
+    Error ->
+        throw(Error)
+    end.
+
 init(InitArgs) ->
     {ok, InitArgs, 0}.
 
@@ -278,6 +295,7 @@
         {type, replication},
         {user, UserCtx#user_ctx.name},
         {replication_id, ?l2b(BaseId ++ Ext)},
+        {database, Rep#rep.db_name},
         {doc_id, Rep#rep.doc_id},
         {source, ?l2b(SourceName)},
         {target, ?l2b(TargetName)},
@@ -458,13 +476,20 @@
     {noreply, State#rep_state{target = NewTarget}};
 
 handle_cast(checkpoint, State) ->
-    case do_checkpoint(State) of
-    {ok, NewState} ->
-        couch_stats:increment_counter([couch_replicator, checkpoints, success]),
-        {noreply, NewState#rep_state{timer = start_timer(State)}};
-    Error ->
-        couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
-        {stop, Error, State}
+    #rep_state{rep_details = #rep{} = Rep} = State,
+    case couch_replicator_manager:continue(Rep) of
+    {true, _} ->
+        case do_checkpoint(State) of
+        {ok, NewState} ->
+            couch_stats:increment_counter([couch_replicator, checkpoints, success]),
+            {noreply, NewState#rep_state{timer = start_timer(State)}};
+        Error ->
+            couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
+            {stop, Error, State}
+        end;
+    {false, Owner} ->
+        couch_replicator_manager:replication_usurped(Rep, Owner),
+        {stop, shutdown, State}
     end;
 
 handle_cast({report_seq, Seq},
diff --git a/src/couch_replicator.hrl b/src/couch_replicator.hrl
index dbb1793..d3485c0 100644
--- a/src/couch_replicator.hrl
+++ b/src/couch_replicator.hrl
@@ -20,5 +20,6 @@
     user_ctx,
     type = db,
     view = nil,
-    doc_id
+    doc_id,
+    db_name = null
 }).
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index b8644da..ab5590b 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -83,9 +83,20 @@
                     ok
                 end)
         end,
-        send_req(Db, [{method, head}],
-            fun(200, _, _) ->
-                {ok, Db};
+        send_req(Db, [{method, get}],
+            fun(200, _, {Props}) ->
+                UpdateSeq = get_value(<<"update_seq">>, Props),
+                InstanceStart = get_value(<<"instance_start_time">>, Props),
+                case {UpdateSeq, InstanceStart} of
+                    {undefined, _} ->
+                        throw({db_not_found, ?l2b(db_uri(Db))});
+                    {_, undefined} ->
+                        throw({db_not_found, ?l2b(db_uri(Db))});
+                    _ ->
+                        {ok, Db}
+                end;
+            (200, _, _Body) ->
+                 throw({db_not_found, ?l2b(db_uri(Db))});
             (401, _, _) ->
                 throw({unauthorized, ?l2b(db_uri(Db))});
             (_, _, _) ->
@@ -113,7 +124,7 @@
             couch_db:create(DbName, Options)
         end,
         case couch_db:open(DbName, Options) of
-        {error, illegal_database_name, _} ->
+        {error, {illegal_database_name, _}} ->
             throw({db_not_found, DbName});
         {not_found, _Reason} ->
             throw({db_not_found, DbName});
@@ -230,7 +241,7 @@
           (200, Headers, StreamDataFun) ->
             remote_open_doc_revs_streamer_start(Self),
             {<<"--">>, _, _} = couch_httpd:parse_multipart_request(
-                get_value("Content-Type", Headers),
+                header_value("Content-Type", Headers),
                 StreamDataFun,
                 fun mp_parse_mixed/1
             );
@@ -663,7 +674,7 @@
     {started_open_doc_revs, NewRef} ->
         restart_remote_open_doc_revs(Ref, NewRef);
     {headers, Ref, Headers} ->
-        case get_value("content-type", Headers) of
+        case header_value("content-type", Headers) of
         {"multipart/related", _} = ContentType ->
             case couch_doc:doc_from_multi_part_stream(
                 ContentType,
@@ -936,3 +947,15 @@
     receive {data, Ref, Data} ->
         {ok, Data, {LenLeft - iolist_size(Data), Id}}
     end.
+
+header_value(Key, Headers) ->
+    header_value(Key, Headers, undefined).
+
+header_value(Key, Headers, Default) ->
+    Headers1 = [{string:to_lower(K), V} || {K, V} <- Headers],
+    case lists:keyfind(string:to_lower(Key), 1, Headers1) of
+        {_, Value} ->
+            Value;
+        _ ->
+            Default
+    end.
diff --git a/src/couch_replicator_api_wrap.hrl b/src/couch_replicator_api_wrap.hrl
index 1a6f27a..eee04da 100644
--- a/src/couch_replicator_api_wrap.hrl
+++ b/src/couch_replicator_api_wrap.hrl
@@ -17,7 +17,7 @@
     oauth = nil,
     headers = [
         {"Accept", "application/json"},
-        {"User-Agent", "CouchDB/" ++ couch_server:get_version()}
+        {"User-Agent", "CouchDB-Replicator/" ++ couch_server:get_version()}
     ],
     timeout,            % milliseconds
     ibrowse_options = [],
diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl
index 9a6213a..8b34e0e 100644
--- a/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator_httpc.erl
@@ -27,6 +27,17 @@
 
 -define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
 -define(MAX_WAIT, 5 * 60 * 1000).
+-define(STREAM_STATUS, ibrowse_stream_status).
+
+
+% This limit is for the number of messages we're willing to discard
+% from an HTTP stream in clean_mailbox/1 before killing the worker
+% and returning. The original intent for clean_mailbox was to remove
+% a single message or two if the changes feed returned before fully
+% consuming the request. This threshold gives us confidence we'll
+% continue to properly close changes feeds while avoiding any case
+% where we may end up processing an unbounded number of messages.
+-define(MAX_DISCARDED_MESSAGES, 16).
 
 
 setup(#httpdb{httpc_pool = nil, url = Url, http_connections = MaxConns} = Db) ->
@@ -35,6 +46,7 @@
 
 
 send_req(HttpDb, Params1, Callback) ->
+    put(?STREAM_STATUS, init),
     couch_stats:increment_counter([couch_replicator, requests]),
     Params2 = ?replace(Params1, qs,
         [{K, ?b2l(iolist_to_binary(V))} || {K, V} <- get_value(qs, Params1, [])]),
@@ -94,6 +106,14 @@
 process_response({error, sel_conn_closed}, _Worker, HttpDb, Params, _Cb) ->
     throw({retry, HttpDb, Params});
 
+%% This clause handles unexpected connection closing during pipelined requests.
+%% For example, if the server responds to a request, sets a Connection: close
+%% header and closes the socket, ibrowse will detect that error when it sends
+%% the next request.
+process_response({error, connection_closing}, Worker, HttpDb, Params, _Cb) ->
+    ibrowse_http_client:stop(Worker),
+    throw({retry, HttpDb, Params});
+
 process_response({error, {'EXIT',{normal,_}}}, _Worker, HttpDb, Params, _Cb) ->
     % ibrowse worker terminated because remote peer closed the socket
     % -> not an error
@@ -132,6 +152,7 @@
             StreamDataFun = fun() ->
                 stream_data_self(HttpDb, Params, Worker, ReqId, Callback)
             end,
+            put(?STREAM_STATUS, {streaming, Worker}),
             ibrowse:stream_next(ReqId),
             try
                 Ret = Callback(Ok, Headers, StreamDataFun),
@@ -166,16 +187,50 @@
 % on the ibrowse_req_id format. This just drops all
 % messages for the given ReqId on the floor since we're
 % no longer in the HTTP request.
-clean_mailbox({ibrowse_req_id, ReqId}) ->
-    receive
-    {ibrowse_async_response, ReqId, _} ->
-        clean_mailbox({ibrowse_req_id, ReqId});
-    {ibrowse_async_response_end, ReqId} ->
-        clean_mailbox({ibrowse_req_id, ReqId})
-    after 0 ->
-        ok
+
+clean_mailbox(ReqId) ->
+    clean_mailbox(ReqId, ?MAX_DISCARDED_MESSAGES).
+
+
+clean_mailbox(_ReqId, 0) ->
+    case get(?STREAM_STATUS) of
+        {streaming, Worker} ->
+            % We kill workers that continue to stream us
+            % messages after we give up but do *not* exit
+            % ourselves. This is because we may be running
+            % as an exception unwinds and we don't want to
+            % change any of that subtle logic.
+            exit(Worker, {timeout, ibrowse_stream_cleanup});
+        _ ->
+            ok
+    end,
+    ok;
+clean_mailbox({ibrowse_req_id, ReqId}, Count) when Count > 0 ->
+    case get(?STREAM_STATUS) of
+        {streaming, Worker} ->
+            ibrowse:stream_next(ReqId),
+            receive
+                {ibrowse_async_response, ReqId, _} ->
+                    clean_mailbox({ibrowse_req_id, ReqId}, Count - 1);
+                {ibrowse_async_response_end, ReqId} ->
+                    put(?STREAM_STATUS, ended),
+                    ok
+                after 30000 ->
+                    exit(Worker, {timeout, ibrowse_stream_cleanup}),
+                    exit({timeout, ibrowse_stream_cleanup})
+            end;
+        Status when Status == init; Status == ended ->
+            receive
+                {ibrowse_async_response, ReqId, _} ->
+                    clean_mailbox({ibrowse_req_id, ReqId}, Count - 1);
+                {ibrowse_async_response_end, ReqId} ->
+                    put(?STREAM_STATUS, ended),
+                    ok
+                after 0 ->
+                    ok
+            end
     end;
-clean_mailbox(_) ->
+clean_mailbox(_, Count) when Count > 0 ->
     ok.
 
 
@@ -222,6 +277,7 @@
         ibrowse:stream_next(ReqId),
         {Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId, Cb) end};
     {Data, ibrowse_async_response_end} ->
+        put(?STREAM_STATUS, ended),
         {Data, fun() -> throw({maybe_retry_req, more_data_expected}) end}
     end.
 
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index d6c88c1..c6f7960 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -12,11 +12,12 @@
 
 -module(couch_replicator_manager).
 -behaviour(gen_server).
--vsn(1).
+-vsn(2).
 -behaviour(config_listener).
 
 % public API
 -export([replication_started/1, replication_completed/2, replication_error/2]).
+-export([continue/1, replication_usurped/2]).
 
 -export([before_doc_update/2, after_doc_read/2]).
 
@@ -28,10 +29,13 @@
 -export([changes_reader/3, changes_reader_cb/3]).
 
 % config_listener callback
--export([handle_config_change/5]).
+-export([handle_config_change/5, handle_config_terminate/3]).
 
 -export([handle_db_event/3]).
 
+%% exported but private
+-export([start_replication/2]).
+
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("mem3/include/mem3.hrl").
 -include("couch_replicator.hrl").
@@ -49,7 +53,6 @@
 -define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
 
 -record(rep_state, {
-    dbname,
     rep,
     starting,
     retries_left,
@@ -65,7 +68,8 @@
     event_listener = nil,
     scan_pid = nil,
     rep_start_pids = [],
-    max_retries
+    max_retries,
+    live = []
 }).
 
 start_link() ->
@@ -76,7 +80,7 @@
     case rep_state(RepId) of
     nil ->
         ok;
-    #rep_state{dbname = DbName, rep = #rep{doc_id = DocId}} ->
+    #rep_state{rep = #rep{db_name = DbName, doc_id = DocId}} ->
         update_rep_doc(DbName, DocId, [
             {<<"_replication_state">>, <<"triggered">>},
             {<<"_replication_state_reason">>, undefined},
@@ -92,7 +96,7 @@
     case rep_state(RepId) of
     nil ->
         ok;
-    #rep_state{dbname = DbName, rep = #rep{doc_id = DocId}} ->
+    #rep_state{rep = #rep{db_name = DbName, doc_id = DocId}} ->
         update_rep_doc(DbName, DocId, [
             {<<"_replication_state">>, <<"completed">>},
             {<<"_replication_state_reason">>, undefined},
@@ -103,11 +107,22 @@
     end.
 
 
+replication_usurped(#rep{id = RepId}, By) ->
+    case rep_state(RepId) of
+    nil ->
+        ok;
+    #rep_state{rep = #rep{doc_id = DocId}} ->
+        ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity),
+        couch_log:notice("Replication `~s` usurped by ~s (triggered by document `~s`)",
+            [pp_rep_id(RepId), By, DocId])
+    end.
+
+
 replication_error(#rep{id = {BaseId, _} = RepId}, Error) ->
     case rep_state(RepId) of
     nil ->
         ok;
-    #rep_state{dbname = DbName, rep = #rep{doc_id = DocId}} ->
+    #rep_state{rep = #rep{db_name = DbName, doc_id = DocId}} ->
         update_rep_doc(DbName, DocId, [
             {<<"_replication_state">>, <<"error">>},
             {<<"_replication_state_reason">>, to_binary(error_reason(Error))},
@@ -115,6 +130,12 @@
         ok = gen_server:call(?MODULE, {rep_error, RepId, Error}, infinity)
     end.
 
+continue(#rep{doc_id = null}) ->
+    {true, no_owner};
+continue(#rep{id = RepId}) ->
+    Owner = gen_server:call(?MODULE, {owner, RepId}, infinity),
+    {node() == Owner, Owner}.
+
 
 handle_config_change("replicator", "db", _, _, S) ->
     ok = gen_server:call(S, rep_db_changed),
@@ -125,10 +146,17 @@
 handle_config_change(_, _, _, _, S) ->
     {ok, S}.
 
+handle_config_terminate(_, stop, _) -> ok;
+handle_config_terminate(Self, _, _) ->
+    spawn(fun() ->
+        timer:sleep(5000),
+        config:listen_for_changes(?MODULE, Self)
+    end).
 
 init(_) ->
     process_flag(trap_exit, true),
     net_kernel:monitor_nodes(true),
+    Live = [node() | nodes()],
     ?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, public]),
     ?REP_TO_STATE = ets:new(?REP_TO_STATE, [named_table, set, public]),
     ?DB_TO_SEQ = ets:new(?DB_TO_SEQ, [named_table, set, public]),
@@ -144,9 +172,17 @@
         scan_pid = ScanPid,
         max_retries = retries_value(
             config:get("replicator", "max_replication_retry_count", "10")),
-        rep_start_pids = [Pid]
+        rep_start_pids = [Pid],
+        live = Live
     }}.
 
+handle_call({owner, RepId}, _From, State) ->
+    case rep_state(RepId) of
+    nil ->
+        {reply, nonode, State};
+    #rep_state{rep = #rep{db_name = DbName, doc_id = DocId}} ->
+        {reply, owner(DbName, DocId, State#state.live), State}
+    end;
 
 handle_call({rep_db_update, DbName, {ChangeProps} = Change}, _From, State) ->
     NewState = try
@@ -212,11 +248,15 @@
     couch_log:error("Replication manager received unexpected cast ~p", [Msg]),
     {stop, {error, {unexpected_cast, Msg}}, State}.
 
-handle_info({nodeup, _Node}, State) ->
-    {noreply, rescan(State)};
+handle_info({nodeup, Node}, State) ->
+    couch_log:notice("Rescanning replicator dbs as ~s came up.", [Node]),
+    Live = lists:usort([Node | State#state.live]),
+    {noreply, rescan(State#state{live=Live})};
 
-handle_info({nodedown, _Node}, State) ->
-    {noreply, rescan(State)};
+handle_info({nodedown, Node}, State) ->
+    couch_log:notice("Rescanning replicator dbs ~s went down.", [Node]),
+    Live = State#state.live -- [Node],
+    {noreply, rescan(State#state{live=Live})};
 
 handle_info({'EXIT', From, normal}, #state{scan_pid = From} = State) ->
     couch_log:debug("Background scan has completed.", []),
@@ -250,14 +290,6 @@
     % From a db monitor created by a replication process. Ignore.
     {noreply, State};
 
-handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, State) ->
-    erlang:send_after(5000, self(), restart_config_listener),
-    {noreply, State};
-
-handle_info(restart_config_listener, State) ->
-    ok = config:listen_for_changes(?MODULE, self()),
-    {noreply, State};
-
 handle_info(shutdown, State) ->
     {stop, shutdown, State};
 
@@ -285,6 +317,8 @@
     couch_event:stop_listener(Listener).
 
 
+code_change(1, State, _Extra) ->
+    {ok, erlang:append_element(State, [node() | nodes()])};
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
@@ -380,13 +414,15 @@
 process_update(State, DbName, {Change}) ->
     {RepProps} = JsonRepDoc = get_json_value(doc, Change),
     DocId = get_json_value(<<"_id">>, RepProps),
-    case {owner(DbName, DocId), get_json_value(deleted, Change, false)} of
+    case {owner(DbName, DocId, State#state.live), get_json_value(deleted, Change, false)} of
     {_, true} ->
         rep_doc_deleted(DbName, DocId),
         State;
-    {false, false} ->
+    {Owner, false} when Owner /= node() ->
+        couch_log:notice("Not starting '~s' as owner is ~s.", [DocId, Owner]),
         State;
-    {true, false} ->
+    {_Owner, false} ->
+        couch_log:notice("Maybe starting '~s' as I'm the owner", [DocId]),
         case get_json_value(<<"_replication_state">>, RepProps) of
         undefined ->
             maybe_start_replication(State, DbName, DocId, JsonRepDoc);
@@ -405,13 +441,12 @@
         end
     end.
 
-owner(<<"shards/", _/binary>> = DbName, DocId) ->
-    Live = [node()|nodes()],
+owner(<<"shards/", _/binary>> = DbName, DocId, Live) ->
     Nodes = lists:sort([N || #shard{node=N} <- mem3:shards(mem3:dbname(DbName), DocId),
 			     lists:member(N, Live)]),
-    node() =:= hd(mem3_util:rotate_list({DbName, DocId}, Nodes));
-owner(_DbName, _DocId) ->
-    true.
+    hd(mem3_util:rotate_list({DbName, DocId}, Nodes));
+owner(_DbName, _DocId, _Live) ->
+    node().
 
 rep_db_update_error(Error, DbName, DocId) ->
     case Error of
@@ -439,11 +474,11 @@
 
 
 maybe_start_replication(State, DbName, DocId, RepDoc) ->
-    #rep{id = {BaseId, _} = RepId} = Rep = parse_rep_doc(RepDoc),
+    #rep{id = {BaseId, _} = RepId} = Rep0 = parse_rep_doc(RepDoc),
+    Rep = Rep0#rep{db_name = DbName},
     case rep_state(RepId) of
     nil ->
         RepState = #rep_state{
-            dbname = DbName,
             rep = Rep,
             starting = true,
             retries_left = State#state.max_retries,
@@ -453,16 +488,23 @@
         true = ets:insert(?DOC_TO_REP, {{DbName, DocId}, RepId}),
         couch_log:notice("Attempting to start replication `~s` (document `~s`).",
             [pp_rep_id(RepId), DocId]),
-        Pid = spawn_link(fun() -> start_replication(Rep, 0) end),
+        StartDelaySecs = erlang:max(0,
+            config:get_integer("replicator", "start_delay", 10)),
+        StartSplaySecs = erlang:max(1,
+            config:get_integer("replicator", "start_splay", 50)),
+        DelaySecs = StartDelaySecs + random:uniform(StartSplaySecs),
+        couch_log:notice("Delaying replication `~s` start by ~p seconds.",
+            [pp_rep_id(RepId), DelaySecs]),
+        Pid = spawn_link(?MODULE, start_replication, [Rep, DelaySecs]),
         State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
     #rep_state{rep = #rep{doc_id = DocId}} ->
         State;
-    #rep_state{starting = false, dbname = DbName, rep = #rep{doc_id = OtherDocId}} ->
+    #rep_state{starting = false, rep = #rep{db_name = DbName, doc_id = OtherDocId}} ->
         couch_log:notice("The replication specified by the document `~s` was already"
             " triggered by the document `~s`", [DocId, OtherDocId]),
         maybe_tag_rep_doc(DbName, DocId, RepDoc, ?l2b(BaseId)),
         State;
-    #rep_state{starting = true, dbname = DbName, rep = #rep{doc_id = OtherDocId}} ->
+    #rep_state{starting = true, rep = #rep{db_name = DbName, doc_id = OtherDocId}} ->
         couch_log:notice("The replication specified by the document `~s` is already"
             " being triggered by the document `~s`", [DocId, OtherDocId]),
         maybe_tag_rep_doc(DbName, DocId, RepDoc, ?l2b(BaseId)),
@@ -549,8 +591,7 @@
 
 maybe_retry_replication(#rep_state{retries_left = 0} = RepState, Error, State) ->
     #rep_state{
-        dbname = DbName,
-        rep = #rep{id = RepId, doc_id = DocId},
+        rep = #rep{id = RepId, doc_id = DocId, db_name = DbName},
         max_retries = MaxRetries
     } = RepState,
     couch_replicator:cancel_replication(RepId),
@@ -570,7 +611,7 @@
     couch_log:error("Error in replication `~s` (triggered by document `~s`): ~s"
         "~nRestarting replication in ~p seconds.",
         [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), Wait]),
-    Pid = spawn_link(fun() -> start_replication(Rep, Wait) end),
+    Pid = spawn_link(?MODULE, start_replication, [Rep, Wait]),
     State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}.
 
 
@@ -598,11 +639,14 @@
     ets:delete(?DB_TO_SEQ,DbName).
 
 
-update_rep_doc(RepDbName, RepDocId, KVs) when is_binary(RepDocId) ->
+update_rep_doc(RepDbName, RepDocId, KVs) ->
+    update_rep_doc(RepDbName, RepDocId, KVs, 1).
+
+update_rep_doc(RepDbName, RepDocId, KVs, Wait) when is_binary(RepDocId) ->
     try
         case open_rep_doc(RepDbName, RepDocId) of
             {ok, LastRepDoc} ->
-                update_rep_doc(RepDbName, LastRepDoc, KVs);
+                update_rep_doc(RepDbName, LastRepDoc, KVs, Wait * 2);
             _ ->
                 ok
         end
@@ -610,10 +654,10 @@
         throw:conflict ->
             Msg = "Conflict when updating replication document `~s`. Retrying.",
             couch_log:error(Msg, [RepDocId]),
-            ok = timer:sleep(5),
-            update_rep_doc(RepDbName, RepDocId, KVs)
+            ok = timer:sleep(random:uniform(erlang:min(128, Wait)) * 100),
+            update_rep_doc(RepDbName, RepDocId, KVs, Wait * 2)
     end;
-update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
+update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) ->
     NewRepDocBody = lists:foldl(
         fun({K, undefined}, Body) ->
                 lists:keydelete(K, 1, Body);
diff --git a/src/couch_replicator_utils.erl b/src/couch_replicator_utils.erl
index 30afb39..9fc42df 100644
--- a/src/couch_replicator_utils.erl
+++ b/src/couch_replicator_utils.erl
@@ -130,7 +130,7 @@
             [filter_code(Filter, Source, UserCtx),
                 get_value(query_params, Options, {[]})]
         end,
-    couch_util:to_hex(couch_util:md5(term_to_binary(Base2))).
+    couch_util:to_hex(couch_crypto:hash(md5, term_to_binary(Base2))).
 
 
 filter_code(Filter, Source, UserCtx) ->
@@ -345,14 +345,14 @@
             config:get("replicator", "ssl_certificate_max_depth", "3")
         ),
         VerifyCerts = config:get("replicator", "verify_ssl_certificates"),
-        CertFile = config:get("replicator", "cert_file", nil),
-        KeyFile = config:get("replicator", "key_file", nil),
-        Password = config:get("replicator", "password", nil),
+        CertFile = config:get("replicator", "cert_file", undefined),
+        KeyFile = config:get("replicator", "key_file", undefined),
+        Password = config:get("replicator", "password", undefined),
         SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts =:= "true")],
-        SslOpts1 = case CertFile /= nil andalso KeyFile /= nil of
+        SslOpts1 = case CertFile /= undefined andalso KeyFile /= undefined of
             true ->
                 case Password of
-                    nil ->
+                    undefined ->
                         [{certfile, CertFile}, {keyfile, KeyFile}] ++ SslOpts;
                     _ ->
                         [{certfile, CertFile}, {keyfile, KeyFile},
diff --git a/test/couch_replicator_compact_tests.erl b/test/couch_replicator_compact_tests.erl
index 8378567..459e42a 100644
--- a/test/couch_replicator_compact_tests.erl
+++ b/test/couch_replicator_compact_tests.erl
@@ -23,11 +23,9 @@
 -define(TIMEOUT_WRITER, 3000).
 -define(TIMEOUT_EUNIT, ?TIMEOUT div 1000 + 5).
 
--ifdef(run_broken_tests).
-
 setup() ->
     DbName = ?tempdb(),
-    {ok, Db} = couch_db:create(DbName, [?ADMIN_USER]),
+    {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
     ok = couch_db:close(Db),
     DbName.
 
@@ -36,22 +34,22 @@
 setup(remote) ->
     {remote, setup()};
 setup({A, B}) ->
-    ok = test_util:start_couch(),
+    Ctx = test_util:start_couch([couch_replicator]),
     Source = setup(A),
     Target = setup(B),
-    {Source, Target}.
+    {Ctx, {Source, Target}}.
 
 teardown({remote, DbName}) ->
     teardown(DbName);
 teardown(DbName) ->
-    ok = couch_server:delete(DbName, [?ADMIN_USER]),
+    ok = couch_server:delete(DbName, [?ADMIN_CTX]),
     ok.
 
-teardown(_, {Source, Target}) ->
+teardown(_, {Ctx, {Source, Target}}) ->
     teardown(Source),
     teardown(Target),
-
-    ok = test_util:stop_couch().
+    ok = application:stop(couch_replicator),
+    ok = test_util:stop_couch(Ctx).
 
 compact_test_() ->
     Pairs = [{local, local}, {local, remote},
@@ -67,7 +65,7 @@
     }.
 
 
-should_populate_replicate_compact({From, To}, {Source, Target}) ->
+should_populate_replicate_compact({From, To}, {_Ctx, {Source, Target}}) ->
     {ok, RepPid, RepId} = replicate(Source, Target),
     {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
      {inorder, [
@@ -110,6 +108,7 @@
     end,
     FullRepId = ?l2b(BaseId ++ Ext),
     Pid = ?l2b(pid_to_list(RepPid)),
+    ok = wait_for_replicator(RepPid),
     [RepTask] = couch_task_status:all(),
     ?assertEqual(Pid, couch_util:get_value(pid, RepTask)),
     ?assertEqual(FullRepId, couch_util:get_value(replication_id, RepTask)),
@@ -123,9 +122,15 @@
     ?assert(is_integer(couch_util:get_value(missing_revisions_found, RepTask))),
     ?assert(is_integer(couch_util:get_value(checkpointed_source_seq, RepTask))),
     ?assert(is_integer(couch_util:get_value(source_seq, RepTask))),
-    Progress = couch_util:get_value(progress, RepTask),
-    ?assert(is_integer(Progress)),
-    ?assert(Progress =< 100).
+    Pending = couch_util:get_value(changes_pending, RepTask),
+    ?assert(is_integer(Pending)).
+
+wait_for_replicator(Pid) ->
+    %% Since the replicator is started asynchronously, we need to wait
+    %% until it shows up in couch_task_status. Querying
+    %% couch_replicator:details/1 ensures that do_init has completed.
+    ?assertMatch({ok, _}, couch_replicator:details(Pid)),
+    ok.
 
 should_cancel_replication(RepId, RepPid) ->
     ?_assertNot(begin
@@ -146,12 +151,12 @@
                 compact_db("source", SourceDb),
                 ?assert(is_process_alive(RepPid)),
                 ?assert(is_process_alive(SourceDb#db.main_pid)),
-                check_ref_counter("source", SourceDb),
+                wait_for_compaction("source", SourceDb),
 
                 compact_db("target", TargetDb),
                 ?assert(is_process_alive(RepPid)),
                 ?assert(is_process_alive(TargetDb#db.main_pid)),
-                check_ref_counter("target", TargetDb),
+                wait_for_compaction("target", TargetDb),
 
                 {ok, SourceDb2} = reopen_db(SourceDb),
                 {ok, TargetDb2} = reopen_db(TargetDb),
@@ -163,14 +168,14 @@
                 ?assert(is_process_alive(RepPid)),
                 ?assert(is_process_alive(SourceDb2#db.main_pid)),
                 pause_writer(Writer),
-                check_ref_counter("source", SourceDb2),
+                wait_for_compaction("source", SourceDb2),
                 resume_writer(Writer),
 
                 compact_db("target", TargetDb2),
                 ?assert(is_process_alive(RepPid)),
                 ?assert(is_process_alive(TargetDb2#db.main_pid)),
                 pause_writer(Writer),
-                check_ref_counter("target", TargetDb2),
+                wait_for_compaction("target", TargetDb2),
                 resume_writer(Writer)
             end, lists:seq(1, Rounds)),
         stop_writer(Writer)
@@ -275,22 +280,17 @@
     end,
     ok = couch_db:close(Db).
 
-check_ref_counter(Type, #db{name = Name, fd_ref_counter = OldRefCounter}) ->
-    MonRef = erlang:monitor(process, OldRefCounter),
-    receive
-        {'DOWN', MonRef, process, OldRefCounter, _} ->
-            ok
-        after ?TIMEOUT ->
+wait_for_compaction(Type, Db) ->
+    case couch_db:wait_for_compaction(Db) of
+        ok ->
+            ok;
+        {error, Reason} ->
             erlang:error(
                 {assertion_failed,
                  [{module, ?MODULE}, {line, ?LINE},
-                  {reason, lists:concat(["Old ", Type,
-                                         " database ref counter didn't"
-                                         " terminate"])}]})
-    end,
-    {ok, #db{fd_ref_counter = NewRefCounter} = Db} = couch_db:open_int(Name, []),
-    ok = couch_db:close(Db),
-    ?assertNotEqual(OldRefCounter, NewRefCounter).
+                  {reason, lists:concat(["Compaction of ", Type,
+                                         " database failed with: ", Reason])}]})
+    end.
 
 db_url(DbName) ->
     iolist_to_binary([
@@ -311,7 +311,7 @@
         {<<"target">>, Target},
         {<<"continuous">>, true}
     ]},
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_ROLE),
+    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
     {ok, Pid} = couch_replicator:async_replicate(Rep),
     {ok, Pid, Rep#rep.id}.
 
@@ -437,5 +437,3 @@
     after 0 ->
         ok
     end.
-
--endif.
diff --git a/test/couch_replicator_large_atts_tests.erl b/test/couch_replicator_large_atts_tests.erl
index e8c0d1a..67f4d75 100644
--- a/test/couch_replicator_large_atts_tests.erl
+++ b/test/couch_replicator_large_atts_tests.erl
@@ -15,6 +15,12 @@
 -include_lib("couch/include/couch_eunit.hrl").
 -include_lib("couch/include/couch_db.hrl").
 
+-import(couch_replicator_test_helper, [
+    db_url/1,
+    replicate/2,
+    compare_dbs/2
+]).
+
 -define(ATT_SIZE_1, 2 * 1024 * 1024).
 -define(ATT_SIZE_2, round(6.6 * 1024 * 1024)).
 -define(DOCS_COUNT, 11).
@@ -24,7 +30,7 @@
 
 setup() ->
     DbName = ?tempdb(),
-    {ok, Db} = couch_db:create(DbName, [?ADMIN_USER]),
+    {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
     ok = couch_db:close(Db),
     DbName.
 
@@ -33,23 +39,24 @@
 setup(remote) ->
     {remote, setup()};
 setup({A, B}) ->
-    ok = test_util:start_couch(),
+    Ctx = test_util:start_couch([couch_replicator]),
     config:set("attachments", "compressible_types", "text/*", false),
     Source = setup(A),
     Target = setup(B),
-    {Source, Target}.
+    {Ctx, {Source, Target}}.
 
 teardown({remote, DbName}) ->
     teardown(DbName);
 teardown(DbName) ->
-    ok = couch_server:delete(DbName, [?ADMIN_USER]),
+    ok = couch_server:delete(DbName, [?ADMIN_CTX]),
     ok.
 
-teardown(_, {Source, Target}) ->
+teardown(_, {Ctx, {Source, Target}}) ->
     teardown(Source),
     teardown(Target),
 
-    ok = test_util:stop_couch().
+    ok = application:stop(couch_replicator),
+    ok = test_util:stop_couch(Ctx).
 
 large_atts_test_() ->
     Pairs = [{local, local}, {local, remote},
@@ -65,7 +72,7 @@
     }.
 
 
-should_populate_replicate_compact({From, To}, {Source, Target}) ->
+should_populate_replicate_compact({From, To}, {_Ctx, {Source, Target}}) ->
     {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
      {inorder, [should_populate_source(Source),
                 should_replicate(Source, Target),
@@ -109,99 +116,10 @@
     {ok, _} = couch_db:update_docs(Db, Docs, []),
     couch_db:close(Db).
 
-compare_dbs(Source, Target) ->
-    {ok, SourceDb} = couch_db:open_int(Source, []),
-    {ok, TargetDb} = couch_db:open_int(Target, []),
-
-    Fun = fun(FullDocInfo, _, Acc) ->
-        {ok, DocSource} = couch_db:open_doc(SourceDb, FullDocInfo),
-        Id = DocSource#doc.id,
-
-        {ok, DocTarget} = couch_db:open_doc(TargetDb, Id),
-        ?assertEqual(DocSource#doc.body, DocTarget#doc.body),
-
-        #doc{atts = SourceAtts} = DocSource,
-        #doc{atts = TargetAtts} = DocTarget,
-        ?assertEqual(lists:sort([N || #att{name = N} <- SourceAtts]),
-                     lists:sort([N || #att{name = N} <- TargetAtts])),
-
-        FunCompareAtts = fun(#att{name = AttName} = Att) ->
-            {ok, AttTarget} = find_att(TargetAtts, AttName),
-            SourceMd5 = att_md5(Att),
-            TargetMd5 = att_md5(AttTarget),
-            case AttName of
-                <<"att1">> ->
-                    ?assertEqual(gzip, Att#att.encoding),
-                    ?assertEqual(gzip, AttTarget#att.encoding),
-                    DecSourceMd5 = att_decoded_md5(Att),
-                    DecTargetMd5 = att_decoded_md5(AttTarget),
-                    ?assertEqual(DecSourceMd5, DecTargetMd5);
-                _ ->
-                    ?assertEqual(identity, Att#att.encoding),
-                    ?assertEqual(identity, AttTarget#att.encoding)
-            end,
-            ?assertEqual(SourceMd5, TargetMd5),
-            ?assert(is_integer(Att#att.disk_len)),
-            ?assert(is_integer(Att#att.att_len)),
-            ?assert(is_integer(AttTarget#att.disk_len)),
-            ?assert(is_integer(AttTarget#att.att_len)),
-            ?assertEqual(Att#att.disk_len, AttTarget#att.disk_len),
-            ?assertEqual(Att#att.att_len, AttTarget#att.att_len),
-            ?assertEqual(Att#att.type, AttTarget#att.type),
-            ?assertEqual(Att#att.md5, AttTarget#att.md5)
-        end,
-
-        lists:foreach(FunCompareAtts, SourceAtts),
-
-        {ok, Acc}
-    end,
-
-    {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []),
-    ok = couch_db:close(SourceDb),
-    ok = couch_db:close(TargetDb).
-
 att(Name, Size, Type) ->
-    #att{
-        name = Name,
-        type = Type,
-        att_len = Size,
-        data = fun(Count) -> crypto:rand_bytes(Count) end
-    }.
-
-find_att([], _Name) ->
-    nil;
-find_att([#att{name = Name} = Att | _], Name) ->
-    {ok, Att};
-find_att([_ | Rest], Name) ->
-    find_att(Rest, Name).
-
-att_md5(Att) ->
-    Md50 = couch_doc:att_foldl(
-        Att,
-        fun(Chunk, Acc) -> couch_util:md5_update(Acc, Chunk) end,
-        couch_util:md5_init()),
-    couch_util:md5_final(Md50).
-
-att_decoded_md5(Att) ->
-    Md50 = couch_doc:att_foldl_decode(
-        Att,
-        fun(Chunk, Acc) -> couch_util:md5_update(Acc, Chunk) end,
-        couch_util:md5_init()),
-    couch_util:md5_final(Md50).
-
-db_url(DbName) ->
-    iolist_to_binary([
-        "http://", config:get("httpd", "bind_address", "127.0.0.1"),
-        ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
-        "/", DbName
+    couch_att:new([
+        {name, Name},
+        {type, Type},
+        {att_len, Size},
+        {data, fun(Count) -> crypto:rand_bytes(Count) end}
     ]).
-
-replicate(Source, Target) ->
-    RepObject = {[{<<"source">>, Source}, {<<"target">>, Target}]},
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_ROLE),
-    {ok, Pid} = couch_replicator:async_replicate(Rep),
-    MonRef = erlang:monitor(process, Pid),
-    receive
-        {'DOWN', MonRef, process, Pid, _} ->
-            ok
-    end.
diff --git a/test/couch_replicator_many_leaves_tests.erl b/test/couch_replicator_many_leaves_tests.erl
index 04d21ac..40ca611 100644
--- a/test/couch_replicator_many_leaves_tests.erl
+++ b/test/couch_replicator_many_leaves_tests.erl
@@ -29,7 +29,7 @@
 
 setup() ->
     DbName = ?tempdb(),
-    {ok, Db} = couch_db:create(DbName, [?ADMIN_USER]),
+    {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
     ok = couch_db:close(Db),
     DbName.
 
@@ -38,22 +38,22 @@
 setup(remote) ->
     {remote, setup()};
 setup({A, B}) ->
-    ok = test_util:start_couch(),
+    Ctx = test_util:start_couch([couch_replicator]),
     Source = setup(A),
     Target = setup(B),
-    {Source, Target}.
+    {Ctx, {Source, Target}}.
 
 teardown({remote, DbName}) ->
     teardown(DbName);
 teardown(DbName) ->
-    ok = couch_server:delete(DbName, [?ADMIN_USER]),
+    ok = couch_server:delete(DbName, [?ADMIN_CTX]),
     ok.
 
-teardown(_, {Source, Target}) ->
+teardown(_, {Ctx, {Source, Target}}) ->
     teardown(Source),
     teardown(Target),
-
-    ok = test_util:stop_couch().
+    ok = application:stop(couch_replicator),
+    ok = test_util:stop_couch(Ctx).
 
 docs_with_many_leaves_test_() ->
     Pairs = [{local, local}, {local, remote},
@@ -69,7 +69,7 @@
     }.
 
 
-should_populate_replicate_compact({From, To}, {Source, Target}) ->
+should_populate_replicate_compact({From, To}, {_Ctx, {Source, Target}}) ->
     {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
      {inorder, [
         should_populate_source(Source),
@@ -137,7 +137,7 @@
 
 add_doc_siblings(Db, DocId, NumLeaves, AccDocs, AccRevs) ->
     Value = ?l2b(?i2l(NumLeaves)),
-    Rev = couch_util:md5(Value),
+    Rev = couch_crypto:hash(md5, Value),
     Doc = #doc{
         id = DocId,
         revs = {1, [Rev]},
@@ -183,13 +183,13 @@
         fun(#doc{atts = Atts, revs = {Pos, [Rev | _]}} = Doc, Acc) ->
             NewAtts = lists:foldl(fun(I, AttAcc) ->
                 AttData = crypto:rand_bytes(100),
-                NewAtt = #att{
-                    name = ?io2b(["att_", ?i2l(I), "_",
-                                  couch_doc:rev_to_str({Pos, Rev})]),
-                    type = <<"application/foobar">>,
-                    att_len = byte_size(AttData),
-                    data = AttData
-                },
+                NewAtt = couch_att:new([
+                    {name, ?io2b(["att_", ?i2l(I), "_",
+                        couch_doc:rev_to_str({Pos, Rev})])},
+                    {type, <<"application/foobar">>},
+                    {att_len, byte_size(AttData)},
+                    {data, AttData}
+                ]),
                 [NewAtt | AttAcc]
             end, [], lists:seq(1, NumAtts)),
             [Doc#doc{atts = Atts ++ NewAtts} | Acc]
@@ -212,7 +212,7 @@
         {<<"source">>, Source},
         {<<"target">>, Target}
     ]},
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_ROLE),
+    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
     {ok, Pid} = couch_replicator:async_replicate(Rep),
     MonRef = erlang:monitor(process, Pid),
     receive
diff --git a/test/couch_replicator_missing_stubs_tests.erl b/test/couch_replicator_missing_stubs_tests.erl
index 0bf2f6a..034ca42 100644
--- a/test/couch_replicator_missing_stubs_tests.erl
+++ b/test/couch_replicator_missing_stubs_tests.erl
@@ -15,16 +15,19 @@
 -include_lib("couch/include/couch_eunit.hrl").
 -include_lib("couch/include/couch_db.hrl").
 
+-import(couch_replicator_test_helper, [
+    db_url/1,
+    replicate/2,
+    compare_dbs/2
+]).
+
 -define(REVS_LIMIT, 3).
 -define(TIMEOUT_STOP, 1000).
 -define(TIMEOUT_EUNIT, 30).
 
-
--ifdef(run_broken_tests).
-
 setup() ->
     DbName = ?tempdb(),
-    {ok, Db} = couch_db:create(DbName, [?ADMIN_USER]),
+    {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
     ok = couch_db:close(Db),
     DbName.
 
@@ -33,22 +36,22 @@
 setup(remote) ->
     {remote, setup()};
 setup({A, B}) ->
-    ok = test_util:start_couch(),
+    Ctx = test_util:start_couch([couch_replicator]),
     Source = setup(A),
     Target = setup(B),
-    {Source, Target}.
+    {Ctx, {Source, Target}}.
 
 teardown({remote, DbName}) ->
     teardown(DbName);
 teardown(DbName) ->
-    ok = couch_server:delete(DbName, [?ADMIN_USER]),
+    ok = couch_server:delete(DbName, [?ADMIN_CTX]),
     ok.
 
-teardown(_, {Source, Target}) ->
+teardown(_, {Ctx, {Source, Target}}) ->
     teardown(Source),
     teardown(Target),
-
-    ok = test_util:stop_couch().
+    ok = application:stop(couch_replicator),
+    ok = test_util:stop_couch(Ctx).
 
 missing_stubs_test_() ->
     Pairs = [{local, local}, {local, remote},
@@ -64,7 +67,7 @@
     }.
 
 
-should_replicate_docs_with_missed_att_stubs({From, To}, {Source, Target}) ->
+should_replicate_docs_with_missed_att_stubs({From, To}, {_Ctx, {Source, Target}}) ->
     {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
      {inorder, [
         should_populate_source(Source),
@@ -92,7 +95,7 @@
     should_set_target_revs_limit(Target, RevsLimit);
 should_set_target_revs_limit(Target, RevsLimit) ->
     ?_test(begin
-        {ok, Db} = couch_db:open_int(Target, [?ADMIN_USER]),
+        {ok, Db} = couch_db:open_int(Target, [?ADMIN_CTX]),
         ?assertEqual(ok, couch_db:set_revs_limit(Db, RevsLimit)),
         ok = couch_db:close(Db)
     end).
@@ -116,12 +119,12 @@
     Doc = #doc{
         id = <<"doc1">>,
         atts = [
-            #att{
-                name = <<"doc1_att1">>,
-                type = <<"application/foobar">>,
-                att_len = byte_size(AttData),
-                data = AttData
-            }
+            couch_att:new([
+                {name, <<"doc1_att1">>},
+                {type, <<"application/foobar">>},
+                {att_len, byte_size(AttData)},
+                {data, AttData}
+           ])
         ]
     },
     {ok, _} = couch_db:update_doc(Db, Doc, []),
@@ -153,102 +156,3 @@
         lists:seq(1, Times)),
     ok = couch_db:close(Db),
     {ok, {DbName, Times}}.
-
-compare_dbs(Source, Target) ->
-    {ok, SourceDb} = couch_db:open_int(Source, []),
-    {ok, TargetDb} = couch_db:open_int(Target, []),
-
-    Fun = fun(FullDocInfo, _, Acc) ->
-        {ok, DocSource} = couch_db:open_doc(SourceDb, FullDocInfo,
-                                            [conflicts, deleted_conflicts]),
-        Id = DocSource#doc.id,
-
-        {ok, DocTarget} = couch_db:open_doc(TargetDb, Id,
-                                            [conflicts, deleted_conflicts]),
-        ?assertEqual(DocSource#doc.body, DocTarget#doc.body),
-
-        ?assertEqual(couch_doc:to_json_obj(DocSource, []),
-                     couch_doc:to_json_obj(DocTarget, [])),
-
-        #doc{atts = SourceAtts} = DocSource,
-        #doc{atts = TargetAtts} = DocTarget,
-        ?assertEqual(lists:sort([N || #att{name = N} <- SourceAtts]),
-                     lists:sort([N || #att{name = N} <- TargetAtts])),
-
-        lists:foreach(
-            fun(#att{name = AttName} = Att) ->
-                {ok, AttTarget} = find_att(TargetAtts, AttName),
-                SourceMd5 = att_md5(Att),
-                TargetMd5 = att_md5(AttTarget),
-                case AttName of
-                    <<"att1">> ->
-                        ?assertEqual(gzip, Att#att.encoding),
-                        ?assertEqual(gzip, AttTarget#att.encoding),
-                        DecSourceMd5 = att_decoded_md5(Att),
-                        DecTargetMd5 = att_decoded_md5(AttTarget),
-                        ?assertEqual(DecSourceMd5, DecTargetMd5);
-                    _ ->
-                        ?assertEqual(identity, Att#att.encoding),
-                        ?assertEqual(identity, AttTarget#att.encoding)
-                end,
-                ?assertEqual(SourceMd5, TargetMd5),
-                ?assert(is_integer(Att#att.disk_len)),
-                ?assert(is_integer(Att#att.att_len)),
-                ?assert(is_integer(AttTarget#att.disk_len)),
-                ?assert(is_integer(AttTarget#att.att_len)),
-                ?assertEqual(Att#att.disk_len, AttTarget#att.disk_len),
-                ?assertEqual(Att#att.att_len, AttTarget#att.att_len),
-                ?assertEqual(Att#att.type, AttTarget#att.type),
-                ?assertEqual(Att#att.md5, AttTarget#att.md5)
-            end,
-            SourceAtts),
-        {ok, Acc}
-    end,
-
-    {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []),
-    ok = couch_db:close(SourceDb),
-    ok = couch_db:close(TargetDb).
-
-find_att([], _Name) ->
-    nil;
-find_att([#att{name = Name} = Att | _], Name) ->
-    {ok, Att};
-find_att([_ | Rest], Name) ->
-    find_att(Rest, Name).
-
-att_md5(Att) ->
-    Md50 = couch_doc:att_foldl(
-        Att,
-        fun(Chunk, Acc) -> couch_util:md5_update(Acc, Chunk) end,
-        couch_util:md5_init()),
-    couch_util:md5_final(Md50).
-
-att_decoded_md5(Att) ->
-    Md50 = couch_doc:att_foldl_decode(
-        Att,
-        fun(Chunk, Acc) -> couch_util:md5_update(Acc, Chunk) end,
-        couch_util:md5_init()),
-    couch_util:md5_final(Md50).
-
-db_url(DbName) ->
-    iolist_to_binary([
-        "http://", config:get("httpd", "bind_address", "127.0.0.1"),
-        ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
-        "/", DbName
-    ]).
-
-replicate(Source, Target) ->
-    RepObject = {[
-        {<<"source">>, Source},
-        {<<"target">>, Target}
-    ]},
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_ROLE),
-    {ok, Pid} = couch_replicator:async_replicate(Rep),
-    MonRef = erlang:monitor(process, Pid),
-    receive
-        {'DOWN', MonRef, process, Pid, _} ->
-            ok
-    end.
-
--endif.
-
diff --git a/test/couch_replicator_test_helper.erl b/test/couch_replicator_test_helper.erl
new file mode 100644
index 0000000..5b9d366
--- /dev/null
+++ b/test/couch_replicator_test_helper.erl
@@ -0,0 +1,107 @@
+-module(couch_replicator_test_helper).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-export([compare_dbs/2, db_url/1, replicate/2]).
+
+compare_dbs(Source, Target) ->
+    {ok, SourceDb} = couch_db:open_int(Source, []),
+    {ok, TargetDb} = couch_db:open_int(Target, []),
+
+    Fun = fun(FullDocInfo, _, Acc) ->
+        {ok, DocSource} = couch_db:open_doc(SourceDb, FullDocInfo),
+        Id = DocSource#doc.id,
+
+        {ok, DocTarget} = couch_db:open_doc(TargetDb, Id),
+        ?assertEqual(DocSource#doc.body, DocTarget#doc.body),
+
+        #doc{atts = SourceAtts} = DocSource,
+        #doc{atts = TargetAtts} = DocTarget,
+        ?assertEqual(lists:sort([couch_att:fetch(name, Att) || Att <- SourceAtts]),
+                     lists:sort([couch_att:fetch(name, Att) || Att <- TargetAtts])),
+
+        FunCompareAtts = fun(Att) ->
+            AttName = couch_att:fetch(name, Att),
+            {ok, AttTarget} = find_att(TargetAtts, AttName),
+            SourceMd5 = att_md5(Att),
+            TargetMd5 = att_md5(AttTarget),
+            case AttName of
+                <<"att1">> ->
+                    ?assertEqual(gzip, couch_att:fetch(encoding, Att)),
+                    ?assertEqual(gzip, couch_att:fetch(encoding, AttTarget)),
+                    DecSourceMd5 = att_decoded_md5(Att),
+                    DecTargetMd5 = att_decoded_md5(AttTarget),
+                    ?assertEqual(DecSourceMd5, DecTargetMd5);
+                _ ->
+                    ?assertEqual(identity, couch_att:fetch(encoding, Att)),
+                    ?assertEqual(identity, couch_att:fetch(encoding, AttTarget))
+            end,
+            ?assertEqual(SourceMd5, TargetMd5),
+            ?assert(is_integer(couch_att:fetch(disk_len, Att))),
+            ?assert(is_integer(couch_att:fetch(att_len, Att))),
+            ?assert(is_integer(couch_att:fetch(disk_len, AttTarget))),
+            ?assert(is_integer(couch_att:fetch(att_len, AttTarget))),
+            ?assertEqual(couch_att:fetch(disk_len, Att),
+                         couch_att:fetch(disk_len, AttTarget)),
+            ?assertEqual(couch_att:fetch(att_len, Att),
+                         couch_att:fetch(att_len, AttTarget)),
+            ?assertEqual(couch_att:fetch(type, Att),
+                         couch_att:fetch(type, AttTarget)),
+            ?assertEqual(couch_att:fetch(md5, Att),
+                         couch_att:fetch(md5, AttTarget))
+        end,
+
+        lists:foreach(FunCompareAtts, SourceAtts),
+
+        {ok, Acc}
+    end,
+
+    {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []),
+    ok = couch_db:close(SourceDb),
+    ok = couch_db:close(TargetDb).
+
+find_att([], _Name) ->
+    nil;
+find_att([Att | Rest], Name) ->
+    case couch_att:fetch(name, Att) of
+        Name ->
+            {ok, Att};
+        _ ->
+            find_att(Rest, Name)
+    end.
+
+
+att_md5(Att) ->
+    Md50 = couch_att:foldl(
+        Att,
+        fun(Chunk, Acc) -> couch_crypto:hash_update(md5, Acc, Chunk) end,
+        couch_crypto:hash_init(md5)),
+    couch_crypto:hash_final(md5, Md50).
+
+att_decoded_md5(Att) ->
+    Md50 = couch_att:foldl_decode(
+        Att,
+        fun(Chunk, Acc) -> couch_crypto:hash_update(md5, Acc, Chunk) end,
+        couch_crypto:hash_init(md5)),
+    couch_crypto:hash_final(md5, Md50).
+
+db_url(DbName) ->
+    iolist_to_binary([
+        "http://", config:get("httpd", "bind_address", "127.0.0.1"),
+        ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
+        "/", DbName
+    ]).
+
+replicate(Source, Target) ->
+    RepObject = {[
+        {<<"source">>, Source},
+        {<<"target">>, Target}
+    ]},
+    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
+    {ok, Pid} = couch_replicator:async_replicate(Rep),
+    MonRef = erlang:monitor(process, Pid),
+    receive
+        {'DOWN', MonRef, process, Pid, _} ->
+            ok
+    end.
diff --git a/test/couch_replicator_use_checkpoints_tests.erl b/test/couch_replicator_use_checkpoints_tests.erl
index 5b5668d..7b7a55a 100644
--- a/test/couch_replicator_use_checkpoints_tests.erl
+++ b/test/couch_replicator_use_checkpoints_tests.erl
@@ -43,7 +43,7 @@
 
 setup() ->
     DbName = ?tempdb(),
-    {ok, Db} = couch_db:create(DbName, [?ADMIN_USER]),
+    {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
     ok = couch_db:close(Db),
     DbName.
 
@@ -52,24 +52,25 @@
 setup(remote) ->
     {remote, setup()};
 setup({_, Fun, {A, B}}) ->
-    ok = test_util:start_couch(),
+    Ctx = test_util:start_couch([couch_replicator]),
     {ok, Listener} = couch_replicator_notifier:start_link(Fun),
     Source = setup(A),
     Target = setup(B),
-    {Source, Target, Listener}.
+    {Ctx, {Source, Target, Listener}}.
 
 teardown({remote, DbName}) ->
     teardown(DbName);
 teardown(DbName) ->
-    ok = couch_server:delete(DbName, [?ADMIN_USER]),
+    ok = couch_server:delete(DbName, [?ADMIN_CTX]),
     ok.
 
-teardown(_, {Source, Target, Listener}) ->
+teardown(_, {Ctx, {Source, Target, Listener}}) ->
     teardown(Source),
     teardown(Target),
 
     couch_replicator_notifier:stop(Listener),
-    ok = test_util:stop_couch().
+    ok = application:stop(couch_replicator),
+    ok = test_util:stop_couch(Ctx).
 
 use_checkpoints_test_() ->
     {
@@ -95,7 +96,7 @@
         }
     }.
 
-should_test_checkpoints({UseCheckpoints, _, {From, To}}, {Source, Target, _}) ->
+should_test_checkpoints({UseCheckpoints, _, {From, To}}, {_Ctx, {Source, Target, _}}) ->
     should_test_checkpoints(UseCheckpoints, {From, To}, {Source, Target}).
 should_test_checkpoints(UseCheckpoints, {From, To}, {Source, Target}) ->
     {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
@@ -180,7 +181,7 @@
         {<<"target">>, Target},
         {<<"use_checkpoints">>, UseCheckpoints}
     ]},
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_ROLE),
+    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
     {ok, Pid} = couch_replicator:async_replicate(Rep),
     MonRef = erlang:monitor(process, Pid),
     receive