% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
% public API
-export([start_link/2, stop/1]).
-export([get_worker/1, release_worker/2, release_worker_sync/2]).
% gen_server API
-export([init/1, handle_call/3, handle_info/2, handle_cast/2]).
-export([code_change/3, terminate/2]).
-import(couch_util, [
-record(state, {
limit, % max # of workers allowed
conns = [],
waiting = queue:new(), % blocked clients waiting for a worker
callers = [] % clients who've been given a worker
start_link(Url, Options) ->
gen_server:start_link(?MODULE, {Url, Options}, []).
stop(Pool) ->
ok = gen_server:call(Pool, stop, infinity).
get_worker(Pool) ->
{ok, _Worker} = gen_server:call(Pool, get_worker, infinity).
release_worker(Pool, Worker) ->
ok = gen_server:cast(Pool, {release_worker, Worker}).
release_worker_sync(Pool, Worker) ->
ok = gen_server:call(Pool, {release_worker_sync, Worker}).
init({Url, Options}) ->
process_flag(trap_exit, true),
State = #state{
url = Url,
limit = get_value(max_connections, Options)
{ok, State}.
handle_call(get_worker, From, State) ->
waiting = Waiting,
callers = Callers,
url = Url,
limit = Limit,
conns = Conns
} = State,
case length(Conns) >= Limit of
true ->
{noreply, State#state{waiting = queue:in(From, Waiting)}};
false ->
% If the call to acquire fails, the worker pool will crash with a
% badmatch.
{ok, Worker} = couch_replicator_connection:acquire(Url),
NewState = State#state{
conns = [Worker | Conns],
callers = monitor_client(Callers, Worker, From)
{reply, {ok, Worker}, NewState}
handle_call(stop, _From, State) ->
{stop, normal, ok, State};
handle_call({release_worker_sync, Worker}, _From, State) ->
{reply, ok, release_worker_internal(Worker, State)}.
handle_cast({release_worker, Worker}, State) ->
{noreply, release_worker_internal(Worker, State)}.
handle_info({'EXIT', Pid, _Reason}, State) ->
url = Url,
conns = Conns,
waiting = Waiting,
callers = Callers
} = State,
NewCallers0 = demonitor_client(Callers, Pid),
case Conns -- [Pid] of
Conns ->
{noreply, State#state{callers = NewCallers0}};
Conns2 ->
case queue:out(Waiting) of
{empty, _} ->
{noreply, State#state{conns = Conns2, callers = NewCallers0}};
{{value, From}, Waiting2} ->
{ok, Worker} = couch_replicator_connection:acquire(Url),
NewCallers1 = monitor_client(NewCallers0, Worker, From),
gen_server:reply(From, {ok, Worker}),
NewState = State#state{
conns = [Worker | Conns2],
waiting = Waiting2,
callers = NewCallers1
{noreply, NewState}
handle_info({'DOWN', Ref, process, _, _}, #state{callers = Callers} = State) ->
case lists:keysearch(Ref, 2, Callers) of
{value, {Worker, Ref}} ->
handle_cast({release_worker, Worker}, State);
false ->
{noreply, State}
code_change(_OldVsn, #state{}=State, _Extra) ->
{ok, State}.
terminate(_Reason, _State) ->
monitor_client(Callers, Worker, {ClientPid, _}) ->
[{Worker, erlang:monitor(process, ClientPid)} | Callers].
demonitor_client(Callers, Worker) ->
case lists:keysearch(Worker, 1, Callers) of
{value, {Worker, MonRef}} ->
erlang:demonitor(MonRef, [flush]),
lists:keydelete(Worker, 1, Callers);
false ->
release_worker_internal(Worker, State) ->
#state{waiting = Waiting, callers = Callers} = State,
NewCallers0 = demonitor_client(Callers, Worker),
case is_process_alive(Worker) andalso
lists:member(Worker, State#state.conns) of
true ->
Conns = case queue:out(Waiting) of
{empty, Waiting2} ->
NewCallers1 = NewCallers0,
State#state.conns -- [Worker];
{{value, From}, Waiting2} ->
NewCallers1 = monitor_client(NewCallers0, Worker, From),
gen_server:reply(From, {ok, Worker}),
NewState = State#state{
conns = Conns,
waiting = Waiting2,
callers = NewCallers1
false ->
State#state{callers = NewCallers0}