blob: 0ea4f6129f6373c49d63ed8095f811bb7167782e [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_quickjs_scanner_plugin).
-behaviour(couch_scanner_plugin).
-export([
start/2,
resume/2,
complete/1,
checkpoint/1,
db/2,
ddoc/3,
shards/2,
db_opened/2,
doc_id/3,
doc/3,
db_closing/2
]).
-include_lib("couch_scanner/include/couch_scanner_plugin.hrl").
% Plugin state threaded through all scanner callbacks.
-record(st, {
% Scan id handed to start/2 and resume/2; attached to log metadata
sid,
% DDocId => design doc (as a map) for ddocs accepted by process_ddoc/3
ddocs = #{},
% Batch of JSON docs waiting for view (map/reduce) validation
docs = [],
% Sum of ?term_size/1 over the docs currently in the batch
docs_size = 0,
% #proc{} handles for the QuickJS and Spidermonkey OS processes;
% undefined until start_or_reset_procs/1 starts them
qjs_proc,
sm_proc,
% Design docs counted so far for the current db
ddoc_cnt = 0,
% Doc ids counted so far for the current shard (design docs excluded)
doc_cnt = 0,
% Sampling step computed in db_opened/2
doc_step = 0,
% Limits below can be overridden from config, see init_config/1
max_ddocs = 100,
max_shards = 4,
max_docs = 1000,
max_step = 1000,
% These match defaults in couch_mrview_updater
max_batch_items = 100,
max_batch_size = 16777216
}).
% DDoc fields: binary keys used to look up sections of a design document map
% (and, for MAP and REDUCE, of an individual view map)
-define(FILTERS, <<"filters">>).
-define(VIEWS, <<"views">>).
-define(MAP, <<"map">>).
-define(REDUCE, <<"reduce">>).
-define(LIB, <<"lib">>).
-define(VDU, <<"validate_doc_update">>).
% Behavior callbacks
% Scanner callback: begin a new scan. Returns {ok, St} with limits loaded from
% config when Spidermonkey is available, otherwise skip.
start(SId, #{}) ->
    case couch_server:with_spidermonkey() of
        false ->
            ?WARN("not starting. Spidermonkey not enabled", [], #{}),
            skip;
        true ->
            Meta = #{sid => SId},
            ?WARN("starting.", [], Meta),
            St = init_config(#st{sid = SId}),
            {ok, St}
    end.
% Scanner callback: resume a scan from a checkpoint. The checkpoint contents
% are not used; a fresh state is built exactly as in start/2.
resume(SId, #{}) ->
    case couch_server:with_spidermonkey() of
        false ->
            ?WARN("not resuming. Spidermonkey not enabled", [], #{}),
            skip;
        true ->
            Meta = #{sid => SId},
            ?WARN("resuming", [], Meta),
            St = init_config(#st{sid = SId}),
            {ok, St}
    end.
% Scanner callback: the scan finished. Logs and returns an empty result map.
complete(#st{} = St) ->
    ?WARN("completed", [], #{sid => St#st.sid}),
    {ok, #{}}.
% Scanner callback: produce the checkpoint term. Only the scan id is kept.
checkpoint(#st{} = St) ->
    SId = St#st.sid,
    ?INFO(#{sid => SId}),
    Checkpoint = #{<<"sid">> => SId},
    {ok, Checkpoint}.
% Scanner callback: a new db is about to be scanned. Per-db state is cleared
% (which also stops any running procs) before the db is logged.
db(#st{} = St, DbName) ->
    Fresh = reset_per_db_state(St),
    ?INFO(#{sid => St#st.sid, db => DbName}),
    {ok, Fresh}.
% Scanner callback: called for every design document of the current db.
%
% Stops ddoc iteration once ddoc_cnt has grown past max_ddocs. Otherwise the
% ddoc is counted and, when its language is javascript (the default when the
% field is absent), handed to process_ddoc/3.
%
% The body is matched as {Props} without requiring a non-empty list: a design
% doc with no fields has body {[]} and must not crash the plugin with a
% badmatch. process_ddoc/3 already ignores empty design docs.
ddoc(#st{ddoc_cnt = C, max_ddocs = M} = St, _, _) when C > M ->
    {stop, St};
ddoc(#st{sid = SId, ddoc_cnt = Cnt} = St, DbName, #doc{id = DDocId} = DDoc) ->
    ?INFO(#{sid => SId, db => DbName, ddoc => DDocId}),
    St1 = St#st{ddoc_cnt = Cnt + 1},
    #doc{body = {Props}} = DDoc,
    case couch_util:get_value(<<"language">>, Props, <<"javascript">>) of
        <<"javascript">> -> {ok, process_ddoc(St1, DbName, DDoc)};
        _ -> {ok, St1}
    end.
% Scanner callback: pick which shards to scan. Nothing is scanned when no
% design doc was accepted for this db; otherwise at most max_shards shards
% from the front of the list are returned.
shards(#st{} = St, Shards) ->
    #st{max_shards = Max, ddocs = DDocs} = St,
    NumDDocs = map_size(DDocs),
    Limited = lists:sublist(Shards, Max),
    case NumDDocs of
        0 -> {[], St};
        _ -> {Limited, St}
    end.
% Scanner callback: a shard was opened. Computes the sampling step so that
% roughly max_docs doc ids are spread over the shard (capped at max_step) and
% starts a fresh doc batch.
%
% docs_size is reset together with docs: it is the running size of the batch
% and would otherwise carry a stale value over from the previous shard.
% max_docs is clamped to at least 1 so a configured value of 0 cannot raise
% badarith in the division.
db_opened(#st{} = St, Db) ->
    #st{max_docs = MaxDocs, max_step = MaxStep} = St,
    {ok, DocTotal} = couch_db:get_doc_count(Db),
    Step = min(MaxStep, max(1, DocTotal div max(1, MaxDocs))),
    {ok, St#st{doc_cnt = 0, doc_step = Step, docs = [], docs_size = 0}}.
% Scanner callback: decide per doc id whether the doc body should be loaded.
%
% Design docs are skipped without being counted. For every other id:
%   - stop once doc_cnt has grown past max_docs
%   - skip (but count) ids that do not fall on the sampling step
%   - otherwise count the id and ask for the doc
% A failing guard test (e.g. rem with a zero step) simply falls through to the
% next branch, as in a function-head guard.
doc_id(#st{} = St, <<?DESIGN_DOC_PREFIX, _/binary>>, _Db) ->
    {skip, St};
doc_id(#st{doc_cnt = Cnt, max_docs = Max, doc_step = Step} = St, _DocId, _Db) ->
    if
        Cnt > Max ->
            {stop, St};
        Cnt rem Step /= 0 ->
            {skip, St#st{doc_cnt = Cnt + 1}};
        true ->
            {ok, St#st{doc_cnt = Cnt + 1}}
    end.
% Scanner callback: called with every sampled document.
%
% 1. For each accepted design doc, run the doc through its filters and its
%    validate_doc_update function in both engines. A thrown {validate, Error}
%    is logged and the next design doc is tried.
% 2. Add the doc to the batch used for view validation. When the batch has
%    reached max_batch_items docs or max_batch_size bytes, run
%    views_validate/3 for every design doc and start a new batch.
%
% The doc is added to the batch *before* the limit check so that the doc which
% fills the batch is validated with it. Previously that doc was neither
% batched nor validated, i.e. it was silently dropped from view validation.
doc(#st{} = St, Db, #doc{id = DocId} = Doc) ->
    #st{
        sid = SId,
        ddocs = DDocs,
        docs = Docs,
        docs_size = DocsSize,
        max_batch_items = MaxItems,
        max_batch_size = MaxSize
    } = St,
    JsonDoc = couch_query_servers:json_doc(Doc),
    DDocFun = fun(DDocId, #{} = DDoc) ->
        try
            Filters = maps:get(?FILTERS, DDoc, undefined),
            filter_doc_validate(St, DDocId, Filters, JsonDoc),
            VDU = maps:get(?VDU, DDoc, undefined),
            vdu_doc_validate(St, DDocId, VDU, JsonDoc)
        catch
            throw:{validate, Error} ->
                Meta = #{sid => SId, db => Db, ddoc => DDocId, doc => DocId},
                ?WARN("doc validation failed ~p", [Error], Meta)
        end
    end,
    maps:foreach(DDocFun, DDocs),
    Docs1 = [JsonDoc | Docs],
    DocsSize1 = DocsSize + ?term_size(JsonDoc),
    St1 = St#st{docs = Docs1, docs_size = DocsSize1},
    case DocsSize1 < MaxSize andalso length(Docs1) < MaxItems of
        true ->
            % Still room in the batch, keep accumulating
            {ok, St1};
        false ->
            % Batch is full: validate views over it, then start a new one
            {_Db, St2} = maps:fold(fun views_validate/3, {Db, St1}, DDocs),
            {ok, St2#st{docs = [], docs_size = 0}}
    end.
% Scanner callback: the shard is about to be closed. Any docs still in the
% batch are run through views_validate/3 for every design doc, then the
% per-shard counters and the batch are cleared.
%
% docs_size is cleared together with docs; it tracks the size of the batch
% and must not leak into the batch of the next shard.
db_closing(#st{docs = []} = St, _Db) ->
    {ok, St#st{doc_cnt = 0, doc_step = 0, docs_size = 0}};
db_closing(#st{ddocs = DDocs} = St, Db) ->
    {_Db, St1} = maps:fold(fun views_validate/3, {Db, St}, DDocs),
    {ok, St1#st{doc_cnt = 0, doc_step = 0, docs = [], docs_size = 0}}.
% Private
% Overlay the limits in the state with values from config. The value already
% in the record is used as the default for each setting.
init_config(#st{} = St) ->
    #st{
        max_ddocs = DDocs0,
        max_shards = Shards0,
        max_docs = Docs0,
        max_step = Step0,
        max_batch_items = Items0,
        max_batch_size = Size0
    } = St,
    MaxDDocs = cfg_int("max_ddocs", DDocs0),
    MaxShards = cfg_int("max_shards", Shards0),
    MaxDocs = cfg_int("max_docs", Docs0),
    MaxStep = cfg_int("max_step", Step0),
    MaxItems = cfg_int("max_batch_items", Items0),
    MaxSize = cfg_int("max_batch_size", Size0),
    St#st{
        max_ddocs = MaxDDocs,
        max_shards = MaxShards,
        max_docs = MaxDocs,
        max_step = MaxStep,
        max_batch_items = MaxItems,
        max_batch_size = MaxSize
    }.
% Read an integer setting from the config section named after this module.
cfg_int(Key, Default) when is_list(Key), is_integer(Default) ->
    Section = atom_to_list(?MODULE),
    config:get_integer(Section, Key, Default).
% Load one design doc into both engines and compare the outcomes.
%
% Empty design docs are ignored and the state is returned unchanged. Otherwise
% the procs are started (or reset), then lib, views, filters and the VDU are
% loaded in that order, the procs are reset again and the design doc is taught
% to both engines. On success the ddoc is remembered in St#st.ddocs. A thrown
% {validate, Error} is logged and the ddoc is not remembered; the state with
% the started procs is returned in both cases.
process_ddoc(#st{sid = SId, ddocs = Known} = St, DbName, #doc{id = DDocId, body = Body}) ->
    DDoc = couch_scanner_util:ejson_map(Body),
    case map_size(DDoc) of
        0 ->
            St;
        _ ->
            Procs = start_or_reset_procs(St),
            Field = fun(Key) -> maps:get(Key, DDoc, undefined) end,
            try
                lib_load(Procs, Field(?LIB)),
                views_load(Procs, Field(?VIEWS)),
                filters_load(Procs, Field(?FILTERS)),
                vdu_load(Procs, Field(?VDU)),
                reset_procs(Procs),
                teach_ddoc_validate(Procs, DDocId, DDoc),
                Procs#st{ddocs = Known#{DDocId => DDoc}}
            catch
                throw:{validate, Error} ->
                    Meta = #{sid => SId, db => DbName, ddoc => DDocId},
                    ?WARN("ddoc validation failed ~p", [Error], Meta),
                    Procs
            end
    end.
% Fold callback over the ddocs map (see the maps:fold/3 calls in doc/3 and
% db_closing/2) with accumulator {Db, St}. Compares the map and reduce results
% of both engines for one design doc against the docs batched in St#st.docs.
%
% First clause: the ddoc has a non-empty views map. Both procs are reset, the
% ddoc lib is loaded, the map function of every view is added in sorted view
% name order and mapred_fold/2 is run over every batched doc. A thrown
% {validate, Error} is logged as a warning; other exception classes are not
% caught here. Second clause: any other ddoc map is passed through untouched.
views_validate(DDocId, #{?VIEWS := Views} = DDoc, {Db, #st{} = St0}) when
map_size(Views) > 0
->
St = reset_procs(St0),
#st{sid = SId, docs = Docs} = St,
try
% NOTE(review): lib is read from the top level of the ddoc here; the query
% server protocol describes add_lib as taking the views/lib object - confirm
% which one is intended.
lib_load(St, maps:get(?LIB, DDoc, undefined)),
% Sorted so the functions are added in a fixed order; mapred_fold/2 zips this
% list with the per-function map_doc results.
ViewList = lists:sort(maps:to_list(Views)),
% NOTE(review): this fun only matches view entries that have a map source; any
% other entry raises function_clause, which the catch below does not handle -
% confirm such entries cannot reach this point.
Fun = fun({Name, #{?MAP := Src}}) -> add_fun_load(St, Name, Src) end,
lists:foreach(Fun, ViewList),
{[_ | _], St1 = #st{}} = lists:foldl(fun mapred_fold/2, {ViewList, St}, Docs),
{Db, St1}
catch
throw:{validate, Error} ->
Meta = #{sid => SId, db => Db, ddoc => DDocId},
?WARN("view validation failed ~p", [Error], Meta),
{Db, St}
end;
views_validate(_DDocId, #{} = _DDoc, {Db, #st{} = St}) ->
% No views
{Db, St}.
% lists:foldl/3 callback used by views_validate/3: map one doc in both engines
% and compare the results. When the map step emitted anything, the per-view
% results are paired with the view list and the views with a custom reduce
% function are additionally checked with view_reduce_validate/2. Throws
% {validate, _} on a mismatch. The accumulator is returned unchanged.
mapred_fold({Props = [_ | _]} = Doc, {ViewList = [_ | _], #st{} = St}) ->
    #st{qjs_proc = Qjs, sm_proc = Sm} = St,
    DocId = couch_util:get_value(<<"_id">>, Props),
    QjsMapRes = map_doc(Qjs, Doc),
    SmMapRes = map_doc(Sm, Doc),
    QjsMapRes == SmMapRes orelse
        throw({validate, {map_doc, DocId, QjsMapRes, SmMapRes}}),
    case QjsMapRes of
        [_ | _] ->
            Paired = lists:zip(ViewList, QjsMapRes),
            ToReduce = lists:filtermap(fun reduce_filter_map/1, Paired),
            view_reduce_validate(St, ToReduce);
        _ ->
            ok
    end,
    {ViewList, St}.
% Stop both engine processes and clear everything that belongs to the
% previously scanned db: accepted ddocs, the doc batch and the ddoc counter.
%
% docs_size is cleared together with docs; it is the running size of the
% batch and a stale value would make the next batch flush too early.
reset_per_db_state(#st{qjs_proc = QjsProc, sm_proc = SmProc} = St) ->
    proc_stop(QjsProc),
    proc_stop(SmProc),
    St#st{
        ddocs = #{},
        docs = [],
        docs_size = 0,
        qjs_proc = undefined,
        sm_proc = undefined,
        ddoc_cnt = 0
    }.
% Make sure both engines are running and freshly reset: QuickJS first, then
% Spidermonkey.
start_or_reset_procs(#st{} = St) ->
    start_or_reset_sm_proc(start_or_reset_qjs_proc(St)).
% Return a state whose qjs_proc is a running, freshly reset QuickJS process.
%
% No proc yet: start one (linked) and reset it; a failed start or reset
% crashes the caller with a badmatch.
% Existing proc: try to reset it. If that fails for any reason, log a
% warning, stop the proc and start a new one.
%
% The warning uses the three-argument ?WARN(Fmt, Args, Meta) form like every
% other log call in this module, so the format arguments cannot be mistaken
% for metadata and the scan id is attached to the message.
start_or_reset_qjs_proc(#st{qjs_proc = undefined} = St) ->
    Cmd = couch_quickjs:mainjs_cmd(),
    {ok, Pid} = couch_os_process:start_link(Cmd),
    Proc = proc(Pid),
    true = proc_reset(Proc),
    St#st{qjs_proc = Proc};
start_or_reset_qjs_proc(#st{sid = SId, qjs_proc = #proc{} = Proc} = St) ->
    try
        true = proc_reset(Proc),
        St
    catch
        Tag:Err ->
            ?WARN("failed to reset QuickJS proc ~p:~p", [Tag, Err], #{sid => SId}),
            proc_stop(Proc),
            start_or_reset_qjs_proc(St#st{qjs_proc = undefined})
    end.
% Return a state whose sm_proc is a running, freshly reset Spidermonkey
% process.
%
% No proc yet: start one (linked) with the command taken from the
% COUCHDB_QUERY_SERVER_JAVASCRIPT environment variable and reset it; a failed
% start or reset crashes the caller. Note that os:getenv/1 returns false when
% the variable is unset, and that value is passed on as the command as is.
% Existing proc: try to reset it. If that fails for any reason, log a
% warning, stop the proc and start a new one.
%
% The warning uses the three-argument ?WARN(Fmt, Args, Meta) form like every
% other log call in this module, so the format arguments cannot be mistaken
% for metadata and the scan id is attached to the message.
start_or_reset_sm_proc(#st{sm_proc = undefined} = St) ->
    Cmd = os:getenv("COUCHDB_QUERY_SERVER_JAVASCRIPT"),
    {ok, Pid} = couch_os_process:start_link(Cmd),
    Proc = proc(Pid),
    true = proc_reset(Proc),
    St#st{sm_proc = Proc};
start_or_reset_sm_proc(#st{sid = SId, sm_proc = #proc{} = Proc} = St) ->
    try
        true = proc_reset(Proc),
        St
    catch
        Tag:Err ->
            ?WARN("failed to reset Spidermonkey proc ~p:~p", [Tag, Err], #{sid => SId}),
            proc_stop(Proc),
            start_or_reset_sm_proc(St#st{sm_proc = undefined})
    end.
% Send the lib object to both engines (QuickJS first) and throw
% {validate, {add_lib, _, _}} if the replies differ. Nothing to do when the
% design doc has no lib.
lib_load(#st{}, undefined) ->
    ok;
lib_load(#st{} = St, #{} = Lib) ->
    QjsRes = add_lib(St#st.qjs_proc, Lib),
    SmRes = add_lib(St#st.sm_proc, Lib),
    QjsRes == SmRes orelse throw({validate, {add_lib, QjsRes, SmRes}}),
    ok.
% Load every view of a design doc via view_load/3. Each entry of the views
% map must itself be a map. Nothing to do when there is no views section.
views_load(#st{}, undefined) ->
    ok;
views_load(#st{} = St, #{} = Views) ->
    maps:foreach(
        fun(ViewName, #{} = ViewDef) -> view_load(St, ViewName, ViewDef) end,
        Views
    ).
% Load the map function of a view and, when present, its reduce function into
% both engines. The view is required to have a map source (badmatch
% otherwise).
view_load(#st{} = St, Name, View) ->
    #{?MAP := MapSrc} = View,
    add_fun_load(St, Name, MapSrc),
    case View of
        #{?REDUCE := RedSrc} -> add_fun_load(St, Name, RedSrc);
        #{} -> ok
    end.
% Add one function source to both engines (QuickJS first) and throw
% {validate, {add_fun, Name, _, _}} if the replies differ. An undefined
% source is ignored.
add_fun_load(#st{}, _, undefined) ->
    ok;
add_fun_load(#st{} = St, Name, Src) ->
    QjsRes = add_fun(St#st.qjs_proc, Src),
    SmRes = add_fun(St#st.sm_proc, Src),
    if
        QjsRes == SmRes -> ok;
        true -> throw({validate, {add_fun, Name, QjsRes, SmRes}})
    end.
% lists:filtermap/2 callback over {{ViewName, View}, KVs} pairs. Keeps only
% views that have a reduce source which does not start with "_" (those are
% likely built-in reducers) and for which the map step emitted at least one
% row; kept entries become {Name, ReduceSrc, KVs}.
reduce_filter_map({{Name, #{} = View}, KVs}) ->
    case {maps:get(?REDUCE, View, undefined), KVs} of
        {<<"_", _/binary>>, _} ->
            % Likely built-in view
            false;
        {<<_/binary>> = Src, [_ | _]} ->
            {true, {Name, Src, KVs}};
        _ ->
            false
    end.
% For every {Name, ReduceSrc, KVs} entry run reduce in both engines and
% compare the replies. When the QuickJS reply is [true, Values] with a
% non-empty Values list, also feed Values to rereduce in both engines and
% compare again. Throws {validate, {reduce | rereduce, Name, _, _}} on the
% first mismatch.
view_reduce_validate(#st{} = St, ReduceKVs) ->
    #st{qjs_proc = Qjs, sm_proc = Sm} = St,
    CheckOne = fun({Name, RedSrc, KVs}) ->
        QjsRedRes = reduce(Qjs, RedSrc, KVs),
        SmRedRes = reduce(Sm, RedSrc, KVs),
        QjsRedRes == SmRedRes orelse
            throw({validate, {reduce, Name, QjsRedRes, SmRedRes}}),
        case QjsRedRes of
            [true, [_ | _] = RedVals] ->
                QjsRRedRes = rereduce(Qjs, RedSrc, RedVals),
                SmRRedRes = rereduce(Sm, RedSrc, RedVals),
                if
                    QjsRRedRes == SmRRedRes -> ok;
                    true -> throw({validate, {rereduce, Name, QjsRRedRes, SmRRedRes}})
                end;
            _ ->
                ok
        end
    end,
    lists:foreach(CheckOne, ReduceKVs).
% Load every filter function of a design doc via filter_load/3. Nothing to do
% when there is no filters section.
filters_load(#st{}, undefined) ->
    ok;
filters_load(#st{} = St, #{} = Filters) ->
    maps:foreach(
        fun(FilterName, FilterSrc) -> filter_load(St, FilterName, FilterSrc) end,
        Filters
    ).
% Add one filter function to both engines (QuickJS first) and throw
% {validate, {filter, Name, _, _}} if the replies differ.
filter_load(#st{} = St, Name, Filter) ->
    #st{qjs_proc = Qjs, sm_proc = Sm} = St,
    QjsRes = add_fun(Qjs, Filter),
    SmRes = add_fun(Sm, Filter),
    if
        QjsRes == SmRes -> ok;
        true -> throw({validate, {filter, Name, QjsRes, SmRes}})
    end.
% Run a doc through every filter of a design doc in both engines and throw
% {validate, {filter_doc, FilterName, _, _}} on the first mismatch. Nothing
% to do when the design doc has no filters.
filter_doc_validate(#st{}, _, undefined, _) ->
    ok;
filter_doc_validate(#st{qjs_proc = Qjs, sm_proc = Sm}, DDocId, #{} = Filters, Doc) ->
    maps:foreach(
        fun(FName, _Src) ->
            QjsRes = filter_doc(Qjs, DDocId, FName, Doc),
            SmRes = filter_doc(Sm, DDocId, FName, Doc),
            if
                QjsRes == SmRes -> ok;
                true -> throw({validate, {filter_doc, FName, QjsRes, SmRes}})
            end
        end,
        Filters
    ).
% Add the validate_doc_update function to both engines (QuickJS first) and
% throw {validate, {vdu, _, _}} if the replies differ. Nothing to do when the
% design doc has no VDU.
vdu_load(#st{}, undefined) ->
    ok;
vdu_load(#st{} = St, VDU) ->
    QjsRes = add_fun(St#st.qjs_proc, VDU),
    SmRes = add_fun(St#st.sm_proc, VDU),
    QjsRes == SmRes orelse throw({validate, {vdu, QjsRes, SmRes}}),
    ok.
% Run a doc through the validate_doc_update function of a design doc in both
% engines (QuickJS first) and throw {validate, {vdu_doc, _, _}} if the replies
% differ.
%
% Design docs without a VDU are skipped, mirroring filter_doc_validate/4:
% doc/3 passes undefined for them, and prompting the engines for a function
% the design doc does not define would only compare two error replies.
vdu_doc_validate(#st{}, _, undefined, _) ->
    ok;
vdu_doc_validate(#st{} = St, DDocId, _VDU, Doc) ->
    #st{qjs_proc = Qjs, sm_proc = Sm} = St,
    QjsRes = vdu_doc(Qjs, DDocId, Doc),
    SmRes = vdu_doc(Sm, DDocId, Doc),
    case QjsRes == SmRes of
        true -> ok;
        false -> throw({validate, {vdu_doc, QjsRes, SmRes}})
    end.
% Teach the design doc to both engines (QuickJS first) and throw
% {validate, {teach_ddoc, DDocId, _, _}} if the replies differ.
teach_ddoc_validate(#st{} = St, DDocId, DDoc) ->
    #st{qjs_proc = Qjs, sm_proc = Sm} = St,
    QjsRes = teach_ddoc(Qjs, DDocId, DDoc),
    SmRes = teach_ddoc(Sm, DDocId, DDoc),
    if
        QjsRes == SmRes -> ok;
        true -> throw({validate, {teach_ddoc, DDocId, QjsRes, SmRes}})
    end.
% Reset both running engines, QuickJS first. Each reset must reply true
% (badmatch otherwise). Returns the state unchanged.
reset_procs(#st{qjs_proc = #proc{}, sm_proc = #proc{}} = St) ->
    MustReset = fun(Proc) -> true = proc_reset(Proc) end,
    lists:foreach(MustReset, [St#st.qjs_proc, St#st.sm_proc]),
    St.
% Proc commands
% Send the add_lib command. Any reply other than true raises badmatch.
add_lib(#proc{} = Proc, #{} = Lib) ->
    Cmd = [<<"add_lib">>, Lib],
    true = prompt(Proc, Cmd).
% Send the map_doc command for a non-empty EJSON doc and return the reply.
map_doc(#proc{} = Proc, {[_ | _]} = Doc) ->
    Cmd = [<<"map_doc">>, Doc],
    prompt(Proc, Cmd).
% Send the reduce command with a single reduce source and the given rows.
reduce(#proc{} = Proc, <<_/binary>> = Src, KVs) ->
    Cmd = [<<"reduce">>, [Src], KVs],
    prompt(Proc, Cmd).
% Send the rereduce command with a single reduce source and the given values.
rereduce(#proc{} = Proc, <<_/binary>> = Src, Vals) ->
    Cmd = [<<"rereduce">>, [Src], Vals],
    prompt(Proc, Cmd).
% Send the add_fun command for a function source. An undefined source and a
% source starting with "_" are not sent and yield ok.
add_fun(#proc{}, undefined) ->
    ok;
add_fun(#proc{} = Proc, <<_/binary>> = FunSrc) ->
    case FunSrc of
        <<"_", _/binary>> ->
            % Built-in reduce likely
            ok;
        _ ->
            prompt(Proc, [<<"add_fun">>, FunSrc])
    end.
% Run the named filter of a taught design doc against one doc, with an empty
% map as the request object.
filter_doc(#proc{} = Proc, DDocId, FName, {[_ | _]} = Doc) ->
    FunPath = [<<"filters">>, FName],
    Args = [[Doc], #{}],
    prompt(Proc, [<<"ddoc">>, DDocId, FunPath, Args]).
% Run the validate_doc_update function of a taught design doc, passing the
% same doc as both arguments.
vdu_doc(#proc{} = Proc, DDocId, {[_ | _]} = Doc) ->
    FunPath = [<<"validate_doc_update">>],
    Args = [Doc, Doc],
    prompt(Proc, [<<"ddoc">>, DDocId, FunPath, Args]).
% Register a design doc with the engine. Any reply other than true raises
% badmatch.
teach_ddoc(#proc{} = Proc, DDocId, DDoc) ->
    Cmd = [<<"ddoc">>, <<"new">>, DDocId, DDoc],
    true = prompt(Proc, Cmd).
% Send the reset command with reduce_limit enabled and the configured
% os_process_timeout, then apply the same timeout to the OS process handle.
% Returns the reply to the reset command.
proc_reset(#proc{} = Proc) ->
    Timeout = config:get_integer("couchdb", "os_process_timeout", 5000),
    ResetCmd = [<<"reset">>, {[{<<"reduce_limit">>, true}, {<<"timeout">>, Timeout}]}],
    Result = prompt(Proc, ResetCmd),
    proc_set_timeout(Proc, Timeout),
    Result.
% End proc commands
% Proc utils
% Wrap an OS process pid in a #proc{} whose prompt, set_timeout and stop
% callbacks all point at couch_os_process.
proc(Pid) ->
    Mod = couch_os_process,
    #proc{
        pid = Pid,
        prompt_fun = {Mod, prompt},
        set_timeout_fun = {Mod, set_timeout},
        stop_fun = {Mod, stop}
    }.
% Send a command to an engine. Any exception raised by the call is turned into
% an {error, {Class, Reason}} value so that the outcomes of both engines can
% be compared as plain terms.
prompt(#proc{} = Proc, Prompt) ->
    try couch_query_servers:proc_prompt(Proc, Prompt) of
        Reply -> Reply
    catch
        Class:Reason -> {error, {Class, Reason}}
    end.
% Call the set_timeout callback stored in the proc with its pid and the new
% timeout.
proc_set_timeout(Proc, Timeout) ->
    {Mod, Func} = Proc#proc.set_timeout_fun,
    Pid = Proc#proc.pid,
    Mod:Func(Pid, Timeout).
% Stop an engine process and wait until it is gone. undefined (no process
% started) is accepted and ignored.
proc_stop(undefined) ->
ok;
proc_stop(#proc{pid = Pid} = Proc) ->
% Unlink first so the exit of the process does not propagate to the caller
% through the link
unlink(Pid),
% Monitor before asking the process to stop so its exit cannot be missed
Ref = monitor(process, Pid),
{Mod, Func} = Proc#proc.stop_fun,
apply(Mod, Func, [Pid]),
% NOTE(review): there is no `after` clause, so this blocks for as long as the
% process stays alive - confirm the stop callback always terminates it.
receive
{'DOWN', Ref, _, _, _} -> ok
end.