Fix race condition in worker release in the connection_closing state.

This is exposed by the replicator large-attachments test case,
replicating from local to remote. In the current test configuration
it appears about once in 20-40 runs. The failure shows up as an
{error, req_timedout} exception in the logs from one of the PUT
requests during push replication. The database comparison then fails
because not all documents made it to the target.

Gory details:

After ibrowse receives a Connection: Close header, it goes into its
'connection_closing' shutdown state.
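
For illustration, this is roughly how the error surfaces at the
ibrowse API level (a hypothetical standalone sketch; the replicator
goes through pooled workers, but the error tuple has the same shape):

```
%% Hypothetical sketch, not the replicator code: a dedicated ibrowse
%% worker whose server replied with "Connection: close". The next
%% request on the same worker fails with the connection_closing tuple
%% instead of being sent.
Url = "http://target.example:5984/db/doc1",
{ok, Worker} = ibrowse:spawn_link_worker_process(Url),
{ok, "201", _Hdrs, _Body} = ibrowse:send_req_direct(Worker, Url, [], put, <<"{}">>),
%% ... server sets "Connection: close" and closes the socket ...
{error, connection_closing} = ibrowse:send_req_direct(Worker, Url, [], put, <<"{}">>).
```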

couch_replicator_httpc handles that state by trying to close the
socket and retrying, hoping to pick up a new worker from the pool on
the next retry, in couch_replicator_httpc.erl:

```
process_response({error, connection_closing}, Worker, HttpDb, Params, _Cb) ->
    ibrowse_http_client:stop(Worker),
    throw({retry, HttpDb, Params});
```

But it had no way to directly ensure the socket was really closed;
instead it called ibrowse_http_client:stop(Worker). That call didn't
wait for the worker to die, and the worker was returned to the pool
asynchronously, in the 'after' clause of couch_replicator_httpc:send_req/3.
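
Roughly, the pre-fix flow looked like this (a simplified sketch of
send_req/3, not the verbatim source):

```
%% Simplified sketch of the pre-fix couch_replicator_httpc:send_req/3.
%% process_response/5 throws {retry, HttpDb, Params} on
%% connection_closing, and only then does the 'after' clause cast the
%% possibly still-dying worker back to the pool.
send_req(HttpDb, Params, Callback) ->
    {Worker, Response} = send_ibrowse_req(HttpDb, Params),
    try
        process_response(Response, Worker, HttpDb, Params, Callback)
    after
        ok = couch_replicator_httpc_pool:release_worker(
            HttpDb#httpdb.httpc_pool, Worker)
    end.
```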

This worker, which could still be alive but in the middle of dying,
could be picked up again immediately during the retry. ibrowse, in
ibrowse:do_send_req/7, reports a request sent to a dead worker
process as {error, req_timedout}, which is what the intermittent
test failure showed in the log.
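
The timeout is an artifact of how ibrowse delivers the request:
do_send_req/7 makes a gen_server call to the worker, and a worker
that is busy dying never replies, so the call exits with a timeout.
Schematically (an assumed shape of the ibrowse internals, for
illustration only):

```
%% Assumed shape of ibrowse:do_send_req/7, not verbatim: a request
%% handed to a dying worker never gets a reply, the gen_server call
%% exits with timeout, and ibrowse reports that as req_timedout.
case catch ibrowse_http_client:send_req(Conn_Pid, Parsed_url, Headers,
                                        Method, Body, Options, Timeout) of
    {'EXIT', {timeout, _}} ->
        {error, req_timedout};
    Response ->
        Response
end
```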

The fix:

 * Make sure the worker is really stopped after calling stop.

 * Make sure the worker is returned to the pool synchronously, so
   that on retry a worker in a known-good state is picked up (the
   two release paths are contrasted in the sketch below).
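
The difference between the two release paths is just cast versus call
on the pool's gen_server; the synchronous variant guarantees the pool
has processed the release before the caller retries. Side by side
(both functions appear in the diff below):

```
%% Asynchronous release: the caller continues (and may retry) before
%% the pool has handled the message, so the retry can race the release.
release_worker(Pool, Worker) ->
    ok = gen_server:cast(Pool, {release_worker, Worker}).

%% Synchronous release: gen_server:call/2 blocks until the pool has
%% updated its busy/free lists, so the next get_worker/1 sees a
%% consistent state.
release_worker_sync(Pool, Worker) ->
    ok = gen_server:call(Pool, {release_worker_sync, Worker}).
```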

COUCHDB-2833
diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl
index 8b34e0e..668b218 100644
--- a/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator_httpc.erl
@@ -103,6 +103,20 @@
     {Worker, Response}.
 
 
+%% Stop the worker, wait for it to die, then release it. Making sure it is
+%% dead before releasing it to the pool avoids a race where the dying worker
+%% is recycled and handed out again. A request sent to a recycled dying
+%% worker comes back as {error, req_timedout}, which is not really a
+%% timeout, just this race condition.
+stop_and_release_worker(Pool, Worker) ->
+    Ref = erlang:monitor(process, Worker),
+    ibrowse_http_client:stop(Worker),
+    receive
+        {'DOWN', Ref, _, _, _} ->
+            ok
+    end,
+    ok = couch_replicator_httpc_pool:release_worker_sync(Pool, Worker).
+
 process_response({error, sel_conn_closed}, _Worker, HttpDb, Params, _Cb) ->
     throw({retry, HttpDb, Params});
 
@@ -110,8 +124,8 @@
 %% For example, if server responds to a request, sets Connection: close header
 %% and closes the socket, ibrowse will detect that error when it sends
 %% next request.
-process_response({error, connection_closing}, Worker, HttpDb, Params, _Cb)->
-    ibrowse_http_client:stop(Worker),
+process_response({error, connection_closing}, Worker, HttpDb, Params, _Cb) ->
+    stop_and_release_worker(HttpDb#httpdb.httpc_pool, Worker),
     throw({retry, HttpDb, Params});
 
 process_response({error, {'EXIT',{normal,_}}}, _Worker, HttpDb, Params, _Cb) ->
diff --git a/src/couch_replicator_httpc_pool.erl b/src/couch_replicator_httpc_pool.erl
index c895048..09e3b23 100644
--- a/src/couch_replicator_httpc_pool.erl
+++ b/src/couch_replicator_httpc_pool.erl
@@ -16,7 +16,7 @@
 
 % public API
 -export([start_link/2, stop/1]).
--export([get_worker/1, release_worker/2]).
+-export([get_worker/1, release_worker/2, release_worker_sync/2]).
 
 % gen_server API
 -export([init/1, handle_call/3, handle_info/2, handle_cast/2]).
@@ -52,6 +52,8 @@
 release_worker(Pool, Worker) ->
     ok = gen_server:cast(Pool, {release_worker, Worker}).
 
+release_worker_sync(Pool, Worker) ->
+    ok = gen_server:call(Pool, {release_worker_sync, Worker}).
 
 init({Url, Options}) ->
     process_flag(trap_exit, true),
@@ -91,36 +93,13 @@
     end;
 
 handle_call(stop, _From, State) ->
-    {stop, normal, ok, State}.
+    {stop, normal, ok, State};
 
+handle_call({release_worker_sync, Worker}, _From, State) ->
+    {reply, ok, release_worker_internal(Worker, State)}.
 
 handle_cast({release_worker, Worker}, State) ->
-    #state{waiting = Waiting, callers = Callers} = State,
-    NewCallers0 = demonitor_client(Callers, Worker),
-    case is_process_alive(Worker) andalso
-        lists:member(Worker, State#state.busy) of
-    true ->
-        case queue:out(Waiting) of
-        {empty, Waiting2} ->
-            NewCallers1 = NewCallers0,
-            Busy2 = State#state.busy -- [Worker],
-            Free2 = [Worker | State#state.free];
-        {{value, From}, Waiting2} ->
-            NewCallers1 = monitor_client(NewCallers0, Worker, From),
-            gen_server:reply(From, {ok, Worker}),
-            Busy2 = State#state.busy,
-            Free2 = State#state.free
-        end,
-        NewState = State#state{
-           busy = Busy2,
-           free = Free2,
-           waiting = Waiting2,
-           callers = NewCallers1
-        },
-        {noreply, NewState};
-   false ->
-        {noreply, State#state{callers = NewCallers0}}
-   end.
+    {noreply, release_worker_internal(Worker, State)}.
 
 handle_info({'EXIT', Pid, _Reason}, State) ->
     #state{
@@ -183,3 +162,31 @@
         false ->
             Callers
     end.
+
+release_worker_internal(Worker, State) ->
+    #state{waiting = Waiting, callers = Callers} = State,
+    NewCallers0 = demonitor_client(Callers, Worker),
+    case is_process_alive(Worker) andalso
+        lists:member(Worker, State#state.busy) of
+    true ->
+        case queue:out(Waiting) of
+        {empty, Waiting2} ->
+            NewCallers1 = NewCallers0,
+            Busy2 = State#state.busy -- [Worker],
+            Free2 = [Worker | State#state.free];
+        {{value, From}, Waiting2} ->
+            NewCallers1 = monitor_client(NewCallers0, Worker, From),
+            gen_server:reply(From, {ok, Worker}),
+            Busy2 = State#state.busy,
+            Free2 = State#state.free
+        end,
+        NewState = State#state{
+           busy = Busy2,
+           free = Free2,
+           waiting = Waiting2,
+           callers = NewCallers1
+        },
+        NewState;
+   false ->
+        State#state{callers = NewCallers0}
+   end.