% blob: 816a6a87130f244406e7eaf667e467afb1c2811c [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_views_reader).
-export([
read/7
]).
-include("couch_views.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("fabric/include/fabric2.hrl").
% How long, in milliseconds, load_doc/5 waits for a reply from the doc
% loader process before raising error(load_doc_timeout).
-define(LOAD_DOC_TIMEOUT_MSEC, 10000).
% Query a view and stream the result to UserCallback.
%
% Dispatches on the view_type field of Args: `map` goes to
% read_map_view/7 and `red` goes to read_red_view/7. Any other value
% fails with a case_clause error. All seven arguments are handed to the
% selected reader unchanged and its return value is returned as is.
read(Db, Mrst, ViewName, UserCallback, UserAcc, Args, DbReadVsn) ->
    case Args of
        #mrargs{view_type = map} ->
            read_map_view(Db, Mrst, ViewName, UserCallback, UserAcc, Args, DbReadVsn);
        #mrargs{view_type = red} ->
            read_red_view(Db, Mrst, ViewName, UserCallback, UserAcc, Args, DbReadVsn)
    end.
% Stream the rows of a map view to UserCallback.
%
% Inside fabric2_fdb:transactional/2 the view is opened, a meta message
% is delivered to the callback, and then every key range produced by
% expand_keys_args/1 is folded over with handle_map_row/4. On normal
% completion the callback receives `complete`.
%
% Returns {ok, UserAcc}. Early termination is signalled with throws:
%   {complete, Acc} - raised by handle_map_row/4 when the limit is
%                     exhausted; the callback still receives `complete`.
%   {done, Acc}     - raised by maybe_stop/1 when the callback returned
%                     {stop, Acc}; Acc is returned without further calls.
%
% The doc loader (if one was started) is always stopped on the way out.
read_map_view(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args, DbReadVsn) ->
    Loader = maybe_start_doc_loader(Db, DbReadVsn),
    try
        fabric2_fdb:transactional(Db, fun(TxDb) ->
            Mrst = couch_views_trees:open(TxDb, Mrst0),
            #mrst{language = Lang, views = Views} = Mrst,
            View = get_map_view(Lang, Args, ViewName, Views),

            Meta = get_map_meta(TxDb, Mrst, View, Args),
            AfterMeta = maybe_stop(UserCallback(Meta, UserAcc0)),

            InitAcc = #{
                db => TxDb,
                skip => Args#mrargs.skip,
                limit => Args#mrargs.limit,
                mrargs => undefined,
                callback => UserCallback,
                acc => AfterMeta,
                doc_loader => Loader
            },

            % One fold per expanded key range; skip/limit/acc carry over
            % from one range to the next through the accumulator map.
            FoldRange = fun(KeyArgs, RowAcc) ->
                FdbOpts = mrargs_to_fdb_options(KeyArgs),
                couch_views_trees:fold_map_idx(
                    TxDb,
                    View,
                    FdbOpts,
                    fun handle_map_row/4,
                    RowAcc#{mrargs := KeyArgs}
                )
            end,
            #{acc := AfterRows} =
                lists:foldl(FoldRange, InitAcc, expand_keys_args(Args)),

            {ok, maybe_stop(UserCallback(complete, AfterRows))}
        end)
    catch
        throw:{complete, LimitAcc} ->
            {_, Final} = UserCallback(complete, LimitAcc),
            {ok, Final};
        throw:{done, StoppedAcc} ->
            {ok, StoppedAcc}
    after
        stop_doc_loader(Loader)
    end.
