Ensure that ibrowse streams are ended properly
I found a situation where we had a livelock on a running application due
to an ibrowse request that hadn't been properly terminated. This
manifested as a cessation of updates to the _active_tasks information.
Debugging this led me to see that the main couch_replicator pid was
stuck on a call to get_pending_changes. This call was stuck because the
ibrowse_http_client process being used was stuck waiting for a changes
request to complete.
This changes request, as it turns out, had been abandoned by the
couch_replicator_changes_reader. The changes reader was then stuck
trying to do a gen_server:call/2 back to the main couch_replicator
process with the report_seq_done message.
Given all this, it became apparent that the changes feed improperly
ending its ibrowse streams was the underlying culprit. Issuing a call to
ibrowse:stream_next/1 with the abandoned ibrowse stream id resulted in
the replication resuming.
This bug was introduced in this commit:
bfa020b43be20c54ab166c51f5c6e55c34d844c2
BugzId: 47306
This is a cherry-pick of:
https://github.com/cloudant/couch_replicator/commit/f9db37a9b293f5f078681e7539fd35a92eb3adec
diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl
index 9a6213a..e61fba5 100644
--- a/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator_httpc.erl
@@ -27,6 +27,7 @@
-define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
-define(MAX_WAIT, 5 * 60 * 1000).
+-define(STREAM_STATUS, ibrowse_stream_status).
setup(#httpdb{httpc_pool = nil, url = Url, http_connections = MaxConns} = Db) ->
@@ -35,6 +36,7 @@
send_req(HttpDb, Params1, Callback) ->
+ put(STREAM_STATUS, init),
couch_stats:increment_counter([couch_replicator, requests]),
Params2 = ?replace(Params1, qs,
[{K, ?b2l(iolist_to_binary(V))} || {K, V} <- get_value(qs, Params1, [])]),
@@ -132,6 +134,7 @@
StreamDataFun = fun() ->
stream_data_self(HttpDb, Params, Worker, ReqId, Callback)
end,
+ put(STREAM_STATUS, streaming),
ibrowse:stream_next(ReqId),
try
Ret = Callback(Ok, Headers, StreamDataFun),
@@ -167,13 +170,24 @@
% messages for the given ReqId on the floor since we're
% no longer in the HTTP request.
clean_mailbox({ibrowse_req_id, ReqId}) ->
- receive
- {ibrowse_async_response, ReqId, _} ->
- clean_mailbox({ibrowse_req_id, ReqId});
- {ibrowse_async_response_end, ReqId} ->
- clean_mailbox({ibrowse_req_id, ReqId})
- after 0 ->
- ok
+ case get(STREAM_STATUS) of
+ streaming ->
+ ibrowse:stream_next(ReqId),
+ receive
+ {ibrowse_async_response, ReqId, _} ->
+ clean_mailbox({ibrowse_req_id, ReqId});
+ {ibrowse_async_response_end, ReqId} ->
+ put(STREAM_STATUS, ended),
+ ok
+ end;
+ _ ->
+ receive
+ {ibrowse_async_response, ReqId, _} ->
+ clean_mailbox({ibrowse_req_id, ReqId});
+ {ibrowse_async_response_end, ReqId} ->
+ put(STREAM_STATUS, ended),
+ ok
+ end
end;
clean_mailbox(_) ->
ok.
@@ -222,6 +236,7 @@
ibrowse:stream_next(ReqId),
{Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId, Cb) end};
{Data, ibrowse_async_response_end} ->
+ put(STREAM_STATUS, ended),
{Data, fun() -> throw({maybe_retry_req, more_data_expected}) end}
end.