blob: eb909519b2e943a6eba6e3f6059366c96ecb6691 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(fabric_rpc).
-export([
get_db_info/1,
get_doc_count/1,
get_design_doc_count/1,
get_update_seq/1
]).
-export([
open_doc/3,
open_revs/4,
get_doc_info/3,
get_full_doc_info/3,
get_missing_revs/2, get_missing_revs/3,
update_docs/3
]).
-export([all_docs/3, changes/3, map_view/4, reduce_view/4, group_info/2]).
-export([
create_db/1, create_db/2,
delete_db/1,
reset_validation_funs/1,
set_security/3,
set_revs_limit/3,
create_shard_db_doc/2,
delete_shard_db_doc/2,
get_partition_info/2
]).
-export([get_all_security/2, open_shard/2]).
-export([compact/1, compact/2]).
-export([get_purge_seq/2, purge_docs/3, set_purge_infos_limit/3]).
-export([
get_db_info/2,
get_doc_count/2,
get_design_doc_count/2,
get_update_seq/2,
changes/4,
map_view/5,
reduce_view/5,
group_info/3,
update_mrview/4,
get_uuid/1
]).
-include_lib("fabric/include/fabric.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
%% rpc endpoints
%% call to with_db will supply your M:F with a Db instance
%% and then remaining args
%% @equiv changes(DbName, Args, StartSeq, [])
changes(DbName, Args, StartSeq) ->
changes(DbName, Args, StartSeq, []).
changes(DbName, #changes_args{} = Args, StartSeq, DbOptions) ->
changes(DbName, [Args], StartSeq, DbOptions);
changes(DbName, Options, StartVector, DbOptions) ->
set_io_priority(DbName, DbOptions),
Args0 = lists:keyfind(changes_args, 1, Options),
#changes_args{dir = Dir, filter_fun = Filter} = Args0,
Args =
case Filter of
{fetch, custom, Style, Req, {DDocId, Rev}, FName} ->
{ok, DDoc} = ddoc_cache:open_doc(mem3:dbname(DbName), DDocId, Rev),
Args0#changes_args{
filter_fun = {custom, Style, Req, DDoc, FName}
};
{fetch, view, Style, {DDocId, Rev}, VName} ->
{ok, DDoc} = ddoc_cache:open_doc(mem3:dbname(DbName), DDocId, Rev),
Args0#changes_args{filter_fun = {view, Style, DDoc, VName}};
_ ->
Args0
end,
DbOpenOptions = Args#changes_args.db_open_options ++ DbOptions,
case get_or_create_db(DbName, DbOpenOptions) of
{ok, Db} ->
StartSeq = calculate_start_seq(Db, node(), StartVector),
Enum = fun changes_enumerator/2,
Opts = [{dir, Dir}],
Acc0 = #fabric_changes_acc{
db = Db,
seq = StartSeq,
args = Args,
options = Options,
pending = couch_db:count_changes_since(Db, StartSeq),
epochs = couch_db:get_epochs(Db)
},
try
{ok, #fabric_changes_acc{seq = LastSeq, pending = Pending, epochs = Epochs}} =
do_changes(Db, StartSeq, Enum, Acc0, Opts),
rexi:stream_last(
{complete, [
{seq, {LastSeq, uuid(Db), couch_db:owner_of(Epochs, LastSeq)}},
{pending, Pending}
]}
)
after
couch_db:close(Db)
end;
Error ->
rexi:stream_last(Error)
end.
do_changes(Db, StartSeq, Enum, Acc0, Opts) ->
#fabric_changes_acc{
args = Args
} = Acc0,
#changes_args{
filter = Filter
} = Args,
case Filter of
"_doc_ids" ->
% optimised code path, we’re looking up all doc_ids in the by-id instead of filtering
% the entire by-seq tree to find the doc_ids one by one
#changes_args{
filter_fun = {doc_ids, Style, DocIds},
dir = Dir
} = Args,
couch_changes:send_changes_doc_ids(
Db, StartSeq, Dir, Enum, Acc0, {doc_ids, Style, DocIds}
);
"_design_docs" ->
% optimised code path, we’re looking up all design_docs in the by-id instead of
% filtering the entire by-seq tree to find the design_docs one by one
#changes_args{
filter_fun = {design_docs, Style},
dir = Dir
} = Args,
couch_changes:send_changes_design_docs(
Db, StartSeq, Dir, Enum, Acc0, {design_docs, Style}
);
_ ->
couch_db:fold_changes(Db, StartSeq, Enum, Acc0, Opts)
end.
all_docs(DbName, Options, Args0) ->
case fabric_util:upgrade_mrargs(Args0) of
#mrargs{keys = undefined} = Args ->
set_io_priority(DbName, Options),
{ok, Db} = get_or_create_db(DbName, Options),
CB = get_view_cb(Args),
couch_mrview:query_all_docs(Db, Args, CB, Args)
end.
update_mrview(DbName, {DDocId, Rev}, ViewName, Args0) ->
{ok, DDoc} = ddoc_cache:open_doc(mem3:dbname(DbName), DDocId, Rev),
couch_util:with_db(DbName, fun(Db) ->
UpdateSeq = couch_db:get_update_seq(Db),
{ok, Pid, _} = couch_mrview:get_view_index_pid(
Db, DDoc, ViewName, fabric_util:upgrade_mrargs(Args0)
),
couch_index:get_state(Pid, UpdateSeq)
end).
%% @equiv map_view(DbName, DDoc, ViewName, Args0, [])
map_view(DbName, DDocInfo, ViewName, Args0) ->
map_view(DbName, DDocInfo, ViewName, Args0, []).
map_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) ->
{ok, DDoc} = ddoc_cache:open_doc(mem3:dbname(DbName), DDocId, Rev),
map_view(DbName, DDoc, ViewName, Args0, DbOptions);
map_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
set_io_priority(DbName, DbOptions),
Args = fabric_util:upgrade_mrargs(Args0),
{ok, Db} = get_or_create_db(DbName, DbOptions),
CB = get_view_cb(Args),
couch_mrview:query_view(Db, DDoc, ViewName, Args, CB, Args).
%% @equiv reduce_view(DbName, DDoc, ViewName, Args0)
reduce_view(DbName, DDocInfo, ViewName, Args0) ->
reduce_view(DbName, DDocInfo, ViewName, Args0, []).
reduce_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) ->
{ok, DDoc} = ddoc_cache:open_doc(mem3:dbname(DbName), DDocId, Rev),
reduce_view(DbName, DDoc, ViewName, Args0, DbOptions);
reduce_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
set_io_priority(DbName, DbOptions),
Args = fabric_util:upgrade_mrargs(Args0),
{ok, Db} = get_or_create_db(DbName, DbOptions),
VAcc0 = #vacc{db = Db},
couch_mrview:query_view(Db, DDoc, ViewName, Args, fun reduce_cb/2, VAcc0).
create_db(DbName) ->
create_db(DbName, []).
create_db(DbName, Options) ->
rexi:reply(
case couch_server:create(DbName, Options) of
{ok, _} ->
ok;
Error ->
Error
end
).
create_shard_db_doc(_, Doc) ->
rexi:reply(mem3_util:write_db_doc(Doc)).
delete_db(DbName) ->
couch_server:delete(DbName, []).
delete_shard_db_doc(_, DocId) ->
rexi:reply(mem3_util:delete_db_doc(DocId)).
%% @equiv get_db_info(DbName, [])
get_db_info(DbName) ->
get_db_info(DbName, []).
get_db_info(DbName, DbOptions) ->
with_db(DbName, DbOptions, {couch_db, get_db_info, []}).
get_partition_info(DbName, Partition) ->
with_db(DbName, [], {couch_db, get_partition_info, [Partition]}).
%% equiv get_doc_count(DbName, [])
get_doc_count(DbName) ->
get_doc_count(DbName, []).
get_doc_count(DbName, DbOptions) ->
with_db(DbName, DbOptions, {couch_db, get_doc_count, []}).
%% equiv get_design_doc_count(DbName, [])
get_design_doc_count(DbName) ->
get_design_doc_count(DbName, []).
get_design_doc_count(DbName, DbOptions) ->
with_db(DbName, DbOptions, {couch_db, get_design_doc_count, []}).
%% equiv get_update_seq(DbName, [])
get_update_seq(DbName) ->
get_update_seq(DbName, []).
get_update_seq(DbName, DbOptions) ->
with_db(DbName, DbOptions, {couch_db, get_update_seq, []}).
set_security(DbName, SecObj, Options0) ->
Options =
case lists:keyfind(io_priority, 1, Options0) of
false ->
[{io_priority, {db_meta, security}} | Options0];
_ ->
Options0
end,
with_db(DbName, Options, {couch_db, set_security, [SecObj]}).
get_all_security(DbName, Options) ->
with_db(DbName, Options, {couch_db, get_security, []}).
set_revs_limit(DbName, Limit, Options) ->
with_db(DbName, Options, {couch_db, set_revs_limit, [Limit]}).
set_purge_infos_limit(DbName, Limit, Options) ->
with_db(DbName, Options, {couch_db, set_purge_infos_limit, [Limit]}).
open_doc(DbName, DocId, Options) ->
with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}).
open_revs(DbName, Id, Revs, Options) ->
with_db(DbName, Options, {couch_db, open_doc_revs, [Id, Revs, Options]}).
get_full_doc_info(DbName, DocId, Options) ->
with_db(DbName, Options, {couch_db, get_full_doc_info, [DocId]}).
get_doc_info(DbName, DocId, Options) ->
with_db(DbName, Options, {couch_db, get_doc_info, [DocId]}).
get_missing_revs(DbName, IdRevsList) ->
get_missing_revs(DbName, IdRevsList, []).
get_missing_revs(DbName, IdRevsList, Options) ->
with_db(DbName, Options, {couch_db, get_missing_revs, [IdRevsList]}).
update_docs(DbName, Docs0, Options) ->
{Docs1, Type} =
case couch_util:get_value(read_repair, Options) of
NodeRevs when is_list(NodeRevs) ->
Filtered = read_repair_filter(DbName, Docs0, NodeRevs, Options),
{Filtered, ?REPLICATED_CHANGES};
undefined ->
X =
case proplists:get_value(?REPLICATED_CHANGES, Options) of
true -> ?REPLICATED_CHANGES;
_ -> ?INTERACTIVE_EDIT
end,
{Docs0, X}
end,
Docs2 = make_att_readers(Docs1),
with_db(DbName, Options, {couch_db, update_docs, [Docs2, Options, Type]}).
get_purge_seq(DbName, Options) ->
with_db(DbName, Options, {couch_db, get_purge_seq, []}).
purge_docs(DbName, UUIdsIdsRevs, Options) ->
with_db(DbName, Options, {couch_db, purge_docs, [UUIdsIdsRevs, Options]}).
%% @equiv group_info(DbName, DDocId, [])
group_info(DbName, DDocId) ->
group_info(DbName, DDocId, []).
group_info(DbName, DDocId, DbOptions) ->
with_db(DbName, DbOptions, {couch_mrview, get_info, [DDocId]}).
reset_validation_funs(DbName) ->
case get_or_create_db(DbName, []) of
{ok, Db} ->
couch_db:reload_validation_funs(Db);
_ ->
ok
end.
open_shard(Name, Opts) ->
set_io_priority(Name, Opts),
try
rexi:reply(mem3_util:get_or_create_db(Name, Opts))
catch
exit:{timeout, _} ->
couch_stats:increment_counter([fabric, open_shard, timeouts])
end.
compact(DbName) ->
with_db(DbName, [], {couch_db, start_compact, []}).
compact(ShardName, DesignName) ->
{ok, Pid} = couch_index_server:get_index(
couch_mrview_index, ShardName, <<"_design/", DesignName/binary>>
),
Ref = erlang:make_ref(),
Pid ! {'$gen_call', {self(), Ref}, compact}.
get_uuid(DbName) ->
with_db(DbName, [], {couch_db, get_uuid, []}).
%%
%% internal
%%
with_db(DbName, Options, {M, F, A}) ->
set_io_priority(DbName, Options),
case get_or_create_db(DbName, Options) of
{ok, Db} ->
rexi:reply(
try
apply(M, F, [Db | A])
catch
Exception ->
Exception;
error:Reason:Stack ->
couch_log:error("rpc ~p:~p/~p ~p ~p", [
M,
F,
length(A) + 1,
Reason,
clean_stack(Stack)
]),
{error, Reason}
end
);
Error ->
rexi:reply(Error)
end.
read_repair_filter(DbName, Docs, NodeRevs, Options) ->
set_io_priority(DbName, Options),
case get_or_create_db(DbName, Options) of
{ok, Db} ->
try
read_repair_filter(Db, Docs, NodeRevs)
after
couch_db:close(Db)
end;
Error ->
rexi:reply(Error)
end.
% A read repair operation may have been triggered by a node
% that was out of sync with the local node. Thus, any time
% we receive a read repair request we need to check if we
% may have recently purged any of the given revisions and
% ignore them if so.
%
% This is accomplished by looking at the purge infos that we
% have locally that have not been replicated to the remote
% node. The logic here is that we may have received the purge
% request before the remote shard copy. So to check that we
% need to look at the purge infos that we have locally but
% have not yet sent to the remote copy.
%
% NodeRevs is a list of the {node(), [rev()]} tuples passed
% as the read_repair option to update_docs.
read_repair_filter(Db, Docs, NodeRevs) ->
[#doc{id = DocId} | _] = Docs,
NonLocalNodeRevs = [NR || {N, _} = NR <- NodeRevs, N /= node()],
Nodes = lists:usort([Node || {Node, _} <- NonLocalNodeRevs]),
NodeSeqs = get_node_seqs(Db, Nodes),
DbPSeq = couch_db:get_purge_seq(Db),
Lag = config:get_integer("couchdb", "read_repair_lag", 100),
% Filter out read-repair updates from any node that is
% so out of date that it would force us to scan a large
% number of purge infos
NodeFiltFun = fun({Node, _Revs}) ->
{Node, NodeSeq} = lists:keyfind(Node, 1, NodeSeqs),
NodeSeq >= DbPSeq - Lag
end,
RecentNodeRevs = lists:filter(NodeFiltFun, NonLocalNodeRevs),
% For each node we scan the purge infos to filter out any
% revisions that have been locally purged since we last
% replicated to the remote node's shard copy.
AllowableRevs = lists:foldl(
fun({Node, Revs}, RevAcc) ->
{Node, StartSeq} = lists:keyfind(Node, 1, NodeSeqs),
FoldFun = fun({_PSeq, _UUID, PDocId, PRevs}, InnerAcc) ->
if
PDocId /= DocId -> {ok, InnerAcc};
true -> {ok, InnerAcc -- PRevs}
end
end,
{ok, FiltRevs} = couch_db:fold_purge_infos(Db, StartSeq, FoldFun, Revs),
lists:usort(FiltRevs ++ RevAcc)
end,
[],
RecentNodeRevs
),
% Finally, filter the doc updates to only include revisions
% that have not been purged locally.
DocFiltFun = fun(#doc{revs = {Pos, [Rev | _]}}) ->
lists:member({Pos, Rev}, AllowableRevs)
end,
lists:filter(DocFiltFun, Docs).
get_node_seqs(Db, Nodes) ->
% Gather the list of {Node, PurgeSeq} pairs for all nodes
% that are present in our read repair group
FoldFun = fun(#doc{id = Id, body = {Props}}, Acc) ->
case Id of
<<?LOCAL_DOC_PREFIX, "purge-mem3-", _/binary>> ->
TgtNode = couch_util:get_value(<<"target_node">>, Props),
PurgeSeq = couch_util:get_value(<<"purge_seq">>, Props),
case lists:keyfind(TgtNode, 1, Acc) of
{_, OldSeq} ->
NewSeq = erlang:max(OldSeq, PurgeSeq),
NewEntry = {TgtNode, NewSeq},
NewAcc = lists:keyreplace(TgtNode, 1, Acc, NewEntry),
{ok, NewAcc};
false ->
{ok, Acc}
end;
_ ->
% We've processed all _local mem3 purge docs
{stop, Acc}
end
end,
InitAcc = [{list_to_binary(atom_to_list(Node)), 0} || Node <- Nodes],
Opts = [{start_key, <<?LOCAL_DOC_PREFIX, "purge-mem3-">>}],
{ok, NodeBinSeqs} = couch_db:fold_local_docs(Db, FoldFun, InitAcc, Opts),
[{list_to_existing_atom(binary_to_list(N)), S} || {N, S} <- NodeBinSeqs].
get_or_create_db(DbName, Options) ->
mem3_util:get_or_create_db_int(DbName, Options).
get_view_cb(#mrargs{extra = Options}) ->
case couch_util:get_value(callback, Options) of
{Mod, Fun} when is_atom(Mod), is_atom(Fun) ->
fun Mod:Fun/2;
_ ->
fun view_cb/2
end;
get_view_cb(_) ->
fun view_cb/2.
view_cb({meta, Meta}, Acc) ->
% Map function starting
ok = rexi:stream2({meta, Meta}),
{ok, Acc};
view_cb({row, Row}, Acc) ->
% Adding another row
ViewRow = #view_row{
id = couch_util:get_value(id, Row),
key = couch_util:get_value(key, Row),
value = couch_util:get_value(value, Row),
doc = couch_util:get_value(doc, Row)
},
ok = rexi:stream2(ViewRow),
{ok, Acc};
view_cb(complete, Acc) ->
% Finish view output
ok = rexi:stream_last(complete),
{ok, Acc};
view_cb(ok, ddoc_updated) ->
rexi:reply({ok, ddoc_updated}).
reduce_cb({meta, Meta}, Acc) ->
% Map function starting
ok = rexi:stream2({meta, Meta}),
{ok, Acc};
reduce_cb({row, Row}, Acc) ->
% Adding another row
ok = rexi:stream2(#view_row{
key = couch_util:get_value(key, Row),
value = couch_util:get_value(value, Row)
}),
{ok, Acc};
reduce_cb(complete, Acc) ->
% Finish view output
ok = rexi:stream_last(complete),
{ok, Acc};
reduce_cb(ok, ddoc_updated) ->
rexi:reply({ok, ddoc_updated}).
changes_enumerator(#full_doc_info{} = FDI, Acc) ->
changes_enumerator(couch_doc:to_doc_info(FDI), Acc);
changes_enumerator(#doc_info{id = <<"_local/", _/binary>>, high_seq = Seq}, Acc) ->
{ok, Acc#fabric_changes_acc{seq = Seq, pending = Acc#fabric_changes_acc.pending - 1}};
changes_enumerator(DocInfo, Acc) ->
#fabric_changes_acc{
db = Db,
args = #changes_args{
include_docs = IncludeDocs,
conflicts = Conflicts,
filter_fun = Filter,
doc_options = DocOptions
},
pending = Pending,
epochs = Epochs
} = Acc,
#doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DocInfo,
case [X || X <- couch_changes:filter(Db, DocInfo, Filter), X /= null] of
[] ->
ChangesRow =
{no_pass, [
{pending, Pending - 1},
{seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}}
]};
Results ->
Opts =
if
Conflicts -> [conflicts | DocOptions];
true -> DocOptions
end,
ChangesRow =
{change, [
{pending, Pending - 1},
{seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}},
{id, Id},
{changes, Results},
{deleted, Del}
| if
IncludeDocs -> [doc_member(Db, DocInfo, Opts, Filter)];
true -> []
end
]}
end,
ok = rexi:stream2(ChangesRow),
{ok, Acc#fabric_changes_acc{seq = Seq, pending = Pending - 1}}.
doc_member(Shard, DocInfo, Opts, Filter) ->
case couch_db:open_doc(Shard, DocInfo, [deleted | Opts]) of
{ok, Doc} ->
{doc, maybe_filtered_json_doc(Doc, Opts, Filter)};
Error ->
Error
end.
maybe_filtered_json_doc(Doc, Opts, {selector, _Style, {_Selector, Fields}}) when
Fields =/= nil
->
mango_fields:extract(couch_doc:to_json_obj(Doc, Opts), Fields);
maybe_filtered_json_doc(Doc, Opts, _Filter) ->
couch_doc:to_json_obj(Doc, Opts).
make_att_readers([]) ->
[];
make_att_readers([#doc{atts = Atts0} = Doc | Rest]) ->
% % go through the attachments looking for 'follows' in the data,
% % replace with function that reads the data from MIME stream.
Atts = [couch_att:transform(data, fun make_att_reader/1, Att) || Att <- Atts0],
[Doc#doc{atts = Atts} | make_att_readers(Rest)].
make_att_reader({follows, Parser, Ref}) when is_pid(Parser) ->
% This code will fail if the returned closure is called by a
% process other than the one that called make_att_reader/1 in the
% first place. The reason we don't put everything inside the
% closure is that the `hello_from_writer` message must *always* be
% sent to the parser, even if the closure never gets called. Also,
% make sure `hello_from_writer` is sent only once for the all the
% rest of the possible attachments.
WriterPid = self(),
ParserRef =
case get({mp_parser_ref, Parser}) of
undefined ->
% First time encountering a particular parser pid. Monitor it,
% in case it dies, and notify it about us, so it could monitor
% us in case we die.
PRef = erlang:monitor(process, Parser),
put({mp_parser_ref, Parser}, PRef),
Parser ! {hello_from_writer, Ref, WriterPid},
PRef;
Else ->
Else
end,
fun() ->
% Make sure the closure is always called from the same process which
% sent the hello_from_writer message.
case self() =:= WriterPid of
true -> ok;
false -> error({make_att_pid_assertion, self(), WriterPid})
end,
% Check if parser already died. This is for belt and suspenders mostly,
% in case somehow we call the data function again after mp_parser_died
% was thrown, so we are not stuck forever waiting for bytes.
case get({mp_parser_died, Parser}) of
undefined -> ok;
AlreadyDiedReason -> throw({mp_parser_died, AlreadyDiedReason})
end,
Parser ! {get_bytes, Ref, self()},
receive
{bytes, Ref, Bytes} ->
rexi:reply(attachment_chunk_received),
Bytes;
{'DOWN', ParserRef, _, _, Reason} ->
put({mp_parser_died, Parser}, Reason),
throw({mp_parser_died, Reason})
end
end;
make_att_reader({fabric_attachment_receiver, Middleman, Length}) ->
fabric_doc_atts:receiver_callback(Middleman, Length);
make_att_reader(Else) ->
Else.
clean_stack(S) ->
lists:map(
fun
({M, F, A}) when is_list(A) -> {M, F, length(A)};
(X) -> X
end,
S
).
set_io_priority(DbName, Options) ->
case lists:keyfind(io_priority, 1, Options) of
{io_priority, Pri} ->
erlang:put(io_priority, Pri);
false ->
erlang:put(io_priority, {interactive, DbName})
end,
case erlang:get(io_priority) of
{interactive, _} ->
case config:get("couchdb", "maintenance_mode", "false") of
"true" ->
% Done to silence error logging by rexi_server
rexi:reply({rexi_EXIT, {maintenance_mode, node()}}),
exit(normal);
_ ->
ok
end;
_ ->
ok
end.
calculate_start_seq(Db, Node, Seq) ->
case couch_db:calculate_start_seq(Db, Node, Seq) of
N when is_integer(N) ->
N;
{replace, OriginalNode, Uuid, OriginalSeq} ->
%% Scan history looking for an entry with
%% * target_node == TargetNode
%% * target_uuid == TargetUUID
%% * target_seq =< TargetSeq
%% If such an entry is found, stream from associated source_seq
mem3_rep:find_source_seq(Db, OriginalNode, Uuid, OriginalSeq)
end.
uuid(Db) ->
Uuid = couch_db:get_uuid(Db),
Prefix = fabric_util:get_uuid_prefix_len(),
binary:part(Uuid, {0, Prefix}).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
maybe_filtered_json_doc_no_filter_test() ->
Body = {[{<<"a">>, 1}]},
Doc = #doc{id = <<"1">>, revs = {1, [<<"r1">>]}, body = Body},
{JDocProps} = maybe_filtered_json_doc(Doc, [], x),
ExpectedProps = [{<<"_id">>, <<"1">>}, {<<"_rev">>, <<"1-r1">>}, {<<"a">>, 1}],
?assertEqual(lists:keysort(1, JDocProps), ExpectedProps).
maybe_filtered_json_doc_with_filter_test() ->
Body = {[{<<"a">>, 1}]},
Doc = #doc{id = <<"1">>, revs = {1, [<<"r1">>]}, body = Body},
Fields = [<<"a">>, <<"nonexistent">>],
Filter = {selector, main_only, {some_selector, Fields}},
{JDocProps} = maybe_filtered_json_doc(Doc, [], Filter),
?assertEqual(JDocProps, [{<<"a">>, 1}]).
-endif.