% Stream the rows of a reduce view to UserCallback.
%
% The reduce view is resolved with get_red_view/4 before the transaction
% starts (which throws if the view is missing or its reduce function is
% disabled), and the mrst is narrowed to that single view. Inside
% fabric2_fdb:transactional/2 the view is opened, a meta message is sent
% to the callback, and each key range from expand_keys_args/1 is folded
% over with handle_red_row/3.
%
% The finalizer applied to each reduced value is the `finalizer` entry of
% Args#mrargs.extra when present, otherwise the second element of the
% Idx-th entry of the view's reduce_funs.
%
% Returns {ok, UserAcc}. The {complete, Acc} and {done, Acc} throws are
% handled the same way as in read_map_view/7.
read_red_view(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args, _DbReadVsn) ->
    #mrst{language = Lang, views = Views} = Mrst0,
    % Lang is already bound, so this also asserts the returned language.
    {Idx, Lang, View0} = get_red_view(Lang, Args, ViewName, Views),
    Mrst1 = Mrst0#mrst{views = [View0]},
    try
        fabric2_fdb:transactional(Db, fun(TxDb) ->
            Mrst = couch_views_trees:open(TxDb, Mrst1, [{read_only, Idx}]),
            #mrst{language = Lang, views = [View1]} = Mrst,

            Meta = get_red_meta(TxDb, Mrst, View1, Args),
            AfterMeta = maybe_stop(UserCallback(Meta, UserAcc0)),

            Finalizer =
                case couch_util:get_value(finalizer, Args#mrargs.extra) of
                    undefined ->
                        {_, RedSrc} = lists:nth(Idx, View1#mrview.reduce_funs),
                        RedSrc;
                    Custom ->
                        Custom
                end,

            InitAcc = #{
                db => TxDb,
                skip => Args#mrargs.skip,
                limit => Args#mrargs.limit,
                mrargs => undefined,
                finalizer => Finalizer,
                red_idx => Idx,
                language => Lang,
                callback => UserCallback,
                acc => AfterMeta
            },

            % One fold per expanded key range; skip/limit/acc carry over
            % from one range to the next through the accumulator map.
            FoldRange = fun(KeyArgs, RowAcc) ->
                FdbOpts = mrargs_to_fdb_options(KeyArgs),
                couch_views_trees:fold_red_idx(
                    TxDb,
                    View1,
                    Idx,
                    FdbOpts,
                    fun handle_red_row/3,
                    RowAcc#{mrargs := KeyArgs}
                )
            end,
            #{acc := AfterRows} =
                lists:foldl(FoldRange, InitAcc, expand_keys_args(Args)),

            {ok, maybe_stop(UserCallback(complete, AfterRows))}
        end)
    catch
        throw:{complete, LimitAcc} ->
            {_, Final} = UserCallback(complete, LimitAcc),
            {ok, Final};
        throw:{done, StoppedAcc} ->
            {ok, StoppedAcc}
    end.
% Build the {meta, Props} message sent to the user callback before any
% map rows. Props always ends with {total, RowCount} and {offset, null};
% when Args#mrargs.update_seq is `true` an {update_seq, Seq} entry is
% placed in front of them.
get_map_meta(TxDb, Mrst, View, #mrargs{update_seq = WantSeq}) ->
    RowCount = couch_views_trees:get_row_count(TxDb, View),
    Common = [{total, RowCount}, {offset, null}],
    case WantSeq of
        true ->
            Seq = couch_views_fdb:get_update_seq(TxDb, Mrst),
            {meta, [{update_seq, Seq} | Common]};
        _ ->
            {meta, Common}
    end.
% Build the {meta, Props} message sent to the user callback before any
% reduce rows. Props is [{update_seq, Seq}] when Args#mrargs.update_seq
% is `true`, and empty otherwise.
get_red_meta(TxDb, Mrst, _View, #mrargs{update_seq = WantSeq}) ->
    Props =
        case WantSeq of
            true -> [{update_seq, couch_views_fdb:get_update_seq(TxDb, Mrst)}];
            _ -> []
        end,
    {meta, Props}.
% Row callback handed to couch_views_trees:fold_map_idx/5.
%
% Clause order matters:
%   1. While `skip` is positive the row is dropped and the counter
%      decremented.
%   2. Once `limit` reaches 0, {complete, UserAcc} is thrown to end the
%      read.
%   3. Otherwise a {row, Props} message is delivered to the user
%      callback and `limit` is decremented.
%
% Props holds id, key and value. Unless include_docs is `false`, a
% {doc, DocObj} entry is appended; the doc id/rev to load come from
% get_doc_id/2 and `conflicts` is added to the doc options when
% Args#mrargs.conflicts is `true`.
handle_map_row(_DocId, _Key, _Value, #{skip := ToSkip} = Acc) when ToSkip > 0 ->
    Acc#{skip := ToSkip - 1};
handle_map_row(_DocId, _Key, _Value, #{limit := 0, acc := UserAcc}) ->
    throw({complete, UserAcc});
handle_map_row(DocId, Key, Value, Acc) ->
    #{
        db := TxDb,
        limit := Remaining,
        mrargs := Args,
        callback := UserCallback,
        acc := UserAcc0,
        doc_loader := Loader
    } = Acc,
    DocProps =
        case Args#mrargs.include_docs of
            false ->
                [];
            _ ->
                ConflictOpts =
                    case Args#mrargs.conflicts of
                        true -> [conflicts];
                        _ -> []
                    end,
                DocOpts = Args#mrargs.doc_options ++ ConflictOpts,
                {TargetDocId, Rev} = get_doc_id(DocId, Value),
                [{doc, load_doc(TxDb, TargetDocId, Rev, DocOpts, Loader)}]
        end,
    Row = [{id, DocId}, {key, Key}, {value, Value}] ++ DocProps,
    UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
    Acc#{limit := Remaining - 1, acc := UserAcc1}.
% Row callback handed to couch_views_trees:fold_red_idx/6.
%
% Rows are dropped while `skip` is positive; {complete, UserAcc} is
% thrown once `limit` is 0; otherwise a {row, [{key, K}, {value, V}]}
% message is delivered to the user callback and `limit` is decremented.
% A key of `undefined` is reported as `null`, and the value is passed
% through maybe_finalize/2 first.
handle_red_row(_Key, _Red, #{skip := ToSkip} = Acc) when ToSkip > 0 ->
    Acc#{skip := ToSkip - 1};
handle_red_row(_Key, _Value, #{limit := 0, acc := UserAcc}) ->
    throw({complete, UserAcc});
handle_red_row(Key, Value, Acc) ->
    #{
        limit := Remaining,
        finalizer := Finalizer,
        callback := UserCallback,
        acc := UserAcc0
    } = Acc,
    RowKey =
        case Key of
            undefined -> null;
            _ -> Key
        end,
    Row = [{key, RowKey}, {value, maybe_finalize(Finalizer, Value)}],
    UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
    Acc#{limit := Remaining - 1, acc := UserAcc1}.
% Apply a finalizer to a reduced value. A `null` finalizer leaves the
% value untouched; anything else is handed to
% couch_query_servers:finalize/2, which must return {ok, Result}.
maybe_finalize(null, Reduced) ->
    Reduced;
maybe_finalize(Finalizer, Reduced) ->
    {ok, Result} = couch_query_servers:finalize(Finalizer, Reduced),
    Result.
% Resolve ViewName to the view record used for a map read. Accepts both
% results of couch_views_util:extract_view/4: a plain map view, or a
% reduce view, in which case only its view record is kept.
get_map_view(Lang, Args, ViewName, Views) ->
    Extracted = couch_views_util:extract_view(Lang, Args, ViewName, Views),
    case Extracted of
        {map, MapView, _Args} -> MapView;
        {red, {_Idx, _Lang, RedView}, _} -> RedView
    end.
% Resolve ViewName to an {Idx, Lang, View} reduce view triple.
%
% Throws {not_found, missing_named_view} unless extract_view/4 returns a
% reduce view whose language equals Lang. The triple is additionally
% vetted by check_red_enabled/1.
get_red_view(Lang, Args, ViewName, Views) ->
    case couch_views_util:extract_view(Lang, Args, ViewName, Views) of
        % Lang is already bound, so only a matching language is accepted.
        {red, {_Idx, Lang, _View} = RedView, _} ->
            check_red_enabled(RedView);
        _ ->
            throw({not_found, missing_named_view})
    end.
% Return the given {Idx, Lang, View} triple unchanged, unless the Idx-th
% entry of the view's reduce_funs is marked `disabled`, in which case a
% {disabled, Message} term is thrown.
check_red_enabled({Idx, _Lang, View} = Resp) ->
    ReduceFun = lists:nth(Idx, View#mrview.reduce_funs),
    case ReduceFun of
        {_, disabled} ->
            throw({disabled, <<"Custom reduce functions are disabled.">>});
        _ ->
            Resp
    end.
% Turn one #mrargs{} into the list of #mrargs{} to fold over.
%
% Without `keys` the arguments are used as is. With `keys`, one copy of
% the arguments is produced per key, in order, with both start_key and
% end_key pinned to that key.
expand_keys_args(#mrargs{keys = undefined} = Args) ->
    [Args];
expand_keys_args(#mrargs{keys = Keys} = Args) ->
    PinToKey = fun(Key) -> Args#mrargs{start_key = Key, end_key = Key} end,
    lists:map(PinToKey, Keys).
% Translate #mrargs{} into the option list passed to the
% couch_views_trees fold functions.
%
% The result always starts with {dir, Direction} and
% {inclusive_end, InclusiveEnd}, followed by:
%   {start_key, {Key, DocId}} - only when start_key is set
%   {end_key, {Key, DocId}}   - only when end_key is set
%   a group_key_fun option    - see make_group_key_fun/2
%
% The end doc id is adjusted when it is one of the boundary values
% <<255>> (forward) or <<>> (reverse) and the end is not inclusive: it is
% swapped for the opposite boundary value.
mrargs_to_fdb_options(Args) ->
    #mrargs{
        view_type = ViewType,
        start_key = StartKey,
        start_key_docid = StartKeyDocId,
        end_key = EndKey,
        end_key_docid = EndKeyDocId0,
        direction = Direction,
        inclusive_end = InclusiveEnd,
        group_level = GroupLevel
    } = Args,

    StartKeyOpts =
        case StartKey of
            undefined -> [];
            _ -> [{start_key, {StartKey, StartKeyDocId}}]
        end,

    EndKeyDocId =
        case {Direction, EndKeyDocId0, InclusiveEnd} of
            {fwd, <<255>>, true} -> <<255>>;
            {fwd, <<255>>, false} -> <<>>;
            {rev, <<>>, true} -> <<>>;
            {rev, <<>>, false} -> <<255>>;
            _ -> EndKeyDocId0
        end,

    EndKeyOpts =
        case EndKey of
            undefined -> [];
            _ -> [{end_key, {EndKey, EndKeyDocId}}]
        end,

    GroupOpts = make_group_key_fun(ViewType, GroupLevel),

    lists:append([
        [{dir, Direction}, {inclusive_end, InclusiveEnd}],
        StartKeyOpts,
        EndKeyOpts,
        GroupOpts
    ]).
% Build the group_key_fun option for a fold.
%
%   map views            - no option at all
%   red, level `exact`   - group by the full key
%   red, level 0         - the atom `group_all`
%   red, level N > 0     - list keys are truncated to their first N
%                          elements; any other key is used whole
%
% Each generated fun takes a {Key, DocId} pair and ignores the doc id.
make_group_key_fun(map, _GroupLevel) ->
    [];
make_group_key_fun(red, exact) ->
    [{group_key_fun, fun({Key, _DocId}) -> Key end}];
make_group_key_fun(red, 0) ->
    [{group_key_fun, group_all}];
make_group_key_fun(red, Level) when is_integer(Level), Level > 0 ->
    Truncate = fun
        ({Key, _DocId}) when is_list(Key) -> lists:sublist(Key, Level);
        ({Key, _DocId}) -> Key
    end,
    [{group_key_fun, Truncate}].
% Unwrap a user callback result. {ok, Acc} yields Acc so the read can go
% on; {stop, Acc} aborts the read by throwing {done, Acc}, which the
% read_*_view functions catch.
maybe_stop({ok, UserAcc}) ->
    UserAcc;
maybe_stop({stop, UserAcc}) ->
    throw({done, UserAcc}).
% Work out which document (and revision) to load for a row.
%
% When the emitted value is an object ({Props}), its "_id" entry
% overrides the row's doc id and its "_rev" entry selects the revision;
% missing entries fall back to the row's id and `null`. For any other
% value the row's own id is used with a `null` revision.
get_doc_id(RowDocId, {Props}) ->
    TargetId = couch_util:get_value(<<"_id">>, Props, RowDocId),
    TargetRev = couch_util:get_value(<<"_rev">>, Props, null),
    {TargetId, TargetRev};
get_doc_id(RowDocId, _Value) ->
    {RowDocId, null}.
% Load a document body for include_docs.
%
% With no doc loader (`undefined`) the document is read directly through
% load_doc/4 using TxDb. With a loader pid, a {load_doc, ...} request is
% sent to it and the {load_doc_res, Result} reply is awaited; if none
% arrives within ?LOAD_DOC_TIMEOUT_MSEC the call fails with
% error(load_doc_timeout).
load_doc(TxDb, Id, Rev, DocOpts, undefined) ->
    load_doc(TxDb, Id, Rev, DocOpts);
load_doc(_TxDb, Id, Rev, DocOpts, Loader) when is_pid(Loader) ->
    Request = {load_doc, Id, Rev, DocOpts},
    Loader ! Request,
    receive
        {load_doc_res, DocObj} -> DocObj
    after ?LOAD_DOC_TIMEOUT_MSEC ->
        error(load_doc_timeout)
    end.
% Read a document through TxDb and return it as a JSON object, or `null`
% when it cannot be found.
%
% A `null` revision opens the document with fabric2_db:open_doc/3. Any
% other revision is parsed with couch_doc:parse_rev/1 and that single
% revision is requested via fabric2_db:open_doc_revs/4; any result other
% than {ok, Doc} for that revision yields `null`.
load_doc(TxDb, Id, null, DocOpts) ->
    case fabric2_db:open_doc(TxDb, Id, DocOpts) of
        {ok, Doc} ->
            couch_doc:to_json_obj(Doc, DocOpts);
        {not_found, _} ->
            null
    end;
load_doc(TxDb, Id, Rev, DocOpts) ->
    Revs = [couch_doc:parse_rev(Rev)],
    case fabric2_db:open_doc_revs(TxDb, Id, Revs, DocOpts) of
        {ok, [{ok, Doc}]} ->
            couch_doc:to_json_obj(Doc, DocOpts);
        {ok, [_NotOk]} ->
            null
    end.
% Start a helper process for loading doc bodies, if one is needed.
%
% Doc bodies have to be read at the db version at which the indexer
% observed them. That requires a separate process, because the process
% dictionary is used to hold some of the transaction metadata.
%
% Returns `undefined` when DbReadVsn is ?VIEW_CURRENT_VSN. Otherwise it
% returns the pid of a linked process that opens its own transaction
% (the `tx` entry of the db handle is reset first), sets the read
% version to DbReadVsn and then serves requests in doc_loader_loop/2.
maybe_start_doc_loader(_Db, ?VIEW_CURRENT_VSN) ->
    undefined;
maybe_start_doc_loader(Db0, DbReadVsn) ->
    Caller = self(),
    LoaderDb = Db0#{tx := undefined},
    ServeDocs = fun(TxDb) ->
        erlfdb:set_read_version(maps:get(tx, TxDb), DbReadVsn),
        doc_loader_loop(TxDb, Caller)
    end,
    spawn_link(fun() -> fabric2_fdb:transactional(LoaderDb, ServeDocs) end).
% Shut down the doc loader started by maybe_start_doc_loader/2.
%
% `undefined` (no loader was started) is a no-op returning `ok`. For a
% pid, the link is removed first so the kill does not propagate back to
% the caller, then the process is killed; the result of exit/2 (`true`)
% is returned.
stop_doc_loader(undefined) ->
    ok;
stop_doc_loader(Loader) when is_pid(Loader) ->
    erlang:unlink(Loader),
    erlang:exit(Loader, kill).
% Request loop of the doc loader process. Waits for a
% {load_doc, Id, Rev, DocOpts} message, loads the document through
% load_doc/4 using TxDb, replies to Parent with {load_doc_res, Result}
% and loops. There is no exit clause; the loop only ends when the
% process is terminated from outside (see stop_doc_loader/1).
doc_loader_loop(TxDb, Parent) ->
    receive
        {load_doc, Id, Rev, DocOpts} ->
            Result = load_doc(TxDb, Id, Rev, DocOpts),
            Parent ! {load_doc_res, Result},
            doc_loader_loop(TxDb, Parent)
    end.