Merge branch '3010-port-429' into apache
COUCHDB-3010
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index d20260f..e5f6253 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -487,33 +487,42 @@
JsonDocIds = ?JSON_ENCODE({[{<<"doc_ids">>, DocIds}]}),
{[{"filter", "_doc_ids"} | BaseQArgs], post, JsonDocIds, Headers2}
end,
- send_req(
- HttpDb,
- [{method, Method}, {path, "_changes"}, {qs, QArgs},
- {headers, Headers}, {body, Body},
- {ibrowse_options, [{stream_to, {self(), once}}]}],
- fun(200, _, DataStreamFun) ->
- parse_changes_feed(Options, UserFun, DataStreamFun);
- (405, _, _) when is_list(DocIds) ->
- % CouchDB versions < 1.1.0 don't have the builtin _changes feed
- % filter "_doc_ids" neither support POST
- send_req(HttpDb, [{method, get}, {path, "_changes"},
- {qs, BaseQArgs}, {headers, Headers1},
- {ibrowse_options, [{stream_to, {self(), once}}]}],
- fun(200, _, DataStreamFun2) ->
- UserFun2 = fun(#doc_info{id = Id} = DocInfo) ->
- case lists:member(Id, DocIds) of
- true ->
- UserFun(DocInfo);
- false ->
- ok
- end;
- (LastSeq) ->
- UserFun(LastSeq)
- end,
- parse_changes_feed(Options, UserFun2, DataStreamFun2)
- end)
- end);
+ try
+ send_req(
+ HttpDb,
+ [{method, Method}, {path, "_changes"}, {qs, QArgs},
+ {headers, Headers}, {body, Body},
+ {ibrowse_options, [{stream_to, {self(), once}}]}],
+ fun(200, _, DataStreamFun) ->
+ parse_changes_feed(Options, UserFun, DataStreamFun);
+ (405, _, _) when is_list(DocIds) ->
+            % CouchDB versions < 1.1.0 don't have the builtin
+            % _changes feed filter "_doc_ids", nor do they support POST
+ send_req(HttpDb, [{method, get}, {path, "_changes"},
+ {qs, BaseQArgs}, {headers, Headers1},
+ {ibrowse_options, [{stream_to, {self(), once}}]}],
+ fun(200, _, DataStreamFun2) ->
+ UserFun2 = fun(#doc_info{id = Id} = DocInfo) ->
+ case lists:member(Id, DocIds) of
+ true ->
+ UserFun(DocInfo);
+ false ->
+ ok
+ end;
+ (LastSeq) ->
+ UserFun(LastSeq)
+ end,
+ parse_changes_feed(Options, UserFun2,
+ DataStreamFun2)
+ end)
+ end)
+ catch
+ exit:{http_request_failed, _, _, {error, {connection_closed,
+ mid_stream}}} ->
+ throw(retry_no_limit);
+ exit:{http_request_failed, _, _, _} = Error ->
+ throw({retry_limit, Error})
+ end;
changes_since(Db, Style, StartSeq, UserFun, Options) ->
DocIds = get_value(doc_ids, Options),
Selector = get_value(selector, Options),
diff --git a/src/couch_replicator_api_wrap.hrl b/src/couch_replicator_api_wrap.hrl
index eee04da..24e204b 100644
--- a/src/couch_replicator_api_wrap.hrl
+++ b/src/couch_replicator_api_wrap.hrl
@@ -24,7 +24,8 @@
retries = 10,
wait = 250, % milliseconds
httpc_pool = nil,
- http_connections
+ http_connections,
+ backoff = 25
}).
-record(oauth, {
diff --git a/src/couch_replicator_changes_reader.erl b/src/couch_replicator_changes_reader.erl
index b7d18e0..bed318a 100644
--- a/src/couch_replicator_changes_reader.erl
+++ b/src/couch_replicator_changes_reader.erl
@@ -52,7 +52,10 @@
throw:recurse ->
LS = get(last_seq),
read_changes(Parent, LS, Db, ChangesQueue, Options, Ts+1);
- exit:{http_request_failed, _, _, _} = Error ->
+ throw:retry_no_limit ->
+ LS = get(last_seq),
+ read_changes(Parent, LS, Db, ChangesQueue, Options, Ts);
+ throw:{retry_limit, Error} ->
couch_stats:increment_counter(
[couch_replicator, changes_read_failures]
),
diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl
index 266b922..309a230 100644
--- a/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator_httpc.erl
@@ -138,6 +138,8 @@
process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
case list_to_integer(Code) of
+ 429 ->
+ backoff(Worker, HttpDb, Params);
Ok when (Ok >= 200 andalso Ok < 300) ; (Ok >= 400 andalso Ok < 500) ->
couch_stats:increment_counter([couch_replicator, responses, success]),
EJson = case Body of
@@ -162,6 +164,9 @@
receive
{ibrowse_async_headers, ReqId, Code, Headers} ->
case list_to_integer(Code) of
+ 429 ->
+ backoff(Worker, HttpDb#httpdb{timeout = get_max_back_off()},
+ Params);
Ok when (Ok >= 200 andalso Ok < 300) ; (Ok >= 400 andalso Ok < 500) ->
StreamDataFun = fun() ->
stream_data_self(HttpDb, Params, Worker, ReqId, Callback)
@@ -172,6 +177,9 @@
Ret = Callback(Ok, Headers, StreamDataFun),
Ret
catch
+ throw:{maybe_retry_req, connection_closed} ->
+ maybe_retry({connection_closed, mid_stream},
+ Worker, HttpDb, Params);
throw:{maybe_retry_req, Err} ->
maybe_retry(Err, Worker, HttpDb, Params)
end;
@@ -258,21 +266,46 @@
end.
+%% For 429 errors, we perform an exponential backoff up to the configured
+%% maximum wait (default 250 * 32768 ms, ~2.28 hours); past that we fail.
+backoff(Worker, #httpdb{backoff = Backoff} = HttpDb, Params) ->
+ MaxBackOff = get_max_back_off(),
+ MaxBackOffLog = get_back_off_log_threshold(),
+ ok = timer:sleep(random:uniform(Backoff)),
+ Backoff2 = round(Backoff*get_back_off_exp()),
+ NewBackoff = erlang:min(Backoff2, MaxBackOff),
+ NewHttpDb = HttpDb#httpdb{backoff = NewBackoff},
+ case Backoff2 of
+ W0 when W0 > MaxBackOff ->
+ report_error(Worker, HttpDb, Params, {error,
+ "Long 429-induced Retry Time Out"});
+ W1 when W1 >= MaxBackOffLog -> % Past 8 min, we log retries
+ log_retry_error(Params, HttpDb, Backoff2, "429 Retry"),
+ throw({retry, NewHttpDb, Params});
+ _ ->
+ throw({retry, NewHttpDb, Params})
+ end.
+
+
maybe_retry(Error, Worker, #httpdb{retries = 0} = HttpDb, Params) ->
report_error(Worker, HttpDb, Params, {error, Error});
maybe_retry(Error, _Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb,
Params) ->
- Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
- Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
- couch_log:notice("Retrying ~s request to ~s in ~p seconds due to error ~s",
- [Method, Url, Wait / 1000, error_cause(Error)]),
ok = timer:sleep(Wait),
+ log_retry_error(Params, HttpDb, Wait, Error),
Wait2 = erlang:min(Wait * 2, ?MAX_WAIT),
NewHttpDb = HttpDb#httpdb{retries = Retries - 1, wait = Wait2},
throw({retry, NewHttpDb, Params}).
+log_retry_error(Params, HttpDb, Wait, Error) ->
+ Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
+ Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
+ couch_log:notice("Retrying ~s request to ~s in ~p seconds due to error ~s",
+ [Method, Url, Wait / 1000, error_cause(Error)]).
+
+
report_error(_Worker, HttpDb, Params, Error) ->
Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
@@ -406,3 +439,12 @@
after_redirect(RedirectUrl, HttpDb, Params) ->
Params2 = lists:keydelete(path, 1, lists:keydelete(qs, 1, Params)),
{HttpDb#httpdb{url = RedirectUrl}, Params2}.
+
+get_max_back_off() ->
+ config:get_integer("replicator", "max_backoff_wait", 250 * 32768).
+
+get_back_off_log_threshold() ->
+ config:get_integer("replicator", "max_backoff_log_threshold", 512000).
+
+get_back_off_exp() ->
+ config:get_float("replicator", "backoff_exp", 1.5).