| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(couch_query_servers). |
| -behaviour(gen_server). |
| |
| -export([start_link/0]). |
| |
| -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3,stop/0]). |
| -export([start_doc_map/2, map_docs/2, stop_doc_map/1]). |
| -export([reduce/3, rereduce/3,validate_doc_update/5]). |
| -export([render_doc_show/6, render_doc_update/6, start_view_list/2, |
| render_list_head/4, render_list_row/4, render_list_tail/1]). |
| -export([start_filter/2, filter_doc/4, end_filter/1]). |
| % -export([test/0]). |
| |
| -include("couch_db.hrl"). |
| |
% Handle describing one query server process managed by this module.
-record(proc, {
    pid,              % pid of the query server process
    lang,             % language key the process was started for
    prompt_fun,       % {Module, Function} called by proc_prompt/2
    set_timeout_fun,  % {Module, Function} called by proc_set_timeout/2
    stop_fun          % {Module, Function} called by proc_stop/1
}).
| |
% Start the query server manager, locally registered as couch_query_servers
% (which is also the name of this module).
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
| |
% Send the exit signal `close` to the registered server process.
% init/1 enables trap_exit, so the signal reaches handle_info/2 as an
% 'EXIT' message from a pid that is not in the PidProcs table.
stop() ->
    ServerPid = whereis(couch_query_servers),
    exit(ServerPid, close).
| |
% Check out a query server process for Lang and register every map
% function source in Functions with it, in order.
% Returns {ok, Proc}; crashes with badmatch if a registration does not
% answer `true`.
start_doc_map(Lang, Functions) ->
    Proc = get_os_process(Lang),
    AddFun = fun(Source) ->
        true = proc_prompt(Proc, [<<"add_fun">>, Source])
    end,
    ok = lists:foreach(AddFun, Functions),
    {ok, Proc}.
| |
% Run every document in Docs through the map functions loaded in Proc.
% Returns {ok, Results} with one entry per document, in input order.
map_docs(Proc, Docs) ->
    % One function's output is a JSON array of rows such as
    % [[Key1, Value1], [Key2, Value2]]; turn each row into a tuple:
    % [{Key1, Value1}, {Key2, Value2}].
    RowsToTuples = fun(FunRows) ->
        [list_to_tuple(Row) || Row <- FunRows]
    end,
    MapOneDoc = fun(Doc) ->
        DocJson = couch_doc:to_json_obj(Doc, []),
        % The reply holds one array of rows per registered map function:
        % [FunResults1, FunResults2, ...]
        PerFunRows = proc_prompt(Proc, [<<"map_doc">>, DocJson]),
        lists:map(RowsToTuples, PerFunRows)
    end,
    {ok, lists:map(MapOneDoc, Docs)}.
| |
| |
% Hand a map process back to the pool. `nil` means no process was ever
% checked out, so there is nothing to return.
stop_doc_map(nil) ->
    ok;
stop_doc_map(MapProc) ->
    ok = ret_os_process(MapProc).
| |
% Regroup a list of rows (one list of reduction values per row) into a
% list of columns (one list of values per reduce function).
% Each pass prepends while walking the rows, so the first column comes
% out in reverse row order, the second in row order, and so on.
group_reductions_results([]) ->
    [];
group_reductions_results(Rows) ->
    case split_heads_tails(Rows, [], []) of
        {Column, [[] | _]} ->
            % rows are exhausted, this was the last column
            [Column];
        {Column, Remainders} ->
            [Column | group_reductions_results(Remainders)]
    end.

% Split every row into its head and tail, accumulating both in reverse.
split_heads_tails([], Heads, Tails) ->
    {Heads, Tails};
split_heads_tails([[Head | Tail] | Rows], Heads, Tails) ->
    split_heads_tails(Rows, [Head | Heads], [Tail | Tails]).
| |
% Rereduce previously reduced values, one column per reduce function.
% Sources starting with "_" are handled by builtin_reduce/4, all others
% are sent to the query server process. Returns {ok, Results} in the
% order of RedSrcs.
rereduce(_Lang, [], _ReducedValues) ->
    {ok, []};
