Ensure that ibrowse streams are ended properly

I found a situation where we had live lock on a running application due
to an ibrowse request that hadn't been properly terminated. This
manifested as a cesation of updates to the _active_tasks information.
Debugging this lead me to see that the main couch_replicator pid was
stuck on a call to get_pending_changes. This call was stuck because the
ibrowse_http_client process being used was stuck waiting for a changes
request to complete.

This changes request as it turns out had been abandoned by the
couch_replicator_changes_reader. The changes reader was then stuck
trying to do a gen_server:call/2 back to the main couch_replicator
process with the report_seq_done message.

Given all this, it became apparent that the changes feed improperly
ending its ibrowse streams was the underlying culprit. Issuing a call to
ibrowse:stream_next/1 with the abandoned ibrowse stream id resulted in
the replication resuming.

This bug was introduced in this commit:
bfa020b43be20c54ab166c51f5c6e55c34d844c2

BugzId: 47306

This is a cherry-pick of:

https://github.com/cloudant/couch_replicator/commit/f9db37a9b293f5f078681e7539fd35a92eb3adec
diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl
index 9a6213a..e61fba5 100644
--- a/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator_httpc.erl
@@ -27,6 +27,7 @@
 
 -define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
 -define(MAX_WAIT, 5 * 60 * 1000).
+-define(STREAM_STATUS, ibrowse_stream_status).
 
 
 setup(#httpdb{httpc_pool = nil, url = Url, http_connections = MaxConns} = Db) ->
@@ -35,6 +36,7 @@
 
 
 send_req(HttpDb, Params1, Callback) ->
+    put(STREAM_STATUS, init),
     couch_stats:increment_counter([couch_replicator, requests]),
     Params2 = ?replace(Params1, qs,
         [{K, ?b2l(iolist_to_binary(V))} || {K, V} <- get_value(qs, Params1, [])]),
@@ -132,6 +134,7 @@
             StreamDataFun = fun() ->
                 stream_data_self(HttpDb, Params, Worker, ReqId, Callback)
             end,
+            put(STREAM_STATUS, streaming),
             ibrowse:stream_next(ReqId),
             try
                 Ret = Callback(Ok, Headers, StreamDataFun),
@@ -167,13 +170,24 @@
 % messages for the given ReqId on the floor since we're
 % no longer in the HTTP request.
 clean_mailbox({ibrowse_req_id, ReqId}) ->
-    receive
-    {ibrowse_async_response, ReqId, _} ->
-        clean_mailbox({ibrowse_req_id, ReqId});
-    {ibrowse_async_response_end, ReqId} ->
-        clean_mailbox({ibrowse_req_id, ReqId})
-    after 0 ->
-        ok
+    case get(STREAM_STATUS) of
+        streaming ->
+            ibrowse:stream_next(ReqId),
+            receive
+                {ibrowse_async_response, ReqId, _} ->
+                    clean_mailbox({ibrowse_req_id, ReqId});
+                {ibrowse_async_response_end, ReqId} ->
+                    put(STREAM_STATUS, ended),
+                    ok
+            end;
+        _ ->
+            receive
+                {ibrowse_async_response, ReqId, _} ->
+                    clean_mailbox({ibrowse_req_id, ReqId});
+                {ibrowse_async_response_end, ReqId} ->
+                    put(STREAM_STATUS, ended),
+                    ok
+            end
     end;
 clean_mailbox(_) ->
     ok.
@@ -222,6 +236,7 @@
         ibrowse:stream_next(ReqId),
         {Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId, Cb) end};
     {Data, ibrowse_async_response_end} ->
+        put(STREAM_STATUS, ended),
         {Data, fun() -> throw({maybe_retry_req, more_data_expected}) end}
     end.