| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(couch_query_servers). |
| -behaviour(gen_server). |
| |
| -export([start_link/0]). |
| |
| -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3]). |
| -export([start_doc_map/3, map_docs/2, stop_doc_map/1]). |
| -export([reduce/3, rereduce/3,validate_doc_update/5]). |
| -export([filter_docs/5]). |
| |
| -export([with_ddoc_proc/2, proc_prompt/2, ddoc_prompt/3, ddoc_proc_prompt/3, json_doc/1]). |
| |
| % For 210-os-proc-pool.t |
| -export([get_os_process/1, ret_os_process/1]). |
| |
| -include("couch_db.hrl"). |
| |
% Handle for one query server process managed by this module.
-record(proc, {
    pid = undefined,             % pid of the query server process
    lang = undefined,            % language name the process was started for
    ddoc_keys = [],              % {DDocId, Rev} keys the process has been taught
    prompt_fun = undefined,      % {Mod, Func} called by proc_prompt/2
    set_timeout_fun = undefined, % {Mod, Func} called by proc_set_timeout/2
    stop_fun = undefined         % {Mod, Func} called by proc_stop/1
}).
| |
% gen_server state. The first four fields hold ETS table ids.
-record(qserver, {
    langs = undefined,       % rows are {Lang, Mod, Func, Arg}, keyed by language name
    pid_procs = undefined,   % rows are {Pid, #proc{}}, keyed by pid
    lang_procs = undefined,  % rows are {Lang, [#proc{}]}: idle procs per language
    lang_limits = undefined, % rows are {Lang, Limit, Current}; Limit 0 means no limit
    waitlist = [],           % [{Info, From}] callers waiting for a proc, newest first
    config = undefined       % query server config object handed out with each proc
}).
| |
% Starts the query server manager, registered locally under the module name.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
| |
% Checks out a query server process for Lang and loads the map functions
% into it.
%
% Lib is the object of shared library code; the empty object {[]} means there
% is nothing to load. Functions is the list of map function sources, which are
% registered in order.
%
% Returns {ok, Proc}. The caller owns Proc and hands it back through
% stop_doc_map/1. If loading throws, the process is returned to the pool
% before the throw is propagated, so a failed setup does not leave a process
% checked out for the rest of the caller's lifetime.
start_doc_map(Lang, Functions, Lib) ->
    Proc = get_os_process(Lang),
    try
        case Lib of
            {[]} -> ok;
            _ -> true = proc_prompt(Proc, [<<"add_lib">>, Lib])
        end,
        lists:foreach(fun(FunctionSource) ->
            true = proc_prompt(Proc, [<<"add_fun">>, FunctionSource])
        end, Functions),
        {ok, Proc}
    catch
        throw:Reason ->
            % NOTE(review): assumes prompt failures (e.g. a function that
            % does not compile) surface as throws - confirm against the
            % prompt implementation. Errors and exits are left untouched.
            ok = ret_os_process(Proc),
            throw(Reason)
    end.
| |
% Runs every document in Docs through the map functions loaded into Proc.
% Returns {ok, Results} with one entry per document, in input order.
map_docs(Proc, Docs) ->
    {ok, [map_one_doc(Proc, Doc) || Doc <- Docs]}.

% Prompts Proc with a single document. The reply is a list with one element
% per map function, each a list of emitted rows such as
% [[Key1, Value1], [Key2, Value2]]; every row is converted to a tuple, giving
% [{Key1, Value1}, {Key2, Value2}].
map_one_doc(Proc, Doc) ->
    Json = couch_doc:to_json_obj(Doc, []),
    PerFunRows = proc_prompt(Proc, [<<"map_doc">>, Json]),
    [[list_to_tuple(Row) || Row <- Rows] || Rows <- PerFunRows].
| |
| |
% Hands a map process back to the pool. The atom nil means there is no
% process to return.
stop_doc_map(Proc) ->
    case Proc of
        nil -> ok;
        _ -> ok = ret_os_process(Proc)
    end.
| |
% Transposes a list of rows into a list of columns: the first column holds
% the first element of every row, the second column the second element, and
% so on. Every row must have the same length.
%
% Each pass prepends while walking the input, so a column comes out in the
% reverse order of the list that pass walked; since the remainders are
% reversed too, the element order alternates from one column to the next.
group_reductions_results([]) ->
    [];
group_reductions_results(Rows) ->
    case split_row_heads(Rows, [], []) of
        {Column, [[] | _]} ->
            % the rows are exhausted
            [Column];
        {Column, Remainders} ->
            [Column | group_reductions_results(Remainders)]
    end.

% Splits every row into its head and tail, collecting both by prepending.
split_row_heads([], Heads, Tails) ->
    {Heads, Tails};
split_row_heads([[H | T] | Rest], Heads, Tails) ->
    split_row_heads(Rest, [H | Heads], [T | Tails]).
| |
% Re-reduces previously reduced values. ReducedValues is a list of rows, one
% value per reduce function in each row; the rows are regrouped per function
% and each function is run over its own group.
% Returns {ok, Results} with one result per entry in RedSrcs.
rereduce(_Lang, [], _ReducedValues) ->
    {ok, []};
rereduce(Lang, RedSrcs, ReducedValues) ->
    Columns = group_reductions_results(ReducedValues),
    RereduceOne = fun(FunSrc, Values) ->
        rereduce_one(Lang, FunSrc, Values)
    end,
    {ok, lists:zipwith(RereduceOne, RedSrcs, Columns)}.

% Sources starting with "_" are builtin reductions and run in Erlang; the
% values are wrapped as [Key, Value] rows with an empty key. Anything else is
% sent to a query server process.
rereduce_one(_Lang, <<"_", _/binary>> = FunSrc, Values) ->
    Rows = [[[], V] || V <- Values],
    {ok, [Result]} = builtin_reduce(rereduce, [FunSrc], Rows, []),
    Result;
rereduce_one(Lang, FunSrc, Values) ->
    os_rereduce(Lang, [FunSrc], Values).
| |
% Reduces KVs with every function in RedSrcs. Builtin functions (sources
% starting with "_") run in Erlang, the rest in a query server process; the
% two result lists are merged back into the order of RedSrcs.
reduce(_Lang, [], _KVs) ->
    {ok, []};
reduce(Lang, RedSrcs, KVs) ->
    {OsRedSrcs, BuiltinReds} = lists:partition(fun is_os_reduce_src/1, RedSrcs),
    {ok, OsResults} = os_reduce(Lang, OsRedSrcs, KVs),
    {ok, BuiltinResults} = builtin_reduce(reduce, BuiltinReds, KVs, []),
    recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, []).

% True for reduce sources that have to run in a query server process.
is_os_reduce_src(<<"_", _/binary>>) -> false;
is_os_reduce_src(_OsFun) -> true.
| |
% Interleaves query server and builtin results back into the order of
% RedSrcs: for each source the next result is taken from the builtin list if
% the source starts with "_" (and a builtin result is left), otherwise from
% the query server list. Both result lists must be used up exactly.
% Results are prepended to Acc; returns {ok, Results} in source order.
recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, Acc) ->
    TakeNext = fun
        (<<"_", _/binary>>, {Os, [B | Bs], Out}) ->
            {Os, Bs, [B | Out]};
        (_OsFun, {[O | Os], Bs, Out}) ->
            {Os, Bs, [O | Out]}
    end,
    Start = {OsResults, BuiltinResults, Acc},
    {[], [], Reversed} = lists:foldl(TakeNext, Start, RedSrcs),
    {ok, lists:reverse(Reversed)}.
| |
% Runs the given reduce sources over KVs in a query server process for Lang.
% Returns {ok, Reductions}. The process is always handed back to the pool,
% whether or not the prompt succeeds.
os_reduce(_Lang, [], _KVs) ->
    {ok, []};
os_reduce(Lang, OsRedSrcs, KVs) ->
    Proc = get_os_process(Lang),
    Request = [<<"reduce">>, OsRedSrcs, KVs],
    try proc_prompt(Proc, Request) of
        [true, Reductions] -> {ok, Reductions}
    after
        ok = ret_os_process(Proc)
    end.
| |
% Runs a rereduce in a query server process for Lang and returns the single
% reduction from the reply. The process is always handed back to the pool.
os_rereduce(Lang, OsRedSrcs, KVs) ->
    Proc = get_os_process(Lang),
    Request = [<<"rereduce">>, OsRedSrcs, KVs],
    try proc_prompt(Proc, Request) of
        [true, [Reduction]] -> Reduction
    after
        ok = ret_os_process(Proc)
    end.
| |
| |
% Evaluates the builtin reduce functions named in BuiltinReds over KVs.
% Re is reduce or rereduce. Acc holds earlier results in reverse order; the
% return value is {ok, Results} with those results first, followed by one
% result per builtin, in order.
builtin_reduce(Re, BuiltinReds, KVs, Acc) ->
    Reductions = [builtin_reduce_one(Re, Src, KVs) || Src <- BuiltinReds],
    {ok, lists:reverse(Acc, Reductions)}.

% Dispatches on the prefix of the builtin's name. A count is the number of
% rows on reduce and the sum of the partial counts on rereduce.
builtin_reduce_one(_Re, <<"_sum", _/binary>>, KVs) ->
    builtin_sum_rows(KVs);
builtin_reduce_one(reduce, <<"_count", _/binary>>, KVs) ->
    length(KVs);
builtin_reduce_one(rereduce, <<"_count", _/binary>>, KVs) ->
    builtin_sum_rows(KVs);
builtin_reduce_one(Re, <<"_stats", _/binary>>, KVs) ->
    builtin_stats(Re, KVs).
| |
% Sums the values of [Key, Value] rows, starting from 0. Values may be
% numbers or lists of numbers; lists are summed element-wise and a number is
% treated as a one-element list when mixed with a list.
% Throws {invalid_value, Message} for any other row or value.
builtin_sum_rows(KVs) ->
    lists:foldl(fun sum_row/2, 0, KVs).

% Adds the value of one row to the running total.
sum_row([_Key, Value], Acc) when is_number(Value), is_number(Acc) ->
    Acc + Value;
sum_row([_Key, Value], Acc) when is_list(Value), is_list(Acc) ->
    sum_terms(Acc, Value);
sum_row([_Key, Value], Acc) when is_number(Value), is_list(Acc) ->
    sum_terms(Acc, [Value]);
sum_row([_Key, Value], Acc) when is_list(Value), is_number(Acc) ->
    sum_terms([Acc], Value);
sum_row(_Row, _Acc) ->
    throw({invalid_value, <<"builtin _sum function requires map values to be numbers or lists of numbers">>}).
| |
% Element-wise sum of two lists of numbers. When one list is longer, its
% remaining elements are carried over unchanged.
% Throws {invalid_value, Message} when a pair of elements is not numeric.
sum_terms(Xs, Ys) ->
    sum_terms_acc(Xs, Ys, []).

% Sums is the reversed list of element sums computed so far.
sum_terms_acc(Xs, [], Sums) when is_list(Xs) ->
    lists:reverse(Sums, Xs);
sum_terms_acc([], Ys, Sums) when is_list(Ys) ->
    lists:reverse(Sums, Ys);
sum_terms_acc([X | Xs], [Y | Ys], Sums) when is_number(X), is_number(Y) ->
    sum_terms_acc(Xs, Ys, [X + Y | Sums]);
sum_terms_acc(_, _, _) ->
    throw({invalid_value, <<"builtin _sum function requires map values to be numbers or lists of numbers">>}).
| |
% Builtin _stats reduction over [Key, Value] rows.
%
% On reduce every value must be a number. On rereduce every value must be a
% stats object as produced by this function, with its fields in the order
% sum, count, min, max, sumsqr.
%
% Returns {[{sum,S}, {count,C}, {min,Mi}, {max,Ma}, {sumsqr,Sq}]}. An empty
% row list yields all zeros instead of crashing with function_clause.
% Throws {invalid_value, Message} when a reduce value is not a number; this
% includes the first row, which previously crashed with function_clause.
builtin_stats(_Re, []) ->
    {[{sum,0}, {count,0}, {min,0}, {max,0}, {sumsqr,0}]};
builtin_stats(reduce, [[_, First] | Rest]) when is_number(First) ->
    % Seed the accumulator from the first row so that min and max start from
    % a real value.
    Stats = lists:foldl(fun
        ([_K, V], {S, C, Mi, Ma, Sq}) when is_number(V) ->
            {S + V, C + 1, lists:min([Mi, V]), lists:max([Ma, V]), Sq + (V * V)};
        (_, _) ->
            throw({invalid_value,
                <<"builtin _stats function requires map values to be numbers">>})
    end, {First, 1, First, First, First * First}, Rest),
    {Sum, Cnt, Min, Max, Sqr} = Stats,
    {[{sum,Sum}, {count,Cnt}, {min,Min}, {max,Max}, {sumsqr,Sqr}]};
builtin_stats(reduce, [_NotNumeric | _]) ->
    throw({invalid_value,
        <<"builtin _stats function requires map values to be numbers">>});
builtin_stats(rereduce, [[_, First] | Rest]) ->
    {[{sum,Sum0}, {count,Cnt0}, {min,Min0}, {max,Max0}, {sumsqr,Sqr0}]} = First,
    Stats = lists:foldl(fun([_K, Red], {S, C, Mi, Ma, Sq}) ->
        {[{sum,Sum}, {count,Cnt}, {min,Min}, {max,Max}, {sumsqr,Sqr}]} = Red,
        {Sum + S, Cnt + C, lists:min([Min, Mi]), lists:max([Max, Ma]), Sqr + Sq}
    end, {Sum0, Cnt0, Min0, Max0, Sqr0}, Rest),
    {Sum, Cnt, Min, Max, Sqr} = Stats,
    {[{sum,Sum}, {count,Cnt}, {min,Min}, {max,Max}, {sumsqr,Sqr}]}.
| |
% Tests an update against the validate_doc_update function stored in DDoc.
% Returns ok when the function replies 1; throws {forbidden, Message} or
% {unauthorized, Message} when it replies with the matching object.
validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx, SecObj) ->
    Args = [couch_doc:to_json_obj(EditDoc, [revs]), json_doc(DiskDoc), Ctx, SecObj],
    Verdict = ddoc_prompt(DDoc, [<<"validate_doc_update">>], Args),
    case Verdict of
        1 ->
            ok;
        {[{<<"forbidden">>, Message}]} ->
            throw({forbidden, Message});
        {[{<<"unauthorized">>, Message}]} ->
            throw({unauthorized, Message})
    end.
