Merge pull request #1 from 'sagelywizard/2002-hard-os-proc-cap'

Add hard limit for OS processes
diff --git a/src/couch_proc_manager.erl b/src/couch_proc_manager.erl
index 8027e76..c8a5b77 100644
--- a/src/couch_proc_manager.erl
+++ b/src/couch_proc_manager.erl
@@ -17,7 +17,7 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, 
     code_change/3]).
 
--export([start_link/0, get_proc_count/0, new_proc/2, new_proc/4]).
+-export([start_link/0, get_proc_count/0, new_proc/1]).
 
 % config_listener api
 -export([handle_config_change/5]).
@@ -26,7 +26,17 @@
 
 -record(state, {
     tab,
-    config
+    config,       % result of get_proc_config/0
+    proc_counts,  % dict: language -> number of OS processes counted for it
+    waiting       % ets ordered_set of #client{} records keyed on timestamp
+}).
+
+-record(client, {
+    timestamp,  % now() taken when the client is queued; key of the waiting table
+    from,       % gen_server From of the pending get_proc call
+    lang,       % query language, normalised to a binary by the get_proc clauses
+    ddoc,       % design doc for proc_with_ddoc/3; unset for {get_proc, Lang}
+    ddoc_key    % key paired with ddoc; unset for {get_proc, Lang}
 }).
 
 start_link() ->
