Rearrange couch_proc_manager for readability. No functional changes.
diff --git a/src/couch_proc_manager.erl b/src/couch_proc_manager.erl
index 5dd000b..c8a5b77 100644
--- a/src/couch_proc_manager.erl
+++ b/src/couch_proc_manager.erl
@@ -201,6 +201,19 @@
handle_config_change(_, _, _, _, _) ->
{ok, nil}.
+find_proc(State, Client, [Fun|FindFuns]) ->
+ try iter_procs(State#state.tab, Client#client.lang, Fun, nil) of
+ {not_found, _} ->
+ find_proc(State, Client, FindFuns);
+ {ok, Proc} ->
+ {reply, {ok, Proc, State#state.config}, State}
+ catch error:Reason ->
+ ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
+ {reply, {error, Reason}, State}
+ end;
+find_proc(State, Client, []) ->
+ {noreply, maybe_spawn_proc(State, Client)}.
+
iter_procs(Tab, Lang, Fun, Acc) when is_list(Lang) ->
iter_procs(Tab, list_to_binary(Lang), Fun, Acc);
iter_procs(Tab, Lang, Fun, Acc) ->
@@ -269,6 +282,36 @@
make_proc(Pid, Lang, couch_os_process)
end.
+proc_with_ddoc(DDoc, DDocKey, Procs) ->
+ Filter = fun(#proc{ddoc_keys=Keys}) -> not lists:member(DDocKey, Keys) end,
+ case lists:dropwhile(Filter, Procs) of
+ [DDocProc|_] ->
+ {ok, DDocProc};
+ [] ->
+ teach_any_proc(DDoc, DDocKey, Procs)
+ end.
+
+teach_any_proc(DDoc, DDocKey, [Proc|Rest]) ->
+ try
+ teach_ddoc(DDoc, DDocKey, Proc)
+ catch _:_ ->
+ teach_any_proc(DDoc, DDocKey, Rest)
+ end;
+teach_any_proc(_, _, []) ->
+ {error, noproc}.
+
+teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) ->
+ % send ddoc over the wire
+ % we only share the rev with the client we know to update code
+ % but it only keeps the latest copy, per each ddoc, around.
+ true = couch_query_servers:proc_prompt(Proc, [<<"ddoc">>, <<"new">>,
+ DDocId, couch_doc:to_json_obj(DDoc, [])]),
+ % we should remove any other ddocs keys for this docid
+ % because the query server overwrites without the rev
+ Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId],
+ % add ddoc to the proc
+ {ok, Proc#proc{ddoc_keys=[DDocKey|Keys2]}}.
+
make_proc(Pid, Lang, Mod) ->
Proc = #proc{
lang = Lang,
@@ -313,58 +356,6 @@
end
end.
-get_waiting_client(Tab, Lang) when is_list(Lang) ->
- get_waiting_client(Tab, couch_util:to_binary(Lang));
-get_waiting_client(Tab, Lang) ->
- case ets:match_object(Tab, #client{lang=Lang, _='_'}, 1) of
- '$end_of_table' ->
- nil;
- {[#client{}=Client], _} ->
- ets:delete(Tab, Client#client.timestamp),
- Client
- end.
-
-add_waiting_client(Tab, Client) ->
- ets:insert(Tab, Client#client{timestamp=now()}).
-
-get_proc_config() ->
- Limit = config:get("query_server_config", "reduce_limit", "true"),
- Timeout = config:get("couchdb", "os_process_timeout", "5000"),
- {[
- {<<"reduce_limit">>, list_to_atom(Limit)},
- {<<"timeout">>, list_to_integer(Timeout)}
- ]}.
-
-proc_with_ddoc(DDoc, DDocKey, Procs) ->
- Filter = fun(#proc{ddoc_keys=Keys}) -> not lists:member(DDocKey, Keys) end,
- case lists:dropwhile(Filter, Procs) of
- [DDocProc|_] ->
- {ok, DDocProc};
- [] ->
- teach_any_proc(DDoc, DDocKey, Procs)
- end.
-
-teach_any_proc(DDoc, DDocKey, [Proc|Rest]) ->
- try
- teach_ddoc(DDoc, DDocKey, Proc)
- catch _:_ ->
- teach_any_proc(DDoc, DDocKey, Rest)
- end;
-teach_any_proc(_, _, []) ->
- {error, noproc}.
-
-teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) ->
- % send ddoc over the wire
- % we only share the rev with the client we know to update code
- % but it only keeps the latest copy, per each ddoc, around.
- true = couch_query_servers:proc_prompt(Proc, [<<"ddoc">>, <<"new">>,
- DDocId, couch_doc:to_json_obj(DDoc, [])]),
- % we should remove any other ddocs keys for this docid
- % because the query server overwrites without the rev
- Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId],
- % add ddoc to the proc
- {ok, Proc#proc{ddoc_keys=[DDocKey|Keys2]}}.
-
maybe_spawn_proc(State, Client) ->
#state{proc_counts=Counts, waiting=Waiting} = State,
#client{lang=Lang} = Client,
@@ -381,15 +372,24 @@
}
end.
-find_proc(State, Client, [Fun|FindFuns]) ->
- try iter_procs(State#state.tab, Client#client.lang, Fun, nil) of
- {not_found, _} ->
- find_proc(State, Client, FindFuns);
- {ok, Proc} ->
- {reply, {ok, Proc, State#state.config}, State}
- catch error:Reason ->
- ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
- {reply, {error, Reason}, State}
- end;
-find_proc(State, Client, []) ->
- {noreply, maybe_spawn_proc(State, Client)}.
+add_waiting_client(Tab, Client) ->
+ ets:insert(Tab, Client#client{timestamp=now()}).
+
+get_waiting_client(Tab, Lang) when is_list(Lang) ->
+ get_waiting_client(Tab, couch_util:to_binary(Lang));
+get_waiting_client(Tab, Lang) ->
+ case ets:match_object(Tab, #client{lang=Lang, _='_'}, 1) of
+ '$end_of_table' ->
+ nil;
+ {[#client{}=Client], _} ->
+ ets:delete(Tab, Client#client.timestamp),
+ Client
+ end.
+
+get_proc_config() ->
+ Limit = config:get("query_server_config", "reduce_limit", "true"),
+ Timeout = config:get("couchdb", "os_process_timeout", "5000"),
+ {[
+ {<<"reduce_limit">>, list_to_atom(Limit)},
+ {<<"timeout">>, list_to_integer(Timeout)}
+ ]}.