% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(fabric2_db).


-export([
    create/2,
    open/2,
    delete/2,
    undelete/4,

    list_dbs/0,
    list_dbs/1,
    list_dbs/3,

    list_dbs_info/0,
    list_dbs_info/1,
    list_dbs_info/3,

    list_deleted_dbs_info/0,
    list_deleted_dbs_info/1,
    list_deleted_dbs_info/3,

    check_is_admin/1,
    check_is_member/1,

    name/1,
    get_after_doc_read_fun/1,
    get_before_doc_update_fun/1,
    get_committed_update_seq/1,
    get_compacted_seq/1,
    get_compactor_pid/1,
    get_db_info/1,
    %% get_partition_info/2,
    get_del_doc_count/1,
    get_doc_count/1,
    get_doc_count/2,
    %% get_epochs/1,
    %% get_filepath/1,
    get_instance_start_time/1,
    get_pid/1,
    get_revs_limit/1,
    get_revs_limit/2,
    get_security/1,
    get_security/2,
    get_update_seq/1,
    get_user_ctx/1,
    get_uuid/1,
    %% get_purge_seq/1,
    %% get_oldest_purge_seq/1,
    %% get_purge_infos_limit/1,

    is_clustered/1,
    is_db/1,
    is_partitioned/1,
    is_system_db/1,
    is_system_db_name/1,
    is_replicator_db/1,
    is_users_db/1,

    set_revs_limit/2,
    %% set_purge_infos_limit/2,
    set_security/2,
    set_user_ctx/2,

    ensure_full_commit/1,
    ensure_full_commit/2,

    %% load_validation_funs/1,
    %% reload_validation_funs/1,

    open_doc/2,
    open_doc/3,
    open_doc_revs/4,
    %% open_doc_int/3,
    get_doc_info/2,
    get_full_doc_info/2,
    get_full_doc_infos/2,
    get_missing_revs/2,
    get_design_docs/1,
    %% get_purge_infos/2,

    %% get_minimum_purge_seq/1,
    %% purge_client_exists/3,

    validate_docid/1,
    %% doc_from_json_obj_validate/2,

    update_doc/2,
    update_doc/3,
    update_docs/2,
    update_docs/3,
    %% delete_doc/3,

    %% purge_docs/2,
    %% purge_docs/3,

    read_attachment/3,
    write_attachment/3,

    fold_docs/3,
    fold_docs/4,
    fold_docs/5,
    fold_design_docs/4,
    fold_local_docs/4,
    fold_changes/4,
    fold_changes/5,
    %% count_changes_since/2,
    %% fold_purge_infos/4,
    %% fold_purge_infos/5,

    %% calculate_start_seq/3,
    %% owner_of/2,

    %% start_compact/1,
    %% cancel_compact/1,
    %% wait_for_compaction/1,
    %% wait_for_compaction/2,

    dbname_suffix/1,
    validate_dbname/1,

    %% make_doc/5,
    new_revid/2,

    apply_open_doc_opts/3
]).


-include_lib("couch/include/couch_db.hrl").
-include("fabric2.hrl").


% Default max database name length is based on CouchDB < 4.x compatibility. See
% default.ini entry for additional information.
-define(DEFAULT_MAX_DATABASE_NAME_LENGTH, 238).

% Pattern a database name has to match; it is applied with re:run/3 in this
% module.
-define(DBNAME_REGEX,
    "^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*" % use the stock CouchDB regex
    "(\\.[0-9]{10,})?$" % but allow an optional shard timestamp at the end
).

% Bounds of the design document id range: `0` is the byte right after `/`, so
% every id starting with "_design/" sorts at or after FIRST_DDOC_KEY and before
% LAST_DDOC_KEY.
-define(FIRST_DDOC_KEY, <<"_design/">>).
-define(LAST_DDOC_KEY, <<"_design0">>).

% Throws Term tagged with this module's name.
% NOTE(review): presumably used as an early return that is caught elsewhere in
% this module; the matching catch is not in this part of the file - confirm.
-define(RETURN(Term), throw({?MODULE, Term})).

% NOTE(review): default batch size for document updates; the unit and the
% place where it is read are not visible in this part of the file - confirm.
-define(DEFAULT_UPDATE_DOCS_BATCH_SIZE, 2500000).


% NOTE(review): presumably the accumulator threaded through batched document
% updates; its use is outside this part of the file - confirm.
-record(bacc, {
    db,
    docs,
    batch_size,
    options,
    rev_futures,
    seen,
    results
}).