| |
% Converts a document to its JSON object form, including revs. The atom nil
% (no document) becomes null.
json_doc(Doc) ->
    case Doc of
        nil -> null;
        _ -> couch_doc:to_json_obj(Doc, [revs])
    end.
| |
% Runs the filter function FName of DDoc over Docs.
% Req is either {json_req, JsonObj}, used as is, or an #httpd{} request that
% is converted to its JSON object form first. Returns {ok, Passes}.
filter_docs(Req, Db, DDoc, FName, Docs) ->
    JsonReq = case Req of
        {json_req, JsonObj} ->
            JsonObj;
        #httpd{} = HttpReq ->
            couch_httpd_external:json_req_obj(HttpReq, Db)
    end,
    ToJson = fun(Doc) -> couch_doc:to_json_obj(Doc, [revs]) end,
    JsonDocs = lists:map(ToJson, Docs),
    FunPath = [<<"filters">>, FName],
    [true, Passes] = ddoc_prompt(DDoc, FunPath, [JsonDocs, JsonReq]),
    {ok, Passes}.
| |
% Prompts an already checked-out process to run the design doc function at
% FunPath with Args.
ddoc_proc_prompt({Proc, DDocId}, FunPath, Args) ->
    Request = [<<"ddoc">>, DDocId, FunPath, Args],
    proc_prompt(Proc, Request).