rereduce(Lang, RedSrcs, ReducedValues) ->
    Proc = get_os_process(Lang),
    Columns = group_reductions_results(ReducedValues),
    RereduceOne = fun
        (<<"_", _/binary>> = BuiltinSrc, Values) ->
            % builtins expect [Key, Value] rows; the key is unused here
            Rows = [[[], Value] || Value <- Values],
            {ok, [Reduced]} = builtin_reduce(rereduce, [BuiltinSrc], Rows, []),
            Reduced;
        (OsSrc, Values) ->
            [true, [Reduced]] =
                proc_prompt(Proc, [<<"rereduce">>, [OsSrc], Values]),
            Reduced
    end,
    try
        {ok, lists:zipwith(RereduceOne, RedSrcs, Columns)}
    after
        % always hand the process back, even if a reduction failed
        ok = ret_os_process(Proc)
    end.
| |
% Reduce KVs with every function in RedSrcs. Sources starting with "_"
% are builtins evaluated in Erlang; the rest go to the query server.
% Returns {ok, Results} in the order of RedSrcs.
reduce(_Lang, [], _KVs) ->
    {ok, []};
reduce(Lang, RedSrcs, KVs) ->
    IsOsFun = fun
        (<<"_", _/binary>>) -> false;
        (_Source) -> true
    end,
    {OsSrcs, BuiltinSrcs} = lists:partition(IsOsFun, RedSrcs),
    {ok, OsResults} = os_reduce(Lang, OsSrcs, KVs),
    {ok, BuiltinResults} = builtin_reduce(reduce, BuiltinSrcs, KVs, []),
    recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, []).
| |
% Interleave query server results and builtin results back into the
% order of the original source list. A source starting with "_" takes
% the next builtin result, any other source the next query server one.
recombine_reduce_results([], [], [], Merged) ->
    {ok, lists:reverse(Merged)};
recombine_reduce_results([<<"_", _/binary>> | Srcs], OsRs, [Builtin | BuiltinRs], Merged) ->
    recombine_reduce_results(Srcs, OsRs, BuiltinRs, [Builtin | Merged]);
recombine_reduce_results([_OsSrc | Srcs], [Os | OsRs], BuiltinRs, Merged) ->
    recombine_reduce_results(Srcs, OsRs, BuiltinRs, [Os | Merged]).
| |
% Run the given reduce sources over KVs in a query server process.
% Returns {ok, Reductions}; with no sources no process is checked out.
os_reduce(_Lang, [], _KVs) ->
    {ok, []};
os_reduce(Lang, OsRedSrcs, KVs) ->
    Proc = get_os_process(Lang),
    try proc_prompt(Proc, [<<"reduce">>, OsRedSrcs, KVs]) of
        [true, Reductions] ->
            {ok, Reductions}
    after
        % always hand the process back, even if the prompt failed
        ok = ret_os_process(Proc)
    end.
| |
% Evaluate the builtin reduce functions named in the second argument
% over KVs. Re is `reduce` or `rereduce`. Returns {ok, Results} in the
% order the functions were given.
builtin_reduce(_Re, [], _KVs, Acc) ->
    {ok, lists:reverse(Acc)};
builtin_reduce(Re, [Name | Names], KVs, Acc) ->
    builtin_reduce(Re, Names, KVs, [builtin_reduce_one(Re, Name, KVs) | Acc]).

% _sum adds the row values in both phases. _count counts rows when
% reducing and adds up the partial counts when rereducing.
builtin_reduce_one(_Re, <<"_sum">>, KVs) ->
    builtin_sum_rows(KVs);
builtin_reduce_one(reduce, <<"_count">>, KVs) ->
    length(KVs);
builtin_reduce_one(rereduce, <<"_count">>, KVs) ->
    builtin_sum_rows(KVs).