@@ -40,7 +50,10 @@
     ok = config:listen_for_changes(?MODULE, nil),
     {ok, #state{
         tab = ets:new(procs, [ordered_set, {keypos, #proc.pid}]),
-        config = get_proc_config()
+        config = get_proc_config(),
+        proc_counts = dict:new(),
+        waiting = ets:new(couch_proc_manage_waiting,
+                [ordered_set, {keypos, #client.timestamp}])
     }}.
 
 handle_call(get_table, _From, State) ->
@@ -50,12 +63,13 @@
     {reply, ets:info(State#state.tab, size), State};
 
 handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, State) ->
-    {Client, _} = From,
-    Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
+    {ClientPid, _} = From,
+    Lang = couch_util:to_binary(
+            couch_util:get_value(<<"language">>, Props, <<"javascript">>)),
     IterFun = fun(Proc, Acc) ->
         case lists:member(DDocKey, Proc#proc.ddoc_keys) of
             true ->
-                {stop, assign_proc(State#state.tab, Client, Proc)};
+                {stop, assign_proc(State#state.tab, ClientPid, Proc)};
             false ->
                 {ok, Acc}
         end
@@ -63,60 +77,41 @@
     TeachFun = fun(Proc0, Acc) ->
         try
             {ok, Proc1} = teach_ddoc(DDoc, DDocKey, Proc0),
-            {stop, assign_proc(State#state.tab, Client, Proc1)}
+            {stop, assign_proc(State#state.tab, ClientPid, Proc1)}
         catch _:_ ->
             {ok, Acc}
         end
     end,
-    try iter_procs(State#state.tab, Lang, IterFun, nil) of
-    {not_found, _} ->
-        case iter_procs(State#state.tab, Lang, TeachFun, nil) of
-        {not_found, _} ->
-            spawn_link(?MODULE, new_proc, [From, Lang, DDoc, DDocKey]),
-            {noreply, State};
-        {ok, Proc} ->
-            {reply, {ok, Proc, State#state.config}, State}
-        end;
-    {ok, Proc} ->
-        {reply, {ok, Proc, State#state.config}, State}
-    catch error:Reason ->
-        ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
-        {reply, {error, Reason}, State}
-    end;
+    Client = #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey},
+    find_proc(State, Client, [IterFun, TeachFun]);
 
-handle_call({get_proc, Lang}, {Client, _} = From, State) ->
+handle_call({get_proc, Lang}, From, State) ->
+    {ClientPid, _} = From,
     IterFun = fun(Proc, _Acc) ->
-        {stop, assign_proc(State#state.tab, Client, Proc)}
+        {stop, assign_proc(State#state.tab, ClientPid, Proc)}
     end,
-    try iter_procs(State#state.tab, Lang, IterFun, nil) of
-    {not_found, _} ->
-        spawn_link(?MODULE, new_proc, [From, Lang]),
-        {noreply, State};
-    {ok, Proc} ->
-        {reply, {ok, Proc, State#state.config}, State}
-    catch error:Reason ->
-        ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
-        {reply, {error, Reason}, State}
-    end;
+    Client = #client{from=From, lang=couch_util:to_binary(Lang)},
+    find_proc(State, Client, [IterFun]);
 
-handle_call({ret_proc, #proc{client=Ref} = Proc}, _From, State) ->
+handle_call({ret_proc, #proc{client=Ref, lang=Lang0} = Proc}, _From, State) ->
     erlang:demonitor(Ref, [flush]),
+    Lang = couch_util:to_binary(Lang0),
     % We need to check if the process is alive here, as the client could be
     % handing us a #proc{} with a dead one.  We would have already removed the
     % #proc{} from our own table, so the alternative is to do a lookup in the
     % table before the insert.  Don't know which approach is cheaper.
-    return_proc(State#state.tab, Proc),
-    {reply, true, State};
+    {reply, true, return_proc(State, Proc#proc{lang=Lang})};
 
 handle_call(_Call, _From, State) ->
     {reply, ignored, State}.
 
-handle_cast({os_proc_idle, Pid}, #state{tab=Tab}=State) ->
-    Limit = config:get("query_server_config", "os_process_soft_limit", "100"),
-    case ets:lookup(Tab, Pid) of
-        [#proc{client=nil}] ->
-            case ets:info(Tab, size) > list_to_integer(Limit) of
-                true ->
+handle_cast({os_proc_idle, Pid}, #state{tab=Tab, proc_counts=Counts}=State0) ->
+    Limit = list_to_integer(
+            config:get("query_server_config", "os_process_soft_limit", "100")),
+    State = case ets:lookup(Tab, Pid) of
+        [#proc{client=nil, lang=Lang}] ->
+            case dict:find(Lang, Counts) of
+                {ok, Count} when Count > Limit ->
                     ?LOG_INFO("Closing idle OS Process: ~p", [Pid]),
                     ets:delete(Tab, Pid),
                     case is_process_alive(Pid) of
@@ -125,12 +120,15 @@
                             gen_server:cast(Pid, stop);
                         _ ->
                             ok
-                    end;
-                _ ->
-                    ok
+                    end,
+                    State0#state{
+                        proc_counts=dict:update_counter(Lang, -1, Counts)
+                    };
+                {ok, _} ->
+                    State0
             end;
         _ ->
-            ok
+            State0
     end,
     {noreply, State};
 handle_cast(reload_config, State) ->
@@ -142,25 +140,39 @@
 handle_info(shutdown, State) ->
     {stop, shutdown, State};
 
-handle_info({'EXIT', _, {ok, Proc0, {Client,_} = From}}, State) ->
+handle_info({'EXIT', _, {ok, Proc0, {ClientPid,_} = From}}, State) ->
     link(Proc0#proc.pid),
-    Proc = assign_proc(State#state.tab, Client, Proc0),
+    Proc = assign_proc(State#state.tab, ClientPid, Proc0),
     gen_server:reply(From, {ok, Proc, State#state.config}),
     {noreply, State};
 
 handle_info({'EXIT', Pid, Reason}, State) ->
+    #state{proc_counts=Counts, waiting=Waiting} = State,
     ?LOG_INFO("~p ~p died ~p", [?MODULE, Pid, Reason]),
+    MaybeProc = ets:lookup(State#state.tab, Pid),
     ets:delete(State#state.tab, Pid),
-    {noreply, State};
+    case MaybeProc of
+        [#proc{lang=Lang}] ->
+            case get_waiting_client(Waiting, Lang) of
+                nil ->
+                    {noreply, State#state{
+                        proc_counts=dict:update_counter(Lang, -1, Counts)
+                    }};
+                Client ->
+                    spawn_link(?MODULE, new_proc, [Client]),
+                    {noreply, State}
+            end;
+        [] ->
+            {noreply, State}
+    end;
 
-handle_info({'DOWN', Ref, _, _, _Reason}, State) ->
-    case ets:match_object(State#state.tab, #proc{client=Ref, _='_'}) of
+handle_info({'DOWN', Ref, _, _, _Reason}, State0) ->
+    case ets:match_object(State0#state.tab, #proc{client=Ref, _='_'}) of
     [] ->
-        ok;
+        {noreply, State0};
     [#proc{} = Proc] ->
-        return_proc(State#state.tab, Proc)
-    end,
-    {noreply, State};
+        {noreply, return_proc(State0, Proc)}
+    end;
 
 handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, State) ->
     erlang:send_after(5000, self(), restart_config_listener),
@@ -189,8 +201,21 @@
 handle_config_change(_, _, _, _, _) ->
     {ok, nil}.
 
-iter_procs(Tab, Lang, Fun, Acc) when is_binary(Lang) ->
-    iter_procs(Tab, binary_to_list(Lang), Fun, Acc);
+% Try each finder fun in turn and reply with the first proc one of them yields;
+% with none left, defer the reply and let maybe_spawn_proc/2 spawn or queue.
+find_proc(State, Client, []) ->
+    {noreply, maybe_spawn_proc(State, Client)};
+find_proc(State, Client, [Finder|Rest]) ->
+    try iter_procs(State#state.tab, Client#client.lang, Finder, nil) of
+    {ok, Proc} -> {reply, {ok, Proc, State#state.config}, State};
+    {not_found, _} -> find_proc(State, Client, Rest)
+    catch error:Reason ->
+        ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
+        {reply, {error, Reason}, State}
+    end.
+
+iter_procs(Tab, Lang, Fun, Acc) when is_list(Lang) ->
+    iter_procs(Tab, list_to_binary(Lang), Fun, Acc);
 iter_procs(Tab, Lang, Fun, Acc) ->
     Pattern = #proc{lang=Lang, client=nil, _='_'},
     MSpec = [{Pattern, [], ['$_']}],
@@ -216,15 +241,17 @@
             {ok, Acc1}
     end.
 
-new_proc(From, Lang) ->
+new_proc(#client{ddoc=undefined, ddoc_key=undefined}=Client) ->
+    #client{from=From, lang=Lang} = Client,
     case new_proc_int(From, Lang) of
     {ok, Proc} ->
         exit({ok, Proc, From});
     Error ->
         gen_server:reply(From, {error, Error})
-    end.
+    end;
 
-new_proc(From, Lang, DDoc, DDocKey) ->
+new_proc(Client) ->
+    #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey} = Client,
     case new_proc_int(From, Lang) of
     {ok, NewProc} ->
         case proc_with_ddoc(DDoc, DDocKey, [NewProc]) of
@@ -255,39 +282,6 @@
         make_proc(Pid, Lang, couch_os_process)
     end.
 
-make_proc(Pid, Lang, Mod) ->
-    Proc = #proc{
-        lang = Lang,
-        pid = Pid,
-        prompt_fun = {Mod, prompt},
-        prompt_many_fun = {Mod, prompt_many},
-        set_timeout_fun = {Mod, set_timeout},
-        stop_fun = {Mod, stop}
-    },
-    unlink(Pid),
-    {ok, Proc}.
-
-assign_proc(Tab, Client, #proc{client=nil}=Proc0) ->
-    Proc = Proc0#proc{client = erlang:monitor(process, Client)},
-    ets:insert(Tab, Proc),
-    Proc.
-
-return_proc(Tab, #proc{pid=Pid} = Proc) ->
-    case is_process_alive(Pid) of true ->
-        gen_server:cast(Pid, garbage_collect),
-        ets:insert(Tab, Proc#proc{client=nil});
-    false ->
-        ets:delete(Tab, Pid)
-    end.
-
-get_proc_config() ->
-    Limit = config:get("query_server_config", "reduce_limit", "true"),
-    Timeout = config:get("couchdb", "os_process_timeout", "5000"),
-    {[
-        {<<"reduce_limit">>, list_to_atom(Limit)},
-        {<<"timeout">>, list_to_integer(Timeout)}
-    ]}.
-
 proc_with_ddoc(DDoc, DDocKey, Procs) ->
     Filter = fun(#proc{ddoc_keys=Keys}) -> not lists:member(DDocKey, Keys) end,
     case lists:dropwhile(Filter, Procs) of
@@ -317,3 +311,85 @@
     Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId],
     % add ddoc to the proc
     {ok, Proc#proc{ddoc_keys=[DDocKey|Keys2]}}.
+
+% Build the #proc{} for Pid (callbacks dispatch to Mod) and unlink from it.
+make_proc(Pid, Lang, Mod) ->
+    unlink(Pid),
+    {ok, #proc{
+        pid = Pid,
+        lang = Lang,
+        prompt_fun = {Mod, prompt},
+        prompt_many_fun = {Mod, prompt_many},
+        set_timeout_fun = {Mod, set_timeout},
+        stop_fun = {Mod, stop}
+    }}.
+
+% Monitor the client (pid or #client{}), keep the ref in the proc, store it.
+assign_proc(Tab, #client{}=Client, #proc{client=nil}=Proc) ->
+    {Pid, _} = Client#client.from,
+    assign_proc(Tab, Pid, Proc);
+assign_proc(Tab, ClientPid, #proc{client=nil}=Idle) when is_pid(ClientPid) ->
+    Busy = Idle#proc{client = erlang:monitor(process, ClientPid)},
+    ets:insert(Tab, Busy), Busy.
+
+% Take back Proc from its client. A live proc is handed to the oldest client
+% waiting on its language, or parked idle; a dead proc is dropped from Tab.
+return_proc(State, #proc{pid=Pid, lang=Lang} = Proc0) ->
+    #state{tab=Tab, waiting=Waiting} = State,
+    Proc = Proc0#proc{client=nil},
+    case {is_process_alive(Pid), get_waiting_client(Waiting, Lang)} of
+        {true, nil} ->
+            gen_server:cast(Pid, garbage_collect),
+            ets:insert(Tab, Proc),
+            State;
+        {true, #client{from=From}=Client} ->
+            % Reply with assign_proc/3's result: it holds the new monitor ref.
+            Assigned = assign_proc(Tab, Client, Proc),
+            gen_server:reply(From, {ok, Assigned, State#state.config}),
+            State;
+        % NOTE(review): proc_counts is untouched below; confirm 'EXIT' covers it.
+        {false, nil} ->
+            ets:delete(Tab, Pid),
+            State;
+        {false, #client{}=Client} ->
+            ets:delete(Tab, Pid),
+            maybe_spawn_proc(State, Client)
+    end.
+
+% Start a new OS process for Client, or queue the client in the waiting table
+% once the count for its language has reached os_process_limit (default 100).
+maybe_spawn_proc(State, #client{lang=Lang}=Client) ->
+    #state{proc_counts=Counts, waiting=Waiting} = State,
+    Limit = list_to_integer(config:get(
+                "query_server_config", "os_process_limit", "100")),
+    case dict:find(Lang, Counts) of
+    % >=, not an exact match: a limit lowered at runtime may be below Count.
+    {ok, Count} when Count >= Limit ->
+        add_waiting_client(Waiting, Client),
+        State;
+    _ ->
+        spawn_link(?MODULE, new_proc, [Client]),
+        State#state{proc_counts=dict:update_counter(Lang, 1, Counts)}
+    end.
+
+add_waiting_client(Waiting, Client) ->  % enqueue Client keyed by current now()
+    ets:insert(Waiting, Client#client{timestamp = erlang:now()}).
+
+% Pop the first queued client for Lang (list or binary); nil if none queued.
+get_waiting_client(Tab, Lang) when is_list(Lang) ->
+    get_waiting_client(Tab, couch_util:to_binary(Lang));
+get_waiting_client(Tab, Lang) ->
+    case ets:match_object(Tab, #client{lang=Lang, _='_'}, 1) of
+        {[#client{timestamp=Key}=First], _} ->
+            ets:delete(Tab, Key),
+            First;
+        '$end_of_table' -> nil
+    end.
+
+% Build the {[{Key, Value}]} config term that init/1 keeps in #state.config.
+get_proc_config() ->
+    ReduceLimit = list_to_atom(
+        config:get("query_server_config", "reduce_limit", "true")),
+    Timeout = list_to_integer(
+        config:get("couchdb", "os_process_timeout", "5000")),
+    {[{<<"reduce_limit">>, ReduceLimit}, {<<"timeout">>, Timeout}]}.