blob: cee3c5576d89ce3be8cf2f05011f093a51d19b59 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(fabric_rpc).
-export([get_db_info/1, get_doc_count/1, get_update_seq/1]).
-export([open_doc/3, open_revs/4, get_missing_revs/2, get_missing_revs/3,
update_docs/3]).
-export([all_docs/2, changes/3, map_view/4, reduce_view/4, group_info/2]).
-export([create_db/1, delete_db/1, reset_validation_funs/1, set_security/3,
set_revs_limit/3, create_shard_db_doc/2, delete_shard_db_doc/2]).
-export([get_all_security/2]).
-include_lib("fabric/include/fabric.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
%% rpc endpoints
%% call to with_db will supply your M:F with a #db{} and then remaining args
all_docs(DbName, #mrargs{keys=undefined} = QueryArgs) ->
{ok, Db} = get_or_create_db(DbName, []),
#mrargs{
start_key = StartKey,
start_key_docid = StartDocId,
end_key = EndKey,
end_key_docid = EndDocId,
limit = Limit,
skip = Skip,
include_docs = IncludeDocs,
direction = Dir,
inclusive_end = Inclusive,
extra = Extra
} = QueryArgs,
set_io_priority(DbName, Extra),
{ok, Total} = couch_db:get_doc_count(Db),
Acc0 = #view_acc{
db = Db,
include_docs = IncludeDocs,
conflicts = proplists:get_value(conflicts, Extra, false),
limit = Limit+Skip,
total_rows = Total
},
EndKeyType = if Inclusive -> end_key; true -> end_key_gt end,
Options = [
{dir, Dir},
{start_key, if is_binary(StartKey) -> StartKey; true -> StartDocId end},
{EndKeyType, if is_binary(EndKey) -> EndKey; true -> EndDocId end}
],
{ok, _, Acc} = couch_db:enum_docs(Db, fun view_fold/3, Acc0, Options),
final_response(Total, Acc#view_acc.offset).
changes(DbName, #changes_args{} = Args, StartSeq) ->
changes(DbName, [Args], StartSeq);
changes(DbName, Options, StartSeq) ->
erlang:put(io_priority, {interactive, DbName}),
#changes_args{dir=Dir} = Args = lists:keyfind(changes_args, 1, Options),
case get_or_create_db(DbName, []) of
{ok, Db} ->
Enum = fun changes_enumerator/2,
Opts = [{dir,Dir}],
Acc0 = {Db, StartSeq, Args, Options},
try
{ok, {_, LastSeq, _, _}} =
couch_db:changes_since(Db, StartSeq, Enum, Opts, Acc0),
rexi:reply({complete, LastSeq})
after
couch_db:close(Db)
end;
Error ->
rexi:reply(Error)
end.
map_view(DbName, DDoc, ViewName, QueryArgs) ->
{ok, Db} = get_or_create_db(DbName, []),
#mrargs{
limit = Limit,
skip = Skip,
keys = Keys,
include_docs = IncludeDocs,
stale = Stale,
view_type = ViewType,
extra = Extra
} = QueryArgs,
set_io_priority(DbName, Extra),
{LastSeq, MinSeq} = calculate_seqs(Db, Stale),
Group0 = couch_view_group:design_doc_to_view_group(DDoc),
{ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
{ok, Group} = couch_view_group:request_group(Pid, MinSeq),
maybe_update_view_group(Pid, LastSeq, Stale),
erlang:monitor(process, couch_view_group:get_fd(Group)),
Views = couch_view_group:get_views(Group),
View = fabric_view:extract_view(Pid, ViewName, Views, ViewType),
{ok, Total} = couch_view:get_row_count(View),
Acc0 = #view_acc{
db = Db,
include_docs = IncludeDocs,
conflicts = proplists:get_value(conflicts, Extra, false),
limit = Limit+Skip,
total_rows = Total,
reduce_fun = fun couch_view:reduce_to_count/1
},
case Keys of
undefined ->
Options = couch_httpd_view:make_key_options(QueryArgs),
{ok, _, Acc} = couch_view:fold(View, fun view_fold/3, Acc0, Options);
_ ->
Acc = lists:foldl(fun(Key, AccIn) ->
KeyArgs = QueryArgs#mrargs{start_key=Key, end_key=Key},
Options = couch_httpd_view:make_key_options(KeyArgs),
{_Go, _, Out} = couch_view:fold(View, fun view_fold/3, AccIn,
Options),
Out
end, Acc0, Keys)
end,
final_response(Total, Acc#view_acc.offset).
reduce_view(DbName, #doc{} = DDoc, ViewName, QueryArgs) ->
Group = couch_view_group:design_doc_to_view_group(DDoc),
reduce_view(DbName, Group, ViewName, QueryArgs);
reduce_view(DbName, Group0, ViewName, QueryArgs) ->
erlang:put(io_priority, {interactive, DbName}),
{ok, Db} = get_or_create_db(DbName, []),
#mrargs{
group_level = GroupLevel,
limit = Limit,
skip = Skip,
keys = Keys,
stale = Stale,
extra = Extra
} = QueryArgs,
set_io_priority(DbName, Extra),
GroupFun = group_rows_fun(GroupLevel),
{LastSeq, MinSeq} = calculate_seqs(Db, Stale),
{ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
{ok, Group} = couch_view_group:request_group(Pid, MinSeq),
maybe_update_view_group(Pid, LastSeq, Stale),
Lang = couch_view_group:get_language(Group),
Views = couch_view_group:get_views(Group),
erlang:monitor(process, couch_view_group:get_fd(Group)),
{NthRed, View} = fabric_view:extract_view(Pid, ViewName, Views, reduce),
ReduceView = {reduce, NthRed, Lang, View},
Acc0 = #view_acc{group_level = GroupLevel, limit = Limit+Skip},
case Keys of
undefined ->
Options0 = couch_httpd_view:make_key_options(QueryArgs),
Options = [{key_group_fun, GroupFun} | Options0],
couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options);
_ ->
lists:map(fun(Key) ->
KeyArgs = QueryArgs#mrargs{start_key=Key, end_key=Key},
Options0 = couch_httpd_view:make_key_options(KeyArgs),
Options = [{key_group_fun, GroupFun} | Options0],
couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options)
end, Keys)
end,
rexi:reply(complete).
calculate_seqs(Db, Stale) ->
LastSeq = couch_db:get_update_seq(Db),
if
Stale == ok orelse Stale == update_after ->
{LastSeq, 0};
true ->
{LastSeq, LastSeq}
end.
maybe_update_view_group(GroupPid, LastSeq, update_after) ->
couch_view_group:trigger_group_update(GroupPid, LastSeq);
maybe_update_view_group(_, _, _) ->
ok.
create_db(DbName) ->
rexi:reply(case couch_server:create(DbName, []) of
{ok, _} ->
ok;
Error ->
Error
end).
create_shard_db_doc(_, Doc) ->
rexi:reply(mem3_util:write_db_doc(Doc)).
delete_db(DbName) ->
couch_server:delete(DbName, []).
delete_shard_db_doc(_, DocId) ->
rexi:reply(mem3_util:delete_db_doc(DocId)).
get_db_info(DbName) ->
with_db(DbName, [], {couch_db, get_db_info, []}).
get_doc_count(DbName) ->
with_db(DbName, [], {couch_db, get_doc_count, []}).
get_update_seq(DbName) ->
with_db(DbName, [], {couch_db, get_update_seq, []}).
set_security(DbName, SecObj, Options) ->
with_db(DbName, Options, {couch_db, set_security, [SecObj]}).
get_all_security(DbName, Options) ->
with_db(DbName, Options, {couch_db, get_security, []}).
set_revs_limit(DbName, Limit, Options) ->
with_db(DbName, Options, {couch_db, set_revs_limit, [Limit]}).
open_doc(DbName, DocId, Options) ->
with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}).
open_revs(DbName, Id, Revs, Options) ->
with_db(DbName, Options, {couch_db, open_doc_revs, [Id, Revs, Options]}).
get_missing_revs(DbName, IdRevsList) ->
get_missing_revs(DbName, IdRevsList, []).
get_missing_revs(DbName, IdRevsList, Options) ->
% reimplement here so we get [] for Ids with no missing revs in response
set_io_priority(DbName, Options),
rexi:reply(case get_or_create_db(DbName, Options) of
{ok, Db} ->
Ids = [Id1 || {Id1, _Revs} <- IdRevsList],
{ok, lists:zipwith(fun({Id, Revs}, FullDocInfoResult) ->
case FullDocInfoResult of
{ok, #full_doc_info{rev_tree=RevisionTree} = FullInfo} ->
MissingRevs = couch_key_tree:find_missing(RevisionTree, Revs),
{Id, MissingRevs, possible_ancestors(FullInfo, MissingRevs)};
not_found ->
{Id, Revs, []}
end
end, IdRevsList, couch_btree:lookup(Db#db.id_tree, Ids))};
Error ->
Error
end).
update_docs(DbName, Docs0, Options) ->
case proplists:get_value(replicated_changes, Options) of
true ->
X = replicated_changes;
_ ->
X = interactive_edit
end,
Docs = make_att_readers(Docs0),
with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, X]}).
group_info(DbName, Group0) ->
{ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
rexi:reply(couch_view_group:request_group_info(Pid)).
reset_validation_funs(DbName) ->
case get_or_create_db(DbName, []) of
{ok, #db{main_pid = Pid}} ->
gen_server:cast(Pid, {load_validation_funs, undefined});
_ ->
ok
end.
%%
%% internal
%%
with_db(DbName, Options, {M,F,A}) ->
set_io_priority(DbName, Options),
case get_or_create_db(DbName, Options) of
{ok, Db} ->
rexi:reply(try
apply(M, F, [Db | A])
catch Exception ->
Exception;
error:Reason ->
couch_log:error("rpc ~p:~p/~p ~p ~p", [M, F, length(A)+1, Reason,
clean_stack()]),
{error, Reason}
end);
Error ->
rexi:reply(Error)
end.
get_or_create_db(DbName, Options) ->
case couch_db:open_int(DbName, Options) of
{not_found, no_db_file} ->
couch_log:warning("~p creating ~s", [?MODULE, DbName]),
couch_server:create(DbName, Options);
Else ->
Else
end.
view_fold(#full_doc_info{} = FullDocInfo, OffsetReds, Acc) ->
% matches for _all_docs and translates #full_doc_info{} -> KV pair
case couch_doc:to_doc_info(FullDocInfo) of
#doc_info{id=Id, revs=[#rev_info{deleted=false, rev=Rev}|_]} = DI ->
Value = {[{rev,couch_doc:rev_to_str(Rev)}]},
view_fold({{Id,Id}, Value}, OffsetReds, Acc#view_acc{doc_info=DI});
#doc_info{revs=[#rev_info{deleted=true}|_]} ->
{ok, Acc}
end;
view_fold(KV, OffsetReds, #view_acc{offset=nil, total_rows=Total} = Acc) ->
% calculates the offset for this shard
#view_acc{reduce_fun=Reduce} = Acc,
Offset = Reduce(OffsetReds),
case rexi:sync_reply({total_and_offset, Total, Offset}) of
ok ->
view_fold(KV, OffsetReds, Acc#view_acc{offset=Offset});
stop ->
exit(normal);
timeout ->
exit(timeout)
end;
view_fold(_KV, _Offset, #view_acc{limit=0} = Acc) ->
% we scanned through limit+skip local rows
{stop, Acc};
view_fold({{Key,Id}, Value}, _Offset, Acc) ->
% the normal case
#view_acc{
db = Db,
doc_info = DocInfo,
limit = Limit,
conflicts = Conflicts,
include_docs = IncludeDocs
} = Acc,
case Value of {Props} ->
LinkedDocs = (couch_util:get_value(<<"_id">>, Props) =/= undefined);
_ ->
LinkedDocs = false
end,
if LinkedDocs ->
% we'll embed this at a higher level b/c the doc may be non-local
Doc = undefined;
IncludeDocs ->
IdOrInfo = if DocInfo =/= nil -> DocInfo; true -> Id end,
Options = if Conflicts -> [conflicts]; true -> [] end,
case couch_db:open_doc(Db, IdOrInfo, Options) of
{not_found, deleted} ->
Doc = null;
{not_found, missing} ->
Doc = undefined;
{ok, Doc0} ->
Doc = couch_doc:to_json_obj(Doc0, [])
end;
true ->
Doc = undefined
end,
case rexi:stream(#view_row{key=Key, id=Id, value=Value, doc=Doc}) of
ok ->
{ok, Acc#view_acc{limit=Limit-1}};
timeout ->
exit(timeout)
end.
final_response(Total, nil) ->
case rexi:sync_reply({total_and_offset, Total, Total}) of ok ->
rexi:reply(complete);
stop ->
ok;
timeout ->
exit(timeout)
end;
final_response(_Total, _Offset) ->
rexi:reply(complete).
%% TODO: handle case of bogus group level
group_rows_fun(exact) ->
fun({Key1,_}, {Key2,_}) -> Key1 == Key2 end;
group_rows_fun(0) ->
fun(_A, _B) -> true end;
group_rows_fun(GroupLevel) when is_integer(GroupLevel) ->
fun({[_|_] = Key1,_}, {[_|_] = Key2,_}) ->
lists:sublist(Key1, GroupLevel) == lists:sublist(Key2, GroupLevel);
({Key1,_}, {Key2,_}) ->
Key1 == Key2
end.
reduce_fold(_Key, _Red, #view_acc{limit=0} = Acc) ->
{stop, Acc};
reduce_fold(_Key, Red, #view_acc{group_level=0} = Acc) ->
send(null, Red, Acc);
reduce_fold(Key, Red, #view_acc{group_level=exact} = Acc) ->
send(Key, Red, Acc);
reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0, is_list(K) ->
send(lists:sublist(K, I), Red, Acc);
reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0 ->
send(K, Red, Acc).
send(Key, Value, #view_acc{limit=Limit} = Acc) ->
case put(fabric_sent_first_row, true) of
undefined ->
case rexi:sync_reply(#view_row{key=Key, value=Value}) of
ok ->
{ok, Acc#view_acc{limit=Limit-1}};
stop ->
exit(normal);
timeout ->
exit(timeout)
end;
true ->
case rexi:stream(#view_row{key=Key, value=Value}) of
ok ->
{ok, Acc#view_acc{limit=Limit-1}};
timeout ->
exit(timeout)
end
end.
changes_enumerator(DocInfo, {Db, _Seq, Args, Options}) ->
#changes_args{
include_docs = IncludeDocs,
filter = Acc
} = Args,
Conflicts = proplists:get_value(conflicts, Options, false),
#doc_info{high_seq=Seq, revs=[#rev_info{deleted=Del}|_]} = DocInfo,
case [X || X <- couch_changes:filter(Db, DocInfo, Acc), X /= null] of
[] ->
{ok, {Db, Seq, Args, Options}};
Results ->
Opts = if Conflicts -> [conflicts]; true -> [] end,
ChangesRow = changes_row(Db, DocInfo, Results, Del, IncludeDocs, Opts),
Go = rexi:sync_reply(ChangesRow),
{Go, {Db, Seq, Args, Options}}
end.
changes_row(Db, #doc_info{id=Id, high_seq=Seq}=DI, Results, Del, true, Opts) ->
Doc = doc_member(Db, DI, Opts),
#change{key=Seq, id=Id, value=Results, doc=Doc, deleted=Del};
changes_row(_, #doc_info{id=Id, high_seq=Seq}, Results, true, _, _) ->
#change{key=Seq, id=Id, value=Results, deleted=true};
changes_row(_, #doc_info{id=Id, high_seq=Seq}, Results, _, _, _) ->
#change{key=Seq, id=Id, value=Results}.
doc_member(Shard, DocInfo, Opts) ->
case couch_db:open_doc(Shard, DocInfo, [deleted | Opts]) of
{ok, Doc} ->
couch_doc:to_json_obj(Doc, []);
Error ->
Error
end.
possible_ancestors(_FullInfo, []) ->
[];
possible_ancestors(FullInfo, MissingRevs) ->
#doc_info{revs=RevsInfo} = couch_doc:to_doc_info(FullInfo),
LeafRevs = [Rev || #rev_info{rev=Rev} <- RevsInfo],
% Find the revs that are possible parents of this rev
lists:foldl(fun({LeafPos, LeafRevId}, Acc) ->
% this leaf is a "possible ancenstor" of the missing
% revs if this LeafPos lessthan any of the missing revs
case lists:any(fun({MissingPos, _}) ->
LeafPos < MissingPos end, MissingRevs) of
true ->
[{LeafPos, LeafRevId} | Acc];
false ->
Acc
end
end, [], LeafRevs).
make_att_readers([]) ->
[];
make_att_readers([#doc{atts=Atts0} = Doc | Rest]) ->
% % go through the attachments looking for 'follows' in the data,
% % replace with function that reads the data from MIME stream.
Atts = [Att#att{data=make_att_reader(D)} || #att{data=D} = Att <- Atts0],
[Doc#doc{atts = Atts} | make_att_readers(Rest)].
make_att_reader({follows, Parser, Ref}) ->
fun() ->
ParserRef = case get(mp_parser_ref) of
undefined ->
PRef = erlang:monitor(process, Parser),
put(mp_parser_ref, PRef),
PRef;
Else ->
Else
end,
Parser ! {get_bytes, Ref, self()},
receive
{bytes, Ref, Bytes} ->
Bytes;
{'DOWN', ParserRef, _, _, Reason} ->
throw({mp_parser_died, Reason})
end
end;
make_att_reader(Else) ->
Else.
clean_stack() ->
lists:map(fun({M,F,A}) when is_list(A) -> {M,F,length(A)}; (X) -> X end,
erlang:get_stacktrace()).
set_io_priority(DbName, Options) ->
case lists:keyfind(io_priority, 1, Options) of
{io_priority, Pri} ->
erlang:put(io_priority, Pri);
false ->
erlang:put(io_priority, {interactive, DbName})
end.