| |
% Checks out a process that knows DDoc, runs the design doc function at
% FunPath with Args in it and hands the process back afterwards.
ddoc_prompt(DDoc, FunPath, Args) ->
    Prompt = fun(ProcAndId) ->
        ddoc_proc_prompt(ProcAndId, FunPath, Args)
    end,
    with_ddoc_proc(DDoc, Prompt).
| |
% Calls Fun({Proc, DDocId}) with a process that has been taught DDoc and
% returns its result. The process is keyed by the doc id and its current
% revision string, and is always handed back to the pool afterwards.
with_ddoc_proc(#doc{id=DDocId, revs={Start, [DiskRev|_]}}=DDoc, Fun) ->
    DDocKey = {DDocId, couch_doc:rev_to_str({Start, DiskRev})},
    Proc = get_ddoc_process(DDoc, DDocKey),
    try
        Fun({Proc, DDocId})
    after
        ok = ret_os_process(Proc)
    end.
| |
% gen_server callback. Registers config listeners, creates the private ETS
% tables that back the server state and loads the configured languages.
init([]) ->
    % Config changes are not applied in place: when one of the sections below
    % changes, this server is terminated and restarted under
    % couch_secondary_services, and the new instance reads the new settings.
    Bounce = fun() ->
        supervisor:terminate_child(couch_secondary_services, query_servers),
        supervisor:restart_child(couch_secondary_services, query_servers)
    end,
    ok = couch_config:register(
        fun("query_servers" ++ _, _) -> Bounce() end),
    ok = couch_config:register(
        fun("native_query_servers" ++ _, _) -> [Bounce()] end),
    ok = couch_config:register(
        fun("query_server_config" ++ _, _) -> Bounce() end),

    Langs = ets:new(couch_query_server_langs, [set, private]),
    LangLimits = ets:new(couch_query_server_lang_limits, [set, private]),
    PidProcs = ets:new(couch_query_server_pid_langs, [set, private]),
    LangProcs = ets:new(couch_query_server_procs, [set, private]),

    TimeoutStr = couch_config:get("couchdb", "os_process_timeout", "5000"),
    ProcTimeout = list_to_integer(TimeoutStr),
    ReduceLimitStr = couch_config:get("query_server_config", "reduce_limit", "true"),
    ReduceLimit = list_to_atom(ReduceLimitStr),
    OsProcLimitStr = couch_config:get("query_server_config", "os_process_limit", "10"),
    OsProcLimit = list_to_integer(OsProcLimitStr),

    % 'query_servers' entries give an OS command line, started through
    % couch_os_process and subject to the configured process limit.
    AddOsLang = fun({Lang, Command}) ->
        Name = ?l2b(Lang),
        true = ets:insert(LangLimits, {Name, OsProcLimit, 0}),
        true = ets:insert(Langs, {Name, couch_os_process, start_link, [Command]})
    end,
    lists:foreach(AddOsLang, couch_config:get("query_servers")),

    % 'native_query_servers' entries give a {Module, Func, Arg} term; they
    % get a limit of 0, which means no limit.
    AddNativeLang = fun({Lang, SpecStr}) ->
        {ok, {Mod, Fun, SpecArg}} = couch_util:parse_term(SpecStr),
        Name = ?l2b(Lang),
        true = ets:insert(LangLimits, {Name, 0, 0}),
        true = ets:insert(Langs, {Name, Mod, Fun, SpecArg})
    end,
    lists:foreach(AddNativeLang, couch_config:get("native_query_servers")),

    process_flag(trap_exit, true),
    Config = {[{<<"reduce_limit">>, ReduceLimit}, {<<"timeout">>, ProcTimeout}]},
    {ok, #qserver{
        langs = Langs,
        pid_procs = PidProcs,
        lang_procs = LangProcs,
        lang_limits = LangLimits,
        config = Config
    }}.