| |
% Add up the values of a list of [Key, Value] rows, starting from 0.
% Throws {invalid_value, Message} for any row that is not a two element
% list with a numeric value.
builtin_sum_rows(KVs) ->
    AddRow = fun
        ([_Key, Value], Total) when is_number(Value) ->
            Total + Value;
        (_BadRow, _Total) ->
            throw({invalid_value, <<"builtin _sum function requires map values to be numbers">>})
    end,
    lists:foldl(AddRow, 0, KVs).
| |
% Run the validation function FunSrc against an edited document.
% DiskDoc is the stored document, or `nil` when there is none, in which
% case JSON null is passed to the function.
% Returns ok when the query server answers 1; throws
% {forbidden, Message} or {unauthorized, Message} on the matching
% error objects.
validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) ->
    Proc = get_os_process(Lang),
    JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]),
    JsonDiskDoc = case DiskDoc of
        nil -> null;
        _ -> couch_doc:to_json_obj(DiskDoc, [revs])
    end,
    Command = [<<"validate">>, FunSrc, JsonEditDoc, JsonDiskDoc, Ctx],
    try proc_prompt(Proc, Command) of
        1 ->
            ok;
        {[{<<"forbidden">>, Message}]} ->
            throw({forbidden, Message});
        {[{<<"unauthorized">>, Message}]} ->
            throw({unauthorized, Message})
    after
        % always hand the process back, whatever the verdict
        ok = ret_os_process(Proc)
    end.
% Prepend a "docId" member to the member list of a JSON request object.
% todo use json_apply_field
append_docid(DocId, JsonReqIn) ->
    DocIdMember = {<<"docId">>, DocId},
    [DocIdMember | JsonReqIn].
| |
% Render a document with the show function ShowSrc and return the query
% server's response.
% The request object gets a "docId" member unless both DocId and Doc are
% `nil`; a `nil` Doc is passed to the function as JSON null.
render_doc_show(Lang, ShowSrc, DocId, Doc, Req, Db) ->
    Proc = get_os_process(Lang),
    {JsonReqIn} = couch_httpd_external:json_req_obj(Req, Db),
    JsonReq = case {DocId, Doc} of
        {nil, nil} -> {JsonReqIn};
        _ -> {append_docid(DocId, JsonReqIn)}
    end,
    JsonDoc = case Doc of
        nil -> null;
        _ -> couch_doc:to_json_obj(Doc, [revs])
    end,
    try
        proc_prompt(Proc, [<<"show">>, ShowSrc, JsonDoc, JsonReq])
    after
        % always hand the process back, even if rendering failed
        ok = ret_os_process(Proc)
    end.
| |
% Run the update function UpdateSrc for a document and return the query
% server's response.
% The request object gets a "docId" member unless both DocId and Doc are
% `nil`; a `nil` Doc is passed to the function as JSON null.
render_doc_update(Lang, UpdateSrc, DocId, Doc, Req, Db) ->
    Proc = get_os_process(Lang),
    {JsonReqIn} = couch_httpd_external:json_req_obj(Req, Db),
    JsonReq = case {DocId, Doc} of
        {nil, nil} -> {JsonReqIn};
        _ -> {append_docid(DocId, JsonReqIn)}
    end,
    JsonDoc = case Doc of
        nil -> null;
        _ -> couch_doc:to_json_obj(Doc, [revs])
    end,
    try
        proc_prompt(Proc, [<<"update">>, UpdateSrc, JsonDoc, JsonReq])
    after
        % always hand the process back, even if the update function failed
        ok = ret_os_process(Proc)
    end.
| |
% Check out a query server process for Lang and register the list
% function source ListSrc with it. Returns {ok, Proc}.
% The registration must answer `true`, exactly as in start_doc_map/2 and
% start_filter/2; any other answer crashes here with badmatch instead of
% surfacing later while rendering rows.
start_view_list(Lang, ListSrc) ->
    Proc = get_os_process(Lang),
    true = proc_prompt(Proc, [<<"add_fun">>, ListSrc]),
    {ok, Proc}.
