Merge pull request #1 from 'sagelywizard/2002-hard-os-proc-cap'
Add hard limit for OS processes
diff --git a/src/couch_proc_manager.erl b/src/couch_proc_manager.erl
index 8027e76..c8a5b77 100644
--- a/src/couch_proc_manager.erl
+++ b/src/couch_proc_manager.erl
@@ -17,7 +17,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([start_link/0, get_proc_count/0, new_proc/2, new_proc/4]).
+-export([start_link/0, get_proc_count/0, new_proc/1]).
% config_listener api
-export([handle_config_change/5]).
@@ -26,7 +26,17 @@
-record(state, {
tab,
- config
+ config,
+ proc_counts,
+ waiting
+}).
+
+-record(client, {
+ timestamp,
+ from,
+ lang,
+ ddoc,
+ ddoc_key
}).
start_link() ->
@@ -40,7 +50,10 @@
ok = config:listen_for_changes(?MODULE, nil),
{ok, #state{
tab = ets:new(procs, [ordered_set, {keypos, #proc.pid}]),
- config = get_proc_config()
+ config = get_proc_config(),
+ proc_counts = dict:new(),
+ waiting = ets:new(couch_proc_manage_waiting,
+ [ordered_set, {keypos, #client.timestamp}])
}}.
handle_call(get_table, _From, State) ->
@@ -50,12 +63,13 @@
{reply, ets:info(State#state.tab, size), State};
handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, State) ->
- {Client, _} = From,
- Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
+ {ClientPid, _} = From,
+ Lang = couch_util:to_binary(
+ couch_util:get_value(<<"language">>, Props, <<"javascript">>)),
IterFun = fun(Proc, Acc) ->
case lists:member(DDocKey, Proc#proc.ddoc_keys) of
true ->
- {stop, assign_proc(State#state.tab, Client, Proc)};
+ {stop, assign_proc(State#state.tab, ClientPid, Proc)};
false ->
{ok, Acc}
end
@@ -63,60 +77,41 @@
TeachFun = fun(Proc0, Acc) ->
try
{ok, Proc1} = teach_ddoc(DDoc, DDocKey, Proc0),
- {stop, assign_proc(State#state.tab, Client, Proc1)}
+ {stop, assign_proc(State#state.tab, ClientPid, Proc1)}
catch _:_ ->
{ok, Acc}
end
end,
- try iter_procs(State#state.tab, Lang, IterFun, nil) of
- {not_found, _} ->
- case iter_procs(State#state.tab, Lang, TeachFun, nil) of
- {not_found, _} ->
- spawn_link(?MODULE, new_proc, [From, Lang, DDoc, DDocKey]),
- {noreply, State};
- {ok, Proc} ->
- {reply, {ok, Proc, State#state.config}, State}
- end;
- {ok, Proc} ->
- {reply, {ok, Proc, State#state.config}, State}
- catch error:Reason ->
- ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
- {reply, {error, Reason}, State}
- end;
+ Client = #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey},
+ find_proc(State, Client, [IterFun, TeachFun]);
-handle_call({get_proc, Lang}, {Client, _} = From, State) ->
+handle_call({get_proc, Lang}, From, State) ->
+ {ClientPid, _} = From,
IterFun = fun(Proc, _Acc) ->
- {stop, assign_proc(State#state.tab, Client, Proc)}
+ {stop, assign_proc(State#state.tab, ClientPid, Proc)}
end,
- try iter_procs(State#state.tab, Lang, IterFun, nil) of
- {not_found, _} ->
- spawn_link(?MODULE, new_proc, [From, Lang]),
- {noreply, State};
- {ok, Proc} ->
- {reply, {ok, Proc, State#state.config}, State}
- catch error:Reason ->
- ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
- {reply, {error, Reason}, State}
- end;
+ Client = #client{from=From, lang=couch_util:to_binary(Lang)},
+ find_proc(State, Client, [IterFun]);
-handle_call({ret_proc, #proc{client=Ref} = Proc}, _From, State) ->
+handle_call({ret_proc, #proc{client=Ref, lang=Lang0} = Proc}, _From, State) ->
erlang:demonitor(Ref, [flush]),
+ Lang = couch_util:to_binary(Lang0),
% We need to check if the process is alive here, as the client could be
% handing us a #proc{} with a dead one. We would have already removed the
% #proc{} from our own table, so the alternative is to do a lookup in the
% table before the insert. Don't know which approach is cheaper.
- return_proc(State#state.tab, Proc),
- {reply, true, State};
+ {reply, true, return_proc(State, Proc#proc{lang=Lang})};
handle_call(_Call, _From, State) ->
{reply, ignored, State}.
-handle_cast({os_proc_idle, Pid}, #state{tab=Tab}=State) ->
- Limit = config:get("query_server_config", "os_process_soft_limit", "100"),
- case ets:lookup(Tab, Pid) of
- [#proc{client=nil}] ->
- case ets:info(Tab, size) > list_to_integer(Limit) of
- true ->
+handle_cast({os_proc_idle, Pid}, #state{tab=Tab, proc_counts=Counts}=State0) ->
+ Limit = list_to_integer(
+ config:get("query_server_config", "os_process_soft_limit", "100")),
+ State = case ets:lookup(Tab, Pid) of
+ [#proc{client=nil, lang=Lang}] ->
+ case dict:find(Lang, Counts) of
+ {ok, Count} when Count > Limit ->
?LOG_INFO("Closing idle OS Process: ~p", [Pid]),
ets:delete(Tab, Pid),
case is_process_alive(Pid) of
@@ -125,12 +120,15 @@
gen_server:cast(Pid, stop);
_ ->
ok
- end;
- _ ->
- ok
+ end,
+ State0#state{
+ proc_counts=dict:update_counter(Lang, -1, Counts)
+ };
+ {ok, _} ->
+ State0
end;
_ ->
- ok
+ State0
end,
{noreply, State};
handle_cast(reload_config, State) ->
@@ -142,25 +140,39 @@
handle_info(shutdown, State) ->
{stop, shutdown, State};
-handle_info({'EXIT', _, {ok, Proc0, {Client,_} = From}}, State) ->
+handle_info({'EXIT', _, {ok, Proc0, {ClientPid,_} = From}}, State) ->
link(Proc0#proc.pid),
- Proc = assign_proc(State#state.tab, Client, Proc0),
+ Proc = assign_proc(State#state.tab, ClientPid, Proc0),
gen_server:reply(From, {ok, Proc, State#state.config}),
{noreply, State};
handle_info({'EXIT', Pid, Reason}, State) ->
+ #state{proc_counts=Counts, waiting=Waiting} = State,
?LOG_INFO("~p ~p died ~p", [?MODULE, Pid, Reason]),
+ MaybeProc = ets:lookup(State#state.tab, Pid),
ets:delete(State#state.tab, Pid),
- {noreply, State};
+ case MaybeProc of
+ [#proc{lang=Lang}] ->
+ case get_waiting_client(Waiting, Lang) of
+ nil ->
+ {noreply, State#state{
+ proc_counts=dict:update_counter(Lang, -1, Counts)
+ }};
+ Client ->
+ spawn_link(?MODULE, new_proc, [Client]),
+ {noreply, State}
+ end;
+ [] ->
+ {noreply, State}
+ end;
-handle_info({'DOWN', Ref, _, _, _Reason}, State) ->
- case ets:match_object(State#state.tab, #proc{client=Ref, _='_'}) of
+handle_info({'DOWN', Ref, _, _, _Reason}, State0) ->
+ case ets:match_object(State0#state.tab, #proc{client=Ref, _='_'}) of
[] ->
- ok;
+ {noreply, State0};
[#proc{} = Proc] ->
- return_proc(State#state.tab, Proc)
- end,
- {noreply, State};
+ {noreply, return_proc(State0, Proc)}
+ end;
handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, State) ->
erlang:send_after(5000, self(), restart_config_listener),
@@ -189,8 +201,21 @@
handle_config_change(_, _, _, _, _) ->
{ok, nil}.
-iter_procs(Tab, Lang, Fun, Acc) when is_binary(Lang) ->
- iter_procs(Tab, binary_to_list(Lang), Fun, Acc);
+% Try each finder fun in turn against the idle procs registered for the
+% client's language. The result is a gen_server handle_call return value:
+%   {reply, {ok, Proc, Config}, State}  - a finder fun claimed a proc
+%   {reply, {error, Reason}, State}     - the search raised an error
+%   {noreply, State1}                   - every finder came up empty; the
+%                                         request went to maybe_spawn_proc/2
+find_proc(State, Client, []) ->
+    {noreply, maybe_spawn_proc(State, Client)};
+find_proc(State, Client, [Finder|Remaining]) ->
+    try iter_procs(State#state.tab, Client#client.lang, Finder, nil) of
+        {ok, Proc} ->
+            {reply, {ok, Proc, State#state.config}, State};
+        {not_found, _} ->
+            % This finder matched nothing, move on to the next one.
+            find_proc(State, Client, Remaining)
+    catch error:Reason ->
+        ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
+        {reply, {error, Reason}, State}
+    end.
+
+iter_procs(Tab, Lang, Fun, Acc) when is_list(Lang) ->
+ iter_procs(Tab, list_to_binary(Lang), Fun, Acc);
iter_procs(Tab, Lang, Fun, Acc) ->
Pattern = #proc{lang=Lang, client=nil, _='_'},
MSpec = [{Pattern, [], ['$_']}],
@@ -216,15 +241,17 @@
{ok, Acc1}
end.
-new_proc(From, Lang) ->
+new_proc(#client{ddoc=undefined, ddoc_key=undefined}=Client) ->
+ #client{from=From, lang=Lang} = Client,
case new_proc_int(From, Lang) of
{ok, Proc} ->
exit({ok, Proc, From});
Error ->
gen_server:reply(From, {error, Error})
- end.
+ end;
-new_proc(From, Lang, DDoc, DDocKey) ->
+new_proc(Client) ->
+ #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey} = Client,
case new_proc_int(From, Lang) of
{ok, NewProc} ->
case proc_with_ddoc(DDoc, DDocKey, [NewProc]) of
@@ -255,39 +282,6 @@
make_proc(Pid, Lang, couch_os_process)
end.
-make_proc(Pid, Lang, Mod) ->
- Proc = #proc{
- lang = Lang,
- pid = Pid,
- prompt_fun = {Mod, prompt},
- prompt_many_fun = {Mod, prompt_many},
- set_timeout_fun = {Mod, set_timeout},
- stop_fun = {Mod, stop}
- },
- unlink(Pid),
- {ok, Proc}.
-
-assign_proc(Tab, Client, #proc{client=nil}=Proc0) ->
- Proc = Proc0#proc{client = erlang:monitor(process, Client)},
- ets:insert(Tab, Proc),
- Proc.
-
-return_proc(Tab, #proc{pid=Pid} = Proc) ->
- case is_process_alive(Pid) of true ->
- gen_server:cast(Pid, garbage_collect),
- ets:insert(Tab, Proc#proc{client=nil});
- false ->
- ets:delete(Tab, Pid)
- end.
-
-get_proc_config() ->
- Limit = config:get("query_server_config", "reduce_limit", "true"),
- Timeout = config:get("couchdb", "os_process_timeout", "5000"),
- {[
- {<<"reduce_limit">>, list_to_atom(Limit)},
- {<<"timeout">>, list_to_integer(Timeout)}
- ]}.
-
proc_with_ddoc(DDoc, DDocKey, Procs) ->
Filter = fun(#proc{ddoc_keys=Keys}) -> not lists:member(DDocKey, Keys) end,
case lists:dropwhile(Filter, Procs) of
@@ -317,3 +311,85 @@
Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId],
% add ddoc to the proc
{ok, Proc#proc{ddoc_keys=[DDocKey|Keys2]}}.
+
+% Build the #proc{} record for the OS process Pid, whose prompt, timeout and
+% stop callbacks live in module Mod, and remove the link between the calling
+% process and Pid. Returns {ok, #proc{}}.
+make_proc(Pid, Lang, Mod) ->
+    % Unlinking does not affect the record built below, so do it up front.
+    unlink(Pid),
+    {ok, #proc{
+        pid = Pid,
+        lang = Lang,
+        prompt_fun = {Mod, prompt},
+        prompt_many_fun = {Mod, prompt_many},
+        set_timeout_fun = {Mod, set_timeout},
+        stop_fun = {Mod, stop}
+    }}.
+
+% Mark an idle proc (client=nil) as owned by a client: monitor the client
+% process, store the monitor reference in the proc's client field, write the
+% record to Tab and return it. The client may be given either as a pid or as
+% a #client{} whose from field is a {Pid, Tag} pair.
+assign_proc(Tab, #client{from=From}, #proc{client=nil}=Proc) ->
+    {ClientPid, _} = From,
+    assign_proc(Tab, ClientPid, Proc);
+assign_proc(Tab, ClientPid, #proc{client=nil}=Idle) when is_pid(ClientPid) ->
+    MonitorRef = erlang:monitor(process, ClientPid),
+    Assigned = Idle#proc{client = MonitorRef},
+    ets:insert(Tab, Assigned),
+    Assigned.
+
+% Take a proc back once its client is done with it (or has gone away).
+% Returns the resulting #state{}.
+%
+% Live proc: if a client is waiting for this language the proc is assigned
+%            to it and the client is answered right away; otherwise the proc
+%            is asked to garbage collect and stored as idle (client=nil).
+% Dead proc: its table entry is removed; a waiting client, if any, is passed
+%            to maybe_spawn_proc/2 so a replacement can be started or the
+%            client queued again.
+return_proc(State, #proc{pid=Pid, lang=Lang} = Proc) ->
+    #state{tab=Tab, waiting=Waiting} = State,
+    case is_process_alive(Pid) of true ->
+        case get_waiting_client(Waiting, Lang) of
+            nil ->
+                gen_server:cast(Pid, garbage_collect),
+                ets:insert(Tab, Proc#proc{client=nil}),
+                State;
+            #client{from=From}=Client ->
+                % Reply with the record returned by assign_proc/3. It holds
+                % the monitor reference created for the new client, which is
+                % the reference ret_proc demonitors when the client hands the
+                % proc back. The incoming Proc still carries the previous
+                % client's reference and must not be sent out.
+                Assigned = assign_proc(Tab, Client, Proc#proc{client=nil}),
+                gen_server:reply(From, {ok, Assigned, State#state.config}),
+                State
+        end;
+    false ->
+        ets:delete(Tab, Pid),
+        case get_waiting_client(Waiting, Lang) of
+            nil ->
+                State;
+            #client{}=Client ->
+                maybe_spawn_proc(State, Client)
+        end
+    end.
+
+% Start a new OS process for Client unless the number of processes counted
+% for the client's language has reached the hard limit read from
+% "query_server_config" / "os_process_limit" (default "100"). At or above the
+% limit the client is put in the waiting table instead.
+% Returns the resulting #state{}; the per-language counter is incremented
+% only when a process is spawned.
+maybe_spawn_proc(State, Client) ->
+    #state{proc_counts=Counts, waiting=Waiting} = State,
+    #client{lang=Lang} = Client,
+    Limit = list_to_integer(config:get(
+        "query_server_config", "os_process_limit", "100")),
+    case dict:find(Lang, Counts) of
+        % Compare with >= rather than matching the limit exactly: the limit
+        % is re-read on every call, so it can drop below the current count.
+        % An equality test would then never hold again and spawning would be
+        % unbounded.
+        {ok, Count} when Count >= Limit ->
+            add_waiting_client(Waiting, Client),
+            State;
+        _ ->
+            spawn_link(?MODULE, new_proc, [Client]),
+            State#state{
+                proc_counts=dict:update_counter(Lang, 1, Counts)
+            }
+    end.
+
+% Queue Client in the waiting table Tab, stamping it with the current now()
+% value in its timestamp field before inserting.
+add_waiting_client(Tab, Client) ->
+    Stamped = Client#client{timestamp=now()},
+    ets:insert(Tab, Stamped).
+
+% Remove and return one waiting #client{} for Lang from Tab, or return nil
+% when none is queued. A Lang given as a list is converted to a binary
+% before the lookup.
+get_waiting_client(Tab, Lang) when is_list(Lang) ->
+    get_waiting_client(Tab, couch_util:to_binary(Lang));
+get_waiting_client(Tab, Lang) ->
+    Pattern = #client{lang=Lang, _='_'},
+    % A limit of 1 asks ets for at most one matching entry.
+    case ets:match_object(Tab, Pattern, 1) of
+        {[#client{}=Waiter], _} ->
+            ets:delete(Tab, Waiter#client.timestamp),
+            Waiter;
+        '$end_of_table' ->
+            nil
+    end.
+
+% Read the settings handed to clients along with a proc and return them as
+% an EJSON-style object {[{Key, Value}]}:
+%   <<"reduce_limit">> - atom from "query_server_config"/"reduce_limit"
+%                        (default "true")
+%   <<"timeout">>      - integer from "couchdb"/"os_process_timeout"
+%                        (default "5000")
+get_proc_config() ->
+    ReduceLimit = config:get("query_server_config", "reduce_limit", "true"),
+    OsTimeout = config:get("couchdb", "os_process_timeout", "5000"),
+    Props = [
+        {<<"reduce_limit">>, list_to_atom(ReduceLimit)},
+        {<<"timeout">>, list_to_integer(OsTimeout)}
+    ],
+    {Props}.