Fix race condition in worker release in the connection_closing state.
This is exposed by the replicator large attachments test case,
replicating from local to remote. In the current test configuration
it appears about once in every 20-40 runs. The failure manifests
as an {error, req_timedout} exception in the logs from one of the
PUT requests during push replication. The database comparison then
fails because not all documents made it to the target.
Gory details:
After ibrowse receives a Connection: Close header, it goes into a
'connection_closing' shutdown state.
couch_replicator_httpc handles that state by trying to close
the socket and retrying, hoping to pick up a new worker from
the pool on the next retry, in couch_replicator_httpc.erl:
```
process_response({error, connection_closing}, Worker, HttpDb, Params, _Cb) ->
...
```
But it did not have a direct way to ensure the socket was really
closed; instead it called ibrowse_http_client:stop(Worker). That did
not wait for the worker to die, and the worker was returned to the
pool asynchronously, in the 'after' clause of
couch_replicator_httpc:send_req/3.
This worker, which could still be alive but in the process of dying,
could be picked up again immediately during the retry.
ibrowse, in ibrowse:do_send_req/7, handles a dead worker
process as {error, req_timedout}, which is what the intermittent
test failure showed in the log.
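
To make the race concrete, here is a simplified sketch of the old
flow (illustration only, not the literal code in send_req/3):
```
%% Old flow, simplified: neither step waits for the worker to die.
ibrowse_http_client:stop(Worker),   % returns before the worker is dead
%% The 'after' clause released the worker with an asynchronous cast:
couch_replicator_httpc_pool:release_worker(Pool, Worker),
%% On retry, get_worker/1 can hand back the same still-dying Worker,
%% and ibrowse:do_send_req/7 then reports {error, req_timedout}.
```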
The fix:
* Make sure the worker is really stopped (dead) after calling stop;
  a generic sketch of the idiom follows after this list.
* Return the worker to the pool synchronously, so that on retry
  a worker in a known good state is picked up.
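
For reference, the monitor/receive idiom the fix relies on guarantees
a process is gone before the caller proceeds. A generic sketch of the
idiom follows; stop_and_wait/1 is a hypothetical helper, not part of
the patch:
```
%% Hypothetical helper illustrating the idiom: monitor a process,
%% terminate it, and block until its 'DOWN' message arrives.
stop_and_wait(Pid) ->
    Ref = erlang:monitor(process, Pid),
    exit(Pid, kill),  % unconditional termination, for illustration
    receive
        {'DOWN', Ref, process, Pid, _Reason} ->
            ok
    end,
    %% The process is now dead; is_process_alive/1 returns false.
    false = is_process_alive(Pid),
    ok.
```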
COUCHDB-2833
diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl
index 8b34e0e..668b218 100644
--- a/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator_httpc.erl
@@ -103,6 +103,20 @@
{Worker, Response}.
+%% Stop the worker and wait for it to die, then release it. Make sure it is
+%% dead before releasing it to the pool, so there is no race that recycles it
+%% again. Recycling a dying worker can end with that worker returning an
+%% {error, req_timedout} error, which in reality is not a timeout at all,
+%% just this race condition.
+stop_and_release_worker(Pool, Worker) ->
+ Ref = erlang:monitor(process, Worker),
+ ibrowse_http_client:stop(Worker),
+ receive
+ {'DOWN', Ref, _, _, _} ->
+ ok
+ end,
+ ok = couch_replicator_httpc_pool:release_worker_sync(Pool, Worker).
+
process_response({error, sel_conn_closed}, _Worker, HttpDb, Params, _Cb) ->
throw({retry, HttpDb, Params});
@@ -110,8 +124,8 @@
%% For example, if server responds to a request, sets Connection: close header
%% and closes the socket, ibrowse will detect that error when it sends
%% next request.
-process_response({error, connection_closing}, Worker, HttpDb, Params, _Cb)->
- ibrowse_http_client:stop(Worker),
+process_response({error, connection_closing}, Worker, HttpDb, Params, _Cb) ->
+ stop_and_release_worker(HttpDb#httpdb.httpc_pool, Worker),
throw({retry, HttpDb, Params});
process_response({error, {'EXIT',{normal,_}}}, _Worker, HttpDb, Params, _Cb) ->
diff --git a/src/couch_replicator_httpc_pool.erl b/src/couch_replicator_httpc_pool.erl
index c895048..09e3b23 100644
--- a/src/couch_replicator_httpc_pool.erl
+++ b/src/couch_replicator_httpc_pool.erl
@@ -16,7 +16,7 @@
% public API
-export([start_link/2, stop/1]).
--export([get_worker/1, release_worker/2]).
+-export([get_worker/1, release_worker/2, release_worker_sync/2]).
% gen_server API
-export([init/1, handle_call/3, handle_info/2, handle_cast/2]).
@@ -52,6 +52,8 @@
release_worker(Pool, Worker) ->
ok = gen_server:cast(Pool, {release_worker, Worker}).
+release_worker_sync(Pool, Worker) ->
+ ok = gen_server:call(Pool, {release_worker_sync, Worker}).
init({Url, Options}) ->
process_flag(trap_exit, true),
@@ -91,36 +93,13 @@
end;
handle_call(stop, _From, State) ->
- {stop, normal, ok, State}.
+ {stop, normal, ok, State};
+handle_call({release_worker_sync, Worker}, _From, State) ->
+ {reply, ok, release_worker_internal(Worker, State)}.
handle_cast({release_worker, Worker}, State) ->
- #state{waiting = Waiting, callers = Callers} = State,
- NewCallers0 = demonitor_client(Callers, Worker),
- case is_process_alive(Worker) andalso
- lists:member(Worker, State#state.busy) of
- true ->
- case queue:out(Waiting) of
- {empty, Waiting2} ->
- NewCallers1 = NewCallers0,
- Busy2 = State#state.busy -- [Worker],
- Free2 = [Worker | State#state.free];
- {{value, From}, Waiting2} ->
- NewCallers1 = monitor_client(NewCallers0, Worker, From),
- gen_server:reply(From, {ok, Worker}),
- Busy2 = State#state.busy,
- Free2 = State#state.free
- end,
- NewState = State#state{
- busy = Busy2,
- free = Free2,
- waiting = Waiting2,
- callers = NewCallers1
- },
- {noreply, NewState};
- false ->
- {noreply, State#state{callers = NewCallers0}}
- end.
+ {noreply, release_worker_internal(Worker, State)}.
handle_info({'EXIT', Pid, _Reason}, State) ->
#state{
@@ -183,3 +162,31 @@
false ->
Callers
end.
+
+release_worker_internal(Worker, State) ->
+ #state{waiting = Waiting, callers = Callers} = State,
+ NewCallers0 = demonitor_client(Callers, Worker),
+ case is_process_alive(Worker) andalso
+ lists:member(Worker, State#state.busy) of
+ true ->
+ case queue:out(Waiting) of
+ {empty, Waiting2} ->
+ NewCallers1 = NewCallers0,
+ Busy2 = State#state.busy -- [Worker],
+ Free2 = [Worker | State#state.free];
+ {{value, From}, Waiting2} ->
+ NewCallers1 = monitor_client(NewCallers0, Worker, From),
+ gen_server:reply(From, {ok, Worker}),
+ Busy2 = State#state.busy,
+ Free2 = State#state.free
+ end,
+ NewState = State#state{
+ busy = Busy2,
+ free = Free2,
+ waiting = Waiting2,
+ callers = NewCallers1
+ },
+ NewState;
+ false ->
+ State#state{callers = NewCallers0}
+ end.