blob: 670ce8b492e59aea0f64ddad04afd2e5d24ebe68 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(fabric2_fdb).
-export([
transactional/1,
transactional/3,
transactional/2,
create/2,
open/2,
reopen/1,
delete/1,
exists/1,
list_dbs/4,
get_info/1,
get_config/1,
set_config/3,
get_stat/2,
incr_stat/3,
get_all_revs/2,
get_winning_revs/3,
get_winning_revs_future/3,
get_winning_revs_wait/2,
get_non_deleted_rev/3,
get_doc_body/3,
get_doc_body_future/3,
get_doc_body_wait/4,
get_local_doc/2,
write_doc/6,
write_local_doc/2,
read_attachment/3,
write_attachment/3,
get_last_change/1,
fold_range/5,
vs_to_seq/1,
seq_to_vs/1,
next_vs/1,
debug_cluster/0,
debug_cluster/2
]).
-include_lib("couch/include/couch_db.hrl").
-include("fabric2.hrl").
transactional(Fun) ->
do_transaction(Fun, undefined).
transactional(DbName, Options, Fun) when is_binary(DbName) ->
transactional(fun(Tx) ->
Fun(init_db(Tx, DbName, Options))
end).
transactional(#{tx := undefined} = Db, Fun) ->
#{layer_prefix := LayerPrefix} = Db,
do_transaction(fun(Tx) ->
Fun(Db#{tx => Tx})
end, LayerPrefix);
transactional(#{tx := {erlfdb_transaction, _}} = Db, Fun) ->
Fun(Db).
do_transaction(Fun, LayerPrefix) when is_function(Fun, 1) ->
Db = get_db_handle(),
try
erlfdb:transactional(Db, fun(Tx) ->
case get(erlfdb_trace) of
Name when is_binary(Name) ->
erlfdb:set_option(Tx, transaction_logging_enable, Name);
_ ->
ok
end,
case is_transaction_applied(Tx) of
true ->
get_previous_transaction_result();
false ->
execute_transaction(Tx, Fun, LayerPrefix)
end
end)
after
clear_transaction()
end.
create(#{} = Db0, Options) ->
#{
name := DbName,
tx := Tx,
layer_prefix := LayerPrefix
} = Db = ensure_current(Db0, false),
% Eventually DbPrefix will be HCA allocated. For now
% we're just using the DbName so that debugging is easier.
DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
DbPrefix = erlfdb_tuple:pack({?DBS, DbName}, LayerPrefix),
erlfdb:set(Tx, DbKey, DbPrefix),
% This key is responsible for telling us when something in
% the database cache (i.e., fabric2_server's ets table) has
% changed and requires re-loading. This currently includes
% revs_limit and validate_doc_update functions. There's
% no order to versioning here. Its just a value that changes
% that is used in the ensure_current check.
DbVersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix),
DbVersion = fabric2_util:uuid(),
erlfdb:set(Tx, DbVersionKey, DbVersion),
UUID = fabric2_util:uuid(),
Defaults = [
{?DB_CONFIG, <<"uuid">>, UUID},
{?DB_CONFIG, <<"revs_limit">>, ?uint2bin(1000)},
{?DB_CONFIG, <<"security_doc">>, <<"{}">>},
{?DB_STATS, <<"doc_count">>, ?uint2bin(0)},
{?DB_STATS, <<"doc_del_count">>, ?uint2bin(0)},
{?DB_STATS, <<"doc_design_count">>, ?uint2bin(0)},
{?DB_STATS, <<"doc_local_count">>, ?uint2bin(0)},
{?DB_STATS, <<"size">>, ?uint2bin(2)}
],
lists:foreach(fun({P, K, V}) ->
Key = erlfdb_tuple:pack({P, K}, DbPrefix),
erlfdb:set(Tx, Key, V)
end, Defaults),
UserCtx = fabric2_util:get_value(user_ctx, Options, #user_ctx{}),
Db#{
uuid => UUID,
db_prefix => DbPrefix,
db_version => DbVersion,
revs_limit => 1000,
security_doc => {[]},
user_ctx => UserCtx,
validate_doc_update_funs => [],
before_doc_update => undefined,
after_doc_read => undefined,
% All other db things as we add features,
db_options => Options
}.
open(#{} = Db0, Options) ->
#{
name := DbName,
tx := Tx,
layer_prefix := LayerPrefix
} = Db1 = ensure_current(Db0, false),
DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
DbPrefix = case erlfdb:wait(erlfdb:get(Tx, DbKey)) of
Bin when is_binary(Bin) -> Bin;
not_found -> erlang:error(database_does_not_exist)
end,
DbVersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix),
DbVersion = erlfdb:wait(erlfdb:get(Tx, DbVersionKey)),
UserCtx = fabric2_util:get_value(user_ctx, Options, #user_ctx{}),
Db2 = Db1#{
db_prefix => DbPrefix,
db_version => DbVersion,
revs_limit => 1000,
security_doc => {[]},
user_ctx => UserCtx,
% Place holders until we implement these
% bits.
validate_doc_update_funs => [],
before_doc_update => undefined,
after_doc_read => undefined,
db_options => Options
},
Db3 = lists:foldl(fun({Key, Val}, DbAcc) ->
case Key of
<<"uuid">> ->
DbAcc#{uuid => Val};
<<"revs_limit">> ->
DbAcc#{revs_limit => ?bin2uint(Val)};
<<"security_doc">> ->
DbAcc#{security_doc => ?JSON_DECODE(Val)}
end
end, Db2, get_config(Db2)),
load_validate_doc_funs(Db3).
reopen(#{} = OldDb) ->
require_transaction(OldDb),
#{
tx := Tx,
name := DbName,
db_options := Options
} = OldDb,
open(init_db(Tx, DbName, Options), Options).
delete(#{} = Db) ->
#{
name := DbName,
tx := Tx,
layer_prefix := LayerPrefix,
db_prefix := DbPrefix
} = ensure_current(Db),
DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
erlfdb:clear(Tx, DbKey),
erlfdb:clear_range_startswith(Tx, DbPrefix),
bump_metadata_version(Tx),
ok.
exists(#{name := DbName} = Db) when is_binary(DbName) ->
#{
tx := Tx,
layer_prefix := LayerPrefix
} = ensure_current(Db, false),
DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
case erlfdb:wait(erlfdb:get(Tx, DbKey)) of
Bin when is_binary(Bin) -> true;
not_found -> false
end.
list_dbs(Tx, Callback, AccIn, Options) ->
Root = erlfdb_directory:root(),
CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]),
LayerPrefix = erlfdb_directory:get_name(CouchDB),
Prefix = erlfdb_tuple:pack({?ALL_DBS}, LayerPrefix),
fold_range({tx, Tx}, Prefix, fun({K, _V}, Acc) ->
{DbName} = erlfdb_tuple:unpack(K, Prefix),
Callback(DbName, Acc)
end, AccIn, Options).
get_info(#{} = Db) ->
#{
tx := Tx,
db_prefix := DbPrefix
} = ensure_current(Db),
{CStart, CEnd} = erlfdb_tuple:range({?DB_CHANGES}, DbPrefix),
ChangesFuture = erlfdb:get_range(Tx, CStart, CEnd, [
{streaming_mode, exact},
{limit, 1},
{reverse, true}
]),
StatsPrefix = erlfdb_tuple:pack({?DB_STATS}, DbPrefix),
MetaFuture = erlfdb:get_range_startswith(Tx, StatsPrefix),
RawSeq = case erlfdb:wait(ChangesFuture) of
[] ->
vs_to_seq(fabric2_util:seq_zero_vs());
[{SeqKey, _}] ->
{?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(SeqKey, DbPrefix),
vs_to_seq(SeqVS)
end,
CProp = {update_seq, RawSeq},
MProps = lists:flatmap(fun({K, V}) ->
case erlfdb_tuple:unpack(K, DbPrefix) of
{?DB_STATS, <<"doc_count">>} ->
[{doc_count, ?bin2uint(V)}];
{?DB_STATS, <<"doc_del_count">>} ->
[{doc_del_count, ?bin2uint(V)}];
{?DB_STATS, <<"size">>} ->
Val = ?bin2uint(V),
[
{other, {[{data_size, Val}]}},
{sizes, {[
{active, 0},
{external, Val},
{file, 0}
]}}
];
{?DB_STATS, _} ->
[]
end
end, erlfdb:wait(MetaFuture)),
[CProp | MProps].
get_config(#{} = Db) ->
#{
tx := Tx,
db_prefix := DbPrefix
} = Db = ensure_current(Db),
{Start, End} = erlfdb_tuple:range({?DB_CONFIG}, DbPrefix),
Future = erlfdb:get_range(Tx, Start, End),
lists:map(fun({K, V}) ->
{?DB_CONFIG, Key} = erlfdb_tuple:unpack(K, DbPrefix),
{Key, V}
end, erlfdb:wait(Future)).
set_config(#{} = Db, ConfigKey, ConfigVal) ->
#{
tx := Tx,
db_prefix := DbPrefix
} = ensure_current(Db),
Key = erlfdb_tuple:pack({?DB_CONFIG, ConfigKey}, DbPrefix),
erlfdb:set(Tx, Key, ConfigVal),
bump_db_version(Db).
get_stat(#{} = Db, StatKey) ->
#{
tx := Tx,
db_prefix := DbPrefix
} = ensure_current(Db),
Key = erlfdb_tuple:pack({?DB_STATS, StatKey}, DbPrefix),
% Might need to figure out some sort of type
% system here. Uints are because stats are all
% atomic op adds for the moment.
?bin2uint(erlfdb:wait(erlfdb:get(Tx, Key))).
incr_stat(_Db, _StatKey, 0) ->
ok;
incr_stat(#{} = Db, StatKey, Increment) when is_integer(Increment) ->
#{
tx := Tx,
db_prefix := DbPrefix
} = ensure_current(Db),
Key = erlfdb_tuple:pack({?DB_STATS, StatKey}, DbPrefix),
erlfdb:add(Tx, Key, Increment).
get_all_revs(#{} = Db, DocId) ->
#{
tx := Tx,
db_prefix := DbPrefix
} = ensure_current(Db),
Prefix = erlfdb_tuple:pack({?DB_REVS, DocId}, DbPrefix),
Options = [{streaming_mode, want_all}],
Future = erlfdb:get_range_startswith(Tx, Prefix, Options),
lists:map(fun({K, V}) ->
Key = erlfdb_tuple:unpack(K, DbPrefix),
Val = erlfdb_tuple:unpack(V),
fdb_to_revinfo(Key, Val)
end, erlfdb:wait(Future)).
get_winning_revs(Db, DocId, NumRevs) ->
Future = get_winning_revs_future(Db, DocId, NumRevs),
get_winning_revs_wait(Db, Future).
get_winning_revs_future(#{} = Db, DocId, NumRevs) ->
#{
tx := Tx,
db_prefix := DbPrefix
} = ensure_current(Db),
{StartKey, EndKey} = erlfdb_tuple:range({?DB_REVS, DocId}, DbPrefix),
Options = [{reverse, true}, {limit, NumRevs}],
erlfdb:get_range_raw(Tx, StartKey, EndKey, Options).
get_winning_revs_wait(#{} = Db, Future) ->
#{
db_prefix := DbPrefix
} = ensure_current(Db),
{Rows, _, _} = erlfdb:wait(Future),
lists:map(fun({K, V}) ->
Key = erlfdb_tuple:unpack(K, DbPrefix),
Val = erlfdb_tuple:unpack(V),
fdb_to_revinfo(Key, Val)
end, Rows).
get_non_deleted_rev(#{} = Db, DocId, RevId) ->
#{
tx := Tx,
db_prefix := DbPrefix
} = ensure_current(Db),
{RevPos, Rev} = RevId,
BaseKey = {?DB_REVS, DocId, true, RevPos, Rev},
Key = erlfdb_tuple:pack(BaseKey, DbPrefix),
case erlfdb:wait(erlfdb:get(Tx, Key)) of
not_found ->
not_found;
Val ->
fdb_to_revinfo(BaseKey, erlfdb_tuple:unpack(Val))
end.
get_doc_body(Db, DocId, RevInfo) ->
Future = get_doc_body_future(Db, DocId, RevInfo),
get_doc_body_wait(Db, DocId, RevInfo, Future).
get_doc_body_future(#{} = Db, DocId, RevInfo) ->
#{
tx := Tx,
db_prefix := DbPrefix
} = ensure_current(Db),
#{
rev_id := {RevPos, Rev}
} = RevInfo,
Key = erlfdb_tuple:pack({?DB_DOCS, DocId, RevPos, Rev}, DbPrefix),
erlfdb:get(Tx, Key).
get_doc_body_wait(#{} = Db0, DocId, RevInfo, Future) ->
Db = ensure_current(Db0),
#{
rev_id := {RevPos, Rev},
rev_path := RevPath
} = RevInfo,
Val = erlfdb:wait(Future),
fdb_to_doc(Db, DocId, RevPos, [Rev | RevPath], Val).
get_local_doc(#{} = Db0, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId) ->
#{
tx := Tx,
db_prefix := DbPrefix
} = Db = ensure_current(Db0),
Key = erlfdb_tuple:pack({?DB_LOCAL_DOCS, DocId}, DbPrefix),
Val = erlfdb:wait(erlfdb:get(Tx, Key)),
fdb_to_local_doc(Db, DocId, Val).
write_doc(#{} = Db0, Doc, NewWinner0, OldWinner, ToUpdate, ToRemove) ->
#{
tx := Tx,
db_prefix := DbPrefix
} = Db = ensure_current(Db0),
#doc{
id = DocId,
deleted = Deleted
} = Doc,
% Revision tree
NewWinner = NewWinner0#{winner := true},
NewRevId = maps:get(rev_id, NewWinner),
{WKey, WVal, WinnerVS} = revinfo_to_fdb(Tx, DbPrefix, DocId, NewWinner),
ok = erlfdb:set_versionstamped_value(Tx, WKey, WVal),
lists:foreach(fun(RI0) ->
RI = RI0#{winner := false},
{K, V, undefined} = revinfo_to_fdb(Tx, DbPrefix, DocId, RI),
ok = erlfdb:set(Tx, K, V)
end, ToUpdate),
lists:foreach(fun(RI0) ->
RI = RI0#{winner := false},
{K, _, undefined} = revinfo_to_fdb(Tx, DbPrefix, DocId, RI),
ok = erlfdb:clear(Tx, K)
end, ToRemove),
% _all_docs
UpdateStatus = case {OldWinner, NewWinner} of
{not_found, #{deleted := false}} ->
created;
{not_found, #{deleted := true}} ->
deleted;
{#{deleted := true}, #{deleted := false}} ->
recreated;
{#{deleted := false}, #{deleted := false}} ->
updated;
{#{deleted := false}, #{deleted := true}} ->
deleted;
{#{deleted := true}, #{deleted := true}} ->
deleted
end,
case UpdateStatus of
deleted ->
ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix),
ok = erlfdb:clear(Tx, ADKey);
_ ->
ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix),
ADVal = erlfdb_tuple:pack(NewRevId),
ok = erlfdb:set(Tx, ADKey, ADVal)
end,
% _changes
if OldWinner == not_found -> ok; true ->
OldSeq = maps:get(sequence, OldWinner),
OldSeqKey = erlfdb_tuple:pack({?DB_CHANGES, OldSeq}, DbPrefix),
erlfdb:clear(Tx, OldSeqKey)
end,
NewSeqKey = erlfdb_tuple:pack_vs({?DB_CHANGES, WinnerVS}, DbPrefix),
NewSeqVal = erlfdb_tuple:pack({DocId, Deleted, NewRevId}),
erlfdb:set_versionstamped_key(Tx, NewSeqKey, NewSeqVal),
% And all the rest...
ok = write_doc_body(Db, Doc),
IsDDoc = case Doc#doc.id of
<<?DESIGN_DOC_PREFIX, _/binary>> -> true;
_ -> false
end,
if not IsDDoc -> ok; true ->
bump_db_version(Db)
end,
case UpdateStatus of
created ->
if not IsDDoc -> ok; true ->
incr_stat(Db, <<"doc_design_count">>, 1)
end,
incr_stat(Db, <<"doc_count">>, 1);
recreated ->
if not IsDDoc -> ok; true ->
incr_stat(Db, <<"doc_design_count">>, 1)
end,
incr_stat(Db, <<"doc_count">>, 1),
incr_stat(Db, <<"doc_del_count">>, -1);
deleted ->
if not IsDDoc -> ok; true ->
incr_stat(Db, <<"doc_design_count">>, -1)
end,
incr_stat(Db, <<"doc_count">>, -1),
incr_stat(Db, <<"doc_del_count">>, 1);
updated ->
ok
end,
ok.
write_local_doc(#{} = Db0, Doc) ->
#{
tx := Tx
} = Db = ensure_current(Db0),
{LDocKey, LDocVal} = local_doc_to_fdb(Db, Doc),
WasDeleted = case erlfdb:wait(erlfdb:get(Tx, LDocKey)) of
<<_/binary>> -> false;
not_found -> true
end,
case Doc#doc.deleted of
true -> erlfdb:clear(Tx, LDocKey);
false -> erlfdb:set(Tx, LDocKey, LDocVal)
end,
case {WasDeleted, Doc#doc.deleted} of
{true, false} ->
incr_stat(Db, <<"doc_local_count">>, 1);
{false, true} ->
incr_stat(Db, <<"doc_local_count">>, -1);
_ ->
ok
end,
ok.
read_attachment(#{} = Db, DocId, AttId) ->
#{
tx := Tx,
db_prefix := DbPrefix
} = ensure_current(Db),
AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId}, DbPrefix),
case erlfdb:wait(erlfdb:get_range_startswith(Tx, AttKey)) of
not_found ->
throw({not_found, missing});
KVs ->
Vs = [V || {_K, V} <- KVs],
iolist_to_binary(Vs)
end.
write_attachment(#{} = Db, DocId, Data) when is_binary(Data) ->
#{
tx := Tx,
db_prefix := DbPrefix
} = ensure_current(Db),
AttId = fabric2_util:uuid(),
Chunks = chunkify_attachment(Data),
lists:foldl(fun(Chunk, ChunkId) ->
AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId, ChunkId}, DbPrefix),
ok = erlfdb:set(Tx, AttKey, Chunk),
ChunkId + 1
end, 0, Chunks),
{ok, AttId}.
get_last_change(#{} = Db) ->
#{
tx := Tx,
db_prefix := DbPrefix
} = ensure_current(Db),
{Start, End} = erlfdb_tuple:range({?DB_CHANGES}, DbPrefix),
Options = [{limit, 1}, {reverse, true}],
case erlfdb:get_range(Tx, Start, End, Options) of
[] ->
vs_to_seq(fabric2_util:seq_zero_vs());
[{K, _V}] ->
{?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(K, DbPrefix),
vs_to_seq(SeqVS)
end.
fold_range(#{} = Db, RangePrefix, Callback, Acc, Options) ->
#{
tx := Tx
} = ensure_current(Db),
fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options);
fold_range({tx, Tx}, RangePrefix, UserCallback, UserAcc, Options) ->
case fabric2_util:get_value(limit, Options) of
0 ->
% FoundationDB treats a limit of 0 as unlimited
% so we have to guard for that here.
UserAcc;
_ ->
{Start, End, Skip, FoldOpts} = get_fold_opts(RangePrefix, Options),
Callback = fun fold_range_cb/2,
Acc = {skip, Skip, UserCallback, UserAcc},
{skip, _, UserCallback, OutAcc} =
erlfdb:fold_range(Tx, Start, End, Callback, Acc, FoldOpts),
OutAcc
end.
vs_to_seq(VS) when is_tuple(VS) ->
% 51 is the versionstamp type tag
<<51:8, SeqBin:12/binary>> = erlfdb_tuple:pack({VS}),
fabric2_util:to_hex(SeqBin).
seq_to_vs(Seq) when is_binary(Seq) ->
Seq1 = fabric2_util:from_hex(Seq),
% 51 is the versionstamp type tag
Seq2 = <<51:8, Seq1/binary>>,
{VS} = erlfdb_tuple:unpack(Seq2),
VS.
next_vs({versionstamp, VS, Batch, TxId}) ->
{V, B, T} = case TxId =< 65535 of
true ->
{VS, Batch, TxId + 1};
false ->
case Batch =< 65535 of
true ->
{VS, Batch + 1, 0};
false ->
{VS + 1, 0, 0}
end
end,
{versionstamp, V, B, T}.
debug_cluster() ->
debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>).
debug_cluster(Start, End) ->
transactional(fun(Tx) ->
lists:foreach(fun({Key, Val}) ->
io:format(standard_error, "~s => ~s~n", [
string:pad(erlfdb_util:repr(Key), 60),
erlfdb_util:repr(Val)
])
end, erlfdb:get_range(Tx, Start, End))
end).
init_db(Tx, DbName, Options) ->
Root = erlfdb_directory:root(),
CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]),
Prefix = erlfdb_directory:get_name(CouchDB),
Version = erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)),
#{
name => DbName,
tx => Tx,
layer_prefix => Prefix,
md_version => Version,
db_options => Options
}.
load_validate_doc_funs(#{} = Db) ->
FoldFun = fun
({row, Row}, Acc) ->
DDocInfo = #{id => fabric2_util:get_value(id, Row)},
{ok, [DDocInfo | Acc]};
(_, Acc) ->
{ok, Acc}
end,
Options = [
{start_key, <<"_design/">>},
{end_key, <<"_design0">>}
],
{ok, Infos1} = fabric2_db:fold_docs(Db, FoldFun, [], Options),
Infos2 = lists:map(fun(Info) ->
#{
id := DDocId = <<"_design/", _/binary>>
} = Info,
Info#{
rev_info => get_winning_revs_future(Db, DDocId, 1)
}
end, Infos1),
Infos3 = lists:flatmap(fun(Info) ->
#{
id := DDocId,
rev_info := RevInfoFuture
} = Info,
[RevInfo] = get_winning_revs_wait(Db, RevInfoFuture),
#{deleted := Deleted} = RevInfo,
if Deleted -> []; true ->
[Info#{
rev_info := RevInfo,
body => get_doc_body_future(Db, DDocId, RevInfo)
}]
end
end, Infos2),
VDUs = lists:flatmap(fun(Info) ->
#{
id := DDocId,
rev_info := RevInfo,
body := BodyFuture
} = Info,
#doc{} = Doc = get_doc_body_wait(Db, DDocId, RevInfo, BodyFuture),
case couch_doc:get_validate_doc_fun(Doc) of
nil -> [];
Fun -> [Fun]
end
end, Infos3),
Db#{
validate_doc_update_funs := VDUs
}.
bump_metadata_version(Tx) ->
% The 14 zero bytes is pulled from the PR for adding the
% metadata version key. Not sure why 14 bytes when version
% stamps are only 80, but whatever for now.
erlfdb:set_versionstamped_value(Tx, ?METADATA_VERSION_KEY, <<0:112>>).
bump_db_version(#{} = Db) ->
#{
tx := Tx,
db_prefix := DbPrefix
} = Db,
DbVersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix),
DbVersion = fabric2_util:uuid(),
ok = erlfdb:set(Tx, DbVersionKey, DbVersion).
write_doc_body(#{} = Db0, #doc{} = Doc) ->
#{
tx := Tx
} = Db = ensure_current(Db0),
{NewDocKey, NewDocVal} = doc_to_fdb(Db, Doc),
erlfdb:set(Tx, NewDocKey, NewDocVal).
revinfo_to_fdb(Tx, DbPrefix, DocId, #{winner := true} = RevId) ->
#{
deleted := Deleted,
rev_id := {RevPos, Rev},
rev_path := RevPath,
branch_count := BranchCount
} = RevId,
VS = new_versionstamp(Tx),
Key = {?DB_REVS, DocId, not Deleted, RevPos, Rev},
Val = {?CURR_REV_FORMAT, VS, BranchCount, list_to_tuple(RevPath)},
KBin = erlfdb_tuple:pack(Key, DbPrefix),
VBin = erlfdb_tuple:pack_vs(Val),
{KBin, VBin, VS};
revinfo_to_fdb(_Tx, DbPrefix, DocId, #{} = RevId) ->
#{
deleted := Deleted,
rev_id := {RevPos, Rev},
rev_path := RevPath
} = RevId,
Key = {?DB_REVS, DocId, not Deleted, RevPos, Rev},
Val = {?CURR_REV_FORMAT, list_to_tuple(RevPath)},
KBin = erlfdb_tuple:pack(Key, DbPrefix),
VBin = erlfdb_tuple:pack(Val),
{KBin, VBin, undefined}.
fdb_to_revinfo(Key, {?CURR_REV_FORMAT, _, _, _} = Val) ->
{?DB_REVS, _DocId, NotDeleted, RevPos, Rev} = Key,
{_RevFormat, Sequence, BranchCount, RevPath} = Val,
#{
winner => true,
deleted => not NotDeleted,
rev_id => {RevPos, Rev},
rev_path => tuple_to_list(RevPath),
sequence => Sequence,
branch_count => BranchCount
};
fdb_to_revinfo(Key, {?CURR_REV_FORMAT, _} = Val) ->
{?DB_REVS, _DocId, NotDeleted, RevPos, Rev} = Key,
{_RevFormat, RevPath} = Val,
#{
winner => false,
deleted => not NotDeleted,
rev_id => {RevPos, Rev},
rev_path => tuple_to_list(RevPath),
sequence => undefined,
branch_count => undefined
}.
doc_to_fdb(Db, #doc{} = Doc) ->
#{
db_prefix := DbPrefix
} = Db,
#doc{
id = Id,
revs = {Start, [Rev | _]},
body = Body,
atts = Atts,
deleted = Deleted
} = Doc,
DiskAtts = lists:map(fun couch_att:to_disk_term/1, Atts),
Key = erlfdb_tuple:pack({?DB_DOCS, Id, Start, Rev}, DbPrefix),
Val = {Body, DiskAtts, Deleted},
{Key, term_to_binary(Val, [{minor_version, 1}])}.
fdb_to_doc(Db, DocId, Pos, Path, Bin) when is_binary(Bin) ->
{Body, DiskAtts, Deleted} = binary_to_term(Bin, [safe]),
Atts = lists:map(fun(Att) ->
couch_att:from_disk_term(Db, DocId, Att)
end, DiskAtts),
Doc0 = #doc{
id = DocId,
revs = {Pos, Path},
body = Body,
atts = Atts,
deleted = Deleted
},
case Db of
#{after_doc_read := undefined} -> Doc0;
#{after_doc_read := ADR} -> ADR(Doc0, Db)
end;
fdb_to_doc(_Db, _DocId, _Pos, _Path, not_found) ->
{not_found, missing}.
local_doc_to_fdb(Db, #doc{} = Doc) ->
#{
db_prefix := DbPrefix
} = Db,
#doc{
id = Id,
revs = {0, [Rev]},
body = Body
} = Doc,
StoreRev = case Rev of
_ when is_integer(Rev) -> integer_to_binary(Rev);
_ when is_binary(Rev) -> Rev
end,
Key = erlfdb_tuple:pack({?DB_LOCAL_DOCS, Id}, DbPrefix),
Val = {StoreRev, Body},
{Key, term_to_binary(Val, [{minor_version, 1}])}.
fdb_to_local_doc(_Db, DocId, Bin) when is_binary(Bin) ->
{Rev, Body} = binary_to_term(Bin, [safe]),
#doc{
id = DocId,
revs = {0, [Rev]},
deleted = false,
body = Body
};
fdb_to_local_doc(_Db, _DocId, not_found) ->
{not_found, missing}.
chunkify_attachment(Data) ->
case Data of
<<>> ->
[];
<<Head:?ATTACHMENT_CHUNK_SIZE/binary, Rest/binary>> ->
[Head | chunkify_attachment(Rest)];
<<_/binary>> when size(Data) < ?ATTACHMENT_CHUNK_SIZE ->
[Data]
end.
get_fold_opts(RangePrefix, Options) ->
Reverse = case fabric2_util:get_value(dir, Options) of
rev -> true;
_ -> false
end,
StartKey0 = fabric2_util:get_value(start_key, Options),
EndKeyGt = fabric2_util:get_value(end_key_gt, Options),
EndKey0 = fabric2_util:get_value(end_key, Options, EndKeyGt),
InclusiveEnd = EndKeyGt == undefined,
% CouchDB swaps the key meanings based on the direction
% of the fold. FoundationDB does not so we have to
% swap back here.
{StartKey1, EndKey1} = case Reverse of
false -> {StartKey0, EndKey0};
true -> {EndKey0, StartKey0}
end,
% Set the maximum bounds for the start and endkey
StartKey2 = case StartKey1 of
undefined -> <<>>;
SK2 -> SK2
end,
EndKey2 = case EndKey1 of
undefined -> <<255>>;
EK2 -> EK2
end,
StartKey3 = erlfdb_tuple:pack({StartKey2}, RangePrefix),
EndKey3 = erlfdb_tuple:pack({EndKey2}, RangePrefix),
% FoundationDB ranges are applied as SK <= key < EK
% By default, CouchDB is SK <= key <= EK with the
% optional inclusive_end=false option changing that
% to SK <= key < EK. Also, remember that CouchDB
% swaps the meaning of SK and EK based on direction.
%
% Thus we have this wonderful bit of logic to account
% for all of those combinations.
StartKey4 = case {Reverse, InclusiveEnd} of
{true, false} ->
erlfdb_key:first_greater_than(StartKey3);
_ ->
StartKey3
end,
EndKey4 = case {Reverse, InclusiveEnd} of
{false, true} when EndKey0 /= undefined ->
erlfdb_key:first_greater_than(EndKey3);
{true, _} ->
erlfdb_key:first_greater_than(EndKey3);
_ ->
EndKey3
end,
Skip = case fabric2_util:get_value(skip, Options) of
S when is_integer(S), S >= 0 -> S;
_ -> 0
end,
Limit = case fabric2_util:get_value(limit, Options) of
L when is_integer(L), L >= 0 -> [{limit, L + Skip}];
undefined -> []
end,
TargetBytes = case fabric2_util:get_value(target_bytes, Options) of
T when is_integer(T), T >= 0 -> [{target_bytes, T}];
undefined -> []
end,
StreamingMode = case fabric2_util:get_value(streaming_mode, Options) of
undefined -> [];
Name when is_atom(Name) -> [{streaming_mode, Name}]
end,
Snapshot = case fabric2_util:get_value(snapshot, Options) of
undefined -> [];
B when is_boolean(B) -> [{snapshot, B}]
end,
OutOpts = [{reverse, Reverse}]
++ Limit
++ TargetBytes
++ StreamingMode
++ Snapshot,
{StartKey4, EndKey4, Skip, OutOpts}.
fold_range_cb(KV, {skip, 0, Callback, Acc}) ->
NewAcc = Callback(KV, Acc),
{skip, 0, Callback, NewAcc};
fold_range_cb(_KV, {skip, N, Callback, Acc}) when is_integer(N), N > 0 ->
{skip, N - 1, Callback, Acc}.
get_db_handle() ->
case get(?PDICT_DB_KEY) of
undefined ->
{ok, Db} = application:get_env(fabric, db),
put(?PDICT_DB_KEY, Db),
Db;
Db ->
Db
end.
require_transaction(#{tx := {erlfdb_transaction, _}} = _Db) ->
ok;
require_transaction(#{} = _Db) ->
erlang:error(transaction_required).
ensure_current(Db) ->
ensure_current(Db, true).
ensure_current(#{} = Db, CheckDbVersion) ->
require_transaction(Db),
#{
tx := Tx,
md_version := MetaDataVersion
} = Db,
case erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)) of
MetaDataVersion -> Db;
_NewVersion -> reopen(Db)
end,
AlreadyChecked = get(?PDICT_CHECKED_DB_IS_CURRENT),
if not CheckDbVersion orelse AlreadyChecked == true -> Db; true ->
#{
db_prefix := DbPrefix,
db_version := DbVersion
} = Db,
DbVersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix),
case erlfdb:wait(erlfdb:get(Tx, DbVersionKey)) of
DbVersion ->
put(?PDICT_CHECKED_DB_IS_CURRENT, true),
Db;
_NewDBVersion ->
fabric2_server:remove(maps:get(name, Db)),
reopen(Db)
end
end.
is_transaction_applied(Tx) ->
is_commit_unknown_result()
andalso has_transaction_id()
andalso transaction_id_exists(Tx).
get_previous_transaction_result() ->
get(?PDICT_TX_RES_KEY).
execute_transaction(Tx, Fun, LayerPrefix) ->
put(?PDICT_CHECKED_DB_IS_CURRENT, false),
Result = Fun(Tx),
case erlfdb:is_read_only(Tx) of
true ->
ok;
false ->
erlfdb:set(Tx, get_transaction_id(Tx, LayerPrefix), <<>>),
put(?PDICT_TX_RES_KEY, Result)
end,
Result.
clear_transaction() ->
fabric2_txids:remove(get(?PDICT_TX_ID_KEY)),
erase(?PDICT_CHECKED_DB_IS_CURRENT),
erase(?PDICT_TX_ID_KEY),
erase(?PDICT_TX_RES_KEY).
is_commit_unknown_result() ->
erlfdb:get_last_error() == ?COMMIT_UNKNOWN_RESULT.
has_transaction_id() ->
is_binary(get(?PDICT_TX_ID_KEY)).
transaction_id_exists(Tx) ->
erlfdb:wait(erlfdb:get(Tx, get(?PDICT_TX_ID_KEY))) == <<>>.
get_transaction_id(Tx, LayerPrefix) ->
case get(?PDICT_TX_ID_KEY) of
undefined ->
TxId = fabric2_txids:create(Tx, LayerPrefix),
put(?PDICT_TX_ID_KEY, TxId),
TxId;
TxId when is_binary(TxId) ->
TxId
end.
new_versionstamp(Tx) ->
TxId = erlfdb:get_next_tx_id(Tx),
{versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF, TxId}.