blob: e2208ba5e3aa5379ce0c9e524a5652e7d71903fd [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License");
% you may not use this file except in compliance with the License.
%
% You may obtain a copy of the License at
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing,
% software distributed under the License is distributed on an
% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
% either express or implied.
%
% See the License for the specific language governing permissions
% and limitations under the License.
%
% This file drew much inspiration from erlview, which was written by and
% copyright Michael McDaniel [http://autosys.us], and is also under APL 2.0
%
%
% This module provides the smallest possible native view-server.
% With this module in-place, you can add the following to your couch INI files:
% [native_query_servers]
% erlang={couch_native_process, start_link, []}
%
% Which will then allow following example map function to be used:
%
% fun({Doc}) ->
% % Below, we emit a single record - the _id as key, null as value
% DocId = proplists:get_value(Doc, <<"_id">>, null),
% Emit(DocId, null)
% end.
%
% which should be roughly the same as the javascript:
% emit(doc._id, null);
%
% This module exposes enough functions such that a native erlang server can
% act as a fully-fleged view server, but no 'helper' functions specifically
% for simplifying your erlang view code. It is expected other third-party
% extensions will evolve which offer useful layers on top of this view server
% to help simplify your view code.
-module(couch_native_process).
-export([start_link/0]).
-export([set_timeout/2, prompt/2, stop/1]).
-export([loop/0]).
-define(STATE, native_proc_state).
-record(evstate, {funs=[], query_config=[], list_pid=nil, timeout=5000}).
-include("couch_db.hrl").
start_link() ->
{ok, spawn_link(?MODULE, loop, [])}.
stop(Pid) ->
Pid ! {stop, self()},
receive
{ok, Pid} -> ok
after 1000 ->
throw({error, timeout})
end.
prompt(Pid, Data) ->
Pid ! {prompt, self(), Data},
receive
{ok, Pid, Resp} -> Resp
after 1000 ->
throw({error, timeout})
end.
set_timeout(Pid, TimeOut) ->
Pid ! {set_timeout, self(), TimeOut},
receive
{ok, Pid} -> ok
after 1000 ->
throw({error, timeout})
end.
loop() ->
receive
{prompt, From, Data} ->
case (catch prompt(Data)) of
{error, Reason} ->
From ! {ok, self(), {[{error, Reason}]}};
{NewState, Resp} ->
put(?STATE, NewState),
From ! {ok, self(), Resp},
loop()
end;
{set_timeout, From, TimeOut} ->
NewState = case get(?STATE) of
undefined -> #evstate{timeout=TimeOut};
State -> State#evstate{timeout=TimeOut}
end,
put(?STATE, NewState),
From ! {ok, self()},
loop();
{stop, From} ->
From ! {ok, self()}
end.
prompt(Data) when is_list(Data) ->
case get(?STATE) of
undefined ->
State = #evstate{},
put(?STATE, State);
State ->
State
end,
case is_pid(State#evstate.list_pid) of
true ->
case hd(Data) of
<<"list_row">> -> ok;
<<"list_end">> -> ok;
_ -> throw({error, query_server_error})
end;
_ ->
ok % Not listing
end,
run(State, to_binary(Data)).
run(_, [<<"reset">>]) ->
{#evstate{}, true};
run(_, [<<"reset">>, QueryConfig]) ->
{#evstate{query_config=QueryConfig}, true};
run(#evstate{funs=Funs}=State, [<<"add_fun">> , BinFunc]) ->
FunInfo = makefun(State, BinFunc),
{State#evstate{funs=Funs ++ [FunInfo]}, true};
run(State, [<<"map_doc">> , Doc]) ->
Resp = lists:map(fun({Sig, Fun}) ->
erlang:put(Sig, []),
Fun(Doc),
lists:reverse(erlang:get(Sig))
end, State#evstate.funs),
{State, Resp};
run(State, [<<"reduce">>, Funs, KVs]) ->
{Keys, Vals} =
lists:foldl(fun([K, V], {KAcc, VAcc}) ->
{[K | KAcc], [V | VAcc]}
end, {[], []}, KVs),
Keys2 = lists:reverse(Keys),
Vals2 = lists:reverse(Vals),
{State, catch reduce(State, Funs, Keys2, Vals2, false)};
run(State, [<<"rereduce">>, Funs, Vals]) ->
{State, catch reduce(State, Funs, null, Vals, true)};
run(State, [<<"validate">>, BFun, NDoc, ODoc, Ctx]) ->
{_Sig, Fun} = makefun(State, BFun),
{State, catch Fun(NDoc, ODoc, Ctx)};
run(State, [<<"filter">>, Docs, Req]) ->
{_Sig, Fun} = hd(State#evstate.funs),
Resp = lists:map(fun(Doc) ->
case (catch Fun(Doc, Req)) of
true -> true;
_ -> false
end
end, Docs),
{State, [true, Resp]};
run(State, [<<"show">>, BFun, Doc, Req]) ->
{_Sig, Fun} = makefun(State, BFun),
Resp = case (catch Fun(Doc, Req)) of
FunResp when is_list(FunResp) ->
FunResp;
FunResp when is_tuple(FunResp), size(FunResp) == 1 ->
[<<"resp">>, FunResp];
FunResp ->
FunResp
end,
{State, Resp};
run(State, [<<"update">>, BFun, Doc, Req]) ->
{_Sig, Fun} = makefun(State, BFun),
Resp = case (catch Fun(Doc, Req)) of
[JsonDoc, JsonResp] ->
[<<"up">>, JsonDoc, JsonResp]
end,
{State, Resp};
run(State, [<<"list">>, Head, Req]) ->
{Sig, Fun} = hd(State#evstate.funs),
% This is kinda dirty
case is_function(Fun, 2) of
false -> throw({error, render_error});
true -> ok
end,
Self = self(),
SpawnFun = fun() ->
LastChunk = (catch Fun(Head, Req)),
case start_list_resp(Self, Sig) of
started ->
receive
{Self, list_row, _Row} -> ignore;
{Self, list_end} -> ignore
after State#evstate.timeout ->
throw({timeout, list_cleanup_pid})
end;
_ ->
ok
end,
LastChunks =
case erlang:get(Sig) of
undefined -> [LastChunk];
OtherChunks -> [LastChunk | OtherChunks]
end,
Self ! {self(), list_end, lists:reverse(LastChunks)}
end,
erlang:put(do_trap, process_flag(trap_exit, true)),
Pid = spawn_link(SpawnFun),
Resp =
receive
{Pid, start, Chunks, JsonResp} ->
[<<"start">>, Chunks, JsonResp]
after State#evstate.timeout ->
throw({timeout, list_start})
end,
{State#evstate{list_pid=Pid}, Resp};
run(#evstate{list_pid=Pid}=State, [<<"list_row">>, Row]) when is_pid(Pid) ->
Pid ! {self(), list_row, Row},
receive
{Pid, chunks, Data} ->
{State, [<<"chunks">>, Data]};
{Pid, list_end, Data} ->
receive
{'EXIT', Pid, normal} -> ok
after State#evstate.timeout ->
throw({timeout, list_cleanup})
end,
process_flag(trap_exit, erlang:get(do_trap)),
{State#evstate{list_pid=nil}, [<<"end">>, Data]}
after State#evstate.timeout ->
throw({timeout, list_row})
end;
run(#evstate{list_pid=Pid}=State, [<<"list_end">>]) when is_pid(Pid) ->
Pid ! {self(), list_end},
Resp =
receive
{Pid, list_end, Data} ->
receive
{'EXIT', Pid, normal} -> ok
after State#evstate.timeout ->
throw({timeout, list_cleanup})
end,
[<<"end">>, Data]
after State#evstate.timeout ->
throw({timeout, list_end})
end,
process_flag(trap_exit, erlang:get(do_trap)),
{State#evstate{list_pid=nil}, Resp};
run(_, Unknown) ->
?LOG_ERROR("Native Process: Unknown command: ~p~n", [Unknown]),
throw({error, query_server_error}).
bindings(State, Sig) ->
Self = self(),
Log = fun(Msg) ->
?LOG_INFO(Msg, [])
end,
Emit = fun(Id, Value) ->
Curr = erlang:get(Sig),
erlang:put(Sig, [[Id, Value] | Curr])
end,
Start = fun(Headers) ->
erlang:put(list_headers, Headers)
end,
Send = fun(Chunk) ->
Curr =
case erlang:get(Sig) of
undefined -> [];
Else -> Else
end,
erlang:put(Sig, [Chunk | Curr])
end,
GetRow = fun() ->
case start_list_resp(Self, Sig) of
started ->
ok;
_ ->
Chunks =
case erlang:get(Sig) of
undefined -> [];
CurrChunks -> CurrChunks
end,
Self ! {self(), chunks, lists:reverse(Chunks)}
end,
erlang:put(Sig, []),
receive
{Self, list_row, Row} -> Row;
{Self, list_end} -> nil
after State#evstate.timeout ->
throw({timeout, list_pid_getrow})
end
end,
FoldRows = fun(Fun, Acc) -> foldrows(GetRow, Fun, Acc) end,
[
{'Log', Log},
{'Emit', Emit},
{'Start', Start},
{'Send', Send},
{'GetRow', GetRow},
{'FoldRows', FoldRows}
].
% thanks to erlview, via:
% http://erlang.org/pipermail/erlang-questions/2003-November/010544.html
makefun(State, Source) ->
Sig = erlang:md5(Source),
BindFuns = bindings(State, Sig),
{Sig, makefun(State, Source, BindFuns)}.
makefun(_State, Source, BindFuns) ->
FunStr = binary_to_list(Source),
{ok, Tokens, _} = erl_scan:string(FunStr),
Form = case (catch erl_parse:parse_exprs(Tokens)) of
{ok, [ParsedForm]} ->
ParsedForm;
{error, {LineNum, _Mod, [Mesg, Params]}}=Error ->
io:format(standard_error, "Syntax error on line: ~p~n", [LineNum]),
io:format(standard_error, "~s~p~n", [Mesg, Params]),
throw(Error)
end,
Bindings = lists:foldl(fun({Name, Fun}, Acc) ->
erl_eval:add_binding(Name, Fun, Acc)
end, erl_eval:new_bindings(), BindFuns),
{value, Fun, _} = erl_eval:expr(Form, Bindings),
Fun.
reduce(State, BinFuns, Keys, Vals, ReReduce) ->
Funs = case is_list(BinFuns) of
true ->
lists:map(fun(BF) -> makefun(State, BF) end, BinFuns);
_ ->
[makefun(State, BinFuns)]
end,
Reds = lists:map(fun({_Sig, Fun}) ->
Fun(Keys, Vals, ReReduce)
end, Funs),
[true, Reds].
foldrows(GetRow, ProcRow, Acc) ->
case GetRow() of
nil ->
{ok, Acc};
Row ->
case (catch ProcRow(Row, Acc)) of
{ok, Acc2} ->
foldrows(GetRow, ProcRow, Acc2);
{stop, Acc2} ->
{ok, Acc2}
end
end.
start_list_resp(Self, Sig) ->
case erlang:get(list_started) of
undefined ->
Headers =
case erlang:get(list_headers) of
undefined -> {[{<<"headers">>, {[]}}]};
CurrHdrs -> CurrHdrs
end,
Chunks =
case erlang:get(Sig) of
undefined -> [];
CurrChunks -> CurrChunks
end,
Self ! {self(), start, lists:reverse(Chunks), Headers},
erlang:put(list_started, true),
erlang:put(Sig, []),
started;
_ ->
ok
end.
to_binary({Data}) ->
Pred = fun({Key, Value}) ->
{to_binary(Key), to_binary(Value)}
end,
{lists:map(Pred, Data)};
to_binary(Data) when is_list(Data) ->
lists:map(fun to_binary/1, Data);
to_binary(null) ->
null;
to_binary(true) ->
true;
to_binary(false) ->
false;
to_binary(Data) when is_atom(Data) ->
list_to_binary(atom_to_list(Data));
to_binary(Data) ->
Data.