Fix stuck changes reader in clean_mailbox

Due to unfortunate timing issues it was possible for a changes reader to
get stuck in clean_mailbox reading an entire changes feed before
exiting. If the ibrowse call timed out right before ibrowse starts
sending messages then we would see clean_mailbox loop until the changes
feed terminated on the source.

This caps the number of messagses that can be cleaned up to a maximum of
sixteen. This limit is rather arbitrary. The cleanup was intended for when
only a couple messages were lingering. This is much larger than that
without being insanely large.

BugzId: 49717
diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl
index e591601..052eb98 100644
--- a/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator_httpc.erl
@@ -30,6 +30,16 @@
 -define(STREAM_STATUS, ibrowse_stream_status).
 
 
+% This limit is for the number of messages we're willing to discard
+% from an HTTP stream in clean_mailbox/1 before killing the worker
+% and returning. The original intent for clean_mailbox was to remove
+% a single message or two if the changes feed returned before fully
+% consuming the request. This threshold gives us confidence we'll
+% continue to properly close changes feeds while avoiding any case
+% where we may end up processing an unbounded number of messages.
+-define(MAX_DISCARDED_MESSAGES, 16).
+
+
 setup(#httpdb{httpc_pool = nil, url = Url, http_connections = MaxConns} = Db) ->
     {ok, Pid} = couch_replicator_httpc_pool:start_link(Url, [{max_connections, MaxConns}]),
     {ok, Db#httpdb{httpc_pool = Pid}}.
@@ -169,13 +179,31 @@
 % on the ibrowse_req_id format. This just drops all
 % messages for the given ReqId on the floor since we're
 % no longer in the HTTP request.
-clean_mailbox({ibrowse_req_id, ReqId}) ->
+
+clean_mailbox(ReqId) ->
+    clean_mailbox(ReqId, ?MAX_DISCARDED_MESSAGES).
+
+
+clean_mailbox(_ReqId, 0) ->
+    case get(?STREAM_STATUS) of
+        {streaming, Worker} ->
+            % We kill workers that continue to stream us
+            % messages after we give up but do *not* exit
+            % our selves. This is because we may be running
+            % as an exception unwinds and we don't want to
+            % change any of that subtle logic.
+            exit(Worker, {timeout, ibrowse_stream_cleanup});
+        _ ->
+            ok
+    end,
+    ok;
+clean_mailbox({ibrowse_req_id, ReqId}, Count) when Count > 0 ->
     case get(?STREAM_STATUS) of
         streaming ->
             ibrowse:stream_next(ReqId),
             receive
                 {ibrowse_async_response, ReqId, _} ->
-                    clean_mailbox({ibrowse_req_id, ReqId});
+                    clean_mailbox({ibrowse_req_id, ReqId}, Count - 1);
                 {ibrowse_async_response_end, ReqId} ->
                     put(?STREAM_STATUS, ended),
                     ok
@@ -185,7 +213,7 @@
         Status when Status == init; Status == ended ->
             receive
                 {ibrowse_async_response, ReqId, _} ->
-                    clean_mailbox({ibrowse_req_id, ReqId});
+                    clean_mailbox({ibrowse_req_id, ReqId}, Count - 1);
                 {ibrowse_async_response_end, ReqId} ->
                     put(?STREAM_STATUS, ended),
                     ok
@@ -193,7 +221,7 @@
                     ok
             end
     end;
-clean_mailbox(_) ->
+clean_mailbox(_, Count) when Count > 0 ->
     ok.