blob: 110ec14b36d6e6ddcaf35203db4a63bcf1ba351d [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_mrview_util).
-export([get_view/4]).
-export([ddoc_to_mrst/2, init_state/4, reset_index/3]).
-export([make_header/1]).
-export([index_file/2, compaction_file/2, open_file/1]).
-export([delete_files/2, delete_index_file/2, delete_compaction_file/2]).
-export([get_row_count/1, all_docs_reduce_to_count/1, reduce_to_count/1]).
-export([all_docs_key_opts/1, all_docs_key_opts/2, key_opts/1, key_opts/2]).
-export([fold/4, fold_reduce/4]).
-export([temp_view_to_ddoc/1]).
-export([active_size/1, external_size/1]).
-export([validate_args/1]).
-export([maybe_load_doc/3, maybe_load_doc/4]).
-define(MOD, couch_mrview_index).
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
get_view(Db, DDoc, ViewName, Args0) ->
ArgCheck = fun(InitState) ->
Args1 = set_view_type(Args0, ViewName, InitState#mrst.views),
{ok, validate_args(Args1)}
end,
{ok, Pid, Args2} = couch_index_server:get_index(?MOD, Db, DDoc, ArgCheck),
DbUpdateSeq = couch_util:with_db(Db, fun(WDb) ->
couch_db:get_update_seq(WDb)
end),
MinSeq = case Args2#mrargs.stale of
ok -> 0; update_after -> 0; _ -> DbUpdateSeq
end,
{ok, State} = case couch_index:get_state(Pid, MinSeq) of
{ok, _} = Resp -> Resp;
Error -> throw(Error)
end,
Ref = erlang:monitor(process, State#mrst.fd),
if Args2#mrargs.stale == update_after ->
spawn(fun() -> catch couch_index:get_state(Pid, DbUpdateSeq) end);
true -> ok
end,
#mrst{language=Lang, views=Views} = State,
{Type, View, Args3} = extract_view(Lang, Args2, ViewName, Views),
check_range(Args3, view_cmp(View)),
Sig = view_sig(Db, State, View, Args3),
{ok, {Type, View, Ref}, Sig, Args3}.
ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) ->
MakeDict = fun({Name, {MRFuns}}, DictBySrcAcc) ->
case couch_util:get_value(<<"map">>, MRFuns) of
MapSrc when is_binary(MapSrc) ->
RedSrc = couch_util:get_value(<<"reduce">>, MRFuns, null),
{ViewOpts} = couch_util:get_value(<<"options">>, MRFuns, {[]}),
View = case dict:find({MapSrc, ViewOpts}, DictBySrcAcc) of
{ok, View0} -> View0;
error -> #mrview{def=MapSrc, options=ViewOpts}
end,
{MapNames, RedSrcs} = case RedSrc of
null ->
MNames = [Name | View#mrview.map_names],
{MNames, View#mrview.reduce_funs};
_ ->
RedFuns = [{Name, RedSrc} | View#mrview.reduce_funs],
{View#mrview.map_names, RedFuns}
end,
View2 = View#mrview{map_names=MapNames, reduce_funs=RedSrcs},
dict:store({MapSrc, ViewOpts}, View2, DictBySrcAcc);
undefined ->
DictBySrcAcc
end;
({Name, Else}, DictBySrcAcc) ->
?LOG_ERROR("design_doc_to_view_group ~s views ~p", [Name, Else]),
DictBySrcAcc
end,
{RawViews} = couch_util:get_value(<<"views">>, Fields, {[]}),
BySrc = lists:foldl(MakeDict, dict:new(), RawViews),
NumViews = fun({_, View}, N) -> {View#mrview{id_num=N}, N+1} end,
{Views, _} = lists:mapfoldl(NumViews, 0, lists:sort(dict:to_list(BySrc))),
Language = couch_util:get_value(<<"language">>, Fields, <<"javascript">>),
{DesignOpts} = couch_util:get_value(<<"options">>, Fields, {[]}),
{RawViews} = couch_util:get_value(<<"views">>, Fields, {[]}),
Lib = couch_util:get_value(<<"lib">>, RawViews, {[]}),
IdxState = #mrst{
db_name=DbName,
idx_name=Id,
lib=Lib,
views=Views,
language=Language,
design_opts=DesignOpts
},
SigInfo = {Views, Language, DesignOpts, couch_index_util:sort_lib(Lib)},
{ok, IdxState#mrst{sig=couch_util:md5(term_to_binary(SigInfo))}}.
set_view_type(_Args, _ViewName, []) ->
throw({not_found, missing_named_view});
set_view_type(Args, ViewName, [View | Rest]) ->
RedNames = [N || {N, _} <- View#mrview.reduce_funs],
case lists:member(ViewName, RedNames) of
true ->
case Args#mrargs.reduce of
false -> Args#mrargs{view_type=map};
_ -> Args#mrargs{view_type=red}
end;
false ->
case lists:member(ViewName, View#mrview.map_names) of
true -> Args#mrargs{view_type=map};
false -> set_view_type(Args, ViewName, Rest)
end
end.
extract_view(_Lang, _Args, _ViewName, []) ->
throw({not_found, missing_named_view});
extract_view(Lang, #mrargs{view_type=map}=Args, Name, [View | Rest]) ->
Names = View#mrview.map_names ++ [N || {N, _} <- View#mrview.reduce_funs],
case lists:member(Name, Names) of
true -> {map, View, Args};
_ -> extract_view(Lang, Args, Name, Rest)
end;
extract_view(Lang, #mrargs{view_type=red}=Args, Name, [View | Rest]) ->
RedNames = [N || {N, _} <- View#mrview.reduce_funs],
case lists:member(Name, RedNames) of
true -> {red, {index_of(Name, RedNames), Lang, View}, Args};
false -> extract_view(Lang, Args, Name, Rest)
end.
view_sig(Db, State, View, #mrargs{include_docs=true}=Args) ->
BaseSig = view_sig(Db, State, View, Args#mrargs{include_docs=false}),
UpdateSeq = couch_db:get_update_seq(Db),
PurgeSeq = couch_db:get_purge_seq(Db),
Bin = term_to_binary({BaseSig, UpdateSeq, PurgeSeq}),
couch_index_util:hexsig(couch_util:md5(Bin));
view_sig(Db, State, {_Nth, _Lang, View}, Args) ->
view_sig(Db, State, View, Args);
view_sig(_Db, State, View, Args0) ->
Sig = State#mrst.sig,
UpdateSeq = View#mrview.update_seq,
PurgeSeq = View#mrview.purge_seq,
Args = Args0#mrargs{
preflight_fun=undefined,
extra=[]
},
Bin = term_to_binary({Sig, UpdateSeq, PurgeSeq, Args}),
couch_index_util:hexsig(couch_util:md5(Bin)).
init_state(Db, Fd, #mrst{views=Views}=State, nil) ->
Header = #mrheader{
seq=0,
purge_seq=couch_db:get_purge_seq(Db),
id_btree_state=nil,
view_states=[{nil, 0, 0} || _ <- Views]
},
init_state(Db, Fd, State, Header);
init_state(Db, Fd, State, Header) ->
#mrst{language=Lang, views=Views} = State,
#mrheader{
seq=Seq,
purge_seq=PurgeSeq,
id_btree_state=IdBtreeState,
view_states=ViewStates
} = Header,
StateUpdate = fun
({_, _, _}=St) -> St;
(St) -> {St, 0, 0}
end,
ViewStates2 = lists:map(StateUpdate, ViewStates),
IdReduce = fun
(reduce, KVs) -> length(KVs);
(rereduce, Reds) -> lists:sum(Reds)
end,
IdBtOpts = [{reduce, IdReduce}, {compression, couch_db:compression(Db)}],
{ok, IdBtree} = couch_btree:open(IdBtreeState, Fd, IdBtOpts),
OpenViewFun = fun(St, View) -> open_view(Db, Fd, Lang, St, View) end,
Views2 = lists:zipwith(OpenViewFun, ViewStates2, Views),
State#mrst{
fd=Fd,
fd_monitor=erlang:monitor(process, Fd),
update_seq=Seq,
purge_seq=PurgeSeq,
id_btree=IdBtree,
views=Views2
}.
open_view(Db, Fd, Lang, {BTState, USeq, PSeq}, View) ->
FunSrcs = [FunSrc || {_Name, FunSrc} <- View#mrview.reduce_funs],
ReduceFun =
fun(reduce, KVs) ->
KVs2 = detuple_kvs(expand_dups(KVs, []), []),
{ok, Result} = couch_query_servers:reduce(Lang, FunSrcs, KVs2),
{length(KVs2), Result, reduce_external_size(KVs2, Result)};
(rereduce, Reds) ->
Count = lists:sum(extract_reduction(Reds, counts)),
DataSize = lists:sum(extract_reduction(Reds, data_size)),
UsrReds = extract_reduction(Reds, user_reds),
{ok, Result} = couch_query_servers:rereduce(Lang, FunSrcs, UsrReds),
{Count, Result, DataSize + erlang:external_size(Result)}
end,
Less = case couch_util:get_value(<<"collation">>, View#mrview.options) of
<<"raw">> -> fun(A, B) -> A < B end;
_ -> fun couch_ejson_compare:less_json_ids/2
end,
ViewBtOpts = [
{less, Less},
{reduce, ReduceFun},
{compression, couch_db:compression(Db)}
],
{ok, Btree} = couch_btree:open(BTState, Fd, ViewBtOpts),
View#mrview{btree=Btree, update_seq=USeq, purge_seq=PSeq}.
reduce_external_size(KVList, Reduction) ->
InitSize = erlang:external_size(Reduction),
lists:foldl(fun([[Key, _], Value], Acc) ->
KSize = erlang:external_size(Key),
VSize = erlang:external_size(Value),
KSize + VSize + Acc
end, InitSize, KVList).
extract_reduction(Reds, counts) ->
[element(1, R) || R <- Reds];
extract_reduction(Reds, user_reds) ->
[element(2, R) || R <- Reds];
extract_reduction(Reds, data_size) ->
lists:map(fun({_, _}) -> 0; ({_, _, S}) -> S end, Reds).
temp_view_to_ddoc({Props}) ->
Language = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
Options = couch_util:get_value(<<"options">>, Props, {[]}),
View0 = [{<<"map">>, couch_util:get_value(<<"map">>, Props)}],
View1 = View0 ++ case couch_util:get_value(<<"reduce">>, Props) of
RedSrc when is_binary(RedSrc) -> [{<<"reduce">>, RedSrc}];
_ -> []
end,
DDoc = {[
{<<"_id">>, couch_uuids:random()},
{<<"language">>, Language},
{<<"options">>, Options},
{<<"views">>, {[
{<<"temp">>, {View1}}
]}}
]},
couch_doc:from_json_obj(DDoc).
get_row_count(#mrview{btree=Bt}) ->
{ok, Reds} = couch_btree:full_reduce(Bt),
{ok, element(1, Reds)}.
all_docs_reduce_to_count(Reductions) ->
Reduce = fun couch_db_updater:btree_by_id_reduce/2,
{Count, _, _} = couch_btree:final_reduce(Reduce, Reductions),
Count.
reduce_to_count(nil) ->
0;
reduce_to_count(Reductions) ->
Reduce = fun
(reduce, KVs) ->
Counts = [
case V of {dups, Vals} -> length(Vals); _ -> 1 end
|| {_,V} <- KVs
],
{lists:sum(Counts), []};
(rereduce, Reds) ->
{lists:sum([Count0 || {Count0, _} <- Reds]), []}
end,
{Count, _} = couch_btree:final_reduce(Reduce, Reductions),
Count.
fold(#mrview{btree=Bt}, Fun, Acc, Opts) ->
WrapperFun = fun(KV, Reds, Acc2) ->
fold_fun(Fun, expand_dups([KV], []), Reds, Acc2)
end,
{ok, _LastRed, _Acc} = couch_btree:fold(Bt, WrapperFun, Acc, Opts).
fold_fun(_Fun, [], _, Acc) ->
{ok, Acc};
fold_fun(Fun, [KV|Rest], {KVReds, Reds}, Acc) ->
case Fun(KV, {KVReds, Reds}, Acc) of
{ok, Acc2} ->
fold_fun(Fun, Rest, {[KV|KVReds], Reds}, Acc2);
{stop, Acc2} ->
{stop, Acc2}
end.
fold_reduce({NthRed, Lang, View}, Fun, Acc, Options) ->
#mrview{
btree=Bt,
reduce_funs=RedFuns
} = View,
LPad = lists:duplicate(NthRed - 1, []),
RPad = lists:duplicate(length(RedFuns) - NthRed, []),
{_Name, FunSrc} = lists:nth(NthRed,RedFuns),
ReduceFun = fun
(reduce, KVs0) ->
KVs1 = detuple_kvs(expand_dups(KVs0, []), []),
{ok, Red} = couch_query_servers:reduce(Lang, [FunSrc], KVs1),
{0, LPad ++ Red ++ RPad};
(rereduce, Reds) ->
ExtractRed = fun({_, UReds0}) -> [lists:nth(NthRed, UReds0)] end,
UReds = lists:map(ExtractRed, Reds),
{ok, Red} = couch_query_servers:rereduce(Lang, [FunSrc], UReds),
{0, LPad ++ Red ++ RPad}
end,
WrapperFun = fun({GroupedKey, _}, PartialReds, Acc0) ->
{_, Reds} = couch_btree:final_reduce(ReduceFun, PartialReds),
Fun(GroupedKey, lists:nth(NthRed, Reds), Acc0)
end,
couch_btree:fold_reduce(Bt, WrapperFun, Acc, Options).
validate_args(Args) ->
Reduce = Args#mrargs.reduce,
case Reduce == undefined orelse is_boolean(Reduce) of
true -> ok;
_ -> mrverror(<<"Invalid `reduce` value.">>)
end,
case {Args#mrargs.view_type, Reduce} of
{map, true} -> mrverror(<<"Reduce is invalid for map-only views.">>);
_ -> ok
end,
case {Args#mrargs.view_type, Args#mrargs.group_level, Args#mrargs.keys} of
{red, exact, _} -> ok;
{red, _, KeyList} when is_list(KeyList) ->
Msg = <<"Multi-key fetchs for reduce views must use `group=true`">>,
mrverror(Msg);
_ -> ok
end,
case Args#mrargs.keys of
Keys when is_list(Keys) -> ok;
undefined -> ok;
_ -> mrverror(<<"`keys` must be an array of strings.">>)
end,
case {Args#mrargs.keys, Args#mrargs.start_key} of
{undefined, _} -> ok;
{[], _} -> ok;
{[_|_], undefined} -> ok;
_ -> mrverror(<<"`start_key` is incompatible with `keys`">>)
end,
case Args#mrargs.start_key_docid of
undefined -> ok;
SKDocId0 when is_binary(SKDocId0) -> ok;
_ -> mrverror(<<"`start_key_docid` must be a string.">>)
end,
case {Args#mrargs.keys, Args#mrargs.end_key} of
{undefined, _} -> ok;
{[], _} -> ok;
{[_|_], undefined} -> ok;
_ -> mrverror(<<"`end_key` is incompatible with `keys`">>)
end,
case Args#mrargs.end_key_docid of
undefined -> ok;
EKDocId0 when is_binary(EKDocId0) -> ok;
_ -> mrverror(<<"`end_key_docid` must be a string.">>)
end,
case Args#mrargs.direction of
fwd -> ok;
rev -> ok;
_ -> mrverror(<<"Invalid direction.">>)
end,
case {Args#mrargs.limit >= 0, Args#mrargs.limit == undefined} of
{true, _} -> ok;
{_, true} -> ok;
_ -> mrverror(<<"`limit` must be a positive integer.">>)
end,
case Args#mrargs.skip < 0 of
true -> mrverror(<<"`skip` must be >= 0">>);
_ -> ok
end,
case {Args#mrargs.view_type, Args#mrargs.group_level} of
{red, exact} -> ok;
{_, 0} -> ok;
{red, Int} when is_integer(Int), Int >= 0 -> ok;
{red, _} -> mrverror(<<"`group_level` must be >= 0">>);
{map, _} -> mrverror(<<"Invalid use of grouping on a map view.">>)
end,
case Args#mrargs.stale of
ok -> ok;
update_after -> ok;
false -> ok;
_ -> mrverror(<<"Invalid value for `stale`.">>)
end,
case is_boolean(Args#mrargs.inclusive_end) of
true -> ok;
_ -> mrverror(<<"Invalid value for `inclusive_end`.">>)
end,
case {Args#mrargs.view_type, Args#mrargs.include_docs} of
{red, true} -> mrverror(<<"`include_docs` is invalid for reduce">>);
{_, ID} when is_boolean(ID) -> ok;
_ -> mrverror(<<"Invalid value for `include_docs`">>)
end,
case {Args#mrargs.view_type, Args#mrargs.conflicts} of
{_, undefined} -> ok;
{map, V} when is_boolean(V) -> ok;
{red, undefined} -> ok;
{map, _} -> mrverror(<<"Invalid value for `conflicts`.">>);
{red, _} -> mrverror(<<"`conflicts` is invalid for reduce views.">>)
end,
SKDocId = case {Args#mrargs.direction, Args#mrargs.start_key_docid} of
{fwd, undefined} -> <<>>;
{rev, undefined} -> <<255>>;
{_, SKDocId1} -> SKDocId1
end,
EKDocId = case {Args#mrargs.direction, Args#mrargs.end_key_docid} of
{fwd, undefined} -> <<255>>;
{rev, undefined} -> <<>>;
{_, EKDocId1} -> EKDocId1
end,
Args#mrargs{
start_key_docid=SKDocId,
end_key_docid=EKDocId
}.
check_range(#mrargs{start_key=undefined}, _Cmp) ->
ok;
check_range(#mrargs{end_key=undefined}, _Cmp) ->
ok;
check_range(#mrargs{start_key=K, end_key=K}, _Cmp) ->
ok;
check_range(Args, Cmp) ->
#mrargs{
direction=Dir,
start_key=SK,
start_key_docid=SKD,
end_key=EK,
end_key_docid=EKD
} = Args,
case {Dir, Cmp({SK, SKD}, {EK, EKD})} of
{fwd, false} ->
throw({query_parse_error,
<<"No rows can match your key range, reverse your ",
"start_key and end_key or set descending=true">>});
{rev, true} ->
throw({query_parse_error,
<<"No rows can match your key range, reverse your ",
"start_key and end_key or set descending=false">>});
_ -> ok
end.
view_cmp({_Nth, _Lang, View}) ->
view_cmp(View);
view_cmp(View) ->
fun(A, B) -> couch_btree:less(View#mrview.btree, A, B) end.
make_header(State) ->
#mrst{
update_seq=Seq,
purge_seq=PurgeSeq,
id_btree=IdBtree,
views=Views
} = State,
ViewStates = [
{
couch_btree:get_state(V#mrview.btree),
V#mrview.update_seq,
V#mrview.purge_seq
}
||
V <- Views
],
#mrheader{
seq=Seq,
purge_seq=PurgeSeq,
id_btree_state=couch_btree:get_state(IdBtree),
view_states=ViewStates
}.
index_file(DbName, Sig) ->
FileName = couch_index_util:hexsig(Sig) ++ ".view",
couch_index_util:index_file(mrview, DbName, FileName).
compaction_file(DbName, Sig) ->
FileName = couch_index_util:hexsig(Sig) ++ ".compact.view",
couch_index_util:index_file(mrview, DbName, FileName).
open_file(FName) ->
case couch_file:open(FName) of
{ok, Fd} -> {ok, Fd};
{error, enoent} -> couch_file:open(FName, [create]);
Error -> Error
end.
delete_files(DbName, Sig) ->
delete_index_file(DbName, Sig),
delete_compaction_file(DbName, Sig).
delete_index_file(DbName, Sig) ->
delete_file(index_file(DbName, Sig)).
delete_compaction_file(DbName, Sig) ->
delete_file(compaction_file(DbName, Sig)).
delete_file(FName) ->
case filelib:is_file(FName) of
true ->
RootDir = couch_index_util:root_dir(),
couch_file:delete(RootDir, FName);
_ ->
ok
end.
reset_index(Db, Fd, #mrst{sig=Sig}=State) ->
ok = couch_file:truncate(Fd, 0),
ok = couch_file:write_header(Fd, {Sig, nil}),
init_state(Db, Fd, reset_state(State), nil).
reset_state(State) ->
State#mrst{
fd=nil,
qserver=nil,
update_seq=0,
id_btree=nil,
views=[View#mrview{btree=nil} || View <- State#mrst.views]
}.
all_docs_key_opts(Args) ->
all_docs_key_opts(Args, []).
all_docs_key_opts(#mrargs{keys=undefined}=Args, Extra) ->
all_docs_key_opts(Args#mrargs{keys=[]}, Extra);
all_docs_key_opts(#mrargs{keys=[], direction=Dir}=Args, Extra) ->
[[{dir, Dir}] ++ ad_skey_opts(Args) ++ ad_ekey_opts(Args) ++ Extra];
all_docs_key_opts(#mrargs{keys=Keys, direction=Dir}=Args, Extra) ->
lists:map(fun(K) ->
[{dir, Dir}]
++ ad_skey_opts(Args#mrargs{start_key=K})
++ ad_ekey_opts(Args#mrargs{end_key=K})
++ Extra
end, Keys).
ad_skey_opts(#mrargs{start_key=SKey}) when is_binary(SKey) ->
[{start_key, SKey}];
ad_skey_opts(#mrargs{start_key_docid=SKeyDocId}) ->
[{start_key, SKeyDocId}].
ad_ekey_opts(#mrargs{end_key=EKey}=Args) when is_binary(EKey) ->
Type = if Args#mrargs.inclusive_end -> end_key; true -> end_key_gt end,
[{Type, EKey}];
ad_ekey_opts(#mrargs{end_key_docid=EKeyDocId}=Args) ->
Type = if Args#mrargs.inclusive_end -> end_key; true -> end_key_gt end,
[{Type, EKeyDocId}].
key_opts(Args) ->
key_opts(Args, []).
key_opts(#mrargs{keys=undefined, direction=Dir}=Args, Extra) ->
[[{dir, Dir}] ++ skey_opts(Args) ++ ekey_opts(Args) ++ Extra];
key_opts(#mrargs{keys=Keys, direction=Dir}=Args, Extra) ->
lists:map(fun(K) ->
[{dir, Dir}]
++ skey_opts(Args#mrargs{start_key=K})
++ ekey_opts(Args#mrargs{end_key=K})
++ Extra
end, Keys).
skey_opts(#mrargs{start_key=undefined}) ->
[];
skey_opts(#mrargs{start_key=SKey, start_key_docid=SKeyDocId}) ->
[{start_key, {SKey, SKeyDocId}}].
ekey_opts(#mrargs{end_key=undefined}) ->
[];
ekey_opts(#mrargs{end_key=EKey, end_key_docid=EKeyDocId}=Args) ->
case Args#mrargs.inclusive_end of
true -> [{end_key, {EKey, EKeyDocId}}];
false -> [{end_key_gt, {EKey, reverse_key_default(EKeyDocId)}}]
end.
reverse_key_default(<<>>) -> <<255>>;
reverse_key_default(<<255>>) -> <<>>;
reverse_key_default(Key) -> Key.
active_size(#mrst{id_btree=IdBt, views=Views}) ->
Trees = [IdBt] ++ [Bt || #mrview{btree=Bt} <- Views],
lists:foldl(fun(T, Acc) ->
case couch_btree:size(T) of
_ when Acc == null -> null;
undefined -> null;
S -> Acc + S
end
end, 0, Trees).
external_size(#mrst{views=Views}) ->
lists:foldl(fun(#mrview{btree=Btree}, Acc) ->
{ok, {_, _, Size}} = couch_btree:full_reduce(Btree),
Size + Acc
end, 0, Views).
detuple_kvs([], Acc) ->
lists:reverse(Acc);
detuple_kvs([KV | Rest], Acc) ->
{{Key,Id},Value} = KV,
NKV = [[Key, Id], Value],
detuple_kvs(Rest, [NKV | Acc]).
expand_dups([], Acc) ->
lists:reverse(Acc);
expand_dups([{Key, {dups, Vals}} | Rest], Acc) ->
Expanded = [{Key, Val} || Val <- Vals],
expand_dups(Rest, Expanded ++ Acc);
expand_dups([KV | Rest], Acc) ->
expand_dups(Rest, [KV | Acc]).
maybe_load_doc(_Db, _DI, #mrargs{include_docs=false}) ->
[];
maybe_load_doc(Db, #doc_info{}=DI, #mrargs{conflicts=true}) ->
doc_row(couch_index_util:load_doc(Db, DI, [conflicts]));
maybe_load_doc(Db, #doc_info{}=DI, _Args) ->
doc_row(couch_index_util:load_doc(Db, DI, [])).
maybe_load_doc(_Db, _Id, _Val, #mrargs{include_docs=false}) ->
[];
maybe_load_doc(Db, Id, Val, #mrargs{conflicts=true}) ->
doc_row(couch_index_util:load_doc(Db, docid_rev(Id, Val), [conflicts]));
maybe_load_doc(Db, Id, Val, _Args) ->
doc_row(couch_index_util:load_doc(Db, docid_rev(Id, Val), [])).
doc_row(null) ->
[{doc, null}];
doc_row(Doc) ->
[{doc, couch_doc:to_json_obj(Doc, [])}].
docid_rev(Id, {Props}) ->
DocId = couch_util:get_value(<<"_id">>, Props, Id),
Rev = case couch_util:get_value(<<"_rev">>, Props, nil) of
nil -> nil;
Rev0 -> couch_doc:parse_rev(Rev0)
end,
{DocId, Rev};
docid_rev(Id, _) ->
{Id, nil}.
index_of(Key, List) ->
index_of(Key, List, 1).
index_of(_, [], _) ->
throw({error, missing_named_view});
index_of(Key, [Key | _], Idx) ->
Idx;
index_of(Key, [_ | Rest], Idx) ->
index_of(Key, Rest, Idx+1).
mrverror(Mesg) ->
throw({query_parse_error, Mesg}).