Retry when connection_closed is received during a streamed response

The changes_reader uses a streamed response. During the stream, it's
possible to receive a connection_closed error due to timeouts or
network issues. We simply retry the request because for streamed
responses a connection must be established first in order for the
stream to begin. So if the network is truly down, the initial request
will fail and the code path will go through the normal retry clause
which decrements the number of retries. This way we won't be stuck
forever if it's an actual network issue.

BugzId: 70400
COUCHDB-3010
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index ff6b00c..f22cac8 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -483,33 +483,42 @@
         JsonDocIds = ?JSON_ENCODE({[{<<"doc_ids">>, DocIds}]}),
         {[{"filter", "_doc_ids"} | BaseQArgs], post, JsonDocIds, Headers2}
     end,
-    send_req(
-        HttpDb,
-        [{method, Method}, {path, "_changes"}, {qs, QArgs},
-            {headers, Headers}, {body, Body},
-            {ibrowse_options, [{stream_to, {self(), once}}]}],
-        fun(200, _, DataStreamFun) ->
-                parse_changes_feed(Options, UserFun, DataStreamFun);
-            (405, _, _) when is_list(DocIds) ->
-                % CouchDB versions < 1.1.0 don't have the builtin _changes feed
-                % filter "_doc_ids" neither support POST
-                send_req(HttpDb, [{method, get}, {path, "_changes"},
-                    {qs, BaseQArgs}, {headers, Headers1},
-                    {ibrowse_options, [{stream_to, {self(), once}}]}],
-                    fun(200, _, DataStreamFun2) ->
-                        UserFun2 = fun(#doc_info{id = Id} = DocInfo) ->
-                            case lists:member(Id, DocIds) of
-                            true ->
-                                UserFun(DocInfo);
-                            false ->
-                                ok
-                            end;
-                        (LastSeq) ->
-                            UserFun(LastSeq)
-                        end,
-                        parse_changes_feed(Options, UserFun2, DataStreamFun2)
-                    end)
-        end);
+    try
+        send_req(
+            HttpDb,
+            [{method, Method}, {path, "_changes"}, {qs, QArgs},
+                {headers, Headers}, {body, Body},
+                {ibrowse_options, [{stream_to, {self(), once}}]}],
+            fun(200, _, DataStreamFun) ->
+                    parse_changes_feed(Options, UserFun, DataStreamFun);
+                (405, _, _) when is_list(DocIds) ->
+                    % CouchDB versions < 1.1.0 don't have the builtin
+                    % _changes feed filter "_doc_ids" neither support POST
+                    send_req(HttpDb, [{method, get}, {path, "_changes"},
+                        {qs, BaseQArgs}, {headers, Headers1},
+                        {ibrowse_options, [{stream_to, {self(), once}}]}],
+                        fun(200, _, DataStreamFun2) ->
+                            UserFun2 = fun(#doc_info{id = Id} = DocInfo) ->
+                                case lists:member(Id, DocIds) of
+                                true ->
+                                    UserFun(DocInfo);
+                                false ->
+                                    ok
+                                end;
+                            (LastSeq) ->
+                                UserFun(LastSeq)
+                            end,
+                            parse_changes_feed(Options, UserFun2,
+                                DataStreamFun2)
+                        end)
+            end)
+    catch
+        exit:{http_request_failed, _, _, {error, {connection_closed,
+                mid_stream}}} ->
+            throw(retry_no_limit);
+        exit:{http_request_failed, _, _, _} = Error ->
+            throw({retry_limit, Error})
+    end;
 changes_since(Db, Style, StartSeq, UserFun, Options) ->
     DocIds = get_value(doc_ids, Options),
     Selector = get_value(selector, Options),
diff --git a/src/couch_replicator_changes_reader.erl b/src/couch_replicator_changes_reader.erl
index b7d18e0..bed318a 100644
--- a/src/couch_replicator_changes_reader.erl
+++ b/src/couch_replicator_changes_reader.erl
@@ -52,7 +52,10 @@
         throw:recurse ->
             LS = get(last_seq),
             read_changes(Parent, LS, Db, ChangesQueue, Options, Ts+1);
-        exit:{http_request_failed, _, _, _} = Error ->
+        throw:retry_no_limit ->
+            LS = get(last_seq),
+            read_changes(Parent, LS, Db, ChangesQueue, Options, Ts);
+        throw:{retry_limit, Error} ->
         couch_stats:increment_counter(
             [couch_replicator, changes_read_failures]
         ),
diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl
index 9a4cca4..366c325 100644
--- a/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator_httpc.erl
@@ -180,6 +180,9 @@
                 Ret = Callback(Ok, Headers, StreamDataFun),
                 Ret
             catch
+                throw:{maybe_retry_req, connection_closed} ->
+                    maybe_retry({connection_closed, mid_stream},
+                        Worker, HttpDb, Params);
                 throw:{maybe_retry_req, Err} ->
                     maybe_retry(Err, Worker, HttpDb, Params)
             end;