| |
% Send the "list" command with the view head and the JSON request
% object to the list process and return its response.
render_list_head(Proc, Req, Db, Head) ->
    ListCommand = [<<"list">>, Head, couch_httpd_external:json_req_obj(Req, Db)],
    proc_prompt(Proc, ListCommand).
| |
% Send one view row to the list process and return its response.
% Rows keyed by {Key, DocId} are converted by
% couch_httpd_view:view_row_obj/3; plain {Key, Value} rows become an
% object with just `key` and `value` members.
render_list_row(Proc, Db, {{_Key, _DocId}, _Value} = Row, IncludeDoc) ->
    RowJson = couch_httpd_view:view_row_obj(Db, Row, IncludeDoc),
    proc_prompt(Proc, [<<"list_row">>, RowJson]);

render_list_row(Proc, _Db, {Key, Value}, _IncludeDoc) ->
    RowJson = {[{key, Key}, {value, Value}]},
    proc_prompt(Proc, [<<"list_row">>, RowJson]).
| |
% Send "list_end" to the list process, hand the process back to the
% pool and return the query server's response.
% The hand-back sits in an `after` section, like every other
% prompt-then-release function in this module, so a failing prompt no
% longer leaves the process checked out.
render_list_tail(Proc) ->
    try
        proc_prompt(Proc, [<<"list_end">>])
    after
        ok = ret_os_process(Proc)
    end.
| |
% Check out a query server process for Lang and register the filter
% function source with it. Returns {ok, Proc}; crashes with badmatch if
% the registration does not answer `true`.
start_filter(Lang, FilterSrc) ->
    FilterProc = get_os_process(Lang),
    AddFunCommand = [<<"add_fun">>, FilterSrc],
    true = proc_prompt(FilterProc, AddFunCommand),
    {ok, FilterProc}.
| |
% Ask the filter process whether Doc passes the registered filter.
% The document is sent as a one element list and the single verdict in
% the reply is returned as {ok, Pass}.
filter_doc(Proc, Doc, Req, Db) ->
    JsonReq = couch_httpd_external:json_req_obj(Req, Db),
    JsonDoc = couch_doc:to_json_obj(Doc, [revs]),
    JsonCtx = couch_util:json_user_ctx(Db),
    FilterCommand = [<<"filter">>, [JsonDoc], JsonReq, JsonCtx],
    [true, [Pass]] = proc_prompt(Proc, FilterCommand),
    {ok, Pass}.
| |
% Hand a filter process back to the pool.
end_filter(FilterProc) ->
    ok = ret_os_process(FilterProc).
| |
| |
% gen_server callback: read the configured query servers and build the
% bookkeeping tables that make up the server state.
init([]) ->
    % Register for configuration changes. We simply stop when one of the
    % query server settings changes: couch_server_sup will restart us
    % and the new instance picks up the new settings.
    ok = couch_config:register(
        fun("query_servers" ++ _, _) ->
            ?MODULE:stop()
        end),
    ok = couch_config:register(
        fun("native_query_servers" ++ _, _) ->
            ?MODULE:stop()
        end),

    TableOpts = [set, private],
    Langs = ets:new(couch_query_server_langs, TableOpts),
    PidProcs = ets:new(couch_query_server_pid_langs, TableOpts),
    LangProcs = ets:new(couch_query_server_procs, TableOpts),
    InUse = ets:new(couch_query_server_used, TableOpts),

    % 'query_servers' specifies an OS command-line to execute.
    lists:foreach(fun({Lang, Command}) ->
        OsEntry = {?l2b(Lang), couch_os_process, start_link, [Command]},
        true = ets:insert(Langs, OsEntry)
    end, couch_config:get("query_servers")),

    % 'native_query_servers' specifies a {Module, Func, Arg} tuple.
    lists:foreach(fun({Lang, SpecStr}) ->
        {ok, {Mod, Fun, SpecArg}} = couch_util:parse_term(SpecStr),
        NativeEntry = {?l2b(Lang), Mod, Fun, SpecArg},
        true = ets:insert(Langs, NativeEntry)
    end, couch_config:get("native_query_servers")),

    % Exits of linked processes arrive as messages in handle_info/2.
    process_flag(trap_exit, true),

    % Langs:     language name -> Mod, Func, Arg used to start a process
    % PidProcs:  pid -> #proc record
    % LangProcs: language name -> list of idle #proc records
    % InUse:     language name -> list of checked-out #proc records
    State = {Langs, PidProcs, LangProcs, InUse},
    {ok, State}.
