% blob: 4cbd9f053fa8dfa13625186de9b783642e7bc0bd [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License");
% you may not use this file except in compliance with the License.
%
% You may obtain a copy of the License at
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing,
% software distributed under the License is distributed on an
% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
% either express or implied.
%
% See the License for the specific language governing permissions
% and limitations under the License.
%
% This file drew much inspiration from erlview, which was written by and
% copyright Michael McDaniel [http://autosys.us], and is also under APL 2.0
%
%
% This module provides the smallest possible native view-server.
% With this module in place, you can add the following to your CouchDB INI files:
% [native_query_servers]
% erlang={couch_native_process, start_link, []}
%
% This then allows the following example map function to be used:
%
% fun({Doc}) ->
% % Below, we emit a single record - the _id as key, null as value
% DocId = couch_util:get_value(<<"_id">>, Doc, null),
% Emit(DocId, null)
% end.
%
% which should be roughly equivalent to the JavaScript:
% emit(doc._id, null);
%
% This module exposes enough functions that a native Erlang server can
% act as a fully-fledged view server, but provides no 'helper' functions
% specifically for simplifying your Erlang view code. It is expected that
% other third-party extensions will evolve to offer useful layers on top of
% this view server that help simplify your view code.
% Native (in-VM) query server: a gen_server that evaluates Erlang source
% supplied by design documents.
-module(couch_js_native_process).
-behaviour(gen_server).
-vsn(1).
% Startup function and gen_server callbacks.
-export([
start_link/0,
init/1,
terminate/2,
handle_call/3,
handle_cast/2,
code_change/3,
handle_info/2,
format_status/2
]).
% Client API: set the list-function timeout and send query-server commands.
-export([set_timeout/2, prompt/2]).
% NOTE(review): ?STATE is not referenced anywhere in the visible code.
-define(STATE, native_proc_state).
% Server state threaded through the gen_server callbacks.
-record(evstate, {
% dict of DDocId => DDoc, filled by [<<"ddoc">>, <<"new">>, DDocId, DDoc].
ddocs,
% {Sig, Fun} pairs compiled by <<"add_fun">>, in the order they were added.
funs = [],
% QueryConfig received with [<<"reset">>, QueryConfig].
query_config = [],
% Pid of the process currently running a list function, or nil.
list_pid = nil,
% Milliseconds to wait for list-function messages (see set_timeout/2).
timeout = 5000,
% gen_server inactivity timeout in milliseconds; overwritten in init/1.
idle = 5000
}).
-include_lib("couch/include/couch_db.hrl").
-include_lib("kernel/include/logger.hrl").
%% Start a native query-server process linked to the caller. The process is
%% not registered under a name.
start_link() ->
    gen_server:start_link(?MODULE, [], []).

%% No stop/1 wrapper is provided; shutdown handling is a bit messy, see also
%% couch_query_servers handle_info. The server stops itself on a `stop` cast.

%% Change the timeout, in milliseconds, used while waiting on list-function
%% messages. The server replies ok.
set_timeout(Pid, TimeOut) ->
    SetTimeout = {set_timeout, TimeOut},
    gen_server:call(Pid, SetTimeout).

%% Send one query-server command (a list such as [<<"reset">>]) to the
%% server and return its reply.
prompt(Pid, Data) when is_list(Data) ->
    Prompt = {prompt, Data},
    gen_server:call(Pid, Prompt).
%% gen_server callbacks

%% os_process_idle_limit is multiplied by 1000, i.e. treated as seconds and
%% converted to milliseconds. The result is stored in the state and also
%% used as the initial gen_server timeout.
init([]) ->
    LimitSecs = list_to_integer(
        config:get("query_server_config", "os_process_idle_limit", "300")
    ),
    IdleMs = LimitSecs * 1000,
    {ok, #evstate{ddocs = dict:new(), idle = IdleMs}, IdleMs}.
%% {set_timeout, Ms}: store the list-function timeout and reply ok.
%% {prompt, Data}: convert atoms in Data to binaries, run the command and
%% turn the result into a reply. Replies carry the idle timeout so the
%% server reports itself idle after State#evstate.idle ms of inactivity.
handle_call({set_timeout, TimeOut}, _From, State) ->
    {reply, ok, State#evstate{timeout = TimeOut}, State#evstate.idle};
handle_call({prompt, Data}, _From, State) ->
    ?LOG_DEBUG(#{
        what => prompt,
        in => native_process,
        msg => ?JSON_ENCODE(Data)
    }),
    % NOTE(review): the argument list is evaluated eagerly, so Data is
    % JSON-encoded on every prompt regardless of the configured log level.
    couch_log:debug("Prompt native qs: ~s", [?JSON_ENCODE(Data)]),
    {NewState, Resp} =
        try run(State, to_binary(Data)) of
            {S, R} -> {S, R}
        catch
            % Commands report expected failures by throwing {error, Why};
            % any other exception propagates and crashes the server.
            throw:{error, Why} ->
                {State, [<<"error">>, Why, Why]}
        end,
    Idle = State#evstate.idle,
    case Resp of
        {error, Reason} ->
            Msg = io_lib:format("couch native server error: ~p", [Reason]),
            Error = [<<"error">>, <<"native_query_server">>, list_to_binary(Msg)],
            {reply, Error, NewState, Idle};
        [<<"error">> | _] ->
            % TODO: markh? (jan) -- error replies are passed through as-is.
            {reply, Resp, NewState, Idle};
        [<<"fatal">> | Rest] ->
            % The caller sees an ordinary error; the server then stops.
            {stop, fatal, [<<"error">> | Rest], NewState};
        _ ->
            % Resp is already bound, so a `Resp ->` pattern here would be a
            % catch-all in disguise; `_` states the intent explicitly.
            {reply, Resp, NewState, Idle}
    end.
%% Casts: `stop` shuts the server down normally, `garbage_collect` forces a
%% GC of this process, and anything else is ignored. Every cast except
%% `stop` re-arms the idle timeout.
handle_cast(stop, State) ->
    {stop, normal, State};
handle_cast(garbage_collect, #evstate{idle = Idle} = State) ->
    erlang:garbage_collect(),
    {noreply, State, Idle};
handle_cast(_Ignored, #evstate{idle = Idle} = State) ->
    {noreply, State, Idle}.
%% The idle timeout tells couch_js_proc_manager this process is idle,
%% collects garbage and keeps waiting. 'EXIT' messages with reason normal
%% are ignored; any other exit reason stops the server with that reason.
handle_info(timeout, #evstate{idle = Idle} = State) ->
    gen_server:cast(couch_js_proc_manager, {os_proc_idle, self()}),
    erlang:garbage_collect(),
    {noreply, State, Idle};
handle_info({'EXIT', _From, normal}, #evstate{idle = Idle} = State) ->
    {noreply, State, Idle};
handle_info({'EXIT', _From, Reason}, State) ->
    {stop, Reason, State}.
%% Nothing needs to be released on shutdown.
terminate(_Reason, _State) ->
    ok.

%% Code upgrades keep the state as it is.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
%% Status/crash-report view of the state: the ddocs, funs and query_config
%% fields are replaced by their sizes, and all other fields are shown as-is.
format_status(_Opt, [_PDict, State]) ->
    #evstate{ddocs = DDocs, funs = Funs, query_config = Config} = State,
    DDocCount = dict:size(DDocs),
    FunCount = length(Funs),
    ConfigLen = length(Config),
    Redacted = State#evstate{
        ddocs = {dict_size, DDocCount},
        funs = {length, FunCount},
        query_config = {length, ConfigLen}
    },
    [{data, [{"State", ?record_to_keyval(evstate, Redacted)}]}].
%% Execute one query-server command against State and return
%% {NewState, Response}. Expected failures are thrown as {error, Why}.
%% While a list function is running (list_pid is a pid), only list_row and
%% list_end are accepted; any other command gets a list_error response.
run(#evstate{list_pid = Pid} = State, [<<"list_row">>, Row]) when is_pid(Pid) ->
    Pid ! {self(), list_row, Row},
    receive
        {Pid, chunks, Data} ->
            {State, [<<"chunks">>, Data]};
        {Pid, list_end, Data} ->
            finish_list(State, Pid, Data)
    after State#evstate.timeout ->
        throw({timeout, list_row})
    end;
run(#evstate{list_pid = Pid} = State, [<<"list_end">>]) when is_pid(Pid) ->
    Pid ! {self(), list_end},
    receive
        {Pid, list_end, Data} ->
            finish_list(State, Pid, Data)
    after State#evstate.timeout ->
        throw({timeout, list_end})
    end;
run(#evstate{list_pid = Pid} = State, _Command) when is_pid(Pid) ->
    {State, [<<"error">>, list_error, list_error]};
run(#evstate{ddocs = DDocs, idle = Idle}, [<<"reset">>]) ->
    % Keep the configured idle limit, as the [<<"reset">>, QueryConfig]
    % clause does; otherwise it silently falls back to the record default.
    {#evstate{ddocs = DDocs, idle = Idle}, true};
run(#evstate{ddocs = DDocs, idle = Idle}, [<<"reset">>, QueryConfig]) ->
    NewState = #evstate{
        ddocs = DDocs,
        query_config = QueryConfig,
        idle = Idle
    },
    {NewState, true};
run(#evstate{funs = Funs} = State, [<<"add_fun">>, BinFunc]) ->
    FunInfo = makefun(State, BinFunc),
    {State#evstate{funs = Funs ++ [FunInfo]}, true};
run(State, [<<"map_doc">>, Doc]) ->
    % Each map fun collects its Emit output under its own Sig in the
    % process dictionary; the list is reset before and read after the call.
    Resp = lists:map(
        fun({Sig, Fun}) ->
            erlang:put(Sig, []),
            Fun(Doc),
            lists:reverse(erlang:get(Sig))
        end,
        State#evstate.funs
    ),
    {State, Resp};
run(State, [<<"reduce">>, Funs, KVs]) ->
    {Keys, Vals} =
        lists:foldl(
            fun([K, V], {KAcc, VAcc}) ->
                {[K | KAcc], [V | VAcc]}
            end,
            {[], []},
            KVs
        ),
    Keys2 = lists:reverse(Keys),
    Vals2 = lists:reverse(Vals),
    {State, catch reduce(State, Funs, Keys2, Vals2, false)};
run(State, [<<"rereduce">>, Funs, Vals]) ->
    {State, catch reduce(State, Funs, null, Vals, true)};
run(#evstate{ddocs = DDocs} = State, [<<"ddoc">>, <<"new">>, DDocId, DDoc]) ->
    DDocs2 = store_ddoc(DDocs, DDocId, DDoc),
    {State#evstate{ddocs = DDocs2}, true};
run(#evstate{ddocs = DDocs} = State, [<<"ddoc">>, DDocId | Rest]) ->
    DDoc = load_ddoc(DDocs, DDocId),
    ddoc(State, DDoc, Rest);
run(_, Unknown) ->
    ?LOG_ERROR(#{
        what => unknown_command,
        in => native_process,
        cmd => Unknown
    }),
    couch_log:error("Native Process: Unknown command: ~p~n", [Unknown]),
    throw({error, unknown_command}).

%% Finish a list function: wait for helper Pid to exit normally, restore the
%% trap_exit flag stashed under `do_trap`, clear list_pid and return the
%% final [<<"end">>, Data] response. Throws {timeout, list_cleanup} if the
%% helper does not exit within State#evstate.timeout ms.
finish_list(State, Pid, Data) ->
    receive
        {'EXIT', Pid, normal} -> ok
    after State#evstate.timeout ->
        throw({timeout, list_cleanup})
    end,
    process_flag(trap_exit, erlang:get(do_trap)),
    {State#evstate{list_pid = nil}, [<<"end">>, Data]}.
%% Resolve FunPath (a list of keys) inside the design document to a
%% function source, compile it and dispatch to ddoc/4 with Args.
%% Throws {error, not_found} if a key is missing and {error, malformed_ddoc}
%% if the path does not end on a source binary.
ddoc(State, {DDoc}, [FunPath, Args]) ->
    BFun = lists:foldl(
        fun
            (Key, {Props}) when is_list(Props) ->
                couch_util:get_value(Key, Props, nil);
            (_Key, Fun) when is_binary(Fun) ->
                Fun;
            (_Key, nil) ->
                throw({error, not_found});
            (_Key, _Fun) ->
                throw({error, malformed_ddoc})
        end,
        {DDoc},
        FunPath
    ),
    case BFun of
        Source when is_binary(Source) ->
            ddoc(State, makefun(State, Source, {DDoc}), FunPath, Args);
        nil ->
            % The last key of FunPath was absent; report it like a missing
            % intermediate key instead of crashing inside makefun/3.
            throw({error, not_found});
        _ ->
            % The path ended on an object (or FunPath was empty).
            throw({error, malformed_ddoc})
    end.
%% Run a compiled design-document function ({Sig, Fun} from makefun/3).
%% The clause is chosen by FunPath; Args are the prompt arguments.
%% Returns {State, Response}; the lists clause also records the helper pid.
ddoc(State, {_, Fun}, [<<"validate_doc_update">>], Args) ->
    {State, (catch apply(Fun, Args))};
ddoc(State, {_, Fun}, [<<"rewrites">>], Args) ->
    {State, (catch apply(Fun, Args))};
ddoc(State, {_, Fun}, [<<"filters">> | _], [Docs, Req]) ->
    FilterFunWrapper = fun(Doc) ->
        case catch Fun(Doc, Req) of
            true ->
                true;
            false ->
                false;
            {'EXIT', Error} ->
                ?LOG_ERROR(#{
                    what => filter_fun_crash,
                    in => native_process,
                    details => Error
                }),
                couch_log:error("~p", [Error]),
                % A crashed filter rejects the doc, rather than leaving
                % whatever couch_log:error/2 returns in the result list.
                false
        end
    end,
    Resp = lists:map(FilterFunWrapper, Docs),
    {State, [true, Resp]};
ddoc(State, {Sig, Fun}, [<<"views">> | _], [Docs]) ->
    % Whether a doc passes is inferred from the map fun's return value:
    % Emit returns the previous value stored under Sig (undefined for a
    % fresh key, a list afterwards).
    MapFunWrapper = fun(Doc) ->
        % Start every doc from an unset Sig. Emitted rows are never read
        % here, so without this they would accumulate in this process's
        % dictionary across docs and calls.
        erlang:erase(Sig),
        case catch Fun(Doc) of
            undefined ->
                true;
            ok ->
                false;
            false ->
                false;
            [_ | _] ->
                true;
            {'EXIT', Error} ->
                ?LOG_ERROR(#{
                    what => view_fun_crash,
                    in => native_process,
                    details => Error
                }),
                couch_log:error("~p", [Error]),
                % A crashed map fun rejects the doc.
                false
        end
    end,
    Resp = lists:map(MapFunWrapper, Docs),
    {State, [true, Resp]};
ddoc(State, {_, Fun}, [<<"shows">> | _], Args) ->
    Resp =
        case (catch apply(Fun, Args)) of
            FunResp when is_list(FunResp) ->
                FunResp;
            {FunResp} ->
                [<<"resp">>, {FunResp}];
            FunResp ->
                FunResp
        end,
    {State, Resp};
ddoc(State, {_, Fun}, [<<"updates">> | _], Args) ->
    Resp =
        case (catch apply(Fun, Args)) of
            [JsonDoc, JsonResp] ->
                [<<"up">>, JsonDoc, JsonResp]
        end,
    {State, Resp};
ddoc(State, {Sig, Fun}, [<<"lists">> | _], Args) ->
    Self = self(),
    SpawnFun = fun() ->
        LastChunk = (catch apply(Fun, Args)),
        case start_list_resp(Self, Sig) of
            started ->
                receive
                    {Self, list_row, _Row} -> ignore;
                    {Self, list_end} -> ignore
                after State#evstate.timeout ->
                    throw({timeout, list_cleanup_pid})
                end;
            _ ->
                ok
        end,
        LastChunks =
            case erlang:get(Sig) of
                undefined -> [LastChunk];
                OtherChunks -> [LastChunk | OtherChunks]
            end,
        Self ! {self(), list_end, lists:reverse(LastChunks)}
    end,
    % Trap exits while the helper runs; the previous flag is stashed under
    % `do_trap` and restored when the list finishes.
    erlang:put(do_trap, process_flag(trap_exit, true)),
    Pid = spawn_link(SpawnFun),
    Resp =
        receive
            {Pid, start, Chunks, JsonResp} ->
                [<<"start">>, Chunks, JsonResp]
        after State#evstate.timeout ->
            throw({timeout, list_start})
        end,
    {State#evstate{list_pid = Pid}, Resp}.
%% Record DDoc under DDocId, replacing any earlier version of it.
store_ddoc(DDocs, DDocId, DDoc) ->
    dict:update(DDocId, fun(_Previous) -> DDoc end, DDoc, DDocs).
%% Return the design document {Props} stored under DDocId.
%% Throws {error, Message} when DDocId has not been stored, so the prompt
%% handler replies with an error instead of crashing.
load_ddoc(DDocs, DDocId) ->
    % dict:find/2 distinguishes "missing" explicitly, so no blanket
    % catch-all around dict:fetch/2 is needed.
    case dict:find(DDocId, DDocs) of
        {ok, {DDoc}} ->
            {DDoc};
        error ->
            throw(
                {error,
                    ?l2b(io_lib:format("Native Query Server missing DDoc with Id: ~s", [DDocId]))}
            )
    end.
%% Bindings for source that is not tied to a design document.
bindings(State, Sig) ->
    bindings(State, Sig, nil).

%% Build the variable bindings visible to user-supplied Erlang source:
%%   'Log'(Msg)           - log Msg at info level.
%%   'Emit'(Key, Value)   - prepend [Key, Value] to the list stored under Sig
%%                          in the calling process's dictionary.
%%   'Start'(Headers)     - store list-response headers (list_headers key).
%%   'Send'(Chunk)        - prepend Chunk to the list stored under Sig.
%%   'GetRow'()           - send pending output to Self (the process that
%%                          built these bindings), then wait for the next
%%                          row; returns nil at list end.
%%   'FoldRows'(Fun, Acc) - foldrows/3 over rows fetched with GetRow.
%%   'DDoc'               - the design document, only when DDoc is {Props}.
bindings(State, Sig, DDoc) ->
    Self = self(),
    Log = fun(Msg) ->
        ?LOG_INFO(#{
            what => user_defined_log,
            in => native_process,
            signature => Sig,
            msg => Msg
        }),
        % NOTE(review): Msg is used as the format string, so any '~' in
        % user text is interpreted as a format directive.
        couch_log:info(Msg, [])
    end,
    Emit = fun(Id, Value) ->
        % Treat an unset Sig as [] (as Send does) so the stored value is
        % always a proper list. The return value is the previous value from
        % erlang:put/2 (undefined for an unset key).
        Prev =
            case erlang:get(Sig) of
                undefined -> [];
                Stored -> Stored
            end,
        erlang:put(Sig, [[Id, Value] | Prev])
    end,
    Start = fun(Headers) ->
        erlang:put(list_headers, Headers)
    end,
    Send = fun(Chunk) ->
        Curr =
            case erlang:get(Sig) of
                undefined -> [];
                Else -> Else
            end,
        erlang:put(Sig, [Chunk | Curr])
    end,
    GetRow = fun() ->
        case start_list_resp(Self, Sig) of
            started ->
                ok;
            _ ->
                Chunks =
                    case erlang:get(Sig) of
                        undefined -> [];
                        CurrChunks -> CurrChunks
                    end,
                Self ! {self(), chunks, lists:reverse(Chunks)}
        end,
        erlang:put(Sig, []),
        receive
            {Self, list_row, Row} -> Row;
            {Self, list_end} -> nil
        after State#evstate.timeout ->
            throw({timeout, list_pid_getrow})
        end
    end,
    FoldRows = fun(Fun, Acc) -> foldrows(GetRow, Fun, Acc) end,
    Bindings = [
        {'Log', Log},
        {'Emit', Emit},
        {'Start', Start},
        {'Send', Send},
        {'GetRow', GetRow},
        {'FoldRows', FoldRows}
    ],
    case DDoc of
        {_Props} ->
            Bindings ++ [{'DDoc', DDoc}];
        _Else ->
            Bindings
    end.
%% Compiling funs from source text follows erlview; thanks to it, via:
%% http://erlang.org/pipermail/erlang-questions/2003-November/010544.html
%%
%% Compile Source with the default bindings (no design doc) and return
%% {Sig, Fun}. Sig is the MD5 hash of Source and is the process-dictionary
%% key that Emit/Send write to.
makefun(State, Source) ->
    Sig = couch_hash:md5_hash(Source),
    Fun = makefun(State, Source, bindings(State, Sig)),
    {Sig, Fun}.
%% makefun(State, Source, {DDoc}) compiles a design-document function and
%% returns {Sig, Fun}. Sig hashes the source together with the design doc,
%% so the same source in different design docs gets different signatures.
%%
%% makefun(State, Source, BindFuns) scans and parses Source (a binary that
%% must contain exactly one expression), evaluates it with BindFuns as
%% variable bindings and returns the resulting value. Scan and parse errors
%% are logged and thrown as {error, ErrorInfo}.
makefun(State, Source, {DDoc}) ->
    % Hash the iolist directly; lists:flatten/1 on a list of two binaries
    % was a no-op, so the digest is unchanged.
    Sig = couch_hash:md5_hash([Source, term_to_binary(DDoc)]),
    BindFuns = bindings(State, Sig, {DDoc}),
    {Sig, makefun(State, Source, BindFuns)};
makefun(_State, Source, BindFuns) when is_list(BindFuns) ->
    FunStr = binary_to_list(Source),
    Tokens =
        case erl_scan:string(FunStr) of
            {ok, ScannedTokens, _EndLoc} ->
                ScannedTokens;
            {error, {ScanLoc, _ScanMod, ScanDesc} = ScanInfo, _ErrorLoc} ->
                % Report scan errors (e.g. an unterminated string) the same
                % way as parse errors, instead of failing with a badmatch.
                ?LOG_ERROR(#{
                    what => syntax_error,
                    in => native_process,
                    line => ScanLoc,
                    details => ScanDesc
                }),
                couch_log:error(
                    "Syntax error on line: ~p~n~p~n",
                    [ScanLoc, ScanDesc]
                ),
                throw({error, ScanInfo})
        end,
    Form =
        case (catch erl_parse:parse_exprs(Tokens)) of
            {ok, [ParsedForm]} ->
                ParsedForm;
            {error, {LineNum, _Mod, [Mesg, Params]}} = Error ->
                ?LOG_ERROR(#{
                    what => syntax_error,
                    in => native_process,
                    line => LineNum,
                    details => Mesg,
                    parameters => Params
                }),
                couch_log:error(
                    "Syntax error on line: ~p~n~s~p~n",
                    [LineNum, Mesg, Params]
                ),
                throw(Error);
            {error, {LineNum, _Mod, Desc}} = Error ->
                % The error descriptor is opaque; don't assume its shape.
                ?LOG_ERROR(#{
                    what => syntax_error,
                    in => native_process,
                    line => LineNum,
                    details => Desc
                }),
                couch_log:error(
                    "Syntax error on line: ~p~n~p~n",
                    [LineNum, Desc]
                ),
                throw(Error)
        end,
    Bindings = lists:foldl(
        fun({Name, Fun}, Acc) ->
            erl_eval:add_binding(Name, Fun, Acc)
        end,
        erl_eval:new_bindings(),
        BindFuns
    ),
    {value, Fun, _} = erl_eval:expr(Form, Bindings),
    Fun.
%% Compile the reduce source(s) in BinFuns (a list of sources, or a single
%% non-list source) and apply each compiled fun to (Keys, Vals, ReReduce).
%% Returns [true, Results], with results in the same order as the sources.
reduce(State, BinFuns, Keys, Vals, ReReduce) ->
    Sources =
        case is_list(BinFuns) of
            true -> BinFuns;
            _ -> [BinFuns]
        end,
    Compiled = [makefun(State, Src) || Src <- Sources],
    Results = [RedFun(Keys, Vals, ReReduce) || {_Sig, RedFun} <- Compiled],
    [true, Results].
%% Pull rows with GetRow until it returns nil, threading the accumulator
%% through ProcRow(Row, Acc). ProcRow returns, or throws, {ok, NewAcc} to
%% continue or {stop, FinalAcc} to finish early. The result is always
%% {ok, Acc}. Any other outcome, including a crash captured by `catch`,
%% raises case_clause.
foldrows(GetRow, ProcRow, Acc) ->
    case GetRow() of
        nil ->
            {ok, Acc};
        Row ->
            Outcome = (catch ProcRow(Row, Acc)),
            case Outcome of
                {stop, FinalAcc} -> {ok, FinalAcc};
                {ok, NextAcc} -> foldrows(GetRow, ProcRow, NextAcc)
            end
    end.
%% Send the list "start" message to Self at most once per process. The
%% message carries the chunks buffered under Sig (oldest first) and the
%% headers stored by Start, defaulting to {[{<<"headers">>, {[]}}]}.
%% Returns `started` when the message is sent, `ok` if it was already sent.
start_list_resp(Self, Sig) ->
    case erlang:get(list_started) =:= undefined of
        false ->
            ok;
        true ->
            Headers =
                case erlang:get(list_headers) of
                    undefined -> {[{<<"headers">>, {[]}}]};
                    Stored -> Stored
                end,
            Pending =
                case erlang:get(Sig) of
                    undefined -> [];
                    Buffered -> Buffered
                end,
            Self ! {self(), start, lists:reverse(Pending), Headers},
            erlang:put(list_started, true),
            % The buffered chunks have been handed over; start afresh.
            erlang:put(Sig, []),
            started
    end.
%% Normalise an incoming command term: every atom except the JSON literals
%% null/true/false becomes a binary. Objects ({Props}) and lists are walked
%% recursively; all other terms are returned unchanged.
to_binary({Props}) ->
    {lists:map(fun({Key, Value}) -> {to_binary(Key), to_binary(Value)} end, Props)};
to_binary(Items) when is_list(Items) ->
    [to_binary(Item) || Item <- Items];
to_binary(Literal) when Literal =:= null; Literal =:= true; Literal =:= false ->
    Literal;
to_binary(Atom) when is_atom(Atom) ->
    atom_to_binary(Atom, latin1);
to_binary(Other) ->
    Other.