blob: 04101f2402113819190d392de37b79be24692556 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_proc_manager).
-behaviour(gen_server).
-behaviour(config_listener).
-vsn(3).
-export([
start_link/0,
get_proc_count/0,
get_stale_proc_count/0,
new_proc/1,
reload/0,
terminate_stale_procs/0
]).
-export([
init/1,
terminate/2,
handle_call/3,
handle_cast/2,
handle_info/2,
code_change/3
]).
-export([
handle_config_change/5,
handle_config_terminate/3
]).
-include_lib("couch/include/couch_db.hrl").
% Named, public ETS tables created in init/1.
% ?PROCS: one #proc_int{} per started query server, keyed by its pid.
-define(PROCS, couch_proc_manager_procs).
% ?WAITERS: queued #client{} requests. It is an ordered_set keyed by the
% timestamp taken when the client was queued, so lookups see oldest first.
-define(WAITERS, couch_proc_manager_waiters).
% ?OPENING: {WorkerPid, #client{}} for each new_proc/1 worker still starting a proc.
-define(OPENING, couch_proc_manager_opening).
% Milliseconds to wait (erlang:send_after/3) before re-subscribing to config
% changes after the config listener terminates.
-define(RELISTEN_DELAY, 5000).
% gen_server state.
-record(state, {
config,        % {Props} query-server config sent to clients along with each proc
counts,        % dict: Lang => procs started or still starting for that language
threshold_ts,  % os:timestamp(); procs whose t0 is older are treated as stale
hard_limit,    % per-language cap on procs (query_server_config/os_process_limit)
soft_limit     % idle procs are closed at/above this count (os_process_soft_limit)
}).
-type docid() :: iodata().
-type revision() :: {integer(), binary()}.
% A request for a proc. The '_' alternatives allow the record to be used as
% an ETS match pattern.
-record(client, {
timestamp :: os:timestamp() | '_',   % ?WAITERS key, set when queued
from :: undefined | {pid(), reference()} | '_',   % gen_server From to reply to
lang :: binary() | '_',
ddoc :: #doc{} | '_',   % design doc to teach the proc, if any
ddoc_key :: undefined | {DDocId :: docid(), Rev :: revision()} | '_'
}).
% Internal view of a query server. export_proc/1 builds the public #proc{}
% by keeping the first record_info(size, proc) tuple elements, so the leading
% fields here must stay in the same order as #proc{}'s fields.
-record(proc_int, {
pid,
lang,
client,            % monitor reference of the holding client; undefined when idle
ddoc_keys = [],    % {DDocId, Rev} pairs already taught to this proc
prompt_fun,
set_timeout_fun,
stop_fun,
t0 = os:timestamp()   % creation time, compared against #state.threshold_ts
}).
%% Start the manager, registered locally under the module name.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% Number of procs that are running or still being started.
get_proc_count() ->
    call_manager(get_proc_count).

%% Number of procs created before the current staleness threshold.
get_stale_proc_count() ->
    call_manager(get_stale_proc_count).

%% Drop idle procs and move the staleness threshold to "now".
reload() ->
    call_manager(set_threshold_ts).

%% Drop idle procs older than the current staleness threshold.
terminate_stale_procs() ->
    call_manager(terminate_stale_procs).

%% Synchronous call to the registered manager with gen_server:call/2's
%% default timeout.
call_manager(Request) ->
    gen_server:call(?MODULE, Request).
%% gen_server init: trap exits so linked workers and procs report their death
%% as 'EXIT' messages, subscribe to config changes, create the public ETS
%% tables and load the limits/config from the config server.
init([]) ->
    process_flag(trap_exit, true),
    ok = config:listen_for_changes(?MODULE, undefined),
    Shared = [public, named_table, ordered_set],
    ets:new(?PROCS, [{keypos, #proc_int.pid} | Shared]),
    ets:new(?WAITERS, [{keypos, #client.timestamp} | Shared]),
    ets:new(?OPENING, [public, named_table, set]),
    InitialState = #state{
        config = get_proc_config(),
        counts = dict:new(),
        threshold_ts = os:timestamp(),
        hard_limit = get_hard_limit(),
        soft_limit = get_soft_limit()
    },
    {ok, InitialState}.
%% gen_server terminate: synchronously shut down every tracked proc.
terminate(_Reason, _State) ->
    ShutdownOne = fun(#proc_int{pid = ProcPid}, Acc) ->
        couch_util:shutdown_sync(ProcPid),
        Acc
    end,
    ets:foldl(ShutdownOne, ok, ?PROCS),
    ok.
%% Synchronous requests. get_proc requests are queued as waiters and answered
%% later with gen_server:reply/2, so those clauses return {noreply, _}.
%% Unrecognised calls are answered with `ignored`.
handle_call(get_proc_count, _From, State) ->
    Running = ets:info(?PROCS, size),
    Starting = ets:info(?OPENING, size),
    {reply, Running + Starting, State};
handle_call(get_stale_proc_count, _From, #state{threshold_ts = Threshold} = State) ->
    % Busy and idle procs alike; wrapping the timestamp as {Threshold} makes
    % the tuple a literal inside the match-spec guard.
    StaleSpec = [{#proc_int{t0 = '$1', _ = '_'}, [{'<', '$1', {Threshold}}], [true]}],
    {reply, ets:select_count(?PROCS, StaleSpec), State};
handle_call({get_proc, #doc{body = {Props}} = DDoc, DDocKey}, From, State) ->
    LangStr = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
    Client = #client{
        from = From,
        lang = couch_util:to_binary(LangStr),
        ddoc = DDoc,
        ddoc_key = DDocKey
    },
    enqueue_client(Client, State);
handle_call({get_proc, LangStr}, From, State) ->
    enqueue_client(#client{from = From, lang = couch_util:to_binary(LangStr)}, State);
handle_call({ret_proc, #proc{client = Ref, pid = ProcPid}}, _From, State) ->
    erlang:demonitor(Ref, [flush]),
    NewState = case ets:lookup(?PROCS, ProcPid) of
        [] ->
            % The proc already died and its 'EXIT' removed it from the table.
            State;
        [#proc_int{} = ProcInt] ->
            return_proc(State, ProcInt)
    end,
    {reply, true, NewState};
handle_call(set_threshold_ts, _From, State) ->
    % Drop every idle proc now. Busy procs predate the new threshold and are
    % removed by return_proc/2 when they come back.
    Trimmed = remove_idle_procs(State, fun(_) -> true end),
    {reply, ok, Trimmed#state{threshold_ts = os:timestamp()}};
handle_call(terminate_stale_procs, _From, #state{threshold_ts = Threshold} = State) ->
    IsStale = fun(#proc_int{t0 = T0}) -> Threshold > T0 end,
    {reply, ok, remove_idle_procs(State, IsStale)};
handle_call(_Call, _From, State) ->
    {reply, ignored, State}.

%% Queue Client and immediately try to serve the waiters for its language.
enqueue_client(#client{lang = Lang} = Client, State) ->
    add_waiting_client(Client),
    {noreply, flush_waiters(State, Lang)}.

%% Remove every idle proc (client =:= undefined) for which Pred returns true;
%% procs currently held by a client are left alone.
remove_idle_procs(State, Pred) ->
    Visit = fun
        (#proc_int{client = undefined} = Proc, Acc) ->
            case Pred(Proc) of
                true -> remove_proc(Acc, Proc);
                false -> Acc
            end;
        (_, Acc) ->
            Acc
    end,
    ets:foldl(Visit, State, ?PROCS).
%% Asynchronous requests:
%%   {os_proc_idle, Pid} - a proc reports it is idle; it is closed when its
%%                         language is at or above the soft limit;
%%   reload_config       - re-read config and limits, then serve any waiters
%%                         the new hard limit allows.
handle_cast({os_proc_idle, Pid}, State) ->
    {noreply, maybe_close_idle_proc(ets:lookup(?PROCS, Pid), State)};
handle_cast(reload_config, State) ->
    Reloaded = State#state{
        config = get_proc_config(),
        hard_limit = get_hard_limit(),
        soft_limit = get_soft_limit()
    },
    {noreply, flush_waiters(Reloaded)};
handle_cast(_Msg, State) ->
    {noreply, State}.

%% Close the looked-up proc if it is idle and its language has reached the
%% soft limit; anything else leaves the state unchanged.
maybe_close_idle_proc([#proc_int{client = undefined, lang = Lang, pid = Pid} = Proc],
                      #state{counts = Counts, soft_limit = SoftLimit} = State) ->
    case dict:find(Lang, Counts) of
        {ok, Count} when Count >= SoftLimit ->
            couch_log:info("Closing idle OS Process: ~p", [Pid]),
            remove_proc(State, Proc);
        {ok, _} ->
            State
    end;
maybe_close_idle_proc(_, State) ->
    State.
%% Asynchronous messages. The manager traps exits (init/1), so exits of the
%% linked new_proc/1 workers and query server procs arrive as 'EXIT' messages:
%%   - a worker reports {spawn_ok, Proc, From} or spawn_error as its exit reason;
%%   - any other exit is either a tracked proc dying (removed from ?PROCS) or
%%     a worker dying abnormally (cleaned out of ?OPENING).
%% 'DOWN' comes from the monitor on a client holding a proc; the proc is then
%% returned to the pool as if the client had released it.
handle_info(shutdown, State) ->
    {stop, shutdown, State};
handle_info({'EXIT', Pid, {spawn_ok, Proc0, {ClientPid, _} = From}}, State) ->
    ets:delete(?OPENING, Pid),
    % make_proc/3 unlinked the proc from the worker; the manager links it
    % itself so it sees the proc's exit.
    link(Proc0#proc_int.pid),
    Proc = assign_proc(ClientPid, Proc0),
    gen_server:reply(From, {ok, Proc, State#state.config}),
    {noreply, State};
handle_info({'EXIT', Pid, spawn_error}, State) ->
    [{Pid, #client{lang = Lang}}] = ets:lookup(?OPENING, Pid),
    ets:delete(?OPENING, Pid),
    NewState = State#state{
        counts = dict:update_counter(Lang, -1, State#state.counts)
    },
    {noreply, flush_waiters(NewState, Lang)};
handle_info({'EXIT', Pid, Reason}, State) ->
    couch_log:info("~p ~p died ~p", [?MODULE, Pid, Reason]),
    case ets:lookup(?PROCS, Pid) of
        [#proc_int{} = Proc] ->
            NewState = remove_proc(State, Proc),
            {noreply, flush_waiters(NewState, Proc#proc_int.lang)};
        [] ->
            {noreply, handle_opener_exit(Pid, Reason, State)}
    end;
handle_info({'DOWN', Ref, _, _, _Reason}, State0) ->
    case ets:match_object(?PROCS, #proc_int{client = Ref, _ = '_'}) of
        [#proc_int{} = Proc] ->
            {noreply, return_proc(State0, Proc)};
        [] ->
            {noreply, State0}
    end;
handle_info(restart_config_listener, State) ->
    ok = config:listen_for_changes(?MODULE, nil),
    {noreply, State};
handle_info(_Msg, State) ->
    {noreply, State}.

%% A pid that is not in ?PROCS exited. If it is a new_proc/1 worker still
%% listed in ?OPENING, it died without reporting spawn_ok/spawn_error (its
%% only normal exit reasons). Without cleanup its ?OPENING entry and the
%% language's count would leak, permanently eating into the hard limit, and
%% its client would never be answered. So release the slot, tell the client,
%% and let the next waiter try. Unknown pids are ignored.
handle_opener_exit(Pid, Reason, State) ->
    case ets:lookup(?OPENING, Pid) of
        [{Pid, #client{from = From, lang = Lang}}] ->
            ets:delete(?OPENING, Pid),
            gen_server:reply(From, {error, Reason}),
            NewState = State#state{
                counts = dict:update_counter(Lang, -1, State#state.counts)
            },
            flush_waiters(NewState, Lang);
        [] ->
            State
    end.
%% Hot code upgrade: the state record is carried over unchanged.
code_change(_FromVsn, #state{} = CurrentState, _Extra) ->
    {ok, CurrentState}.
%% config_listener callback run when the listener goes away. A `stop` reason
%% is ignored. For any other reason the manager is asked to reload its config,
%% and a restart_config_listener message is scheduled after ?RELISTEN_DELAY ms
%% so handle_info/2 re-subscribes.
handle_config_terminate(_Server, stop, _State) ->
    ok;
handle_config_terminate(_Server, _Reason, _State) ->
    gen_server:cast(?MODULE, reload_config),
    Manager = whereis(?MODULE),
    erlang:send_after(?RELISTEN_DELAY, Manager, restart_config_listener).
%% config_listener callback. The manager caches settings read by
%% get_proc_config/0, get_hard_limit/0 and get_soft_limit/0. Those come from
%% [query_server_config] plus [couchdb] os_process_timeout, so a change to
%% any of them triggers a reload_config cast. Previously only the
%% query_server_config section was watched, so os_process_timeout changes
%% were never picked up. Always returns {ok, undefined}.
handle_config_change("query_server_config", _, _, _, _) ->
    gen_server:cast(?MODULE, reload_config),
    {ok, undefined};
handle_config_change("couchdb", "os_process_timeout", _, _, _) ->
    gen_server:cast(?MODULE, reload_config),
    {ok, undefined};
handle_config_change(_, _, _, _, _) ->
    {ok, undefined}.
%% Find an idle proc for Client's language.
%% Without a ddoc key, any idle proc of that language will do. With one, a
%% proc that already knows that exact {DDocId, Rev} is preferred. Otherwise
%% any idle proc is taken and taught the ddoc with teach_ddoc/3.
%% Returns {ok, #proc_int{}}, not_found, or {error, Reason} from find_proc/2.
%% NOTE(review): teach_ddoc/3 runs outside find_proc/2's try, so an exception
%% it raises propagates to the caller (flush_waiters/2) - confirm intended.
find_proc(#client{lang = Lang, ddoc_key = undefined}) ->
    find_proc(Lang, fun(_AnyProc) -> true end);
find_proc(#client{lang = Lang, ddoc = DDoc, ddoc_key = DDocKey} = Client) ->
    KnowsDDoc = fun(#proc_int{ddoc_keys = Known}) -> lists:member(DDocKey, Known) end,
    case find_proc(Lang, KnowsDDoc) of
        not_found ->
            AnyClient = Client#client{ddoc_key = undefined},
            case find_proc(AnyClient) of
                {ok, Fresh} ->
                    teach_ddoc(DDoc, DDocKey, Fresh);
                Other ->
                    Other
            end;
        Found ->
            Found
    end.
%% Search the idle procs of Lang for the first one accepted by Fun (see
%% iter_procs/2). Returns {ok, #proc_int{}} or not_found. An `error`-class
%% exception during the search is logged with its stacktrace and returned as
%% {error, Reason}; throws and exits still propagate.
%%
%% erlang:get_stacktrace/0 is deprecated since OTP 21 and no longer yields the
%% trace on later releases, so OTP 21+ (where ?OTP_RELEASE is defined) binds
%% the stacktrace in the catch pattern. Older releases keep the old call.
-ifdef(OTP_RELEASE).
find_proc(Lang, Fun) ->
    try
        iter_procs(Lang, Fun)
    catch
        error:Reason:StackTrace ->
            couch_log:error("~p ~p ~p", [?MODULE, Reason, StackTrace]),
            {error, Reason}
    end.
-else.
find_proc(Lang, Fun) ->
    try
        iter_procs(Lang, Fun)
    catch
        error:Reason ->
            StackTrace = erlang:get_stacktrace(),
            couch_log:error("~p ~p ~p", [?MODULE, Reason, StackTrace]),
            {error, Reason}
    end.
-endif.
%% Walk the idle procs (client =:= undefined) of language Lang in reverse key
%% order, 25 per ETS chunk, returning {ok, Proc} for the first one Fun accepts
%% or not_found.
iter_procs(Lang, Fun) when is_binary(Lang) ->
    IdleForLang = [{#proc_int{lang = Lang, client = undefined, _ = '_'}, [], ['$_']}],
    case ets:select_reverse(?PROCS, IdleForLang, 25) of
        '$end_of_table' -> not_found;
        FirstChunk -> iter_procs_int(FirstChunk, Fun)
    end.
%% Scan a {Matches, Continuation} chunk from ets:select_reverse/3,1 for the
%% first element Fun returns true for. When the chunk is exhausted, fetch the
%% next one; return not_found once the table is exhausted.
iter_procs_int({Chunk, Cont}, Fun) ->
    case lists:dropwhile(fun(Candidate) -> not Fun(Candidate) end, Chunk) of
        [Found | _] ->
            {ok, Found};
        [] ->
            case ets:select_reverse(Cont) of
                '$end_of_table' -> not_found;
                NextChunk -> iter_procs_int(NextChunk, Fun)
            end
    end.
%% Start a linked new_proc/1 worker for Client and record it in ?OPENING.
%% The language's count is bumped immediately, so starts still in flight
%% count towards the hard limit checked by can_spawn/2.
spawn_proc(#state{counts = Counts} = State, #client{lang = Lang} = Client) ->
    Opener = spawn_link(?MODULE, new_proc, [Client]),
    ets:insert(?OPENING, {Opener, Client}),
    State#state{counts = dict:update_counter(Lang, 1, Counts)}.
%% Body of the short-lived worker started by spawn_proc/2 (exported only for
%% spawn_link/3). It starts a query server for the client's language, teaches
%% it the client's design doc when one was requested, and reports back to the
%% manager through its exit reason:
%%   {spawn_ok, #proc_int{}, From} - the manager assigns the proc to From;
%%   spawn_error                   - the client has already been replied to.
%% Every failure path replies exactly once before exiting. Previously an
%% unknown language was answered twice, and a crash left the client blocked
%% until its call timed out.
new_proc(#client{from = From, lang = Lang} = Client) ->
    Resp = try
        case new_proc_int(Lang) of
            {ok, Proc} ->
                prepare_new_proc(Client, Proc);
            {unknown_query_language, _} = Unknown ->
                % Reply with the bare tuple, as before, so callers that
                % match on it keep working.
                gen_server:reply(From, Unknown),
                spawn_error
        end
    catch
        _:Reason ->
            couch_log:error("~p failed to start ~p query server: ~p",
                [?MODULE, Lang, Reason]),
            gen_server:reply(From, {error, Reason}),
            spawn_error
    end,
    exit(Resp).

%% Teach a freshly started proc the client's design doc, if one was requested.
%% make_proc/3 has already unlinked the proc from this worker, so if teaching
%% fails the proc is stopped (the same way remove_proc/2 stops procs) instead
%% of being orphaned, and the client gets the error.
prepare_new_proc(#client{from = From, ddoc = undefined, ddoc_key = undefined}, Proc) ->
    {spawn_ok, Proc, From};
prepare_new_proc(#client{from = From, ddoc = DDoc, ddoc_key = DDocKey}, Proc0) ->
    try
        {ok, Proc} = teach_ddoc(DDoc, DDocKey, Proc0),
        {spawn_ok, Proc, From}
    catch
        _:Reason ->
            gen_server:cast(Proc0#proc_int.pid, stop),
            couch_log:error("~p failed to teach ddoc to new proc ~p: ~p",
                [?MODULE, Proc0#proc_int.pid, Reason]),
            gen_server:reply(From, {error, Reason}),
            spawn_error
    end.

%% Start a query server for Lang. An OS command under [query_servers] takes
%% precedence over a native {M, F, A} spec under [native_query_servers].
%% Returns {ok, #proc_int{}} or {unknown_query_language, Lang}; it never
%% replies to the client itself, leaving that to new_proc/1.
new_proc_int(Lang) when is_binary(Lang) ->
    LangStr = binary_to_list(Lang),
    case config:get("query_servers", LangStr) of
        undefined ->
            case config:get("native_query_servers", LangStr) of
                undefined ->
                    {unknown_query_language, Lang};
                SpecStr ->
                    {ok, {M, F, A}} = couch_util:parse_term(SpecStr),
                    {ok, Pid} = apply(M, F, A),
                    make_proc(Pid, Lang, M)
            end;
        Command ->
            {ok, Pid} = couch_os_process:start_link(Command),
            make_proc(Pid, Lang, couch_os_process)
    end.
%% Send DDoc to the proc's query server as ["ddoc", "new", DDocId, Json] and
%% record DDocKey as known by the proc. The match on `true` crashes the caller
%% if the server answers anything else. Only the latest revision per ddoc id
%% is kept in ddoc_keys, since a new "ddoc new" for the same id replaces the
%% server's copy without regard to revision.
teach_ddoc(DDoc, {DDocId, _Rev} = DDocKey, #proc_int{ddoc_keys = Known} = Proc) ->
    DDocJson = couch_doc:to_json_obj(DDoc, []),
    Command = [<<"ddoc">>, <<"new">>, DDocId, DDocJson],
    true = couch_query_servers:proc_prompt(export_proc(Proc), Command),
    OtherDDocs = [Key || {Id, _} = Key <- Known, Id /= DDocId],
    {ok, Proc#proc_int{ddoc_keys = [DDocKey | OtherDDocs]}}.
%% Wrap a freshly started query server Pid in a #proc_int{} whose prompt,
%% set_timeout and stop callbacks are the same-named functions of Mod. The
%% proc is then unlinked from the calling new_proc/1 worker; the manager
%% links it itself when it receives spawn_ok.
make_proc(Pid, Lang, Mod) when is_binary(Lang) ->
    Callback = fun(FunName) -> {Mod, FunName} end,
    ProcInt = #proc_int{
        pid = Pid,
        lang = Lang,
        prompt_fun = Callback(prompt),
        set_timeout_fun = Callback(set_timeout),
        stop_fun = Callback(stop)
    },
    unlink(Pid),
    {ok, ProcInt}.
%% Hand an idle proc to a client. The client pid may be given directly or
%% taken from a #client{}'s From. The client is monitored, the monitor
%% reference is stored as the proc's `client`, and the public #proc{} view is
%% returned.
assign_proc(#client{from = From}, #proc_int{client = undefined} = Proc) ->
    {ClientPid, _Tag} = From,
    assign_proc(ClientPid, Proc);
assign_proc(ClientPid, #proc_int{client = undefined} = Idle) when is_pid(ClientPid) ->
    MonRef = erlang:monitor(process, ClientPid),
    Busy = Idle#proc_int{client = MonRef},
    ets:insert(?PROCS, Busy),
    export_proc(Busy).
%% A client has released (or lost) ProcInt. A live proc created at or after
%% the staleness threshold is garbage-collected and marked idle again; a dead
%% or stale one is removed. Either way, waiters for its language are then
%% given a chance to run.
return_proc(#state{} = State, #proc_int{} = ProcInt) ->
    #proc_int{pid = Pid, lang = Lang, t0 = CreatedAt} = ProcInt,
    Reusable = is_process_alive(Pid)
        andalso not (CreatedAt < State#state.threshold_ts),
    NewState = case Reusable of
        true ->
            gen_server:cast(Pid, garbage_collect),
            true = ets:update_element(?PROCS, Pid, [{#proc_int.client, undefined}]),
            State;
        false ->
            remove_proc(State, ProcInt)
    end,
    flush_waiters(NewState, Lang).
%% Forget a proc: delete it from ?PROCS, stop it (after unlinking, so its
%% exit does not come back as an 'EXIT') if it is still alive, and decrement
%% its language's count.
remove_proc(State, #proc_int{pid = Pid, lang = Lang}) ->
    ets:delete(?PROCS, Pid),
    case is_process_alive(Pid) of
        false ->
            ok;
        true ->
            unlink(Pid),
            gen_server:cast(Pid, stop)
    end,
    State#state{counts = dict:update_counter(Lang, -1, State#state.counts)}.
%% Convert the internal #proc_int{} to the public #proc{} handed to clients.
%% This relies on #proc{}'s fields being the leading fields of #proc_int{} in
%% the same order: the tuple is truncated to #proc{}'s size and retagged.
-spec export_proc(#proc_int{}) -> #proc{}.
export_proc(#proc_int{} = ProcInt) ->
    Prefix = lists:sublist(tuple_to_list(ProcInt), record_info(size, proc)),
    setelement(1, list_to_tuple(Prefix), proc).
%% Try to serve waiters for every language that is below the hard limit
%% (used after a config reload, which may have raised the limit).
flush_waiters(#state{hard_limit = HardLimit, counts = Counts} = State) ->
    dict:fold(fun
        (Lang, Count, Acc) when Count < HardLimit ->
            flush_waiters(Acc, Lang);
        (_Lang, _Count, Acc) ->
            Acc
    end, State, Counts).
%% Serve queued clients for Lang, oldest first, until none are left or the
%% oldest one can neither get an idle proc nor have a new one spawned.
flush_waiters(State, Lang) ->
    case get_waiting_client(Lang) of
        undefined ->
            State;
        #client{} = Waiter ->
            serve_waiter(State, Lang, Waiter)
    end.

%% Handle one waiter. An idle proc is assigned and returned to it. A lookup
%% error is passed back to it. Otherwise a new proc is spawned for it if the
%% hard limit allows. In the first three cases the waiter is dequeued and
%% the next one is tried; in the last it stays queued.
serve_waiter(State, Lang, #client{from = From} = Waiter) ->
    case find_proc(Waiter) of
        {ok, ProcInt} ->
            Proc = assign_proc(Waiter, ProcInt),
            gen_server:reply(From, {ok, Proc, State#state.config}),
            remove_waiting_client(Waiter),
            flush_waiters(State, Lang);
        {error, Error} ->
            gen_server:reply(From, {error, Error}),
            remove_waiting_client(Waiter),
            flush_waiters(State, Lang);
        not_found ->
            case can_spawn(State, Lang) of
                true ->
                    Spawned = spawn_proc(State, Waiter),
                    remove_waiting_client(Waiter),
                    flush_waiters(Spawned, Lang);
                false ->
                    State
            end
    end.
%% Queue Client in ?WAITERS, keyed by the os:timestamp() taken now. The table
%% is an ordered_set, so get_waiting_client/1 sees waiters oldest first.
%% os:timestamp/0 can return the same value for two calls within one clock
%% tick. A plain ets:insert/2 would then silently overwrite the earlier
%% waiter, which would never be answered. ets:insert_new/2 refuses the
%% duplicate key and we retry with a fresh timestamp; the spin lasts at most
%% one clock tick. Returns true, like ets:insert/2.
add_waiting_client(Client) ->
    Waiter = Client#client{timestamp = os:timestamp()},
    case ets:insert_new(?WAITERS, Waiter) of
        true ->
            true;
        false ->
            add_waiting_client(Client)
    end.
%% Return the first queued client for Lang in ?WAITERS key order (the oldest
%% queue timestamp), or undefined when nobody is waiting for that language.
-spec get_waiting_client(Lang :: binary()) -> undefined | #client{}.
get_waiting_client(Lang) ->
    ForLang = #client{lang = Lang, _ = '_'},
    case ets:match_object(?WAITERS, ForLang, 1) of
        {[#client{} = Oldest], _Cont} -> Oldest;
        '$end_of_table' -> undefined
    end.
%% Dequeue Client; ?WAITERS is keyed by the timestamp set when it was queued.
remove_waiting_client(#client{} = Client) ->
    ets:delete(?WAITERS, Client#client.timestamp).
%% True if another proc may be started for Lang. A language that has never
%% had a proc is always allowed; otherwise its count must be below the hard
%% limit.
can_spawn(#state{hard_limit = HardLimit, counts = Counts}, Lang) ->
    case dict:is_key(Lang, Counts) of
        false -> true;
        true -> dict:fetch(Lang, Counts) < HardLimit
    end.
%% Build the {Props} config sent to clients with each proc: reduce_limit from
%% [query_server_config] (default "true", converted to an atom) and timeout
%% from [couchdb] os_process_timeout (default "5000", as an integer).
get_proc_config() ->
    ReduceLimitStr = config:get("query_server_config", "reduce_limit", "true"),
    TimeoutStr = config:get("couchdb", "os_process_timeout", "5000"),
    Props = [
        {<<"reduce_limit">>, list_to_atom(ReduceLimitStr)},
        {<<"timeout">>, list_to_integer(TimeoutStr)}
    ],
    {Props}.
%% Maximum number of procs per language.
get_hard_limit() ->
    query_server_int("os_process_limit").

%% Per-language count at or above which idle procs are closed.
get_soft_limit() ->
    query_server_int("os_process_soft_limit").

%% Integer setting from [query_server_config], defaulting to 100.
query_server_int(Key) ->
    list_to_integer(config:get("query_server_config", Key, "100")).