% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(fabric2_fdb).


-export([
    transactional/1,
    transactional/2,
    transactional/3,

    with_snapshot/2,

    create/2,
    open/2,
    ensure_current/1,
    delete/1,
    undelete/3,
    remove_deleted_db/2,
    exists/1,

    get_dir/1,

    list_dbs/4,
    list_dbs_info/4,
    list_deleted_dbs_info/4,

    get_info/1,
    get_info_future/2,
    get_info_wait/1,
    set_config/3,

    get_stat/2,
    incr_stat/3,
    incr_stat/4,

    get_all_revs/2,
    get_all_revs_future/2,
    get_winning_revs/3,
    get_winning_revs_future/3,
    get_revs_wait/2,
    get_non_deleted_rev/3,

    get_doc_body/3,
    get_doc_body_future/3,
    get_doc_body_wait/4,

    get_local_doc_rev_future/2,
    get_local_doc_rev_wait/1,
    get_local_doc_body_future/3,
    get_local_doc_body_wait/4,
    get_local_doc/2,
    get_local_doc_rev/3,

    write_doc/6,
    write_local_doc/2,

    read_attachment/3,
    write_attachment/4,

    get_last_change/1,

    fold_range/5,

    vs_to_seq/1,
    seq_to_vs/1,
    next_vs/1,

    new_versionstamp/1,

    get_approximate_tx_size/1,

    chunkify_binary/1,
    chunkify_binary/2,

    debug_cluster/0,
    debug_cluster/2
]).


-include_lib("couch/include/couch_db.hrl").
-include("fabric2.hrl").


% Accumulator threaded through fold_range/2 while iterating a key range.
-record(fold_acc, {
    db,
    % When true, fold_range/2 disallows writes on the transaction and
    % restarts the fold on retryable erlfdb errors
    restart_tx,
    % Bounds of the key range handed to erlfdb:fold_range
    start_key,
    end_key,
    % Row limit; passed to erlfdb:fold_range as the `limit` option
    limit,
    skip,
    retries,
    % Remaining erlfdb:fold_range options; the limit is prepended to these
    base_opts,
    user_fun,
    % Caller's accumulator; fold_range/2 returns its final value
    user_acc
}).

% Bundle of pending reads created by get_info_future/2 and consumed by
% get_info_wait/1.
-record(info_future, {
    % Transaction to retry with; only set when the transaction is read-only
    tx,
    db_prefix,
    changes_future,
    meta_future,
    uuid_future,
    % Number of times get_info_wait/1 has re-issued the reads
    retries = 0
}).


% Run Fun (arity 1, receiving the transaction) in a new transaction with an
% undefined layer prefix and no extra transaction options.
transactional(Fun) ->
    NoTxOptions = #{},
    do_transaction(Fun, undefined, NoTxOptions).


