| % Licensed under the Apache License, Version 2.0 (the "License"); |
| % you may not use this file except in compliance with the License. |
| % |
| % You may obtain a copy of the License at |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, |
| % software distributed under the License is distributed on an |
| % "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, |
| % either express or implied. |
| % |
| % See the License for the specific language governing permissions |
| % and limitations under the License. |
| % |
| % This file drew much inspiration from erlview, which was written by and |
| % copyright Michael McDaniel [http://autosys.us], and is also under APL 2.0 |
| % |
| % |
| % This module provides the smallest possible native view-server. |
| % With this module in-place, you can add the following to your couch INI files: |
| % [native_query_servers] |
| % erlang={couch_native_process, start_link, []} |
| % |
| % Which will then allow following example map function to be used: |
| % |
| % fun({Doc}) -> |
| % % Below, we emit a single record - the _id as key, null as value |
| % DocId = proplists:get_value(<<"_id">>, Doc, null), |
| % Emit(DocId, null) |
| % end. |
| % |
| % which should be roughly the same as the javascript: |
| % emit(doc._id, null); |
| % |
| % This module exposes enough functions such that a native erlang server can |
| % act as a fully-fledged view server, but no 'helper' functions specifically |
| % for simplifying your erlang view code. It is expected other third-party |
| % extensions will evolve which offer useful layers on top of this view server |
| % to help simplify your view code. |
-module(couch_native_process).

% Client-facing API: spawning the server process and the synchronous calls
% made against its pid.
-export([start_link/0]).
-export([set_timeout/2, prompt/2, stop/1]).
% loop/0 is the entry point that start_link/0 hands to spawn_link/3, which is
% why it has to be exported.
-export([loop/0]).

% Process-dictionary key under which the server keeps its #evstate{} record.
-define(STATE, native_proc_state).
% Server state:
%   funs         - {Sig, Fun} pairs, in the order "add_fun" appended them
%   query_config - term supplied with the two-element "reset" command
%   list_pid     - pid of the helper process running a list function, or nil
%   timeout      - milliseconds used for the receive timeouts of the list
%                  protocol; changed through set_timeout/2
-record(evstate, {funs=[], query_config=[], list_pid=nil, timeout=5000}).

-include("couch_db.hrl").
| |
%% Spawns the native query server, linked to the caller, running loop/0.
%% Returns {ok, Pid} of the new process.
start_link() ->
    ServerPid = spawn_link(?MODULE, loop, []),
    {ok, ServerPid}.
| |
%% Asks the server process Pid to stop and waits for its acknowledgement.
%% Returns ok once {ok, Pid} arrives; throws {error, timeout} when no
%% acknowledgement is received within one second.
stop(Pid) ->
    Caller = self(),
    erlang:send(Pid, {stop, Caller}),
    receive
        {ok, Pid} ->
            ok
    after 1000 ->
        throw({error, timeout})
    end.
| |
%% Sends the command Data to the server process Pid and returns the server's
%% response term. Throws {error, timeout} when no {ok, Pid, Resp} reply is
%% received within one second.
prompt(Pid, Data) ->
    Caller = self(),
    erlang:send(Pid, {prompt, Caller, Data}),
    receive
        {ok, Pid, Reply} ->
            Reply
    after 1000 ->
        throw({error, timeout})
    end.
| |
%% Tells the server process Pid to use TimeOut as its new timeout value and
%% waits for the acknowledgement. Returns ok, or throws {error, timeout} when
%% no {ok, Pid} reply is received within one second.
set_timeout(Pid, TimeOut) ->
    Caller = self(),
    erlang:send(Pid, {set_timeout, Caller, TimeOut}),
    receive
        {ok, Pid} ->
            ok
    after 1000 ->
        throw({error, timeout})
    end.
| |
%% Server loop of the native query server process.
%%
%% Handles three messages:
%%   {prompt, From, Data}         - runs the command via prompt/1, stores the
%%                                  new state and replies {ok, self(), Resp}
%%   {set_timeout, From, TimeOut} - updates the timeout in the stored state and
%%                                  replies {ok, self()}
%%   {stop, From}                 - replies {ok, self()} and returns
%%
%% A failed command is answered with {ok, self(), {[{error, Reason}]}} and the
%% loop is not re-entered, so the process then terminates.
%%
%% try ... of is used instead of the old `catch Expr` form. With `catch`, a
%% runtime error ({'EXIT', _}) or a thrown {timeout, _} is itself a 2-tuple and
%% was mistaken for a successful {NewState, Resp} result, which replaced the
%% state record with an atom and sent the stack trace back as the response.
loop() ->
    receive
        {prompt, From, Data} ->
            % Only prompt/1 itself is protected; the clauses after `of` are
            % not, which keeps the recursive call a tail call.
            try prompt(Data) of
                {#evstate{} = NewState, Resp} ->
                    put(?STATE, NewState),
                    From ! {ok, self(), Resp},
                    loop()
            catch
                throw:{error, Reason} ->
                    From ! {ok, self(), {[{error, Reason}]}};
                throw:{timeout, Where} ->
                    ?LOG_ERROR("Native Process: timeout in ~p~n", [Where]),
                    From ! {ok, self(), {[{error, timeout}]}};
                Class:Reason ->
                    ?LOG_ERROR("Native Process: command failed: ~p:~p~n",
                               [Class, Reason]),
                    From ! {ok, self(), {[{error, query_server_error}]}}
            end;
        {set_timeout, From, TimeOut} ->
            NewState = case get(?STATE) of
                undefined -> #evstate{timeout=TimeOut};
                State -> State#evstate{timeout=TimeOut}
            end,
            put(?STATE, NewState),
            From ! {ok, self()},
            loop();
        {stop, From} ->
            From ! {ok, self()}
    end.
| |
%% Runs one query-server command (a list whose head names the command)
%% against the state kept in the process dictionary and returns
%% {NewState, Response} as produced by run/2.
%%
%% A fresh #evstate{} is created and stored on first use. While a list
%% function is in progress (list_pid holds a pid) only "list_row" and
%% "list_end" are accepted; anything else, including an empty command list,
%% throws {error, query_server_error}.
%%
%% The command is normalised with to_binary/1 before the list-mode check so
%% that the check sees exactly what run/2 dispatches on, and the head is
%% pattern matched rather than taken with hd/1, which raised badarg on [].
prompt(Data) when is_list(Data) ->
    State = case get(?STATE) of
        undefined ->
            Fresh = #evstate{},
            put(?STATE, Fresh),
            Fresh;
        Existing ->
            Existing
    end,
    Command = to_binary(Data),
    case {is_pid(State#evstate.list_pid), Command} of
        {false, _} ->
            ok; % Not listing
        {true, [<<"list_row">> | _]} ->
            ok;
        {true, [<<"list_end">> | _]} ->
            ok;
        {true, _} ->
            throw({error, query_server_error})
    end,
    run(State, Command).
| |
%% Executes one normalised command and returns {NewState, Response}.
%%
%% Commands, by head element:
%%   "reset"     - discards the state; optionally stores a query config
%%   "add_fun"   - compiles a function source and appends it to funs
%%   "map_doc"   - runs every stored function on Doc, collecting what each
%%                 one accumulated under its signature in the process dictionary
%%   "reduce" / "rereduce" - delegate to reduce/5
%%   "validate", "show", "update" - compile the given source and call it
%%   "filter"    - calls the first stored function on every doc
%%   "list"      - starts the first stored function in a linked helper
%%                 process and waits for its start message
%%   "list_row" / "list_end" - forward to the helper process
%% Anything else is logged and throws {error, query_server_error}.
%%
%% List-protocol waits throw {timeout, Where} when they expire.
run(_, [<<"reset">>]) ->
    {#evstate{}, true};
run(_, [<<"reset">>, QueryConfig]) ->
    {#evstate{query_config=QueryConfig}, true};
run(#evstate{funs=Funs}=State, [<<"add_fun">>, BinFunc]) ->
    FunInfo = makefun(State, BinFunc),
    {State#evstate{funs=Funs ++ [FunInfo]}, true};
run(State, [<<"map_doc">>, Doc]) ->
    Resp = lists:map(fun({Sig, Fun}) ->
        % The accumulator is stored newest-first under Sig, hence the reverse.
        erlang:put(Sig, []),
        Fun(Doc),
        lists:reverse(erlang:get(Sig))
    end, State#evstate.funs),
    {State, Resp};
run(State, [<<"reduce">>, Funs, KVs]) ->
    {Keys, Vals} =
        lists:foldl(fun([K, V], {KAcc, VAcc}) ->
            {[K | KAcc], [V | VAcc]}
        end, {[], []}, KVs),
    Keys2 = lists:reverse(Keys),
    Vals2 = lists:reverse(Vals),
    {State, (catch reduce(State, Funs, Keys2, Vals2, false))};
run(State, [<<"rereduce">>, Funs, Vals]) ->
    {State, (catch reduce(State, Funs, null, Vals, true))};
run(State, [<<"validate">>, BFun, NDoc, ODoc, Ctx]) ->
    {_Sig, Fun} = makefun(State, BFun),
    {State, (catch Fun(NDoc, ODoc, Ctx))};
run(State, [<<"filter">>, Docs, Req]) ->
    {_Sig, Fun} = hd(State#evstate.funs),
    Resp = lists:map(fun(Doc) ->
        % Anything other than a literal true, including a crash, is false.
        case (catch Fun(Doc, Req)) of
            true -> true;
            _ -> false
        end
    end, Docs),
    {State, [true, Resp]};
run(State, [<<"show">>, BFun, Doc, Req]) ->
    {_Sig, Fun} = makefun(State, BFun),
    Resp = case (catch Fun(Doc, Req)) of
        FunResp when is_list(FunResp) ->
            FunResp;
        % A one-element tuple is wrapped as a "resp" message.
        {_} = FunResp ->
            [<<"resp">>, FunResp];
        FunResp ->
            FunResp
    end,
    {State, Resp};
run(State, [<<"update">>, BFun, Doc, Req]) ->
    {_Sig, Fun} = makefun(State, BFun),
    Resp = case (catch Fun(Doc, Req)) of
        [JsonDoc, JsonResp] ->
            [<<"up">>, JsonDoc, JsonResp]
    end,
    {State, Resp};
run(State, [<<"list">>, Head, Req]) ->
    {Sig, Fun} = hd(State#evstate.funs),
    % This is kinda dirty
    case is_function(Fun, 2) of
        false -> throw({error, render_error});
        true -> ok
    end,
    Self = self(),
    Timeout = State#evstate.timeout,
    SpawnFun = fun() ->
        LastChunk = (catch Fun(Head, Req)),
        % If the list function never asked for a row, the start message is
        % sent here and one row/end message from the server is consumed.
        case start_list_resp(Self, Sig) of
            started ->
                receive
                    {Self, list_row, _Row} -> ignore;
                    {Self, list_end} -> ignore
                after Timeout ->
                    throw({timeout, list_cleanup_pid})
                end;
            _ ->
                ok
        end,
        LastChunks =
            case erlang:get(Sig) of
                undefined -> [LastChunk];
                OtherChunks -> [LastChunk | OtherChunks]
            end,
        Self ! {self(), list_end, lists:reverse(LastChunks)}
    end,
    % Remember the previous trap_exit setting so it can be restored once the
    % helper process has finished.
    erlang:put(do_trap, process_flag(trap_exit, true)),
    Pid = spawn_link(SpawnFun),
    Resp =
        receive
            {Pid, start, Chunks, JsonResp} ->
                [<<"start">>, Chunks, JsonResp]
        after Timeout ->
            % Do not leave the helper running or trap_exit switched on.
            abort_list(Pid),
            throw({timeout, list_start})
        end,
    {State#evstate{list_pid=Pid}, Resp};
run(#evstate{list_pid=Pid}=State, [<<"list_row">>, Row]) when is_pid(Pid) ->
    Pid ! {self(), list_row, Row},
    receive
        {Pid, chunks, Data} ->
            {State, [<<"chunks">>, Data]};
        {Pid, list_end, Data} ->
            finish_list(State, Data)
    after State#evstate.timeout ->
        throw({timeout, list_row})
    end;
run(#evstate{list_pid=Pid}=State, [<<"list_end">>]) when is_pid(Pid) ->
    Pid ! {self(), list_end},
    receive
        {Pid, list_end, Data} ->
            finish_list(State, Data)
    after State#evstate.timeout ->
        throw({timeout, list_end})
    end;
run(_, Unknown) ->
    ?LOG_ERROR("Native Process: Unknown command: ~p~n", [Unknown]),
    throw({error, query_server_error}).

%% Completes a list run after the helper process has sent its final chunks:
%% waits for the helper's normal exit, restores the saved trap_exit setting
%% and returns the state with list_pid cleared plus the "end" response.
%% Throws {timeout, list_cleanup} if the exit message does not arrive in time.
finish_list(#evstate{list_pid=Pid, timeout=Timeout}=State, Data) ->
    receive
        {'EXIT', Pid, normal} -> ok
    after Timeout ->
        throw({timeout, list_cleanup})
    end,
    process_flag(trap_exit, erlang:get(do_trap)),
    {State#evstate{list_pid=nil}, [<<"end">>, Data]}.

%% Abandons a helper process that never produced its start message: unlinks
%% and kills it, drops an exit message it may already have delivered, and
%% restores the trap_exit setting saved under do_trap.
abort_list(Pid) ->
    unlink(Pid),
    exit(Pid, kill),
    receive
        {'EXIT', Pid, _} -> ok
    after 0 ->
        ok
    end,
    process_flag(trap_exit, erlang:get(do_trap)),
    ok.
| |
%% Builds the variable bindings made available to user-supplied functions.
%%
%% State supplies the timeout used by GetRow; Sig is the process-dictionary
%% key under which emitted rows / sent chunks are accumulated (newest first).
%% Returns a list of {Name, Fun} pairs:
%%   'Log'(Msg)         - logs Msg at info level
%%   'Emit'(Id, Value)  - pushes [Id, Value] onto the accumulator
%%   'Start'(Headers)   - stores the list response headers
%%   'Send'(Chunk)      - pushes Chunk onto the accumulator
%%   'GetRow'()         - flushes pending chunks to the server process and
%%                        waits for the next row; returns the row, or nil at
%%                        the end; throws {timeout, list_pid_getrow}
%%   'FoldRows'(Fun, Acc) - folds Fun over the rows via foldrows/3
%%
%% Self is the process that called bindings/2; inside GetRow, self() is
%% whichever process is running the user function at that moment.
bindings(State, Sig) ->
    Self = self(),
    Timeout = State#evstate.timeout,

    % What has been accumulated under Sig so far, [] when nothing was stored.
    Pending = fun() ->
        case erlang:get(Sig) of
            undefined -> [];
            Stored -> Stored
        end
    end,

    % The message is data, not a format string: a "~" in it must not be
    % interpreted as a format directive.
    Log = fun(Msg) ->
        ?LOG_INFO("~s", [Msg])
    end,

    % Uses Pending() like Send does, so emitting before anything was stored
    % under Sig no longer builds an improper list ending in 'undefined'.
    Emit = fun(Id, Value) ->
        erlang:put(Sig, [[Id, Value] | Pending()])
    end,

    Start = fun(Headers) ->
        erlang:put(list_headers, Headers)
    end,

    Send = fun(Chunk) ->
        erlang:put(Sig, [Chunk | Pending()])
    end,

    GetRow = fun() ->
        case start_list_resp(Self, Sig) of
            started ->
                % The start message already carried the pending chunks.
                ok;
            _ ->
                Self ! {self(), chunks, lists:reverse(Pending())}
        end,
        erlang:put(Sig, []),
        receive
            {Self, list_row, Row} -> Row;
            {Self, list_end} -> nil
        after Timeout ->
            throw({timeout, list_pid_getrow})
        end
    end,

    FoldRows = fun(Fun, Acc) -> foldrows(GetRow, Fun, Acc) end,

    [
        {'Log', Log},
        {'Emit', Emit},
        {'Start', Start},
        {'Send', Send},
        {'GetRow', GetRow},
        {'FoldRows', FoldRows}
    ].
| |
| % thanks to erlview, via: |
| % http://erlang.org/pipermail/erlang-questions/2003-November/010544.html |
%% Compiles the function source Source (a binary) with the standard bindings
%% for State. Returns {Sig, Fun}, where Sig is the MD5 digest of the source
%% and doubles as the accumulator key used by those bindings.
makefun(State, Source) ->
    Digest = erlang:md5(Source),
    Compiled = makefun(State, Source, bindings(State, Digest)),
    {Digest, Compiled}.
| |
%% Evaluates Erlang source text and returns the resulting value, normally a
%% fun.
%%
%% Source is a binary holding one or more comma-separated expressions ended
%% by a full stop; the value of the last expression is returned. BindFuns is
%% a list of {Name, Value} pairs bound as variables while evaluating.
%%
%% On a scan or parse error the message is written to standard_error and
%% {error, ErrorInfo} is thrown, where ErrorInfo is the
%% {Location, Module, Descriptor} tuple reported by erl_scan / erl_parse.
makefun(_State, Source, BindFuns) ->
    FunStr = binary_to_list(Source),
    Tokens = case erl_scan:string(FunStr) of
        {ok, Scanned, _EndLocation} ->
            Scanned;
        {error, ScanInfo, _ErrorLocation} ->
            report_source_error(ScanInfo),
            throw({error, ScanInfo})
    end,
    Forms = case erl_parse:parse_exprs(Tokens) of
        {ok, Parsed} ->
            Parsed;
        {error, ParseInfo} ->
            report_source_error(ParseInfo),
            throw({error, ParseInfo})
    end,
    Bindings = lists:foldl(fun({Name, Value}, Acc) ->
        erl_eval:add_binding(Name, Value, Acc)
    end, erl_eval:new_bindings(), BindFuns),
    {value, Result, _} = erl_eval:exprs(Forms, Bindings),
    Result.

%% Prints a scanner/parser ErrorInfo tuple to standard_error. The descriptor
%% is rendered with Module:format_error/1 rather than by assuming its shape.
report_source_error({Location, Mod, Descriptor}) ->
    io:format(standard_error, "Syntax error on line: ~p~n~s~n",
              [Location, Mod:format_error(Descriptor)]).
| |
%% Compiles the given reduce function source(s) and applies each one to
%% (Keys, Vals, ReReduce). BinFuns is either a list of sources or a single
%% source. All sources are compiled before any of them is run.
%% Returns [true, Results] with one result per function, in order.
reduce(State, BinFuns, Keys, Vals, ReReduce) when is_list(BinFuns) ->
    Compiled = lists:map(fun(Source) -> makefun(State, Source) end, BinFuns),
    Results = lists:map(fun({_Sig, Reducer}) ->
        Reducer(Keys, Vals, ReReduce)
    end, Compiled),
    [true, Results];
reduce(State, BinFun, Keys, Vals, ReReduce) ->
    % A lone source is handled as a one-element list.
    reduce(State, [BinFun], Keys, Vals, ReReduce).
| |
%% Folds ProcRow over the rows produced by repeatedly calling GetRow().
%%
%% GetRow() returns the next row, or nil when there are no more. ProcRow is
%% called as ProcRow(Row, Acc) and must yield {ok, NewAcc} to continue or
%% {stop, FinalAcc} to finish early (the call is wrapped in `catch`).
%% Returns {ok, Acc}.
foldrows(GetRow, ProcRow, Acc) ->
    foldrows_step(GetRow(), GetRow, ProcRow, Acc).

%% Handles one fetched row: nil ends the fold, anything else is processed.
foldrows_step(nil, _GetRow, _ProcRow, Acc) ->
    {ok, Acc};
foldrows_step(Row, GetRow, ProcRow, Acc) ->
    case (catch ProcRow(Row, Acc)) of
        {ok, NextAcc} ->
            foldrows(GetRow, ProcRow, NextAcc);
        {stop, FinalAcc} ->
            {ok, FinalAcc}
    end.
| |
%% Sends the list "start" message to Self exactly once per process.
%%
%% When the list_started flag is not yet set in the process dictionary, sends
%% {self(), start, Chunks, Headers} to Self, where Chunks are the terms stored
%% under Sig in original order and Headers are the stored list_headers (or an
%% object with empty "headers" when none were stored). It then sets the flag,
%% clears the chunks stored under Sig and returns started. Otherwise it does
%% nothing and returns ok.
start_list_resp(Self, Sig) ->
    case erlang:get(list_started) =:= undefined of
        false ->
            ok;
        true ->
            OrDefault = fun(undefined, Default) -> Default;
                           (Stored, _Default) -> Stored
                        end,
            Headers = OrDefault(erlang:get(list_headers),
                                {[{<<"headers">>, {[]}}]}),
            Buffered = OrDefault(erlang:get(Sig), []),
            Self ! {self(), start, lists:reverse(Buffered), Headers},
            erlang:put(list_started, true),
            erlang:put(Sig, []),
            started
    end.
| |
%% Recursively normalises a decoded JSON-style term so that atoms become
%% binaries.
%%
%%   {Props}  - every {Key, Value} pair has both parts converted
%%   lists    - converted element by element
%%   null, true, false - kept as atoms
%%   other atoms - turned into the binary of their name
%%   anything else - returned unchanged
to_binary({Props}) ->
    ConvertPair = fun({Key, Value}) ->
        {to_binary(Key), to_binary(Value)}
    end,
    {lists:map(ConvertPair, Props)};
to_binary(Items) when is_list(Items) ->
    lists:map(fun(Item) -> to_binary(Item) end, Items);
to_binary(Literal) when Literal =:= null; Literal =:= true; Literal =:= false ->
    Literal;
to_binary(Name) when is_atom(Name) ->
    list_to_binary(atom_to_list(Name));
to_binary(Other) ->
    Other.