% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(fabric2_fdb).

-export([
    transactional/1,
    transactional/2,
    transactional/3,

    with_snapshot/2,

    create/2,
    open/2,
    ensure_current/1,
    delete/1,
    undelete/3,
    remove_deleted_db/2,
    exists/1,

    get_dir/1,

    list_dbs/4,
    list_dbs_info/4,
    list_deleted_dbs_info/4,

    get_info/1,
    get_info_future/2,
    get_info_wait/1,
    set_config/3,

    get_stat/2,
    incr_stat/3,
    incr_stat/4,

    get_all_revs/2,
    get_all_revs_future/2,
    get_winning_revs/3,
    get_winning_revs_future/3,
    get_revs_wait/2,
    get_non_deleted_rev/3,

    get_doc_body/3,
    get_doc_body_future/3,
    get_doc_body_wait/4,

    get_local_doc_rev_future/2,
    get_local_doc_rev_wait/1,
    get_local_doc_body_future/3,
    get_local_doc_body_wait/4,
    get_local_doc/2,
    get_local_doc_rev/3,

    write_doc/6,
    write_local_doc/2,

    read_attachment/3,
    write_attachment/4,

    get_last_change/1,

    fold_range/5,

    vs_to_seq/1,
    seq_to_vs/1,
    next_vs/1,

    new_versionstamp/1,

    get_approximate_tx_size/1,

    chunkify_binary/1,
    chunkify_binary/2,

    debug_cluster/0,
    debug_cluster/2
]).

-include_lib("couch/include/couch_db.hrl").
-include("fabric2.hrl").

-record(fold_acc, {
    db,
    restart_tx,
    start_key,
    end_key,
    limit,
    skip,
    retries,
    base_opts,
    user_fun,
    user_acc
}).

-record(info_future, {
    tx,
    db_prefix,
    changes_future,
    meta_future,
    uuid_future,
    retries = 0
}).

% Run Fun(Tx) in a fresh FoundationDB transaction with no layer prefix and
% no extra transaction options.
transactional(Fun) ->
    NoLayerPrefix = undefined,
    NoTxOptions = #{},
    do_transaction(Fun, NoLayerPrefix, NoTxOptions).

% Convenience form of transactional/3 that uses an empty transaction
% options map. Accepts either a database name (binary) or a db map.
transactional(DbName, Fun) when is_binary(DbName), is_function(Fun) ->
    DefaultTxOptions = #{},
    transactional(DbName, DefaultTxOptions, Fun);