% Same as transactional/3 with an empty transaction options map. The first
% argument is either a binary database name or a db map.
transactional(DbOrName, Fun)
        when (is_binary(DbOrName) orelse is_map(DbOrName)), is_function(Fun) ->
    transactional(DbOrName, #{}, Fun).



% Run Fun with a db handle that is bound to a transaction.
%
% The first argument selects how that handle is obtained:
%  - a binary database name: a new transaction is started and Fun receives
%    the result of init_db/2 for that name
%  - a db map with `tx := undefined`: the handle is refreshed from the cache,
%    a new transaction is started and Fun receives the map with `tx` set
%  - a db map already holding a snapshot or a transaction: Fun is called
%    directly with the map and TxOptions is ignored
%
% TxOptions is a map of transaction options which do_transaction/3 applies
% to every transaction it starts. Returns whatever Fun returns.
transactional(DbName, #{} = TxOptions, Fun) when is_binary(DbName) ->
    with_span(Fun, #{'db.name' => DbName}, fun() ->
        do_transaction(fun(Tx) ->
            Fun(init_db(Tx, DbName))
        end, undefined, TxOptions)
    end);

transactional(#{tx := undefined} = Db, #{} = TxOptions, Fun) ->
    DbName = maps:get(name, Db, undefined),
    try
        Db1 = refresh(Db),
        Reopen = maps:get(reopen, Db1, false),
        Db2 = maps:remove(reopen, Db1),
        % When reopening, the layer prefix is left undefined for
        % do_transaction/3 instead of reusing the one stored in the handle
        LayerPrefix = case Reopen of
            true -> undefined;
            false -> maps:get(layer_prefix, Db2)
        end,
        with_span(Fun, #{'db.name' => DbName}, fun() ->
            do_transaction(fun(Tx) ->
                case Reopen of
                    true -> Fun(reopen(Db2#{tx => Tx}));
                    false -> Fun(Db2#{tx => Tx})
                end
            end, LayerPrefix, TxOptions)
        end)
    catch throw:{?MODULE, reopen} ->
        % The handle turned out to be stale: retry once more with the
        % `reopen` flag set, keeping the caller's TxOptions for the retried
        % transaction.
        with_span('db.reopen', #{'db.name' => DbName}, fun() ->
            transactional(Db#{reopen => true}, TxOptions, Fun)
        end)
    end;
transactional(#{tx := {erlfdb_snapshot, _}} = Db, #{} = _TxOptions, Fun) ->
    DbName = maps:get(name, Db, undefined),
    with_span(Fun, #{'db.name' => DbName}, fun() ->
        Fun(Db)
    end);

transactional(#{tx := {erlfdb_transaction, _}} = Db, #{} = _TxOptions, Fun) ->
    DbName = maps:get(name, Db, undefined),
    with_span(Fun, #{'db.name' => DbName}, fun() ->
        Fun(Db)
    end).


% Execute Fun inside erlfdb:transactional/2 on the handle returned by
% get_db_handle/0. Every attempt first applies TxOptions and, when the
% process dictionary holds a binary under `erlfdb_trace`, enables
% transaction logging under a unique id derived from it. If the transaction
% is reported as already applied the previous result is returned instead of
% running Fun again. clear_transaction/0 always runs afterwards.
do_transaction(Fun, LayerPrefix, #{} = TxOptions) when is_function(Fun, 1) ->
    DbHandle = get_db_handle(),
    TxFun = fun(Tx) ->
        apply_tx_options(Tx, TxOptions),
        case get(erlfdb_trace) of
            TraceName when is_binary(TraceName) ->
                Unique = erlang:unique_integer([positive]),
                Suffix = integer_to_binary(Unique, 36),
                TxId = <<TraceName/binary, "_", Suffix/binary>>,
                erlfdb:set_option(Tx, transaction_logging_enable, TxId);
            _ ->
                ok
        end,
        case is_transaction_applied(Tx) of
            false ->
                execute_transaction(Tx, Fun, LayerPrefix);
            true ->
                get_previous_transaction_result()
        end
    end,
    try
        erlfdb:transactional(DbHandle, TxFun)
    after
        clear_transaction()
    end.


% Set every option in the TxOptions map on the transaction. Returns a map
% with the same keys holding the result of each erlfdb:set_option/3 call.
apply_tx_options(Tx, #{} = TxOptions) ->
    SetOption = fun(Option, Value) ->
        erlfdb:set_option(Tx, Option, Value)
    end,
    maps:map(SetOption, TxOptions).


% Call Fun with a db handle whose `tx` is a snapshot. A handle that already
% carries a snapshot is passed through untouched; a handle carrying a
% transaction gets its `tx` replaced by erlfdb:snapshot/1 of it.
with_snapshot(#{tx := {erlfdb_snapshot, _}} = SSDb, Fun) ->
    Fun(SSDb);

with_snapshot(#{tx := {erlfdb_transaction, _} = Tx} = TxDb, Fun) ->
    Snapshot = erlfdb:snapshot(Tx),
    Fun(TxDb#{tx := Snapshot}).


% Create a new database inside the current transaction: allocate a prefix,
% register it under the database name, write the initial version, config
% and stats keys, and return the db map after aegis:init_db/2.
create(#{} = Db0, Options) ->
    Db1 = ensure_current(Db0, false),
    #{
        name := DbName,
        tx := Tx,
        layer_prefix := LayerPrefix
    } = Db1,

    DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
    HCAPrefix = erlfdb_tuple:pack({?DB_HCA}, LayerPrefix),
    AllocPrefix = erlfdb_hca:allocate(erlfdb_hca:create(HCAPrefix), Tx),
    DbPrefix = erlfdb_tuple:pack({?DBS, AllocPrefix}, LayerPrefix),
    erlfdb:set(Tx, DbKey, DbPrefix),

    % The db version key signals that something held in the database cache
    % (i.e., fabric2_server's ets table) has changed and needs re-loading.
    % This currently covers revs_limit and validate_doc_update functions.
    % The value carries no ordering; it only has to change, which is what
    % the ensure_current check looks for.
    DbVersion = fabric2_util:uuid(),
    erlfdb:set(Tx, erlfdb_tuple:pack({?DB_VERSION}, DbPrefix), DbVersion),

    UUID = fabric2_util:uuid(),

    % Initial config and stats rows as {KeyTuple, Value} pairs
    InitialRows = [
        {{?DB_CONFIG, <<"uuid">>}, UUID},
        {{?DB_CONFIG, <<"revs_limit">>}, ?uint2bin(1000)},
        {{?DB_CONFIG, <<"security_doc">>}, <<"{}">>},
        {{?DB_STATS, <<"doc_count">>}, ?uint2bin(0)},
        {{?DB_STATS, <<"doc_del_count">>}, ?uint2bin(0)},
        {{?DB_STATS, <<"doc_design_count">>}, ?uint2bin(0)},
        {{?DB_STATS, <<"doc_local_count">>}, ?uint2bin(0)},
        {{?DB_STATS, <<"sizes">>, <<"external">>}, ?uint2bin(2)},
        {{?DB_STATS, <<"sizes">>, <<"views">>}, ?uint2bin(0)}
    ],
    lists:foreach(fun({KeyTuple, Val}) ->
        erlfdb:set(Tx, erlfdb_tuple:pack(KeyTuple, DbPrefix), Val)
    end, InitialRows),

    UserCtx = fabric2_util:get_value(user_ctx, Options, #user_ctx{}),
    DbOptions = lists:keydelete(user_ctx, 1, Options),

    NewDb = Db1#{
        uuid => UUID,
        db_prefix => DbPrefix,
        db_version => DbVersion,

        revs_limit => 1000,
        security_doc => {[]},
        user_ctx => UserCtx,
        check_current_ts => erlang:monotonic_time(millisecond),

        validate_doc_update_funs => [],
        before_doc_update => undefined,
        after_doc_read => undefined,
        % All other db things as we add features,

        db_options => DbOptions,
        interactive => false
    },
    aegis:init_db(NewDb, Options).


% Open an existing database inside the current transaction. Raises
% `database_does_not_exist` when the name is not registered, or when a
% `uuid` option is given and differs from the stored uuid. The `user_ctx`,
% `uuid` and `interactive` options are consumed here; the rest is kept
% under `db_options`.
open(#{} = Db0, Options) ->
    Db1 = ensure_current(Db0, false),
    #{
        name := DbName,
        tx := Tx,
        layer_prefix := LayerPrefix
    } = Db1,

    NameKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
    DbPrefix = case erlfdb:wait(erlfdb:get(Tx, NameKey)) of
        not_found -> erlang:error(database_does_not_exist);
        Bin when is_binary(Bin) -> Bin
    end,

    VersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix),
    DbVersion = erlfdb:wait(erlfdb:get(Tx, VersionKey)),

    UserCtx = fabric2_util:get_value(user_ctx, Options, #user_ctx{}),
    NoUserCtx = lists:keydelete(user_ctx, 1, Options),

    UUID = fabric2_util:get_value(uuid, NoUserCtx),
    NoUUID = lists:keydelete(uuid, 1, NoUserCtx),

    Interactive = fabric2_util:get_value(interactive, NoUUID, false),
    DbOptions = lists:keydelete(interactive, 1, NoUUID),

    Db2 = Db1#{
        db_prefix => DbPrefix,
        db_version => DbVersion,

        % Defaults; load_config/1 overwrites them from the stored config
        uuid => <<>>,
        revs_limit => 1000,
        security_doc => {[]},

        user_ctx => UserCtx,
        check_current_ts => erlang:monotonic_time(millisecond),

        % Place holders until we implement these
        % bits.
        validate_doc_update_funs => [],
        before_doc_update => undefined,
        after_doc_read => undefined,

        db_options => DbOptions,
        interactive => Interactive
    },

    Db3 = aegis:open_db(load_config(Db2)),

    % If the caller asked for a specific uuid it must match the stored one
    case {UUID, Db3} of
        {undefined, _} ->
            ok;
        {<<_/binary>>, #{uuid := UUID}} ->
            ok;
        {<<_/binary>>, #{uuid := _}} ->
            erlang:error(database_does_not_exist)
    end,

    load_validate_doc_funs(Db3).


% Swap a handle without a transaction for the cached one when the cache
% holds a newer metadata version. `name` is matched in the function head
% because some non-fabric2 db objects might not have names, and those are
% never cached.
refresh(#{tx := undefined, name := DbName} = Db) ->
    #{uuid := UUID, md_version := CurrentVer} = Db,

    % Assumptions about the `md_version` value relied on below:
    %  - It is bumped every time `db_version` is bumped
    %  - Is a versionstamp, so we can check which one is newer
    %  - If it is `not_found`, it would sort less than a binary value
    case fabric2_server:fetch(DbName, UUID) of
        #{md_version := CachedVer} = Cached when CachedVer > CurrentVer ->
            % Per-handle fields are carried over from the caller's handle
            UserCtx = maps:get(user_ctx, Db),
            SecurityFun = maps:get(security_fun, Db),
            Interactive = maps:get(interactive, Db),
            Cached#{
                user_ctx := UserCtx,
                security_fun := SecurityFun,
                interactive := Interactive
            };
        _ ->
            Db
    end;

refresh(#{} = Db) ->
    Db.



% Re-open the database named by OldDb within OldDb's transaction, keeping
% the old user context, security fun and interactive flag. For
% non-interactive handles a changed uuid raises `database_does_not_exist`.
reopen(#{} = OldDb) ->
    require_transaction(OldDb),
    #{
        tx := Tx,
        name := DbName,
        uuid := UUID,
        db_options := Options,
        user_ctx := UserCtx,
        security_fun := SecurityFun,
        interactive := Interactive
    } = OldDb,
    OpenOptions = lists:keystore(user_ctx, 1, Options, {user_ctx, UserCtx}),
    FreshDb = init_db(Tx, DbName),
    NewDb = open(FreshDb, OpenOptions),

    % Check if database was re-created
    NewUUID = maps:get(uuid, NewDb),
    case {Interactive, NewUUID} of
        {false, UUID} -> ok;
        {false, _OtherUUID} -> error(database_does_not_exist);
        {true, _} -> ok
    end,

    NewDb#{security_fun := SecurityFun, interactive := Interactive}.


% Delete the database: a soft delete when fabric2_util:do_recovery/0 is
% true, a hard delete otherwise.
delete(#{} = Db) ->
    case fabric2_util:do_recovery() of
        false -> hard_delete_db(Db);
        true -> soft_delete_db(Db)
    end.


% Restore the deleted instance of Db0's name identified by TimeStamp under
% the name TgtDbName. Returns `file_exists` when TgtDbName is already
% registered, `not_found` when there is no such deleted entry, `ok`
% otherwise.
undelete(#{} = Db0, TgtDbName, TimeStamp) ->
    #{
        name := DbName,
        tx := Tx,
        layer_prefix := LayerPrefix
    } = ensure_current(Db0, false),
    TgtKey = erlfdb_tuple:pack({?ALL_DBS, TgtDbName}, LayerPrefix),
    case erlfdb:wait(erlfdb:get(Tx, TgtKey)) of
        not_found ->
            DeletedKey = erlfdb_tuple:pack(
                {?DELETED_DBS, DbName, TimeStamp},
                LayerPrefix
            ),
            case erlfdb:wait(erlfdb:get(Tx, DeletedKey)) of
                not_found ->
                    not_found;
                DbPrefix ->
                    % Move the prefix from the deleted entry to the target
                    erlfdb:set(Tx, TgtKey, DbPrefix),
                    erlfdb:clear(Tx, DeletedKey),
                    bump_db_version(#{tx => Tx, db_prefix => DbPrefix}),
                    ok
            end;
        Existing when is_binary(Existing) ->
            file_exists
    end.


% Permanently remove the deleted instance of Db0's name identified by
% TimeStamp: clears the deleted-db entry and everything under its prefix.
% Returns `not_found` when there is no such entry, `ok` otherwise.
remove_deleted_db(#{} = Db0, TimeStamp) ->
    #{
        name := DbName,
        tx := Tx,
        layer_prefix := LayerPrefix
    } = ensure_current(Db0, false),

    DeletedKey = erlfdb_tuple:pack(
        {?DELETED_DBS, DbName, TimeStamp},
        LayerPrefix
    ),
    case erlfdb:wait(erlfdb:get(Tx, DeletedKey)) of
        not_found ->
            not_found;
        DbPrefix ->
            erlfdb:clear(Tx, DeletedKey),
            erlfdb:clear_range_startswith(Tx, DbPrefix),
            bump_db_version(#{tx => Tx, db_prefix => DbPrefix}),
            ok
    end.


% Return true when the database name is registered under the layer prefix,
% false otherwise.
exists(#{name := DbName} = Db) when is_binary(DbName) ->
    #{tx := Tx, layer_prefix := LayerPrefix} = ensure_current(Db, false),

    NameKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
    case erlfdb:wait(erlfdb:get(Tx, NameKey)) of
        not_found -> false;
        Bin when is_binary(Bin) -> true
    end.


% Create or open the directory given by fabric2_server:fdb_directory/0
% under the erlfdb directory root and return its name.
get_dir(Tx) ->
    CouchDBDir = erlfdb_directory:create_or_open(
        Tx,
        erlfdb_directory:root(),
        fabric2_server:fdb_directory()
    ),
    erlfdb_directory:get_name(CouchDBDir).


% Fold Callback(DbName, Acc) over all registered database names.
% `restart_tx` defaults to true unless the caller set it in Options0.
list_dbs(Tx, Callback, AccIn, Options0) ->
    RestartTx = fabric2_util:get_value(restart_tx, Options0),
    Options = if
        RestartTx =:= undefined -> [{restart_tx, true} | Options0];
        true -> Options0
    end,
    Prefix = erlfdb_tuple:pack({?ALL_DBS}, get_dir(Tx)),
    FoldFun = fun({Key, _Val}, Acc) ->
        {DbName} = erlfdb_tuple:unpack(Key, Prefix),
        Callback(DbName, Acc)
    end,
    fold_range({tx, Tx}, Prefix, FoldFun, AccIn, Options).


% Fold Callback(DbName, InfoFuture, Acc) over all registered databases,
% where InfoFuture comes from get_info_future/2 for the database's prefix.
% `restart_tx` defaults to true unless the caller set it in Options0.
list_dbs_info(Tx, Callback, AccIn, Options0) ->
    RestartTx = fabric2_util:get_value(restart_tx, Options0),
    Options = if
        RestartTx =:= undefined -> [{restart_tx, true} | Options0];
        true -> Options0
    end,
    Prefix = erlfdb_tuple:pack({?ALL_DBS}, get_dir(Tx)),
    FoldFun = fun({NameKey, DbPrefix}, Acc) ->
        {DbName} = erlfdb_tuple:unpack(NameKey, Prefix),
        Callback(DbName, get_info_future(Tx, DbPrefix), Acc)
    end,
    fold_range({tx, Tx}, Prefix, FoldFun, AccIn, Options).


% Fold Callback(DbName, TimeStamp, InfoFuture, Acc) over all deleted-db
% entries. `restart_tx` defaults to true unless the caller set it in
% Options0.
list_deleted_dbs_info(Tx, Callback, AccIn, Options0) ->
    RestartTx = fabric2_util:get_value(restart_tx, Options0),
    Options = if
        RestartTx =:= undefined -> [{restart_tx, true} | Options0];
        true -> Options0
    end,
    Prefix = erlfdb_tuple:pack({?DELETED_DBS}, get_dir(Tx)),
    FoldFun = fun({DeletedKey, DbPrefix}, Acc) ->
        {DbName, TimeStamp} = erlfdb_tuple:unpack(DeletedKey, Prefix),
        Callback(DbName, TimeStamp, get_info_future(Tx, DbPrefix), Acc)
    end,
    fold_range({tx, Tx}, Prefix, FoldFun, AccIn, Options).


% Return the database info list with an `encryption` entry, taken from
% aegis:get_db_info/1, prepended.
get_info(#{} = Db) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),
    InfoFuture = get_info_future(Tx, DbPrefix),
    DbInfo = get_info_wait(InfoFuture),
    Encryption = {encryption, {aegis:get_db_info(Db)}},
    [Encryption | DbInfo].


% Issue the reads needed for database info (last change row, uuid, stats)
% and return them bundled in an #info_future{} for get_info_wait/1.
get_info_future(Tx, DbPrefix) ->
    LastChangeOpts = [
        {streaming_mode, exact},
        {limit, 1},
        {reverse, true}
    ],
    {ChangesStart, ChangesEnd} = erlfdb_tuple:range({?DB_CHANGES}, DbPrefix),
    ChangesFuture = erlfdb:get_range(
        Tx, ChangesStart, ChangesEnd, LastChangeOpts
    ),

    UUIDKey = erlfdb_tuple:pack({?DB_CONFIG, <<"uuid">>}, DbPrefix),
    UUIDFuture = erlfdb:get(Tx, UUIDKey),

    StatsPrefix = erlfdb_tuple:pack({?DB_STATS}, DbPrefix),
    MetaFuture = erlfdb:get_range_startswith(Tx, StatsPrefix),

    % The tx object is kept only for read-only transactions, since the
    % futures may have to be fetched again after the tx was reset
    RetryTx = case erlfdb:get_writes_allowed(Tx) of
        false -> Tx;
        true -> undefined
    end,

    #info_future{
        tx = RetryTx,
        db_prefix = DbPrefix,
        changes_future = ChangesFuture,
        meta_future = MetaFuture,
        uuid_future = UUIDFuture
    }.


% Wait for an #info_future{}. When the future carries a transaction and has
% been retried fewer than two times, a cancelled transaction or a retryable
% erlfdb error re-issues the reads (resetting the transaction first in the
% retryable case) and waits again. Otherwise errors propagate.
get_info_wait(#info_future{tx = undefined} = Future) ->
    get_info_wait_int(Future);

get_info_wait(#info_future{retries = Retries} = Future) when Retries >= 2 ->
    get_info_wait_int(Future);

get_info_wait(#info_future{} = Future) ->
    #info_future{tx = Tx, retries = Retries, db_prefix = DbPrefix} = Future,
    try
        get_info_wait_int(Future)
    catch
        error:{erlfdb_error, ?ERLFDB_TRANSACTION_CANCELLED} ->
            Reissued = get_info_future(Tx, DbPrefix),
            get_info_wait(Reissued#info_future{retries = Retries + 1});
        error:{erlfdb_error, Code} when ?ERLFDB_IS_RETRYABLE(Code) ->
            ok = erlfdb:reset(Tx),
            Reissued = get_info_future(Tx, DbPrefix),
            get_info_wait(Reissued#info_future{retries = Retries + 1})
    end.


% Read every ?DB_CONFIG row of the database and store the decoded values
% for `uuid`, `revs_limit` and `security_doc` in the db map. Any other
% config key crashes with a case_clause error.
load_config(#{} = Db) ->
    #{tx := Tx, db_prefix := DbPrefix} = Db,

    {RangeStart, RangeEnd} = erlfdb_tuple:range({?DB_CONFIG}, DbPrefix),
    RowsFuture = erlfdb:get_range(Tx, RangeStart, RangeEnd),

    ApplyRow = fun({PackedKey, Val}, DbAcc) ->
        {?DB_CONFIG, ConfigKey} = erlfdb_tuple:unpack(PackedKey, DbPrefix),
        case ConfigKey of
            <<"uuid">> ->
                DbAcc#{uuid := Val};
            <<"revs_limit">> ->
                DbAcc#{revs_limit := ?bin2uint(Val)};
            <<"security_doc">> ->
                DbAcc#{security_doc := ?JSON_DECODE(Val)}
        end
    end,
    lists:foldl(ApplyRow, Db, erlfdb:wait(RowsFuture)).


% Store a config value (`uuid`, `revs_limit` or `security_doc`), bump the
% db version and return `{ok, Db}` with the new version and value set.
% A `revs_limit` below 1 is stored as 1; the returned map keeps Val as
% given.
set_config(#{} = Db0, Key, Val) when is_atom(Key) ->
    Db = ensure_current(Db0),
    #{tx := Tx, db_prefix := DbPrefix} = Db,
    {BinKey, BinVal} = case Key of
        security_doc -> {<<"security_doc">>, ?JSON_ENCODE(Val)};
        revs_limit -> {<<"revs_limit">>, ?uint2bin(max(1, Val))};
        uuid -> {<<"uuid">>, Val}
    end,
    ConfigKey = erlfdb_tuple:pack({?DB_CONFIG, BinKey}, DbPrefix),
    erlfdb:set(Tx, ConfigKey, BinVal),
    {ok, NewVersion} = bump_db_version(Db),
    {ok, Db#{db_version := NewVersion, Key := Val}}.


% Read the stat stored under StatKey and decode it as an unsigned integer.
get_stat(#{} = Db, StatKey) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),

    PackedKey = erlfdb_tuple:pack({?DB_STATS, StatKey}, DbPrefix),
    StatBin = erlfdb:wait(erlfdb:get(Tx, PackedKey)),

    % Might need to figure out some sort of type system here. Values are
    % uints because stats are all atomic op adds for the moment.
    ?bin2uint(StatBin).


% Atomically add Increment to the stat stored under StatKey. A zero
% increment is a no-op that does not touch the database.
incr_stat(_Db, _StatKey, 0) ->
    ok;

incr_stat(#{} = Db, StatKey, Increment) when is_integer(Increment) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),
    PackedKey = erlfdb_tuple:pack({?DB_STATS, StatKey}, DbPrefix),
    erlfdb:add(Tx, PackedKey, Increment).


% Atomically add Increment to the stat stored under Section and Key. A zero
% increment is a no-op that does not touch the database.
incr_stat(_Db, _Section, _Key, 0) ->
    ok;

incr_stat(#{} = Db, Section, Key, Increment) when is_integer(Increment) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),
    PackedKey = erlfdb_tuple:pack({?DB_STATS, Section, Key}, DbPrefix),
    erlfdb:add(Tx, PackedKey, Increment).


% Fetch the rev infos of all revisions of DocId, wrapped in a tracing span.
get_all_revs(#{} = Db, DocId) ->
    SpanTags = #{
        'db.name' => maps:get(name, Db, undefined),
        'doc.id' => DocId
    },
    with_span('db.get_all_revs', SpanTags, fun() ->
        get_revs_wait(Db, get_all_revs_future(Db, DocId))
    end).


% Start a range read over all revisions of DocId using the `want_all`
% streaming mode.
get_all_revs_future(#{} = Db, DocId) ->
    get_revs_future(Db, DocId, [{streaming_mode, want_all}]).


% Fetch up to NumRevs rev infos of DocId, reading the revision range in
% reverse, wrapped in a tracing span.
get_winning_revs(Db, DocId, NumRevs) ->
    SpanTags = #{
        'db.name' => maps:get(name, Db, undefined),
        'doc.id' => DocId
    },
    with_span('db.get_winning_revs', SpanTags, fun() ->
        get_revs_wait(Db, get_winning_revs_future(Db, DocId, NumRevs))
    end).


% Start a reverse range read over the revisions of DocId, limited to
% NumRevs rows.
get_winning_revs_future(#{} = Db, DocId, NumRevs) ->
    get_revs_future(Db, DocId, [{reverse, true}, {limit, NumRevs}]).


% Start a range read over the ?DB_REVS rows of DocId with the given
% erlfdb range options and return the fold future.
get_revs_future(#{} = Db, DocId, Options) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),
    {RangeStart, RangeEnd} = erlfdb_tuple:range({?DB_REVS, DocId}, DbPrefix),
    erlfdb:fold_range_future(Tx, RangeStart, RangeEnd, Options).


% Wait for a revisions range future and return the decoded rev infos in the
% order the rows were read.
get_revs_wait(#{} = Db, RangeFuture) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),

    DecodeRow = fun({PackedKey, PackedVal}, Acc) ->
        RevKey = erlfdb_tuple:unpack(PackedKey, DbPrefix),
        RevVal = erlfdb_tuple:unpack(PackedVal),
        [fdb_to_revinfo(RevKey, RevVal) | Acc]
    end,
    Reversed = erlfdb:fold_range_wait(Tx, RangeFuture, DecodeRow, []),
    lists:reverse(Reversed).


% Look up the rev info stored for `{RevPos, Rev}` under the key whose third
% element is `true`. Returns `not_found` when no such row exists.
get_non_deleted_rev(#{} = Db, DocId, RevId) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),

    {RevPos, Rev} = RevId,

    RevKey = {?DB_REVS, DocId, true, RevPos, Rev},
    PackedKey = erlfdb_tuple:pack(RevKey, DbPrefix),
    case erlfdb:wait(erlfdb:get(Tx, PackedKey)) of
        not_found ->
            not_found;
        PackedVal ->
            fdb_to_revinfo(RevKey, erlfdb_tuple:unpack(PackedVal))
    end.


% Read the body of the revision described by RevInfo, wrapped in a tracing
% span.
get_doc_body(Db, DocId, RevInfo) ->
    SpanTags = #{
        'db.name' => maps:get(name, Db, undefined),
        'doc.id' => DocId
    },
    with_span('db.get_doc_body', SpanTags, fun() ->
        BodyFuture = get_doc_body_future(Db, DocId, RevInfo),
        get_doc_body_wait(Db, DocId, RevInfo, BodyFuture)
    end).


% Start a range read over the ?DB_DOCS rows of the revision identified by
% RevInfo's `rev_id` and return the fold future.
get_doc_body_future(#{} = Db, DocId, RevInfo) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),
    #{rev_id := {RevPos, Rev}} = RevInfo,

    BodyKey = {?DB_DOCS, DocId, RevPos, Rev},
    {RangeStart, RangeEnd} = erlfdb_tuple:range(BodyKey, DbPrefix),
    erlfdb:fold_range_future(Tx, RangeStart, RangeEnd, []).


% Wait for a body range future, collect the row values in read order
% through the aegis-wrapped fold fun, and build the document via
% fdb_to_doc/5.
get_doc_body_wait(#{} = Db0, DocId, RevInfo, Future) ->
    Db = ensure_current(Db0),
    #{tx := Tx} = Db,
    #{rev_id := {RevPos, Rev}, rev_path := RevPath} = RevInfo,

    CollectValue = fun({_Key, Val}, Acc) -> [Val | Acc] end,
    FoldFun = aegis:wrap_fold_fun(Db, CollectValue),
    Reversed = erlfdb:fold_range_wait(Tx, Future, FoldFun, []),

    fdb_to_doc(Db, DocId, RevPos, [Rev | RevPath], lists:reverse(Reversed)).


% Start a read of the ?DB_LOCAL_DOCS row of DocId and return the future.
get_local_doc_rev_future(Db, DocId) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),
    LocalDocKey = erlfdb_tuple:pack({?DB_LOCAL_DOCS, DocId}, DbPrefix),
    erlfdb:get(Tx, LocalDocKey).


% Wait for a future returned by get_local_doc_rev_future/2 and return the
% raw stored value.
get_local_doc_rev_wait(RevFuture) ->
    erlfdb:wait(RevFuture).


% Start a read of all ?DB_LOCAL_DOC_BODIES rows of DocId and return the
% future. The revision argument is not used.
get_local_doc_body_future(#{} = Db, DocId, _Rev) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),
    BodyPrefix = erlfdb_tuple:pack({?DB_LOCAL_DOC_BODIES, DocId}, DbPrefix),
    erlfdb:get_range_startswith(Tx, BodyPrefix).


% Wait for a local doc body future, decrypt the rows and build the local
% document from the row values via fdb_to_local_doc/4.
get_local_doc_body_wait(#{} = Db0, DocId, Rev, Future) ->
    Db = ensure_current(Db0),

    Rows = erlfdb:wait(Future),
    {_Keys, Chunks} = lists:unzip(aegis:decrypt(Db, Rows)),
    fdb_to_local_doc(Db, DocId, Rev, Chunks).


% Read a local document: first its stored revision value, then its body.
get_local_doc(#{} = Db, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId) ->
    Rev = get_local_doc_rev_wait(get_local_doc_rev_future(Db, DocId)),
    BodyFuture = get_local_doc_body_future(Db, DocId, Rev),
    get_local_doc_body_wait(Db, DocId, Rev, BodyFuture).


% Extract the revision from a stored local doc revision value. Three
% encodings are accepted, told apart by the leading byte:
%  - 255: tuple-packed `{?CURR_LDOC_FORMAT, Rev, Size}`
%  - 131: older format, an external term `{Rev, _}`
%  - anything else: the value must parse as a non-negative integer and is
%    returned as is
% Anything that fails to decode raises `{invalid_local_doc_rev, DocId, Val}`.
get_local_doc_rev(_Db0, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId, Val) ->
    Invalid = {invalid_local_doc_rev, DocId, Val},
    case Val of
        <<255, RevBin/binary>> ->
            % Versioned local docs
            try
                case erlfdb_tuple:unpack(RevBin) of
                    {?CURR_LDOC_FORMAT, Rev, _Size} -> Rev
                end
            catch _:_ ->
                erlang:error(Invalid)
            end;
        <<131, _/binary>> ->
            % Compatibility clause for an older encoding format
            try binary_to_term(Val, [safe]) of
                {Rev, _} ->
                    Rev;
                _ ->
                    erlang:error(Invalid)
            catch
                error:badarg ->
                    erlang:error(Invalid)
            end;
        <<_/binary>> ->
            try binary_to_integer(Val) of
                Parsed when Parsed >= 0 ->
                    Val;
                _ ->
                    erlang:error(Invalid)
            catch
                error:badarg ->
                    erlang:error(Invalid)
            end
    end.


% Write a document revision and update every per-database index that
% depends on it, inside the transaction carried by Db0.
%
%  - Doc: the #doc{} record being written; its body goes through
%    write_doc_body/2
%  - NewWinner0: rev info map that is stored as the winning revision
%  - OldWinner: rev info map of the previous winner, or `not_found`
%  - ToUpdate: rev infos rewritten as non-winners
%  - ToRemove: rev infos whose revision rows and bodies are cleared
%
% Besides the revision rows this updates the ?DB_ALL_DOCS row, replaces the
% ?DB_CHANGES row, bumps the db version for design docs, adjusts the
% document counters and the external size stat. Returns `ok`.
write_doc(#{} = Db0, Doc, NewWinner0, OldWinner, ToUpdate, ToRemove) ->
    #{
        tx := Tx,
        db_prefix := DbPrefix
    } = Db = ensure_current(Db0),

    #doc{
        id = DocId,
        deleted = Deleted,
        atts = Atts
    } = Doc,

    % Doc body

    ok = write_doc_body(Db, Doc),

    % Attachment bookkeeping

    % If a document's attachments have changed we have to scan
    % for any attachments that may need to be deleted. The check
    % for `>= 2` is a bit subtle. The important point is that
    % one of the revisions will be from the new document so we
    % have to find at least one more beyond that to assert that
    % the attachments have not changed.
    AttHash = fabric2_util:hash_atts(Atts),
    RevsToCheck = [NewWinner0] ++ ToUpdate ++ ToRemove,
    AttHashCount = lists:foldl(fun(Att, Count) ->
        #{att_hash := RevAttHash} = Att,
        case RevAttHash == AttHash of
            true -> Count + 1;
            false -> Count
        end
    end, 0, RevsToCheck),
    if
        AttHashCount == length(RevsToCheck) ->
            ok;
        AttHashCount >= 2 ->
            ok;
        true ->
            cleanup_attachments(Db, DocId, Doc, ToRemove)
    end,

    % Revision tree

    NewWinner = NewWinner0#{
        winner := true
    },
    NewRevId = maps:get(rev_id, NewWinner),

    % The winner row is written with a versionstamped value; the same
    % WinnerVS is used further down for the new ?DB_CHANGES key
    {WKey, WVal, WinnerVS} = revinfo_to_fdb(Tx, DbPrefix, DocId, NewWinner),
    ok = erlfdb:set_versionstamped_value(Tx, WKey, WVal),

    lists:foreach(fun(RI0) ->
        RI = RI0#{winner := false},
        {K, V, undefined} = revinfo_to_fdb(Tx, DbPrefix, DocId, RI),
        ok = erlfdb:set(Tx, K, V)
    end, ToUpdate),

    lists:foreach(fun(RI0) ->
        RI = RI0#{winner := false},
        {K, _, undefined} = revinfo_to_fdb(Tx, DbPrefix, DocId, RI),
        ok = erlfdb:clear(Tx, K),
        ok = clear_doc_body(Db, DocId, RI0)
    end, ToRemove),

    % _all_docs

    % Classify the transition by the deleted flags of the old and new
    % winners; the result drives the _all_docs row and the counters below
    UpdateStatus = case {OldWinner, NewWinner} of
        {not_found, #{deleted := false}} ->
            created;
        {not_found, #{deleted := true}} ->
            replicate_deleted;
        {#{deleted := true}, #{deleted := false}} ->
            recreated;
        {#{deleted := false}, #{deleted := false}} ->
            updated;
        {#{deleted := false}, #{deleted := true}} ->
            deleted;
        {#{deleted := true}, #{deleted := true}} ->
            ignore
    end,

    case UpdateStatus of
        replicate_deleted ->
            ok;
        ignore ->
            ok;
        deleted ->
            ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix),
            ok = erlfdb:clear(Tx, ADKey);
        _ ->
            ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix),
            ADVal = erlfdb_tuple:pack(NewRevId),
            ok = erlfdb:set(Tx, ADKey, ADVal)
    end,

    % _changes

    % Drop the changes row of the previous winner, if there was one
    if OldWinner == not_found -> ok; true ->
        OldSeq = maps:get(sequence, OldWinner),
        OldSeqKey = erlfdb_tuple:pack({?DB_CHANGES, OldSeq}, DbPrefix),
        erlfdb:clear(Tx, OldSeqKey)
    end,

    NewSeqKey = erlfdb_tuple:pack_vs({?DB_CHANGES, WinnerVS}, DbPrefix),
    NewSeqVal = erlfdb_tuple:pack({DocId, Deleted, NewRevId}),
    erlfdb:set_versionstamped_key(Tx, NewSeqKey, NewSeqVal),

    % Bump db version on design doc changes

    IsDDoc = case Doc#doc.id of
        <<?DESIGN_DOC_PREFIX, _/binary>> -> true;
        _ -> false
    end,

    if not IsDDoc -> ok; true ->
        bump_db_version(Db)
    end,

    % Update our document counts

    case UpdateStatus of
        created ->
            if not IsDDoc -> ok; true ->
                incr_stat(Db, <<"doc_design_count">>, 1)
            end,
            incr_stat(Db, <<"doc_count">>, 1);
        recreated ->
            if not IsDDoc -> ok; true ->
                incr_stat(Db, <<"doc_design_count">>, 1)
            end,
            incr_stat(Db, <<"doc_count">>, 1),
            incr_stat(Db, <<"doc_del_count">>, -1);
        replicate_deleted ->
            incr_stat(Db, <<"doc_del_count">>, 1);
        ignore ->
            ok;
        deleted ->
            if not IsDDoc -> ok; true ->
                incr_stat(Db, <<"doc_design_count">>, -1)
            end,
            incr_stat(Db, <<"doc_count">>, -1),
            incr_stat(Db, <<"doc_del_count">>, 1);
        updated ->
            ok
    end,

    fabric2_db_plugin:after_doc_write(Db, Doc, NewWinner, OldWinner,
        NewRevId, WinnerVS),

    % Update database size
    AddSize = sum_add_rev_sizes([NewWinner | ToUpdate]),
    RemSize = sum_rem_rev_sizes(ToRemove),
    incr_stat(Db, <<"sizes">>, <<"external">>, AddSize - RemSize),

    ok.


% Write or delete a local document and keep the `doc_local_count` and
% external size stats in step. Returns `ok`.
write_local_doc(#{} = Db0, Doc) ->
    Db = ensure_current(Db0),
    #{tx := Tx, db_prefix := DbPrefix} = Db,

    Id = Doc#doc.id,

    {LDocKey, LDocVal, NewSize, Rows} = local_doc_to_fdb(Db, Doc),

    % Work out whether the doc existed before and how large it was. Only
    % values in the versioned format carry a size; others count as 0.
    {WasDeleted, PrevSize} = case erlfdb:wait(erlfdb:get(Tx, LDocKey)) of
        not_found ->
            {true, 0};
        <<255, RevBin/binary>> ->
            case erlfdb_tuple:unpack(RevBin) of
                {?CURR_LDOC_FORMAT, _Rev, Size} ->
                    {false, Size}
            end;
        <<_/binary>> ->
            {false, 0}
    end,

    BodyPrefix = erlfdb_tuple:pack({?DB_LOCAL_DOC_BODIES, Id}, DbPrefix),
    IsDeleted = Doc#doc.deleted,

    case IsDeleted of
        false ->
            erlfdb:set(Tx, LDocKey, LDocVal),
            % The whole body range is cleared first, in case a larger
            % document body was stored there before.
            erlfdb:clear_range_startswith(Tx, BodyPrefix),
            lists:foreach(fun({RowKey, RowVal}) ->
                erlfdb:set(Tx, RowKey, aegis:encrypt(Db, RowKey, RowVal))
            end, Rows);
        true ->
            erlfdb:clear(Tx, LDocKey),
            erlfdb:clear_range_startswith(Tx, BodyPrefix)
    end,

    case {WasDeleted, IsDeleted} of
        {true, false} ->
            incr_stat(Db, <<"doc_local_count">>, 1);
        {false, true} ->
            incr_stat(Db, <<"doc_local_count">>, -1);
        _ ->
            ok
    end,

    incr_stat(Db, <<"sizes">>, <<"external">>, NewSize - PrevSize),

    ok.


% Read an attachment: concatenate its decrypted chunks and, when the
% ?DB_ATT_NAMES info row marks it as compressed, decode it with
% binary_to_term/2.
read_attachment(#{} = Db, DocId, AttId) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),

    ChunkPrefix = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId}, DbPrefix),
    % NOTE(review): the `not_found` branch looks unreachable if a range read
    % always yields a list - confirm how missing attachments should surface
    Data = case erlfdb:wait(erlfdb:get_range_startswith(Tx, ChunkPrefix)) of
        not_found ->
            throw({not_found, missing});
        KVs ->
            {_Keys, Chunks} = lists:unzip(aegis:decrypt(Db, KVs)),
            iolist_to_binary(Chunks)
    end,

    InfoKey = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId, AttId}, DbPrefix),
    case erlfdb:wait(erlfdb:get(Tx, InfoKey)) of
        <<>> ->
            % Old format, before CURR_ATT_STORAGE_VER = 0
            Data;
        <<_/binary>> = InfoBin ->
            {?CURR_ATT_STORAGE_VER, Compressed} = erlfdb_tuple:unpack(InfoBin),
            case Compressed of
                false -> Data;
                true -> binary_to_term(Data, [safe])
            end
    end.


% Store attachment data under a freshly generated attachment id and return
% `{ok, AttId}`.
%
%  - Data: the attachment bytes (binary)
%  - Encoding: atom; `gzip` data is stored as is, anything else is stored
%    compressed when that is actually smaller
%
% An info row under ?DB_ATT_NAMES records the storage version and whether
% the stored bytes are compressed, so read_attachment/3 knows how to decode
% them. The data itself is split with chunkify_binary/1 and each chunk is
% encrypted and written under ?DB_ATTS with a running chunk id.
write_attachment(#{} = Db, DocId, Data, Encoding)
        when is_binary(Data), is_atom(Encoding) ->
    #{
        tx := Tx,
        db_prefix := DbPrefix
    } = ensure_current(Db),

    AttId = fabric2_util:uuid(),

    {Data1, Compressed} = case Encoding of
        gzip ->
            % Already compressed by the client
            {Data, false};
        _ ->
            Opts = [{minor_version, 1}, {compressed, 6}],
            CompressedData = term_to_binary(Data, Opts),
            % Compare sizes, not a size against the binary itself: a number
            % always sorts below a binary in Erlang term order, which would
            % pick the compressed form even when it is larger.
            case byte_size(CompressedData) < byte_size(Data) of
                true -> {CompressedData, true};
                false -> {Data, false}
            end
    end,

    IdKey = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId, AttId}, DbPrefix),
    InfoVal = erlfdb_tuple:pack({?CURR_ATT_STORAGE_VER, Compressed}),
    ok = erlfdb:set(Tx, IdKey, InfoVal),

    Chunks = chunkify_binary(Data1),

    lists:foldl(fun(Chunk, ChunkId) ->
        AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId, ChunkId}, DbPrefix),
        ok = erlfdb:set(Tx, AttKey, aegis:encrypt(Db, AttKey, Chunk)),
        ChunkId + 1
    end, 0, Chunks),
    {ok, AttId}.


% Return the sequence of the last row in the ?DB_CHANGES range, or the zero
% sequence when the range is empty.
get_last_change(#{} = Db) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),

    {RangeStart, RangeEnd} = erlfdb_tuple:range({?DB_CHANGES}, DbPrefix),
    LastRowOpts = [{limit, 1}, {reverse, true}],
    case erlfdb:get_range(Tx, RangeStart, RangeEnd, LastRowOpts) of
        [{PackedKey, _Val}] ->
            {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(PackedKey, DbPrefix),
            vs_to_seq(SeqVS);
        [] ->
            vs_to_seq(fabric2_util:seq_zero_vs())
    end.


% Fold `UserFun` over the key/value rows under `RangePrefix`.
%
% `TxOrDb` is either {tx, Tx} for a bare transaction or a db handle map,
% which is refreshed with ensure_current/1 first. Returns the final user
% accumulator. The fold state kept in the process dictionary is always
% erased when the fold finishes or fails.
fold_range(TxOrDb, RangePrefix, UserFun, UserAcc, Options) ->
    {Db, Tx} = case TxOrDb of
        {tx, RawTx} ->
            {undefined, RawTx};
        #{} ->
            #{tx := DbTx} = CurrentDb = ensure_current(TxOrDb),
            {CurrentDb, DbTx}
    end,
    % FoundationDB treats a limit of 0 as unlimited, so a requested limit of
    % 0 is answered here without reading anything.
    Limit = fabric2_util:get_value(limit, Options),
    if
        Limit =:= 0 ->
            UserAcc;
        true ->
            FoldAcc = get_fold_acc(Db, RangePrefix, UserFun, UserAcc, Options),
            try
                fold_range(Tx, FoldAcc)
            after
                erase(?PDICT_FOLD_ACC_STATE)
            end
    end.


% Run one erlfdb:fold_range/6 pass described by `FAcc` and return the final
% user accumulator. When the fold was set up with restart_tx enabled, writes
% are disallowed on the transaction and a retryable erlfdb error resumes the
% fold through restart_fold/2 instead of propagating to the caller.
fold_range(Tx, FAcc) ->
    #fold_acc{
        restart_tx = DoRestart,
        start_key = StartKey,
        end_key = EndKey,
        limit = Limit,
        base_opts = BaseOpts
    } = FAcc,
    case DoRestart of
        true -> ok = erlfdb:set_option(Tx, disallow_writes);
        false -> ok
    end,
    FoldOpts = [{limit, Limit} | BaseOpts],
    RowFun = fun fold_range_cb/2,
    try
        Result = erlfdb:fold_range(Tx, StartKey, EndKey, RowFun, FAcc, FoldOpts),
        #fold_acc{user_acc = FinalUserAcc} = Result,
        FinalUserAcc
    catch error:{erlfdb_error, Error} when
            ?ERLFDB_IS_RETRYABLE(Error) andalso DoRestart ->
        % Possibly handle cluster_version_changed and future_version as well
        % to continue the iteration instead of falling back to transactional
        % and retrying from the beginning, which is bound to fail when
        % streaming data out to a socket.
        fold_range(Tx, restart_fold(Tx, FAcc))
    end.


% Convert a versionstamp tuple into its hex encoded sequence: the tuple is
% packed on its own, the leading type tag is dropped and the remaining
% 12 bytes are hex encoded.
vs_to_seq(VS) when is_tuple(VS) ->
    Packed = erlfdb_tuple:pack({VS}),
    % 51 is the versionstamp type tag
    <<51:8, SeqBin:12/binary>> = Packed,
    fabric2_util:to_hex(SeqBin).


% Inverse of vs_to_seq/1: hex decode the sequence, put the versionstamp type
% tag back in front and unpack the resulting one element tuple.
seq_to_vs(Seq) when is_binary(Seq) ->
    RawSeq = fabric2_util:from_hex(Seq),
    % 51 is the versionstamp type tag
    {VS} = erlfdb_tuple:unpack(<<51:8, RawSeq/binary>>),
    VS.


% Return the versionstamp that immediately follows the given one.
%
% The last field is incremented; once a field has reached 16#FFFF it wraps
% to 0 and the carry moves into the field to its left. Both the four element
% (with transaction id) and three element versionstamp tuples are handled.
next_vs({versionstamp, VS, Batch, TxId}) when TxId < 16#FFFF ->
    {versionstamp, VS, Batch, TxId + 1};

next_vs({versionstamp, VS, Batch, _TxId}) when Batch < 16#FFFF ->
    % Transaction id is exhausted, carry into the batch
    {versionstamp, VS, Batch + 1, 0};

next_vs({versionstamp, VS, _Batch, _TxId}) ->
    % Both transaction id and batch are exhausted, carry into the version
    {versionstamp, VS + 1, 0, 0};

next_vs({versionstamp, VS, Batch}) when Batch < 16#FFFF ->
    {versionstamp, VS, Batch + 1};

next_vs({versionstamp, VS, _Batch}) ->
    {versionstamp, VS + 1, 0}.


% Build a versionstamp tuple whose version and batch fields have all bits
% set and whose last field is the next transaction id handed out by erlfdb
% for `Tx`.
% NOTE(review): the all-ones fields presumably mark the versionstamp as
% incomplete so it is filled in at commit time - confirm against erlfdb.
new_versionstamp(Tx) ->
    {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF, erlfdb:get_next_tx_id(Tx)}.


% Return erlfdb's approximate size of the transaction held by `TxDb`.
% Raises error:transaction_required when the handle carries no transaction.
get_approximate_tx_size(#{} = TxDb) ->
    require_transaction(TxDb),
    SizeFuture = erlfdb:get_approximate_size(maps:get(tx, TxDb)),
    erlfdb:wait(SizeFuture).


% Split `Data` into chunks of the configured binary chunk size.
chunkify_binary(Data) ->
    ChunkSize = binary_chunk_size(),
    chunkify_binary(Data, ChunkSize).


% Split the binary `Data` into a list of binaries of `Size` bytes each; the
% last chunk holds whatever is left and may be shorter. An empty binary
% yields an empty list.
%
% `Size` must be a positive integer. A size of 0 would match a zero length
% head forever and never consume any input, so it is rejected with a
% function_clause error instead of recursing without end.
chunkify_binary(<<>>, _Size) ->
    [];

chunkify_binary(Data, Size)
        when is_binary(Data), is_integer(Size), Size > 0 ->
    case Data of
        <<Head:Size/binary, Rest/binary>> ->
            [Head | chunkify_binary(Rest, Size)];
        _ ->
            % Fewer than Size bytes left, this is the final chunk
            [Data]
    end.


% Dump every key/value pair between the empty key and <<16#FE, 16#FF, 16#FF>>
% to standard_error.
debug_cluster() ->
    Start = <<>>,
    End = <<16#FE, 16#FF, 16#FF>>,
    debug_cluster(Start, End).


% Print every key/value pair in the range [Start, End) to standard_error,
% one row per line, with the key representation padded to 60 characters.
debug_cluster(Start, End) ->
    PrintRow = fun({Key, Val}) ->
        PaddedKey = string:pad(erlfdb_util:repr(Key), 60),
        ValRepr = erlfdb_util:repr(Val),
        io:format(standard_error, "~s => ~s~n", [PaddedKey, ValRepr])
    end,
    transactional(fun(Tx) ->
        Rows = erlfdb:get_range(Tx, Start, End),
        lists:foreach(PrintRow, Rows)
    end).


% Build the initial db handle map for `DbName` inside transaction `Tx`: the
% layer prefix comes from get_dir/1 and md_version is the current value of
% the metadata version key. No security function or db options are set.
init_db(Tx, DbName) ->
    LayerPrefix = get_dir(Tx),
    MdVersion = erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)),
    #{
        name => DbName,
        tx => Tx,
        layer_prefix => LayerPrefix,
        md_version => MdVersion,

        security_fun => undefined,
        db_options => []
    }.


% Collect the validate_doc_update functions of all non-deleted design
% documents and return `Db` with them stored under the existing
% validate_doc_update_funs key.
%
% The work is done in passes over the list of design docs: each pass first
% issues a future for every doc and the following pass waits on them.
load_validate_doc_funs(#{} = Db) ->
    % Collect the id of every row seen by the fold; all other fold events
    % are ignored. Ids are prepended to the accumulator.
    FoldFun = fun
        ({row, Row}, Acc) ->
            DDocInfo = #{id => fabric2_util:get_value(id, Row)},
            {ok, [DDocInfo | Acc]};
        (_, Acc) ->
            {ok, Acc}
    end,

    % Restrict the fold to ids between "_design/" and "_design0"
    Options = [
        {start_key, <<"_design/">>},
        {end_key, <<"_design0">>}
    ],

    {ok, Infos1} = fabric2_db:fold_docs(Db, FoldFun, [], Options),

    % Pass 1: start a winning revision lookup for every design doc. The
    % match also asserts that each id really starts with "_design/".
    Infos2 = lists:map(fun(Info) ->
        #{
            id := DDocId = <<"_design/", _/binary>>
        } = Info,
        Info#{
            rev_info => get_winning_revs_future(Db, DDocId, 1)
        }
    end, Infos1),

    % Pass 2: wait for the winning revision, drop design docs whose winner
    % is deleted and start a body read for the rest.
    Infos3 = lists:flatmap(fun(Info) ->
        #{
            id := DDocId,
            rev_info := RevInfoFuture
        } = Info,
        [RevInfo] = get_revs_wait(Db, RevInfoFuture),
        #{deleted := Deleted} = RevInfo,
        if Deleted -> []; true ->
            [Info#{
                rev_info := RevInfo,
                body => get_doc_body_future(Db, DDocId, RevInfo)
            }]
        end
    end, Infos2),

    % Pass 3: wait for each body and keep the validation function of every
    % design doc that defines one (nil means it has none).
    VDUs = lists:flatmap(fun(Info) ->
        #{
            id := DDocId,
            rev_info := RevInfo,
            body := BodyFuture
        } = Info,
        #doc{} = Doc = get_doc_body_wait(Db, DDocId, RevInfo, BodyFuture),
        case couch_doc:get_validate_doc_fun(Doc) of
            nil -> [];
            Fun -> [Fun]
        end
    end, Infos3),

    % `:=` requires the key to be present in Db already
    Db#{
        validate_doc_update_funs := VDUs
    }.


% Overwrite the metadata version key with a versionstamped value so that
% every bump produces a new value.
bump_metadata_version(Tx) ->
    % The 14 zero bytes (112 bits) are taken from the PR that added the
    % metadata version key. It is not clear why 14 bytes are needed when a
    % versionstamp itself is only 80 bits, but it is kept as-is for now.
    Placeholder = <<0:112>>,
    erlfdb:set_versionstamped_value(Tx, ?METADATA_VERSION_KEY, Placeholder).


% Compare the metadata version cached in `Db` with the one stored in the
% cluster.
%
% Returns {current, Db} when they agree (or when this was already verified,
% as remembered in the process dictionary) and {stale, Db1} with the freshly
% read md_version otherwise.
check_metadata_version(#{} = Db) ->
    #{
        md_version := KnownVersion,
        tx := Tx
    } = Db,

    case get(?PDICT_CHECKED_MD_IS_CURRENT) of
        true ->
            {current, Db};
        _ ->
            VersionFuture = erlfdb:get_ss(Tx, ?METADATA_VERSION_KEY),
            case erlfdb:wait(VersionFuture) of
                KnownVersion ->
                    put(?PDICT_CHECKED_MD_IS_CURRENT, true),
                    % A read conflict is added on the db version key so this
                    % transaction conflicts with any write to this
                    % particular db. During db creation the db prefix might
                    % not exist yet, in which case no conflict is added.
                    case maps:get(db_prefix, Db, not_found) of
                        not_found ->
                            ok;
                        <<_/binary>> = DbPrefix ->
                            VerKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix),
                            erlfdb:add_read_conflict_key(Tx, VerKey)
                    end,
                    {current, Db};
                StoredVersion ->
                    {stale, Db#{md_version := StoredVersion}}
            end
    end.


% Write a new random version under the database's ?DB_VERSION key, bump the
% global metadata version and return {ok, NewVersion}.
bump_db_version(#{} = Db) ->
    #{
        db_prefix := DbPrefix,
        tx := Tx
    } = Db,

    VersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix),
    NewVersion = fabric2_util:uuid(),
    ok = erlfdb:set(Tx, VersionKey, NewVersion),
    ok = bump_metadata_version(Tx),
    {ok, NewVersion}.


% Check whether the db_version cached in `Db` still matches the value stored
% under the database's ?DB_VERSION key. Returns `current` or `stale`.
%
% The read is skipped (answering `current`) when `CheckDbVersion` is false
% or when a previous check already succeeded, as remembered in the process
% dictionary.
check_db_version(#{} = Db, CheckDbVersion) ->
    #{
        tx := Tx,
        db_prefix := DbPrefix,
        db_version := CachedVersion
    } = Db,

    AlreadyChecked = get(?PDICT_CHECKED_DB_IS_CURRENT),
    if
        not CheckDbVersion orelse AlreadyChecked == true ->
            current;
        true ->
            VersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix),
            case erlfdb:wait(erlfdb:get(Tx, VersionKey)) of
                CachedVersion ->
                    put(?PDICT_CHECKED_DB_IS_CURRENT, true),
                    current;
                _Other ->
                    stale
            end
    end.


% Soft delete a database: record its prefix under a
% {?DELETED_DBS, DbName, Timestamp} key, remove it from ?ALL_DBS and bump
% the db version. Returns ok.
%
% When a deleted entry with the same name and timestamp already exists
% nothing is changed and {deletion_frequency_exceeded, DbName} is returned.
soft_delete_db(Db) ->
    #{
        name := DbName,
        tx := Tx,
        layer_prefix := LayerPrefix,
        db_prefix := DbPrefix
    } = ensure_current(Db),

    AllDbsKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
    Timestamp = list_to_binary(fabric2_util:iso8601_timestamp()),
    DeletedKey = erlfdb_tuple:pack(
        {?DELETED_DBS, DbName, Timestamp}, LayerPrefix),
    Existing = erlfdb:wait(erlfdb:get(Tx, DeletedKey)),
    if
        Existing =:= not_found ->
            erlfdb:set(Tx, DeletedKey, DbPrefix),
            erlfdb:clear(Tx, AllDbsKey),
            bump_db_version(Db),
            ok;
        true ->
            {deletion_frequency_exceeded, DbName}
    end.


% Permanently delete a database: remove its ?ALL_DBS entry, clear every key
% under its db prefix and bump the metadata version. Returns ok.
hard_delete_db(Db) ->
    #{
        name := DbName,
        tx := Tx,
        layer_prefix := LayerPrefix,
        db_prefix := DbPrefix
    } = ensure_current(Db),

    AllDbsKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
    erlfdb:clear(Tx, AllDbsKey),

    erlfdb:clear_range_startswith(Tx, DbPrefix),
    bump_metadata_version(Tx),
    ok.


% Write the body of `Doc`: the rows produced by doc_to_fdb/2 are encrypted
% and stored one by one in the handle's transaction.
write_doc_body(#{} = Db0, #doc{} = Doc) ->
    Db = ensure_current(Db0),
    #{tx := Tx} = Db,

    StoreRow = fun({Key, Value}) ->
        Encrypted = aegis:encrypt(Db, Key, Value),
        ok = erlfdb:set(Tx, Key, Encrypted)
    end,
    lists:foreach(StoreRow, doc_to_fdb(Db, Doc)).


% Remove the stored body rows of one document revision. Passing `not_found`
% instead of a rev info map means there is no old body and is a no-op.
clear_doc_body(_Db, _DocId, not_found) ->
    ok;

clear_doc_body(#{} = Db, DocId, #{} = RevInfo) ->
    #{tx := Tx, db_prefix := DbPrefix} = Db,
    #{rev_id := {RevPos, Rev}} = RevInfo,

    % All chunks of the body live under this key prefix
    BodyPrefix = {?DB_DOCS, DocId, RevPos, Rev},
    {RangeStart, RangeEnd} = erlfdb_tuple:range(BodyPrefix, DbPrefix),
    ok = erlfdb:clear_range(Tx, RangeStart, RangeEnd).


% Delete the attachments of `DocId` that are no longer referenced.
%
% An attachment stays alive when it is referenced by `NewDoc` or by any
% stored revision that is not listed in `ToRemove`. Every other attachment
% id found under the document's ?DB_ATT_NAMES prefix has its name entry and
% all of its data chunks cleared.
cleanup_attachments(Db, DocId, NewDoc, ToRemove) ->
    #{
        tx := Tx,
        db_prefix := DbPrefix
    } = Db,

    RevsToDrop = lists:map(fun(#{rev_id := RevId}) -> RevId end, ToRemove),

    % Every known revision of the document plus the one being written
    {ok, StoredDocs} = fabric2_db:open_doc_revs(Db, DocId, all, []),
    Candidates = [{ok, NewDoc} | StoredDocs],

    % Attachment ids referenced by the revisions that survive
    CollectIds = fun({ok, #doc{revs = {Pos, [Rev | _]}} = Doc}, IdSet) ->
        case lists:member({Pos, Rev}, RevsToDrop) of
            true ->
                IdSet;
            false ->
                lists:foldl(fun(Att, InnerSet) ->
                    {loc, _, _, AttId} = couch_att:fetch(data, Att),
                    sets:add_element(AttId, InnerSet)
                end, IdSet, Doc#doc.atts)
        end
    end,
    ReferencedIds = lists:foldl(CollectIds, sets:new(), Candidates),

    % Attachment ids that currently exist for the document
    NamesPrefix = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId}, DbPrefix),
    NamesFuture = erlfdb:get_range_startswith(
        Tx, NamesPrefix, [{streaming_mode, want_all}]),
    AddStoredId = fun({Key, _}, IdSet) ->
        {?DB_ATT_NAMES, DocId, AttId} = erlfdb_tuple:unpack(Key, DbPrefix),
        sets:add_element(AttId, IdSet)
    end,
    StoredIds = lists:foldl(AddStoredId, sets:new(), erlfdb:wait(NamesFuture)),

    Orphans = sets:to_list(sets:subtract(StoredIds, ReferencedIds)),

    lists:foreach(fun(AttId) ->
        NameKey = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId, AttId}, DbPrefix),
        erlfdb:clear(Tx, NameKey),

        ChunkPrefix = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId}, DbPrefix),
        erlfdb:clear_range_startswith(Tx, ChunkPrefix)
    end, Orphans).


% Encode a rev info map as the key/value pair stored under ?DB_REVS and
% return {KeyBin, ValueBin, Versionstamp}.
%
% The winning revision additionally stores a new versionstamp and the branch
% count, and its value is packed with pack_vs/1. Other revisions store only
% the rev path, attachment hash and size, and report `undefined` as the
% versionstamp.
revinfo_to_fdb(Tx, DbPrefix, DocId, #{winner := true} = RevInfo) ->
    #{
        deleted := Deleted,
        rev_id := {RevPos, Rev},
        rev_path := RevPath,
        branch_count := BranchCount,
        att_hash := AttHash,
        rev_size := RevSize
    } = RevInfo,
    VS = new_versionstamp(Tx),
    KeyBin = erlfdb_tuple:pack(
        {?DB_REVS, DocId, not Deleted, RevPos, Rev}, DbPrefix),
    ValBin = erlfdb_tuple:pack_vs({
        ?CURR_REV_FORMAT,
        VS,
        BranchCount,
        list_to_tuple(RevPath),
        AttHash,
        RevSize
    }),
    {KeyBin, ValBin, VS};

revinfo_to_fdb(_Tx, DbPrefix, DocId, #{} = RevInfo) ->
    #{
        deleted := Deleted,
        rev_id := {RevPos, Rev},
        rev_path := RevPath,
        att_hash := AttHash,
        rev_size := RevSize
    } = RevInfo,
    KeyBin = erlfdb_tuple:pack(
        {?DB_REVS, DocId, not Deleted, RevPos, Rev}, DbPrefix),
    ValBin = erlfdb_tuple:pack(
        {?CURR_REV_FORMAT, list_to_tuple(RevPath), AttHash, RevSize}),
    {KeyBin, ValBin, undefined}.


% Decode an unpacked ?DB_REVS key and value into a rev info map.
%
% A six element value in the current format describes the winning revision,
% a four element value any other revision. Values written in the older
% formats 0 and 1 are upgraded step by step, filling in an empty attachment
% hash and a revision size of 0, and then decoded as the current format.
fdb_to_revinfo(Key, {?CURR_REV_FORMAT, Seq, BranchCount, RevPath, AttHash, RevSize}) ->
    {?DB_REVS, _DocId, NotDeleted, RevPos, Rev} = Key,
    #{
        winner => true,
        exists => true,
        deleted => not NotDeleted,
        rev_id => {RevPos, Rev},
        rev_path => tuple_to_list(RevPath),
        sequence => Seq,
        branch_count => BranchCount,
        att_hash => AttHash,
        rev_size => RevSize
    };

fdb_to_revinfo(Key, {?CURR_REV_FORMAT, RevPath, AttHash, RevSize}) ->
    {?DB_REVS, _DocId, NotDeleted, RevPos, Rev} = Key,
    #{
        winner => false,
        exists => true,
        deleted => not NotDeleted,
        rev_id => {RevPos, Rev},
        rev_path => tuple_to_list(RevPath),
        sequence => undefined,
        branch_count => undefined,
        att_hash => AttHash,
        rev_size => RevSize
    };

% Format 0 -> 1: add an empty attachment hash
fdb_to_revinfo(Key, {0, Seq, BranchCount, RevPath}) ->
    fdb_to_revinfo(Key, {1, Seq, BranchCount, RevPath, <<>>});

fdb_to_revinfo(Key, {0, RevPath}) ->
    fdb_to_revinfo(Key, {1, RevPath, <<>>});

% Format 1 -> current: add a revision size of 0
fdb_to_revinfo(Key, {1, Seq, BranchCount, RevPath, AttHash}) ->
    % Don't forget to change ?CURR_REV_FORMAT to 2 here when it increments
    fdb_to_revinfo(Key, {?CURR_REV_FORMAT, Seq, BranchCount, RevPath, AttHash, 0});

fdb_to_revinfo(Key, {1, RevPath, AttHash}) ->
    % Don't forget to change ?CURR_REV_FORMAT to 2 here when it increments
    fdb_to_revinfo(Key, {?CURR_REV_FORMAT, RevPath, AttHash, 0}).


% Serialize a document into the list of {Key, Chunk} rows to store for it.
%
% Body, disk form of the attachments and the deleted flag are encoded
% together with term_to_binary/2, split with chunkify_binary/1 and keyed by
% {?DB_DOCS, Id, RevPos, Rev, ChunkId} with chunk ids counting up from 0.
doc_to_fdb(Db, #doc{} = Doc) ->
    #{db_prefix := DbPrefix} = Db,

    #doc{
        id = Id,
        revs = {RevPos, [Rev | _]},
        body = Body,
        atts = Atts,
        deleted = Deleted
    } = Doc,

    DiskAtts = [couch_att:to_disk_term(Att) || Att <- Atts],
    Encoded = term_to_binary(
        {Body, DiskAtts, Deleted}, [{minor_version, 1}, {compressed, 6}]),
    Chunks = chunkify_binary(Encoded),

    ChunkIds = lists:seq(0, length(Chunks) - 1),
    [
        {erlfdb_tuple:pack({?DB_DOCS, Id, RevPos, Rev, ChunkId}, DbPrefix), Chunk}
        || {ChunkId, Chunk} <- lists:zip(ChunkIds, Chunks)
    ].


% Rebuild a #doc{} from the stored body chunks of one revision. An empty
% chunk list means the body does not exist and yields {not_found, missing}.
%
% When the handle carries an after_doc_read function it is applied to the
% document and its result is returned instead.
fdb_to_doc(_Db, _DocId, _Pos, _Path, []) ->
    {not_found, missing};

fdb_to_doc(Db, DocId, Pos, Path, BinRows) when is_list(BinRows) ->
    Encoded = iolist_to_binary(BinRows),
    {Body, DiskAtts, Deleted} = binary_to_term(Encoded, [safe]),
    Doc = #doc{
        id = DocId,
        revs = {Pos, Path},
        body = Body,
        atts = [couch_att:from_disk_term(Db, DocId, A) || A <- DiskAtts],
        deleted = Deleted
    },

    case Db of
        #{after_doc_read := undefined} ->
            Doc;
        #{after_doc_read := AfterDocRead} ->
            AfterDocRead(Doc, Db)
    end.


% Serialize a local document and return {Key, Value, Size, BodyRows}.
%
% Key/Value is the ?DB_LOCAL_DOCS entry: a 255 byte followed by the packed
% {?CURR_LDOC_FORMAT, Rev, Size} tuple. BodyRows are the {Key, Chunk} rows
% of the term_to_binary encoded body under ?DB_LOCAL_DOC_BODIES, with chunk
% ids counting up from 0. Integer revisions are stored as their decimal
% binary form.
local_doc_to_fdb(Db, #doc{} = Doc) ->
    #{db_prefix := DbPrefix} = Db,

    #doc{
        id = Id,
        revs = {0, [Rev]},
        body = Body
    } = Doc,

    Key = erlfdb_tuple:pack({?DB_LOCAL_DOCS, Id}, DbPrefix),

    StoreRev = if
        is_integer(Rev) -> integer_to_binary(Rev);
        is_binary(Rev) -> Rev
    end,

    BodyChunks = chunkify_binary(
        term_to_binary(Body, [{minor_version, 1}, {compressed, 6}])),
    ChunkIds = lists:seq(0, length(BodyChunks) - 1),
    Rows = [
        {erlfdb_tuple:pack({?DB_LOCAL_DOC_BODIES, Id, ChunkId}, DbPrefix), Chunk}
        || {ChunkId, Chunk} <- lists:zip(ChunkIds, BodyChunks)
    ],

    Size = fabric2_util:ldoc_size(Doc),
    Packed = erlfdb_tuple:pack({?CURR_LDOC_FORMAT, StoreRev, Size}),

    % Prefix our tuple encoding to make upgrades easier
    {Key, <<255, Packed/binary>>, Size, Rows}.


% Rebuild a local #doc{} from its stored revision value and body rows.
%
% Four shapes of the revision value are handled: `not_found` with no body
% rows ({not_found, missing}), the old encoding where revision and body are
% one term_to_binary blob (leading byte 131), the current encoding (leading
% byte 255 followed by a packed tuple), and any other raw revision, which is
% wrapped into the current encoding and decoded again.
fdb_to_local_doc(_Db, _DocId, not_found, []) ->
    {not_found, missing};

fdb_to_local_doc(_Db, DocId, <<131, _/binary>> = OldVal, []) ->
    % Upgrade clause for the old encoding. The old value can still be read;
    % the storage format is upgraded when the document is next updated.
    {Rev, Body} = binary_to_term(OldVal, [safe]),
    #doc{id = DocId, revs = {0, [Rev]}, deleted = false, body = Body};

fdb_to_local_doc(_Db, DocId, <<255, Packed/binary>>, Rows) when is_list(Rows) ->
    {?CURR_LDOC_FORMAT, Rev, _Size} = erlfdb_tuple:unpack(Packed),
    Body = binary_to_term(iolist_to_binary(Rows), [safe]),
    #doc{id = DocId, revs = {0, [Rev]}, deleted = false, body = Body};

fdb_to_local_doc(Db, DocId, RawRev, Rows) ->
    Packed = erlfdb_tuple:pack({?CURR_LDOC_FORMAT, RawRev, 0}),
    fdb_to_local_doc(Db, DocId, <<255, Packed/binary>>, Rows).


% Sum the rev_size of every rev info whose `exists` flag is false, i.e. the
% sizes contributed by revisions that are not stored yet.
sum_add_rev_sizes(RevInfos) ->
    AddNew = fun
        (#{exists := true, rev_size := _}, Total) ->
            Total;
        (#{exists := false, rev_size := Size}, Total) ->
            Size + Total
    end,
    lists:foldl(AddNew, 0, RevInfos).


% Sum the rev_size of the given rev infos. Every one of them must have its
% `exists` flag set to true; anything else is a crash.
sum_rem_rev_sizes(RevInfos) ->
    AddExisting = fun(#{exists := true, rev_size := Size}, Total) ->
        Size + Total
    end,
    lists:foldl(AddExisting, 0, RevInfos).


% Translate CouchDB style fold options into the #fold_acc{} record that
% drives fold_range/2.
%
% `Db` is a db handle map or `undefined`. Options read from `Options`:
% dir, start_key, end_key, end_key_gt, wrap_keys, skip, limit, target_bytes,
% streaming_mode, snapshot and restart_tx. The returned record carries the
% FoundationDB start/end keys, the skip count, the row limit (including the
% rows to skip), the base range options and the user callback/accumulator.
get_fold_acc(Db, RangePrefix, UserCallback, UserAcc, Options)
        when is_map(Db) orelse Db =:= undefined ->

    Reverse = case fabric2_util:get_value(dir, Options) of
        rev -> true;
        _ -> false
    end,

    StartKey0 = fabric2_util:get_value(start_key, Options),
    EndKeyGt = fabric2_util:get_value(end_key_gt, Options),
    % end_key takes precedence; end_key_gt is only the fallback value
    EndKey0 = fabric2_util:get_value(end_key, Options, EndKeyGt),
    % The end is inclusive unless end_key_gt was given
    InclusiveEnd = EndKeyGt == undefined,
    % Keys are wrapped in a one element tuple unless wrap_keys is false
    WrapKeys = fabric2_util:get_value(wrap_keys, Options) /= false,

    % CouchDB swaps the key meanings based on the direction
    % of the fold. FoundationDB does not so we have to
    % swap back here.
    {StartKey1, EndKey1} = case Reverse of
        false -> {StartKey0, EndKey0};
        true -> {EndKey0, StartKey0}
    end,

    % Set the maximum bounds for the start and endkey
    StartKey2 = case StartKey1 of
        undefined ->
            <<RangePrefix/binary, 16#00>>;
        SK2 when not WrapKeys ->
            erlfdb_tuple:pack(SK2, RangePrefix);
        SK2 ->
            erlfdb_tuple:pack({SK2}, RangePrefix)
    end,

    % In reverse folds a 16#FF byte is appended to the packed end key
    EndKey2 = case EndKey1 of
        undefined ->
            <<RangePrefix/binary, 16#FF>>;
        EK2 when Reverse andalso not WrapKeys ->
            PackedEK = erlfdb_tuple:pack(EK2, RangePrefix),
            <<PackedEK/binary, 16#FF>>;
        EK2 when Reverse ->
            PackedEK = erlfdb_tuple:pack({EK2}, RangePrefix),
            <<PackedEK/binary, 16#FF>>;
        EK2 when not WrapKeys ->
            erlfdb_tuple:pack(EK2, RangePrefix);
        EK2 ->
            erlfdb_tuple:pack({EK2}, RangePrefix)
    end,

    % FoundationDB ranges are applied as SK <= key < EK
    % By default, CouchDB is SK <= key <= EK with the
    % optional inclusive_end=false option changing that
    % to SK <= key < EK. Also, remember that CouchDB
    % swaps the meaning of SK and EK based on direction.
    %
    % Thus we have this wonderful bit of logic to account
    % for all of those combinations.

    StartKey3 = case {Reverse, InclusiveEnd} of
        {true, false} ->
            erlfdb_key:first_greater_than(StartKey2);
        _ ->
            StartKey2
    end,

    EndKey3 = case {Reverse, InclusiveEnd} of
        {false, true} when EndKey0 /= undefined ->
            erlfdb_key:first_greater_than(EndKey2);
        {true, _} ->
            erlfdb_key:first_greater_than(EndKey2);
        _ ->
            EndKey2
    end,

    % Anything that is not a non-negative integer means no skipping
    Skip = case fabric2_util:get_value(skip, Options) of
        S when is_integer(S), S >= 0 -> S;
        _ -> 0
    end,

    % Skipped rows are read too, so they count towards the limit. A missing
    % limit becomes 0 (fold_range/5 notes that FoundationDB treats a limit
    % of 0 as unlimited); any other value is a case_clause error.
    Limit = case fabric2_util:get_value(limit, Options) of
        L when is_integer(L), L >= 0 -> L + Skip;
        undefined -> 0
    end,

    TargetBytes = case fabric2_util:get_value(target_bytes, Options) of
        T when is_integer(T), T >= 0 -> [{target_bytes, T}];
        undefined -> []
    end,

    StreamingMode = case fabric2_util:get_value(streaming_mode, Options) of
        undefined -> [];
        Name when is_atom(Name) -> [{streaming_mode, Name}]
    end,

    Snapshot = case fabric2_util:get_value(snapshot, Options) of
        undefined -> [];
        B when is_boolean(B) -> [{snapshot, B}]
    end,

    % Range options shared by every pass of the fold; the limit is added
    % separately by fold_range/2
    BaseOpts = [{reverse, Reverse}]
            ++ TargetBytes
            ++ StreamingMode
            ++ Snapshot,

    RestartTx = fabric2_util:get_value(restart_tx, Options, false),

    #fold_acc{
        db = Db,
        start_key = StartKey3,
        end_key = EndKey3,
        skip = Skip,
        limit = Limit,
        retries = 0,
        base_opts = BaseOpts,
        restart_tx = RestartTx,
        user_fun = UserCallback,
        user_acc = UserAcc
    }.


% Per row callback of the range fold.
%
% While rows remain to be skipped the row is only counted; otherwise it is
% handed to the user function. The fold bounds are then moved past the row
% (end key for reverse folds, start key otherwise) and the updated state is
% saved in the process dictionary so restart_fold/2 can resume from it.
fold_range_cb({K, V}, #fold_acc{} = Acc) ->
    #fold_acc{
        skip = Skip,
        limit = Limit,
        base_opts = Opts
    } = Acc,
    Counted = if
        Skip =:= 0 ->
            #fold_acc{user_fun = UserFun, user_acc = UserAcc} = Acc,
            NextUserAcc = UserFun({K, V}, UserAcc),
            Acc#fold_acc{limit = max(0, Limit - 1), user_acc = NextUserAcc};
        true ->
            Acc#fold_acc{skip = Skip - 1, limit = Limit - 1}
    end,
    Advanced = case fabric2_util:get_value(reverse, Opts, false) of
        false ->
            Counted#fold_acc{start_key = erlfdb_key:first_greater_than(K)};
        true ->
            Counted#fold_acc{end_key = erlfdb_key:last_less_or_equal(K)}
    end,
    put(?PDICT_FOLD_ACC_STATE, Advanced),
    Advanced.


% Reset the transaction and return the accumulator to resume the fold with.
%
% When a fold state was saved by fold_range_cb/2 the fold made progress: it
% resumes from that state with the retry counter reset. Otherwise the same
% accumulator is retried with an incremented counter until the retry limit
% is reached, at which point error:fold_range_not_progressing is raised.
restart_fold(Tx, #fold_acc{} = Acc) ->
    erase(?PDICT_CHECKED_MD_IS_CURRENT),

    ok = erlfdb:reset(Tx),

    % During buggify test runs MaxRetries would be -1
    MaxRetries = fabric2_server:get_retry_limit(),
    case erase(?PDICT_FOLD_ACC_STATE) of
        #fold_acc{db = SavedDb} = Saved ->
            Saved#fold_acc{db = check_db_instance(SavedDb), retries = 0};
        undefined ->
            Retries = Acc#fold_acc.retries,
            if
                Retries < MaxRetries orelse MaxRetries =:= -1 ->
                    Db = check_db_instance(Acc#fold_acc.db),
                    Acc#fold_acc{db = Db, retries = Retries + 1};
                true ->
                    error(fold_range_not_progressing)
            end
    end.


% Return the erlfdb database handle, caching it in the process dictionary
% after the first lookup from the `fabric` application environment.
get_db_handle() ->
    Cached = get(?PDICT_DB_KEY),
    if
        Cached =:= undefined ->
            {ok, Db} = application:get_env(fabric, db),
            put(?PDICT_DB_KEY, Db),
            Db;
        true ->
            Cached
    end.


% Assert that the db handle carries a transaction or a snapshot of one.
% Returns ok, or raises error:transaction_required otherwise.
require_transaction(#{} = Db) ->
    case maps:get(tx, Db, undefined) of
        {erlfdb_snapshot, _} ->
            ok;
        {erlfdb_transaction, _} ->
            ok;
        _ ->
            erlang:error(transaction_required)
    end.


% Same as ensure_current/2 with the db version check enabled.
ensure_current(Db) ->
    CheckDbVersion = true,
    ensure_current(Db, CheckDbVersion).


% Make sure the db handle is still valid inside its transaction and return
% the (possibly refreshed) handle.
%
% When the metadata version is stale the db version is checked as well: if
% it is still current the handle gets a new check_current_ts and is offered
% to the fabric2_server cache, otherwise the cached handle is dropped and
% {?MODULE, reopen} is thrown. Finally a pending security function, if any,
% is run once and then cleared from the returned handle.
ensure_current(#{} = Db0, CheckDbVersion) ->
    require_transaction(Db0),
    Checked = case check_metadata_version(Db0) of
        {current, CurrentDb} ->
            CurrentDb;
        {stale, StaleDb} ->
            case check_db_version(StaleDb, CheckDbVersion) of
                stale ->
                    fabric2_server:maybe_remove(StaleDb),
                    throw({?MODULE, reopen});
                current ->
                    % If db version is current, update cache with the latest
                    % metadata so other requests can immediately see the
                    % refreshed db handle.
                    Now = erlang:monotonic_time(millisecond),
                    Refreshed = StaleDb#{check_current_ts := Now},
                    fabric2_server:maybe_update(Refreshed),
                    Refreshed
            end
    end,
    case maps:get(security_fun, Checked) of
        undefined ->
            Checked;
        SecurityFun when is_function(SecurityFun, 2) ->
            #{security_doc := SecDoc} = Checked,
            ok = SecurityFun(Checked, SecDoc),
            Checked#{security_fun := undefined}
    end.


% Verify that the db handle still refers to the same database instance.
%
% `undefined` is passed through. When the metadata version is stale the uuid
% stored in the database config is compared with the one in the handle; a
% mismatch raises error:database_does_not_exist.
check_db_instance(undefined) ->
    undefined;

check_db_instance(#{} = Db) ->
    require_transaction(Db),
    case check_metadata_version(Db) of
        {current, CurrentDb} ->
            CurrentDb;
        {stale, StaleDb} ->
            #{
                tx := Tx,
                uuid := ExpectedUUID,
                db_prefix := DbPrefix
            } = StaleDb,
            UUIDKey = erlfdb_tuple:pack({?DB_CONFIG, <<"uuid">>}, DbPrefix),
            StoredUUID = erlfdb:wait(erlfdb:get(Tx, UUIDKey)),
            if
                StoredUUID =:= ExpectedUUID -> StaleDb;
                true -> error(database_does_not_exist)
            end
    end.


% True when the last erlfdb error was "commit unknown result", this process
% has a transaction id recorded, and that id is found in the database.
is_transaction_applied(Tx) ->
    case is_commit_unknown_result() andalso has_transaction_id() of
        true -> transaction_id_exists(Tx);
        false -> false
    end.


% Return the transaction result that execute_transaction/3 saved in the
% process dictionary (`undefined` when nothing was saved).
get_previous_transaction_result() ->
    SavedResult = get(?PDICT_TX_RES_KEY),
    SavedResult.


% Run `Fun` inside transaction `Tx` and return its result.
%
% The "already checked" markers in the process dictionary are reset first.
% If the transaction performed writes, its transaction id key is written as
% well and the result is saved in the process dictionary.
execute_transaction(Tx, Fun, LayerPrefix) ->
    put(?PDICT_CHECKED_MD_IS_CURRENT, false),
    put(?PDICT_CHECKED_DB_IS_CURRENT, false),
    Result = Fun(Tx),
    case erlfdb:is_read_only(Tx) of
        false ->
            TxIdKey = get_transaction_id(Tx, LayerPrefix),
            erlfdb:set(Tx, TxIdKey, <<>>),
            put(?PDICT_TX_RES_KEY, Result);
        true ->
            ok
    end,
    Result.


% Forget everything recorded about the current transaction: the transaction
% id is handed to fabric2_txids:remove/1 and all per transaction entries are
% erased from the process dictionary. Returns the value of the erased
% transaction result entry.
clear_transaction() ->
    TxId = get(?PDICT_TX_ID_KEY),
    fabric2_txids:remove(TxId),
    lists:foreach(fun erlang:erase/1, [
        ?PDICT_CHECKED_DB_IS_CURRENT,
        ?PDICT_CHECKED_MD_IS_CURRENT,
        ?PDICT_TX_ID_KEY
    ]),
    erase(?PDICT_TX_RES_KEY).


% True when the last error reported by erlfdb is "commit unknown result".
is_commit_unknown_result() ->
    LastError = erlfdb:get_last_error(),
    LastError == ?ERLFDB_COMMIT_UNKNOWN_RESULT.


% True when a binary transaction id is stored in the process dictionary.
has_transaction_id() ->
    TxId = get(?PDICT_TX_ID_KEY),
    is_binary(TxId).


% True when the transaction id key of this process is stored in the database
% with an empty value, which is what execute_transaction/3 writes.
transaction_id_exists(Tx) ->
    TxIdKey = get(?PDICT_TX_ID_KEY),
    Stored = erlfdb:wait(erlfdb:get(Tx, TxIdKey)),
    Stored == <<>>.


% Return the transaction id of this process, creating one with
% fabric2_txids:create/2 and remembering it in the process dictionary on
% first use.
get_transaction_id(Tx, LayerPrefix) ->
    case get(?PDICT_TX_ID_KEY) of
        Existing when is_binary(Existing) ->
            Existing;
        undefined ->
            NewId = fabric2_txids:create(Tx, LayerPrefix),
            put(?PDICT_TX_ID_KEY, NewId),
            NewId
    end.


% Run `Fun`, wrapped in a ctrace span named `Operation` when the process
% already has a span. The span is tagged with a set of default tags;
% `ExtraTags` is merged over them and wins on duplicate keys.
with_span(Operation, ExtraTags, Fun) ->
    case ctrace:has_span() of
        false ->
            Fun();
        true ->
            DefaultTags = #{
                'span.kind' => <<"client">>,
                component => <<"couchdb.fabric">>,
                'db.instance' => fabric2_server:fdb_cluster(),
                'db.namespace' => fabric2_server:fdb_directory(),
                'db.type' => <<"fdb">>,
                nonce => get(nonce),
                pid => self()
            },
            Tags = maps:merge(DefaultTags, ExtraTags),
            ctrace:with_span(Operation, Tags, Fun)
    end.


% Wait on the futures of an #info_future{} and build the db info property
% list: update_seq, uuid, and the stats found by the metadata read
% (doc_count, doc_del_count and the entries of the `sizes` object).
get_info_wait_int(#info_future{} = InfoFuture) ->
    #info_future{
        db_prefix = DbPrefix,
        changes_future = ChangesFuture,
        uuid_future = UUIDFuture,
        meta_future = MetaFuture
    } = InfoFuture,

    % No changes row means the database is still at the zero sequence
    LastVS = case erlfdb:wait(ChangesFuture) of
        [] ->
            fabric2_util:seq_zero_vs();
        [{SeqKey, _}] ->
            {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(SeqKey, DbPrefix),
            SeqVS
    end,
    UpdateSeq = vs_to_seq(LastVS),

    UUID = erlfdb:wait(UUIDFuture),

    AddStat = fun({K, V}, Props) ->
        case erlfdb_tuple:unpack(K, DbPrefix) of
            {?DB_STATS, <<"doc_count">>} ->
                [{doc_count, ?bin2uint(V)} | Props];
            {?DB_STATS, <<"doc_del_count">>} ->
                [{doc_del_count, ?bin2uint(V)} | Props];
            {?DB_STATS, <<"sizes">>, Name} ->
                % Size entries are collected inside the single `sizes` prop
                {_, {Sizes}} = lists:keyfind(sizes, 1, Props),
                Sizes1 = lists:keystore(Name, 1, Sizes, {Name, ?bin2uint(V)}),
                lists:keystore(sizes, 1, Props, {sizes, {Sizes1}});
            {?DB_STATS, _} ->
                % Any other two element stat key is not reported
                Props
        end
    end,
    StatProps = lists:foldl(AddStat, [{sizes, {[]}}], erlfdb:wait(MetaFuture)),

    [{update_seq, UpdateSeq}, {uuid, UUID} | StatProps].


% Chunk size used when splitting binaries, read from the "fabric" config
% section with ?DEFAULT_BINARY_CHUNK_SIZE as the fallback.
binary_chunk_size() ->
    Default = ?DEFAULT_BINARY_CHUNK_SIZE,
    config:get_integer("fabric", "binary_chunk_size", Default).


-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

% Values written in rev format 0 must still decode, for both the winning
% and the non-winning value shape, with the fields added by later formats
% filled in by the upgrade clauses of fdb_to_revinfo/2.
fdb_to_revinfo_version_compatibility_test() ->
    Rev = <<60,84,174,140,210,120,192,18,100,148,9,181,129,165,248,92>>,
    Key = {?DB_REVS, <<"doc_id">>, true, 1, Rev},
    Seq = {versionstamp, 10873034897377, 0, 0},

    % Format 0 winner value: {Format, Sequence, BranchCount, RevPath}
    WinnerInfo = fdb_to_revinfo(Key, {0, Seq, 1, {}}),
    ?assertEqual(expected(true, 1, true, 1, Rev, {}, Seq), WinnerInfo),

    % Format 0 non-winner value: {Format, RevPath}
    LoserInfo = fdb_to_revinfo(Key, {0, {}}),
    ?assertEqual(
        expected(false, undefined, true, 1, Rev, {}, undefined), LoserInfo),
    ok.


% Build the rev info map the compatibility test expects for an upgraded
% value: an empty attachment hash and a revision size of 0 alongside the
% given fields.
expected(Winner, BranchCount, NotDeleted, RevPos, Rev, RevPath, Sequence) ->
    maps:from_list([
        {winner, Winner},
        {exists, true},
        {deleted, not NotDeleted},
        {rev_id, {RevPos, Rev}},
        {rev_path, tuple_to_list(RevPath)},
        {sequence, Sequence},
        {branch_count, BranchCount},
        {att_hash, <<>>},
        {rev_size, 0}
    ]).


-endif.
