blob: c8a5b77b74b5433dea92c4fe5b0cb472445fac9f [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_proc_manager).
-behaviour(gen_server).
-behaviour(config_listener).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-export([start_link/0, get_proc_count/0, new_proc/1]).
% config_listener api
-export([handle_config_change/5]).
-include_lib("couch/include/couch_db.hrl").
-record(state, {
tab,
config,
proc_counts,
waiting
}).
-record(client, {
timestamp,
from,
lang,
ddoc,
ddoc_key
}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
get_proc_count() ->
gen_server:call(?MODULE, get_proc_count).
init([]) ->
process_flag(trap_exit, true),
ok = config:listen_for_changes(?MODULE, nil),
{ok, #state{
tab = ets:new(procs, [ordered_set, {keypos, #proc.pid}]),
config = get_proc_config(),
proc_counts = dict:new(),
waiting = ets:new(couch_proc_manage_waiting,
[ordered_set, {keypos, #client.timestamp}])
}}.
handle_call(get_table, _From, State) ->
{reply, State#state.tab, State};
handle_call(get_proc_count, _From, State) ->
{reply, ets:info(State#state.tab, size), State};
handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, State) ->
{ClientPid, _} = From,
Lang = couch_util:to_binary(
couch_util:get_value(<<"language">>, Props, <<"javascript">>)),
IterFun = fun(Proc, Acc) ->
case lists:member(DDocKey, Proc#proc.ddoc_keys) of
true ->
{stop, assign_proc(State#state.tab, ClientPid, Proc)};
false ->
{ok, Acc}
end
end,
TeachFun = fun(Proc0, Acc) ->
try
{ok, Proc1} = teach_ddoc(DDoc, DDocKey, Proc0),
{stop, assign_proc(State#state.tab, ClientPid, Proc1)}
catch _:_ ->
{ok, Acc}
end
end,
Client = #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey},
find_proc(State, Client, [IterFun, TeachFun]);
handle_call({get_proc, Lang}, From, State) ->
{ClientPid, _} = From,
IterFun = fun(Proc, _Acc) ->
{stop, assign_proc(State#state.tab, ClientPid, Proc)}
end,
Client = #client{from=From, lang=couch_util:to_binary(Lang)},
find_proc(State, Client, [IterFun]);
handle_call({ret_proc, #proc{client=Ref, lang=Lang0} = Proc}, _From, State) ->
erlang:demonitor(Ref, [flush]),
Lang = couch_util:to_binary(Lang0),
% We need to check if the process is alive here, as the client could be
% handing us a #proc{} with a dead one. We would have already removed the
% #proc{} from our own table, so the alternative is to do a lookup in the
% table before the insert. Don't know which approach is cheaper.
{reply, true, return_proc(State, Proc#proc{lang=Lang})};
handle_call(_Call, _From, State) ->
{reply, ignored, State}.
handle_cast({os_proc_idle, Pid}, #state{tab=Tab, proc_counts=Counts}=State0) ->
Limit = list_to_integer(
config:get("query_server_config", "os_process_soft_limit", "100")),
State = case ets:lookup(Tab, Pid) of
[#proc{client=nil, lang=Lang}] ->
case dict:find(Lang, Counts) of
{ok, Count} when Count > Limit ->
?LOG_INFO("Closing idle OS Process: ~p", [Pid]),
ets:delete(Tab, Pid),
case is_process_alive(Pid) of
true ->
unlink(Pid),
gen_server:cast(Pid, stop);
_ ->
ok
end,
State0#state{
proc_counts=dict:update_counter(Lang, -1, Counts)
};
{ok, _} ->
State0
end;
_ ->
State0
end,
{noreply, State};
handle_cast(reload_config, State) ->
{noreply, State#state{config = get_proc_config()}};
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(shutdown, State) ->
{stop, shutdown, State};
handle_info({'EXIT', _, {ok, Proc0, {ClientPid,_} = From}}, State) ->
link(Proc0#proc.pid),
Proc = assign_proc(State#state.tab, ClientPid, Proc0),
gen_server:reply(From, {ok, Proc, State#state.config}),
{noreply, State};
handle_info({'EXIT', Pid, Reason}, State) ->
#state{proc_counts=Counts, waiting=Waiting} = State,
?LOG_INFO("~p ~p died ~p", [?MODULE, Pid, Reason]),
MaybeProc = ets:lookup(State#state.tab, Pid),
ets:delete(State#state.tab, Pid),
case MaybeProc of
[#proc{lang=Lang}] ->
case get_waiting_client(Waiting, Lang) of
nil ->
{noreply, State#state{
proc_counts=dict:update_counter(Lang, -1, Counts)
}};
Client ->
spawn_link(?MODULE, new_proc, [Client]),
{noreply, State}
end;
[] ->
{noreply, State}
end;
handle_info({'DOWN', Ref, _, _, _Reason}, State0) ->
case ets:match_object(State0#state.tab, #proc{client=Ref, _='_'}) of
[] ->
{noreply, State0};
[#proc{} = Proc] ->
{noreply, return_proc(State0, Proc)}
end;
handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, State) ->
erlang:send_after(5000, self(), restart_config_listener),
{noreply, State};
handle_info(restart_config_lister, State) ->
ok = config:listen_for_changes(?MODULE, nil),
{noreply, State};
handle_info(_Msg, State) ->
{noreply, State}.
terminate(_Reason, #state{tab=Tab}) ->
ets:foldl(fun(#proc{pid=P}, _) -> couch_util:shutdown_sync(P) end, 0, Tab),
ok.
code_change(_OldVsn, #state{tab = Tab} = State, _Extra) ->
NewTab = ets:new(procs, [ordered_set, {keypos, #proc.pid}]),
true = ets:insert(NewTab, ets:tab2list(Tab)),
true = ets:delete(Tab),
{ok, State#state{tab = NewTab}}.
handle_config_change("query_server_config", _, _, _, _) ->
gen_server:cast(?MODULE, reload_config),
{ok, nil};
handle_config_change(_, _, _, _, _) ->
{ok, nil}.
find_proc(State, Client, [Fun|FindFuns]) ->
try iter_procs(State#state.tab, Client#client.lang, Fun, nil) of
{not_found, _} ->
find_proc(State, Client, FindFuns);
{ok, Proc} ->
{reply, {ok, Proc, State#state.config}, State}
catch error:Reason ->
?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
{reply, {error, Reason}, State}
end;
find_proc(State, Client, []) ->
{noreply, maybe_spawn_proc(State, Client)}.
iter_procs(Tab, Lang, Fun, Acc) when is_list(Lang) ->
iter_procs(Tab, list_to_binary(Lang), Fun, Acc);
iter_procs(Tab, Lang, Fun, Acc) ->
Pattern = #proc{lang=Lang, client=nil, _='_'},
MSpec = [{Pattern, [], ['$_']}],
case ets:select_reverse(Tab, MSpec, 25) of
'$end_of_table' ->
{not_found, Acc};
Continuation ->
iter_procs(Continuation, Fun, Acc)
end.
iter_procs({[], Continuation0}, Fun, Acc) ->
case ets:select_reverse(Continuation0) of
'$end_of_table' ->
{not_found, Acc};
Continuation1 ->
iter_procs(Continuation1, Fun, Acc)
end;
iter_procs({[Proc | Rest], Continuation}, Fun, Acc0) ->
case Fun(Proc, Acc0) of
{ok, Acc1} ->
iter_procs({Rest, Continuation}, Fun, Acc1);
{stop, Acc1} ->
{ok, Acc1}
end.
new_proc(#client{ddoc=undefined, ddoc_key=undefined}=Client) ->
#client{from=From, lang=Lang} = Client,
case new_proc_int(From, Lang) of
{ok, Proc} ->
exit({ok, Proc, From});
Error ->
gen_server:reply(From, {error, Error})
end;
new_proc(Client) ->
#client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey} = Client,
case new_proc_int(From, Lang) of
{ok, NewProc} ->
case proc_with_ddoc(DDoc, DDocKey, [NewProc]) of
{ok, Proc} ->
exit({ok, Proc, From});
{error, Reason} ->
gen_server:reply(From, {error, Reason})
end;
Error ->
gen_server:reply(From, {error, Error})
end.
new_proc_int(From, Lang) when is_binary(Lang) ->
new_proc_int(From, binary_to_list(Lang));
new_proc_int(From, Lang) when is_list(Lang) ->
case config:get("query_servers", Lang) of
undefined ->
case config:get("native_query_servers", Lang) of
undefined ->
gen_server:reply(From, {unknown_query_language, Lang});
SpecStr ->
{ok, {M,F,A}} = couch_util:parse_term(SpecStr),
{ok, Pid} = apply(M, F, A),
make_proc(Pid, Lang, M)
end;
Command ->
{ok, Pid} = couch_os_process:start_link(Command),
make_proc(Pid, Lang, couch_os_process)
end.
proc_with_ddoc(DDoc, DDocKey, Procs) ->
Filter = fun(#proc{ddoc_keys=Keys}) -> not lists:member(DDocKey, Keys) end,
case lists:dropwhile(Filter, Procs) of
[DDocProc|_] ->
{ok, DDocProc};
[] ->
teach_any_proc(DDoc, DDocKey, Procs)
end.
teach_any_proc(DDoc, DDocKey, [Proc|Rest]) ->
try
teach_ddoc(DDoc, DDocKey, Proc)
catch _:_ ->
teach_any_proc(DDoc, DDocKey, Rest)
end;
teach_any_proc(_, _, []) ->
{error, noproc}.
teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) ->
% send ddoc over the wire
% we only share the rev with the client we know to update code
% but it only keeps the latest copy, per each ddoc, around.
true = couch_query_servers:proc_prompt(Proc, [<<"ddoc">>, <<"new">>,
DDocId, couch_doc:to_json_obj(DDoc, [])]),
% we should remove any other ddocs keys for this docid
% because the query server overwrites without the rev
Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId],
% add ddoc to the proc
{ok, Proc#proc{ddoc_keys=[DDocKey|Keys2]}}.
make_proc(Pid, Lang, Mod) ->
Proc = #proc{
lang = Lang,
pid = Pid,
prompt_fun = {Mod, prompt},
prompt_many_fun = {Mod, prompt_many},
set_timeout_fun = {Mod, set_timeout},
stop_fun = {Mod, stop}
},
unlink(Pid),
{ok, Proc}.
assign_proc(Tab, ClientPid, #proc{client=nil}=Proc0) when is_pid(ClientPid) ->
Proc = Proc0#proc{client = erlang:monitor(process, ClientPid)},
ets:insert(Tab, Proc),
Proc;
assign_proc(Tab, #client{}=Client, #proc{client=nil}=Proc) ->
{Pid, _} = Client#client.from,
assign_proc(Tab, Pid, Proc).
return_proc(State, #proc{pid=Pid, lang=Lang} = Proc) ->
#state{tab=Tab, waiting=Waiting} = State,
case is_process_alive(Pid) of true ->
case get_waiting_client(Waiting, Lang) of
nil ->
gen_server:cast(Pid, garbage_collect),
ets:insert(Tab, Proc#proc{client=nil}),
State;
#client{}=Client ->
From = Client#client.from,
assign_proc(Tab, Client, Proc#proc{client=nil}),
gen_server:reply(From, {ok, Proc, State#state.config}),
State
end;
false ->
ets:delete(Tab, Pid),
case get_waiting_client(Waiting, Lang) of
nil ->
State;
#client{}=Client ->
maybe_spawn_proc(State, Client)
end
end.
maybe_spawn_proc(State, Client) ->
#state{proc_counts=Counts, waiting=Waiting} = State,
#client{lang=Lang} = Client,
Limit = list_to_integer(config:get(
"query_server_config", "os_process_limit", "100")),
case dict:find(Lang, Counts) of
{ok, Limit} ->
add_waiting_client(Waiting, Client),
State;
_ ->
spawn_link(?MODULE, new_proc, [Client]),
State#state{
proc_counts=dict:update_counter(Lang, 1, Counts)
}
end.
add_waiting_client(Tab, Client) ->
ets:insert(Tab, Client#client{timestamp=now()}).
get_waiting_client(Tab, Lang) when is_list(Lang) ->
get_waiting_client(Tab, couch_util:to_binary(Lang));
get_waiting_client(Tab, Lang) ->
case ets:match_object(Tab, #client{lang=Lang, _='_'}, 1) of
'$end_of_table' ->
nil;
{[#client{}=Client], _} ->
ets:delete(Tab, Client#client.timestamp),
Client
end.
get_proc_config() ->
Limit = config:get("query_server_config", "reduce_limit", "true"),
Timeout = config:get("couchdb", "os_process_timeout", "5000"),
{[
{<<"reduce_limit">>, list_to_atom(Limit)},
{<<"timeout">>, list_to_integer(Timeout)}
]}.