transactional(#{} = Db, Fun) when is_function(Fun) ->
    DefaultTxOptions = #{},
    transactional(Db, DefaultTxOptions, Fun).

% Run Fun inside a transaction, in one of four modes depending on the first
% argument:
%
%   - a database name (binary): start a new transaction, build the db handle
%     with init_db/2 and call Fun with it;
%   - a db map with `tx := undefined`: refresh the handle from the cache,
%     then start a new transaction scoped to the db's layer prefix. If the
%     body throws {?MODULE, reopen}, the whole call is retried with
%     `reopen => true`, so the handle is re-opened inside the new
%     transaction. The caller's TxOptions are kept for that retry;
%   - a db map already holding a snapshot or a live transaction: call Fun
%     directly, reusing that transaction (TxOptions are not applied).
%
% Every mode runs inside with_span/3, tagged with the db name.
transactional(DbName, #{} = TxOptions, Fun) when is_binary(DbName) ->
    with_span(Fun, #{'db.name' => DbName}, fun() ->
        do_transaction(
            fun(Tx) ->
                Fun(init_db(Tx, DbName))
            end,
            undefined,
            TxOptions
        )
    end);
transactional(#{tx := undefined} = Db, #{} = TxOptions, Fun) ->
    DbName = maps:get(name, Db, undefined),
    try
        Db1 = refresh(Db),
        Reopen = maps:get(reopen, Db1, false),
        Db2 = maps:remove(reopen, Db1),
        % When re-opening, the layer prefix is resolved again rather than
        % taken from the (possibly stale) handle.
        LayerPrefix =
            case Reopen of
                true -> undefined;
                false -> maps:get(layer_prefix, Db2)
            end,
        with_span(Fun, #{'db.name' => DbName}, fun() ->
            do_transaction(
                fun(Tx) ->
                    case Reopen of
                        true -> Fun(reopen(Db2#{tx => Tx}));
                        false -> Fun(Db2#{tx => Tx})
                    end
                end,
                LayerPrefix,
                TxOptions
            )
        end)
    catch
        throw:{?MODULE, reopen} ->
            with_span('db.reopen', #{'db.name' => DbName}, fun() ->
                % Retry with the same transaction options. Going through
                % transactional/2 here would silently reset them to #{}.
                transactional(Db#{reopen => true}, TxOptions, Fun)
            end)
    end;
transactional(#{tx := {erlfdb_snapshot, _}} = Db, #{} = _TxOptions, Fun) ->
    DbName = maps:get(name, Db, undefined),
    with_span(Fun, #{'db.name' => DbName}, fun() ->
        Fun(Db)
    end);
transactional(#{tx := {erlfdb_transaction, _}} = Db, #{} = _TxOptions, Fun) ->
    DbName = maps:get(name, Db, undefined),
    with_span(Fun, #{'db.name' => DbName}, fun() ->
        Fun(Db)
    end).

% Execute Fun(Tx) inside erlfdb:transactional/2 on the handle returned by
% get_db_handle/0.
%
% Each attempt does the following:
%   - applies TxOptions to the transaction;
%   - enables FDB transaction logging when the process dictionary holds a
%     binary under `erlfdb_trace`, using "<name>_<base36 unique int>" as the
%     log identifier;
%   - returns get_previous_transaction_result/0 if is_transaction_applied/1
%     is true, and otherwise runs execute_transaction/3. Presumably this
%     avoids re-running a commit whose outcome was unknown — confirm.
%
% clear_transaction/0 always runs afterwards, whether the call returned or
% raised.
do_transaction(Fun, LayerPrefix, #{} = TxOptions) when is_function(Fun, 1) ->
    FdbHandle = get_db_handle(),
    Attempt = fun(Tx) ->
        apply_tx_options(Tx, TxOptions),
        TraceName = get(erlfdb_trace),
        if
            is_binary(TraceName) ->
                Suffix = integer_to_binary(erlang:unique_integer([positive]), 36),
                LogId = <<TraceName/binary, "_", Suffix/binary>>,
                erlfdb:set_option(Tx, transaction_logging_enable, LogId);
            true ->
                ok
        end,
        case is_transaction_applied(Tx) of
            false -> execute_transaction(Tx, Fun, LayerPrefix);
            true -> get_previous_transaction_result()
        end
    end,
    try
        erlfdb:transactional(FdbHandle, Attempt)
    after
        clear_transaction()
    end.

% Apply every {Option, Value} pair in TxOptions to Tx with
% erlfdb:set_option/3.
%
% This is called only for its side effects, so it iterates with
% lists:foreach/2. The previous maps:map/2 call built and returned a
% throwaway result map. Returns ok.
apply_tx_options(Tx, #{} = TxOptions) ->
    lists:foreach(
        fun({Option, Value}) ->
            erlfdb:set_option(Tx, Option, Value)
        end,
        maps:to_list(TxOptions)
    ).

% Call Fun with a copy of the db handle whose `tx` is a snapshot view of
% the current transaction. A handle that is already a snapshot is passed
% through unchanged.
with_snapshot(#{tx := {erlfdb_transaction, _} = Tx} = TxDb, Fun) ->
    Fun(TxDb#{tx := erlfdb:snapshot(Tx)});
with_snapshot(#{tx := {erlfdb_snapshot, _}} = SnapshotDb, Fun) ->
    Fun(SnapshotDb).

% Create a new database in the current transaction and return the
% initialized db map.
%
% Steps:
%   - allocate a db prefix with the erlfdb_hca allocator stored under
%     ?DB_HCA;
%   - map DbName to that prefix under ?ALL_DBS;
%   - write a fresh uuid as the ?DB_VERSION value;
%   - seed the default ?DB_CONFIG (uuid, revs_limit, security_doc) and
%     ?DB_STATS keys.
%
% The `user_ctx` option is pulled out of Options (default #user_ctx{}), and
% the remaining options are stored as db_options. The map is finally passed
% to aegis:init_db/2 together with the original, unstripped Options.
%
% NOTE(review): this function does not check whether DbName already has an
% ?ALL_DBS entry (erlfdb:set/3 overwrites it); presumably the caller checks
% for existence first — confirm.
create(#{} = Db0, Options) ->
    #{
        name := DbName,
        tx := Tx,
        layer_prefix := LayerPrefix
    } = Db1 = ensure_current(Db0, false),

    DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
    HCA = erlfdb_hca:create(erlfdb_tuple:pack({?DB_HCA}, LayerPrefix)),
    AllocPrefix = erlfdb_hca:allocate(HCA, Tx),
    DbPrefix = erlfdb_tuple:pack({?DBS, AllocPrefix}, LayerPrefix),
    erlfdb:set(Tx, DbKey, DbPrefix),

    % This key is responsible for telling us when something in
    % the database cache (i.e., fabric2_server's ets table) has
    % changed and requires re-loading. This currently includes
    % revs_limit and validate_doc_update functions. There's
    % no order to versioning here. It's just a value that changes
    % that is used in the ensure_current check.
    DbVersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix),
    DbVersion = fabric2_util:uuid(),
    erlfdb:set(Tx, DbVersionKey, DbVersion),

    UUID = fabric2_util:uuid(),

    % Initial config and stats rows. 3-tuples are {Prefix, Key, Value};
    % 4-tuples are {Prefix, Section, Key, Value}. The initial
    % `sizes.external` of 2 is presumably the byte size of the default
    % "{}" security doc — confirm.
    Defaults = [
        {?DB_CONFIG, <<"uuid">>, UUID},
        {?DB_CONFIG, <<"revs_limit">>, ?uint2bin(1000)},
        {?DB_CONFIG, <<"security_doc">>, <<"{}">>},
        {?DB_STATS, <<"doc_count">>, ?uint2bin(0)},
        {?DB_STATS, <<"doc_del_count">>, ?uint2bin(0)},
        {?DB_STATS, <<"doc_design_count">>, ?uint2bin(0)},
        {?DB_STATS, <<"doc_local_count">>, ?uint2bin(0)},
        {?DB_STATS, <<"sizes">>, <<"external">>, ?uint2bin(2)},
        {?DB_STATS, <<"sizes">>, <<"views">>, ?uint2bin(0)}
    ],
    lists:foreach(
        fun
            ({P, K, V}) ->
                Key = erlfdb_tuple:pack({P, K}, DbPrefix),
                erlfdb:set(Tx, Key, V);
            ({P, S, K, V}) ->
                Key = erlfdb_tuple:pack({P, S, K}, DbPrefix),
                erlfdb:set(Tx, Key, V)
        end,
        Defaults
    ),

    UserCtx = fabric2_util:get_value(user_ctx, Options, #user_ctx{}),
    Options1 = lists:keydelete(user_ctx, 1, Options),

    % In-memory handle mirroring the defaults written above
    % (revs_limit 1000, empty security doc).
    Db2 = Db1#{
        uuid => UUID,
        db_prefix => DbPrefix,
        db_version => DbVersion,

        revs_limit => 1000,
        security_doc => {[]},
        user_ctx => UserCtx,
        check_current_ts => erlang:monotonic_time(millisecond),

        validate_doc_update_funs => [],
        before_doc_update => undefined,
        after_doc_read => undefined,
        % All other db things as we add features,

        db_options => Options1,
        interactive => false
    },
    aegis:init_db(Db2, Options).

% Open an existing database in the current transaction and return the
% fully loaded db map (config, aegis state and validate_doc_update funs).
%
% Options handled here (and removed before the rest become db_options):
%   {user_ctx, Ctx}     - stored as user_ctx (default #user_ctx{});
%   {uuid, UUID}        - when present, the stored uuid must equal UUID;
%   {interactive, Bool} - stored as interactive (default false).
%
% Raises error(database_does_not_exist) when DbName has no ?ALL_DBS entry,
% or when a uuid option was given and does not match the stored uuid.
open(#{} = Db0, Options) ->
    #{
        name := DbName,
        tx := Tx,
        layer_prefix := LayerPrefix
    } = Db1 = ensure_current(Db0, false),

    % Resolve the database name to its key prefix.
    DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
    DbPrefix =
        case erlfdb:wait(erlfdb:get(Tx, DbKey)) of
            Bin when is_binary(Bin) -> Bin;
            not_found -> erlang:error(database_does_not_exist)
        end,

    DbVersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix),
    DbVersion = erlfdb:wait(erlfdb:get(Tx, DbVersionKey)),

    UserCtx = fabric2_util:get_value(user_ctx, Options, #user_ctx{}),
    Options1 = lists:keydelete(user_ctx, 1, Options),

    UUID = fabric2_util:get_value(uuid, Options1),
    Options2 = lists:keydelete(uuid, 1, Options1),

    Interactive = fabric2_util:get_value(interactive, Options2, false),
    Options3 = lists:keydelete(interactive, 1, Options2),

    % uuid, revs_limit and security_doc are placeholders here; they are
    % overwritten by load_config/1 from the ?DB_CONFIG rows.
    Db2 = Db1#{
        db_prefix => DbPrefix,
        db_version => DbVersion,

        uuid => <<>>,
        revs_limit => 1000,
        security_doc => {[]},

        user_ctx => UserCtx,
        check_current_ts => erlang:monotonic_time(millisecond),

        % Place holders until we implement these
        % bits.
        validate_doc_update_funs => [],
        before_doc_update => undefined,
        after_doc_read => undefined,

        db_options => Options3,
        interactive => Interactive
    },

    Db3 = load_config(Db2),
    Db4 = aegis:open_db(Db3),

    % If the caller pinned a uuid, make sure it is the same database
    % instance (and not one re-created under the same name).
    case {UUID, Db4} of
        {undefined, _} -> ok;
        {<<_/binary>>, #{uuid := UUID}} -> ok;
        {<<_/binary>>, #{uuid := _}} -> erlang:error(database_does_not_exist)
    end,

    load_validate_doc_funs(Db4).

% Swap a transaction-less db handle for a newer cached copy from
% fabric2_server, if one exists. The caller's user_ctx, security_fun and
% interactive values are carried over onto the cached copy.
%
% `name` is matched in the function head because some non-fabric2 db
% objects have no name and are therefore never cached; those, and any
% handle that already carries a transaction, are returned unchanged.
refresh(#{tx := undefined, name := DbName} = Db) ->
    #{uuid := UUID, md_version := CurrentVer} = Db,
    Cached = fabric2_server:fetch(DbName, UUID),
    case Cached of
        % Relying on these assumptions about the `md_version` value:
        %  - It is bumped every time `db_version` is bumped
        %  - Is a versionstamp, so we can check which one is newer
        %  - If it is `not_found`, it would sort less than a binary value
        #{md_version := CachedVer} when CachedVer > CurrentVer ->
            Cached#{
                user_ctx := maps:get(user_ctx, Db),
                security_fun := maps:get(security_fun, Db),
                interactive := maps:get(interactive, Db)
            };
        _ ->
            Db
    end;
refresh(#{} = Db) ->
    Db.

% Re-open a db handle inside its current transaction, keeping the caller's
% user_ctx, security_fun and interactive settings.
%
% For a non-interactive handle, the re-opened db must have the same uuid as
% the old one; otherwise the database was re-created and
% error(database_does_not_exist) is raised.
reopen(#{} = OldDb) ->
    require_transaction(OldDb),
    #{
        tx := Tx,
        name := DbName,
        uuid := OldUUID,
        db_options := DbOptions,
        user_ctx := UserCtx,
        security_fun := SecurityFun,
        interactive := Interactive
    } = OldDb,
    OpenOptions = lists:keystore(user_ctx, 1, DbOptions, {user_ctx, UserCtx}),
    NewDb = open(init_db(Tx, DbName), OpenOptions),
    NewUUID = maps:get(uuid, NewDb),

    % Check if database was re-created
    case Interactive of
        true -> ok;
        false when NewUUID =:= OldUUID -> ok;
        false -> error(database_does_not_exist)
    end,

    NewDb#{security_fun := SecurityFun, interactive := Interactive}.

% Delete a database. A soft delete is used when
% fabric2_util:do_recovery/0 is true, and a hard delete otherwise.
delete(#{} = Db) ->
    case fabric2_util:do_recovery() of
        false -> hard_delete_db(Db);
        true -> soft_delete_db(Db)
    end.

% Restore the deleted database recorded under
% {?DELETED_DBS, DbName, TimeStamp}, giving it the name TgtDbName.
%
% Returns:
%   file_exists - TgtDbName already has an ?ALL_DBS entry;
%   not_found   - no deleted entry for that name/timestamp;
%   ok          - the prefix was re-linked under TgtDbName, the deleted
%                 entry was cleared and the db version was bumped.
undelete(#{} = Db0, TgtDbName, TimeStamp) ->
    #{
        name := DbName,
        tx := Tx,
        layer_prefix := LayerPrefix
    } = ensure_current(Db0, false),
    TargetKey = erlfdb_tuple:pack({?ALL_DBS, TgtDbName}, LayerPrefix),
    case erlfdb:wait(erlfdb:get(Tx, TargetKey)) of
        not_found ->
            DeletedKey = erlfdb_tuple:pack(
                {?DELETED_DBS, DbName, TimeStamp},
                LayerPrefix
            ),
            case erlfdb:wait(erlfdb:get(Tx, DeletedKey)) of
                not_found ->
                    not_found;
                DbPrefix ->
                    erlfdb:set(Tx, TargetKey, DbPrefix),
                    erlfdb:clear(Tx, DeletedKey),
                    bump_db_version(#{tx => Tx, db_prefix => DbPrefix}),
                    ok
            end;
        Existing when is_binary(Existing) ->
            file_exists
    end.

% Permanently purge a soft-deleted database identified by its name and
% deletion TimeStamp. The ?DELETED_DBS entry and every key under the old
% db prefix are cleared, and the db version is bumped.
%
% Returns ok, or not_found when no such deleted entry exists.
remove_deleted_db(#{} = Db0, TimeStamp) ->
    #{
        name := DbName,
        tx := Tx,
        layer_prefix := LayerPrefix
    } = ensure_current(Db0, false),
    DeletedKey = erlfdb_tuple:pack({?DELETED_DBS, DbName, TimeStamp}, LayerPrefix),
    case erlfdb:wait(erlfdb:get(Tx, DeletedKey)) of
        not_found ->
            not_found;
        DbPrefix ->
            erlfdb:clear(Tx, DeletedKey),
            erlfdb:clear_range_startswith(Tx, DbPrefix),
            bump_db_version(#{tx => Tx, db_prefix => DbPrefix}),
            ok
    end.

% Return true when the named database has an ?ALL_DBS entry, and false
% otherwise.
exists(#{name := DbName} = Db) when is_binary(DbName) ->
    #{tx := Tx, layer_prefix := LayerPrefix} = ensure_current(Db, false),
    NameKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
    case erlfdb:wait(erlfdb:get(Tx, NameKey)) of
        not_found -> false;
        Prefix when is_binary(Prefix) -> true
    end.

% Create or open the configured FDB directory (from
% fabric2_server:fdb_directory/0) under the directory root, and return its
% name, which callers use as the layer prefix.
get_dir(Tx) ->
    RootNode = erlfdb_directory:root(),
    DirPath = fabric2_server:fdb_directory(),
    erlfdb_directory:get_name(
        erlfdb_directory:create_or_open(Tx, RootNode, DirPath)
    ).

% Fold Callback(DbName, Acc) over every database name under ?ALL_DBS.
% `restart_tx` defaults to true unless the caller set it.
list_dbs(Tx, Callback, AccIn, Options0) ->
    RestartTxGiven = fabric2_util:get_value(restart_tx, Options0) =/= undefined,
    Options =
        if
            RestartTxGiven -> Options0;
            true -> [{restart_tx, true} | Options0]
        end,
    Prefix = erlfdb_tuple:pack({?ALL_DBS}, get_dir(Tx)),
    EmitName = fun({PackedKey, _DbPrefix}, Acc) ->
        {DbName} = erlfdb_tuple:unpack(PackedKey, Prefix),
        Callback(DbName, Acc)
    end,
    fold_range({tx, Tx}, Prefix, EmitName, AccIn, Options).

% Fold Callback(DbName, InfoFuture, Acc) over every database under
% ?ALL_DBS. InfoFuture comes from get_info_future/2 for that db's prefix.
% `restart_tx` defaults to true unless the caller set it.
list_dbs_info(Tx, Callback, AccIn, Options0) ->
    RestartTxGiven = fabric2_util:get_value(restart_tx, Options0) =/= undefined,
    Options =
        if
            RestartTxGiven -> Options0;
            true -> [{restart_tx, true} | Options0]
        end,
    Prefix = erlfdb_tuple:pack({?ALL_DBS}, get_dir(Tx)),
    EmitInfo = fun({PackedKey, DbPrefix}, Acc) ->
        {DbName} = erlfdb_tuple:unpack(PackedKey, Prefix),
        Callback(DbName, get_info_future(Tx, DbPrefix), Acc)
    end,
    fold_range({tx, Tx}, Prefix, EmitInfo, AccIn, Options).

% Fold Callback(DbName, TimeStamp, InfoFuture, Acc) over every soft-deleted
% database under ?DELETED_DBS. `restart_tx` defaults to true unless the
% caller set it.
list_deleted_dbs_info(Tx, Callback, AccIn, Options0) ->
    RestartTxGiven = fabric2_util:get_value(restart_tx, Options0) =/= undefined,
    Options =
        if
            RestartTxGiven -> Options0;
            true -> [{restart_tx, true} | Options0]
        end,
    Prefix = erlfdb_tuple:pack({?DELETED_DBS}, get_dir(Tx)),
    EmitInfo = fun({PackedKey, DbPrefix}, Acc) ->
        {DbName, TimeStamp} = erlfdb_tuple:unpack(PackedKey, Prefix),
        Callback(DbName, TimeStamp, get_info_future(Tx, DbPrefix), Acc)
    end,
    fold_range({tx, Tx}, Prefix, EmitInfo, AccIn, Options).

% Return the db info property list, with an `encryption` entry (the aegis
% db info wrapped as an EJSON object) prepended.
get_info(#{} = Db) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),
    InfoFuture = get_info_future(Tx, DbPrefix),
    DbInfo = get_info_wait(InfoFuture),
    EncryptionEntry = {encryption, {aegis:get_db_info(Db)}},
    [EncryptionEntry | DbInfo].

% Issue the reads needed for a db info reply and return them bundled in an
% #info_future{}:
%   - the newest ?DB_CHANGES row (reverse, limit 1);
%   - the `uuid` config value;
%   - every ?DB_STATS row.
get_info_future(Tx, DbPrefix) ->
    {ChangesStart, ChangesEnd} = erlfdb_tuple:range({?DB_CHANGES}, DbPrefix),
    LastChangeOpts = [
        {streaming_mode, exact},
        {limit, 1},
        {reverse, true}
    ],
    ChangesFuture = erlfdb:get_range(Tx, ChangesStart, ChangesEnd, LastChangeOpts),

    UUIDFuture = erlfdb:get(Tx, erlfdb_tuple:pack({?DB_CONFIG, <<"uuid">>}, DbPrefix)),

    MetaFuture = erlfdb:get_range_startswith(
        Tx,
        erlfdb_tuple:pack({?DB_STATS}, DbPrefix)
    ),

    % Only a read-only transaction is kept: get_info_wait/1 may reset it and
    % re-issue these reads after a retryable error.
    KeptTx =
        case erlfdb:get_writes_allowed(Tx) of
            false -> Tx;
            true -> undefined
        end,

    #info_future{
        tx = KeptTx,
        db_prefix = DbPrefix,
        changes_future = ChangesFuture,
        meta_future = MetaFuture,
        uuid_future = UUIDFuture
    }.

% Wait on an #info_future{} and return the info computed by
% get_info_wait_int/1.
%
% When the future holds a read-only transaction and fewer than two retries
% have been made, two kinds of FDB error are retried by re-issuing the
% reads with get_info_future/2:
%   - transaction_cancelled: retried on the same Tx;
%   - any error matching ?ERLFDB_IS_RETRYABLE: Tx is reset first.
% In all other cases errors propagate.
get_info_wait(#info_future{tx = undefined} = Future) ->
    get_info_wait_int(Future);
get_info_wait(#info_future{retries = Retries} = Future) when Retries >= 2 ->
    get_info_wait_int(Future);
get_info_wait(#info_future{} = Future) ->
    #info_future{tx = Tx, db_prefix = DbPrefix, retries = Retries} = Future,
    try
        get_info_wait_int(Future)
    catch
        error:{erlfdb_error, ?ERLFDB_TRANSACTION_CANCELLED} ->
            Retry = get_info_future(Tx, DbPrefix),
            get_info_wait(Retry#info_future{retries = Retries + 1});
        error:{erlfdb_error, Code} when ?ERLFDB_IS_RETRYABLE(Code) ->
            ok = erlfdb:reset(Tx),
            Retry = get_info_future(Tx, DbPrefix),
            get_info_wait(Retry#info_future{retries = Retries + 1})
    end.

% Overwrite the uuid, revs_limit and security_doc fields of Db with the
% values stored under ?DB_CONFIG. Any other config key crashes with a
% case_clause.
load_config(#{} = Db) ->
    #{tx := Tx, db_prefix := DbPrefix} = Db,
    {RangeStart, RangeEnd} = erlfdb_tuple:range({?DB_CONFIG}, DbPrefix),
    ConfigFuture = erlfdb:get_range(Tx, RangeStart, RangeEnd),
    ApplyRow = fun({PackedKey, Val}, Acc) ->
        {?DB_CONFIG, Name} = erlfdb_tuple:unpack(PackedKey, DbPrefix),
        case Name of
            <<"uuid">> ->
                Acc#{uuid := Val};
            <<"revs_limit">> ->
                Acc#{revs_limit := ?bin2uint(Val)};
            <<"security_doc">> ->
                Acc#{security_doc := ?JSON_DECODE(Val)}
        end
    end,
    lists:foldl(ApplyRow, Db, erlfdb:wait(ConfigFuture)).

% Persist one db config value (uuid, revs_limit or security_doc), bump the
% db version, and return {ok, UpdatedDb}.
%
% revs_limit is clamped to at least 1 before it is stored. The returned
% handle caches that same clamped value, so it agrees with what
% load_config/1 will later read back from FDB. Previously the raw Val was
% cached, which could leave, e.g., revs_limit = 0 in memory while 1 was
% stored.
set_config(#{} = Db0, Key, Val) when is_atom(Key) ->
    #{
        tx := Tx,
        db_prefix := DbPrefix
    } = Db = ensure_current(Db0),
    {BinKey, BinVal, CachedVal} =
        case Key of
            uuid ->
                {<<"uuid">>, Val, Val};
            revs_limit ->
                RevsLimit = max(1, Val),
                {<<"revs_limit">>, ?uint2bin(RevsLimit), RevsLimit};
            security_doc ->
                {<<"security_doc">>, ?JSON_ENCODE(Val), Val}
        end,
    DbKey = erlfdb_tuple:pack({?DB_CONFIG, BinKey}, DbPrefix),
    erlfdb:set(Tx, DbKey, BinVal),
    {ok, DbVersion} = bump_db_version(Db),
    {ok, Db#{db_version := DbVersion, Key := CachedVal}}.

% Read the top-level ?DB_STATS counter StatKey and decode it as an
% unsigned integer.
get_stat(#{} = Db, StatKey) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),
    StatBinKey = erlfdb_tuple:pack({?DB_STATS, StatKey}, DbPrefix),
    Raw = erlfdb:wait(erlfdb:get(Tx, StatBinKey)),
    % Might need to figure out some sort of type system here. Uints are
    % because stats are all atomic op adds for the moment.
    ?bin2uint(Raw).

% Atomically add the integer Increment to the top-level stat StatKey. A
% zero increment is a no-op that does not touch the transaction.
incr_stat(_Db, _StatKey, 0) ->
    ok;
incr_stat(#{} = Db, StatKey, Increment) when is_integer(Increment) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),
    StatBinKey = erlfdb_tuple:pack({?DB_STATS, StatKey}, DbPrefix),
    erlfdb:add(Tx, StatBinKey, Increment).

% Atomically add the integer Increment to the sectioned stat
% {Section, Key}, e.g. <<"sizes">> / <<"external">>. A zero increment is a
% no-op.
incr_stat(_Db, _Section, _Key, 0) ->
    ok;
incr_stat(#{} = Db, Section, Key, Increment) when is_integer(Increment) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),
    StatBinKey = erlfdb_tuple:pack({?DB_STATS, Section, Key}, DbPrefix),
    erlfdb:add(Tx, StatBinKey, Increment).

% Return every revinfo stored for DocId, traced in a 'db.get_all_revs'
% span.
get_all_revs(#{} = Db, DocId) ->
    SpanAttrs = #{'db.name' => maps:get(name, Db, undefined), 'doc.id' => DocId},
    with_span('db.get_all_revs', SpanAttrs, fun() ->
        RevsFuture = get_all_revs_future(Db, DocId),
        get_revs_wait(Db, RevsFuture)
    end).

% Start a range read of all ?DB_REVS rows for DocId (want_all streaming).
get_all_revs_future(#{} = Db, DocId) ->
    get_revs_future(Db, DocId, [{streaming_mode, want_all}]).

% Return the last NumRevs revinfos of DocId in reverse key order, traced in
% a 'db.get_winning_revs' span.
get_winning_revs(Db, DocId, NumRevs) ->
    SpanAttrs = #{'db.name' => maps:get(name, Db, undefined), 'doc.id' => DocId},
    with_span('db.get_winning_revs', SpanAttrs, fun() ->
        RevsFuture = get_winning_revs_future(Db, DocId, NumRevs),
        get_revs_wait(Db, RevsFuture)
    end).

% Start a reverse range read of at most NumRevs ?DB_REVS rows for DocId.
get_winning_revs_future(#{} = Db, DocId, NumRevs) ->
    get_revs_future(Db, DocId, [{reverse, true}, {limit, NumRevs}]).

% Start a fold-range future over the ?DB_REVS rows of DocId, using the
% given erlfdb range Options.
get_revs_future(#{} = Db, DocId, Options) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),
    {RangeStart, RangeEnd} = erlfdb_tuple:range({?DB_REVS, DocId}, DbPrefix),
    erlfdb:fold_range_future(Tx, RangeStart, RangeEnd, Options).

% Wait on a revs range future and return the rows decoded with
% fdb_to_revinfo/2, in the order they were read.
get_revs_wait(#{} = Db, RangeFuture) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),
    Collect = fun({PackedKey, PackedVal}, Acc) ->
        KeyTuple = erlfdb_tuple:unpack(PackedKey, DbPrefix),
        ValTuple = erlfdb_tuple:unpack(PackedVal),
        [fdb_to_revinfo(KeyTuple, ValTuple) | Acc]
    end,
    Reversed = erlfdb:fold_range_wait(Tx, RangeFuture, Collect, []),
    lists:reverse(Reversed).

% Look up the ?DB_REVS row {DocId, true, RevPos, Rev} and return its
% revinfo, or not_found. The `true` key element presumably selects the
% non-deleted row (matching the function name) — confirm against
% revinfo_to_fdb/4.
get_non_deleted_rev(#{} = Db, DocId, RevId) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),
    {RevPos, Rev} = RevId,
    RevTuple = {?DB_REVS, DocId, true, RevPos, Rev},
    PackedKey = erlfdb_tuple:pack(RevTuple, DbPrefix),
    case erlfdb:wait(erlfdb:get(Tx, PackedKey)) of
        not_found -> not_found;
        PackedVal -> fdb_to_revinfo(RevTuple, erlfdb_tuple:unpack(PackedVal))
    end.

% Read and decode the body of the revision described by RevInfo, traced in
% a 'db.get_doc_body' span.
get_doc_body(Db, DocId, RevInfo) ->
    SpanAttrs = #{'db.name' => maps:get(name, Db, undefined), 'doc.id' => DocId},
    with_span('db.get_doc_body', SpanAttrs, fun() ->
        BodyFuture = get_doc_body_future(Db, DocId, RevInfo),
        get_doc_body_wait(Db, DocId, RevInfo, BodyFuture)
    end).

% Start a fold-range future over the ?DB_DOCS body rows of the revision
% named by RevInfo's rev_id.
get_doc_body_future(#{} = Db, DocId, RevInfo) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),
    #{rev_id := {RevPos, Rev}} = RevInfo,
    BodyTuple = {?DB_DOCS, DocId, RevPos, Rev},
    {BodyStart, BodyEnd} = erlfdb_tuple:range(BodyTuple, DbPrefix),
    erlfdb:fold_range_future(Tx, BodyStart, BodyEnd, []).

% Wait on a doc body future and build the #doc{} with fdb_to_doc/5. Row
% values pass through aegis:wrap_fold_fun/2 and are kept in read order.
get_doc_body_wait(#{} = Db0, DocId, RevInfo, Future) ->
    #{tx := Tx} = Db = ensure_current(Db0),
    #{rev_id := {RevPos, Rev}, rev_path := RevPath} = RevInfo,
    Collect = aegis:wrap_fold_fun(Db, fun({_Key, Chunk}, Acc) ->
        [Chunk | Acc]
    end),
    Chunks = lists:reverse(erlfdb:fold_range_wait(Tx, Future, Collect, [])),
    fdb_to_doc(Db, DocId, RevPos, [Rev | RevPath], Chunks).

% Start a read of the ?DB_LOCAL_DOCS entry for a local document id.
get_local_doc_rev_future(Db, DocId) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),
    LocalKey = erlfdb_tuple:pack({?DB_LOCAL_DOCS, DocId}, DbPrefix),
    erlfdb:get(Tx, LocalKey).

% Wait on a future from get_local_doc_rev_future/2 and return the raw
% stored value (not_found when the key is absent).
get_local_doc_rev_wait(RevFuture) ->
    erlfdb:wait(RevFuture).

% Start a prefix read of every ?DB_LOCAL_DOC_BODIES chunk for DocId. The
% revision argument is unused here.
get_local_doc_body_future(#{} = Db, DocId, _Rev) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),
    erlfdb:get_range_startswith(
        Tx,
        erlfdb_tuple:pack({?DB_LOCAL_DOC_BODIES, DocId}, DbPrefix)
    ).

% Wait on the body chunk rows, decrypt them with aegis and build the local
% doc with fdb_to_local_doc/4.
get_local_doc_body_wait(#{} = Db0, DocId, Rev, Future) ->
    Db = ensure_current(Db0),
    Decrypted = aegis:decrypt(Db, erlfdb:wait(Future)),
    Chunks = lists:map(fun({_Key, Chunk}) -> Chunk end, Decrypted),
    fdb_to_local_doc(Db, DocId, Rev, Chunks).

% Fetch a local document: first its raw ?DB_LOCAL_DOCS value, then its
% body chunks. The raw value is passed straight to fdb_to_local_doc/4 as
% Rev.
get_local_doc(#{} = Db, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId) ->
    Rev = get_local_doc_rev_wait(get_local_doc_rev_future(Db, DocId)),
    get_local_doc_body_wait(
        Db,
        DocId,
        Rev,
        get_local_doc_body_future(Db, DocId, Rev)
    ).

% Decode the revision stored in a ?DB_LOCAL_DOCS value. Three encodings are
% accepted:
%   <<255, Tuple/binary>> - current format {?CURR_LDOC_FORMAT, Rev, Size};
%   <<131, ...>>          - older external-term format {Rev, _};
%   any other binary      - a non-negative integer rendered as text, which
%                           is returned unchanged (as a binary).
% Any decoding failure raises error({invalid_local_doc_rev, DocId, Val}).
get_local_doc_rev(_Db0, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId, Val) ->
    Invalid = {invalid_local_doc_rev, DocId, Val},
    case Val of
        <<255, RevBin/binary>> ->
            % Versioned local docs
            try
                {?CURR_LDOC_FORMAT, Rev, _Size} = erlfdb_tuple:unpack(RevBin),
                Rev
            catch
                _:_ ->
                    erlang:error(Invalid)
            end;
        <<131, _/binary>> ->
            % Compatibility clause for an older encoding format
            Term =
                try
                    binary_to_term(Val, [safe])
                catch
                    error:badarg -> erlang:error(Invalid)
                end,
            case Term of
                {Rev, _} -> Rev;
                _ -> erlang:error(Invalid)
            end;
        <<_/binary>> ->
            IntVal =
                try
                    binary_to_integer(Val)
                catch
                    error:badarg -> erlang:error(Invalid)
                end,
            if
                IntVal >= 0 -> Val;
                true -> erlang:error(Invalid)
            end
    end.

% Persist a new document revision and update every index derived from it,
% all in the current transaction. Returns ok.
%
% Arguments:
%   Doc        - the #doc{} being written; its body goes through
%                write_doc_body/2.
%   NewWinner0 - revinfo map of the revision that becomes the winner; it
%                is stored with `winner := true` and a versionstamped value.
%   OldWinner  - revinfo map of the previous winner (must carry `deleted`
%                and `sequence`), or not_found for a new doc id.
%   ToUpdate   - revinfo maps rewritten as non-winning revisions.
%   ToRemove   - revinfo maps whose rev rows and bodies are cleared.
%
% Besides the body and rev rows, this updates:
%   - the ?DB_ALL_DOCS entry and the ?DB_CHANGES feed (the old sequence is
%     cleared and a new versionstamped one written);
%   - the db version, for design docs;
%   - the doc/del/design counters and the `sizes.external` stat;
%   - the fabric2_db_plugin:after_doc_write/6 hook.
write_doc(#{} = Db0, Doc, NewWinner0, OldWinner, ToUpdate, ToRemove) ->
    #{
        tx := Tx,
        db_prefix := DbPrefix
    } = Db = ensure_current(Db0),

    #doc{
        id = DocId,
        deleted = Deleted,
        atts = Atts
    } = Doc,

    % Doc body

    ok = write_doc_body(Db, Doc),

    % Attachment bookkeeping

    % If a document's attachments have changed we have to scan
    % for any attachments that may need to be deleted. The check
    % for `>= 2` is a bit subtle. The important point is that
    % one of the revisions will be from the new document so we
    % have to find at least one more beyond that to assert that
    % the attachments have not changed.
    AttHash = fabric2_util:hash_atts(Atts),
    RevsToCheck = [NewWinner0] ++ ToUpdate ++ ToRemove,
    AttHashCount = lists:foldl(
        fun(Att, Count) ->
            #{att_hash := RevAttHash} = Att,
            case RevAttHash == AttHash of
                true -> Count + 1;
                false -> Count
            end
        end,
        0,
        RevsToCheck
    ),
    if
        AttHashCount == length(RevsToCheck) ->
            ok;
        AttHashCount >= 2 ->
            ok;
        true ->
            cleanup_attachments(Db, DocId, Doc, ToRemove)
    end,

    % Revision tree

    NewWinner = NewWinner0#{
        winner := true
    },
    NewRevId = maps:get(rev_id, NewWinner),

    % Only the winner's row carries a versionstamp (WinnerVS); it is reused
    % below as the new ?DB_CHANGES sequence key.
    {WKey, WVal, WinnerVS} = revinfo_to_fdb(Tx, DbPrefix, DocId, NewWinner),
    ok = erlfdb:set_versionstamped_value(Tx, WKey, WVal),

    % Non-winning rows are expected to have no versionstamp (undefined).
    lists:foreach(
        fun(RI0) ->
            RI = RI0#{winner := false},
            {K, V, undefined} = revinfo_to_fdb(Tx, DbPrefix, DocId, RI),
            ok = erlfdb:set(Tx, K, V)
        end,
        ToUpdate
    ),

    lists:foreach(
        fun(RI0) ->
            RI = RI0#{winner := false},
            {K, _, undefined} = revinfo_to_fdb(Tx, DbPrefix, DocId, RI),
            ok = erlfdb:clear(Tx, K),
            ok = clear_doc_body(Db, DocId, RI0)
        end,
        ToRemove
    ),

    % _all_docs

    % Classify the write by the deleted flag of the old and new winners.
    UpdateStatus =
        case {OldWinner, NewWinner} of
            {not_found, #{deleted := false}} ->
                created;
            {not_found, #{deleted := true}} ->
                replicate_deleted;
            {#{deleted := true}, #{deleted := false}} ->
                recreated;
            {#{deleted := false}, #{deleted := false}} ->
                updated;
            {#{deleted := false}, #{deleted := true}} ->
                deleted;
            {#{deleted := true}, #{deleted := true}} ->
                ignore
        end,

    % Deleted winners have no _all_docs row; live winners map to their rev.
    case UpdateStatus of
        replicate_deleted ->
            ok;
        ignore ->
            ok;
        deleted ->
            ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix),
            ok = erlfdb:clear(Tx, ADKey);
        _ ->
            ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix),
            ADVal = erlfdb_tuple:pack(NewRevId),
            ok = erlfdb:set(Tx, ADKey, ADVal)
    end,

    % _changes

    % A document appears at most once in the changes feed: drop its old
    % sequence entry before writing the new versionstamped one.
    if
        OldWinner == not_found ->
            ok;
        true ->
            OldSeq = maps:get(sequence, OldWinner),
            OldSeqKey = erlfdb_tuple:pack({?DB_CHANGES, OldSeq}, DbPrefix),
            erlfdb:clear(Tx, OldSeqKey)
    end,

    NewSeqKey = erlfdb_tuple:pack_vs({?DB_CHANGES, WinnerVS}, DbPrefix),
    NewSeqVal = erlfdb_tuple:pack({DocId, Deleted, NewRevId}),
    erlfdb:set_versionstamped_key(Tx, NewSeqKey, NewSeqVal),

    % Bump db version on design doc changes

    IsDDoc =
        case Doc#doc.id of
            <<?DESIGN_DOC_PREFIX, _/binary>> -> true;
            _ -> false
        end,

    if
        not IsDDoc -> ok;
        true -> bump_db_version(Db)
    end,

    % Update our document counts

    case UpdateStatus of
        created ->
            if
                not IsDDoc -> ok;
                true -> incr_stat(Db, <<"doc_design_count">>, 1)
            end,
            incr_stat(Db, <<"doc_count">>, 1);
        recreated ->
            if
                not IsDDoc -> ok;
                true -> incr_stat(Db, <<"doc_design_count">>, 1)
            end,
            incr_stat(Db, <<"doc_count">>, 1),
            incr_stat(Db, <<"doc_del_count">>, -1);
        replicate_deleted ->
            incr_stat(Db, <<"doc_del_count">>, 1);
        ignore ->
            ok;
        deleted ->
            if
                not IsDDoc -> ok;
                true -> incr_stat(Db, <<"doc_design_count">>, -1)
            end,
            incr_stat(Db, <<"doc_count">>, -1),
            incr_stat(Db, <<"doc_del_count">>, 1);
        updated ->
            ok
    end,

    fabric2_db_plugin:after_doc_write(
        Db,
        Doc,
        NewWinner,
        OldWinner,
        NewRevId,
        WinnerVS
    ),

    % Update database size
    AddSize = sum_add_rev_sizes([NewWinner | ToUpdate]),
    RemSize = sum_rem_rev_sizes(ToRemove),
    incr_stat(Db, <<"sizes">>, <<"external">>, AddSize - RemSize),

    ok.

% Write a local document, or delete it when Doc#doc.deleted is true, in the
% current transaction. Also maintains the `doc_local_count` and
% `sizes.external` stats. Returns ok.
%
% The existing ?DB_LOCAL_DOCS value drives the accounting:
%   <<255, ...>>  - current format; its stored Size is the previous size;
%   other binary  - older format with no recorded size, so 0 is used;
%   not_found     - no existing doc, so it counts as previously deleted.
write_local_doc(#{} = Db0, Doc) ->
    #{
        tx := Tx,
        db_prefix := DbPrefix
    } = Db = ensure_current(Db0),

    Id = Doc#doc.id,

    {LDocKey, LDocVal, NewSize, Rows} = local_doc_to_fdb(Db, Doc),

    {WasDeleted, PrevSize} =
        case erlfdb:wait(erlfdb:get(Tx, LDocKey)) of
            <<255, RevBin/binary>> ->
                case erlfdb_tuple:unpack(RevBin) of
                    {?CURR_LDOC_FORMAT, _Rev, Size} ->
                        {false, Size}
                end;
            <<_/binary>> ->
                {false, 0};
            not_found ->
                {true, 0}
        end,

    BPrefix = erlfdb_tuple:pack({?DB_LOCAL_DOC_BODIES, Id}, DbPrefix),

    case Doc#doc.deleted of
        true ->
            erlfdb:clear(Tx, LDocKey),
            erlfdb:clear_range_startswith(Tx, BPrefix);
        false ->
            erlfdb:set(Tx, LDocKey, LDocVal),
            % Make sure to clear the whole range, in case there was a larger
            % document body there before.
            erlfdb:clear_range_startswith(Tx, BPrefix),
            % Body chunks are encrypted with aegis, keyed by their FDB key.
            lists:foreach(
                fun({K, V}) ->
                    erlfdb:set(Tx, K, aegis:encrypt(Db, K, V))
                end,
                Rows
            )
    end,

    % Only transitions between existing and deleted change the count.
    case {WasDeleted, Doc#doc.deleted} of
        {true, false} ->
            incr_stat(Db, <<"doc_local_count">>, 1);
        {false, true} ->
            incr_stat(Db, <<"doc_local_count">>, -1);
        _ ->
            ok
    end,

    % NOTE(review): NewSize comes from local_doc_to_fdb/2 and is applied
    % even for deletes; presumably it is 0 for a deleted doc — confirm.
    incr_stat(Db, <<"sizes">>, <<"external">>, NewSize - PrevSize),

    ok.

% Read an attachment's data: decrypt its ?DB_ATTS chunks, concatenate them
% and, if the ?DB_ATT_NAMES info marks it as compressed, decode it with
% binary_to_term/2.
%
% Throws {not_found, missing} when the attachment does not exist. The
% ?DB_ATT_NAMES entry is the reliable existence check: write_attachment/4
% always sets it, while a missing chunk range may not come back as
% not_found (NOTE(review): erlfdb range reads appear to yield a list, []
% when empty — confirm). Previously a missing ?DB_ATT_NAMES entry fell
% through the case and crashed with case_clause.
read_attachment(#{} = Db, DocId, AttId) ->
    #{
        tx := Tx,
        db_prefix := DbPrefix
    } = ensure_current(Db),

    AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId}, DbPrefix),
    Data =
        case erlfdb:wait(erlfdb:get_range_startswith(Tx, AttKey)) of
            not_found ->
                throw({not_found, missing});
            KVs ->
                {_, Chunks} = lists:unzip(aegis:decrypt(Db, KVs)),
                iolist_to_binary(Chunks)
        end,

    IdKey = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId, AttId}, DbPrefix),
    case erlfdb:wait(erlfdb:get(Tx, IdKey)) of
        not_found ->
            throw({not_found, missing});
        <<>> ->
            % Old format, before CURR_ATT_STORAGE_VER = 0
            Data;
        <<_/binary>> = InfoBin ->
            {?CURR_ATT_STORAGE_VER, Compressed} = erlfdb_tuple:unpack(InfoBin),
            case Compressed of
                true -> binary_to_term(Data, [safe]);
                false -> Data
            end
    end.

% Store an attachment body under a freshly generated attachment id and
% return {ok, AttId}.
%
% When Encoding is gzip the bytes are stored as-is (presumably already
% compressed by the client). Otherwise they are run through term_to_binary/2
% compression, and the compressed form is kept only if it is strictly smaller
% than the original. That choice is recorded under ?DB_ATT_NAMES as a packed
% {?CURR_ATT_STORAGE_VER, Compressed} so readers know whether to decode. The
% stored bytes are then split into chunks, and each chunk is encrypted and
% written under its own ?DB_ATTS key, numbered from 0.
write_attachment(#{} = Db, DocId, Data, Encoding) when
    is_binary(Data), is_atom(Encoding)
->
    #{
        tx := Tx,
        db_prefix := DbPrefix
    } = ensure_current(Db),

    AttId = fabric2_util:uuid(),

    {Data1, Compressed} =
        case Encoding of
            gzip ->
                {Data, false};
            _ ->
                Opts = [{minor_version, 1}, {compressed, 6}],
                CompressedData = term_to_binary(Data, Opts),
                % Compare byte sizes on both sides. Comparing an integer with
                % the binary itself is always true under Erlang term order,
                % which would keep the compressed form even when it is larger.
                case byte_size(CompressedData) < byte_size(Data) of
                    true -> {CompressedData, true};
                    false -> {Data, false}
                end
        end,

    IdKey = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId, AttId}, DbPrefix),
    InfoVal = erlfdb_tuple:pack({?CURR_ATT_STORAGE_VER, Compressed}),
    ok = erlfdb:set(Tx, IdKey, InfoVal),

    Chunks = chunkify_binary(Data1),

    lists:foldl(
        fun(Chunk, ChunkId) ->
            AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId, ChunkId}, DbPrefix),
            ok = erlfdb:set(Tx, AttKey, aegis:encrypt(Db, AttKey, Chunk)),
            ChunkId + 1
        end,
        0,
        Chunks
    ),
    {ok, AttId}.

% Return the database's latest update sequence as a hex string. The newest
% ?DB_CHANGES key is read with a reverse, limit-1 range query. An empty
% changes range yields the sequence of fabric2_util:seq_zero_vs().
get_last_change(#{} = Db) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),

    {RangeStart, RangeEnd} = erlfdb_tuple:range({?DB_CHANGES}, DbPrefix),
    NewestOnly = [{limit, 1}, {reverse, true}],
    LastVS =
        case erlfdb:get_range(Tx, RangeStart, RangeEnd, NewestOnly) of
            [] ->
                fabric2_util:seq_zero_vs();
            [{ChangeKey, _Value}] ->
                {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(ChangeKey, DbPrefix),
                SeqVS
        end,
    vs_to_seq(LastVS).

% Fold UserFun over the keys under RangePrefix, shaped by Options (see
% get_fold_acc/5), and return the final user accumulator.
%
% TxOrDb is either {tx, Tx} for a bare transaction or a db handle, which is
% first validated with ensure_current/1. The saved fold state is always
% removed from the process dictionary when the fold finishes or fails.
fold_range(TxOrDb, RangePrefix, UserFun, UserAcc, Options) ->
    {Db, Tx} =
        case TxOrDb of
            {tx, RawTx} ->
                {undefined, RawTx};
            #{} ->
                #{tx := DbTx} = CurrentDb = ensure_current(TxOrDb),
                {CurrentDb, DbTx}
        end,
    Limit = fabric2_util:get_value(limit, Options),
    if
        % FoundationDB treats a limit of 0 as "unlimited", so a request for
        % zero rows is answered here without reading anything.
        Limit =:= 0 ->
            UserAcc;
        true ->
            FoldAcc = get_fold_acc(Db, RangePrefix, UserFun, UserAcc, Options),
            try
                fold_range(Tx, FoldAcc)
            after
                erase(?PDICT_FOLD_ACC_STATE)
            end
    end.

% Run one erlfdb fold over the range held in FAcc using fold_range_cb/2 and
% return the final user accumulator.
%
% When restart_tx is true, writes are disallowed on Tx. A retryable FDB error
% then does not propagate: restart_fold/2 resets the transaction and the fold
% resumes from the position last recorded by fold_range_cb/2.
fold_range(Tx, FAcc) ->
    #fold_acc{
        start_key = Start,
        end_key = End,
        limit = Limit,
        base_opts = BaseOpts,
        restart_tx = DoRestart
    } = FAcc,
    % Presumably because restart_fold/2 resets Tx, which would drop any
    % pending writes, a restartable fold must not carry writes at all.
    case DoRestart of
        false -> ok;
        true -> ok = erlfdb:set_option(Tx, disallow_writes)
    end,
    % Limit already includes the remaining skip count (see get_fold_acc/5).
    Opts = [{limit, Limit} | BaseOpts],
    Callback = fun fold_range_cb/2,
    try
        #fold_acc{
            user_acc = FinalUserAcc
        } = erlfdb:fold_range(Tx, Start, End, Callback, FAcc, Opts),
        FinalUserAcc
    catch
        error:{erlfdb_error, Error} when
            ?ERLFDB_IS_RETRYABLE(Error) andalso DoRestart
        ->
            % Possibly handle cluster_version_changed and future_version as
            % well, to continue the iteration instead of falling back to
            % transactional and retrying from the beginning. Retrying from the
            % beginning is bound to fail when streaming data out to a socket.
            fold_range(Tx, restart_fold(Tx, FAcc))
    end.

% Convert a versionstamp tuple into its hex sequence string. Packing {VS}
% must yield the versionstamp type tag 51 followed by exactly 12 bytes. Only
% those 12 bytes are hex-encoded.
vs_to_seq(VS) when is_tuple(VS) ->
    Packed = erlfdb_tuple:pack({VS}),
    <<51:8, Payload:12/binary>> = Packed,
    fabric2_util:to_hex(Payload).

% Inverse of vs_to_seq/1: decode the hex sequence and re-attach the
% versionstamp type tag (51) before unpacking it back into a versionstamp.
seq_to_vs(Seq) when is_binary(Seq) ->
    Tagged = <<51:8, (fabric2_util:from_hex(Seq))/binary>>,
    {Versionstamp} = erlfdb_tuple:unpack(Tagged),
    Versionstamp.

% Return the versionstamp immediately after the given one.
%
% 4-tuple form: bump TxId. Once TxId reaches 16#FFFF, bump Batch and reset
% TxId. Once Batch also reaches 16#FFFF, bump the version and reset both.
% 3-tuple form: the same rule, without the TxId component.
next_vs({versionstamp, VS, Batch, TxId}) when TxId < 16#FFFF ->
    {versionstamp, VS, Batch, TxId + 1};
next_vs({versionstamp, VS, Batch, _TxId}) when Batch < 16#FFFF ->
    {versionstamp, VS, Batch + 1, 0};
next_vs({versionstamp, VS, _Batch, _TxId}) ->
    {versionstamp, VS + 1, 0, 0};
next_vs({versionstamp, VS, Batch}) when Batch < 16#FFFF ->
    {versionstamp, VS, Batch + 1};
next_vs({versionstamp, VS, _Batch}) ->
    {versionstamp, VS + 1, 0}.

% Build a placeholder versionstamp for Tx. The version and batch fields use
% their maximum values; presumably erlfdb treats this as "incomplete" and
% fills it in at commit -- confirm against erlfdb_tuple:pack_vs/1. The
% per-transaction id comes from erlfdb:get_next_tx_id/1.
new_versionstamp(Tx) ->
    PlaceholderVersion = 16#FFFFFFFFFFFFFFFF,
    PlaceholderBatch = 16#FFFF,
    {versionstamp, PlaceholderVersion, PlaceholderBatch, erlfdb:get_next_tx_id(Tx)}.

% Return FDB's approximate size of the current transaction. Fails with
% transaction_required when TxDb has no active transaction.
get_approximate_tx_size(#{} = TxDb) ->
    require_transaction(TxDb),
    #{tx := Tx} = TxDb,
    SizeFuture = erlfdb:get_approximate_size(Tx),
    erlfdb:wait(SizeFuture).

% Split Data into chunks of the configured size (see binary_chunk_size/0).
chunkify_binary(Data) ->
    ChunkSize = binary_chunk_size(),
    chunkify_binary(Data, ChunkSize).

% Split the binary Data into consecutive Size-byte chunks. The last chunk
% holds whatever remains, and an empty binary yields []. Size must be a
% positive integer. Anything else fails fast with function_clause: a zero
% size never consumes input, and a negative one is not a valid binary size.
chunkify_binary(Data, Size) when is_binary(Data), is_integer(Size), Size > 0 ->
    chunkify_binary_acc(Data, Size, []).

% Tail-recursive worker for chunkify_binary/2. Chunks are accumulated in
% reverse and the list is reversed once at the end.
chunkify_binary_acc(<<>>, _Size, Acc) ->
    lists:reverse(Acc);
chunkify_binary_acc(Data, Size, Acc) when byte_size(Data) =< Size ->
    lists:reverse([Data | Acc]);
chunkify_binary_acc(Data, Size, Acc) ->
    <<Chunk:Size/binary, Rest/binary>> = Data,
    chunkify_binary_acc(Rest, Size, [Chunk | Acc]).

% Dump every key from the empty key up to, but excluding,
% <<16#FE, 16#FF, 16#FF>>. Keys in the 16#FF keyspace are therefore never
% read.
debug_cluster() ->
    EndKey = <<16#FE, 16#FF, 16#FF>>,
    debug_cluster(<<>>, EndKey).

% Print every key/value pair in [Start, End) to standard_error, one per line,
% with the key padded to 60 columns. Runs in a single transaction.
debug_cluster(Start, End) ->
    PrintRow = fun({Key, Val}) ->
        KeyRepr = string:pad(erlfdb_util:repr(Key), 60),
        io:format(standard_error, "~s => ~s~n", [KeyRepr, erlfdb_util:repr(Val)])
    end,
    transactional(fun(Tx) ->
        Rows = erlfdb:get_range(Tx, Start, End),
        lists:foreach(PrintRow, Rows)
    end).

% Build the initial db handle map for DbName within Tx. It records the layer
% directory prefix and the cluster metadata version read now. security_fun
% starts as undefined and db_options as [].
init_db(Tx, DbName) ->
    LayerPrefix = get_dir(Tx),
    MdVersionFuture = erlfdb:get(Tx, ?METADATA_VERSION_KEY),
    MdVersion = erlfdb:wait(MdVersionFuture),
    #{
        name => DbName,
        tx => Tx,
        layer_prefix => LayerPrefix,
        md_version => MdVersion,
        security_fun => undefined,
        db_options => []
    }.

% Collect the validate_doc_update functions of every non-deleted design
% document and store them in Db under validate_doc_update_funs. The key must
% already exist in Db because the update uses `:=`.
%
% The reads are pipelined in three passes:
%   1. issue a winning-revision future for each design doc,
%   2. wait on those, drop deleted docs and issue body futures for the rest,
%   3. wait on the bodies and keep each VDU function that is not nil.
load_validate_doc_funs(#{} = Db) ->
    FoldFun = fun
        ({row, Row}, Acc) ->
            DDocInfo = #{id => fabric2_util:get_value(id, Row)},
            {ok, [DDocInfo | Acc]};
        (_, Acc) ->
            {ok, Acc}
    end,

    % ["_design/", "_design0"] spans the "_design/" prefix because "0"
    % (0x30) is the byte immediately after "/" (0x2F).
    Options = [
        {start_key, <<"_design/">>},
        {end_key, <<"_design0">>}
    ],

    {ok, Infos1} = fabric2_db:fold_docs(Db, FoldFun, [], Options),

    % Pass 1: start the winning-revision read for every design doc. The
    % match asserts that each id really has the "_design/" prefix.
    Infos2 = lists:map(
        fun(Info) ->
            #{
                id := DDocId = <<"_design/", _/binary>>
            } = Info,
            Info#{
                rev_info => get_winning_revs_future(Db, DDocId, 1)
            }
        end,
        Infos1
    ),

    % Pass 2: resolve revisions, skip deleted design docs and start the body
    % reads for the remaining ones.
    Infos3 = lists:flatmap(
        fun(Info) ->
            #{
                id := DDocId,
                rev_info := RevInfoFuture
            } = Info,
            [RevInfo] = get_revs_wait(Db, RevInfoFuture),
            #{deleted := Deleted} = RevInfo,
            if
                Deleted ->
                    [];
                true ->
                    [
                        Info#{
                            rev_info := RevInfo,
                            body => get_doc_body_future(Db, DDocId, RevInfo)
                        }
                    ]
            end
        end,
        Infos2
    ),

    % Pass 3: resolve bodies and extract the VDU function, if any.
    VDUs = lists:flatmap(
        fun(Info) ->
            #{
                id := DDocId,
                rev_info := RevInfo,
                body := BodyFuture
            } = Info,
            #doc{} = Doc = get_doc_body_wait(Db, DDocId, RevInfo, BodyFuture),
            case couch_doc:get_validate_doc_fun(Doc) of
                nil -> [];
                Fun -> [Fun]
            end
        end,
        Infos3
    ),

    Db#{
        validate_doc_update_funs := VDUs
    }.

% Advance the cluster-wide metadata version key by writing a versionstamped
% value to it.
%
% The 14-byte parameter is a 10-byte (80-bit) versionstamp placeholder
% followed by a 4-byte little-endian offset of 0. Per the FoundationDB
% SET_VERSIONSTAMPED_VALUE docs, the offset says where in the value the
% versionstamp is written. This assumes erlfdb forwards the parameter
% unchanged -- NOTE(review): confirm. All 14 bytes are zero either way.
bump_metadata_version(Tx) ->
    Placeholder = <<0:80, 0:32/little>>,
    erlfdb:set_versionstamped_value(Tx, ?METADATA_VERSION_KEY, Placeholder).

% Check whether Db's cached md_version still matches the cluster metadata
% version. Returns {current, Db} or {stale, Db'}, where Db' carries the newly
% read md_version.
%
% A successful check is remembered in the process dictionary, so later calls
% in the same transaction short-circuit. On success a read conflict is also
% added on the db's ?DB_VERSION key, so this transaction conflicts with
% writes to this particular database. That step is skipped while Db has no
% db_prefix yet, e.g. during creation.
check_metadata_version(#{} = Db) ->
    #{tx := Tx, md_version := Version} = Db,
    case get(?PDICT_CHECKED_MD_IS_CURRENT) of
        true ->
            {current, Db};
        _NotYetChecked ->
            case erlfdb:wait(erlfdb:get_ss(Tx, ?METADATA_VERSION_KEY)) of
                Version ->
                    put(?PDICT_CHECKED_MD_IS_CURRENT, true),
                    case maps:get(db_prefix, Db, not_found) of
                        <<_/binary>> = DbPrefix ->
                            VersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix),
                            erlfdb:add_read_conflict_key(Tx, VersionKey);
                        not_found ->
                            ok
                    end,
                    {current, Db};
                FreshVersion ->
                    {stale, Db#{md_version := FreshVersion}}
            end
    end.

% Give the database a fresh random ?DB_VERSION (a new uuid) and bump the
% cluster metadata version. Returns {ok, NewDbVersion}.
bump_db_version(#{} = Db) ->
    #{tx := Tx, db_prefix := DbPrefix} = Db,
    NewDbVersion = fabric2_util:uuid(),
    VersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix),
    ok = erlfdb:set(Tx, VersionKey, NewDbVersion),
    ok = bump_metadata_version(Tx),
    {ok, NewDbVersion}.

% Compare Db's cached db_version with the stored ?DB_VERSION and return
% current or stale. Returns current without reading when CheckDbVersion is
% false, or when this transaction has already confirmed the version.
check_db_version(#{} = Db, CheckDbVersion) ->
    #{tx := Tx, db_prefix := DbPrefix, db_version := DbVersion} = Db,
    Confirmed = get(?PDICT_CHECKED_DB_IS_CURRENT) == true,
    if
        not CheckDbVersion orelse Confirmed ->
            current;
        true ->
            VersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix),
            StoredVersion = erlfdb:wait(erlfdb:get(Tx, VersionKey)),
            case StoredVersion =:= DbVersion of
                true ->
                    put(?PDICT_CHECKED_DB_IS_CURRENT, true),
                    current;
                false ->
                    stale
            end
    end.

% Soft-delete the database. A ?DELETED_DBS entry keyed by name and
% fabric2_util:iso8601_timestamp() is written with the db_prefix as its
% value, the ?ALL_DBS entry is cleared, and the db version is bumped.
% Returns ok, or {deletion_frequency_exceeded, DbName} when an entry for the
% same name and timestamp already exists.
soft_delete_db(Db) ->
    #{
        name := DbName,
        tx := Tx,
        layer_prefix := LayerPrefix,
        db_prefix := DbPrefix
    } = ensure_current(Db),

    Stamp = list_to_binary(fabric2_util:iso8601_timestamp()),
    DeletedKey = erlfdb_tuple:pack({?DELETED_DBS, DbName, Stamp}, LayerPrefix),
    case erlfdb:wait(erlfdb:get(Tx, DeletedKey)) of
        not_found ->
            LiveKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
            erlfdb:set(Tx, DeletedKey, DbPrefix),
            erlfdb:clear(Tx, LiveKey),
            bump_db_version(Db),
            ok;
        _Existing ->
            {deletion_frequency_exceeded, DbName}
    end.

% Permanently delete the database. Its ?ALL_DBS entry and every key under
% its db_prefix are cleared, and the cluster metadata version is bumped.
hard_delete_db(Db) ->
    #{
        name := DbName,
        tx := Tx,
        layer_prefix := LayerPrefix,
        db_prefix := DbPrefix
    } = ensure_current(Db),
    LiveKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
    erlfdb:clear(Tx, LiveKey),
    erlfdb:clear_range_startswith(Tx, DbPrefix),
    bump_metadata_version(Tx),
    ok.

% Write Doc's body rows (as produced by doc_to_fdb/2), encrypting each value
% under its own key.
write_doc_body(#{} = Db0, #doc{} = Doc) ->
    Db = ensure_current(Db0),
    #{tx := Tx} = Db,
    WriteRow = fun({RowKey, RowValue}) ->
        ok = erlfdb:set(Tx, RowKey, aegis:encrypt(Db, RowKey, RowValue))
    end,
    lists:foreach(WriteRow, doc_to_fdb(Db, Doc)).

% Clear all body chunks stored for the revision described by RevInfo.
% not_found means there is no previous body, so there is nothing to clear.
clear_doc_body(_Db, _DocId, not_found) ->
    ok;
clear_doc_body(#{} = Db, DocId, #{} = RevInfo) ->
    #{tx := Tx, db_prefix := DbPrefix} = Db,
    #{rev_id := {RevPos, Rev}} = RevInfo,
    {RangeStart, RangeEnd} = erlfdb_tuple:range({?DB_DOCS, DocId, RevPos, Rev}, DbPrefix),
    ok = erlfdb:clear_range(Tx, RangeStart, RangeEnd).

% Delete stored attachments of DocId that no surviving revision references.
%
% Surviving revisions are NewDoc plus every revision returned by
% fabric2_db:open_doc_revs/4, excluding those whose leaf rev appears in
% ToRemove. An attachment id present under ?DB_ATT_NAMES for DocId but not
% referenced by a surviving revision has both its name entry and its
% ?DB_ATTS chunk range cleared.
cleanup_attachments(Db, DocId, NewDoc, ToRemove) ->
    #{
        tx := Tx,
        db_prefix := DbPrefix
    } = Db,

    RemoveRevs = lists:map(fun(#{rev_id := RevId}) -> RevId end, ToRemove),

    % Gather all known document revisions
    {ok, DiskDocs} = fabric2_db:open_doc_revs(Db, DocId, all, []),
    AllDocs = [{ok, NewDoc} | DiskDocs],

    % Get referenced attachment ids. Every attachment is expected to carry
    % {loc, Db, DocId, AttId} data here; any other shape crashes the match.
    ActiveIdSet = lists:foldl(
        fun({ok, Doc}, Acc) ->
            #doc{
                revs = {Pos, [Rev | _]}
            } = Doc,
            case lists:member({Pos, Rev}, RemoveRevs) of
                true ->
                    Acc;
                false ->
                    lists:foldl(
                        fun(Att, InnerAcc) ->
                            {loc, _Db, _DocId, AttId} = couch_att:fetch(data, Att),
                            sets:add_element(AttId, InnerAcc)
                        end,
                        Acc,
                        Doc#doc.atts
                    )
            end
        end,
        sets:new(),
        AllDocs
    ),

    % Every attachment id currently stored for this document
    AttPrefix = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId}, DbPrefix),
    Options = [{streaming_mode, want_all}],
    Future = erlfdb:get_range_startswith(Tx, AttPrefix, Options),

    ExistingIdSet = lists:foldl(
        fun({K, _}, Acc) ->
            {?DB_ATT_NAMES, DocId, AttId} = erlfdb_tuple:unpack(K, DbPrefix),
            sets:add_element(AttId, Acc)
        end,
        sets:new(),
        erlfdb:wait(Future)
    ),

    % Stored but no longer referenced
    AttsToRemove = sets:subtract(ExistingIdSet, ActiveIdSet),

    lists:foreach(
        fun(AttId) ->
            IdKey = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId, AttId}, DbPrefix),
            erlfdb:clear(Tx, IdKey),

            ChunkKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId}, DbPrefix),
            erlfdb:clear_range_startswith(Tx, ChunkKey)
        end,
        sets:to_list(AttsToRemove)
    ).

% Encode a revision info map into a ?DB_REVS {KeyBin, ValueBin, VS} triple.
% The key stores `not Deleted` (true for live revisions), the rev position
% and the rev.
%
% A winning revision's value carries a versionstamp from new_versionstamp/1
% and the branch count, is packed with pack_vs/1, and that VS is returned.
% A non-winning revision carries neither and returns undefined in place of VS.
revinfo_to_fdb(Tx, DbPrefix, DocId, #{winner := true} = RevInfo) ->
    #{
        deleted := Deleted,
        rev_id := {RevPos, Rev},
        rev_path := RevPath,
        branch_count := BranchCount,
        att_hash := AttHash,
        rev_size := RevSize
    } = RevInfo,
    VS = new_versionstamp(Tx),
    KeyBin = erlfdb_tuple:pack({?DB_REVS, DocId, not Deleted, RevPos, Rev}, DbPrefix),
    PathTuple = list_to_tuple(RevPath),
    ValueBin = erlfdb_tuple:pack_vs(
        {?CURR_REV_FORMAT, VS, BranchCount, PathTuple, AttHash, RevSize}
    ),
    {KeyBin, ValueBin, VS};
revinfo_to_fdb(_Tx, DbPrefix, DocId, #{} = RevInfo) ->
    #{
        deleted := Deleted,
        rev_id := {RevPos, Rev},
        rev_path := RevPath,
        att_hash := AttHash,
        rev_size := RevSize
    } = RevInfo,
    KeyBin = erlfdb_tuple:pack({?DB_REVS, DocId, not Deleted, RevPos, Rev}, DbPrefix),
    PathTuple = list_to_tuple(RevPath),
    ValueBin = erlfdb_tuple:pack({?CURR_REV_FORMAT, PathTuple, AttHash, RevSize}),
    {KeyBin, ValueBin, undefined}.

% Decode an unpacked ?DB_REVS key and value into a revision info map.
%
% Two value shapes are current. A 6-tuple is a winning revision, carrying
% the sequence versionstamp and branch count. A 4-tuple is a non-winning
% revision. Older formats are upgraded one step at a time by the later
% clauses and then re-dispatched:
%   format 0 -> format 1  (adds an empty att_hash)
%   format 1 -> current   (adds rev_size = 0)
fdb_to_revinfo(Key, {?CURR_REV_FORMAT, _, _, _, _, _} = Val) ->
    {?DB_REVS, _DocId, NotDeleted, RevPos, Rev} = Key,
    {_RevFormat, Sequence, BranchCount, RevPath, AttHash, RevSize} = Val,
    #{
        winner => true,
        exists => true,
        deleted => not NotDeleted,
        rev_id => {RevPos, Rev},
        rev_path => tuple_to_list(RevPath),
        sequence => Sequence,
        branch_count => BranchCount,
        att_hash => AttHash,
        rev_size => RevSize
    };
fdb_to_revinfo(Key, {?CURR_REV_FORMAT, _, _, _} = Val) ->
    {?DB_REVS, _DocId, NotDeleted, RevPos, Rev} = Key,
    {_RevFormat, RevPath, AttHash, RevSize} = Val,
    #{
        winner => false,
        exists => true,
        deleted => not NotDeleted,
        rev_id => {RevPos, Rev},
        rev_path => tuple_to_list(RevPath),
        sequence => undefined,
        branch_count => undefined,
        att_hash => AttHash,
        rev_size => RevSize
    };
% Format 0 winner -> format 1 winner (empty att_hash)
fdb_to_revinfo(Key, {0, Seq, BCount, RPath}) ->
    Val = {1, Seq, BCount, RPath, <<>>},
    fdb_to_revinfo(Key, Val);
% Format 0 non-winner -> format 1 non-winner (empty att_hash)
fdb_to_revinfo(Key, {0, RPath}) ->
    Val = {1, RPath, <<>>},
    fdb_to_revinfo(Key, Val);
fdb_to_revinfo(Key, {1, Seq, BCount, RPath, AttHash}) ->
    % Format 1 winner -> current (rev_size = 0). When ?CURR_REV_FORMAT is
    % next incremented, replace it here with the literal 2 so this clause
    % keeps upgrading to the format that follows 1.
    Val = {?CURR_REV_FORMAT, Seq, BCount, RPath, AttHash, 0},
    fdb_to_revinfo(Key, Val);
fdb_to_revinfo(Key, {1, RPath, AttHash}) ->
    % Format 1 non-winner -> current (rev_size = 0). When ?CURR_REV_FORMAT
    % is next incremented, replace it here with the literal 2 so this clause
    % keeps upgrading to the format that follows 1.
    Val = {?CURR_REV_FORMAT, RPath, AttHash, 0},
    fdb_to_revinfo(Key, Val).

% Serialize a document into its ?DB_DOCS body rows. {Body, DiskAtts, Deleted}
% is compressed with term_to_binary/2, split with chunkify_binary/1, and each
% chunk is keyed by {?DB_DOCS, Id, Start, Rev, ChunkId} with ChunkId counting
% up from 0. Returns the [{Key, Chunk}] list (values not yet encrypted).
doc_to_fdb(Db, #doc{} = Doc) ->
    #{db_prefix := DbPrefix} = Db,
    #doc{
        id = Id,
        revs = {Start, [Rev | _]},
        body = Body,
        atts = Atts,
        deleted = Deleted
    } = Doc,

    DiskAtts = [couch_att:to_disk_term(Att) || Att <- Atts],
    Serialized = term_to_binary(
        {Body, DiskAtts, Deleted},
        [{minor_version, 1}, {compressed, 6}]
    ),
    Chunks = chunkify_binary(Serialized),
    Numbered = lists:zip(lists:seq(0, length(Chunks) - 1), Chunks),
    [
        {erlfdb_tuple:pack({?DB_DOCS, Id, Start, Rev, ChunkId}, DbPrefix), Chunk}
     || {ChunkId, Chunk} <- Numbered
    ].

% Rebuild a #doc{} from its body chunks. No chunks means {not_found, missing}.
% Disk attachments are restored with couch_att:from_disk_term/3. If Db has an
% after_doc_read callback, the document is passed through it before being
% returned.
fdb_to_doc(_Db, _DocId, _Pos, _Path, []) ->
    {not_found, missing};
fdb_to_doc(Db, DocId, Pos, Path, BinRows) when is_list(BinRows) ->
    {Body, DiskAtts, Deleted} = binary_to_term(iolist_to_binary(BinRows), [safe]),
    Doc = #doc{
        id = DocId,
        revs = {Pos, Path},
        body = Body,
        atts = [couch_att:from_disk_term(Db, DocId, DiskAtt) || DiskAtt <- DiskAtts],
        deleted = Deleted
    },
    case Db of
        #{after_doc_read := undefined} -> Doc;
        #{after_doc_read := AfterRead} -> AfterRead(Doc, Db)
    end.

% Serialize a local document. Returns {Key, Value, NewSize, Rows}:
%   Key     - the ?DB_LOCAL_DOCS key for the id
%   Value   - byte 255 followed by packed {?CURR_LDOC_FORMAT, Rev, NewSize};
%             the leading byte distinguishes this encoding from older ones
%   NewSize - fabric2_util:ldoc_size/1 of the doc
%   Rows    - the compressed body split into ?DB_LOCAL_DOC_BODIES chunks
% An integer rev is stored as its decimal binary.
local_doc_to_fdb(Db, #doc{} = Doc) ->
    #{db_prefix := DbPrefix} = Db,
    #doc{id = Id, revs = {0, [Rev]}, body = Body} = Doc,

    Key = erlfdb_tuple:pack({?DB_LOCAL_DOCS, Id}, DbPrefix),

    StoreRev =
        case is_integer(Rev) of
            true -> integer_to_binary(Rev);
            false when is_binary(Rev) -> Rev
        end,

    Chunks = chunkify_binary(term_to_binary(Body, [{minor_version, 1}, {compressed, 6}])),
    Numbered = lists:zip(lists:seq(0, length(Chunks) - 1), Chunks),
    Rows = [
        {erlfdb_tuple:pack({?DB_LOCAL_DOC_BODIES, Id, ChunkId}, DbPrefix), Chunk}
     || {ChunkId, Chunk} <- Numbered
    ],

    NewSize = fabric2_util:ldoc_size(Doc),
    Header = erlfdb_tuple:pack({?CURR_LDOC_FORMAT, StoreRev, NewSize}),
    {Key, <<255, Header/binary>>, NewSize, Rows}.

% Decode a local document from its stored header value and body chunk rows.
%
% Clauses, in order:
%   * not_found with no rows -> {not_found, missing}
%   * a value starting with byte 131 (Erlang external term format) and no
%     rows: the legacy layout, where {Rev, Body} lived entirely in the value
%   * a value starting with byte 255: a packed
%     {?CURR_LDOC_FORMAT, Rev, Size} header, with the body in Rows
%   * anything else is treated as a bare rev, wrapped into the 255-prefixed
%     header shape (size 0) and decoded again. NOTE(review): presumably an
%     intermediate format that stored only the rev -- confirm.
fdb_to_local_doc(_Db, _DocId, not_found, []) ->
    {not_found, missing};
fdb_to_local_doc(_Db, DocId, <<131, _/binary>> = Val, []) ->
    % This is an upgrade clause for the old encoding. We allow reading the old
    % value and will perform an upgrade of the storage format on an update.
    {Rev, Body} = binary_to_term(Val, [safe]),
    #doc{
        id = DocId,
        revs = {0, [Rev]},
        deleted = false,
        body = Body
    };
fdb_to_local_doc(_Db, DocId, <<255, RevBin/binary>>, Rows) when is_list(Rows) ->
    % The stored size is not needed to rebuild the document
    Rev =
        case erlfdb_tuple:unpack(RevBin) of
            {?CURR_LDOC_FORMAT, Rev0, _Size} -> Rev0
        end,

    BodyBin = iolist_to_binary(Rows),
    Body = binary_to_term(BodyBin, [safe]),

    #doc{
        id = DocId,
        revs = {0, [Rev]},
        deleted = false,
        body = Body
    };
fdb_to_local_doc(Db, DocId, RawRev, Rows) ->
    BaseRev = erlfdb_tuple:pack({?CURR_LDOC_FORMAT, RawRev, 0}),
    Rev = <<255, BaseRev/binary>>,
    fdb_to_local_doc(Db, DocId, Rev, Rows).

% Sum rev_size over the revision infos that do not exist yet (exists =:=
% false), i.e. the bytes being added. Entries with exists =:= true contribute
% nothing. Each entry must have both exists and rev_size keys.
sum_add_rev_sizes(RevInfos) ->
    lists:sum([
        begin
            #{exists := Exists, rev_size := Size} = RevInfo,
            case Exists of
                false -> Size;
                true -> 0
            end
        end
     || RevInfo <- RevInfos
    ]).

% Sum rev_size over revision infos being removed. Every entry must describe
% an existing revision (exists =:= true); anything else fails with badmatch.
sum_rem_rev_sizes(RevInfos) ->
    lists:sum([
        begin
            #{exists := true, rev_size := Size} = RevInfo,
            Size
        end
     || RevInfo <- RevInfos
    ]).

% Build the initial #fold_acc{} for fold_range/5 from CouchDB-style Options.
%
% Options read: dir (rev => reverse fold), start_key, end_key, end_key_gt
% (makes the end exclusive), wrap_keys (anything but false packs keys as a
% 1-tuple), skip, limit, target_bytes, streaming_mode, snapshot and
% restart_tx. Keys are packed under RangePrefix, and a missing bound
% defaults to the edge of the prefix range. Db is undefined when folding
% over a bare transaction.
get_fold_acc(Db, RangePrefix, UserCallback, UserAcc, Options) when
    is_map(Db) orelse Db =:= undefined
->
    Reverse =
        case fabric2_util:get_value(dir, Options) of
            rev -> true;
            _ -> false
        end,

    StartKey0 = fabric2_util:get_value(start_key, Options),
    EndKeyGt = fabric2_util:get_value(end_key_gt, Options),
    % end_key_gt is only the fallback when end_key is absent (assuming
    % get_value/3 returns its default then), but any end_key_gt makes the
    % end exclusive.
    EndKey0 = fabric2_util:get_value(end_key, Options, EndKeyGt),
    InclusiveEnd = EndKeyGt == undefined,
    WrapKeys = fabric2_util:get_value(wrap_keys, Options) /= false,

    % CouchDB swaps the key meanings based on the direction
    % of the fold. FoundationDB does not so we have to
    % swap back here.
    {StartKey1, EndKey1} =
        case Reverse of
            false -> {StartKey0, EndKey0};
            true -> {EndKey0, StartKey0}
        end,

    % Set the maximum bounds for the start and endkey
    StartKey2 =
        case StartKey1 of
            undefined ->
                <<RangePrefix/binary, 16#00>>;
            SK2 when not WrapKeys ->
                erlfdb_tuple:pack(SK2, RangePrefix);
            SK2 ->
                erlfdb_tuple:pack({SK2}, RangePrefix)
        end,

    % In reverse, the packed end key is extended with 16#FF so that keys
    % which start with it also fall inside the range.
    EndKey2 =
        case EndKey1 of
            undefined ->
                <<RangePrefix/binary, 16#FF>>;
            EK2 when Reverse andalso not WrapKeys ->
                PackedEK = erlfdb_tuple:pack(EK2, RangePrefix),
                <<PackedEK/binary, 16#FF>>;
            EK2 when Reverse ->
                PackedEK = erlfdb_tuple:pack({EK2}, RangePrefix),
                <<PackedEK/binary, 16#FF>>;
            EK2 when not WrapKeys ->
                erlfdb_tuple:pack(EK2, RangePrefix);
            EK2 ->
                erlfdb_tuple:pack({EK2}, RangePrefix)
        end,

    % FoundationDB ranges are applied as SK <= key < EK
    % By default, CouchDB is SK <= key <= EK with the
    % optional inclusive_end=false option changing that
    % to SK <= key < EK. Also, remember that CouchDB
    % swaps the meaning of SK and EK based on direction.
    %
    % Thus we have this wonderful bit of logic to account
    % for all of those combinations.

    StartKey3 =
        case {Reverse, InclusiveEnd} of
            {true, false} ->
                erlfdb_key:first_greater_than(StartKey2);
            _ ->
                StartKey2
        end,

    EndKey3 =
        case {Reverse, InclusiveEnd} of
            {false, true} when EndKey0 /= undefined ->
                erlfdb_key:first_greater_than(EndKey2);
            {true, _} ->
                erlfdb_key:first_greater_than(EndKey2);
            _ ->
                EndKey2
        end,

    % A missing or invalid skip is treated as 0
    Skip =
        case fabric2_util:get_value(skip, Options) of
            S when is_integer(S), S >= 0 -> S;
            _ -> 0
        end,

    % The FDB row limit must also cover the rows that will be skipped. No
    % limit becomes 0, which FoundationDB treats as unlimited. Any other
    % non-integer or negative limit crashes with case_clause.
    Limit =
        case fabric2_util:get_value(limit, Options) of
            L when is_integer(L), L >= 0 -> L + Skip;
            undefined -> 0
        end,

    TargetBytes =
        case fabric2_util:get_value(target_bytes, Options) of
            T when is_integer(T), T >= 0 -> [{target_bytes, T}];
            undefined -> []
        end,

    StreamingMode =
        case fabric2_util:get_value(streaming_mode, Options) of
            undefined -> [];
            Name when is_atom(Name) -> [{streaming_mode, Name}]
        end,

    Snapshot =
        case fabric2_util:get_value(snapshot, Options) of
            undefined -> [];
            B when is_boolean(B) -> [{snapshot, B}]
        end,

    % Options passed to every erlfdb:fold_range call (limit is added per call)
    BaseOpts =
        [{reverse, Reverse}] ++
            TargetBytes ++
            StreamingMode ++
            Snapshot,

    RestartTx = fabric2_util:get_value(restart_tx, Options, false),

    #fold_acc{
        db = Db,
        start_key = StartKey3,
        end_key = EndKey3,
        skip = Skip,
        limit = Limit,
        retries = 0,
        base_opts = BaseOpts,
        restart_tx = RestartTx,
        user_fun = UserCallback,
        user_acc = UserAcc
    }.

% erlfdb fold callback for fold_range/2.
%
% While skip is positive the row is only counted. After that, each row is
% passed to the user function. The remaining limit is decremented in both
% cases, and the resume bound is moved past K in the fold direction. The
% updated accumulator is also saved in the process dictionary so that
% restart_fold/2 can continue from here after a retryable error.
%
% A limit of 0 means "unlimited" to FoundationDB, so the limit is clamped at
% 0 in both branches. Without the clamp, skipping rows during an unlimited
% fold would drive the limit negative, and a restarted fold would pass that
% negative value as {limit, N} to the next range read.
fold_range_cb({K, V}, #fold_acc{} = Acc) ->
    #fold_acc{
        skip = Skip,
        limit = Limit,
        user_fun = UserFun,
        user_acc = UserAcc,
        base_opts = Opts
    } = Acc,
    NextLimit = max(0, Limit - 1),
    Acc1 =
        case Skip =:= 0 of
            true ->
                UserAcc1 = UserFun({K, V}, UserAcc),
                Acc#fold_acc{limit = NextLimit, user_acc = UserAcc1};
            false ->
                Acc#fold_acc{skip = Skip - 1, limit = NextLimit}
        end,
    Acc2 =
        case fabric2_util:get_value(reverse, Opts, false) of
            true -> Acc1#fold_acc{end_key = erlfdb_key:last_less_or_equal(K)};
            false -> Acc1#fold_acc{start_key = erlfdb_key:first_greater_than(K)}
        end,
    put(?PDICT_FOLD_ACC_STATE, Acc2),
    Acc2.

% Prepare a fold to resume after a retryable error. The cached
% metadata-version check is cleared and Tx is reset.
%
% If fold_range_cb/2 saved progress since the last restart, the fold resumes
% from that state with the retry counter reset to 0. Otherwise the same
% accumulator is retried, up to fabric2_server:get_retry_limit() times;
% a limit of -1 (used during buggify test runs) means no bound. Beyond that
% the fold fails with fold_range_not_progressing. The db handle is
% re-validated with check_db_instance/1 in both retry cases.
restart_fold(Tx, #fold_acc{} = Acc) ->
    erase(?PDICT_CHECKED_MD_IS_CURRENT),

    ok = erlfdb:reset(Tx),

    MaxRetries = fabric2_server:get_retry_limit(),
    Retries = Acc#fold_acc.retries,
    MayRetry = Retries < MaxRetries orelse MaxRetries =:= -1,
    case erase(?PDICT_FOLD_ACC_STATE) of
        #fold_acc{db = SavedDb} = Saved ->
            Saved#fold_acc{db = check_db_instance(SavedDb), retries = 0};
        undefined when MayRetry ->
            Acc#fold_acc{
                db = check_db_instance(Acc#fold_acc.db),
                retries = Retries + 1
            };
        undefined ->
            error(fold_range_not_progressing)
    end.

% Return the FDB database handle from the fabric application env, caching
% it in the process dictionary after the first lookup.
get_db_handle() ->
    Cached = get(?PDICT_DB_KEY),
    if
        Cached =:= undefined ->
            {ok, Handle} = application:get_env(fabric, db),
            put(?PDICT_DB_KEY, Handle),
            Handle;
        true ->
            Cached
    end.

% Return ok when the map's tx is an erlfdb transaction or snapshot.
% Otherwise raise transaction_required.
require_transaction(#{} = Db) ->
    case maps:get(tx, Db, undefined) of
        {erlfdb_snapshot, _} -> ok;
        {erlfdb_transaction, _} -> ok;
        _ -> erlang:error(transaction_required)
    end.

% Validate a db handle inside its transaction and return a possibly
% refreshed handle. Same as ensure_current(Db, true), i.e. the per-db
% version is checked as well.
ensure_current(Db) ->
    ensure_current(Db, true).

% Requires an active transaction (see require_transaction/1).
%
% If the cluster metadata version is still current, the handle is used as
% is. If it moved, the per-db version is checked (returning current without
% a read when CheckDbVersion is false):
%   * current - stamp check_current_ts, push the refreshed handle to the
%               fabric2_server cache and continue with it;
%   * stale   - evict the handle from the cache and throw {?MODULE, reopen}.
% Finally, a pending security_fun is run once against the handle's
% security_doc. It must return ok, and is then cleared from the handle.
ensure_current(#{} = Db0, CheckDbVersion) ->
    require_transaction(Db0),
    Db3 =
        case check_metadata_version(Db0) of
            {current, Db1} ->
                Db1;
            {stale, Db1} ->
                case check_db_version(Db1, CheckDbVersion) of
                    current ->
                        % If db version is current, update cache with the latest
                        % metadata so other requests can immediately see the
                        % refreshed db handle.
                        Now = erlang:monotonic_time(millisecond),
                        Db2 = Db1#{check_current_ts := Now},
                        fabric2_server:maybe_update(Db2),
                        Db2;
                    stale ->
                        fabric2_server:maybe_remove(Db1),
                        throw({?MODULE, reopen})
                end
        end,
    case maps:get(security_fun, Db3) of
        SecurityFun when is_function(SecurityFun, 2) ->
            #{security_doc := SecDoc} = Db3,
            ok = SecurityFun(Db3, SecDoc),
            Db3#{security_fun := undefined};
        undefined ->
            Db3
    end.

% Re-validate a db handle after a transaction reset (undefined passes
% through). If the metadata version moved, the stored ?DB_CONFIG uuid must
% still equal the handle's uuid. Otherwise the database was recreated or
% removed, and this raises database_does_not_exist.
check_db_instance(undefined) ->
    undefined;
check_db_instance(#{} = Db) ->
    require_transaction(Db),
    case check_metadata_version(Db) of
        {current, Db1} ->
            Db1;
        {stale, Db1} ->
            #{tx := Tx, uuid := UUID, db_prefix := DbPrefix} = Db1,
            UUIDKey = erlfdb_tuple:pack({?DB_CONFIG, <<"uuid">>}, DbPrefix),
            StoredUUID = erlfdb:wait(erlfdb:get(Tx, UUIDKey)),
            if
                StoredUUID =:= UUID -> Db1;
                true -> error(database_does_not_exist)
            end
    end.

% A transaction whose commit result was unknown counts as applied only if it
% had a transaction id and that id's marker key is present in the database.
is_transaction_applied(Tx) ->
    case was_commit_unknown_result() of
        false ->
            false;
        true ->
            has_transaction_id() andalso transaction_id_exists(Tx)
    end.

% Return the result saved by execute_transaction/3 for the last
% non-read-only attempt, or undefined if none was saved.
get_previous_transaction_result() ->
    erlang:get(?PDICT_TX_RES_KEY).

% Run Fun(Tx) with fresh "already checked" flags and return its result.
% For a transaction that wrote anything, a transaction-id marker key is
% also written and the result is saved in the process dictionary. This lets
% a retry after an unknown commit result detect the earlier success.
execute_transaction(Tx, Fun, LayerPrefix) ->
    put(?PDICT_CHECKED_MD_IS_CURRENT, false),
    put(?PDICT_CHECKED_DB_IS_CURRENT, false),
    Result = Fun(Tx),
    case erlfdb:is_read_only(Tx) of
        false ->
            TxIdKey = get_transaction_id(Tx, LayerPrefix),
            erlfdb:set(Tx, TxIdKey, <<>>),
            put(?PDICT_TX_RES_KEY, Result);
        true ->
            ok
    end,
    Result.

% Forget all per-transaction state. The transaction id is handed to
% fabric2_txids:remove/1, then each process dictionary entry is erased.
% The return value is the previous value of ?PDICT_TX_RES_WAS_UNKNOWN.
clear_transaction() ->
    fabric2_txids:remove(get(?PDICT_TX_ID_KEY)),
    lists:foreach(
        fun erlang:erase/1,
        [
            ?PDICT_CHECKED_DB_IS_CURRENT,
            ?PDICT_CHECKED_MD_IS_CURRENT,
            ?PDICT_TX_ID_KEY,
            ?PDICT_TX_RES_KEY
        ]
    ),
    erase(?PDICT_TX_RES_WAS_UNKNOWN).

% True once any attempt of this transaction has ended with FDB's
% commit_unknown_result error. The first positive observation is latched in
% the process dictionary.
was_commit_unknown_result() ->
    case get(?PDICT_TX_RES_WAS_UNKNOWN) of
        true ->
            true;
        undefined ->
            IsUnknown = erlfdb:get_last_error() == ?ERLFDB_COMMIT_UNKNOWN_RESULT,
            if
                IsUnknown ->
                    put(?PDICT_TX_RES_WAS_UNKNOWN, true),
                    true;
                true ->
                    false
            end
    end.

% True when a binary transaction id has been stored for this transaction.
has_transaction_id() ->
    case get(?PDICT_TX_ID_KEY) of
        <<_/binary>> -> true;
        _ -> false
    end.

% True when the current transaction id's marker key holds the empty value
% written by execute_transaction/3.
transaction_id_exists(Tx) ->
    TxIdKey = get(?PDICT_TX_ID_KEY),
    Marker = erlfdb:wait(erlfdb:get(Tx, TxIdKey)),
    Marker == <<>>.

% Return this transaction's id, creating it with fabric2_txids:create/2 and
% caching it in the process dictionary on first use.
get_transaction_id(Tx, LayerPrefix) ->
    case get(?PDICT_TX_ID_KEY) of
        TxId when is_binary(TxId) ->
            TxId;
        undefined ->
            NewTxId = fabric2_txids:create(Tx, LayerPrefix),
            put(?PDICT_TX_ID_KEY, NewTxId),
            NewTxId
    end.

% Run Fun inside a ctrace span named Operation when ctrace:has_span() is
% true; otherwise just call Fun(). ExtraTags are merged over, and may
% override, the default FDB client tags.
with_span(Operation, ExtraTags, Fun) ->
    case ctrace:has_span() of
        false ->
            Fun();
        true ->
            DefaultTags = #{
                'span.kind' => <<"client">>,
                component => <<"couchdb.fabric">>,
                'db.instance' => fabric2_server:fdb_cluster(),
                'db.namespace' => fabric2_server:fdb_directory(),
                'db.type' => <<"fdb">>,
                nonce => get(nonce),
                pid => self()
            },
            ctrace:with_span(Operation, maps:merge(DefaultTags, ExtraTags), Fun)
    end.

% Wait on the futures held in an #info_future{} and build the db info
% proplist [{update_seq, _}, {uuid, _} | Stats].
%
% update_seq is the hex form of the versionstamp from the single ?DB_CHANGES
% row in ChangesFuture, or of fabric2_util:seq_zero_vs() when there is no
% row. Stats contains doc_count, doc_del_count and a {sizes, {[...]}} entry
% collecting each "sizes" stat by name. Other two-element ?DB_STATS keys are
% ignored.
get_info_wait_int(#info_future{} = InfoFuture) ->
    #info_future{
        db_prefix = DbPrefix,
        changes_future = ChangesFuture,
        uuid_future = UUIDFuture,
        meta_future = MetaFuture
    } = InfoFuture,

    RawSeq =
        case erlfdb:wait(ChangesFuture) of
            [] ->
                vs_to_seq(fabric2_util:seq_zero_vs());
            [{SeqKey, _}] ->
                {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(SeqKey, DbPrefix),
                vs_to_seq(SeqVS)
        end,
    CProp = {update_seq, RawSeq},

    UUIDProp = {uuid, erlfdb:wait(UUIDFuture)},

    % The accumulator starts with an empty {sizes, {[]}} entry that the
    % "sizes" stats update in place (by name) via lists:keystore/4.
    MProps = lists:foldl(
        fun({K, V}, Acc) ->
            case erlfdb_tuple:unpack(K, DbPrefix) of
                {?DB_STATS, <<"doc_count">>} ->
                    [{doc_count, ?bin2uint(V)} | Acc];
                {?DB_STATS, <<"doc_del_count">>} ->
                    [{doc_del_count, ?bin2uint(V)} | Acc];
                {?DB_STATS, <<"sizes">>, Name} ->
                    Val = ?bin2uint(V),
                    {_, {Sizes}} = lists:keyfind(sizes, 1, Acc),
                    NewSizes = lists:keystore(Name, 1, Sizes, {Name, Val}),
                    lists:keystore(sizes, 1, Acc, {sizes, {NewSizes}});
                {?DB_STATS, _} ->
                    Acc
            end
        end,
        [{sizes, {[]}}],
        erlfdb:wait(MetaFuture)
    ),

    [CProp, UUIDProp | MProps].

% Chunk size, in bytes, used to split large binaries across several FDB
% values. The fabric/binary_chunk_size setting may lower it, but never raise
% it above ?DEFAULT_BINARY_CHUNK_SIZE_BYTES. A non-positive (or non-integer)
% setting is ignored in favour of the default. A size of 0 would leave
% chunkify_binary/2 unable to make progress, and a negative value is not a
% valid binary size.
binary_chunk_size() ->
    ConfigBinaryChunkSize = config:get_integer(
        "fabric",
        "binary_chunk_size",
        ?DEFAULT_BINARY_CHUNK_SIZE_BYTES
    ),
    case ConfigBinaryChunkSize of
        Size when is_integer(Size), Size > 0 ->
            min(Size, ?DEFAULT_BINARY_CHUNK_SIZE_BYTES);
        _ ->
            ?DEFAULT_BINARY_CHUNK_SIZE_BYTES
    end.

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

% Revision values written in the older on-disk formats 0 and 1 must decode
% to the same revision info map. Fields those formats lacked are filled with
% defaults: an empty att_hash and rev_size 0.
fdb_to_revinfo_version_compatibility_test() ->
    DocId = <<"doc_id">>,
    FirstRevFormat = 0,
    SecondRevFormat = 1,
    RevPos = 1,
    Rev = <<60, 84, 174, 140, 210, 120, 192, 18, 100, 148, 9, 181, 129, 165, 248, 92>>,
    RevPath = {},
    NotDeleted = true,
    Sequence = {versionstamp, 10873034897377, 0, 0},
    BranchCount = 1,

    KeyWinner = {?DB_REVS, DocId, NotDeleted, RevPos, Rev},
    ValWinner = {FirstRevFormat, Sequence, BranchCount, RevPath},
    ExpectedWinner = expected(
        true, BranchCount, NotDeleted, RevPos, Rev, RevPath, Sequence
    ),
    ?assertEqual(ExpectedWinner, fdb_to_revinfo(KeyWinner, ValWinner)),

    KeyLoser = {?DB_REVS, DocId, NotDeleted, RevPos, Rev},
    ValLoser = {FirstRevFormat, RevPath},
    ExpectedLoser = expected(
        false, undefined, NotDeleted, RevPos, Rev, RevPath, undefined
    ),
    ?assertEqual(ExpectedLoser, fdb_to_revinfo(KeyLoser, ValLoser)),

    % Format 1 values carry an att_hash but no rev_size.
    ValWinnerV1 = {SecondRevFormat, Sequence, BranchCount, RevPath, <<>>},
    ?assertEqual(ExpectedWinner, fdb_to_revinfo(KeyWinner, ValWinnerV1)),

    ValLoserV1 = {SecondRevFormat, RevPath, <<>>},
    ?assertEqual(ExpectedLoser, fdb_to_revinfo(KeyLoser, ValLoserV1)),
    ok.

% Build the revision info map that fdb_to_revinfo/2 should produce for an
% upgraded legacy value: att_hash defaults to <<>> and rev_size to 0.
expected(Winner, BranchCount, NotDeleted, RevPos, Rev, RevPath, Sequence) ->
    maps:from_list([
        {winner, Winner},
        {exists, true},
        {deleted, not NotDeleted},
        {rev_id, {RevPos, Rev}},
        {rev_path, tuple_to_list(RevPath)},
        {sequence, Sequence},
        {branch_count, BranchCount},
        {att_hash, <<>>},
        {rev_size, 0}
    ]).

-endif.