| |
% gen_server callback: no explicit cleanup is performed on shutdown.
terminate(_Reason, _State) ->
    ok.
| |
| |
% gen_server callback.
%
% {get_proc, Lang}: check out a process for Lang. Replies
%   {recycled, Proc, QueryServerConfig} when an idle process exists,
%   {new, Proc} when one had to be started, or the error term produced
%   by new_process/2 (caught, so the server itself keeps running).
% {ret_proc, Proc}: move Proc from the in-use list back to the idle
%   list of its language. Replies `true`.
handle_call({get_proc, Lang}, _From, {Langs, PidProcs, LangProcs, InUse}=Server) ->
    % Note to future self. Add max process limit.
    case ets:lookup(LangProcs, Lang) of
        [{Lang, [Proc|_]}] ->
            add_value(PidProcs, Proc#proc.pid, Proc),
            rem_from_list(LangProcs, Lang, Proc),
            add_to_list(InUse, Lang, Proc),
            {reply, {recycled, Proc, get_query_server_config()}, Server};
        _ ->
            case (catch new_process(Langs, Lang)) of
                {ok, Proc} ->
                    % Track the pid right away. handle_info/2 stops the
                    % whole server when a linked process that is missing
                    % from PidProcs exits, so a fresh process dying
                    % before its first recycle would otherwise take the
                    % server down with it.
                    add_value(PidProcs, Proc#proc.pid, Proc),
                    add_to_list(InUse, Lang, Proc),
                    {reply, {new, Proc}, Server};
                Error ->
                    {reply, Error, Server}
            end
    end;
handle_call({ret_proc, Proc}, _From, {_, _, LangProcs, InUse}=Server) ->
    % Along with max process limit, here we should check
    % if we're over the limit and discard when we are.
    add_to_list(LangProcs, Proc#proc.lang, Proc),
    rem_from_list(InUse, Proc#proc.lang, Proc),
    {reply, true, Server}.
| |
% gen_server callback: casts are not part of this server's protocol and
% are ignored.
handle_cast(_Msg, State) ->
    {noreply, State}.
| |
% gen_server callback: a linked process has exited.
% A process known to PidProcs is dropped from every table and the server
% carries on. An exit from any other linked process stops the server
% with the same reason.
handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs, InUse}=Server) ->
    case ets:lookup(PidProcs, Pid) of
        [] ->
            ?LOG_DEBUG("Unknown linked process died: ~p (reason: ~p)", [Pid, Status]),
            {stop, Status, Server};
        [{Pid, Proc}] ->
            if
                Status =/= normal ->
                    ?LOG_DEBUG("Linked process died abnormally: ~p (reason: ~p)", [Pid, Status]);
                true ->
                    ok
            end,
            rem_value(PidProcs, Pid),
            % best effort: a failure while removing the record from
            % either list is ignored
            catch rem_from_list(LangProcs, Proc#proc.lang, Proc),
            catch rem_from_list(InUse, Proc#proc.lang, Proc),
            {noreply, Server}
    end.
| |
% gen_server callback: the state needs no conversion on code upgrade.
code_change(_FromVsn, ServerState, _UpgradeArgs) ->
    {ok, ServerState}.
| |
| % Private API |
| |
% Build the configuration object sent along with the "reset" command.
% "reduce_limit" is read from the "query_server_config" section,
% defaults to "true", and is converted from string to atom.
get_query_server_config() ->
    LimitSetting = couch_config:get("query_server_config", "reduce_limit", "true"),
    {[{<<"reduce_limit">>, list_to_atom(LimitSetting)}]}.
| |
% Start a query server process for Lang from its entry in the Langs
% table. Returns {ok, #proc{}} or {unknown_query_language, Lang} when
% the language is not configured.
new_process(Langs, Lang) ->
    case ets:lookup(Langs, Lang) of
        [{Lang, Mod, Func, Arg}] ->
            {ok, Pid} = apply(Mod, Func, Arg),
            Proc = #proc{
                pid = Pid,
                lang = Lang,
                % Called via proc_prompt, proc_set_timeout, and proc_stop
                prompt_fun = {Mod, prompt},
                set_timeout_fun = {Mod, set_timeout},
                stop_fun = {Mod, stop}
            },
            {ok, Proc};
        _ ->
            {unknown_query_language, Lang}
    end.
| |
% Call the process's prompt function with its pid and Args, and return
% whatever that function returns.
proc_prompt(Proc, Args) ->
    {Mod, Func} = Proc#proc.prompt_fun,
    Mod:Func(Proc#proc.pid, Args).
| |
% Call the process's stop function with its pid.
proc_stop(Proc) ->
    {Mod, Func} = Proc#proc.stop_fun,
    Mod:Func(Proc#proc.pid).
| |
% Call the process's set-timeout function with its pid and Timeout.
proc_set_timeout(Proc, Timeout) ->
    {Mod, Func} = Proc#proc.set_timeout_fun,
    Mod:Func(Proc#proc.pid, Timeout).
| |
% Check out a ready-to-use query server process for Lang.
% A recycled process is first sent "reset" with the current query server
% config; if that does not answer `true` the process is stopped and the
% checkout is retried. Any other reply from the server is thrown.
get_os_process(Lang) ->
    case gen_server:call(couch_query_servers, {get_proc, Lang}) of
        {new, Proc} ->
            prepare_os_process(Proc);
        {recycled, Proc, QueryConfig} ->
            case (catch proc_prompt(Proc, [<<"reset">>, QueryConfig])) of
                true ->
                    prepare_os_process(Proc);
                _ ->
                    % unusable process: stop it (best effort) and retry
                    catch proc_stop(Proc),
                    get_os_process(Lang)
            end;
        Error ->
            throw(Error)
    end.

% Apply the configured "os_process_timeout" (default "5000") to the
% process and link it to the calling process. Returns the process.
prepare_os_process(Proc) ->
    TimeoutSetting = couch_config:get("couchdb", "os_process_timeout", "5000"),
    proc_set_timeout(Proc, list_to_integer(TimeoutSetting)),
    link(Proc#proc.pid),
    Proc.
| |
% Hand a process back to the server and unlink it from the caller.
% A failing unlink is ignored. Always returns ok.
ret_os_process(Proc) ->
    ReturnRequest = {ret_proc, Proc},
    true = gen_server:call(couch_query_servers, ReturnRequest),
    catch unlink(Proc#proc.pid),
    ok.
| |
% Store Value under Key in the ets table Tid. Returns true.
add_value(Tid, Key, Value) ->
    Entry = {Key, Value},
    true = ets:insert(Tid, Entry).
| |
% Delete the entry stored under Key from the ets table Tid.
% Returns true.
rem_value(Table, EntryKey) ->
    true = ets:delete(Table, EntryKey).
| |
% Prepend Value to the list stored under Key in the ets table Tid,
% creating the list when the key is missing. Returns true.
add_to_list(Tid, Key, Value) ->
    Existing = case ets:lookup(Tid, Key) of
        [{Key, Vals}] -> Vals;
        [] -> []
    end,
    true = ets:insert(Tid, {Key, [Value | Existing]}).
| |
% Remove every occurrence of Value from the list stored under Key in
% the ets table Tid. Returns ok when the key is missing, otherwise the
% result of the insert (true).
rem_from_list(Tid, Key, Value) ->
    case ets:lookup(Tid, Key) of
        [] ->
            ok;
        [{Key, Vals}] ->
            Remaining = [Kept || Kept <- Vals, Kept /= Value],
            ets:insert(Tid, {Key, Remaining})
    end.