| % Licensed under the Apache License, Version 2.0 (the "License"); |
| % you may not use this file except in compliance with the License. |
| % |
| % You may obtain a copy of the License at |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, |
| % software distributed under the License is distributed on an |
| % "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, |
| % either express or implied. |
| % |
| % See the License for the specific language governing permissions |
| % and limitations under the License. |
| % |
| % This file drew much inspiration from erlview, which was written by and |
| % copyright Michael McDaniel [http://autosys.us], and is also under APL 2.0 |
| % |
| % |
| % This module provides the smallest possible native view-server. |
| % With this module in-place, you can add the following to your couch INI files: |
| % [native_query_servers] |
| % erlang={couch_native_process, start_link, []} |
| % |
| % Which will then allow the following example map function to be used: |
| % |
| % fun({Doc}) -> |
| % % Below, we emit a single record - the _id as key, null as value |
| % DocId = couch_util:get_value(<<"_id">>, Doc, null), |
| % Emit(DocId, null) |
| % end. |
| % |
| % which should be roughly the same as the javascript: |
| % emit(doc._id, null); |
| % |
| % This module exposes enough functions such that a native erlang server can |
| % act as a fully-fledged view server, but no 'helper' functions specifically |
| % for simplifying your erlang view code. It is expected other third-party |
| % extensions will evolve which offer useful layers on top of this view server |
| % to help simplify your view code. |
| -module(couch_native_process). |
| -behaviour(gen_server). |
| -vsn(1). |
| |
| -export([start_link/0,init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3, |
| handle_info/2]). |
| -export([set_timeout/2, prompt/2]). |
| |
| -define(STATE, native_proc_state). |
% Per-process state of the native query server gen_server.
-record(evstate, {
    % dict of design documents keyed by design document id
    ddocs,
    % compiled {Sig, Fun} pairs registered through "add_fun", in insertion order
    funs = [],
    % query configuration received with the two-element "reset" command
    query_config = [],
    % pid of the linked process running a list function, or nil when idle
    list_pid = nil,
    % milliseconds used by the receive ... after clauses in this module
    timeout = 5000,
    % gen_server timeout (milliseconds) returned from the callbacks
    idle = 5000
}).
| |
| -include_lib("couch/include/couch_db.hrl"). |
| |
% Start an unregistered native query server process linked to the caller.
% Returns whatever gen_server:start_link/3 returns.
start_link() ->
    InitArgs = [],
    GenServerOpts = [],
    gen_server:start_link(?MODULE, InitArgs, GenServerOpts).
| |
| % this is a bit messy, see also couch_query_servers handle_info |
| % stop(_Pid) -> |
| % ok. |
| |
% Synchronously ask the server at Pid to store TimeOut as its new timeout.
% Returns the reply of the gen_server call.
set_timeout(Pid, TimeOut) ->
    Request = {set_timeout, TimeOut},
    gen_server:call(Pid, Request).
| |
% Synchronously send one query-server command (a list) to the server at Pid
% and return its reply. Non-list Data is rejected by the guard.
prompt(Pid, Data) when is_list(Data) ->
    Command = {prompt, Data},
    gen_server:call(Pid, Command).
| |
| % gen_server callbacks |
% gen_server init callback.
% Reads "os_process_idle_limit" (seconds, default "300") from the
% "query_server_config" section, converts it to milliseconds and uses it both
% as the stored idle value and as the initial gen_server timeout.
init([]) ->
    IdleSeconds = list_to_integer(
        config:get("query_server_config", "os_process_idle_limit", "300")
    ),
    IdleMs = IdleSeconds * 1000,
    InitialState = #evstate{ddocs = dict:new(), idle = IdleMs},
    {ok, InitialState, IdleMs}.
| |
% gen_server call callback.
%
% {set_timeout, TimeOut}: store the new timeout and reply ok.
% {prompt, Data}: normalise Data with to_binary/1, execute it with run/2 and
% turn the outcome into a gen_server return value. Only throw:{error, Why}
% is caught here; any other exception propagates to gen_server.
handle_call({set_timeout, TimeOut}, _From, State) ->
    Updated = State#evstate{timeout = TimeOut},
    {reply, ok, Updated, State#evstate.idle};
handle_call({prompt, Data}, _From, State) ->
    couch_log:debug("Prompt native qs: ~s", [?JSON_ENCODE(Data)]),
    {NextState, Outcome} =
        try run(State, to_binary(Data)) of
            {S, R} -> {S, R}
        catch
            throw:{error, Why} ->
                {State, [<<"error">>, Why, Why]}
        end,
    % The idle timeout is taken from the state as it was before the command.
    prompt_reply(Outcome, NextState, State#evstate.idle).

% Map the outcome of a prompt onto the gen_server return tuple.
prompt_reply({error, Reason}, State, Idle) ->
    Text = io_lib:format("couch native server error: ~p", [Reason]),
    Reply = [<<"error">>, <<"native_query_server">>, list_to_binary(Text)],
    {reply, Reply, State, Idle};
prompt_reply([<<"error">> | Rest], State, Idle) ->
    {reply, [<<"error">> | Rest], State, Idle};
prompt_reply([<<"fatal">> | Rest], State, _Idle) ->
    % A fatal response is reported as an error and stops the server.
    {stop, fatal, [<<"error">> | Rest], State};
prompt_reply(Other, State, Idle) ->
    {reply, Other, State, Idle}.
| |
% gen_server cast callback.
% stop terminates the server normally; garbage_collect forces a collection;
% every other message is ignored. All non-stop casts re-arm the idle timeout.
handle_cast(stop, State) ->
    {stop, normal, State};
handle_cast(Msg, State) ->
    case Msg of
        garbage_collect -> erlang:garbage_collect();
        _ -> ok
    end,
    {noreply, State, State#evstate.idle}.
| |
% gen_server info callback.
% timeout: tell couch_proc_manager this process is idle, collect garbage and
% re-arm the idle timeout.
% {'EXIT', _, Reason}: ignore normal exits, stop with Reason otherwise.
handle_info(timeout, State) ->
    IdleNotice = {os_proc_idle, self()},
    gen_server:cast(couch_proc_manager, IdleNotice),
    erlang:garbage_collect(),
    {noreply, State, State#evstate.idle};
handle_info({'EXIT', _From, Reason}, State) ->
    case Reason of
        normal -> {noreply, State, State#evstate.idle};
        _ -> {stop, Reason, State}
    end.
% gen_server terminate callback; there is nothing to clean up.
terminate(_Why, _FinalState) ->
    ok.
% gen_server code_change callback; the state is carried over unchanged.
code_change(_FromVsn, State, _Extra) ->
    {ok, State}.
| |
% Execute one query-server command and return {NewState, Response}.
%
% While a list function is running (list_pid is a pid) only "list_row" and
% "list_end" are accepted; any other command yields a list_error response.
% Otherwise the supported commands are "reset", "add_fun", "map_doc",
% "reduce", "rereduce" and "ddoc". An unknown command is logged and reported
% with throw({error, unknown_command}). Timeouts while talking to the list
% process are reported with throw({timeout, Where}).
run(#evstate{list_pid = Pid} = State, [<<"list_row">>, Row]) when is_pid(Pid) ->
    Pid ! {self(), list_row, Row},
    receive
        {Pid, chunks, Data} ->
            {State, [<<"chunks">>, Data]};
        {Pid, list_end, Data} ->
            % The list process finished early; wait for its normal exit
            % before restoring the previous trap_exit setting.
            receive
                {'EXIT', Pid, normal} -> ok
            after State#evstate.timeout ->
                throw({timeout, list_cleanup})
            end,
            process_flag(trap_exit, erlang:get(do_trap)),
            {State#evstate{list_pid = nil}, [<<"end">>, Data]}
    after State#evstate.timeout ->
        throw({timeout, list_row})
    end;
run(#evstate{list_pid = Pid} = State, [<<"list_end">>]) when is_pid(Pid) ->
    Pid ! {self(), list_end},
    Resp =
        receive
            {Pid, list_end, Data} ->
                receive
                    {'EXIT', Pid, normal} -> ok
                after State#evstate.timeout ->
                    throw({timeout, list_cleanup})
                end,
                [<<"end">>, Data]
        after State#evstate.timeout ->
            throw({timeout, list_end})
        end,
    % Restore the trap_exit flag saved when the list process was spawned.
    process_flag(trap_exit, erlang:get(do_trap)),
    {State#evstate{list_pid = nil}, Resp};
run(#evstate{list_pid = Pid} = State, _Command) when is_pid(Pid) ->
    {State, [<<"error">>, list_error, list_error]};
% Both reset forms keep the design documents and the configured idle timeout;
% every other field returns to its record default.
run(#evstate{ddocs = DDocs, idle = Idle}, [<<"reset">>]) ->
    {#evstate{ddocs = DDocs, idle = Idle}, true};
run(#evstate{ddocs = DDocs, idle = Idle}, [<<"reset">>, QueryConfig]) ->
    NewState = #evstate{
        ddocs = DDocs,
        query_config = QueryConfig,
        idle = Idle
    },
    {NewState, true};
run(#evstate{funs = Funs} = State, [<<"add_fun">>, BinFunc]) ->
    FunInfo = makefun(State, BinFunc),
    % Appended so that map_doc results keep the order the funs were added in.
    {State#evstate{funs = Funs ++ [FunInfo]}, true};
run(State, [<<"map_doc">>, Doc]) ->
    Resp = lists:map(
        fun({Sig, Fun}) ->
            % Emitted rows are collected in the process dictionary under Sig,
            % newest first, hence the reverse.
            erlang:put(Sig, []),
            Fun(Doc),
            lists:reverse(erlang:get(Sig))
        end,
        State#evstate.funs
    ),
    {State, Resp};
run(State, [<<"reduce">>, Funs, KVs]) ->
    {Keys, Vals} =
        lists:foldl(
            fun([K, V], {KAcc, VAcc}) ->
                {[K | KAcc], [V | VAcc]}
            end,
            {[], []},
            KVs
        ),
    Keys2 = lists:reverse(Keys),
    Vals2 = lists:reverse(Vals),
    {State, catch reduce(State, Funs, Keys2, Vals2, false)};
run(State, [<<"rereduce">>, Funs, Vals]) ->
    {State, catch reduce(State, Funs, null, Vals, true)};
run(#evstate{ddocs = DDocs} = State, [<<"ddoc">>, <<"new">>, DDocId, DDoc]) ->
    DDocs2 = store_ddoc(DDocs, DDocId, DDoc),
    {State#evstate{ddocs = DDocs2}, true};
run(#evstate{ddocs = DDocs} = State, [<<"ddoc">>, DDocId | Rest]) ->
    DDoc = load_ddoc(DDocs, DDocId),
    ddoc(State, DDoc, Rest);
run(_, Unknown) ->
    couch_log:error("Native Process: Unknown command: ~p~n", [Unknown]),
    throw({error, unknown_command}).
| |
% Resolve the function source addressed by FunPath inside the design document,
% compile it and dispatch to ddoc/4.
%
% FunPath is a list of keys walked from the top of the design document.
% Throws {error, not_found} when a key on the path is missing (including the
% last one) and {error, malformed_ddoc} when the path does not end on a binary
% function source.
ddoc(State, {DDoc}, [FunPath, Args]) ->
    % load fun from the FunPath
    Walked = lists:foldl(
        fun
            (Key, {Props}) when is_list(Props) ->
                couch_util:get_value(Key, Props, nil);
            (_Key, Fun) when is_binary(Fun) ->
                Fun;
            (_Key, nil) ->
                throw({error, not_found});
            (_Key, _Fun) ->
                throw({error, malformed_ddoc})
        end,
        {DDoc},
        FunPath
    ),
    % The fold only validates a value when another key follows it, so the
    % final value has to be checked before it is handed to makefun/3.
    BFun =
        case Walked of
            Source when is_binary(Source) -> Source;
            nil -> throw({error, not_found});
            _ -> throw({error, malformed_ddoc})
        end,
    ddoc(State, makefun(State, BFun, {DDoc}), FunPath, Args).
| |
% Run a compiled design document function. The third argument is the function
% path; its first element selects the kind of function being run.
% Returns {NewState, Response}.
%
% validate_doc_update / rewrites: apply the fun and return its (caught) result.
% filters / views: evaluate the fun per document and return [true, Results].
%   A document whose evaluation raises is logged and counted as false.
% shows: list results are passed through, {Props} results are wrapped as
%   [<<"resp">>, {Props}], anything else is returned unchanged.
% updates: expects [Doc, Resp] and returns [<<"up">>, Doc, Resp].
% lists: spawn a linked process that runs the fun, wait for its start
%   message and remember its pid in list_pid.
ddoc(State, {_, Fun}, [<<"validate_doc_update">>], Args) ->
    {State, (catch apply(Fun, Args))};
ddoc(State, {_, Fun}, [<<"rewrites">>], Args) ->
    {State, (catch apply(Fun, Args))};
ddoc(State, {_, Fun}, [<<"filters">> | _], [Docs, Req]) ->
    FilterFunWrapper = fun(Doc) ->
        case catch Fun(Doc, Req) of
            true ->
                true;
            false ->
                false;
            {'EXIT', Error} ->
                % Log the failure and exclude the document explicitly rather
                % than leaking the logger's return value into the response.
                couch_log:error("~p", [Error]),
                false
        end
    end,
    Resp = lists:map(FilterFunWrapper, Docs),
    {State, [true, Resp]};
ddoc(State, {_, Fun}, [<<"views">> | _], [Docs]) ->
    MapFunWrapper = fun(Doc) ->
        case catch Fun(Doc) of
            undefined ->
                true;
            ok ->
                false;
            false ->
                false;
            [_ | _] ->
                true;
            {'EXIT', Error} ->
                % Same policy as for filters: log and exclude the document.
                couch_log:error("~p", [Error]),
                false
        end
    end,
    Resp = lists:map(MapFunWrapper, Docs),
    {State, [true, Resp]};
ddoc(State, {_, Fun}, [<<"shows">> | _], Args) ->
    Resp =
        case (catch apply(Fun, Args)) of
            FunResp when is_list(FunResp) ->
                FunResp;
            {FunResp} ->
                [<<"resp">>, {FunResp}];
            FunResp ->
                FunResp
        end,
    {State, Resp};
ddoc(State, {_, Fun}, [<<"updates">> | _], Args) ->
    Resp =
        case (catch apply(Fun, Args)) of
            [JsonDoc, JsonResp] ->
                [<<"up">>, JsonDoc, JsonResp]
        end,
    {State, Resp};
ddoc(State, {Sig, Fun}, [<<"lists">> | _], Args) ->
    Self = self(),
    SpawnFun = fun() ->
        LastChunk = (catch apply(Fun, Args)),
        % If the list fun never sent the start message, send it now and then
        % consume the one list_row/list_end message the server sends next.
        case start_list_resp(Self, Sig) of
            started ->
                receive
                    {Self, list_row, _Row} -> ignore;
                    {Self, list_end} -> ignore
                after State#evstate.timeout ->
                    throw({timeout, list_cleanup_pid})
                end;
            _ ->
                ok
        end,
        % Chunks are accumulated newest first; the fun's return value is the
        % final chunk.
        LastChunks =
            case erlang:get(Sig) of
                undefined -> [LastChunk];
                OtherChunks -> [LastChunk | OtherChunks]
            end,
        Self ! {self(), list_end, lists:reverse(LastChunks)}
    end,
    % Trap exits while the list process runs; the previous flag is kept in
    % the process dictionary so it can be restored when the list ends.
    erlang:put(do_trap, process_flag(trap_exit, true)),
    Pid = spawn_link(SpawnFun),
    Resp =
        receive
            {Pid, start, Chunks, JsonResp} ->
                [<<"start">>, Chunks, JsonResp]
        after State#evstate.timeout ->
            throw({timeout, list_start})
        end,
    {State#evstate{list_pid = Pid}, Resp}.
| |
% Return the design document dict with DDoc stored under DDocId, replacing
% any previous entry for that id.
store_ddoc(DDocs, DDocId, DDoc) ->
    Updated = dict:store(DDocId, DDoc, DDocs),
    Updated.
% Fetch the design document stored under DDocId.
% Returns the stored {Props} term. Throws {error, Message} (Message is a
% binary) when no design document is stored under DDocId. Only the missing-key
% case is reported this way; any other failure is left to crash.
load_ddoc(DDocs, DDocId) ->
    case dict:find(DDocId, DDocs) of
        {ok, {DDoc}} ->
            {DDoc};
        error ->
            Msg = io_lib:format(
                "Native Query Server missing DDoc with Id: ~s", [DDocId]
            ),
            throw({error, iolist_to_binary(Msg)})
    end.
| |
% Build the variable bindings for a function that has no design document.
bindings(State, Sig) ->
    NoDDoc = nil,
    bindings(State, Sig, NoDDoc).
% Build the {Name, Value} bindings made available to user-supplied functions:
% Log, Emit, Start, Send, GetRow and FoldRows, plus DDoc when a design
% document ({Props}) is given.
%
% Emitted rows and sent chunks are accumulated, newest first, in the process
% dictionary under Sig. Parent is the process that builds the bindings; the
% list helpers exchange messages with it.
bindings(State, Sig, DDoc) ->
    Parent = self(),

    % Accumulated chunks for Sig, or [] when nothing has been stored yet.
    Buffered = fun() ->
        case erlang:get(Sig) of
            undefined -> [];
            Stored -> Stored
        end
    end,

    Log = fun(Msg) -> couch_log:info(Msg, []) end,

    Emit = fun(Id, Value) ->
        Rows = erlang:get(Sig),
        erlang:put(Sig, [[Id, Value] | Rows])
    end,

    Start = fun(Headers) -> erlang:put(list_headers, Headers) end,

    Send = fun(Chunk) -> erlang:put(Sig, [Chunk | Buffered()]) end,

    GetRow = fun() ->
        % The first call sends the start message; later calls flush the
        % chunks gathered since the previous row.
        case start_list_resp(Parent, Sig) of
            started ->
                ok;
            _ ->
                Parent ! {self(), chunks, lists:reverse(Buffered())}
        end,
        erlang:put(Sig, []),
        receive
            {Parent, list_row, Row} -> Row;
            {Parent, list_end} -> nil
        after State#evstate.timeout ->
            throw({timeout, list_pid_getrow})
        end
    end,

    FoldRows = fun(Fun, Acc) -> foldrows(GetRow, Fun, Acc) end,

    DDocBinding =
        case DDoc of
            {_Props} -> [{'DDoc', DDoc}];
            _Else -> []
        end,
    [
        {'Log', Log},
        {'Emit', Emit},
        {'Start', Start},
        {'Send', Send},
        {'GetRow', GetRow},
        {'FoldRows', FoldRows}
    ] ++ DDocBinding.
| |
| % thanks to erlview, via: |
| % http://erlang.org/pipermail/erlang-questions/2003-November/010544.html |
% Compile a function source that has no design document.
% Returns {Sig, Fun} where Sig is the MD5 hash of Source.
makefun(State, Source) ->
    Sig = couch_hash:md5_hash(Source),
    Compiled = makefun(State, Source, bindings(State, Sig)),
    {Sig, Compiled}.
% makefun(State, Source, {DDoc}): compile Source with the design document
% bound; returns {Sig, Fun}, Sig being the MD5 hash of Source combined with
% the serialised design document.
% makefun(State, Source, BindFuns): scan, parse and evaluate Source with the
% given {Name, Value} bindings and return the resulting value.
% A parse error is logged and re-thrown as {error, ErrorInfo}.
makefun(State, Source, {DDoc}) ->
    HashInput = lists:flatten([Source, term_to_binary(DDoc)]),
    Sig = couch_hash:md5_hash(HashInput),
    Compiled = makefun(State, Source, bindings(State, Sig, {DDoc})),
    {Sig, Compiled};
makefun(_State, Source, BindFuns) when is_list(BindFuns) ->
    {ok, Tokens, _} = erl_scan:string(binary_to_list(Source)),
    Form = makefun_parse(Tokens),
    Env = makefun_env(BindFuns),
    {value, Fun, _} = erl_eval:expr(Form, Env),
    Fun.

% Parse the tokens into the single expression they must contain.
makefun_parse(Tokens) ->
    case (catch erl_parse:parse_exprs(Tokens)) of
        {ok, [Expr]} ->
            Expr;
        {error, {LineNum, _Mod, [Mesg, Params]}} = Error ->
            couch_log:error(
                "Syntax error on line: ~p~n~s~p~n",
                [LineNum, Mesg, Params]
            ),
            throw(Error)
    end.

% Turn a list of {Name, Value} pairs into an erl_eval binding structure.
makefun_env(BindFuns) ->
    lists:foldl(
        fun({Name, Value}, Env) ->
            erl_eval:add_binding(Name, Value, Env)
        end,
        erl_eval:new_bindings(),
        BindFuns
    ).
| |
% Compile the given reduce function source(s) and apply each one to
% (Keys, Vals, ReReduce). BinFuns is either a list of sources or one source.
% Returns [true, Results] with one result per function, in order.
reduce(State, BinFuns, Keys, Vals, ReReduce) ->
    Compile = fun(Source) -> makefun(State, Source) end,
    Compiled =
        case is_list(BinFuns) of
            true -> lists:map(Compile, BinFuns);
            _ -> [Compile(BinFuns)]
        end,
    Results = [Fun(Keys, Vals, ReReduce) || {_Sig, Fun} <- Compiled],
    [true, Results].
| |
% Fold ProcRow over the rows produced by GetRow.
% GetRow() returns the next row, or nil when there are no more rows.
% ProcRow(Row, Acc) returns {ok, NewAcc} to continue or {stop, NewAcc} to end
% the fold early. Returns {ok, FinalAcc}.
foldrows(GetRow, ProcRow, Acc) ->
    foldrows_step(GetRow(), GetRow, ProcRow, Acc).

% Handle one fetched row: finish on nil, otherwise process it and continue.
foldrows_step(nil, _GetRow, _ProcRow, Acc) ->
    {ok, Acc};
foldrows_step(Row, GetRow, ProcRow, Acc) ->
    case (catch ProcRow(Row, Acc)) of
        {ok, Next} ->
            foldrows(GetRow, ProcRow, Next);
        {stop, Final} ->
            {ok, Final}
    end.
| |
% Send the list "start" message to Self unless it was already sent.
% On the first call, sends {self(), start, Chunks, Headers} using the headers
% and chunks stored in the process dictionary, marks the list as started,
% clears the chunks and returns started. Later calls return ok.
start_list_resp(Self, Sig) ->
    case erlang:get(list_started) of
        undefined -> send_list_start(Self, Sig);
        _ -> ok
    end.

% Build and send the start message, then record that it was sent.
send_list_start(Self, Sig) ->
    DefaultHeaders = {[{<<"headers">>, {[]}}]},
    Headers = dict_value_or(list_headers, DefaultHeaders),
    Pending = dict_value_or(Sig, []),
    Self ! {self(), start, lists:reverse(Pending), Headers},
    erlang:put(list_started, true),
    erlang:put(Sig, []),
    started.

% Value stored in the process dictionary under Key, or Default when unset.
dict_value_or(Key, Default) ->
    case erlang:get(Key) of
        undefined -> Default;
        Value -> Value
    end.
| |
% Recursively convert atoms inside a decoded JSON term to binaries.
% Objects ({Props}) and lists are walked; null, true and false are kept as
% atoms; any other atom becomes a binary; everything else is returned as is.
to_binary({Props}) ->
    ConvertPair = fun({Key, Value}) ->
        {to_binary(Key), to_binary(Value)}
    end,
    {lists:map(ConvertPair, Props)};
to_binary(Items) when is_list(Items) ->
    [to_binary(Item) || Item <- Items];
to_binary(Keep) when Keep =:= null; Keep =:= true; Keep =:= false ->
    Keep;
to_binary(Atom) when is_atom(Atom) ->
    Chars = atom_to_list(Atom),
    list_to_binary(Chars);
to_binary(Other) ->
    Other.