| |
% gen_server callback. Synchronously shuts down every process recorded in
% the pid table.
terminate(_Reason, #qserver{pid_procs=PidProcs}) ->
    Pids = [Pid || {Pid, _Proc} <- ets:tab2list(PidProcs)],
    lists:foreach(fun(Pid) -> couch_util:shutdown_sync(Pid) end, Pids),
    ok.
| |
% gen_server callback.
%
% {get_proc, DDoc, DDocKey} - check out a process for the design doc's
%     language that knows (or is taught) the design doc.
% {get_proc, Lang}          - check out any process for Lang.
% {unlink_proc, Pid}        - unlink this server from Pid.
% {ret_proc, Proc}          - return a process to the pool and serve the
%     oldest waiting caller, if any.
%
% Check-outs reply {ok, Proc, Config}. When the language is at its process
% limit the caller is queued and answered later; any other outcome is
% replied as is.
handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, Server) ->
    Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
    % pick a proc in the set that has the DDoc, or teach it to one
    PickFun = fun(Procs) -> proc_with_ddoc(DDoc, DDocKey, Procs) end,
    Picked = lang_proc(Lang, Server, PickFun),
    get_proc_reply(Picked, {DDoc, DDocKey}, From, Server);
handle_call({get_proc, Lang}, From, Server) ->
    PickFun = fun([P|_Procs]) -> {ok, P} end,
    Picked = lang_proc(Lang, Server, PickFun),
    get_proc_reply(Picked, {Lang}, From, Server);