% Create the database DbName.
%
% Returns {ok, Db} on success, {error, file_exists} when the database already
% exists, or whatever validate_dbname/1 / fabric2_fdb:create/2 returned when
% that was not a success value.
create(DbName, Options) ->
    case validate_dbname(DbName) of
        ok ->
            CreateFun = fun(TxDb) ->
                case fabric2_fdb:exists(TxDb) of
                    false -> fabric2_fdb:create(TxDb, Options);
                    true -> {error, file_exists}
                end
            end,
            % The handle is cached only after transactional/2 has returned,
            % so that we know the transaction was committed.
            case fabric2_fdb:transactional(DbName, CreateFun) of
                #{} = NewDb ->
                    SysDb = maybe_add_sys_db_callbacks(NewDb),
                    ok = fabric2_server:store(SysDb),
                    fabric2_db_plugin:after_db_create(DbName, get_uuid(SysDb)),
                    {ok, SysDb#{tx := undefined}};
                CreateError ->
                    CreateError
            end;
        InvalidName ->
            InvalidName
    end.


% Open the database DbName.
%
% A handle found through fabric2_server:fetch/2 is reused after applying the
% user context and interactive settings from Options. Otherwise the database
% is opened in a transaction and the resulting handle is stored in
% fabric2_server. In both cases the returned handle has its security_fun set
% to the member check.
%
% Returns {ok, Db}, or the non-map result of fabric2_fdb:open/2.
open(DbName, Options) ->
    UUID = fabric2_util:get_value(uuid, Options),
    case fabric2_server:fetch(DbName, UUID) of
        undefined ->
            OpenFun = fun(TxDb) -> fabric2_fdb:open(TxDb, Options) end,
            % Cache outside the transaction retry loop
            case fabric2_fdb:transactional(DbName, OpenFun) of
                #{} = Opened ->
                    SysDb = maybe_add_sys_db_callbacks(Opened),
                    ok = fabric2_server:store(SysDb),
                    {ok, require_member_check(SysDb#{tx := undefined})};
                OpenError ->
                    OpenError
            end;
        #{} = Cached ->
            WithCtx = maybe_set_user_ctx(Cached, Options),
            WithInteractive = maybe_set_interactive(WithCtx, Options),
            {ok, require_member_check(WithInteractive)}
    end.


% Delete the database DbName.
%
% With a {deleted_at, TimeStamp} option the already deleted instance with that
% timestamp is removed via fabric2_fdb:remove_deleted_db/2. Otherwise the live
% database is deleted and, on success, the after_db_delete plugin hook runs
% and the handle is removed from fabric2_server.
delete(DbName, Options) ->
    % Delete doesn't check user_ctx, that's done at the HTTP API level
    % here we just care to get the `database_does_not_exist` error thrown
    AdminOptions = lists:keystore(user_ctx, 1, Options, ?ADMIN_CTX),
    case lists:keyfind(deleted_at, 1, AdminOptions) of
        false ->
            {ok, Db} = open(DbName, AdminOptions),
            DeleteFun = fun(TxDb) -> fabric2_fdb:delete(TxDb) end,
            case fabric2_fdb:transactional(Db, DeleteFun) of
                ok ->
                    fabric2_db_plugin:after_db_delete(DbName, get_uuid(Db)),
                    fabric2_server:remove(DbName);
                NotOk ->
                    NotOk
            end;
        {deleted_at, TimeStamp} ->
            RemoveFun = fun(TxDb) ->
                fabric2_fdb:remove_deleted_db(TxDb, TimeStamp)
            end,
            fabric2_fdb:transactional(DbName, RemoveFun)
    end.


% Restore the deleted instance of DbName identified by TimeStamp under the
% name TgtDbName.
%
% The target name is validated first. When the undelete succeeds the target
% database is opened and the after_db_create plugin hook is invoked. Returns
% the result of fabric2_fdb:undelete/3, or the validation error.
undelete(DbName, TgtDbName, TimeStamp, Options) ->
    case validate_dbname(TgtDbName) of
        ok ->
            UndeleteFun = fun(TxDb) ->
                fabric2_fdb:undelete(TxDb, TgtDbName, TimeStamp)
            end,
            case fabric2_fdb:transactional(DbName, UndeleteFun) of
                ok ->
                    {ok, Db} = open(TgtDbName, Options),
                    fabric2_db_plugin:after_db_create(TgtDbName, get_uuid(Db)),
                    ok;
                Failure ->
                    Failure
            end;
        InvalidName ->
            InvalidName
    end.


% List all database names using an empty option list.
list_dbs() ->
    NoOptions = [],
    list_dbs(NoOptions).


% Return the list of database names selected by Options.
%
% Names are prepended while folding and the list is reversed at the end, so
% the result is in fold order.
list_dbs(Options) ->
    Collect = fun(DbName, Names) -> [DbName | Names] end,
    ListFun = fun(Tx) ->
        fabric2_fdb:list_dbs(Tx, Collect, [], Options)
    end,
    lists:reverse(fabric2_fdb:transactional(ListFun)).


% Fold over database names with a user callback.
%
% UserFun is called with {meta, []}, then {row, [{id, DbName}]} for every
% database and finally `complete`. Every callback result goes through
% maybe_stop/1; a {stop, Acc} throw ends the fold early. Returns {ok, Acc}.
list_dbs(UserFun, UserAcc0, Options) ->
    RowFun = fun(DbName, Acc) ->
        maybe_stop(UserFun({row, [{id, DbName}]}, Acc))
    end,
    fabric2_fdb:transactional(fun(Tx) ->
        try
            AfterMeta = maybe_stop(UserFun({meta, []}, UserAcc0)),
            AfterRows = fabric2_fdb:list_dbs(Tx, RowFun, AfterMeta, Options),
            {ok, maybe_stop(UserFun(complete, AfterRows))}
        catch throw:{stop, StoppedAcc} ->
            {ok, StoppedAcc}
        end
    end).


% Collect the info of all databases using an empty option list.
list_dbs_info() ->
    NoOptions = [],
    list_dbs_info(NoOptions).


% Return {ok, DbInfos} with one entry per {row, DbInfo} emitted by
% list_dbs_info/3, in the order the rows were emitted.
list_dbs_info(Options) ->
    % Only rows contribute to the result; meta and complete are ignored.
    Collect = fun(Event, Infos) ->
        case Event of
            {row, DbInfo} -> {ok, [DbInfo | Infos]};
            {meta, _} -> {ok, Infos};
            complete -> {ok, Infos}
        end
    end,
    {ok, Reversed} = list_dbs_info(Collect, [], Options),
    {ok, lists:reverse(Reversed)}.


% Fold over database infos with a user callback.
%
% UserFun first receives {meta, []}. For every database the info future is
% queued and the queue is handed to drain_info_futures/4; whatever is still
% queued after the fold is drained with drain_all_info_futures/3 before
% UserFun receives `complete`. A {stop, Acc} throw ends the fold early.
% Returns {ok, Acc}.
list_dbs_info(UserFun, UserAcc0, Options) ->
    EnqueueFun = fun(DbName, InfoFuture, {Queue, Queued, Acc}) ->
        Queue1 = queue:in({DbName, InfoFuture}, Queue),
        drain_info_futures(Queue1, Queued + 1, UserFun, Acc)
    end,
    fabric2_fdb:transactional(fun(Tx) ->
        try
            AfterMeta = maybe_stop(UserFun({meta, []}, UserAcc0)),
            FoldAcc0 = {queue:new(), 0, AfterMeta},
            {LeftOverQ, _, AfterFold} =
                fabric2_fdb:list_dbs_info(Tx, EnqueueFun, FoldAcc0, Options),
            Drained = drain_all_info_futures(LeftOverQ, UserFun, AfterFold),
            {ok, maybe_stop(UserFun(complete, Drained))}
        catch throw:{stop, StoppedAcc} ->
            {ok, StoppedAcc}
        end
    end).


% Collect the info of all deleted databases using an empty option list.
list_deleted_dbs_info() ->
    NoOptions = [],
    list_deleted_dbs_info(NoOptions).


% Return {ok, DbInfos} with one entry per {row, DbInfo} emitted by
% list_deleted_dbs_info/3, in the order the rows were emitted.
list_deleted_dbs_info(Options) ->
    % Only rows contribute to the result; meta and complete are ignored.
    Collect = fun(Event, Infos) ->
        case Event of
            {row, DbInfo} -> {ok, [DbInfo | Infos]};
            {meta, _} -> {ok, Infos};
            complete -> {ok, Infos}
        end
    end,
    {ok, Reversed} = list_deleted_dbs_info(Collect, [], Options),
    {ok, lists:reverse(Reversed)}.


% Fold over the infos of deleted databases with a user callback.
%
% Options read here (the full list is also passed on to fabric2_fdb):
%   dir       - fwd (default) or rev
%   start_key - a DbName binary, or a [DbName, TimeStamp] list of binaries
%   end_key   - same shapes as start_key
%
% Keys are converted to tuples and passed on with {wrap_keys, false}. Any
% other key shape raises {invalid_start_key, Key} or {invalid_end_key, Key}.
%
% UserFun receives {meta, []}, the events produced while draining the info
% futures, and finally `complete`. A {stop, Acc} throw ends the fold early.
% Returns {ok, Acc}.
list_deleted_dbs_info(UserFun, UserAcc0, Options0) ->
    Dir = fabric2_util:get_value(dir, Options0, fwd),
    StartKey0 = fabric2_util:get_value(start_key, Options0),
    EndKey0 = fabric2_util:get_value(end_key, Options0),

    % Filler values used for the parts of a key that were not given; they
    % swap roles when folding in reverse.
    {StartFill, EndFill} = case Dir of
        rev -> {<<255>>, <<>>};
        fwd -> {<<>>, <<255>>}
    end,

    StartKey1 = case StartKey0 of
        undefined ->
            {StartFill};
        [SName, STime] when is_binary(SName), is_binary(STime) ->
            {SName, STime};
        SNameOnly when is_binary(SNameOnly) ->
            {SNameOnly, StartFill};
        BadStartKey ->
            erlang:error({invalid_start_key, BadStartKey})
    end,
    EndKey1 = case EndKey0 of
        undefined ->
            {EndFill};
        [EName, ETime] when is_binary(EName), is_binary(ETime) ->
            {EName, ETime};
        ENameOnly when is_binary(ENameOnly) ->
            {ENameOnly, EndFill};
        BadEndKey ->
            erlang:error({invalid_end_key, BadEndKey})
    end,

    % Drop the caller supplied keys and put the converted ones in front.
    Remaining = Options0 -- [{start_key, StartKey0}, {end_key, EndKey0}],
    FoldOptions = [
        {start_key, StartKey1},
        {end_key, EndKey1},
        {wrap_keys, false}
        | Remaining
    ],

    EnqueueFun = fun(DbName, TimeStamp, InfoFuture, {Queue, Queued, Acc}) ->
        Queue1 = queue:in({DbName, TimeStamp, InfoFuture}, Queue),
        drain_deleted_info_futures(Queue1, Queued + 1, UserFun, Acc)
    end,
    fabric2_fdb:transactional(fun(Tx) ->
        try
            AfterMeta = maybe_stop(UserFun({meta, []}, UserAcc0)),
            FoldAcc0 = {queue:new(), 0, AfterMeta},
            {LeftOverQ, _, AfterFold} = fabric2_fdb:list_deleted_dbs_info(
                    Tx,
                    EnqueueFun,
                    FoldAcc0,
                    FoldOptions
                ),
            Drained = drain_all_deleted_info_futures(
                    LeftOverQ,
                    UserFun,
                    AfterFold
                ),
            {ok, maybe_stop(UserFun(complete, Drained))}
        catch throw:{stop, StoppedAcc} ->
            {ok, StoppedAcc}
        end
    end).


% Return true when the handle's user is an admin of Db.
%
% fabric2_db_plugin:check_is_admin/1 is asked first; only when it answers
% false is the user context checked against the admins section of SecProps.
is_admin(Db, {SecProps}) when is_list(SecProps) ->
    case fabric2_db_plugin:check_is_admin(Db) of
        false ->
            Ctx = get_user_ctx(Db),
            {AdminProps} = get_admins(SecProps),
            is_authorized(AdminProps, Ctx);
        true ->
            true
    end.


% Run the admin check against the security document of Db.
check_is_admin(Db) ->
    SecDoc = get_security(Db),
    check_is_admin(Db, SecDoc).


% Return ok when is_admin/2 holds for Db and SecDoc; otherwise hand the user
% context and a reason to throw_security_error/2.
check_is_admin(Db, SecDoc) ->
    case is_admin(Db, SecDoc) of
        false ->
            Ctx = get_user_ctx(Db),
            Msg = <<"You are not a db or server admin.">>,
            throw_security_error(Ctx, Msg);
        true ->
            ok
    end.


% Run the member check against the security document of Db.
check_is_member(Db) ->
    SecDoc = get_security(Db),
    check_is_member(Db, SecDoc).


% Return ok when is_member/2 holds for Db and SecDoc; otherwise hand the user
% context to throw_security_error/1.
check_is_member(Db, SecDoc) ->
    case is_member(Db, SecDoc) of
        false ->
            Ctx = get_user_ctx(Db),
            throw_security_error(Ctx);
        true ->
            ok
    end.


% Return Db with its existing security_fun entry replaced by the admin check.
require_admin_check(#{} = Db) ->
    AdminCheck = fun check_is_admin/2,
    maps:update(security_fun, AdminCheck, Db).


% Return Db with its existing security_fun entry replaced by the member check.
require_member_check(#{} = Db) ->
    MemberCheck = fun check_is_member/2,
    maps:update(security_fun, MemberCheck, Db).


% Return the value stored under `name` in the db handle.
name(#{name := Name}) ->
    Name.


% Return the value stored under `after_doc_read` in the db handle.
get_after_doc_read_fun(#{after_doc_read := ADR}) ->
    ADR.


% Return the value stored under `before_doc_update` in the db handle.
get_before_doc_update_fun(#{before_doc_update := BDU}) ->
    BDU.

% Same value as get_update_seq/1.
get_committed_update_seq(#{} = Db) ->
    UpdateSeq = get_update_seq(Db),
    UpdateSeq.


% Same value as get_update_seq/1.
get_compacted_seq(#{} = Db) ->
    UpdateSeq = get_update_seq(Db),
    UpdateSeq.


% Always returns nil for any db handle.
get_compactor_pid(#{}) ->
    nil.


% Read the database info properties in a transaction and return them as
% {ok, Info}, where Info is built by make_db_info/2 from the db name and the
% properties.
get_db_info(#{} = Db) ->
    InfoProps = fabric2_fdb:transactional(Db, fun fabric2_fdb:get_info/1),
    DbName = name(Db),
    {ok, make_db_info(DbName, InfoProps)}.


% Return the "doc_del_count" stat of Db.
get_del_doc_count(#{} = Db) ->
    StatKey = <<"doc_del_count">>,
    get_doc_count(Db, StatKey).


% Return the "doc_count" stat of Db.
get_doc_count(Db) ->
    StatKey = <<"doc_count">>,
    get_doc_count(Db, StatKey).


% Return the stat stored under Key for Db.
%
% A few aliases are translated to their stat name first:
%   undefined, <<"_all_docs">> -> <<"doc_count">>
%   <<"_design">>              -> <<"doc_design_count">>
%   <<"_local">>               -> <<"doc_local_count">>
% Any other Key is passed to fabric2_fdb:get_stat/2 unchanged.
get_doc_count(Db, Key) ->
    StatKey = case Key of
        undefined -> <<"doc_count">>;
        <<"_all_docs">> -> <<"doc_count">>;
        <<"_design">> -> <<"doc_design_count">>;
        <<"_local">> -> <<"doc_local_count">>;
        _ -> Key
    end,
    fabric2_fdb:transactional(Db, fun(TxDb) ->
        fabric2_fdb:get_stat(TxDb, StatKey)
    end).


% Always returns 0 for any db handle.
get_instance_start_time(#{} = _Db) ->
    0.


% Always returns nil for any db handle.
get_pid(#{} = _Db) ->
    nil.


% Return the revs limit of Db using an empty option list.
get_revs_limit(#{} = Db) ->
    NoOpts = [],
    get_revs_limit(Db, NoOpts).


% Return the `revs_limit` entry of the handle that get_cached_db/2 yields for
% Db and Opts.
get_revs_limit(#{} = Db, Opts) ->
    maps:get(revs_limit, get_cached_db(Db, Opts)).


% Return the security document of Db using an empty option list.
get_security(#{} = Db) ->
    NoOpts = [],
    get_security(Db, NoOpts).


% Return the `security_doc` entry of the handle that get_cached_db/2 yields
% for Db and Opts.
get_security(#{} = Db, Opts) ->
    maps:get(security_doc, get_cached_db(Db, Opts)).


% Return the result of fabric2_fdb:get_last_change/1, read in a transaction
% on Db.
get_update_seq(#{} = Db) ->
    fabric2_fdb:transactional(Db, fun fabric2_fdb:get_last_change/1).


% Return the value stored under `user_ctx` in the db handle.
get_user_ctx(#{user_ctx := Ctx}) ->
    Ctx.


% Return the value stored under `uuid` in the db handle.
get_uuid(#{uuid := Uuid}) ->
    Uuid.


% Always returns false for any db handle.
is_clustered(#{} = _Db) ->
    false.


% Return true for a map that has a `name` key and false for anything else.
is_db(Term) ->
    case Term of
        #{name := _} -> true;
        _ -> false
    end.


% Always returns false for any db handle.
is_partitioned(#{} = _Db) ->
    false.


% Return whether the name of the db handle is a system database name.
is_system_db(#{name := Name}) ->
    is_system_db_name(Name).


% Return whether DbName (string or binary) names a system database.
%
% The last path component has to be one of ?SYSTEM_DATABASES. A name without
% a directory part qualifies on that alone; otherwise the directory part must
% additionally match ?DBNAME_REGEX.
is_system_db_name(DbName) when is_list(DbName) ->
    is_system_db_name(?l2b(DbName));
is_system_db_name(DbName) when is_binary(DbName) ->
    BaseName = filename:basename(DbName),
    DirName = filename:dirname(DbName),
    IsSystemSuffix = lists:member(BaseName, ?SYSTEM_DATABASES),
    case DirName of
        <<".">> ->
            IsSystemSuffix;
        _ when IsSystemSuffix =:= false ->
            false;
        _ when IsSystemSuffix =:= true ->
            Match = re:run(DirName, ?DBNAME_REGEX,
                [{capture,none}, dollar_endonly]),
            Match == match
    end.


% Return whether a db handle or a binary database name refers to a
% replicator database, i.e. fabric2_util:dbname_ends_with/2 holds for the
% name and <<"_replicator">>.
is_replicator_db(#{name := Name}) ->
    is_replicator_db(Name);

is_replicator_db(Name) when is_binary(Name) ->
    Suffix = <<"_replicator">>,
    fabric2_util:dbname_ends_with(Name, Suffix).


% Return whether a db handle or a binary database name refers to a users
% database. That is the case when the name
%   - equals the configured chttpd_auth/authentication_db, or
%   - ends with the configured couchdb/users_db_suffix (default "_users"), or
%   - ends with <<"_users">>.
is_users_db(#{name := Name}) ->
    is_users_db(Name);

is_users_db(Name) when is_binary(Name) ->
    AuthDb = config:get("chttpd_auth", "authentication_db"),
    UsersSuffix = config:get("couchdb", "users_db_suffix", "_users"),

    MatchesAuthDb = case AuthDb of
        undefined -> false;
        _ -> Name == ?l2b(AuthDb)
    end,
    HasCfgSuffix = fabric2_util:dbname_ends_with(Name, ?l2b(UsersSuffix)),
    HasDefaultSuffix = fabric2_util:dbname_ends_with(Name, <<"_users">>),

    MatchesAuthDb orelse HasCfgSuffix orelse HasDefaultSuffix.


% Store RevsLimit as the revs_limit config of the database.
%
% The handle is switched to the admin check before the transaction runs. On
% {ok, Db} the updated handle is stored in fabric2_server and the result of
% that call is returned; any other result is returned unchanged.
set_revs_limit(#{} = Db0, RevsLimit) when is_integer(RevsLimit) ->
    AdminDb = require_admin_check(Db0),
    SetFun = fun(TxDb) ->
        fabric2_fdb:set_config(TxDb, revs_limit, RevsLimit)
    end,
    case fabric2_fdb:transactional(AdminDb, SetFun) of
        {ok, #{} = UpdatedDb} -> fabric2_server:store(UpdatedDb);
        Other -> Other
    end.


% Store Security as the security_doc config of the database.
%
% The handle is switched to the admin check and Security has to pass
% fabric2_util:validate_security_object/1 before the transaction runs. On
% {ok, Db} the updated handle is stored in fabric2_server and the result of
% that call is returned; any other result is returned unchanged.
set_security(#{} = Db0, Security) ->
    AdminDb = require_admin_check(Db0),
    ok = fabric2_util:validate_security_object(Security),
    SetFun = fun(TxDb) ->
        fabric2_fdb:set_config(TxDb, security_doc, Security)
    end,
    case fabric2_fdb:transactional(AdminDb, SetFun) of
        {ok, #{} = UpdatedDb} -> fabric2_server:store(UpdatedDb);
        Other -> Other
    end.


% Return Db with its existing `user_ctx` entry replaced by UserCtx.
set_user_ctx(#{} = Db, UserCtx) ->
    maps:update(user_ctx, UserCtx, Db).


% Always returns {ok, 0} for any db handle.
ensure_full_commit(#{} = _Db) ->
    {ok, 0}.


% Always returns {ok, 0}; the timeout argument is ignored.
ensure_full_commit(#{} = _Db, _Timeout) ->
    {ok, 0}.


% Open the document DocId using an empty option list.
open_doc(#{} = Db, DocId) ->
    NoOptions = [],
    open_doc(Db, DocId, NoOptions).


% Open a document by id.
%
% Ids starting with ?LOCAL_DOC_PREFIX are read with
% fabric2_fdb:get_local_doc/2; Options are ignored for them and a #doc{} is
% returned as {ok, Doc}.
%
% For every other id the winning revision is loaded. The complete revision
% list is only fetched when Options contain one of revs_info, conflicts or
% deleted_conflicts. Results:
%   {not_found, missing} - the document has no revisions
%   {not_found, deleted} - the winner is deleted and `deleted` is not in
%                          Options
%   otherwise            - the result of apply_open_doc_opts/3, or the
%                          non-#doc{} value returned by get_doc_body/3
open_doc(#{} = Db, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId, _Options) ->
    ReadLocal = fun(TxDb) ->
        case fabric2_fdb:get_local_doc(TxDb, DocId) of
            #doc{} = LocalDoc -> {ok, LocalDoc};
            Other -> Other
        end
    end,
    fabric2_fdb:transactional(Db, ReadLocal);

open_doc(#{} = Db, DocId, Options) ->
    TreeOpts = [revs_info, conflicts, deleted_conflicts],
    NeedsTree = (Options -- TreeOpts /= Options),
    OpenDeleted = lists:member(deleted, Options),
    fabric2_fdb:transactional(Db, fun(TxDb) ->
        RevInfos = case NeedsTree of
            false -> fabric2_fdb:get_winning_revs(TxDb, DocId, 1);
            true -> fabric2_fdb:get_all_revs(TxDb, DocId)
        end,
        case RevInfos of
            [] ->
                {not_found, missing};
            _ ->
                % The winner is expected to be the last entry.
                #{winner := true} = Winner = lists:last(RevInfos),
                case fabric2_fdb:get_doc_body(TxDb, DocId, Winner) of
                    #doc{deleted = true} when not OpenDeleted ->
                        {not_found, deleted};
                    #doc{} = Doc ->
                        apply_open_doc_opts(Doc, RevInfos, Options);
                    Other ->
                        Other
                end
        end
    end).


% Open specific revisions of the document DocId.
%
% Revs is either the atom `all`, which selects every leaf of the revision
% tree, or a list of revisions to look up. When `latest` is in Options the
% lookup goes through couch_key_tree:get_key_leafs/2 instead of
% couch_key_tree:get/2.
%
% Returns {ok, Results} where Results holds one entry per found revision
% followed by one {{not_found, missing}, Rev} entry per revision the tree
% lookup reported as missing. A found revision yields
%   - {{not_found, missing}, {Pos, Rev}} when its tree value is ?REV_MISSING,
%   - the result of apply_open_doc_opts/3 when its body loads as a #doc{},
%   - {Result, {Pos, Rev}} for any other get_doc_body/3 result.
open_doc_revs(Db, DocId, Revs, Options) ->
    Latest = lists:member(latest, Options),
    fabric2_fdb:transactional(Db, fun(TxDb) ->
        AllRevInfos = fabric2_fdb:get_all_revs(TxDb, DocId),
        % Rebuild the revision tree by merging the path of every rev info.
        RevTree = lists:foldl(fun(RI, TreeAcc) ->
            RIPath = fabric2_util:revinfo_to_path(RI),
            {Merged, _} = couch_key_tree:merge(TreeAcc, RIPath),
            Merged
        end, [], AllRevInfos),
        {Found, Missing} = case Revs of
            all ->
                {couch_key_tree:get_all_leafs(RevTree), []};
            _ when Latest ->
                couch_key_tree:get_key_leafs(RevTree, Revs);
            _ ->
                couch_key_tree:get(RevTree, Revs)
        end,
        Docs = lists:map(fun({Value, {Pos, [Rev | RevPath]}}) ->
            case Value of
                ?REV_MISSING ->
                    % We have the rev in our list but know nothing about it
                    {{not_found, missing}, {Pos, Rev}};
                _ ->
                    % Minimal rev info: just the id and path of the revision.
                    RevInfo = #{
                        rev_id => {Pos, Rev},
                        rev_path => RevPath
                    },
                    case fabric2_fdb:get_doc_body(TxDb, DocId, RevInfo) of
                        #doc{} = Doc ->
                            apply_open_doc_opts(Doc, AllRevInfos, Options);
                        Else ->
                            {Else, {Pos, Rev}}
                    end
            end
        end, Found),
        MissingDocs = [{{not_found, missing}, MRev} || MRev <- Missing],
        {ok, Docs ++ MissingDocs}
    end).


% Return the doc info of DocId, i.e. the result of get_full_doc_info/2
% converted with couch_doc:to_doc_info/1, or not_found.
get_doc_info(Db, DocId) ->
    MaybeFDI = get_full_doc_info(Db, DocId),
    if
        MaybeFDI =:= not_found -> not_found;
        true -> couch_doc:to_doc_info(MaybeFDI)
    end.


% Build a #full_doc_info{} for DocId from all of its revision infos.
%
% Returns not_found when the document has no revisions. Otherwise the
% revision tree is rebuilt by merging the path of every rev info, and the
% update sequence and deleted flag are taken from the winning revision,
% which is expected to be the last entry.
get_full_doc_info(Db, DocId) ->
    AllRevs = fabric2_fdb:transactional(Db, fun(TxDb) ->
        fabric2_fdb:get_all_revs(TxDb, DocId)
    end),
    case AllRevs of
        [] ->
            not_found;
        _ ->
            #{winner := true} = Winner = lists:last(AllRevs),
            MergeRev = fun(RevInfo, Tree) ->
                Path = fabric2_util:revinfo_to_path(RevInfo),
                {NewTree, _} = couch_key_tree:merge(Tree, Path),
                NewTree
            end,
            RevTree = lists:foldl(MergeRev, [], AllRevs),
            WinnerSeq = fabric2_fdb:vs_to_seq(maps:get(sequence, Winner)),
            #full_doc_info{
                id = DocId,
                update_seq = WinnerSeq,
                deleted = maps:get(deleted, Winner),
                rev_tree = RevTree
            }
    end.


% Return the get_full_doc_info/2 result for every id in DocIds, in the same
% order, all looked up inside one transaction on Db.
get_full_doc_infos(Db, DocIds) ->
    fabric2_fdb:transactional(Db, fun(TxDb) ->
        Lookup = fun(DocId) -> get_full_doc_info(TxDb, DocId) end,
        lists:map(Lookup, DocIds)
    end).


% Determine which of the given revisions are missing from the database.
%
% JsonIdRevs is converted entry by entry with idrevs/1 into {Id, Revs}
% pairs. Returns {ok, AllMissing} where AllMissing holds one
% {Id, MissingRevs, PossibleAncestors} tuple for every pair that has at least
% one missing revision; pairs without missing revisions are left out.
get_missing_revs(Db, JsonIdRevs) ->
    IdRevs = [idrevs(IdR) || IdR <- JsonIdRevs],
    % Fetch the rev infos of every distinct doc id once, in one transaction.
    AllRevInfos = fabric2_fdb:transactional(Db, fun(TxDb) ->
        lists:foldl(fun({Id, _Revs}, Acc) ->
            case maps:is_key(Id, Acc) of
                true ->
                    Acc;
                false ->
                    RevInfos = fabric2_fdb:get_all_revs(TxDb, Id),
                    Acc#{Id => RevInfos}
            end
        end, #{}, IdRevs)
    end),
    AllMissing = lists:flatmap(fun({Id, Revs}) ->
        #{Id := RevInfos} = AllRevInfos,
        % Start with all requested revs and let filter_found_revs/2 narrow
        % the list down for every known rev info. Once nothing is left the
        % fold is cut short by throwing all_found.
        Missing = try
            lists:foldl(fun(RevInfo, RevAcc) ->
                if RevAcc /= [] -> ok; true ->
                    throw(all_found)
                end,
                filter_found_revs(RevInfo, RevAcc)
            end, Revs, RevInfos)
        catch throw:all_found ->
            []
        end,
        if Missing == [] -> []; true ->
            PossibleAncestors = find_possible_ancestors(RevInfos, Missing),
            [{Id, Missing, PossibleAncestors}]
        end
    end, IdRevs),
    {ok, AllMissing}.


% Return the bodies of all design documents of Db.
%
% The ?DB_ALL_DOCS range is folded with the key options produced by
% set_design_doc_keys/1. A body future is started for every row first and
% the futures are waited on afterwards.
get_design_docs(Db) ->
    fabric2_fdb:transactional(Db, fun(TxDb) ->
        #{db_prefix := DbPrefix} = TxDb,

        AllDocsPrefix = erlfdb_tuple:pack({?DB_ALL_DOCS}, DbPrefix),
        RangeOpts = set_design_doc_keys([]),

        StartRead = fun({Key, Val}, Started) ->
            {DocId} = erlfdb_tuple:unpack(Key, AllDocsPrefix),
            RevInfo = #{
                rev_id => erlfdb_tuple:unpack(Val),
                rev_path => []
            },
            Future = fabric2_fdb:get_doc_body_future(TxDb, DocId, RevInfo),
            [{DocId, RevInfo, Future} | Started]
        end,
        Pending = fabric2_fdb:fold_range(
            TxDb, AllDocsPrefix, StartRead, [], RangeOpts),

        % Using foldl instead of map means that the design
        % docs come out in sorted order.
        WaitRead = fun({DocId, RevInfo, Future}, Bodies) ->
            Body = fabric2_fdb:get_doc_body_wait(TxDb, DocId, RevInfo, Future),
            [Body | Bodies]
        end,
        lists:foldl(WaitRead, [], Pending)
    end).


% Validate a document id. Returns ok or throws {illegal_docid, Reason}.
%
% Rejected are: the empty id, the bare `_design/` and `_local/` prefixes,
% ids longer than couchdb/max_document_id_length (when configured to a
% positive number), ids that are not valid UTF-8, ids starting with an
% underscore that are neither design nor local docs nor accepted by
% fabric2_db_plugin:validate_docid/1, and anything that is not a binary.
validate_docid(<<"">>) ->
    throw({illegal_docid, <<"Document id must not be empty">>});
validate_docid(<<"_design/">>) ->
    throw({illegal_docid, <<"Illegal document id `_design/`">>});
validate_docid(<<"_local/">>) ->
    throw({illegal_docid, <<"Illegal document id `_local/`">>});
validate_docid(Id) when is_binary(Id) ->
    Configured = config:get("couchdb", "max_document_id_length", "infinity"),
    MaxLen = case Configured of
        "infinity" -> infinity;
        _ -> list_to_integer(Configured)
    end,
    TooLong = MaxLen > 0 andalso byte_size(Id) > MaxLen,
    if
        TooLong -> throw({illegal_docid, <<"Document id is too long">>});
        true -> ok
    end,
    case couch_util:validate_utf8(Id) of
        true -> ok;
        false -> throw({illegal_docid, <<"Document id must be valid UTF-8">>})
    end,
    case Id of
        <<?DESIGN_DOC_PREFIX, _/binary>> ->
            ok;
        <<?LOCAL_DOC_PREFIX, _/binary>> ->
            ok;
        <<"_", _/binary>> ->
            % Other underscore ids are only valid when a plugin accepts them.
            case fabric2_db_plugin:validate_docid(Id) of
                false ->
                    Reason =
                        <<"Only reserved document ids may start with underscore.">>,
                    throw({illegal_docid, Reason});
                true ->
                    ok
            end;
        _ ->
            ok
    end;
validate_docid(Id) ->
    couch_log:debug("Document id is not a string: ~p", [Id]),
    throw({illegal_docid, <<"Document id must be a string">>}).


% Update a single document using an empty option list.
update_doc(Db, Doc) ->
    NoOptions = [],
    update_doc(Db, Doc, NoOptions).


% Update a single document through update_docs/3.
%
% Returns {ok, NewRev} on success. An empty result list is treated as a
% successful replicated update and answered with the first revision found
% in Doc. Any error reported for the document is thrown.
update_doc(Db, Doc, Options) ->
    case update_docs(Db, [Doc], Options) of
        {ok, []} ->
            % replication success
            {Pos, [RevId | _]} = Doc#doc.revs,
            {ok, {Pos, RevId}};
        {ok, [{ok, NewRev}]} ->
            {ok, NewRev};
        {ok, [{{_Id, _Rev}, Reason}]} ->
            throw(Reason);
        {error, [{{_Id, _Rev}, Reason}]} ->
            throw(Reason);
        {error, [Reason]} ->
            throw(Reason)
    end.


% Update a list of documents using an empty option list.
update_docs(Db, Docs) ->
    NoOptions = [],
    update_docs(Db, Docs, NoOptions).


% Update a list of documents.
%
% The before-doc-update step is applied first, then the documents are
% written with batch_update_docs/3 and fabric2_index is notified. Results of
% the form {#doc{}, Error} are rewritten to {{DocId, RevId}, Error}. For
% replicated updates, entries that carry an empty `ok` result are dropped.
%
% Returns {ok, Results}, or {aborted, Errors} when an {aborted, Errors}
% throw is caught.
update_docs(Db, Docs0, Options) ->
    Docs1 = apply_before_doc_update(Db, Docs0, Options),

    % Convert errors
    ConvertResp = fun
        ({#doc{id = DocId, revs = Revs}, Error}) ->
            RevId = case Revs of
                {RevPos, [Rev | _]} -> {RevPos, Rev};
                {0, []} -> {0, <<>>};
                Other -> Other
            end,
            {{DocId, RevId}, Error};
        (Resp) ->
            Resp
    end,

    DropEmpty = fun
        ({ok, []}) -> [];
        ({{_, _}, {ok, []}}) -> [];
        (Resp) -> [Resp]
    end,

    try
        IsAtomic = lists:member(all_or_nothing, Options),
        validate_atomic_update(Docs0, IsAtomic),

        Resps0 = batch_update_docs(Db, Docs1, Options),

        % Notify index builder
        fabric2_index:db_updated(name(Db)),

        Resps1 = lists:map(ConvertResp, Resps0),
        case is_replicated(Options) of
            false ->
                {ok, Resps1};
            true ->
                {ok, lists:flatmap(DropEmpty, Resps1)}
        end
    catch throw:{aborted, Errors} ->
        {aborted, Errors}
    end.


% Read the attachment AttId of document DocId inside a transaction on Db.
read_attachment(Db, DocId, AttId) ->
    ReadFun = fun(TxDb) ->
        fabric2_fdb:read_attachment(TxDb, DocId, AttId)
    end,
    fabric2_fdb:transactional(Db, ReadFun).


% Write the data of attachment Att for document DocId and return Att with
% its data replaced by the {loc, Db, DocId, AttId} reference to what was
% written.
write_attachment(Db, DocId, Att) ->
    Body = couch_att:fetch(data, Att),
    Enc = couch_att:fetch(encoding, Att),
    {ok, AttId} = fabric2_fdb:write_attachment(Db, DocId, Body, Enc),
    Location = {loc, Db, DocId, AttId},
    couch_att:store(data, Location, Att).


% Fold over all documents using an empty option list.
fold_docs(Db, UserFun, UserAcc) ->
    NoOptions = [],
    fold_docs(Db, UserFun, UserAcc, NoOptions).


% Fold over the ?DB_ALL_DOCS range of Db with a user callback.
%
% UserFun receives {meta, Meta}, then one {row, Row} per document and
% finally `complete`. Row is a proplist with id, key and value (the rev
% string); with {include_docs, true} in Options the output of
% open_json_doc/4 is appended. The {doc_opts, DocOpts} option is forwarded
% to open_json_doc/4. Options are also passed to fabric2_fdb:fold_range/5.
%
% Every callback result goes through maybe_stop/1; a {stop, Acc} throw ends
% the fold early. Returns {ok, Acc}.
fold_docs(Db, UserFun, UserAcc0, Options) ->
    fabric2_fdb:transactional(Db, fun(TxDb) ->
        try
            #{
                db_prefix := DbPrefix
            } = TxDb,

            Prefix = erlfdb_tuple:pack({?DB_ALL_DOCS}, DbPrefix),
            Meta = get_all_docs_meta(TxDb, Options),

            UserAcc1 = maybe_stop(UserFun({meta, Meta}, UserAcc0)),

            % Options do not change while folding, so the per-row settings
            % are resolved once here instead of once per row.
            DocOpts = couch_util:get_value(doc_opts, Options, []),
            OpenOpts = [deleted | DocOpts],
            IncludeDocs = case lists:keyfind(include_docs, 1, Options) of
                {include_docs, true} -> true;
                _ -> false
            end,

            UserAcc2 = fabric2_fdb:fold_range(TxDb, Prefix, fun({K, V}, Acc) ->
                {DocId} = erlfdb_tuple:unpack(K, Prefix),
                RevId = erlfdb_tuple:unpack(V),
                Row0 = [
                    {id, DocId},
                    {key, DocId},
                    {value, {[{rev, couch_doc:rev_to_str(RevId)}]}}
                ],

                Row1 = case IncludeDocs of
                    true ->
                        Row0 ++ open_json_doc(TxDb, DocId, OpenOpts, DocOpts);
                    false ->
                        Row0
                end,

                maybe_stop(UserFun({row, Row1}, Acc))
            end, UserAcc1, Options),

            {ok, maybe_stop(UserFun(complete, UserAcc2))}
        catch throw:{stop, FinalUserAcc} ->
            {ok, FinalUserAcc}
        end
    end).


% Fold over the documents listed in DocIds with a user callback.
%
% For every doc id a revision lookup is started with fold_docs_get_revs/3
% and queued; the accumulator is then handed to
% drain_fold_docs_revs_futures/2. After the last id the remaining revision
% and body queues are drained completely. Options are stored in the
% accumulator as doc_opts.
%
% NOTE(review): the calls to UserFun happen in the drain helpers, which are
% outside this part of the file - confirm the events it receives there.
%
% Returns {ok, UserAcc}; a {stop, Acc} throw ends the fold early.
fold_docs(Db, DocIds, UserFun, UserAcc0, Options) ->
    fabric2_fdb:transactional(Db, fun(TxDb) ->
        try
            % The full revision list is only needed for these options.
            NeedsTreeOpts = [revs_info, conflicts, deleted_conflicts],
            NeedsTree = (Options -- NeedsTreeOpts /= Options),

            InitAcc = #{
                revs_q => queue:new(),
                revs_count => 0,
                body_q => queue:new(),
                body_count => 0,
                doc_opts => Options,
                user_acc => UserAcc0,
                user_fun => UserFun
            },

            FinalAcc1 = lists:foldl(fun(DocId, Acc) ->
                #{
                    revs_q := RevsQ,
                    revs_count := RevsCount
                } = Acc,
                Future = fold_docs_get_revs(TxDb, DocId, NeedsTree),
                NewAcc = Acc#{
                    revs_q := queue:in({DocId, Future}, RevsQ),
                    revs_count := RevsCount + 1
                },
                drain_fold_docs_revs_futures(TxDb, NewAcc)
            end, InitAcc, DocIds),

            % Flush whatever is still queued once all ids have been seen.
            FinalAcc2 = drain_all_fold_docs_revs_futures(TxDb, FinalAcc1),
            FinalAcc3 = drain_all_fold_docs_body_futures(TxDb, FinalAcc2),

            #{
                user_acc := FinalUserAcc
            } = FinalAcc3,
            {ok, FinalUserAcc}

        catch throw:{stop, StopUserAcc} ->
            {ok, StopUserAcc}
        end
    end).




% Fold over design documents: fold_docs/4 with Options passed through
% set_design_doc_keys/1 first.
fold_design_docs(Db, UserFun, UserAcc0, Options1) ->
    fold_docs(Db, UserFun, UserAcc0, set_design_doc_keys(Options1)).


% Fold over the ?DB_LOCAL_DOCS range of Db with a user callback.
%
% UserFun receives {meta, Meta}, then one {row, Row} per local document
% (id, key and a value holding the rev string) and finally `complete`.
% Every callback result goes through maybe_stop/1; a {stop, Acc} throw ends
% the fold early. Returns {ok, Acc}.
fold_local_docs(Db, UserFun, UserAcc0, Options0) ->
    % This is mostly for testing and sanity checking. When calling from a test
    % namespace will be automatically set. We also assert when called from the
    % API the correct namespace was set
    Options = case lists:keyfind(namespace, 1, Options0) of
        false -> [{namespace, <<"_local">>} | Options0];
        {namespace, <<"_local">>} -> Options0
    end,
    fabric2_fdb:transactional(Db, fun(TxDb) ->
        try
            #{db_prefix := DbPrefix} = TxDb,

            LocalPrefix = erlfdb_tuple:pack({?DB_LOCAL_DOCS}, DbPrefix),
            Meta = get_all_docs_meta(TxDb, Options),

            AfterMeta = maybe_stop(UserFun({meta, Meta}, UserAcc0)),

            RowFun = fun({K, V}, Acc) ->
                {DocId} = erlfdb_tuple:unpack(K, LocalPrefix),
                Rev = fabric2_fdb:get_local_doc_rev(TxDb, DocId, V),
                Row = [
                    {id, DocId},
                    {key, DocId},
                    {value, {[{rev, couch_doc:rev_to_str({0, Rev})}]}}
                ],
                maybe_stop(UserFun({row, Row}, Acc))
            end,
            AfterRows = fabric2_fdb:fold_range(
                TxDb, LocalPrefix, RowFun, AfterMeta, Options),

            {ok, maybe_stop(UserFun(complete, AfterRows))}
        catch throw:{stop, StoppedAcc} ->
            {ok, StoppedAcc}
        end
    end).


% Fold over the changes after SinceSeq using an empty option list.
fold_changes(Db, SinceSeq, UserFun, UserAcc) ->
    NoOptions = [],
    fold_changes(Db, SinceSeq, UserFun, UserAcc, NoOptions).


% Fold over the ?DB_CHANGES range of Db with a user callback.
%
% UserFun is called with a map holding id, sequence, rev_id and deleted for
% every change, and its result goes through maybe_stop/1. Unlike the other
% folds in this module no meta or complete event is sent.
%
% Options read here (the full list is also passed on to fold_range):
%   dir        - rev folds in reverse; any other value means fwd
%   restart_tx - defaults to true when not given
%   end_key    - a binary sequence or a versionstamp tuple; when not given
%                the zero sequence (rev) or the max sequence (fwd) is used
%
% Returns {ok, Acc}; a {stop, Acc} throw ends the fold early.
fold_changes(Db, SinceSeq, UserFun, UserAcc, Options) ->
    fabric2_fdb:transactional(Db, fun(TxDb) ->
        try
            #{
                db_prefix := DbPrefix
            } = TxDb,

            Prefix = erlfdb_tuple:pack({?DB_CHANGES}, DbPrefix),

            Dir = case fabric2_util:get_value(dir, Options, fwd) of
                rev -> rev;
                _ -> fwd
            end,

            % Only add restart_tx when the caller did not set it.
            RestartTx = case fabric2_util:get_value(restart_tx, Options) of
                undefined -> [{restart_tx, true}];
                _AlreadySet -> []
            end,

            StartKey = get_since_seq(TxDb, Dir, SinceSeq),
            EndKey = case fabric2_util:get_value(end_key, Options) of
                undefined when Dir == rev ->
                    fabric2_util:seq_zero_vs();
                undefined ->
                    fabric2_util:seq_max_vs();
                EK when is_binary(EK) ->
                    fabric2_fdb:seq_to_vs(EK);
                EK when is_tuple(EK), element(1, EK) == versionstamp ->
                    EK
            end,
            % start_key and restart_tx go in front of the caller's options;
            % the end_key entry is replaced (or appended) with the converted
            % value.
            BaseOpts = [{start_key, StartKey}] ++ RestartTx ++ Options,
            FoldOpts = lists:keystore(end_key, 1, BaseOpts, {end_key, EndKey}),

            {ok, fabric2_fdb:fold_range(TxDb, Prefix, fun({K, V}, Acc) ->
                {SeqVS} = erlfdb_tuple:unpack(K, Prefix),
                {DocId, Deleted, RevId} = erlfdb_tuple:unpack(V),

                Change = #{
                    id => DocId,
                    sequence => fabric2_fdb:vs_to_seq(SeqVS),
                    rev_id => RevId,
                    deleted => Deleted
                },

                maybe_stop(UserFun(Change, Acc))
            end, UserAcc, FoldOpts)}
        catch throw:{stop, FinalUserAcc} ->
            {ok, FinalUserAcc}
        end
    end).


% Return the last path component of a database name.
dbname_suffix(Name) ->
    Suffix = filename:basename(Name),
    Suffix.


% Validate a database name. Binary names are handed to the
% fabric2_db_plugin hook with validate_dbname_int/2 as the validator fun.
% List names are converted to a binary first.
validate_dbname(DbName) when is_binary(DbName) ->
    Validator = fun validate_dbname_int/2,
    fabric2_db_plugin:validate_dbname(DbName, DbName, Validator);

validate_dbname(DbName) when is_list(DbName) ->
    BinName = ?l2b(DbName),
    validate_dbname(BinName).

% Built-in validation: check the length limit first and only run the
% pattern check when the length is acceptable. Both arguments must be
% the same binary.
validate_dbname_int(DbName, DbName) when is_binary(DbName) ->
    case validate_dbname_length(DbName) of
        {error, _Reason} = TooLong ->
            TooLong;
        ok ->
            validate_dbname_pat(DbName)
    end.


% Check the byte size of DbName against the configured
% "couchdb"/"max_database_name_length" limit.
% Returns ok or {error, {database_name_too_long, DbName}}.
validate_dbname_length(DbName) ->
    Limit = config:get_integer("couchdb", "max_database_name_length",
        ?DEFAULT_MAX_DATABASE_NAME_LENGTH),
    NameSize = byte_size(DbName),
    if
        NameSize =< Limit -> ok;
        true -> {error, {database_name_too_long, DbName}}
    end.


% Check DbName against ?DBNAME_REGEX. Names that do not match are still
% accepted when is_system_db_name/1 says they are system databases.
% Returns ok or {error, {illegal_database_name, DbName}}.
validate_dbname_pat(DbName) ->
    ReOpts = [{capture,none}, dollar_endonly],
    case re:run(DbName, ?DBNAME_REGEX, ReOpts) of
        nomatch ->
            case is_system_db_name(DbName) of
                false -> {error, {illegal_database_name, DbName}};
                true -> ok
            end;
        match ->
            ok
    end.


% Set the before_doc_update and after_doc_read callbacks on the Db map.
% Replicator databases take precedence over users databases; any other
% database gets `undefined` for both callbacks.
maybe_add_sys_db_callbacks(Db) ->
    Callbacks = case {is_replicator_db(Db), is_users_db(Db)} of
        {true, _} ->
            {
                fun couch_replicator_docs:before_doc_update/3,
                fun couch_replicator_docs:after_doc_read/2
            };
        {_, true} ->
            {
                fun fabric2_users_db:before_doc_update/3,
                fun fabric2_users_db:after_doc_read/2
            };
        {_, _} ->
            {undefined, undefined}
    end,
    {BeforeDocUpdate, AfterDocRead} = Callbacks,

    Db#{
        before_doc_update := BeforeDocUpdate,
        after_doc_read := AfterDocRead
    }.


% Build the db info property list: start from a fixed set of default
% entries and overlay every {Key, Value} pair in Props. A key that is
% already present is replaced in place, a new key is appended.
make_db_info(DbName, Props) ->
    Defaults = [
        {cluster, {[{n, 0}, {q, 0}, {r, 0}, {w, 0}]}},
        {compact_running, false},
        {data_size, 0},
        {db_name, DbName},
        {disk_format_version, 0},
        {disk_size, 0},
        {instance_start_time, <<"0">>},
        {purge_seq, 0}
    ],
    Overlay = fun({Key, _Val} = Prop, Info) ->
        lists:keystore(Key, 1, Info, Prop)
    end,
    lists:foldl(Overlay, Defaults, Props).


% Once Count has reached 100, wait on the oldest queued info future, emit
% its row through UserFun and return the shortened queue with Count
% decremented. Below 100 nothing is drained.
drain_info_futures(FutureQ, Count, UserFun, Acc) when Count >= 100 ->
    {{value, {DbName, Future}}, RestQ} = queue:out(FutureQ),
    Row = {row, make_db_info(DbName, fabric2_fdb:get_info_wait(Future))},
    NewAcc = maybe_stop(UserFun(Row, Acc)),
    {RestQ, Count - 1, NewAcc};

drain_info_futures(FutureQ, Count, _UserFun, Acc) when Count < 100 ->
    {FutureQ, Count, Acc}.


% Wait on every remaining info future in queue order, emitting one row
% per database through UserFun. Returns the final accumulator.
drain_all_info_futures(FutureQ, UserFun, Acc) ->
    case queue:is_empty(FutureQ) of
        true ->
            Acc;
        false ->
            {{value, {DbName, Future}}, RestQ} = queue:out(FutureQ),
            Props = fabric2_fdb:get_info_wait(Future),
            Row = {row, make_db_info(DbName, Props)},
            NextAcc = maybe_stop(UserFun(Row, Acc)),
            drain_all_info_futures(RestQ, UserFun, NextAcc)
    end.


% Same as drain_info_futures/4 but for deleted databases: queue entries
% carry a timestamp, and the emitted row is extended with
% {deleted, true} and {timestamp, TimeStamp}.
drain_deleted_info_futures(FutureQ, Count, UserFun, Acc) when Count >= 100 ->
    {{value, {DbName, TimeStamp, Future}}, RestQ} = queue:out(FutureQ),
    Extra = [
        {deleted, true},
        {timestamp, TimeStamp}
    ],
    Props = fabric2_fdb:get_info_wait(Future) ++ Extra,
    Row = {row, make_db_info(DbName, Props)},
    NewAcc = maybe_stop(UserFun(Row, Acc)),
    {RestQ, Count - 1, NewAcc};

drain_deleted_info_futures(FutureQ, Count, _UserFun, Acc) when Count < 100 ->
    {FutureQ, Count, Acc}.


% Wait on every remaining deleted-db info future in queue order and emit
% one row each, tagged with {deleted, true} and the deletion timestamp.
% Returns the final accumulator.
drain_all_deleted_info_futures(FutureQ, UserFun, Acc) ->
    case queue:is_empty(FutureQ) of
        true ->
            Acc;
        false ->
            {{value, {DbName, TimeStamp, Future}}, RestQ} = queue:out(FutureQ),
            Extra = [
                {deleted, true},
                {timestamp, TimeStamp}
            ],
            Props = fabric2_fdb:get_info_wait(Future) ++ Extra,
            Row = {row, make_db_info(DbName, Props)},
            NextAcc = maybe_stop(UserFun(Row, Acc)),
            drain_all_deleted_info_futures(RestQ, UserFun, NextAcc)
    end.


% Start an asynchronous revision lookup for a document.
%   local docs      - local doc rev future, third argument is ignored
%   AllRevs = true  - future for all revisions
%   AllRevs = false - future for the single winning revision
fold_docs_get_revs(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = LocalId, _AllRevs) ->
    fabric2_fdb:get_local_doc_rev_future(Db, LocalId);

fold_docs_get_revs(Db, DocId, true) ->
    fabric2_fdb:get_all_revs_future(Db, DocId);

fold_docs_get_revs(Db, DocId, false) ->
    WinnersWanted = 1,
    fabric2_fdb:get_winning_revs_future(Db, DocId, WinnersWanted).


% Resolve a future created by fold_docs_get_revs/3. The result is always
% a list: a one element list for local docs, and whatever
% fabric2_fdb:get_revs_wait/2 returns otherwise.
fold_docs_get_revs_wait(_Db, <<?LOCAL_DOC_PREFIX, _/binary>>, Future) ->
    [fabric2_fdb:get_local_doc_rev_wait(Future)];

fold_docs_get_revs_wait(Db, _DocId, Future) ->
    fabric2_fdb:get_revs_wait(Db, Future).


% Start an asynchronous body read. Local docs use their single revision,
% all other docs read the body of the winning revision in Revs.
fold_docs_get_doc_body_future(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = LocalId,
        [LocalRev]) ->
    fabric2_fdb:get_local_doc_body_future(Db, LocalId, LocalRev);

fold_docs_get_doc_body_future(Db, DocId, Revs) ->
    fabric2_fdb:get_doc_body_future(Db, DocId, get_rev_winner(Revs)).


% Resolve a body future created by fold_docs_get_doc_body_future/3.
% Local docs yield {ok, Doc} or {not_found, missing}. Other docs are
% passed through apply_open_doc_opts/3 together with Revs and DocOpts.
fold_docs_get_doc_body_wait(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = LocalId,
        [LocalRev], _DocOpts, Future) ->
    Result = fabric2_fdb:get_local_doc_body_wait(Db, LocalId, LocalRev, Future),
    case Result of
        {not_found, missing} -> Result;
        LocalDoc -> {ok, LocalDoc}
    end;

fold_docs_get_doc_body_wait(Db, DocId, Revs, DocOpts, Future) ->
    Winner = get_rev_winner(Revs),
    Doc = fabric2_fdb:get_doc_body_wait(Db, DocId, Winner, Future),
    apply_open_doc_opts(Doc, Revs, DocOpts).


% Leave the accumulator alone while fewer than 100 revision futures are
% counted in revs_count, otherwise resolve a single one.
drain_fold_docs_revs_futures(TxDb, Acc) ->
    case Acc of
        #{revs_count := Pending} when Pending < 100 ->
            Acc;
        _ ->
            drain_one_fold_docs_revs_future(TxDb, Acc)
    end.


% Resolve revision futures one at a time until revs_count drops to zero
% (or below), then return the accumulator.
drain_all_fold_docs_revs_futures(TxDb, #{revs_count := Pending} = Acc)
        when Pending > 0 ->
    Drained = drain_one_fold_docs_revs_future(TxDb, Acc),
    drain_all_fold_docs_revs_futures(TxDb, Drained);
drain_all_fold_docs_revs_futures(_TxDb, #{revs_count := Pending} = Acc)
        when Pending =< 0 ->
    Acc.


% Take the oldest entry from the revs queue, wait for its revisions and
% queue a matching body entry. A doc without revisions is queued with the
% marker `not_found` instead of a body future. Afterwards the body queue
% gets a chance to drain via drain_fold_docs_body_futures/2.
drain_one_fold_docs_revs_future(TxDb, Acc) ->
    #{
        revs_q := RevsQ0,
        revs_count := RevsPending,
        body_q := BodyQ0,
        body_count := BodyPending
    } = Acc,
    {{value, {DocId, RevsFuture}}, RevsQ1} = queue:out(RevsQ0),

    BodyEntry = case fold_docs_get_revs_wait(TxDb, DocId, RevsFuture) of
        [] ->
            {DocId, [], not_found};
        [_ | _] = Revs ->
            {DocId, Revs, fold_docs_get_doc_body_future(TxDb, DocId, Revs)}
    end,

    drain_fold_docs_body_futures(TxDb, Acc#{
        revs_q := RevsQ1,
        revs_count := RevsPending - 1,
        body_q := queue:in(BodyEntry, BodyQ0),
        body_count := BodyPending + 1
    }).


% Leave the accumulator alone while fewer than 100 body futures are
% counted in body_count, otherwise resolve a single one.
drain_fold_docs_body_futures(TxDb, Acc) ->
    case Acc of
        #{body_count := Pending} when Pending < 100 ->
            Acc;
        _ ->
            drain_one_fold_docs_body_future(TxDb, Acc)
    end.


% Resolve body futures one at a time until body_count drops to zero
% (or below), then return the accumulator.
drain_all_fold_docs_body_futures(TxDb, #{body_count := Pending} = Acc)
        when Pending > 0 ->
    Drained = drain_one_fold_docs_body_future(TxDb, Acc),
    drain_all_fold_docs_body_futures(TxDb, Drained);
drain_all_fold_docs_body_futures(_TxDb, #{body_count := Pending} = Acc)
        when Pending =< 0 ->
    Acc.


% Take the oldest entry from the body queue, resolve it to a document
% (or {not_found, missing} for entries queued without a body future) and
% hand it to the user callback as UserFun(DocId, Doc, UserAcc).
drain_one_fold_docs_body_future(TxDb, Acc) ->
    #{
        body_q := BodyQ0,
        body_count := BodyPending,
        doc_opts := DocOpts,
        user_fun := UserFun,
        user_acc := UserAcc0
    } = Acc,
    {{value, Entry}, BodyQ1} = queue:out(BodyQ0),
    {DocId, Revs, BodyFuture} = Entry,

    Doc = case BodyFuture =:= not_found of
        true ->
            {not_found, missing};
        false ->
            fold_docs_get_doc_body_wait(TxDb, DocId, Revs, DocOpts, BodyFuture)
    end,
    UserAcc1 = maybe_stop(UserFun(DocId, Doc, UserAcc0)),

    Acc#{
        body_q := BodyQ1,
        body_count := BodyPending - 1,
        user_acc := UserAcc1
    }.


% Return the one rev info map in Revs whose `winner` flag is true.
% Fails with badmatch unless exactly one such entry exists.
get_rev_winner(Revs) ->
    [Winner] = [RevInfo || RevInfo <- Revs, maps:get(winner, RevInfo)],
    Winner.


% Compute the next revision for Doc and return the doc with that revision
% prepended to its revision path and its attachments updated.
%
% Attachments whose data is already a {loc, _, _, _} tuple are kept as
% is. Every other attachment is flushed through couch_att:flush/3 and
% gets its revpos set to the new revision position (OldStart + 1).
%
% The new revision id is the MD5 hash of the external term encoding of
% [Deleted, OldStart, OldRev, Body, AttSigInfo].
new_revid(Db, Doc) ->
    #doc{
        id = DocId,
        body = Body,
        revs = {OldStart, OldRevs},
        atts = Atts,
        deleted = Deleted
    } = Doc,

    % AttSigInfo collects one {Name, Type, Md5} tuple per attachment. It
    % is built by prepending, so it ends up in reverse attachment order,
    % and that order is part of the hashed term below.
    {NewAtts, AttSigInfo} = lists:mapfoldl(fun(Att, Acc) ->
        [Name, Type, Data, Md5] = couch_att:fetch([name, type, data, md5], Att),
        case Data of
            {loc, _, _, _} ->
                {Att, [{Name, Type, Md5} | Acc]};
            _ ->
                Att1 = couch_att:flush(Db, DocId, Att),
                Att2 = couch_att:store(revpos, OldStart + 1, Att1),
                % The md5 is read back from the flushed attachment
                {Att2, [{Name, Type, couch_att:fetch(md5, Att2)} | Acc]}
        end
    end, [], Atts),

    % NOTE(review): both branches above add exactly one entry, so the
    % `false` branch below looks unreachable -- confirm before removing.
    Rev = case length(Atts) == length(AttSigInfo) of
        true ->
            % 0 stands in for the previous revision of a brand new doc
            OldRev = case OldRevs of [] -> 0; [OldRev0 | _] -> OldRev0 end,
            SigTerm = [Deleted, OldStart, OldRev, Body, AttSigInfo],
            couch_hash:md5_hash(term_to_binary(SigTerm, [{minor_version, 1}]));
        false ->
            erlang:error(missing_att_info)
    end,

    Doc#doc{
        revs = {OldStart + 1, [Rev | OldRevs]},
        atts = NewAtts
    }.


% Build the meta properties for an all-docs style response: the doc
% count of the requested namespace as `total`, a `null` offset and, only
% when Options contains {update_seq, true}, the current update sequence
% in front of them.
get_all_docs_meta(TxDb, Options) ->
    Namespace = couch_util:get_value(namespace, Options),
    Total = get_doc_count(TxDb, Namespace),
    SeqProps = case lists:keyfind(update_seq, 1, Options) of
        {_, true} ->
            [{update_seq, fabric2_db:get_update_seq(TxDb)}];
        _ ->
            []
    end,
    SeqProps ++ [{total, Total}, {offset, null}].


% Copy the `interactive` option (default false) into the Db map. The
% key must already exist in the map.
maybe_set_interactive(#{} = Db, Options) ->
    Db#{interactive := fabric2_util:get_value(interactive, Options, false)}.


% Apply the `user_ctx` option to Db when one was given. Without the
% option Db is returned unchanged. Any value other than a #user_ctx{}
% record fails with case_clause.
maybe_set_user_ctx(Db, Options) ->
    case fabric2_util:get_value(user_ctx, Options) of
        undefined ->
            Db;
        #user_ctx{} = Ctx ->
            set_user_ctx(Db, Ctx)
    end.


% A user is a member when, checked in this order, they are an admin, the
% database is public (no member names or roles configured), or they are
% authorized by the members section of the security object.
is_member(Db, {SecProps}) when is_list(SecProps) ->
    is_admin(Db, {SecProps})
        orelse is_public_db(SecProps)
        orelse begin
            {Members} = get_members(SecProps),
            is_authorized(Members, get_user_ctx(Db))
        end.


% Check a user context against one security group (a property list with
% optional <<"names">> and <<"roles">> entries). Roles are checked first,
% with <<"_admin">> always added to the accepted roles, then names.
is_authorized(Group, UserCtx) ->
    #user_ctx{
        name = UserName,
        roles = UserRoles
    } = UserCtx,
    AllowedNames = fabric2_util:get_value(<<"names">>, Group, []),
    GroupRoles = fabric2_util:get_value(<<"roles">>, Group, []),
    AllowedRoles = [<<"_admin">> | GroupRoles],
    check_security(roles, UserRoles, AllowedRoles)
        orelse check_security(names, UserName, AllowedNames).


% check_security(roles, UserRoles, Roles): true when the user has at
% least one of the accepted roles.
% check_security(names, UserName, Names): true when a non-null user name
% is listed in Names.
check_security(roles, [], _Roles) ->
    false;
check_security(roles, UserRoles, Roles) ->
    Held = ordsets:from_list(UserRoles),
    Accepted = ordsets:from_list(Roles),
    ordsets:intersection(Held, Accepted) =/= [];
check_security(names, UserName, Names) ->
    Names =/= []
        andalso UserName =/= null
        andalso lists:member(UserName, Names).


% Throw the security error for UserCtx with the standard message:
% anonymous users (name = null) are "not authorized", named users are
% "not allowed".
throw_security_error(#user_ctx{name = Name} = UserCtx) ->
    Reason = case Name of
        null -> <<"You are not authorized to access this db.">>;
        _ -> <<"You are not allowed to access this db.">>
    end,
    throw_security_error(UserCtx, Reason).


% Throw {ErrorType, Reason}, where ErrorType is chosen by
% security_error_type/1 from the user context.
throw_security_error(#user_ctx{} = UserCtx, Reason) ->
    throw({security_error_type(UserCtx), Reason}).


% Anonymous users (name = null) get `unauthorized`, named users get
% `forbidden`.
security_error_type(#user_ctx{name = Name}) ->
    case Name of
        null -> unauthorized;
        _ -> forbidden
    end.


% A database is public when its members section lists neither names nor
% roles.
is_public_db(SecProps) ->
    {Members} = get_members(SecProps),
    NoNames = [] =:= fabric2_util:get_value(<<"names">>, Members, []),
    NoRoles = [] =:= fabric2_util:get_value(<<"roles">>, Members, []),
    NoNames andalso NoRoles.


% Return the <<"admins">> section of the security properties, or an
% empty object when it is absent.
get_admins(SecProps) ->
    EmptyObject = {[]},
    fabric2_util:get_value(<<"admins">>, SecProps, EmptyObject).


% Return the <<"members">> section of the security properties. For
% backwards compatibility the older <<"readers">> section is used when
% <<"members">> is absent, and an empty object when both are.
get_members(SecProps) ->
    case fabric2_util:get_value(<<"members">>, SecProps) of
        Members when Members =/= undefined ->
            Members;
        undefined ->
            fabric2_util:get_value(<<"readers">>, SecProps, {[]})
    end.


% Decorate an opened document according to the open options and return
% {ok, Doc}.
%
% Flags looked up in Options: revs_info, conflicts, deleted_conflicts and
% local_seq. Each one adds an entry to the doc's `meta` list; conflict
% entries are only added when there is at least one matching revision.
% A {atts_since, Revs} option additionally turns attachments that are not
% newer than the closest listed ancestor into stubs.
%
% Revs is the list of rev info maps for the document.
apply_open_doc_opts(Doc0, Revs, Options) ->
    IncludeRevsInfo = lists:member(revs_info, Options),
    IncludeConflicts = lists:member(conflicts, Options),
    IncludeDelConflicts = lists:member(deleted_conflicts, Options),
    IncludeLocalSeq = lists:member(local_seq, Options),

    % This revs_info becomes fairly useless now that we're
    % not keeping old document bodies around...
    % Only the doc's own revision is reported as available, every
    % ancestor in the path is reported as missing.
    Meta1 = if not IncludeRevsInfo -> []; true ->
        {Pos, [Rev | RevPath]} = Doc0#doc.revs,
        RevPathMissing = lists:map(fun(R) -> {R, missing} end, RevPath),
        [{revs_info, Pos, [{Rev, available} | RevPathMissing]}]
    end,

    % Conflicts are the non-winning, non-deleted revisions
    Meta2 = if not IncludeConflicts -> []; true ->
        Conflicts = [RI || RI = #{winner := false, deleted := false} <- Revs],
        if Conflicts == [] -> []; true ->
            ConflictRevs = [maps:get(rev_id, RI) || RI <- Conflicts],
            [{conflicts, ConflictRevs}]
        end
    end,

    % Deleted conflicts are the non-winning, deleted revisions
    Meta3 = if not IncludeDelConflicts -> []; true ->
        DelConflicts = [RI || RI = #{winner := false, deleted := true} <- Revs],
        if DelConflicts == [] -> []; true ->
            DelConflictRevs = [maps:get(rev_id, RI) || RI <- DelConflicts],
            [{deleted_conflicts, DelConflictRevs}]
        end
    end,

    % The last entry of Revs must be the winner, otherwise this match
    % fails with badmatch
    Meta4 = if not IncludeLocalSeq -> []; true ->
        #{winner := true, sequence := SeqVS} = lists:last(Revs),
        [{local_seq, fabric2_fdb:vs_to_seq(SeqVS)}]
    end,

    Doc1 = case lists:keyfind(atts_since, 1, Options) of
        {_, PossibleAncestors} ->
            #doc{
                revs = DocRevs,
                atts = Atts0
            } = Doc0,
            % RevPos is 0 when none of the listed revisions is an ancestor
            RevPos = find_ancestor_rev_pos(DocRevs, PossibleAncestors),
            Atts1 = lists:map(fun(Att) ->
                [AttPos, Data] = couch_att:fetch([revpos, data], Att),
                % Attachments newer than the ancestor keep their data
                % (the same value is stored back), older ones become stubs
                if  AttPos > RevPos -> couch_att:store(data, Data, Att);
                    true -> couch_att:store(data, stub, Att)
                end
            end, Atts0),
            Doc0#doc{atts = Atts1};
        false ->
            Doc0
    end,

    {ok, Doc1#doc{meta = Meta1 ++ Meta2 ++ Meta3 ++ Meta4}}.


% Walk a {Pos, RevPath} revision path from newest to oldest and return
% the position of the first revision that appears in the list of
% candidate {Pos, RevId} ancestors. Returns 0 when the path is exhausted
% or there are no candidates.
find_ancestor_rev_pos({_Pos, []}, _Candidates) ->
    0;
find_ancestor_rev_pos(_DocRevs, []) ->
    0;
find_ancestor_rev_pos({Pos, [RevId | Older]}, Candidates) ->
    IsCandidate = lists:member({Pos, RevId}, Candidates),
    if
        IsCandidate -> Pos;
        true -> find_ancestor_rev_pos({Pos - 1, Older}, Candidates)
    end.


% Return the entries of Revs ({Pos, RevId} pairs) that are NOT part of
% the revision path described by RevInfo. A revision is kept when it is
% newer than the rev info, older than the stored path reaches, or when
% the path holds a different revision id at its position.
filter_found_revs(RevInfo, Revs) ->
    #{
        rev_id := {Pos, Rev},
        rev_path := RevPath
    } = RevInfo,
    FullRevPath = [Rev | RevPath],
    lists:filter(fun({FindPos, FindRev}) ->
        FindPos > Pos orelse begin
            % lists:nth/2 is 1 based, hence the + 1
            Depth = Pos - FindPos + 1,
            Depth > length(FullRevPath)
                orelse lists:nth(Depth, FullRevPath) /= FindRev
        end
    end, Revs).


% Return the rev ids of all rev infos that could be ancestors of one of
% the missing revisions.
%
% A rev info qualifies when its position is lower than the position of
% some missing revision, i.e. when it could in theory be extended into
% that revision. Because any missing revision will do, comparing against
% the highest missing position is sufficient. With no missing revisions
% that maximum is taken to be 0.
find_possible_ancestors(RevInfos, MissingRevs) ->
    MaxMissingPos = case MissingRevs of
        [] -> 0;
        [_ | _] -> lists:max([Start || {Start, _Rev} <- MissingRevs])
    end,
    lists:filtermap(fun(#{rev_id := {RevPos, _} = RevId}) ->
        case RevPos < MaxMissingPos of
            true -> {true, RevId};
            false -> false
        end
    end, RevInfos).


% Run the before_doc_update plugin hook over every doc, in order. The
% update type is `replicated_changes` when that flag is present in
% Options and `interactive_edit` otherwise.
apply_before_doc_update(Db, Docs, Options) ->
    IsReplicated = lists:member(replicated_changes, Options),
    UpdateType = if
        IsReplicated -> replicated_changes;
        true -> interactive_edit
    end,
    [fabric2_db_plugin:before_doc_update(Db, Doc, UpdateType) || Doc <- Docs].


% Dispatch a single doc update to the local, replicated or interactive
% update path. A {?MODULE, Return} throw raised from within those paths
% (the ?RETURN mechanism) is turned into a plain return value.
update_doc_int(#{} = Db, #doc{} = Doc, Options) ->
    try
        Replicated = is_replicated(Options),
        case Doc#doc.id of
            <<?LOCAL_DOC_PREFIX, _/binary>> ->
                update_local_doc(Db, Doc, Options);
            _ when Replicated =:= true ->
                update_doc_replicated(Db, Doc, Options);
            _ when Replicated =:= false ->
                update_doc_interactive(Db, Doc, Options)
        end
    catch throw:{?MODULE, Return} ->
        Return
    end.


% Update Docs in one or more transactions and return the per-doc
% results in the same order as Docs.
batch_update_docs(Db, Docs, Options) ->
    Initial = #bacc{
        db = Db,
        docs = Docs,
        batch_size = get_batch_size(Options),
        options = Options,
        rev_futures = #{},
        seen = [],
        results = []
    },
    Final = batch_update_docs(Initial),
    % Results are accumulated by prepending
    lists:reverse(Final#bacc.results).


% Process the remaining docs one transaction at a time until none are
% left. Each transaction handles as many docs as the interactive or
% replicated batch function accepts.
batch_update_docs(#bacc{docs = []} = BAcc) ->
    BAcc;

batch_update_docs(#bacc{db = Db, docs = Docs, options = Options} = BAcc) ->
    TxFun = fun(TxDb) ->
        TxBAcc = BAcc#bacc{db = TxDb},
        case is_replicated(Options) of
            true ->
                Done = batch_update_replicated_tx(TxBAcc),
                % For replicated updates reset `seen` after every transaction
                Done#bacc{seen = []};
            false ->
                Tagged = tag_docs(Docs),
                Futures = get_winning_rev_futures(TxDb, Tagged),
                batch_update_interactive_tx(TxBAcc#bacc{
                    docs = Tagged,
                    rev_futures = Futures
                })
        end
    end,
    AfterTx = fabric2_fdb:transactional(Db, TxFun),

    % Cancel any leftover futures once the transaction has ended so the
    % next round starts from a clean state
    lists:foreach(fun({Tag, RangeFuture}) when is_reference(Tag) ->
        ok = erlfdb:cancel(RangeFuture, [flush])
    end, maps:to_list(AfterTx#bacc.rev_futures)),

    batch_update_docs(AfterTx#bacc{
        db = Db,
        rev_futures = #{}
    }).


% Apply interactive updates inside the current transaction, one doc at a
% time, until the docs run out or the approximate transaction size
% exceeds batch_size. Returns the accumulator with the remaining docs.
batch_update_interactive_tx(#bacc{docs = []} = BAcc) ->
    BAcc;

batch_update_interactive_tx(#bacc{} = BAcc) ->
    #bacc{
        db = TxDb,
        docs = [Doc | RestDocs],
        options = Options,
        batch_size = MaxSize,
        rev_futures = RevFutures,
        seen = Seen,
        results = Results
    } = BAcc,
    % A ?RETURN throw becomes this doc's result and leaves `seen` as is
    {Result, NewSeen} = try
        update_docs_interactive(TxDb, Doc, Options, RevFutures, Seen)
    catch throw:{?MODULE, Return} ->
        {Return, Seen}
    end,
    Next = BAcc#bacc{
        docs = RestDocs,
        results = [Result | Results],
        seen = NewSeen
    },
    TxSize = fabric2_fdb:get_approximate_tx_size(TxDb),
    if
        TxSize > MaxSize -> Next;
        true -> batch_update_interactive_tx(Next)
    end.


% Apply replicated updates inside the current transaction, one doc at a
% time. Processing stops when the docs run out, when the approximate
% transaction size exceeds batch_size, or when the next doc id was
% already updated in this transaction.
batch_update_replicated_tx(#bacc{docs = []} = BAcc) ->
    BAcc;

batch_update_replicated_tx(#bacc{} = BAcc) ->
    #bacc{
        db = TxDb,
        docs = [Doc | RestDocs],
        options = Options,
        batch_size = MaxSize,
        seen = Seen,
        results = Results
    } = BAcc,
    DocId = Doc#doc.id,
    AlreadySeen = lists:member(DocId, Seen),
    if
        AlreadySeen ->
            % If we already updated this doc in the current transaction, wait
            % till the next transaction to update it again.
            BAcc;
        true ->
            Result = update_doc_int(TxDb, Doc, Options),
            Next = BAcc#bacc{
                docs = RestDocs,
                results = [Result | Results],
                seen = [DocId | Seen]
            },
            case fabric2_fdb:get_approximate_tx_size(TxDb) > MaxSize of
                false -> batch_update_replicated_tx(Next);
                true -> Next
            end
    end.


% Update one doc of an interactive batch and return {Result, SeenIds}.
% Local docs are written directly and never tracked in SeenIds. A doc id
% that was already successfully updated in this batch yields `conflict`.
% Only {ok, _} results add the doc id to the seen list.
update_docs_interactive(Db, #doc{id = <<?LOCAL_DOC_PREFIX, _/binary>>} = Doc,
        Options, _Futures, SeenIds) ->
    Result = update_local_doc(Db, Doc, Options),
    {Result, SeenIds};

update_docs_interactive(Db, Doc, Options, Futures, SeenIds) ->
    DocId = Doc#doc.id,
    case lists:member(DocId, SeenIds) of
        false ->
            Future = maps:get(doc_tag(Doc), Futures),
            Resp = update_doc_interactive(Db, Doc, Future, Options),
            NewSeenIds = case Resp of
                {ok, _} -> [DocId | SeenIds];
                _ -> SeenIds
            end,
            {Resp, NewSeenIds};
        true ->
            {conflict, SeenIds}
    end.


% Interactive update entry point that starts the winning revision lookup
% itself and then continues in update_doc_interactive/4.
update_doc_interactive(Db, Doc0, Options) ->
    % The winning revision is needed no matter which branch is updated.
    % For deletions a second revision is requested up front, which saves
    % a round trip if the winning revision branch ends up being deleted.
    NumRevs = case Doc0#doc.deleted of
        true -> 2;
        _ -> 1
    end,
    DocId = Doc0#doc.id,
    Future = fabric2_fdb:get_winning_revs_future(Db, DocId, NumRevs),
    update_doc_interactive(Db, Doc0, Future, Options).


% Perform an interactive (non-replicated) update of Doc0.
%
% Future is a winning revisions future for the doc; it resolves to at
% most two rev infos (the winner and the runner-up).
%
% Returns {ok, {NewRevPos, NewRev}} on success. Conflicts and validation
% failures leave this function through ?RETURN, i.e. a non-local return
% that the callers in this module catch.
update_doc_interactive(Db, Doc0, Future, _Options) ->
    RevInfos = fabric2_fdb:get_revs_wait(Db, Future),
    {Winner, SecondPlace} = case RevInfos of
        [] -> {not_found, not_found};
        [WRI] -> {WRI, not_found};
        [WRI, SPRI] -> {WRI, SPRI}
    end,
    % A missing or deleted winner is treated as "no current revision"
    WinnerRevId = case Winner of
        not_found ->
            {0, <<>>};
        _ ->
            case maps:get(deleted, Winner) of
                true -> {0, <<>>};
                false -> maps:get(rev_id, Winner)
            end
    end,

    % Check that a revision was specified if required
    Doc0RevId = doc_to_revid(Doc0),
    HasRev = Doc0RevId =/= {0, <<>>},
    if HasRev orelse WinnerRevId == {0, <<>>} -> ok; true ->
        ?RETURN({Doc0, conflict})
    end,

    % Allow inserting new deleted documents. Only works when the document has
    % never existed to match CouchDB 3.x
    case not HasRev andalso Doc0#doc.deleted andalso is_map(Winner) of
        true -> ?RETURN({Doc0, conflict});
        false -> ok
    end,

    % Get the target revision to update
    Target = case Doc0RevId == WinnerRevId of
        true ->
            Winner;
        false ->
            case fabric2_fdb:get_non_deleted_rev(Db, Doc0#doc.id, Doc0RevId) of
                #{deleted := false} = Target0 ->
                    Target0;
                not_found ->
                    % Either a missing revision or a deleted
                    % revision. Either way a conflict. Note
                    % that we get not_found for a deleted revision
                    % because we only check for the non-deleted
                    % key in fdb
                    ?RETURN({Doc0, conflict})
            end
    end,

    Doc1 = case Winner of
        #{deleted := true} when not Doc0#doc.deleted ->
            % When recreating a deleted document we want to extend
            % the winning revision branch rather than create a
            % new branch. If we did not do this we could be
            % recreating into a state that previously existed.
            Doc0#doc{revs = fabric2_util:revinfo_to_revs(Winner)};
        #{} ->
            % Otherwise we're extending the target's revision
            % history with this update
            Doc0#doc{revs = fabric2_util:revinfo_to_revs(Target)};
        not_found ->
            % Creating a new doc means our revs start empty
            Doc0
    end,

    % Validate the doc update and create the
    % new revinfo map
    Doc2 = prep_and_validate(Db, Doc1, Target),

    Doc3 = new_revid(Db, Doc2),

    #doc{
        deleted = NewDeleted,
        revs = {NewRevPos, [NewRev | NewRevPath]},
        atts = Atts
    } = Doc4 = stem_revisions(Db, Doc3),

    % winner, sequence and branch_count are placeholders at this point;
    % branch_count is filled in below
    NewRevInfo = #{
        winner => undefined,
        exists => false,
        deleted => NewDeleted,
        rev_id => {NewRevPos, NewRev},
        rev_path => NewRevPath,
        sequence => undefined,
        branch_count => undefined,
        att_hash => fabric2_util:hash_atts(Atts),
        rev_size => fabric2_util:rev_size(Doc4)
    },

    % Gather the list of possible winning revisions
    Possible = case Target == Winner of
        true when not Doc4#doc.deleted ->
            [NewRevInfo];
        true when Doc4#doc.deleted ->
            % Deleting the winner may promote the runner-up
            case SecondPlace of
                #{} -> [NewRevInfo, SecondPlace];
                not_found -> [NewRevInfo]
            end;
        false ->
            [NewRevInfo, Winner]
    end,

    % Sort the rev infos such that the winner is first
    {NewWinner0, NonWinner} = case fabric2_util:sort_revinfos(Possible) of
        [W] -> {W, not_found};
        [W, NW] -> {W, NW}
    end,

    % An interactive update extends an existing branch, so the branch
    % count of the previous winner is carried over (1 for a new doc)
    BranchCount = case Winner of
        not_found -> 1;
        #{branch_count := BC} -> BC
    end,
    NewWinner = NewWinner0#{branch_count := BranchCount},
    ToUpdate = if NonWinner == not_found -> []; true -> [NonWinner] end,
    ToRemove = if Target == not_found -> []; true -> [Target] end,

    ok = fabric2_fdb:write_doc(
            Db,
            Doc4,
            NewWinner,
            Winner,
            ToUpdate,
            ToRemove
        ),

    {ok, {NewRevPos, NewRev}}.


% Apply a replicated update (new_edits=false style) of Doc0.
%
% The doc's revision path is merged into the revision tree rebuilt from
% all stored rev infos. If the revision is already known as an internal
% node nothing is written and {Doc0, {ok, []}} is returned through
% ?RETURN. Otherwise the doc is stemmed, validated, its attachments are
% flushed and it is written via fabric2_fdb:write_doc/6.
%
% Returns {ok, []} on success.
update_doc_replicated(Db, Doc0, _Options) ->
    #doc{
        id = DocId,
        deleted = Deleted,
        revs = {RevPos, [Rev | RevPath]}
    } = Doc0,

    % att_hash and rev_size are placeholders until the attachments have
    % been flushed further down
    DocRevInfo0 = #{
        winner => undefined,
        exists => false,
        deleted => Deleted,
        rev_id => {RevPos, Rev},
        rev_path => RevPath,
        sequence => undefined,
        branch_count => undefined,
        att_hash => <<>>,
        rev_size => null
    },

    AllRevInfos = fabric2_fdb:get_all_revs(Db, DocId),

    % Rebuild the revision tree from the stored rev infos
    RevTree = lists:foldl(fun(RI, TreeAcc) ->
        RIPath = fabric2_util:revinfo_to_path(RI),
        {Merged, _} = couch_key_tree:merge(TreeAcc, RIPath),
        Merged
    end, [], AllRevInfos),

    DocRevPath = fabric2_util:revinfo_to_path(DocRevInfo0),

    {NewTree, Status} = couch_key_tree:merge(RevTree, DocRevPath),
    if Status /= internal_node -> ok; true ->
        % We already know this revision so nothing
        % left to do.
        ?RETURN({Doc0, {ok, []}})
    end,

    % Its possible to have a replication with fewer than $revs_limit
    % revisions which extends an existing branch. To avoid
    % losing revision history we extract the new node from the
    % tree and use the combined path after stemming.
    {[{_, {RevPos, UnstemmedRevs}}], []}
            = couch_key_tree:get(NewTree, [{RevPos, Rev}]),

    Doc1 = stem_revisions(Db, Doc0#doc{revs = {RevPos, UnstemmedRevs}}),

    {RevPos, [Rev | NewRevPath]} = Doc1#doc.revs,
    DocRevInfo1 = DocRevInfo0#{rev_path := NewRevPath},

    % Find any previous revision we knew about for
    % validation and attachment handling.
    AllLeafsFull = couch_key_tree:get_all_leafs_full(NewTree),
    LeafPath = get_leaf_path(RevPos, Rev, AllLeafsFull),
    PrevRevInfo = find_prev_revinfo(RevPos, LeafPath),
    Doc2 = prep_and_validate(Db, Doc1, PrevRevInfo),
    Doc3 = flush_doc_atts(Db, Doc2),
    % Update the existing att_hash key. The `:=` form makes a misspelled
    % key fail loudly instead of silently adding a second, unused key and
    % leaving att_hash at its <<>> placeholder.
    DocRevInfo2 = DocRevInfo1#{
        att_hash := fabric2_util:hash_atts(Doc3#doc.atts),
        rev_size := fabric2_util:rev_size(Doc3)
    },

    % Possible winners are the previous winner and
    % the new DocRevInfo
    Winner = case fabric2_util:sort_revinfos(AllRevInfos) of
        [#{winner := true} = WRI | _] -> WRI;
        [] -> not_found
    end,
    {NewWinner0, NonWinner} = case Winner == PrevRevInfo of
        true ->
            % The new revision replaces the winner it extends (this also
            % covers a brand new doc, where both are not_found)
            {DocRevInfo2, not_found};
        false ->
            [W, NW] = fabric2_util:sort_revinfos([Winner, DocRevInfo2]),
            {W, NW}
    end,

    % Every leaf of the merged tree is one branch
    NewWinner = NewWinner0#{branch_count := length(AllLeafsFull)},
    ToUpdate = if NonWinner == not_found -> []; true -> [NonWinner] end,
    ToRemove = if PrevRevInfo == not_found -> []; true -> [PrevRevInfo] end,

    ok = fabric2_fdb:write_doc(
            Db,
            Doc3,
            NewWinner,
            Winner,
            ToUpdate,
            ToRemove
        ),

    {ok, []}.


% Write a local doc after bumping its revision. Returns
% {ok, {0, RevBinary}}. A failed revision increment leaves through
% ?RETURN with {Doc0, Error}.
update_local_doc(Db, Doc0, _Options) ->
    case increment_local_doc_rev(Doc0) of
        {ok, Doc1} ->
            ok = fabric2_fdb:write_local_doc(Db, Doc1),
            #doc{revs = {0, [Rev]}} = Doc1,
            {ok, {0, integer_to_binary(Rev)}};
        {error, Error} ->
            ?RETURN({Doc0, Error})
    end.


% Flush every attachment of Doc whose data is not already a
% {loc, _, _, _} tuple and return the doc with the updated attachments.
flush_doc_atts(Db, Doc) ->
    #doc{
        id = DocId,
        atts = Atts
    } = Doc,
    FlushOne = fun(Att) ->
        case couch_att:fetch(data, Att) of
            {loc, _, _, _} -> Att;
            _ -> couch_att:flush(Db, DocId, Att)
        end
    end,
    Doc#doc{atts = [FlushOne(Att) || Att <- Atts]}.


% Start a winning revisions lookup for every non-local doc and return a
% map from doc tag to future. Deleted docs request two revisions, all
% others one. Local docs are skipped.
get_winning_rev_futures(Db, Docs) ->
    lists:foldl(fun(Doc, Futures) ->
        #doc{
            id = DocId,
            deleted = Deleted
        } = Doc,
        case DocId of
            <<?LOCAL_DOC_PREFIX, _/binary>> ->
                Futures;
            _ ->
                NumRevs = case Deleted of
                    true -> 2;
                    _ -> 1
                end,
                Future = fabric2_fdb:get_winning_revs_future(Db, DocId, NumRevs),
                Futures#{doc_tag(Doc) => Future}
        end
    end, #{}, Docs).


% Prepare NewDoc for writing and run the update validations.
%
% PrevRevInfo is the rev info of the revision being extended, or
% `not_found`. Returns the doc with attachment stubs merged in. Duplicate
% attachment names throw {bad_request, _}; validation failures leave via
% validate_doc_update/3.
prep_and_validate(Db, NewDoc, PrevRevInfo) ->
    HasStubs = couch_doc:has_stubs(NewDoc),
    HasVDUs = [] /= maps:get(validate_doc_update_funs, Db),
    IsDDoc = case NewDoc#doc.id of
        <<?DESIGN_DOC_PREFIX, _/binary>> -> true;
        _ -> false
    end,

    WasDeleted = case PrevRevInfo of
        not_found -> false;
        #{deleted := D} -> D
    end,

    % The previous body is only read when it is needed (to merge stubs,
    % or to run VDU functions on a non design doc) and when a non-deleted
    % previous revision exists. In every other case PrevDoc is `nil`.
    PrevDoc = case HasStubs orelse (HasVDUs and not IsDDoc) of
        true when PrevRevInfo /= not_found, not WasDeleted ->
            case fabric2_fdb:get_doc_body(Db, NewDoc#doc.id, PrevRevInfo) of
                #doc{} = PDoc -> PDoc;
                {not_found, _} -> nil
            end;
        _ ->
            nil
    end,

    MergedDoc = if not HasStubs -> NewDoc; true ->
        % This will throw an error if we have any
        % attachment stubs missing data
        couch_doc:merge_stubs(NewDoc, PrevDoc)
    end,
    check_duplicate_attachments(MergedDoc),
    validate_doc_update(Db, MergedDoc, PrevDoc),
    MergedDoc.


% Validate a doc update.
%
% Design docs require admin rights and are then checked by
% validate_ddoc/2. All other docs are run through every function in the
% db's validate_doc_update_funs list. Any failure leaves this function
% through ?RETURN with {Doc, Error}.
validate_doc_update(Db, #doc{id = <<"_design/", _/binary>>} = Doc, _) ->
    % NOTE(review): old-style `catch` also converts errors and exits
    % into an {'EXIT', _} term, which is then returned as the doc's
    % error -- confirm that this is intended.
    case catch check_is_admin(Db) of
        ok -> validate_ddoc(Db, Doc);
        Error -> ?RETURN({Doc, Error})
    end;
validate_doc_update(Db, Doc, PrevDoc) ->
    #{
        security_doc := Security,
        validate_doc_update_funs := VDUs
    } = Db,
    Fun = fun() ->
        JsonCtx = fabric2_util:user_ctx_to_json(Db),
        lists:map(fun(VDU) ->
            try
                % Anything other than `ok` is treated as a failure, the
                % same as a value thrown by the VDU itself
                case VDU(Doc, PrevDoc, JsonCtx, Security) of
                    ok -> ok;
                    Error1 -> throw(Error1)
                end
            catch throw:Error2 ->
                ?RETURN({Doc, Error2})
            end
        end, VDUs)
    end,
    Stat = [couchdb, query_server, vdu_process_time],
    % The VDUs run inside couch_stats:update_histogram/2 (presumably to
    % time them); with no VDUs nothing is run or recorded
    if VDUs == [] -> ok; true ->
        couch_stats:update_histogram(Stat, Fun)
    end.


% Validate a design doc with couch_mrview:validate/2.
% invalid_design_doc and compilation_error throws are re-thrown as
% three element bad_request errors. Any other throw leaves through
% ?RETURN with {DDoc, Error}.
validate_ddoc(Db, DDoc) ->
    try
        EJsonDDoc = couch_doc:with_ejson_body(DDoc),
        ok = couch_mrview:validate(Db, EJsonDDoc)
    catch
        throw:{invalid_design_doc, InvalidReason} ->
            throw({bad_request, invalid_design_doc, InvalidReason});
        throw:{compilation_error, CompileReason} ->
            throw({bad_request, compilation_error, CompileReason});
        throw:Other ->
            ?RETURN({DDoc, Other})
    end.


% Atomic ("all_or_nothing") update check.
%
% With the flag set to false this is a no-op returning ok. With the flag
% set to true it always throws {aborted, Failures}, where Failures holds
% one {{Id, {Pos, RevId}}, Error} entry per document; RevId is the first
% revision in the doc's revs list or <<>> when that list is empty.
validate_atomic_update(_AllDocs, false) ->
    ok;
validate_atomic_update(AllDocs, true) ->
    % TODO actually perform the validation.  This requires some hackery, we need
    % to basically extract the prep_and_validate_updates function from couch_db
    % and only run that, without actually writing in case of a success.
    NotImplemented = {not_implemented, <<"all_or_nothing is not supported">>},
    ToFailure = fun
        (#doc{id = Id, revs = {Pos, []}}) ->
            {{Id, {Pos, <<>>}}, NotImplemented};
        (#doc{id = Id, revs = {Pos, [RevId | _]}}) ->
            {{Id, {Pos, RevId}}, NotImplemented}
    end,
    throw({aborted, [ToFailure(Doc) || Doc <- AllDocs]}).


% Throw {bad_request, <<"Duplicate attachments">>} if two attachments of
% the document share a name (as reported by couch_att:fetch(name, Att)).
% Otherwise evaluates to the ordset of all attachment names.
check_duplicate_attachments(#doc{atts = Atts}) ->
    AddName = fun(Att, Seen) ->
        Name = couch_att:fetch(name, Att),
        case ordsets:is_element(Name, Seen) of
            false -> ordsets:add_element(Name, Seen);
            true -> throw({bad_request, <<"Duplicate attachments">>})
        end
    end,
    lists:foldl(AddName, ordsets:new(), Atts).


% Convert a "since" sequence value into the result of one of the
% fabric2 sequence helpers. Clauses are tried in order:
%
%   * <<>> with direction rev is treated like now
%   * <<>>, <<"0">> and 0 give fabric2_util:seq_zero_vs()
%   * now and <<"now">> are replaced by fabric2_fdb:get_last_change(Db)
%     and converted again
%   * a binary of exactly 24 bytes goes through fabric2_fdb:seq_to_vs/1
%     followed by fabric2_fdb:next_vs/1
%   * a list is turned into a binary and converted again
%
% Anything else raises error({invalid_since_seq, Seq}).
get_since_seq(Db, rev, <<>>) ->
    get_since_seq(Db, rev, now);

get_since_seq(_Db, _Dir, Since)
        when Since == <<>>; Since == <<"0">>; Since == 0 ->
    fabric2_util:seq_zero_vs();

get_since_seq(Db, Dir, Since) when Since == now; Since == <<"now">> ->
    get_since_seq(Db, Dir, fabric2_fdb:get_last_change(Db));

get_since_seq(_Db, _Dir, <<Since:24/binary>>) ->
    Vs = fabric2_fdb:seq_to_vs(Since),
    fabric2_fdb:next_vs(Vs);

get_since_seq(Db, Dir, Since) when is_list(Since) ->
    get_since_seq(Db, Dir, list_to_binary(Since));

get_since_seq(_Db, _Dir, Since) ->
    erlang:error({invalid_since_seq, Since}).


% Walk a list of {Pos, Path} entries and find the one whose position is
% exactly Pos and whose first path element carries exactly Rev. Returns the
% remainder of that path, i.e. everything after the first element.
% There is intentionally no clause for the empty list: a missing entry
% fails with function_clause.
get_leaf_path(Pos, Rev, [{LeafPos, [{LeafRev, _RevInfo} | PathTail]} | _])
        when LeafPos =:= Pos, LeafRev =:= Rev ->
    PathTail;
get_leaf_path(Pos, Rev, [_Other | Leafs]) ->
    get_leaf_path(Pos, Rev, Leafs).


% Scan a path of {Rev, RevInfo} pairs from the front, skipping entries
% whose info is ?REV_MISSING, and return the first info that is a map.
% Returns not_found when the path is exhausted. Pos is decremented for
% every skipped entry but does not influence the result.
find_prev_revinfo(_Pos, []) ->
    not_found;
find_prev_revinfo(Pos, [{_Rev, ?REV_MISSING} | Remaining]) ->
    find_prev_revinfo(Pos - 1, Remaining);
find_prev_revinfo(_Pos, [{_Rev, Found} | _]) when is_map(Found) ->
    Found.


% Compute the next revision of a doc whose revs start at position 0.
%
%   * deleted docs get revs {0, [0]}
%   * docs without a revision get {0, [1]}
%   * otherwise the first revision is parsed with binary_to_integer/1 and
%     incremented by one
%
% Returns {ok, Doc} or {error, <<"Invalid rev format">>} when the revision
% cannot be parsed or revs has any other shape.
increment_local_doc_rev(#doc{deleted = true} = Doc) ->
    {ok, Doc#doc{revs = {0, [0]}}};
increment_local_doc_rev(#doc{revs = {0, []}} = Doc) ->
    {ok, Doc#doc{revs = {0, [1]}}};
increment_local_doc_rev(#doc{revs = {0, [Current | _]}} = Doc) ->
    try binary_to_integer(Current) of
        Num -> {ok, Doc#doc{revs = {0, [Num + 1]}}}
    catch error:badarg ->
        {error, <<"Invalid rev format">>}
    end;
increment_local_doc_rev(#doc{}) ->
    {error, <<"Invalid rev format">>}.


% Return the {Pos, Rev} pair for the first revision of a document, or
% {0, <<>>} when the document has revs {0, []}.
doc_to_revid(#doc{revs = Revs}) ->
    case Revs of
        {Pos, [FirstRev | _]} -> {Pos, FirstRev};
        {0, []} -> {0, <<>>}
    end.


% Store a fresh {ref, make_ref()} entry in the meta list of every document,
% replacing an existing ref entry if there is one. Order is preserved.
tag_docs(Docs) ->
    lists:map(fun(#doc{meta = Meta} = Doc) ->
        Tagged = lists:keystore(ref, 1, Meta, {ref, make_ref()}),
        Doc#doc{meta = Tagged}
    end, Docs).


% Look up the ref entry in the document's meta list using
% fabric2_util:get_value/2.
doc_tag(#doc{} = Doc) ->
    fabric2_util:get_value(ref, Doc#doc.meta).


% Normalize an {Id, Revs} pair: the id goes through docid/1 and every
% element of the revision list through rev/1.
idrevs({Id, Revs}) when is_list(Revs) ->
    DocId = docid(Id),
    {DocId, lists:map(fun rev/1, Revs)}.


% Convert a doc id given as a list into a binary; any other term is
% returned unchanged.
docid(DocId) ->
    case is_list(DocId) of
        true -> list_to_binary(DocId);
        false -> DocId
    end.


% Normalize a revision. An {Integer, Binary} tuple is returned as is;
% a list or binary is parsed with couch_doc:parse_rev/1. Any other term
% fails with function_clause.
rev({Seq, Hash} = Parsed) when is_integer(Seq), is_binary(Hash) ->
    Parsed;
rev(Raw) when is_list(Raw) orelse is_binary(Raw) ->
    couch_doc:parse_rev(Raw).


% Unwrap a callback result: {ok, Acc} yields Acc, while {stop, Acc} is
% thrown unchanged as {stop, Acc}.
maybe_stop({stop, _Acc} = Stop) ->
    throw(Stop);
maybe_stop({ok, Acc}) ->
    Acc.


% Apply set_design_doc_start_key/2 and then set_design_doc_end_key/2 to the
% options, using the dir option (default fwd) as the direction.
set_design_doc_keys(Options1) ->
    Dir = couch_util:get_value(dir, Options1, fwd),
    set_design_doc_end_key(set_design_doc_start_key(Options1, Dir), Dir).


% Store a start_key in Options that is clamped to the design doc bounds.
%
% For fwd the key defaults to ?FIRST_DDOC_KEY and is raised to at least
% that value; for rev it defaults to ?LAST_DDOC_KEY and is lowered to at
% most that value.
set_design_doc_start_key(Options, Dir) when Dir =:= fwd; Dir =:= rev ->
    {Bound, Clamp} = case Dir of
        fwd -> {?FIRST_DDOC_KEY, fun erlang:max/2};
        rev -> {?LAST_DDOC_KEY, fun erlang:min/2}
    end,
    Requested = couch_util:get_value(start_key, Options, Bound),
    StartKey = Clamp(Requested, Bound),
    lists:keystore(start_key, 1, Options, {start_key, StartKey}).


% Store an end key in Options that is clamped to the design doc bounds.
%
% When an end_key_gt option is present it is the one that gets clamped and
% stored; otherwise end_key is used, defaulting to the bound itself. For
% fwd the bound is ?LAST_DDOC_KEY and keys are lowered to at most that
% value; for rev the bound is ?FIRST_DDOC_KEY and keys are raised to at
% least that value.
set_design_doc_end_key(Options, Dir) when Dir =:= fwd; Dir =:= rev ->
    {Bound, Clamp} = case Dir of
        fwd -> {?LAST_DDOC_KEY, fun erlang:min/2};
        rev -> {?FIRST_DDOC_KEY, fun erlang:max/2}
    end,
    case couch_util:get_value(end_key_gt, Options) of
        undefined ->
            Requested = couch_util:get_value(end_key, Options, Bound),
            EndKey = Clamp(Requested, Bound),
            lists:keystore(end_key, 1, Options, {end_key, EndKey});
        RequestedGT ->
            EndKeyGT = Clamp(RequestedGT, Bound),
            lists:keystore(end_key_gt, 1, Options, {end_key_gt, EndKeyGT})
    end.


% Limit the revision list of a document to the Db's revs_limit.
%
% When the revision position has reached revs_limit, only the first
% revs_limit entries of the revision list are kept; otherwise the document
% is returned untouched.
stem_revisions(#{} = Db, #doc{} = Doc) ->
    #{revs_limit := RevsLimit} = Db,
    #doc{revs = {RevPos, Revs}} = Doc,
    if
        RevPos >= RevsLimit ->
            Doc#doc{revs = {RevPos, lists:sublist(Revs, RevsLimit)}};
        true ->
            Doc
    end.


% Open a document and return it as a (possibly empty) list holding a
% single doc member:
%
%   * []             when open_doc reports {not_found, missing}
%   * [{doc, null}]  when the opened doc is flagged as deleted
%   * [{doc, Json}]  otherwise, with Json from couch_doc:to_json_obj/2
open_json_doc(Db, DocId, OpenOpts, DocOpts) ->
    Opened = fabric2_db:open_doc(Db, DocId, OpenOpts),
    case Opened of
        {ok, #doc{deleted = true}} ->
            [{doc, null}];
        {ok, #doc{} = Doc} ->
            [{doc, couch_doc:to_json_obj(Doc, DocOpts)}];
        {not_found, missing} ->
            []
    end.


% Return a Db handle that is at most max_age milliseconds old.
%
% The age is the difference between the current monotonic time and the
% handle's check_current_ts. If it is below the max_age option (default 0)
% the handle is returned unchanged; otherwise the result of running
% fabric2_fdb:ensure_current/1 via fabric2_fdb:transactional/2 is returned.
get_cached_db(#{} = Db, Opts) when is_list(Opts) ->
    MaxAge = fabric2_util:get_value(max_age, Opts, 0),
    Now = erlang:monotonic_time(millisecond),
    Age = Now - maps:get(check_current_ts, Db),
    if
        Age < MaxAge ->
            Db;
        true ->
            fabric2_fdb:transactional(Db, fun fabric2_fdb:ensure_current/1)
    end.


% True when the options list contains the replicated_changes atom.
is_replicated(Options) when is_list(Options) ->
    Flag = replicated_changes,
    lists:member(Flag, Options).


% Determine the batch size to use: an integer batch_size option wins,
% otherwise the "fabric" / "update_docs_batch_size" config value is read
% with ?DEFAULT_UPDATE_DOCS_BATCH_SIZE as the default. A batch_size option
% that is neither undefined nor an integer fails with case_clause.
get_batch_size(Options) ->
    case fabric2_util:get_value(batch_size, Options) of
        Size when is_integer(Size) ->
            Size;
        undefined ->
            config:get_integer("fabric", "update_docs_batch_size",
                ?DEFAULT_UPDATE_DOCS_BATCH_SIZE)
    end.
