Merge branch '3010-port-429' into apache

COUCHDB-3010
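
Handle HTTP 429 (Too Many Requests) in the replicator with an exponential
backoff instead of counting it against the normal retry limit. The backoff
state lives in the new #httpdb.backoff field and is controlled by three new
"replicator" config keys: max_backoff_wait, backoff_exp and
max_backoff_log_threshold. On the _changes feed, a connection closed
mid-stream is now retried without limit (retry_no_limit), while other request
failures follow the previous error handling ({retry_limit, Error}).

As a rough illustration (not part of the patch), the sketch below restates the
arithmetic of backoff/3 with the defaults introduced further down: 25 ms
initial backoff, growth factor 1.5 and a cap of 250 * 32768 ms. The module and
function names are made up for the example.

    %% Illustrative sketch: successive backoff values, in milliseconds, that
    %% a worker sleeps on (each actual sleep is random:uniform/1 of the value).
    -module(backoff_sketch).
    -export([steps/0]).

    steps() ->
        steps(25, 1.5, 250 * 32768).

    steps(Backoff, _Exp, Max) when Backoff > Max ->
        [];     % once the cap is exceeded the request is reported as failed
    steps(Backoff, Exp, Max) ->
        [Backoff | steps(round(Backoff * Exp), Exp, Max)].

    %% steps() returns about 32 values, starting 25, 38, 57, ... and ending a
    %% bit above 7,000,000, i.e. the final retry can sleep up to ~2 hours.
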
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index d20260f..e5f6253 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -487,33 +487,42 @@
         JsonDocIds = ?JSON_ENCODE({[{<<"doc_ids">>, DocIds}]}),
         {[{"filter", "_doc_ids"} | BaseQArgs], post, JsonDocIds, Headers2}
     end,
-    send_req(
-        HttpDb,
-        [{method, Method}, {path, "_changes"}, {qs, QArgs},
-            {headers, Headers}, {body, Body},
-            {ibrowse_options, [{stream_to, {self(), once}}]}],
-        fun(200, _, DataStreamFun) ->
-                parse_changes_feed(Options, UserFun, DataStreamFun);
-            (405, _, _) when is_list(DocIds) ->
-                % CouchDB versions < 1.1.0 don't have the builtin _changes feed
-                % filter "_doc_ids" neither support POST
-                send_req(HttpDb, [{method, get}, {path, "_changes"},
-                    {qs, BaseQArgs}, {headers, Headers1},
-                    {ibrowse_options, [{stream_to, {self(), once}}]}],
-                    fun(200, _, DataStreamFun2) ->
-                        UserFun2 = fun(#doc_info{id = Id} = DocInfo) ->
-                            case lists:member(Id, DocIds) of
-                            true ->
-                                UserFun(DocInfo);
-                            false ->
-                                ok
-                            end;
-                        (LastSeq) ->
-                            UserFun(LastSeq)
-                        end,
-                        parse_changes_feed(Options, UserFun2, DataStreamFun2)
-                    end)
-        end);
+    try
+        send_req(
+            HttpDb,
+            [{method, Method}, {path, "_changes"}, {qs, QArgs},
+                {headers, Headers}, {body, Body},
+                {ibrowse_options, [{stream_to, {self(), once}}]}],
+            fun(200, _, DataStreamFun) ->
+                    parse_changes_feed(Options, UserFun, DataStreamFun);
+                (405, _, _) when is_list(DocIds) ->
+                    % CouchDB versions < 1.1.0 don't have the built-in
+                    % _changes feed filter "_doc_ids", nor support POST
+                    send_req(HttpDb, [{method, get}, {path, "_changes"},
+                        {qs, BaseQArgs}, {headers, Headers1},
+                        {ibrowse_options, [{stream_to, {self(), once}}]}],
+                        fun(200, _, DataStreamFun2) ->
+                            UserFun2 = fun(#doc_info{id = Id} = DocInfo) ->
+                                case lists:member(Id, DocIds) of
+                                true ->
+                                    UserFun(DocInfo);
+                                false ->
+                                    ok
+                                end;
+                            (LastSeq) ->
+                                UserFun(LastSeq)
+                            end,
+                            parse_changes_feed(Options, UserFun2,
+                                DataStreamFun2)
+                        end)
+            end)
+    catch
+        exit:{http_request_failed, _, _, {error, {connection_closed,
+                mid_stream}}} ->
+            throw(retry_no_limit);
+        exit:{http_request_failed, _, _, _} = Error ->
+            throw({retry_limit, Error})
+    end;
 changes_since(Db, Style, StartSeq, UserFun, Options) ->
     DocIds = get_value(doc_ids, Options),
     Selector = get_value(selector, Options),
diff --git a/src/couch_replicator_api_wrap.hrl b/src/couch_replicator_api_wrap.hrl
index eee04da..24e204b 100644
--- a/src/couch_replicator_api_wrap.hrl
+++ b/src/couch_replicator_api_wrap.hrl
@@ -24,7 +24,8 @@
     retries = 10,
     wait = 250,         % milliseconds
     httpc_pool = nil,
-    http_connections
+    http_connections,
+    backoff = 25        % milliseconds
 }).
 
 -record(oauth, {
diff --git a/src/couch_replicator_changes_reader.erl b/src/couch_replicator_changes_reader.erl
index b7d18e0..bed318a 100644
--- a/src/couch_replicator_changes_reader.erl
+++ b/src/couch_replicator_changes_reader.erl
@@ -52,7 +52,10 @@
         throw:recurse ->
             LS = get(last_seq),
             read_changes(Parent, LS, Db, ChangesQueue, Options, Ts+1);
-        exit:{http_request_failed, _, _, _} = Error ->
+        throw:retry_no_limit ->
+            LS = get(last_seq),
+            read_changes(Parent, LS, Db, ChangesQueue, Options, Ts);
+        throw:{retry_limit, Error} ->
         couch_stats:increment_counter(
             [couch_replicator, changes_read_failures]
         ),
diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl
index 266b922..309a230 100644
--- a/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator_httpc.erl
@@ -138,6 +138,8 @@
 
 process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
     case list_to_integer(Code) of
+    429 ->
+        backoff(Worker, HttpDb, Params);
     Ok when (Ok >= 200 andalso Ok < 300) ; (Ok >= 400 andalso Ok < 500) ->
         couch_stats:increment_counter([couch_replicator, responses, success]),
         EJson = case Body of
@@ -162,6 +164,9 @@
     receive
     {ibrowse_async_headers, ReqId, Code, Headers} ->
         case list_to_integer(Code) of
+        429 ->
+            backoff(Worker, HttpDb#httpdb{timeout = get_max_back_off()},
+                Params);
         Ok when (Ok >= 200 andalso Ok < 300) ; (Ok >= 400 andalso Ok < 500) ->
             StreamDataFun = fun() ->
                 stream_data_self(HttpDb, Params, Worker, ReqId, Callback)
@@ -172,6 +177,9 @@
                 Ret = Callback(Ok, Headers, StreamDataFun),
                 Ret
             catch
+                throw:{maybe_retry_req, connection_closed} ->
+                    maybe_retry({connection_closed, mid_stream},
+                        Worker, HttpDb, Params);
                 throw:{maybe_retry_req, Err} ->
                     maybe_retry(Err, Worker, HttpDb, Params)
             end;
@@ -258,21 +266,46 @@
     end.
 
 
+%% For 429 errors we back off exponentially, starting at 25 ms, and give up
+%% once the interval would exceed max_backoff_wait (~2.3 hours by default).
+backoff(Worker, #httpdb{backoff = Backoff} = HttpDb, Params) ->
+    MaxBackOff = get_max_back_off(),
+    MaxBackOffLog = get_back_off_log_threshold(),
+    ok = timer:sleep(random:uniform(Backoff)),
+    Backoff2 = round(Backoff * get_back_off_exp()),
+    NewBackoff = erlang:min(Backoff2, MaxBackOff),
+    NewHttpDb = HttpDb#httpdb{backoff = NewBackoff},
+    case Backoff2 of
+        W0 when W0 > MaxBackOff ->
+            report_error(Worker, HttpDb, Params, {error,
+                "Long 429-induced Retry Time Out"});
+        W1 when W1 >= MaxBackOffLog -> % past ~8.5 min, log each retry
+            log_retry_error(Params, HttpDb, Backoff2, "429 Retry"),
+            throw({retry, NewHttpDb, Params});
+        _ ->
+            throw({retry, NewHttpDb, Params})
+    end.
+
+
 maybe_retry(Error, Worker, #httpdb{retries = 0} = HttpDb, Params) ->
     report_error(Worker, HttpDb, Params, {error, Error});
 
 maybe_retry(Error, _Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb,
     Params) ->
-    Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
-    Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
-    couch_log:notice("Retrying ~s request to ~s in ~p seconds due to error ~s",
-        [Method, Url, Wait / 1000, error_cause(Error)]),
     ok = timer:sleep(Wait),
+    log_retry_error(Params, HttpDb, Wait, Error),
     Wait2 = erlang:min(Wait * 2, ?MAX_WAIT),
     NewHttpDb = HttpDb#httpdb{retries = Retries - 1, wait = Wait2},
     throw({retry, NewHttpDb, Params}).
 
 
+log_retry_error(Params, HttpDb, Wait, Error) ->
+    Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
+    Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
+    couch_log:notice("Retrying ~s request to ~s in ~p seconds due to error ~s",
+        [Method, Url, Wait / 1000, error_cause(Error)]).
+
+
 report_error(_Worker, HttpDb, Params, Error) ->
     Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
     Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
@@ -406,3 +439,12 @@
 after_redirect(RedirectUrl, HttpDb, Params) ->
     Params2 = lists:keydelete(path, 1, lists:keydelete(qs, 1, Params)),
     {HttpDb#httpdb{url = RedirectUrl}, Params2}.
+
+get_max_back_off() ->
+    config:get_integer("replicator", "max_backoff_wait", 250 * 32768).
+
+get_back_off_log_threshold() ->
+    config:get_integer("replicator", "max_backoff_log_threshold", 512000).
+
+get_back_off_exp() ->
+    config:get_float("replicator", "backoff_exp", 1.5).
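
Note: backoff/3 reads all three settings from the "replicator" config section
on every call, so they can be adjusted at runtime. For reference, restating
the defaults from an Erlang shell would look roughly like this (illustrative
only; it assumes the standard config:set/3 API, which takes string values):

    config:set("replicator", "max_backoff_wait", "8192000").          % 250 * 32768 ms
    config:set("replicator", "backoff_exp", "1.5").                   % growth factor per retry
    config:set("replicator", "max_backoff_log_threshold", "512000").  % log retries past ~8.5 min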