% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
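%
% fabric2_db is the database-level API of the FoundationDB-backed fabric
% layer: database lifecycle (create/open/delete/undelete), security checks,
% document reads and writes, attachments, and the fold operations. All storage
% access happens inside fabric2_fdb transactions.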
-module(fabric2_db).
-export([
create/2,
open/2,
delete/2,
undelete/4,
list_dbs/0,
list_dbs/1,
list_dbs/3,
list_dbs_info/0,
list_dbs_info/1,
list_dbs_info/3,
list_deleted_dbs_info/0,
list_deleted_dbs_info/1,
list_deleted_dbs_info/3,
check_is_admin/1,
check_is_member/1,
name/1,
get_after_doc_read_fun/1,
get_before_doc_update_fun/1,
get_committed_update_seq/1,
get_compacted_seq/1,
get_compactor_pid/1,
get_db_info/1,
%% get_partition_info/2,
get_del_doc_count/1,
get_doc_count/1,
get_doc_count/2,
%% get_epochs/1,
%% get_filepath/1,
get_instance_start_time/1,
get_pid/1,
get_revs_limit/1,
get_revs_limit/2,
get_security/1,
get_security/2,
get_update_seq/1,
get_user_ctx/1,
get_uuid/1,
%% get_purge_seq/1,
%% get_oldest_purge_seq/1,
%% get_purge_infos_limit/1,
is_clustered/1,
is_db/1,
is_partitioned/1,
is_system_db/1,
is_system_db_name/1,
is_replicator_db/1,
is_users_db/1,
set_revs_limit/2,
%% set_purge_infos_limit/2,
set_security/2,
set_user_ctx/2,
ensure_full_commit/1,
ensure_full_commit/2,
%% load_validation_funs/1,
%% reload_validation_funs/1,
open_doc/2,
open_doc/3,
open_doc_revs/4,
%% open_doc_int/3,
get_doc_info/2,
get_full_doc_info/2,
get_full_doc_infos/2,
get_missing_revs/2,
get_design_docs/1,
%% get_purge_infos/2,
%% get_minimum_purge_seq/1,
%% purge_client_exists/3,
validate_docid/1,
%% doc_from_json_obj_validate/2,
update_doc/2,
update_doc/3,
update_docs/2,
update_docs/3,
%% delete_doc/3,
%% purge_docs/2,
%% purge_docs/3,
read_attachment/3,
write_attachment/3,
fold_docs/3,
fold_docs/4,
fold_docs/5,
fold_design_docs/4,
fold_local_docs/4,
fold_changes/4,
fold_changes/5,
%% count_changes_since/2,
%% fold_purge_infos/4,
%% fold_purge_infos/5,
%% calculate_start_seq/3,
%% owner_of/2,
%% start_compact/1,
%% cancel_compact/1,
%% wait_for_compaction/1,
%% wait_for_compaction/2,
dbname_suffix/1,
normalize_dbname/1,
validate_dbname/1,
%% make_doc/5,
new_revid/2,
apply_open_doc_opts/3
]).
-include_lib("couch/include/couch_db.hrl").
-include("fabric2.hrl").
% Default max database name length is based on CouchDB < 4.x compatibility. See
% default.ini entry for additional information.
-define(DEFAULT_MAX_DATABASE_NAME_LENGTH, 238).
-define(DBNAME_REGEX,
"^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*" % use the stock CouchDB regex
"(\\.[0-9]{10,})?$" % but allow an optional shard timestamp at the end
).
-define(FIRST_DDOC_KEY, <<"_design/">>).
-define(LAST_DDOC_KEY, <<"_design0">>).
-define(RETURN(Term), throw({?MODULE, Term})).
-define(DEFAULT_UPDATE_DOCS_BATCH_SIZE, 2500000).
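% Accumulator for batch_update_docs/1: the remaining docs to update, the
% configured batch size (approximate transaction size limit), the winning-rev
% futures keyed by doc tag, the doc ids already updated in the current
% transaction, and the accumulated results (in reverse order).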
-record(bacc, {
db,
docs,
batch_size,
options,
rev_futures,
seen,
results
}).
create(DbName, Options) ->
case validate_dbname(DbName) of
ok ->
Result = fabric2_fdb:transactional(DbName, Options, fun(TxDb) ->
case fabric2_fdb:exists(TxDb) of
true ->
{error, file_exists};
false ->
fabric2_fdb:create(TxDb, Options)
end
end),
% We cache outside of the transaction so that we're sure
% that the transaction was committed.
case Result of
#{} = Db0 ->
Db1 = maybe_add_sys_db_callbacks(Db0),
ok = fabric2_server:store(Db1),
fabric2_db_plugin:after_db_create(DbName, get_uuid(Db1)),
{ok, Db1#{tx := undefined}};
Error ->
Error
end;
Error ->
Error
end.
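% Open a database handle. A cached handle from fabric2_server is reused when
% available; otherwise the database is opened in a transaction and cached. The
% returned handle has its `security_fun` set to check_is_member/2.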
open(DbName, Options) ->
UUID = fabric2_util:get_value(uuid, Options),
case fabric2_server:fetch(DbName, UUID) of
#{} = Db ->
Db1 = maybe_set_user_ctx(Db, Options),
Db2 = maybe_set_interactive(Db1, Options),
{ok, require_member_check(Db2)};
undefined ->
Result = fabric2_fdb:transactional(DbName, Options, fun(TxDb) ->
fabric2_fdb:open(TxDb, Options)
end),
% Cache outside the transaction retry loop
case Result of
#{} = Db0 ->
Db1 = maybe_add_sys_db_callbacks(Db0),
ok = fabric2_server:store(Db1),
Db2 = Db1#{tx := undefined},
{ok, require_member_check(Db2)};
Error ->
Error
end
end.
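% Delete a database. With a `deleted_at` option a previously deleted database
% instance is removed permanently; otherwise the database itself is deleted
% and evicted from the fabric2_server cache.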
delete(DbName, Options) ->
% Delete doesn't check user_ctx; that's done at the HTTP API level. Here we
% just care that a `database_does_not_exist` error is thrown when appropriate.
Options1 = lists:keystore(user_ctx, 1, Options, ?ADMIN_CTX),
case lists:keyfind(deleted_at, 1, Options1) of
{deleted_at, TimeStamp} ->
fabric2_fdb:transactional(DbName, Options1, fun(TxDb) ->
fabric2_fdb:remove_deleted_db(TxDb, TimeStamp)
end);
false ->
{ok, Db} = open(DbName, Options1),
Resp = fabric2_fdb:transactional(Db, fun(TxDb) ->
fabric2_fdb:delete(TxDb)
end),
if Resp /= ok -> Resp; true ->
fabric2_db_plugin:after_db_delete(DbName, get_uuid(Db)),
fabric2_server:remove(DbName)
end
end.
undelete(DbName, TgtDbName, TimeStamp, Options) ->
case validate_dbname(TgtDbName) of
ok ->
Resp = fabric2_fdb:transactional(DbName, Options,
fun(TxDb) ->
fabric2_fdb:undelete(TxDb, TgtDbName, TimeStamp)
end
),
if Resp /= ok -> ok; true ->
{ok, Db} = open(TgtDbName, Options),
fabric2_db_plugin:after_db_create(TgtDbName, get_uuid(Db))
end,
Resp;
Error ->
Error
end.
list_dbs() ->
list_dbs([]).
list_dbs(Options) ->
Callback = fun(DbName, Acc) -> [DbName | Acc] end,
DbNames = fabric2_fdb:transactional(fun(Tx) ->
fabric2_fdb:list_dbs(Tx, Callback, [], Options)
end),
lists:reverse(DbNames).
list_dbs(UserFun, UserAcc0, Options) ->
FoldFun = fun
(DbName, Acc) -> maybe_stop(UserFun({row, [{id, DbName}]}, Acc))
end,
fabric2_fdb:transactional(fun(Tx) ->
try
UserAcc1 = maybe_stop(UserFun({meta, []}, UserAcc0)),
UserAcc2 = fabric2_fdb:list_dbs(
Tx,
FoldFun,
UserAcc1,
Options
),
{ok, maybe_stop(UserFun(complete, UserAcc2))}
catch throw:{stop, FinalUserAcc} ->
{ok, FinalUserAcc}
end
end).
list_dbs_info() ->
list_dbs_info([]).
list_dbs_info(Options) ->
Callback = fun(Value, Acc) ->
NewAcc = case Value of
{meta, _} -> Acc;
{row, DbInfo} -> [DbInfo | Acc];
complete -> Acc
end,
{ok, NewAcc}
end,
{ok, DbInfos} = list_dbs_info(Callback, [], Options),
{ok, lists:reverse(DbInfos)}.
list_dbs_info(UserFun, UserAcc0, Options) ->
FoldFun = fun(DbName, InfoFuture, {FutureQ, Count, Acc}) ->
NewFutureQ = queue:in({DbName, InfoFuture}, FutureQ),
drain_info_futures(NewFutureQ, Count + 1, UserFun, Acc)
end,
fabric2_fdb:transactional(fun(Tx) ->
try
UserAcc1 = maybe_stop(UserFun({meta, []}, UserAcc0)),
InitAcc = {queue:new(), 0, UserAcc1},
{FinalFutureQ, _, UserAcc2} = fabric2_fdb:list_dbs_info(
Tx,
FoldFun,
InitAcc,
Options
),
UserAcc3 = drain_all_info_futures(FinalFutureQ, UserFun, UserAcc2),
{ok, maybe_stop(UserFun(complete, UserAcc3))}
catch throw:{stop, FinalUserAcc} ->
{ok, FinalUserAcc}
end
end).
list_deleted_dbs_info() ->
list_deleted_dbs_info([]).
list_deleted_dbs_info(Options) ->
Callback = fun(Value, Acc) ->
NewAcc = case Value of
{meta, _} -> Acc;
{row, DbInfo} -> [DbInfo | Acc];
complete -> Acc
end,
{ok, NewAcc}
end,
{ok, DbInfos} = list_deleted_dbs_info(Callback, [], Options),
{ok, lists:reverse(DbInfos)}.
list_deleted_dbs_info(UserFun, UserAcc0, Options0) ->
Dir = fabric2_util:get_value(dir, Options0, fwd),
StartKey0 = fabric2_util:get_value(start_key, Options0),
EndKey0 = fabric2_util:get_value(end_key, Options0),
{FirstBinary, LastBinary} = case Dir of
fwd -> {<<>>, <<255>>};
rev -> {<<255>>, <<>>}
end,
StartKey1 = case StartKey0 of
undefined ->
{FirstBinary};
DbName0 when is_binary(DbName0) ->
{DbName0, FirstBinary};
[DbName0, TimeStamp0] when is_binary(DbName0), is_binary(TimeStamp0) ->
{DbName0, TimeStamp0};
BadStartKey ->
erlang:error({invalid_start_key, BadStartKey})
end,
EndKey1 = case EndKey0 of
undefined ->
{LastBinary};
DbName1 when is_binary(DbName1) ->
{DbName1, LastBinary};
[DbName1, TimeStamp1] when is_binary(DbName1), is_binary(TimeStamp1) ->
{DbName1, TimeStamp1};
BadEndKey ->
erlang:error({invalid_end_key, BadEndKey})
end,
Options1 = Options0 -- [{start_key, StartKey0}, {end_key, EndKey0}],
Options2 = [
{start_key, StartKey1},
{end_key, EndKey1},
{wrap_keys, false}
] ++ Options1,
FoldFun = fun(DbName, TimeStamp, InfoFuture, {FutureQ, Count, Acc}) ->
NewFutureQ = queue:in({DbName, TimeStamp, InfoFuture}, FutureQ),
drain_deleted_info_futures(NewFutureQ, Count + 1, UserFun, Acc)
end,
fabric2_fdb:transactional(fun(Tx) ->
try
UserAcc1 = maybe_stop(UserFun({meta, []}, UserAcc0)),
InitAcc = {queue:new(), 0, UserAcc1},
{FinalFutureQ, _, UserAcc2} = fabric2_fdb:list_deleted_dbs_info(
Tx,
FoldFun,
InitAcc,
Options2
),
UserAcc3 = drain_all_deleted_info_futures(
FinalFutureQ,
UserFun,
UserAcc2
),
{ok, maybe_stop(UserFun(complete, UserAcc3))}
catch throw:{stop, FinalUserAcc} ->
{ok, FinalUserAcc}
end
end).
is_admin(Db, {SecProps}) when is_list(SecProps) ->
case fabric2_db_plugin:check_is_admin(Db) of
true ->
true;
false ->
UserCtx = get_user_ctx(Db),
{Admins} = get_admins(SecProps),
is_authorized(Admins, UserCtx)
end.
check_is_admin(Db) ->
check_is_admin(Db, get_security(Db)).
check_is_admin(Db, SecDoc) ->
case is_admin(Db, SecDoc) of
true ->
ok;
false ->
UserCtx = get_user_ctx(Db),
Reason = <<"You are not a db or server admin.">>,
throw_security_error(UserCtx, Reason)
end.
check_is_member(Db) ->
check_is_member(Db, get_security(Db)).
check_is_member(Db, SecDoc) ->
case is_member(Db, SecDoc) of
true ->
ok;
false ->
UserCtx = get_user_ctx(Db),
throw_security_error(UserCtx)
end.
require_admin_check(#{} = Db) ->
Db#{security_fun := fun check_is_admin/2}.
require_member_check(#{} = Db) ->
Db#{security_fun := fun check_is_member/2}.
name(#{name := DbName}) ->
DbName.
get_after_doc_read_fun(#{after_doc_read := AfterDocRead}) ->
AfterDocRead.
get_before_doc_update_fun(#{before_doc_update := BeforeDocUpdate}) ->
BeforeDocUpdate.
get_committed_update_seq(#{} = Db) ->
get_update_seq(Db).
get_compacted_seq(#{} = Db) ->
get_update_seq(Db).
get_compactor_pid(#{} = _Db) ->
nil.
get_db_info(#{} = Db) ->
DbProps = fabric2_fdb:transactional(Db, fun(TxDb) ->
fabric2_fdb:get_info(TxDb)
end),
{ok, make_db_info(name(Db), DbProps)}.
get_del_doc_count(#{} = Db) ->
get_doc_count(Db, <<"doc_del_count">>).
get_doc_count(Db) ->
get_doc_count(Db, <<"doc_count">>).
get_doc_count(Db, undefined) ->
get_doc_count(Db, <<"doc_count">>);
get_doc_count(Db, <<"_all_docs">>) ->
get_doc_count(Db, <<"doc_count">>);
get_doc_count(DbName, <<"_design">>) ->
get_doc_count(DbName, <<"doc_design_count">>);
get_doc_count(DbName, <<"_local">>) ->
get_doc_count(DbName, <<"doc_local_count">>);
get_doc_count(Db, Key) ->
fabric2_fdb:transactional(Db, fun(TxDb) ->
fabric2_fdb:get_stat(TxDb, Key)
end).
get_instance_start_time(#{}) ->
0.
get_pid(#{}) ->
nil.
get_revs_limit(#{} = Db) ->
get_revs_limit(Db, []).
get_revs_limit(#{} = Db, Opts) ->
CurrentDb = get_cached_db(Db, Opts),
maps:get(revs_limit, CurrentDb).
get_security(#{} = Db) ->
get_security(Db, []).
get_security(#{} = Db, Opts) ->
CurrentDb = get_cached_db(Db, Opts),
maps:get(security_doc, CurrentDb).
get_update_seq(#{} = Db) ->
fabric2_fdb:transactional(Db, fun(TxDb) ->
fabric2_fdb:get_last_change(TxDb)
end).
get_user_ctx(#{user_ctx := UserCtx}) ->
UserCtx.
get_uuid(#{uuid := UUID}) ->
UUID.
is_clustered(#{}) ->
false.
is_db(#{name := _}) ->
true;
is_db(_) ->
false.
is_partitioned(#{}) ->
false.
is_system_db(#{name := DbName}) ->
is_system_db_name(DbName).
is_system_db_name(DbName) when is_list(DbName) ->
is_system_db_name(?l2b(DbName));
is_system_db_name(DbName) when is_binary(DbName) ->
Suffix = filename:basename(DbName),
case {filename:dirname(DbName), lists:member(Suffix, ?SYSTEM_DATABASES)} of
{<<".">>, Result} -> Result;
{_Prefix, false} -> false;
{Prefix, true} ->
ReOpts = [{capture,none}, dollar_endonly],
re:run(Prefix, ?DBNAME_REGEX, ReOpts) == match
end.
is_replicator_db(#{name := DbName}) ->
is_replicator_db(DbName);
is_replicator_db(DbName) when is_binary(DbName) ->
fabric2_util:dbname_ends_with(DbName, <<"_replicator">>).
is_users_db(#{name := DbName}) ->
is_users_db(DbName);
is_users_db(DbName) when is_binary(DbName) ->
AuthenticationDb = config:get("chttpd_auth", "authentication_db"),
CfgUsersSuffix = config:get("couchdb", "users_db_suffix", "_users"),
IsAuthCache = if AuthenticationDb == undefined -> false; true ->
DbName == ?l2b(AuthenticationDb)
end,
IsCfgUsersDb = fabric2_util:dbname_ends_with(DbName, ?l2b(CfgUsersSuffix)),
IsGlobalUsersDb = fabric2_util:dbname_ends_with(DbName, <<"_users">>),
IsAuthCache orelse IsCfgUsersDb orelse IsGlobalUsersDb.
set_revs_limit(#{} = Db0, RevsLimit) when is_integer(RevsLimit) ->
Db1 = require_admin_check(Db0),
Resp = fabric2_fdb:transactional(Db1, fun(TxDb) ->
fabric2_fdb:set_config(TxDb, revs_limit, RevsLimit)
end),
case Resp of
{ok, #{} = Db2} -> fabric2_server:store(Db2);
Err -> Err
end.
set_security(#{} = Db0, Security) ->
Db1 = require_admin_check(Db0),
ok = fabric2_util:validate_security_object(Security),
Resp = fabric2_fdb:transactional(Db1, fun(TxDb) ->
fabric2_fdb:set_config(TxDb, security_doc, Security)
end),
case Resp of
{ok, #{} = Db2} -> fabric2_server:store(Db2);
Err -> Err
end.
set_user_ctx(#{} = Db, UserCtx) ->
Db#{user_ctx := UserCtx}.
ensure_full_commit(#{}) ->
{ok, 0}.
ensure_full_commit(#{}, _Timeout) ->
{ok, 0}.
open_doc(#{} = Db, DocId) ->
open_doc(Db, DocId, []).
open_doc(#{} = Db, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId, _Options) ->
fabric2_fdb:transactional(Db, fun(TxDb) ->
case fabric2_fdb:get_local_doc(TxDb, DocId) of
#doc{} = Doc -> {ok, Doc};
Else -> Else
end
end);
open_doc(#{} = Db, DocId, Options) ->
NeedsTreeOpts = [revs_info, conflicts, deleted_conflicts],
NeedsTree = (Options -- NeedsTreeOpts /= Options),
OpenDeleted = lists:member(deleted, Options),
fabric2_fdb:transactional(Db, fun(TxDb) ->
Revs = case NeedsTree of
true -> fabric2_fdb:get_all_revs(TxDb, DocId);
false -> fabric2_fdb:get_winning_revs(TxDb, DocId, 1)
end,
if Revs == [] -> {not_found, missing}; true ->
#{winner := true} = RI = lists:last(Revs),
case fabric2_fdb:get_doc_body(TxDb, DocId, RI) of
#doc{deleted = true} when not OpenDeleted ->
{not_found, deleted};
#doc{} = Doc ->
apply_open_doc_opts(Doc, Revs, Options);
Else ->
Else
end
end
end).
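% Open specific revisions of a document (or `all`). Returns {ok, Results}
% where each result is either {ok, #doc{}} or {{not_found, missing}, RevId}.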
open_doc_revs(Db, DocId, Revs, Options) ->
Latest = lists:member(latest, Options),
fabric2_fdb:transactional(Db, fun(TxDb) ->
AllRevInfos = fabric2_fdb:get_all_revs(TxDb, DocId),
RevTree = lists:foldl(fun(RI, TreeAcc) ->
RIPath = fabric2_util:revinfo_to_path(RI),
{Merged, _} = couch_key_tree:merge(TreeAcc, RIPath),
Merged
end, [], AllRevInfos),
{Found, Missing} = case Revs of
all ->
{couch_key_tree:get_all_leafs(RevTree), []};
_ when Latest ->
couch_key_tree:get_key_leafs(RevTree, Revs);
_ ->
couch_key_tree:get(RevTree, Revs)
end,
Docs = lists:map(fun({Value, {Pos, [Rev | RevPath]}}) ->
case Value of
?REV_MISSING ->
% We have the rev in our list but know nothing about it
{{not_found, missing}, {Pos, Rev}};
_ ->
RevInfo = #{
rev_id => {Pos, Rev},
rev_path => RevPath
},
case fabric2_fdb:get_doc_body(TxDb, DocId, RevInfo) of
#doc{} = Doc ->
apply_open_doc_opts(Doc, AllRevInfos, Options);
Else ->
{Else, {Pos, Rev}}
end
end
end, Found),
MissingDocs = [{{not_found, missing}, MRev} || MRev <- Missing],
{ok, Docs ++ MissingDocs}
end).
get_doc_info(Db, DocId) ->
case get_full_doc_info(Db, DocId) of
not_found -> not_found;
FDI -> couch_doc:to_doc_info(FDI)
end.
get_full_doc_info(Db, DocId) ->
RevInfos = fabric2_fdb:transactional(Db, fun(TxDb) ->
fabric2_fdb:get_all_revs(TxDb, DocId)
end),
if RevInfos == [] -> not_found; true ->
#{winner := true} = Winner = lists:last(RevInfos),
RevTree = lists:foldl(fun(RI, TreeAcc) ->
RIPath = fabric2_util:revinfo_to_path(RI),
{Merged, _} = couch_key_tree:merge(TreeAcc, RIPath),
Merged
end, [], RevInfos),
#full_doc_info{
id = DocId,
update_seq = fabric2_fdb:vs_to_seq(maps:get(sequence, Winner)),
deleted = maps:get(deleted, Winner),
rev_tree = RevTree
}
end.
get_full_doc_infos(Db, DocIds) ->
fabric2_fdb:transactional(Db, fun(TxDb) ->
lists:map(fun(DocId) ->
get_full_doc_info(TxDb, DocId)
end, DocIds)
end).
get_missing_revs(Db, JsonIdRevs) ->
IdRevs = [idrevs(IdR) || IdR <- JsonIdRevs],
AllRevInfos = fabric2_fdb:transactional(Db, fun(TxDb) ->
lists:foldl(fun({Id, _Revs}, Acc) ->
case maps:is_key(Id, Acc) of
true ->
Acc;
false ->
RevInfos = fabric2_fdb:get_all_revs(TxDb, Id),
Acc#{Id => RevInfos}
end
end, #{}, IdRevs)
end),
AllMissing = lists:flatmap(fun({Id, Revs}) ->
#{Id := RevInfos} = AllRevInfos,
Missing = try
lists:foldl(fun(RevInfo, RevAcc) ->
if RevAcc /= [] -> ok; true ->
throw(all_found)
end,
filter_found_revs(RevInfo, RevAcc)
end, Revs, RevInfos)
catch throw:all_found ->
[]
end,
if Missing == [] -> []; true ->
PossibleAncestors = find_possible_ancestors(RevInfos, Missing),
[{Id, Missing, PossibleAncestors}]
end
end, IdRevs),
{ok, AllMissing}.
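% Fetch all design doc bodies. Body reads are issued as futures while folding
% the _all_docs range and then awaited, so the docs come back in sorted order.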
get_design_docs(Db) ->
fabric2_fdb:transactional(Db, fun(TxDb) ->
#{
db_prefix := DbPrefix
} = TxDb,
Prefix = erlfdb_tuple:pack({?DB_ALL_DOCS}, DbPrefix),
Options = set_design_doc_keys([]),
FoldFun = fun({Key, Val}, Acc) ->
{DocId} = erlfdb_tuple:unpack(Key, Prefix),
RevId = erlfdb_tuple:unpack(Val),
Rev = #{
rev_id => RevId,
rev_path => []
},
Future = fabric2_fdb:get_doc_body_future(TxDb, DocId, Rev),
[{DocId, Rev, Future} | Acc]
end,
Futures = fabric2_fdb:fold_range(TxDb, Prefix, FoldFun, [], Options),
% Using foldl instead of map means that the design
% docs come out in sorted order.
lists:foldl(fun({DocId, Rev, Future}, Acc) ->
[fabric2_fdb:get_doc_body_wait(TxDb, DocId, Rev, Future) | Acc]
end, [], Futures)
end).
validate_docid(<<"">>) ->
throw({illegal_docid, <<"Document id must not be empty">>});
validate_docid(<<"_design/">>) ->
throw({illegal_docid, <<"Illegal document id `_design/`">>});
validate_docid(<<"_local/">>) ->
throw({illegal_docid, <<"Illegal document id `_local/`">>});
validate_docid(Id) when is_binary(Id) ->
MaxLen = case config:get("couchdb", "max_document_id_length", "infinity") of
"infinity" -> infinity;
IntegerVal -> list_to_integer(IntegerVal)
end,
case MaxLen > 0 andalso byte_size(Id) > MaxLen of
true -> throw({illegal_docid, <<"Document id is too long">>});
false -> ok
end,
case couch_util:validate_utf8(Id) of
false -> throw({illegal_docid, <<"Document id must be valid UTF-8">>});
true -> ok
end,
case Id of
<<?DESIGN_DOC_PREFIX, _/binary>> -> ok;
<<?LOCAL_DOC_PREFIX, _/binary>> -> ok;
<<"_", _/binary>> ->
case fabric2_db_plugin:validate_docid(Id) of
true ->
ok;
false ->
throw(
{illegal_docid,
<<"Only reserved document ids may start with underscore.">>})
end;
_Else -> ok
end;
validate_docid(Id) ->
couch_log:debug("Document id is not a string: ~p", [Id]),
throw({illegal_docid, <<"Document id must be a string">>}).
update_doc(Db, Doc) ->
update_doc(Db, Doc, []).
update_doc(Db, Doc, Options) ->
case update_docs(Db, [Doc], Options) of
{ok, [{ok, NewRev}]} ->
{ok, NewRev};
{ok, [{{_Id, _Rev}, Error}]} ->
throw(Error);
{error, [{{_Id, _Rev}, Error}]} ->
throw(Error);
{error, [Error]} ->
throw(Error);
{ok, []} ->
% replication success
{Pos, [RevId | _]} = Doc#doc.revs,
{ok, {Pos, RevId}}
end.
update_docs(Db, Docs) ->
update_docs(Db, Docs, []).
update_docs(Db, Docs0, Options) ->
Docs1 = apply_before_doc_update(Db, Docs0, Options),
try
validate_atomic_update(Docs0, lists:member(all_or_nothing, Options)),
Resps0 = batch_update_docs(Db, Docs1, Options),
% Notify index builder
fabric2_index:db_updated(name(Db)),
% Convert errors
Resps1 = lists:map(fun(Resp) ->
case Resp of
{#doc{} = Doc, Error} ->
#doc{
id = DocId,
revs = Revs
} = Doc,
RevId = case Revs of
{RevPos, [Rev | _]} -> {RevPos, Rev};
{0, []} -> {0, <<>>};
Else -> Else
end,
{{DocId, RevId}, Error};
Else ->
Else
end
end, Resps0),
case is_replicated(Options) of
true ->
{ok, lists:flatmap(fun(R) ->
case R of
{ok, []} -> [];
{{_, _}, {ok, []}} -> [];
Else -> [Else]
end
end, Resps1)};
false ->
{ok, Resps1}
end
catch throw:{aborted, Errors} ->
{aborted, Errors}
end.
read_attachment(Db, DocId, AttId) ->
fabric2_fdb:transactional(Db, fun(TxDb) ->
fabric2_fdb:read_attachment(TxDb, DocId, AttId)
end).
write_attachment(Db, DocId, Att) ->
Data = couch_att:fetch(data, Att),
Encoding = couch_att:fetch(encoding, Att),
{ok, AttId} = fabric2_fdb:write_attachment(Db, DocId, Data, Encoding),
couch_att:store(data, {loc, Db, DocId, AttId}, Att).
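% Fold over all docs. The callback receives {meta, Meta}, {row, Row} and
% complete, and must return {ok, Acc} to continue or {stop, Acc} to halt
% early. For example, collecting rows (in reverse order):
%     Cb = fun({row, Row}, Acc) -> {ok, [Row | Acc]}; (_, Acc) -> {ok, Acc} end,
%     {ok, Rows} = fabric2_db:fold_docs(Db, Cb, []).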
fold_docs(Db, UserFun, UserAcc) ->
fold_docs(Db, UserFun, UserAcc, []).
fold_docs(Db, UserFun, UserAcc0, Options) ->
fabric2_fdb:transactional(Db, fun(TxDb) ->
try
#{
db_prefix := DbPrefix
} = TxDb,
Prefix = erlfdb_tuple:pack({?DB_ALL_DOCS}, DbPrefix),
Meta = get_all_docs_meta(TxDb, Options),
UserAcc1 = maybe_stop(UserFun({meta, Meta}, UserAcc0)),
UserAcc2 = fabric2_fdb:fold_range(TxDb, Prefix, fun({K, V}, Acc) ->
{DocId} = erlfdb_tuple:unpack(K, Prefix),
RevId = erlfdb_tuple:unpack(V),
Row0 = [
{id, DocId},
{key, DocId},
{value, {[{rev, couch_doc:rev_to_str(RevId)}]}}
],
DocOpts = couch_util:get_value(doc_opts, Options, []),
OpenOpts = [deleted | DocOpts],
Row1 = case lists:keyfind(include_docs, 1, Options) of
{include_docs, true} ->
Row0 ++ open_json_doc(TxDb, DocId, OpenOpts, DocOpts);
_ ->
Row0
end,
maybe_stop(UserFun({row, Row1}, Acc))
end, UserAcc1, Options),
{ok, maybe_stop(UserFun(complete, UserAcc2))}
catch throw:{stop, FinalUserAcc} ->
{ok, FinalUserAcc}
end
end).
fold_docs(Db, DocIds, UserFun, UserAcc0, Options) ->
fabric2_fdb:transactional(Db, fun(TxDb) ->
try
NeedsTreeOpts = [revs_info, conflicts, deleted_conflicts],
NeedsTree = (Options -- NeedsTreeOpts /= Options),
InitAcc = #{
revs_q => queue:new(),
revs_count => 0,
body_q => queue:new(),
body_count => 0,
doc_opts => Options,
user_acc => UserAcc0,
user_fun => UserFun
},
FinalAcc1 = lists:foldl(fun(DocId, Acc) ->
#{
revs_q := RevsQ,
revs_count := RevsCount
} = Acc,
Future = fold_docs_get_revs(TxDb, DocId, NeedsTree),
NewAcc = Acc#{
revs_q := queue:in({DocId, Future}, RevsQ),
revs_count := RevsCount + 1
},
drain_fold_docs_revs_futures(TxDb, NewAcc)
end, InitAcc, DocIds),
FinalAcc2 = drain_all_fold_docs_revs_futures(TxDb, FinalAcc1),
FinalAcc3 = drain_all_fold_docs_body_futures(TxDb, FinalAcc2),
#{
user_acc := FinalUserAcc
} = FinalAcc3,
{ok, FinalUserAcc}
catch throw:{stop, StopUserAcc} ->
{ok, StopUserAcc}
end
end).
fold_design_docs(Db, UserFun, UserAcc0, Options1) ->
Options2 = set_design_doc_keys(Options1),
fold_docs(Db, UserFun, UserAcc0, Options2).
fold_local_docs(Db, UserFun, UserAcc0, Options0) ->
% This is mostly for testing and sanity checking. When called from a test the
% namespace will be set automatically. We also assert that the correct
% namespace was set when called from the API.
Options = case lists:keyfind(namespace, 1, Options0) of
{namespace, <<"_local">>} -> Options0;
false -> [{namespace, <<"_local">>} | Options0]
end,
fabric2_fdb:transactional(Db, fun(TxDb) ->
try
#{
db_prefix := DbPrefix
} = TxDb,
Prefix = erlfdb_tuple:pack({?DB_LOCAL_DOCS}, DbPrefix),
Meta = get_all_docs_meta(TxDb, Options),
UserAcc1 = maybe_stop(UserFun({meta, Meta}, UserAcc0)),
UserAcc2 = fabric2_fdb:fold_range(TxDb, Prefix, fun({K, V}, Acc) ->
{DocId} = erlfdb_tuple:unpack(K, Prefix),
Rev = fabric2_fdb:get_local_doc_rev(TxDb, DocId, V),
maybe_stop(UserFun({row, [
{id, DocId},
{key, DocId},
{value, {[{rev, couch_doc:rev_to_str({0, Rev})}]}}
]}, Acc))
end, UserAcc1, Options),
{ok, maybe_stop(UserFun(complete, UserAcc2))}
catch throw:{stop, FinalUserAcc} ->
{ok, FinalUserAcc}
end
end).
fold_changes(Db, SinceSeq, UserFun, UserAcc) ->
fold_changes(Db, SinceSeq, UserFun, UserAcc, []).
fold_changes(Db, SinceSeq, UserFun, UserAcc, Options) ->
fabric2_fdb:transactional(Db, fun(TxDb) ->
try
#{
db_prefix := DbPrefix
} = TxDb,
Prefix = erlfdb_tuple:pack({?DB_CHANGES}, DbPrefix),
Dir = case fabric2_util:get_value(dir, Options, fwd) of
rev -> rev;
_ -> fwd
end,
RestartTx = case fabric2_util:get_value(restart_tx, Options) of
undefined -> [{restart_tx, true}];
_AlreadySet -> []
end,
StartKey = get_since_seq(TxDb, Dir, SinceSeq),
EndKey = case fabric2_util:get_value(end_key, Options) of
undefined when Dir == rev ->
fabric2_util:seq_zero_vs();
undefined ->
fabric2_util:seq_max_vs();
EK when is_binary(EK) ->
fabric2_fdb:seq_to_vs(EK);
EK when is_tuple(EK), element(1, EK) == versionstamp ->
EK
end,
BaseOpts = [{start_key, StartKey}] ++ RestartTx ++ Options,
FoldOpts = lists:keystore(end_key, 1, BaseOpts, {end_key, EndKey}),
{ok, fabric2_fdb:fold_range(TxDb, Prefix, fun({K, V}, Acc) ->
{SeqVS} = erlfdb_tuple:unpack(K, Prefix),
{DocId, Deleted, RevId} = erlfdb_tuple:unpack(V),
Change = #{
id => DocId,
sequence => fabric2_fdb:vs_to_seq(SeqVS),
rev_id => RevId,
deleted => Deleted
},
maybe_stop(UserFun(Change, Acc))
end, UserAcc, FoldOpts)}
catch throw:{stop, FinalUserAcc} ->
{ok, FinalUserAcc}
end
end).
dbname_suffix(DbName) ->
filename:basename(normalize_dbname(DbName)).
normalize_dbname(DbName) ->
% Remove in the final cleanup. We don't need to handle the shards prefix or
% remove .couch suffixes anymore. Keep it for now to pass all the existing
% tests.
couch_db:normalize_dbname(DbName).
validate_dbname(DbName) when is_list(DbName) ->
validate_dbname(?l2b(DbName));
validate_dbname(DbName) when is_binary(DbName) ->
Normalized = normalize_dbname(DbName),
fabric2_db_plugin:validate_dbname(
DbName, Normalized, fun validate_dbname_int/2).
validate_dbname_int(DbName, Normalized) when is_binary(DbName) ->
case validate_dbname_length(DbName) of
ok -> validate_dbname_pat(DbName, Normalized);
{error, _} = Error -> Error
end.
validate_dbname_length(DbName) ->
MaxLength = config:get_integer("couchdb", "max_database_name_length",
?DEFAULT_MAX_DATABASE_NAME_LENGTH),
case byte_size(DbName) =< MaxLength of
true -> ok;
false -> {error, {database_name_too_long, DbName}}
end.
validate_dbname_pat(DbName, Normalized) ->
DbNoExt = couch_util:drop_dot_couch_ext(DbName),
case re:run(DbNoExt, ?DBNAME_REGEX, [{capture,none}, dollar_endonly]) of
match ->
ok;
nomatch ->
case is_system_db_name(Normalized) of
true -> ok;
false -> {error, {illegal_database_name, DbName}}
end
end.
maybe_add_sys_db_callbacks(Db) ->
IsReplicatorDb = is_replicator_db(Db),
IsUsersDb = is_users_db(Db),
{BDU, ADR} = if
IsReplicatorDb ->
{
fun couch_replicator_docs:before_doc_update/3,
fun couch_replicator_docs:after_doc_read/2
};
IsUsersDb ->
{
fun fabric2_users_db:before_doc_update/3,
fun fabric2_users_db:after_doc_read/2
};
true ->
{undefined, undefined}
end,
Db#{
before_doc_update := BDU,
after_doc_read := ADR
}.
make_db_info(DbName, Props) ->
BaseProps = [
{cluster, {[{n, 0}, {q, 0}, {r, 0}, {w, 0}]}},
{compact_running, false},
{data_size, 0},
{db_name, DbName},
{disk_format_version, 0},
{disk_size, 0},
{instance_start_time, <<"0">>},
{purge_seq, 0}
],
lists:foldl(fun({Key, Val}, Acc) ->
lists:keystore(Key, 1, Acc, {Key, Val})
end, BaseProps, Props).
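% Keep at most 100 get_info futures in flight: once the queue reaches that
% size, wait for the oldest future and emit its row before queueing more.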
drain_info_futures(FutureQ, Count, _UserFun, Acc) when Count < 100 ->
{FutureQ, Count, Acc};
drain_info_futures(FutureQ, Count, UserFun, Acc) when Count >= 100 ->
{{value, {DbName, Future}}, RestQ} = queue:out(FutureQ),
InfoProps = fabric2_fdb:get_info_wait(Future),
DbInfo = make_db_info(DbName, InfoProps),
NewAcc = maybe_stop(UserFun({row, DbInfo}, Acc)),
{RestQ, Count - 1, NewAcc}.
drain_all_info_futures(FutureQ, UserFun, Acc) ->
case queue:out(FutureQ) of
{{value, {DbName, Future}}, RestQ} ->
InfoProps = fabric2_fdb:get_info_wait(Future),
DbInfo = make_db_info(DbName, InfoProps),
NewAcc = maybe_stop(UserFun({row, DbInfo}, Acc)),
drain_all_info_futures(RestQ, UserFun, NewAcc);
{empty, _} ->
Acc
end.
drain_deleted_info_futures(FutureQ, Count, _UserFun, Acc) when Count < 100 ->
{FutureQ, Count, Acc};
drain_deleted_info_futures(FutureQ, Count, UserFun, Acc) when Count >= 100 ->
{{value, {DbName, TimeStamp, Future}}, RestQ} = queue:out(FutureQ),
BaseProps = fabric2_fdb:get_info_wait(Future),
DeletedProps = BaseProps ++ [
{deleted, true},
{timestamp, TimeStamp}
],
DbInfo = make_db_info(DbName, DeletedProps),
NewAcc = maybe_stop(UserFun({row, DbInfo}, Acc)),
{RestQ, Count - 1, NewAcc}.
drain_all_deleted_info_futures(FutureQ, UserFun, Acc) ->
case queue:out(FutureQ) of
{{value, {DbName, TimeStamp, Future}}, RestQ} ->
BaseProps = fabric2_fdb:get_info_wait(Future),
DeletedProps = BaseProps ++ [
{deleted, true},
{timestamp, TimeStamp}
],
DbInfo = make_db_info(DbName, DeletedProps),
NewAcc = maybe_stop(UserFun({row, DbInfo}, Acc)),
drain_all_deleted_info_futures(RestQ, UserFun, NewAcc);
{empty, _} ->
Acc
end.
fold_docs_get_revs(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId, _) ->
fabric2_fdb:get_local_doc_rev_future(Db, DocId);
fold_docs_get_revs(Db, DocId, true) ->
fabric2_fdb:get_all_revs_future(Db, DocId);
fold_docs_get_revs(Db, DocId, false) ->
fabric2_fdb:get_winning_revs_future(Db, DocId, 1).
fold_docs_get_revs_wait(_Db, <<?LOCAL_DOC_PREFIX, _/binary>>, RevsFuture) ->
Rev = fabric2_fdb:get_local_doc_rev_wait(RevsFuture),
[Rev];
fold_docs_get_revs_wait(Db, _DocId, RevsFuture) ->
fabric2_fdb:get_revs_wait(Db, RevsFuture).
fold_docs_get_doc_body_future(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId,
[Rev]) ->
fabric2_fdb:get_local_doc_body_future(Db, DocId, Rev);
fold_docs_get_doc_body_future(Db, DocId, Revs) ->
Winner = get_rev_winner(Revs),
fabric2_fdb:get_doc_body_future(Db, DocId, Winner).
fold_docs_get_doc_body_wait(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId, [Rev],
_DocOpts, BodyFuture) ->
case fabric2_fdb:get_local_doc_body_wait(Db, DocId, Rev, BodyFuture) of
{not_found, missing} -> {not_found, missing};
Doc -> {ok, Doc}
end;
fold_docs_get_doc_body_wait(Db, DocId, Revs, DocOpts, BodyFuture) ->
RevInfo = get_rev_winner(Revs),
Base = fabric2_fdb:get_doc_body_wait(Db, DocId, RevInfo,
BodyFuture),
apply_open_doc_opts(Base, Revs, DocOpts).
drain_fold_docs_revs_futures(_TxDb, #{revs_count := C} = Acc) when C < 100 ->
Acc;
drain_fold_docs_revs_futures(TxDb, Acc) ->
drain_one_fold_docs_revs_future(TxDb, Acc).
drain_all_fold_docs_revs_futures(_TxDb, #{revs_count := C} = Acc) when C =< 0 ->
Acc;
drain_all_fold_docs_revs_futures(TxDb, #{revs_count := C} = Acc) when C > 0 ->
NewAcc = drain_one_fold_docs_revs_future(TxDb, Acc),
drain_all_fold_docs_revs_futures(TxDb, NewAcc).
drain_one_fold_docs_revs_future(TxDb, Acc) ->
#{
revs_q := RevsQ,
revs_count := RevsCount,
body_q := BodyQ,
body_count := BodyCount
} = Acc,
{{value, {DocId, RevsFuture}}, RestRevsQ} = queue:out(RevsQ),
Revs = fold_docs_get_revs_wait(TxDb, DocId, RevsFuture),
DocFuture = case Revs of
[] ->
{DocId, [], not_found};
[_ | _] ->
BodyFuture = fold_docs_get_doc_body_future(TxDb, DocId, Revs),
{DocId, Revs, BodyFuture}
end,
NewAcc = Acc#{
revs_q := RestRevsQ,
revs_count := RevsCount - 1,
body_q := queue:in(DocFuture, BodyQ),
body_count := BodyCount + 1
},
drain_fold_docs_body_futures(TxDb, NewAcc).
drain_fold_docs_body_futures(_TxDb, #{body_count := C} = Acc) when C < 100 ->
Acc;
drain_fold_docs_body_futures(TxDb, Acc) ->
drain_one_fold_docs_body_future(TxDb, Acc).
drain_all_fold_docs_body_futures(_TxDb, #{body_count := C} = Acc) when C =< 0 ->
Acc;
drain_all_fold_docs_body_futures(TxDb, #{body_count := C} = Acc) when C > 0 ->
NewAcc = drain_one_fold_docs_body_future(TxDb, Acc),
drain_all_fold_docs_body_futures(TxDb, NewAcc).
drain_one_fold_docs_body_future(TxDb, Acc) ->
#{
body_q := BodyQ,
body_count := BodyCount,
doc_opts := DocOpts,
user_fun := UserFun,
user_acc := UserAcc
} = Acc,
{{value, {DocId, Revs, BodyFuture}}, RestBodyQ} = queue:out(BodyQ),
Doc = case BodyFuture of
not_found ->
{not_found, missing};
_ ->
fold_docs_get_doc_body_wait(TxDb, DocId, Revs, DocOpts, BodyFuture)
end,
NewUserAcc = maybe_stop(UserFun(DocId, Doc, UserAcc)),
Acc#{
body_q := RestBodyQ,
body_count := BodyCount - 1,
user_acc := NewUserAcc
}.
get_rev_winner(Revs) ->
[Winner] = lists:filter(fun(Rev) ->
maps:get(winner, Rev)
end, Revs),
Winner.
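% Compute a new revision id by hashing the deleted flag, previous revision,
% body and attachment signatures. Attachments that are not already stored are
% flushed and their revpos bumped to the new revision.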
new_revid(Db, Doc) ->
#doc{
id = DocId,
body = Body,
revs = {OldStart, OldRevs},
atts = Atts,
deleted = Deleted
} = Doc,
{NewAtts, AttSigInfo} = lists:mapfoldl(fun(Att, Acc) ->
[Name, Type, Data, Md5] = couch_att:fetch([name, type, data, md5], Att),
case Data of
{loc, _, _, _} ->
{Att, [{Name, Type, Md5} | Acc]};
_ ->
Att1 = couch_att:flush(Db, DocId, Att),
Att2 = couch_att:store(revpos, OldStart + 1, Att1),
{Att2, [{Name, Type, couch_att:fetch(md5, Att2)} | Acc]}
end
end, [], Atts),
Rev = case length(Atts) == length(AttSigInfo) of
true ->
OldRev = case OldRevs of [] -> 0; [OldRev0 | _] -> OldRev0 end,
SigTerm = [Deleted, OldStart, OldRev, Body, AttSigInfo],
couch_hash:md5_hash(term_to_binary(SigTerm, [{minor_version, 1}]));
false ->
erlang:error(missing_att_info)
end,
Doc#doc{
revs = {OldStart + 1, [Rev | OldRevs]},
atts = NewAtts
}.
get_all_docs_meta(TxDb, Options) ->
NS = couch_util:get_value(namespace, Options),
DocCount = get_doc_count(TxDb, NS),
case lists:keyfind(update_seq, 1, Options) of
{_, true} ->
UpdateSeq = fabric2_db:get_update_seq(TxDb),
[{update_seq, UpdateSeq}];
_ ->
[]
end ++ [{total, DocCount}, {offset, null}].
maybe_set_interactive(#{} = Db, Options) ->
Interactive = fabric2_util:get_value(interactive, Options, false),
Db#{interactive := Interactive}.
maybe_set_user_ctx(Db, Options) ->
case fabric2_util:get_value(user_ctx, Options) of
#user_ctx{} = UserCtx ->
set_user_ctx(Db, UserCtx);
undefined ->
Db
end.
is_member(Db, {SecProps}) when is_list(SecProps) ->
case is_admin(Db, {SecProps}) of
true ->
true;
false ->
case is_public_db(SecProps) of
true ->
true;
false ->
{Members} = get_members(SecProps),
UserCtx = get_user_ctx(Db),
is_authorized(Members, UserCtx)
end
end.
is_authorized(Group, UserCtx) ->
#user_ctx{
name = UserName,
roles = UserRoles
} = UserCtx,
Names = fabric2_util:get_value(<<"names">>, Group, []),
Roles = fabric2_util:get_value(<<"roles">>, Group, []),
case check_security(roles, UserRoles, [<<"_admin">> | Roles]) of
true ->
true;
false ->
check_security(names, UserName, Names)
end.
check_security(roles, [], _) ->
false;
check_security(roles, UserRoles, Roles) ->
UserRolesSet = ordsets:from_list(UserRoles),
RolesSet = ordsets:from_list(Roles),
not ordsets:is_disjoint(UserRolesSet, RolesSet);
check_security(names, _, []) ->
false;
check_security(names, null, _) ->
false;
check_security(names, UserName, Names) ->
lists:member(UserName, Names).
throw_security_error(#user_ctx{name = null} = UserCtx) ->
Reason = <<"You are not authorized to access this db.">>,
throw_security_error(UserCtx, Reason);
throw_security_error(#user_ctx{name = _} = UserCtx) ->
Reason = <<"You are not allowed to access this db.">>,
throw_security_error(UserCtx, Reason).
throw_security_error(#user_ctx{} = UserCtx, Reason) ->
Error = security_error_type(UserCtx),
throw({Error, Reason}).
security_error_type(#user_ctx{name = null}) ->
unauthorized;
security_error_type(#user_ctx{name = _}) ->
forbidden.
is_public_db(SecProps) ->
{Members} = get_members(SecProps),
Names = fabric2_util:get_value(<<"names">>, Members, []),
Roles = fabric2_util:get_value(<<"roles">>, Members, []),
Names =:= [] andalso Roles =:= [].
get_admins(SecProps) ->
fabric2_util:get_value(<<"admins">>, SecProps, {[]}).
get_members(SecProps) ->
% We fall back to readers here for backwards compatibility.
case fabric2_util:get_value(<<"members">>, SecProps) of
undefined ->
fabric2_util:get_value(<<"readers">>, SecProps, {[]});
Members ->
Members
end.
apply_open_doc_opts(Doc0, Revs, Options) ->
IncludeRevsInfo = lists:member(revs_info, Options),
IncludeConflicts = lists:member(conflicts, Options),
IncludeDelConflicts = lists:member(deleted_conflicts, Options),
IncludeLocalSeq = lists:member(local_seq, Options),
% This revs_info becomes fairly useless now that we're
% not keeping old document bodies around...
Meta1 = if not IncludeRevsInfo -> []; true ->
{Pos, [Rev | RevPath]} = Doc0#doc.revs,
RevPathMissing = lists:map(fun(R) -> {R, missing} end, RevPath),
[{revs_info, Pos, [{Rev, available} | RevPathMissing]}]
end,
Meta2 = if not IncludeConflicts -> []; true ->
Conflicts = [RI || RI = #{winner := false, deleted := false} <- Revs],
if Conflicts == [] -> []; true ->
ConflictRevs = [maps:get(rev_id, RI) || RI <- Conflicts],
[{conflicts, ConflictRevs}]
end
end,
Meta3 = if not IncludeDelConflicts -> []; true ->
DelConflicts = [RI || RI = #{winner := false, deleted := true} <- Revs],
if DelConflicts == [] -> []; true ->
DelConflictRevs = [maps:get(rev_id, RI) || RI <- DelConflicts],
[{deleted_conflicts, DelConflictRevs}]
end
end,
Meta4 = if not IncludeLocalSeq -> []; true ->
#{winner := true, sequence := SeqVS} = lists:last(Revs),
[{local_seq, fabric2_fdb:vs_to_seq(SeqVS)}]
end,
Doc1 = case lists:keyfind(atts_since, 1, Options) of
{_, PossibleAncestors} ->
#doc{
revs = DocRevs,
atts = Atts0
} = Doc0,
RevPos = find_ancestor_rev_pos(DocRevs, PossibleAncestors),
Atts1 = lists:map(fun(Att) ->
[AttPos, Data] = couch_att:fetch([revpos, data], Att),
if AttPos > RevPos -> couch_att:store(data, Data, Att);
true -> couch_att:store(data, stub, Att)
end
end, Atts0),
Doc0#doc{atts = Atts1};
false ->
Doc0
end,
{ok, Doc1#doc{meta = Meta1 ++ Meta2 ++ Meta3 ++ Meta4}}.
find_ancestor_rev_pos({_, []}, _PossibleAncestors) ->
0;
find_ancestor_rev_pos(_DocRevs, []) ->
0;
find_ancestor_rev_pos({RevPos, [RevId | Rest]}, AttsSinceRevs) ->
case lists:member({RevPos, RevId}, AttsSinceRevs) of
true -> RevPos;
false -> find_ancestor_rev_pos({RevPos - 1, Rest}, AttsSinceRevs)
end.
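% Given one stored revinfo, drop the requested revs that already appear in its
% revision path and keep only the ones that are still missing.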
filter_found_revs(RevInfo, Revs) ->
#{
rev_id := {Pos, Rev},
rev_path := RevPath
} = RevInfo,
FullRevPath = [Rev | RevPath],
lists:flatmap(fun({FindPos, FindRev} = RevIdToFind) ->
if FindPos > Pos -> [RevIdToFind]; true ->
% Add 1 because lists:nth is 1 based
Idx = Pos - FindPos + 1,
case Idx > length(FullRevPath) of
true ->
[RevIdToFind];
false ->
case lists:nth(Idx, FullRevPath) == FindRev of
true -> [];
false -> [RevIdToFind]
end
end
end
end, Revs).
find_possible_ancestors(RevInfos, MissingRevs) ->
% Find any revinfos that are possible ancestors
% of the missing revs. A possible ancestor is
% any rev that has a start position less than
% any missing revision. Stated alternatively,
% find any revinfo that could theoretically be
% extended to become one or more of the missing
% revisions.
%
% Since we are looking at any missing revision
% we can just compare against the maximum missing
% start position.
MaxMissingPos = case MissingRevs of
[] -> 0;
[_ | _] -> lists:max([Start || {Start, _Rev} <- MissingRevs])
end,
lists:flatmap(fun(RevInfo) ->
#{rev_id := {RevPos, _} = RevId} = RevInfo,
case RevPos < MaxMissingPos of
true -> [RevId];
false -> []
end
end, RevInfos).
apply_before_doc_update(Db, Docs, Options) ->
UpdateType = case lists:member(replicated_changes, Options) of
true -> replicated_changes;
false -> interactive_edit
end,
lists:map(fun(Doc) ->
fabric2_db_plugin:before_doc_update(Db, Doc, UpdateType)
end, Docs).
update_doc_int(#{} = Db, #doc{} = Doc, Options) ->
IsLocal = case Doc#doc.id of
<<?LOCAL_DOC_PREFIX, _/binary>> -> true;
_ -> false
end,
try
case {IsLocal, is_replicated(Options)} of
{false, false} -> update_doc_interactive(Db, Doc, Options);
{false, true} -> update_doc_replicated(Db, Doc, Options);
{true, _} -> update_local_doc(Db, Doc, Options)
end
catch throw:{?MODULE, Return} ->
Return
end.
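% Update docs in batches: keep applying updates inside a transaction until its
% approximate size exceeds the configured batch size, then start a new
% transaction for the remaining docs.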
batch_update_docs(Db, Docs, Options) ->
BAcc = #bacc{
db = Db,
docs = Docs,
batch_size = get_batch_size(Options),
options = Options,
rev_futures = #{},
seen = [],
results = []
},
#bacc{results = Res} = batch_update_docs(BAcc),
lists:reverse(Res).
batch_update_docs(#bacc{docs = []} = BAcc) ->
BAcc;
batch_update_docs(#bacc{db = Db} = BAcc) ->
#bacc{
db = Db,
docs = Docs,
options = Options
} = BAcc,
BAccTx2 = fabric2_fdb:transactional(Db, fun(TxDb) ->
BAccTx = BAcc#bacc{db = TxDb},
case is_replicated(Options) of
false ->
Tagged = tag_docs(Docs),
RevFutures = get_winning_rev_futures(TxDb, Tagged),
BAccTx1 = BAccTx#bacc{
docs = Tagged,
rev_futures = RevFutures
},
batch_update_interactive_tx(BAccTx1);
true ->
BAccTx1 = batch_update_replicated_tx(BAccTx),
% For replicated updates reset `seen` after every transaction
BAccTx1#bacc{seen = []}
end
end),
% Clean up after the transaction ends so we can recurse with a clean state
maps:map(fun(Tag, RangeFuture) when is_reference(Tag) ->
ok = erlfdb:cancel(RangeFuture, [flush])
end, BAccTx2#bacc.rev_futures),
BAcc1 = BAccTx2#bacc{
db = Db,
rev_futures = #{}
},
batch_update_docs(BAcc1).
batch_update_interactive_tx(#bacc{docs = []} = BAcc) ->
BAcc;
batch_update_interactive_tx(#bacc{} = BAcc) ->
#bacc{
db = TxDb,
docs = [Doc | Docs],
options = Options,
batch_size = MaxSize,
rev_futures = RevFutures,
seen = Seen,
results = Results
} = BAcc,
{Res, Seen1} = try
update_docs_interactive(TxDb, Doc, Options, RevFutures, Seen)
catch throw:{?MODULE, Return} ->
{Return, Seen}
end,
BAcc1 = BAcc#bacc{
docs = Docs,
results = [Res | Results],
seen = Seen1
},
case fabric2_fdb:get_approximate_tx_size(TxDb) > MaxSize of
true -> BAcc1;
false -> batch_update_interactive_tx(BAcc1)
end.
batch_update_replicated_tx(#bacc{docs = []} = BAcc) ->
BAcc;
batch_update_replicated_tx(#bacc{} = BAcc) ->
#bacc{
db = TxDb,
docs = [Doc | Docs],
options = Options,
batch_size = MaxSize,
seen = Seen,
results = Results
} = BAcc,
case lists:member(Doc#doc.id, Seen) of
true ->
% If we already updated this doc in the current transaction, wait
% till the next transaction to update it again.
BAcc;
false ->
Res = update_doc_int(TxDb, Doc, Options),
BAcc1 = BAcc#bacc{
docs = Docs,
results = [Res | Results],
seen = [Doc#doc.id | Seen]
},
case fabric2_fdb:get_approximate_tx_size(TxDb) > MaxSize of
true -> BAcc1;
false -> batch_update_replicated_tx(BAcc1)
end
end.
update_docs_interactive(Db, #doc{id = <<?LOCAL_DOC_PREFIX, _/binary>>} = Doc,
Options, _Futures, SeenIds) ->
{update_local_doc(Db, Doc, Options), SeenIds};
update_docs_interactive(Db, Doc, Options, Futures, SeenIds) ->
case lists:member(Doc#doc.id, SeenIds) of
true ->
{conflict, SeenIds};
false ->
Future = maps:get(doc_tag(Doc), Futures),
case update_doc_interactive(Db, Doc, Future, Options) of
{ok, _} = Resp ->
{Resp, [Doc#doc.id | SeenIds]};
_ = Resp ->
{Resp, SeenIds}
end
end.
update_doc_interactive(Db, Doc0, Options) ->
% Get the current winning revision. This is needed
% regardless of which branch we're updating. The extra
% revision we're grabbing is an optimization to
% save us a round trip if we end up deleting
% the winning revision branch.
NumRevs = if Doc0#doc.deleted -> 2; true -> 1 end,
Future = fabric2_fdb:get_winning_revs_future(Db, Doc0#doc.id, NumRevs),
update_doc_interactive(Db, Doc0, Future, Options).
update_doc_interactive(Db, Doc0, Future, _Options) ->
RevInfos = fabric2_fdb:get_revs_wait(Db, Future),
{Winner, SecondPlace} = case RevInfos of
[] -> {not_found, not_found};
[WRI] -> {WRI, not_found};
[WRI, SPRI] -> {WRI, SPRI}
end,
WinnerRevId = case Winner of
not_found ->
{0, <<>>};
_ ->
case maps:get(deleted, Winner) of
true -> {0, <<>>};
false -> maps:get(rev_id, Winner)
end
end,
% Check that a revision was specified if required
Doc0RevId = doc_to_revid(Doc0),
HasRev = Doc0RevId =/= {0, <<>>},
if HasRev orelse WinnerRevId == {0, <<>>} -> ok; true ->
?RETURN({Doc0, conflict})
end,
% Allow inserting new deleted documents. To match CouchDB 3.x, this only
% works when the document has never existed.
case not HasRev andalso Doc0#doc.deleted andalso is_map(Winner) of
true -> ?RETURN({Doc0, conflict});
false -> ok
end,
% Get the target revision to update
Target = case Doc0RevId == WinnerRevId of
true ->
Winner;
false ->
case fabric2_fdb:get_non_deleted_rev(Db, Doc0#doc.id, Doc0RevId) of
#{deleted := false} = Target0 ->
Target0;
not_found ->
% Either a missing revision or a deleted
% revision. Either way a conflict. Note
% that we get not_found for a deleted revision
% because we only check for the non-deleted
% key in fdb
?RETURN({Doc0, conflict})
end
end,
Doc1 = case Winner of
#{deleted := true} when not Doc0#doc.deleted ->
% When recreating a deleted document we want to extend
% the winning revision branch rather than create a
% new branch. If we did not do this we could be
% recreating into a state that previously existed.
Doc0#doc{revs = fabric2_util:revinfo_to_revs(Winner)};
#{} ->
% Otherwise we're extending the target's revision
% history with this update
Doc0#doc{revs = fabric2_util:revinfo_to_revs(Target)};
not_found ->
% Creating a new doc means our revs start empty
Doc0
end,
% Validate the doc update and create the
% new revinfo map
Doc2 = prep_and_validate(Db, Doc1, Target),
Doc3 = new_revid(Db, Doc2),
#doc{
deleted = NewDeleted,
revs = {NewRevPos, [NewRev | NewRevPath]},
atts = Atts
} = Doc4 = stem_revisions(Db, Doc3),
NewRevInfo = #{
winner => undefined,
exists => false,
deleted => NewDeleted,
rev_id => {NewRevPos, NewRev},
rev_path => NewRevPath,
sequence => undefined,
branch_count => undefined,
att_hash => fabric2_util:hash_atts(Atts),
rev_size => fabric2_util:rev_size(Doc4)
},
% Gather the list of possible winning revisions
Possible = case Target == Winner of
true when not Doc4#doc.deleted ->
[NewRevInfo];
true when Doc4#doc.deleted ->
case SecondPlace of
#{} -> [NewRevInfo, SecondPlace];
not_found -> [NewRevInfo]
end;
false ->
[NewRevInfo, Winner]
end,
% Sort the rev infos such that the winner is first
{NewWinner0, NonWinner} = case fabric2_util:sort_revinfos(Possible) of
[W] -> {W, not_found};
[W, NW] -> {W, NW}
end,
BranchCount = case Winner of
not_found -> 1;
#{branch_count := BC} -> BC
end,
NewWinner = NewWinner0#{branch_count := BranchCount},
ToUpdate = if NonWinner == not_found -> []; true -> [NonWinner] end,
ToRemove = if Target == not_found -> []; true -> [Target] end,
ok = fabric2_fdb:write_doc(
Db,
Doc4,
NewWinner,
Winner,
ToUpdate,
ToRemove
),
{ok, {NewRevPos, NewRev}}.
update_doc_replicated(Db, Doc0, _Options) ->
#doc{
id = DocId,
deleted = Deleted,
revs = {RevPos, [Rev | RevPath]}
} = Doc0,
DocRevInfo0 = #{
winner => undefined,
exists => false,
deleted => Deleted,
rev_id => {RevPos, Rev},
rev_path => RevPath,
sequence => undefined,
branch_count => undefined,
att_hash => <<>>,
rev_size => null
},
AllRevInfos = fabric2_fdb:get_all_revs(Db, DocId),
RevTree = lists:foldl(fun(RI, TreeAcc) ->
RIPath = fabric2_util:revinfo_to_path(RI),
{Merged, _} = couch_key_tree:merge(TreeAcc, RIPath),
Merged
end, [], AllRevInfos),
DocRevPath = fabric2_util:revinfo_to_path(DocRevInfo0),
{NewTree, Status} = couch_key_tree:merge(RevTree, DocRevPath),
if Status /= internal_node -> ok; true ->
% We already know this revision so nothing
% left to do.
?RETURN({Doc0, {ok, []}})
end,
% It's possible to have a replication with fewer than $revs_limit
% revisions which extends an existing branch. To avoid
% losing revision history we extract the new node from the
% tree and use the combined path after stemming.
{[{_, {RevPos, UnstemmedRevs}}], []}
= couch_key_tree:get(NewTree, [{RevPos, Rev}]),
Doc1 = stem_revisions(Db, Doc0#doc{revs = {RevPos, UnstemmedRevs}}),
{RevPos, [Rev | NewRevPath]} = Doc1#doc.revs,
DocRevInfo1 = DocRevInfo0#{rev_path := NewRevPath},
% Find any previous revision we knew about for
% validation and attachment handling.
AllLeafsFull = couch_key_tree:get_all_leafs_full(NewTree),
LeafPath = get_leaf_path(RevPos, Rev, AllLeafsFull),
PrevRevInfo = find_prev_revinfo(RevPos, LeafPath),
Doc2 = prep_and_validate(Db, Doc1, PrevRevInfo),
Doc3 = flush_doc_atts(Db, Doc2),
DocRevInfo2 = DocRevInfo1#{
att_hash => fabric2_util:hash_atts(Doc3#doc.atts),
rev_size => fabric2_util:rev_size(Doc3)
},
% Possible winners are the previous winner and
% the new DocRevInfo
Winner = case fabric2_util:sort_revinfos(AllRevInfos) of
[#{winner := true} = WRI | _] -> WRI;
[] -> not_found
end,
{NewWinner0, NonWinner} = case Winner == PrevRevInfo of
true ->
{DocRevInfo2, not_found};
false ->
[W, NW] = fabric2_util:sort_revinfos([Winner, DocRevInfo2]),
{W, NW}
end,
NewWinner = NewWinner0#{branch_count := length(AllLeafsFull)},
ToUpdate = if NonWinner == not_found -> []; true -> [NonWinner] end,
ToRemove = if PrevRevInfo == not_found -> []; true -> [PrevRevInfo] end,
ok = fabric2_fdb:write_doc(
Db,
Doc3,
NewWinner,
Winner,
ToUpdate,
ToRemove
),
{ok, []}.
update_local_doc(Db, Doc0, _Options) ->
Doc1 = case increment_local_doc_rev(Doc0) of
{ok, Updated} -> Updated;
{error, Error} -> ?RETURN({Doc0, Error})
end,
ok = fabric2_fdb:write_local_doc(Db, Doc1),
#doc{revs = {0, [Rev]}} = Doc1,
{ok, {0, integer_to_binary(Rev)}}.
flush_doc_atts(Db, Doc) ->
#doc{
id = DocId,
atts = Atts
} = Doc,
NewAtts = lists:map(fun(Att) ->
case couch_att:fetch(data, Att) of
{loc, _, _, _} ->
Att;
_ ->
couch_att:flush(Db, DocId, Att)
end
end, Atts),
Doc#doc{atts = NewAtts}.
get_winning_rev_futures(Db, Docs) ->
lists:foldl(fun(Doc, Acc) ->
#doc{
id = DocId,
deleted = Deleted
} = Doc,
IsLocal = case DocId of
<<?LOCAL_DOC_PREFIX, _/binary>> -> true;
_ -> false
end,
if IsLocal -> Acc; true ->
NumRevs = if Deleted -> 2; true -> 1 end,
Future = fabric2_fdb:get_winning_revs_future(Db, DocId, NumRevs),
DocTag = doc_tag(Doc),
Acc#{DocTag => Future}
end
end, #{}, Docs).
prep_and_validate(Db, NewDoc, PrevRevInfo) ->
HasStubs = couch_doc:has_stubs(NewDoc),
HasVDUs = [] /= maps:get(validate_doc_update_funs, Db),
IsDDoc = case NewDoc#doc.id of
<<?DESIGN_DOC_PREFIX, _/binary>> -> true;
_ -> false
end,
WasDeleted = case PrevRevInfo of
not_found -> false;
#{deleted := D} -> D
end,
PrevDoc = case HasStubs orelse (HasVDUs and not IsDDoc) of
true when PrevRevInfo /= not_found, not WasDeleted ->
case fabric2_fdb:get_doc_body(Db, NewDoc#doc.id, PrevRevInfo) of
#doc{} = PDoc -> PDoc;
{not_found, _} -> nil
end;
_ ->
nil
end,
MergedDoc = if not HasStubs -> NewDoc; true ->
% This will throw an error if we have any
% attachment stubs missing data
couch_doc:merge_stubs(NewDoc, PrevDoc)
end,
check_duplicate_attachments(MergedDoc),
validate_doc_update(Db, MergedDoc, PrevDoc),
MergedDoc.
validate_doc_update(Db, #doc{id = <<"_design/", _/binary>>} = Doc, _) ->
case catch check_is_admin(Db) of
ok -> validate_ddoc(Db, Doc);
Error -> ?RETURN({Doc, Error})
end;
validate_doc_update(Db, Doc, PrevDoc) ->
#{
security_doc := Security,
validate_doc_update_funs := VDUs
} = Db,
Fun = fun() ->
JsonCtx = fabric2_util:user_ctx_to_json(Db),
lists:map(fun(VDU) ->
try
case VDU(Doc, PrevDoc, JsonCtx, Security) of
ok -> ok;
Error1 -> throw(Error1)
end
catch throw:Error2 ->
?RETURN({Doc, Error2})
end
end, VDUs)
end,
Stat = [couchdb, query_server, vdu_process_time],
if VDUs == [] -> ok; true ->
couch_stats:update_histogram(Stat, Fun)
end.
validate_ddoc(Db, DDoc) ->
try
ok = couch_index_server:validate(Db, couch_doc:with_ejson_body(DDoc))
catch
throw:{invalid_design_doc, Reason} ->
throw({bad_request, invalid_design_doc, Reason});
throw:{compilation_error, Reason} ->
throw({bad_request, compilation_error, Reason});
throw:Error ->
?RETURN({DDoc, Error})
end.
validate_atomic_update(_, false) ->
ok;
validate_atomic_update(AllDocs, true) ->
% TODO: actually perform the validation. This requires some hackery; we need
% to basically extract the prep_and_validate_updates function from couch_db
% and only run that, without actually writing anything on success.
Error = {not_implemented, <<"all_or_nothing is not supported">>},
PreCommitFailures = lists:map(fun(#doc{id=Id, revs = {Pos,Revs}}) ->
case Revs of [] -> RevId = <<>>; [RevId|_] -> ok end,
{{Id, {Pos, RevId}}, Error}
end, AllDocs),
throw({aborted, PreCommitFailures}).
check_duplicate_attachments(#doc{atts = Atts}) ->
lists:foldl(fun(Att, Names) ->
Name = couch_att:fetch(name, Att),
case ordsets:is_element(Name, Names) of
true -> throw({bad_request, <<"Duplicate attachments">>});
false -> ordsets:add_element(Name, Names)
end
end, ordsets:new(), Atts).
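% Translate a changes-feed `since` sequence into a versionstamp start key:
% zero-like values map to the zero versionstamp, `now` maps to the latest
% change, and a 24-byte sequence maps to the versionstamp just after it.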
get_since_seq(Db, rev, <<>>) ->
get_since_seq(Db, rev, now);
get_since_seq(_Db, _Dir, Seq) when Seq == <<>>; Seq == <<"0">>; Seq == 0 ->
fabric2_util:seq_zero_vs();
get_since_seq(Db, Dir, Seq) when Seq == now; Seq == <<"now">> ->
CurrSeq = fabric2_fdb:get_last_change(Db),
get_since_seq(Db, Dir, CurrSeq);
get_since_seq(_Db, _Dir, Seq) when is_binary(Seq), size(Seq) == 24 ->
fabric2_fdb:next_vs(fabric2_fdb:seq_to_vs(Seq));
get_since_seq(Db, Dir, List) when is_list(List) ->
get_since_seq(Db, Dir, list_to_binary(List));
get_since_seq(_Db, _Dir, Seq) ->
erlang:error({invalid_since_seq, Seq}).
get_leaf_path(Pos, Rev, [{Pos, [{Rev, _RevInfo} | LeafPath]} | _]) ->
LeafPath;
get_leaf_path(Pos, Rev, [_WrongLeaf | RestLeafs]) ->
get_leaf_path(Pos, Rev, RestLeafs).
find_prev_revinfo(_Pos, []) ->
not_found;
find_prev_revinfo(Pos, [{_Rev, ?REV_MISSING} | RestPath]) ->
find_prev_revinfo(Pos - 1, RestPath);
find_prev_revinfo(_Pos, [{_Rev, #{} = RevInfo} | _]) ->
RevInfo.
increment_local_doc_rev(#doc{deleted = true} = Doc) ->
{ok, Doc#doc{revs = {0, [0]}}};
increment_local_doc_rev(#doc{revs = {0, []}} = Doc) ->
{ok, Doc#doc{revs = {0, [1]}}};
increment_local_doc_rev(#doc{revs = {0, [RevStr | _]}} = Doc) ->
try
PrevRev = binary_to_integer(RevStr),
{ok, Doc#doc{revs = {0, [PrevRev + 1]}}}
catch error:badarg ->
{error, <<"Invalid rev format">>}
end;
increment_local_doc_rev(#doc{}) ->
{error, <<"Invalid rev format">>}.
doc_to_revid(#doc{revs = Revs}) ->
case Revs of
{0, []} -> {0, <<>>};
{RevPos, [Rev | _]} -> {RevPos, Rev}
end.
tag_docs([]) ->
[];
tag_docs([#doc{meta = Meta} = Doc | Rest]) ->
Meta1 = lists:keystore(ref, 1, Meta, {ref, make_ref()}),
NewDoc = Doc#doc{meta = Meta1},
[NewDoc | tag_docs(Rest)].
doc_tag(#doc{meta = Meta}) ->
fabric2_util:get_value(ref, Meta).
idrevs({Id, Revs}) when is_list(Revs) ->
{docid(Id), [rev(R) || R <- Revs]}.
docid(DocId) when is_list(DocId) ->
list_to_binary(DocId);
docid(DocId) ->
DocId.
rev(Rev) when is_list(Rev); is_binary(Rev) ->
couch_doc:parse_rev(Rev);
rev({Seq, Hash} = Rev) when is_integer(Seq), is_binary(Hash) ->
Rev.
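% Fold callbacks return {ok, Acc} to continue or {stop, Acc} to stop early;
% the stop case is thrown here and caught at the top of each fold.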
maybe_stop({ok, Acc}) ->
Acc;
maybe_stop({stop, Acc}) ->
throw({stop, Acc}).
set_design_doc_keys(Options1) ->
Dir = couch_util:get_value(dir, Options1, fwd),
Options2 = set_design_doc_start_key(Options1, Dir),
set_design_doc_end_key(Options2, Dir).
set_design_doc_start_key(Options, fwd) ->
Key1 = couch_util:get_value(start_key, Options, ?FIRST_DDOC_KEY),
Key2 = max(Key1, ?FIRST_DDOC_KEY),
lists:keystore(start_key, 1, Options, {start_key, Key2});
set_design_doc_start_key(Options, rev) ->
Key1 = couch_util:get_value(start_key, Options, ?LAST_DDOC_KEY),
Key2 = min(Key1, ?LAST_DDOC_KEY),
lists:keystore(start_key, 1, Options, {start_key, Key2}).
set_design_doc_end_key(Options, fwd) ->
case couch_util:get_value(end_key_gt, Options) of
undefined ->
Key1 = couch_util:get_value(end_key, Options, ?LAST_DDOC_KEY),
Key2 = min(Key1, ?LAST_DDOC_KEY),
lists:keystore(end_key, 1, Options, {end_key, Key2});
EKeyGT ->
Key2 = min(EKeyGT, ?LAST_DDOC_KEY),
lists:keystore(end_key_gt, 1, Options, {end_key_gt, Key2})
end;
set_design_doc_end_key(Options, rev) ->
case couch_util:get_value(end_key_gt, Options) of
undefined ->
Key1 = couch_util:get_value(end_key, Options, ?FIRST_DDOC_KEY),
Key2 = max(Key1, ?FIRST_DDOC_KEY),
lists:keystore(end_key, 1, Options, {end_key, Key2});
EKeyGT ->
Key2 = max(EKeyGT, ?FIRST_DDOC_KEY),
lists:keystore(end_key_gt, 1, Options, {end_key_gt, Key2})
end.
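% Truncate the revision path so it never exceeds the database's revs_limit.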
stem_revisions(#{} = Db, #doc{} = Doc) ->
#{revs_limit := RevsLimit} = Db,
#doc{revs = {RevPos, Revs}} = Doc,
case RevPos >= RevsLimit of
true -> Doc#doc{revs = {RevPos, lists:sublist(Revs, RevsLimit)}};
false -> Doc
end.
open_json_doc(Db, DocId, OpenOpts, DocOpts) ->
case fabric2_db:open_doc(Db, DocId, OpenOpts) of
{not_found, missing} ->
[];
{ok, #doc{deleted = true}} ->
[{doc, null}];
{ok, #doc{} = Doc} ->
[{doc, couch_doc:to_json_obj(Doc, DocOpts)}]
end.
get_cached_db(#{} = Db, Opts) when is_list(Opts) ->
MaxAge = fabric2_util:get_value(max_age, Opts, 0),
Now = erlang:monotonic_time(millisecond),
Age = Now - maps:get(check_current_ts, Db),
case Age < MaxAge of
true ->
Db;
false ->
fabric2_fdb:transactional(Db, fun(TxDb) ->
fabric2_fdb:ensure_current(TxDb)
end)
end.
is_replicated(Options) when is_list(Options) ->
lists:member(replicated_changes, Options).
get_batch_size(Options) ->
case fabric2_util:get_value(batch_size, Options) of
undefined ->
config:get_integer("fabric", "update_docs_batch_size",
?DEFAULT_UPDATE_DOCS_BATCH_SIZE);
Val when is_integer(Val) ->
Val
end.