Fix stuck changes reader in clean_mailbox
Due to unfortunate timing issues it was possible for a changes reader to
get stuck in clean_mailbox reading an entire changes feed before
exiting. If the ibrowse call timed out right before ibrowse started
sending messages then we would see clean_mailbox loop until the changes
feed terminated on the source.
This caps the number of messages that can be cleaned up to a maximum of
sixteen. This limit is rather arbitrary. The cleanup was intended for when
only a couple messages were lingering. This is much larger than that
without being insanely large.
BugzId: 49717
diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl
index e591601..052eb98 100644
--- a/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator_httpc.erl
@@ -30,6 +30,16 @@
-define(STREAM_STATUS, ibrowse_stream_status).
+% This limit is for the number of messages we're willing to discard
+% from an HTTP stream in clean_mailbox/1 before killing the worker
+% and returning. The original intent for clean_mailbox was to remove
+% a single message or two if the changes feed returned before fully
+% consuming the request. This threshold gives us confidence we'll
+% continue to properly close changes feeds while avoiding any case
+% where we may end up processing an unbounded number of messages.
+-define(MAX_DISCARDED_MESSAGES, 16).
+
+
setup(#httpdb{httpc_pool = nil, url = Url, http_connections = MaxConns} = Db) ->
{ok, Pid} = couch_replicator_httpc_pool:start_link(Url, [{max_connections, MaxConns}]),
{ok, Db#httpdb{httpc_pool = Pid}}.
@@ -169,13 +179,31 @@
% on the ibrowse_req_id format. This just drops all
% messages for the given ReqId on the floor since we're
% no longer in the HTTP request.
-clean_mailbox({ibrowse_req_id, ReqId}) ->
+
+clean_mailbox(ReqId) ->
+ clean_mailbox(ReqId, ?MAX_DISCARDED_MESSAGES).
+
+
+clean_mailbox(_ReqId, 0) ->
+ case get(?STREAM_STATUS) of
+ {streaming, Worker} ->
+ % We kill workers that continue to stream us
+ % messages after we give up but do *not* exit
+ % our selves. This is because we may be running
+ % as an exception unwinds and we don't want to
+ % change any of that subtle logic.
+ exit(Worker, {timeout, ibrowse_stream_cleanup});
+ _ ->
+ ok
+ end,
+ ok;
+clean_mailbox({ibrowse_req_id, ReqId}, Count) when Count > 0 ->
case get(?STREAM_STATUS) of
streaming ->
ibrowse:stream_next(ReqId),
receive
{ibrowse_async_response, ReqId, _} ->
- clean_mailbox({ibrowse_req_id, ReqId});
+ clean_mailbox({ibrowse_req_id, ReqId}, Count - 1);
{ibrowse_async_response_end, ReqId} ->
put(?STREAM_STATUS, ended),
ok
@@ -185,7 +213,7 @@
Status when Status == init; Status == ended ->
receive
{ibrowse_async_response, ReqId, _} ->
- clean_mailbox({ibrowse_req_id, ReqId});
+ clean_mailbox({ibrowse_req_id, ReqId}, Count - 1);
{ibrowse_async_response_end, ReqId} ->
put(?STREAM_STATUS, ended),
ok
@@ -193,7 +221,7 @@
ok
end
end;
-clean_mailbox(_) ->
+clean_mailbox(_, Count) when Count > 0 ->
ok.