handle_call({unlink_proc, Pid}, _From, Server) ->
    unlink(Pid),
    {reply, ok, Server};
handle_call({ret_proc, Proc}, _From, #qserver{
        pid_procs=PidProcs,
        lang_procs=LangProcs}=Server) ->
    % Along with max process limit, here we should check
    % if we're over the limit and discard when we are.
    Pid = Proc#proc.pid,
    case is_process_alive(Pid) of
        false ->
            ok;
        true ->
            add_value(PidProcs, Pid, Proc),
            add_to_list(LangProcs, Proc#proc.lang, Proc),
            link(Pid)
    end,
    {reply, true, service_waitlist(Server)}.

% Turns the outcome of lang_proc/3 into a gen_server reply. WaitInfo is what
% gets queued, together with From, when the caller has to wait.
get_proc_reply({ok, Proc}, _WaitInfo, _From, Server) ->
    {reply, {ok, Proc, Server#qserver.config}, Server};
get_proc_reply(wait, WaitInfo, From, Server) ->
    {noreply, add_to_waitlist(WaitInfo, From, Server)};
get_proc_reply(Error, _WaitInfo, _From, Server) ->
    {reply, Error, Server}.
| |
% gen_server callback. Casts are not used; every cast is ignored.
handle_cast(_Msg, State) ->
    {noreply, State}.
| |
% gen_server callback.
% 'EXIT' messages are ignored. A 'DOWN' message for a tracked process removes
% it from the tables, frees its slot in the language limit and serves the
% waitlist. A 'DOWN' for an untracked process stops the server unless the
% process ended normally.
handle_info({'EXIT', _, _}, Server) ->
    {noreply, Server};
handle_info({'DOWN', _, process, Pid, Status}, #qserver{
        pid_procs=PidProcs,
        lang_procs=LangProcs,
        lang_limits=LangLimits}=Server) ->
    Tables = {PidProcs, LangProcs, LangLimits},
    proc_down(ets:lookup(PidProcs, Pid), Pid, Status, Tables, Server).

% Handles a 'DOWN' message given the pid table lookup result for Pid.
proc_down([{Pid, Proc}], Pid, Status, {PidProcs, LangProcs, LangLimits}, Server) ->
    if
        Status =:= normal ->
            ok;
        true ->
            ?LOG_DEBUG("Linked process died abnormally: ~p (reason: ~p)", [Pid, Status])
    end,
    rem_value(PidProcs, Pid),
    catch rem_from_list(LangProcs, Proc#proc.lang, Proc),
    [{Lang, Lim, Current}] = ets:lookup(LangLimits, Proc#proc.lang),
    true = ets:insert(LangLimits, {Lang, Lim, Current - 1}),
    {noreply, service_waitlist(Server)};
proc_down([], _Pid, normal, _Tables, Server) ->
    {noreply, Server};
proc_down([], _Pid, Status, _Tables, Server) ->
    {stop, Status, Server}.
| |
% gen_server callback. The state needs no conversion on code upgrade.
code_change(_FromVsn, Server, _Extra) ->
    {ok, Server}.
| |
| % Private API |
| |
% Queues a caller that has to wait for a process. New entries go to the
% front, so the oldest waiter is the last element of the list.
add_to_waitlist(Info, From, Server) ->
    Waiting = Server#qserver.waitlist,
    Server#qserver{waitlist = [{Info, From} | Waiting]}.
| |
% Tries to serve the oldest waiting caller. When it could be answered it is
% removed from the waitlist; otherwise the server state is left unchanged.
service_waitlist(#qserver{waitlist=[]}=Server) ->
    Server;
service_waitlist(#qserver{waitlist=Waitlist}=Server) ->
    % the oldest entry sits at the end of the list
    {Newer, [Oldest]} = lists:split(length(Waitlist) - 1, Waitlist),
    case service_waiting(Oldest, Server) of
        ok -> Server#qserver{waitlist=Newer};
        wait -> Server
    end.
| |
% Tries to check out a process for one waitlist entry and replies to the
% waiting caller. Returns ok once the caller has been answered (with a
% process or with an error), or wait when no process is available yet.
service_waiting({{#doc{body={Props}}=DDoc, DDocKey}, From}, Server) ->
    Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
    % pick a proc in the set that has the DDoc, or teach it to one
    PickFun = fun(Procs) -> proc_with_ddoc(DDoc, DDocKey, Procs) end,
    answer_waiter(lang_proc(Lang, Server, PickFun), From, Server);
service_waiting({{Lang}, From}, Server) ->
    PickFun = fun([P|_Procs]) -> {ok, P} end,
    answer_waiter(lang_proc(Lang, Server, PickFun), From, Server).

% Replies to From according to the outcome of lang_proc/3.
answer_waiter({ok, Proc}, From, Server) ->
    gen_server:reply(From, {ok, Proc, Server#qserver.config}),
    ok;
answer_waiter(wait, _From, _Server) ->
    % this should never happen
    wait;
answer_waiter(Error, From, _Server) ->
    gen_server:reply(From, Error),
    ok.
| |
% Checks out a process for Lang. When idle processes exist, PickFun chooses
% one of them and it is removed from the idle list. Otherwise a new process
% is started, recorded in the pid table and passed to PickFun on its own.
% Returns {ok, Proc}, wait when the language is at its limit, or whatever
% error term starting the process produced.
lang_proc(Lang, #qserver{
        langs=Langs,
        pid_procs=PidProcs,
        lang_procs=LangProcs,
        lang_limits=LangLimits}, PickFun) ->
    % Note to future self. Add max process limit.
    case ets:lookup(LangProcs, Lang) of
        [{Lang, [_|_] = Idle}] ->
            {ok, Picked} = PickFun(Idle),
            rem_from_list(LangProcs, Lang, Picked),
            {ok, Picked};
        _ ->
            Started = (catch new_process(Langs, LangLimits, Lang)),
            track_new_proc(Started, PidProcs, PickFun)
    end.

% Records a freshly started process and lets PickFun prepare it; anything
% other than {ok, Proc} is passed through unchanged.
track_new_proc({ok, Proc}, PidProcs, PickFun) ->
    add_value(PidProcs, Proc#proc.pid, Proc),
    PickFun([Proc]);
track_new_proc(ErrorOrWait, _PidProcs, _PickFun) ->
    ErrorOrWait.
| |
% Starts a new process for Lang unless the language is at its limit.
% Returns {ok, #proc{}}, wait when the limit is reached, or
% {unknown_query_language, Lang} when no start spec exists for Lang.
new_process(Langs, LangLimits, Lang) ->
    [{Lang, Lim, Current}] = ets:lookup(LangLimits, Lang),
    % Lim == 0 means no limit
    case (Lim == 0) or (Current < Lim) of
        true ->
            % we are below the limit for our language, make a new one
            start_process(ets:lookup(Langs, Lang), LangLimits, {Lang, Lim, Current});
        false ->
            wait
    end.

% Starts and monitors the process described by the language's start spec and
% bumps the language's process count.
start_process([{Lang, Mod, Func, Arg}], LangLimits, {Lang, Lim, Current}) ->
    {ok, Pid} = erlang:apply(Mod, Func, Arg),
    erlang:monitor(process, Pid),
    true = ets:insert(LangLimits, {Lang, Lim, Current + 1}),
    Proc = #proc{
        lang = Lang,
        pid = Pid,
        % Called via proc_prompt, proc_set_timeout, and proc_stop
        prompt_fun = {Mod, prompt},
        set_timeout_fun = {Mod, set_timeout},
        stop_fun = {Mod, stop}
    },
    {ok, Proc};
start_process(_NoSpec, _LangLimits, {Lang, _Lim, _Current}) ->
    {unknown_query_language, Lang}.
| |
% Picks a process from LangProcs that already knows DDocKey. When none does,
% the first process in the list is taught the design doc.
% Returns {ok, Proc}.
proc_with_ddoc(DDoc, DDocKey, LangProcs) ->
    KnowsDDoc = fun(#proc{ddoc_keys=Keys}) ->
        lists:any(fun(Key) -> Key == DDocKey end, Keys)
    end,
    case lists:filter(KnowsDDoc, LangProcs) of
        [] ->
            [TeachProc|_] = LangProcs,
            ?LOG_DEBUG("Teach ddoc to new proc ~p with DDocKey: ~p",[TeachProc, DDocKey]),
            {ok, _SmartProc} = teach_ddoc(DDoc, DDocKey, TeachProc);
        [DDocProc|_] ->
            ?LOG_DEBUG("DDocProc found for DDocKey: ~p",[DDocKey]),
            {ok, DDocProc}
    end.
| |
% Sends Args to the process through the prompt function stored in its #proc
% record and returns that function's result.
proc_prompt(Proc, Args) ->
    {Mod, Func} = Proc#proc.prompt_fun,
    Pid = Proc#proc.pid,
    Mod:Func(Pid, Args).
| |
% Stops the process through the stop function stored in its #proc record.
proc_stop(Proc) ->
    {Mod, Func} = Proc#proc.stop_fun,
    Pid = Proc#proc.pid,
    Mod:Func(Pid).
| |
% Sets the process's timeout through the set-timeout function stored in its
% #proc record.
proc_set_timeout(Proc, Timeout) ->
    {Mod, Func} = Proc#proc.set_timeout_fun,
    Pid = Proc#proc.pid,
    Mod:Func(Pid, Timeout).
| |
% Sends the design doc to the process and records its key on the #proc.
% Returns {ok, UpdatedProc}.
teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) ->
    % The doc goes over the wire keyed by its id only. The rev is tracked on
    % this side so we know when to update the code; the query server keeps
    % just the latest copy per ddoc.
    JsonDDoc = couch_doc:to_json_obj(DDoc, []),
    true = proc_prompt(Proc, [<<"ddoc">>, <<"new">>, DDocId, JsonDDoc]),
    % Drop any other key for this doc id, since the query server overwrites
    % without looking at the rev.
    IsOtherDDoc = fun
        ({Id, _R}) -> Id /= DDocId;
        (_) -> false
    end,
    OtherKeys = lists:filter(IsOtherDDoc, Keys),
    {ok, Proc#proc{ddoc_keys=[DDocKey|OtherKeys]}}.
| |
% Checks out a process that knows DDoc from the server and resets it with
% the current query config. After a successful reset the process is linked
% to the caller and unlinked from the server. If the reset does not return
% true, the process is stopped and another check-out is attempted.
% Throws whatever non-ok reply the server gives.
get_ddoc_process(#doc{} = DDoc, DDocKey) ->
    case gen_server:call(couch_query_servers, {get_proc, DDoc, DDocKey}) of
        {ok, Proc, {QueryConfig}} ->
            Reset = (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])),
            if
                Reset =:= true ->
                    Timeout = couch_util:get_value(<<"timeout">>, QueryConfig),
                    proc_set_timeout(Proc, Timeout),
                    link(Proc#proc.pid),
                    gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}),
                    Proc;
                true ->
                    catch proc_stop(Proc),
                    get_ddoc_process(DDoc, DDocKey)
            end;
        Error ->
            throw(Error)
    end.
| |
% Checks out a process for Lang from the server and resets it with the
% current query config. After a successful reset the process is linked to the
% caller and unlinked from the server. If the reset does not return true, the
% process is stopped and another check-out is attempted.
% Throws whatever non-ok reply the server gives.
get_os_process(Lang) ->
    case gen_server:call(couch_query_servers, {get_proc, Lang}) of
        {ok, Proc, {QueryConfig}} ->
            Reset = (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])),
            if
                Reset =:= true ->
                    Timeout = couch_util:get_value(<<"timeout">>, QueryConfig),
                    proc_set_timeout(Proc, Timeout),
                    link(Proc#proc.pid),
                    gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}),
                    Proc;
                true ->
                    catch proc_stop(Proc),
                    get_os_process(Lang)
            end;
        Error ->
            throw(Error)
    end.
| |
% Hands a checked-out process back to the server, then unlinks the caller
% from it (failures of the unlink are ignored). Returns ok.
ret_os_process(Proc) ->
    Returned = gen_server:call(couch_query_servers, {ret_proc, Proc}),
    true = Returned,
    _ = (catch unlink(Proc#proc.pid)),
    ok.
| |
% Stores {Key, Value} in the ETS table Tid.
add_value(Tid, Key, Value) ->
    Row = {Key, Value},
    true = ets:insert(Tid, Row).
| |
% Deletes the entry stored under Key from the ETS table Tid.
rem_value(Tid, Key) ->
    Deleted = ets:delete(Tid, Key),
    true = Deleted.
| |
% Prepends Value to the list stored under Key in the ETS table Tid, creating
% the list when the key is not present yet.
add_to_list(Tid, Key, Value) ->
    Existing = case ets:lookup(Tid, Key) of
        [{Key, Vals}] -> Vals;
        [] -> []
    end,
    true = ets:insert(Tid, {Key, [Value | Existing]}).
| |
% Removes Value from the list stored under Key in the ETS table Tid.
% A #proc record is matched by pid only, so a stored copy with different
% ddoc keys is removed as well; in that mode only #proc entries are kept.
% Any other value is removed by comparison. Does nothing when Key is absent.
rem_from_list(Tid, Key, Value) ->
    Keep = case is_record(Value, proc) of
        true ->
            Pid = Value#proc.pid,
            fun
                (#proc{pid=P}) -> P /= Pid;
                (_) -> false
            end;
        false ->
            fun(Val) -> Val /= Value end
    end,
    case ets:lookup(Tid, Key) of
        [] ->
            ok;
        [{Key, Vals}] ->
            % store a new values list that doesn't include the Value arg
            ets:insert(Tid, {Key, lists:filter(Keep, Vals)})
    end.