| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(fabric2_fdb). |
| |
| |
| -export([ |
| transactional/1, |
| transactional/2, |
| transactional/3, |
| |
| with_snapshot/2, |
| |
| create/2, |
| open/2, |
| ensure_current/1, |
| delete/1, |
| undelete/3, |
| remove_deleted_db/2, |
| exists/1, |
| |
| get_dir/1, |
| |
| list_dbs/4, |
| list_dbs_info/4, |
| list_deleted_dbs_info/4, |
| |
| get_info/1, |
| get_info_future/2, |
| get_info_wait/1, |
| set_config/3, |
| |
| get_stat/2, |
| incr_stat/3, |
| incr_stat/4, |
| |
| get_all_revs/2, |
| get_all_revs_future/2, |
| get_winning_revs/3, |
| get_winning_revs_future/3, |
| get_revs_wait/2, |
| get_non_deleted_rev/3, |
| |
| get_doc_body/3, |
| get_doc_body_future/3, |
| get_doc_body_wait/4, |
| |
| get_local_doc_rev_future/2, |
| get_local_doc_rev_wait/1, |
| get_local_doc_body_future/3, |
| get_local_doc_body_wait/4, |
| get_local_doc/2, |
| get_local_doc_rev/3, |
| |
| write_doc/6, |
| write_local_doc/2, |
| |
| read_attachment/3, |
| write_attachment/4, |
| |
| get_last_change/1, |
| |
| fold_range/5, |
| |
| vs_to_seq/1, |
| seq_to_vs/1, |
| next_vs/1, |
| |
| new_versionstamp/1, |
| |
| get_approximate_tx_size/1, |
| |
| chunkify_binary/1, |
| chunkify_binary/2, |
| |
| debug_cluster/0, |
| debug_cluster/2 |
| ]). |
| |
| |
| -include_lib("couch/include/couch_db.hrl"). |
| -include("fabric2.hrl"). |
| |
| |
% Accumulator threaded through fold_range/2 and its callback.
%
% fold_range/2 reads start_key/end_key as the range bounds, limit and
% base_opts as the erlfdb fold options, restart_tx to decide whether a
% retryable error restarts the fold, and returns user_acc as the result.
%
% NOTE(review): db, skip, retries and user_fun are filled in and consumed
% outside this part of the file; their roles are presumed from the names.
-record(fold_acc, {
    db,
    restart_tx,
    start_key,
    end_key,
    limit,
    skip,
    retries,
    base_opts,
    user_fun,
    user_acc
}).
| |
% Bundle of pending reads created by get_info_future/2 and resolved by
% get_info_wait/1.
-record(info_future, {
    % Transaction handle, kept only when the transaction is read-only;
    % `undefined` otherwise
    tx,
    % Database prefix, used to re-issue the reads on a retry
    db_prefix,
    % Reverse, limit-1 range read over the ?DB_CHANGES subspace
    changes_future,
    % Range read over the ?DB_STATS subspace
    meta_future,
    % Read of the ?DB_CONFIG "uuid" key
    uuid_future,
    % Number of retries get_info_wait/1 has already performed
    retries = 0
}).
| |
| |
% Run the arity-1 Fun in a new transaction with no layer prefix and no
% extra transaction options.
transactional(Fun) ->
    NoLayerPrefix = undefined,
    NoTxOptions = #{},
    do_transaction(Fun, NoLayerPrefix, NoTxOptions).
| |
| |
% Same as transactional/3 with an empty transaction options map. The first
% argument is either a binary database name or a db map.
transactional(DbNameOrDb, Fun) when
    is_function(Fun), (is_binary(DbNameOrDb) orelse is_map(DbNameOrDb))
->
    transactional(DbNameOrDb, #{}, Fun).
| |
| |
| |
% Run Fun against a database, inside a transaction, wrapped in a tracing
% span tagged with the database name.
%
% The first argument selects the behavior:
%   - binary database name: a new transaction is started and Fun is called
%     with the result of init_db(Tx, DbName)
%   - db map with `tx := undefined`: the handle is refreshed, a new
%     transaction is started and Fun is called with the db map carrying
%     that transaction. If {?MODULE, reopen} is thrown the call is retried
%     with `reopen => true`, which makes the handle get reopened inside the
%     new transaction
%   - db map that already holds a transaction or snapshot: Fun is called
%     directly; TxOptions are not applied as no transaction is started
%
% TxOptions is a map of options applied to every newly started
% transaction (see apply_tx_options/2).
transactional(DbName, #{} = TxOptions, Fun) when is_binary(DbName) ->
    with_span(Fun, #{'db.name' => DbName}, fun() ->
        do_transaction(fun(Tx) ->
            Fun(init_db(Tx, DbName))
        end, undefined, TxOptions)
    end);

transactional(#{tx := undefined} = Db, #{} = TxOptions, Fun) ->
    DbName = maps:get(name, Db, undefined),
    try
        Db1 = refresh(Db),
        Reopen = maps:get(reopen, Db1, false),
        Db2 = maps:remove(reopen, Db1),
        % When reopening, the layer prefix is not passed along
        LayerPrefix = case Reopen of
            true -> undefined;
            false -> maps:get(layer_prefix, Db2)
        end,
        with_span(Fun, #{'db.name' => DbName}, fun() ->
            do_transaction(fun(Tx) ->
                case Reopen of
                    true -> Fun(reopen(Db2#{tx => Tx}));
                    false -> Fun(Db2#{tx => Tx})
                end
            end, LayerPrefix, TxOptions)
        end)
    catch throw:{?MODULE, reopen} ->
        with_span('db.reopen', #{'db.name' => DbName}, fun() ->
            % Keep the caller's TxOptions on the retry; going through
            % transactional/2 here would silently reset them to #{}
            transactional(Db#{reopen => true}, TxOptions, Fun)
        end)
    end;

transactional(#{tx := {TxType, _}} = Db, #{} = _TxOptions, Fun) when
    TxType =:= erlfdb_snapshot; TxType =:= erlfdb_transaction
->
    % Already inside a transaction or snapshot: just run Fun
    DbName = maps:get(name, Db, undefined),
    with_span(Fun, #{'db.name' => DbName}, fun() ->
        Fun(Db)
    end).
| |
| |
% Run Fun in a transaction on the handle returned by get_db_handle/0.
%
% Inside the transaction: TxOptions are applied, transaction logging is
% switched on when the process dictionary holds a binary under
% `erlfdb_trace`, and then either the previous result is returned (when
% is_transaction_applied/1 says the transaction already went through) or
% Fun is executed. clear_transaction/0 always runs afterwards.
do_transaction(Fun, LayerPrefix, #{} = TxOptions) when is_function(Fun, 1) ->
    Handle = get_db_handle(),
    TxBody = fun(Tx) ->
        apply_tx_options(Tx, TxOptions),
        case get(erlfdb_trace) of
            TraceName when is_binary(TraceName) ->
                % Unique, base-36 encoded suffix so every transaction
                % gets its own logging identifier
                Unique = erlang:unique_integer([positive]),
                Suffix = integer_to_binary(Unique, 36),
                LogId = <<TraceName/binary, "_", Suffix/binary>>,
                erlfdb:set_option(Tx, transaction_logging_enable, LogId);
            _ ->
                ok
        end,
        case is_transaction_applied(Tx) of
            false ->
                execute_transaction(Tx, Fun, LayerPrefix);
            true ->
                get_previous_transaction_result()
        end
    end,
    try
        erlfdb:transactional(Handle, TxBody)
    after
        clear_transaction()
    end.
| |
| |
% Set every option in the TxOptions map on the transaction. Returns a map
% with the same keys holding the result of each erlfdb:set_option/3 call.
apply_tx_options(Tx, #{} = TxOptions) ->
    SetOption = fun(Option, Value) ->
        erlfdb:set_option(Tx, Option, Value)
    end,
    maps:map(SetOption, TxOptions).
| |
| |
% Call Fun with a db map whose `tx` is a snapshot. A db that already
% carries a snapshot is passed through untouched; a db carrying a regular
% transaction gets its `tx` replaced by erlfdb:snapshot(Tx).
with_snapshot(#{tx := {erlfdb_snapshot, _}} = SnapshotDb, Fun) ->
    Fun(SnapshotDb);

with_snapshot(#{tx := {erlfdb_transaction, _} = Tx} = TxDb, Fun) ->
    Snapshot = erlfdb:snapshot(Tx),
    Fun(TxDb#{tx := Snapshot}).
| |
| |
% Create a new database: allocate a prefix for it, register the name under
% ?ALL_DBS, write the initial version, config and stats keys, and return
% the db map (after aegis:init_db/2) describing it.
%
% Options is a proplist; `user_ctx` is pulled out of it and the remainder
% is stored under `db_options`.
create(#{} = Db0, Options) ->
    Db1 = ensure_current(Db0, false),
    #{name := DbName, tx := Tx, layer_prefix := LayerPrefix} = Db1,

    % Register the name -> prefix mapping
    DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
    HCA = erlfdb_hca:create(erlfdb_tuple:pack({?DB_HCA}, LayerPrefix)),
    AllocPrefix = erlfdb_hca:allocate(HCA, Tx),
    DbPrefix = erlfdb_tuple:pack({?DBS, AllocPrefix}, LayerPrefix),
    erlfdb:set(Tx, DbKey, DbPrefix),

    % The version key signals that something held in the database cache
    % (i.e., fabric2_server's ets table) has changed and must be
    % re-loaded. That currently covers revs_limit and the
    % validate_doc_update functions. The value carries no ordering; it
    % is just something that changes and is compared in the
    % ensure_current check.
    DbVersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix),
    DbVersion = fabric2_util:uuid(),
    erlfdb:set(Tx, DbVersionKey, DbVersion),

    UUID = fabric2_util:uuid(),

    % {KeyTuple, Value} pairs written under the database prefix
    Defaults = [
        {{?DB_CONFIG, <<"uuid">>}, UUID},
        {{?DB_CONFIG, <<"revs_limit">>}, ?uint2bin(1000)},
        {{?DB_CONFIG, <<"security_doc">>}, <<"{}">>},
        {{?DB_STATS, <<"doc_count">>}, ?uint2bin(0)},
        {{?DB_STATS, <<"doc_del_count">>}, ?uint2bin(0)},
        {{?DB_STATS, <<"doc_design_count">>}, ?uint2bin(0)},
        {{?DB_STATS, <<"doc_local_count">>}, ?uint2bin(0)},
        {{?DB_STATS, <<"sizes">>, <<"external">>}, ?uint2bin(2)},
        {{?DB_STATS, <<"sizes">>, <<"views">>}, ?uint2bin(0)}
    ],
    lists:foreach(fun({KeyTuple, Value}) ->
        erlfdb:set(Tx, erlfdb_tuple:pack(KeyTuple, DbPrefix), Value)
    end, Defaults),

    UserCtx = fabric2_util:get_value(user_ctx, Options, #user_ctx{}),
    DbOptions = lists:keydelete(user_ctx, 1, Options),

    NewDb = Db1#{
        uuid => UUID,
        db_prefix => DbPrefix,
        db_version => DbVersion,

        revs_limit => 1000,
        security_doc => {[]},
        user_ctx => UserCtx,
        check_current_ts => erlang:monotonic_time(millisecond),

        validate_doc_update_funs => [],
        before_doc_update => undefined,
        after_doc_read => undefined,
        % Further db properties get added here as features land

        db_options => DbOptions,
        interactive => false
    },
    aegis:init_db(NewDb, Options).
| |
| |
% Open an existing database and return its fully loaded db map.
%
% Raises error `database_does_not_exist` when the name is not registered
% under ?ALL_DBS, or when a `uuid` option is given and does not match the
% uuid loaded from the database config.
%
% The `user_ctx`, `uuid` and `interactive` entries are consumed from
% Options; whatever is left is stored under `db_options`.
open(#{} = Db0, Options) ->
    Db1 = ensure_current(Db0, false),
    #{name := DbName, tx := Tx, layer_prefix := LayerPrefix} = Db1,

    DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
    DbPrefix = case erlfdb:wait(erlfdb:get(Tx, DbKey)) of
        Prefix when is_binary(Prefix) -> Prefix;
        not_found -> erlang:error(database_does_not_exist)
    end,

    DbVersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix),
    DbVersion = erlfdb:wait(erlfdb:get(Tx, DbVersionKey)),

    % Peel the options handled here off the list, one at a time
    UserCtx = fabric2_util:get_value(user_ctx, Options, #user_ctx{}),
    NoUserCtx = lists:keydelete(user_ctx, 1, Options),

    UUID = fabric2_util:get_value(uuid, NoUserCtx),
    NoUUID = lists:keydelete(uuid, 1, NoUserCtx),

    Interactive = fabric2_util:get_value(interactive, NoUUID, false),
    DbOptions = lists:keydelete(interactive, 1, NoUUID),

    Skeleton = Db1#{
        db_prefix => DbPrefix,
        db_version => DbVersion,

        % Defaults; load_config/1 overwrites them with stored values
        uuid => <<>>,
        revs_limit => 1000,
        security_doc => {[]},

        user_ctx => UserCtx,
        check_current_ts => erlang:monotonic_time(millisecond),

        % Placeholders until these bits are implemented
        validate_doc_update_funs => [],
        before_doc_update => undefined,
        after_doc_read => undefined,

        db_options => DbOptions,
        interactive => Interactive
    },

    Configured = load_config(Skeleton),
    Opened = aegis:open_db(Configured),

    % A caller-supplied uuid must match the one stored for the database
    case {UUID, Opened} of
        {undefined, _} -> ok;
        {<<_/binary>>, #{uuid := UUID}} -> ok;
        {<<_/binary>>, #{uuid := _}} -> erlang:error(database_does_not_exist)
    end,

    load_validate_doc_funs(Opened).
| |
| |
% Swap a db handle that is not in a transaction for a newer cached copy,
% when fabric2_server has one. The per-handle fields (user_ctx,
% security_fun, interactive) are carried over from the handle passed in.
%
% `name` is matched in the function head because some non-fabric2 db
% objects may have no name, and those are never cached.
refresh(#{tx := undefined, name := DbName} = Db) ->
    #{uuid := UUID, md_version := CurrentVer} = Db,

    case fabric2_server:fetch(DbName, UUID) of
        % This relies on the following assumptions about `md_version`:
        %  - it is bumped every time `db_version` is bumped
        %  - it is a versionstamp, so comparing tells which is newer
        %  - `not_found` would sort lower than a binary value
        #{md_version := CachedVer} = Cached when CachedVer > CurrentVer ->
            UserCtx = maps:get(user_ctx, Db),
            SecurityFun = maps:get(security_fun, Db),
            Interactive = maps:get(interactive, Db),
            Cached#{
                user_ctx := UserCtx,
                security_fun := SecurityFun,
                interactive := Interactive
            };
        _ ->
            Db
    end;

refresh(#{} = Db) ->
    Db.
| |
| |
| |
% Re-open a database inside the current transaction, keeping the old
% handle's user_ctx, security_fun and interactive settings.
%
% For non-interactive handles, raises error `database_does_not_exist`
% when the reopened database has a different uuid (i.e. it was
% re-created in the meantime).
reopen(#{} = OldDb) ->
    require_transaction(OldDb),
    #{
        tx := Tx,
        name := DbName,
        uuid := OldUUID,
        db_options := OldOptions,
        user_ctx := UserCtx,
        security_fun := SecurityFun,
        interactive := Interactive
    } = OldDb,
    OpenOptions = lists:keystore(user_ctx, 1, OldOptions, {user_ctx, UserCtx}),
    Reopened = open(init_db(Tx, DbName), OpenOptions),

    % Detect a database that was deleted and created again
    case {Interactive, maps:get(uuid, Reopened)} of
        {true, _} -> ok;
        {false, OldUUID} -> ok;
        {false, _DifferentUUID} -> error(database_does_not_exist)
    end,

    Reopened#{security_fun := SecurityFun, interactive := Interactive}.
| |
| |
% Delete a database. Soft-deletes when fabric2_util:do_recovery/0 returns
% true, hard-deletes otherwise.
delete(#{} = Db) ->
    case fabric2_util:do_recovery() of
        true -> soft_delete_db(Db);
        false -> hard_delete_db(Db)
    end.
| |
| |
% Restore the deleted instance of Db0's name recorded at TimeStamp under
% the name TgtDbName.
%
% Returns:
%   file_exists - TgtDbName is already registered under ?ALL_DBS
%   not_found   - no deleted entry for {name, TimeStamp}
%   ok          - the entry was moved back and the db version bumped
undelete(#{} = Db0, TgtDbName, TimeStamp) ->
    #{
        name := DbName,
        tx := Tx,
        layer_prefix := LayerPrefix
    } = ensure_current(Db0, false),
    TargetKey = erlfdb_tuple:pack({?ALL_DBS, TgtDbName}, LayerPrefix),
    case erlfdb:wait(erlfdb:get(Tx, TargetKey)) of
        Existing when is_binary(Existing) ->
            file_exists;
        not_found ->
            DeletedKey = erlfdb_tuple:pack(
                {?DELETED_DBS, DbName, TimeStamp},
                LayerPrefix
            ),
            case erlfdb:wait(erlfdb:get(Tx, DeletedKey)) of
                not_found ->
                    not_found;
                DbPrefix ->
                    % Move the prefix from the deleted entry back to
                    % the live name
                    erlfdb:set(Tx, TargetKey, DbPrefix),
                    erlfdb:clear(Tx, DeletedKey),
                    bump_db_version(#{tx => Tx, db_prefix => DbPrefix}),
                    ok
            end
    end.
| |
| |
% Permanently remove the deleted database instance recorded for Db0's name
% at TimeStamp. Returns `not_found` when there is no such entry, `ok`
% otherwise.
remove_deleted_db(#{} = Db0, TimeStamp) ->
    #{
        name := DbName,
        tx := Tx,
        layer_prefix := LayerPrefix
    } = ensure_current(Db0, false),

    DeletedKey = erlfdb_tuple:pack(
        {?DELETED_DBS, DbName, TimeStamp},
        LayerPrefix
    ),
    case erlfdb:wait(erlfdb:get(Tx, DeletedKey)) of
        not_found ->
            not_found;
        DbPrefix ->
            % Drop the bookkeeping entry and everything under the prefix,
            % then bump the version under that same prefix
            erlfdb:clear(Tx, DeletedKey),
            erlfdb:clear_range_startswith(Tx, DbPrefix),
            bump_db_version(#{tx => Tx, db_prefix => DbPrefix}),
            ok
    end.
| |
| |
% Return true when the db's name is registered under ?ALL_DBS, false
% otherwise.
exists(#{name := DbName} = Db) when is_binary(DbName) ->
    #{tx := Tx, layer_prefix := LayerPrefix} = ensure_current(Db, false),

    NameKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
    case erlfdb:wait(erlfdb:get(Tx, NameKey)) of
        Prefix when is_binary(Prefix) -> true;
        not_found -> false
    end.
| |
| |
% Create or open the directory configured via
% fabric2_server:fdb_directory/0 below the directory root and return its
% name (used as the layer prefix by the callers in this module).
get_dir(Tx) ->
    RootDir = erlfdb_directory:root(),
    Path = fabric2_server:fdb_directory(),
    Node = erlfdb_directory:create_or_open(Tx, RootDir, Path),
    erlfdb_directory:get_name(Node).
| |
| |
% Fold over all database names, calling Callback(DbName, Acc) for each
% entry under ?ALL_DBS. `restart_tx` defaults to true unless the caller
% set it in Options0.
list_dbs(Tx, Callback, AccIn, Options0) ->
    Options = case fabric2_util:get_value(restart_tx, Options0) of
        undefined -> [{restart_tx, true} | Options0];
        _Configured -> Options0
    end,
    AllDbsPrefix = erlfdb_tuple:pack({?ALL_DBS}, get_dir(Tx)),
    OnRow = fun({PackedName, _DbPrefix}, Acc) ->
        {DbName} = erlfdb_tuple:unpack(PackedName, AllDbsPrefix),
        Callback(DbName, Acc)
    end,
    fold_range({tx, Tx}, AllDbsPrefix, OnRow, AccIn, Options).
| |
| |
% Fold over all databases, calling Callback(DbName, InfoFuture, Acc) where
% InfoFuture comes from get_info_future/2 for that database's prefix.
% `restart_tx` defaults to true unless the caller set it in Options0.
list_dbs_info(Tx, Callback, AccIn, Options0) ->
    Options = case fabric2_util:get_value(restart_tx, Options0) of
        undefined -> [{restart_tx, true} | Options0];
        _Configured -> Options0
    end,
    AllDbsPrefix = erlfdb_tuple:pack({?ALL_DBS}, get_dir(Tx)),
    OnRow = fun({PackedName, DbPrefix}, Acc) ->
        {DbName} = erlfdb_tuple:unpack(PackedName, AllDbsPrefix),
        Callback(DbName, get_info_future(Tx, DbPrefix), Acc)
    end,
    fold_range({tx, Tx}, AllDbsPrefix, OnRow, AccIn, Options).
| |
| |
% Fold over all entries under ?DELETED_DBS, calling
% Callback(DbName, TimeStamp, InfoFuture, Acc) for each one.
% `restart_tx` defaults to true unless the caller set it in Options0.
list_deleted_dbs_info(Tx, Callback, AccIn, Options0) ->
    Options = case fabric2_util:get_value(restart_tx, Options0) of
        undefined -> [{restart_tx, true} | Options0];
        _Configured -> Options0
    end,
    DeletedPrefix = erlfdb_tuple:pack({?DELETED_DBS}, get_dir(Tx)),
    OnRow = fun({PackedKey, DbPrefix}, Acc) ->
        {DbName, TimeStamp} = erlfdb_tuple:unpack(PackedKey, DeletedPrefix),
        Callback(DbName, TimeStamp, get_info_future(Tx, DbPrefix), Acc)
    end,
    fold_range({tx, Tx}, DeletedPrefix, OnRow, AccIn, Options).
| |
| |
% Return the database info proplist, with an `encryption` entry (from
% aegis:get_db_info/1) prepended.
get_info(#{} = Db) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),
    InfoFuture = get_info_future(Tx, DbPrefix),
    DbInfo = get_info_wait(InfoFuture),
    EncryptionProps = aegis:get_db_info(Db),
    [{encryption, {EncryptionProps}} | DbInfo].
| |
| |
% Issue the reads needed to build a database info result and return them
% bundled in an #info_future{} record for get_info_wait/1.
get_info_future(Tx, DbPrefix) ->
    % Last entry in the changes subspace
    {ChangesStart, ChangesEnd} = erlfdb_tuple:range({?DB_CHANGES}, DbPrefix),
    ChangesOpts = [
        {streaming_mode, exact},
        {limit, 1},
        {reverse, true}
    ],
    ChangesFuture = erlfdb:get_range(Tx, ChangesStart, ChangesEnd, ChangesOpts),

    UUIDFuture = erlfdb:get(
        Tx,
        erlfdb_tuple:pack({?DB_CONFIG, <<"uuid">>}, DbPrefix)
    ),

    MetaFuture = erlfdb:get_range_startswith(
        Tx,
        erlfdb_tuple:pack({?DB_STATS}, DbPrefix)
    ),

    % Only a read-only tx is remembered: get_info_wait/1 may re-issue the
    % reads on it after the tx was reset
    RetryTx = case erlfdb:get_writes_allowed(Tx) of
        false -> Tx;
        true -> undefined
    end,

    #info_future{
        tx = RetryTx,
        db_prefix = DbPrefix,
        changes_future = ChangesFuture,
        meta_future = MetaFuture,
        uuid_future = UUIDFuture
    }.
| |
| |
% Resolve an #info_future{}.
%
% When the future carries no transaction, or has already been retried
% twice, the result is awaited without any error handling. Otherwise a
% cancelled transaction or a retryable erlfdb error causes the reads to be
% re-issued on the saved transaction (after a reset in the retryable
% case) with the retry counter incremented.
get_info_wait(#info_future{tx = undefined} = Future) ->
    get_info_wait_int(Future);

get_info_wait(#info_future{retries = Retries} = Future) when Retries >= 2 ->
    get_info_wait_int(Future);

get_info_wait(#info_future{} = Future) ->
    #info_future{tx = Tx, db_prefix = DbPrefix, retries = Retries} = Future,
    try
        get_info_wait_int(Future)
    catch
        error:{erlfdb_error, ?ERLFDB_TRANSACTION_CANCELLED} ->
            Reissued = get_info_future(Tx, DbPrefix),
            get_info_wait(Reissued#info_future{retries = Retries + 1});
        error:{erlfdb_error, Code} when ?ERLFDB_IS_RETRYABLE(Code) ->
            ok = erlfdb:reset(Tx),
            Reissued = get_info_future(Tx, DbPrefix),
            get_info_wait(Reissued#info_future{retries = Retries + 1})
    end.
| |
| |
% Read every key under ?DB_CONFIG and store the decoded values in the db
% map. The `uuid`, `revs_limit` and `security_doc` keys must already be
% present in the map; any other stored config key is a crash.
load_config(#{} = Db) ->
    #{tx := Tx, db_prefix := DbPrefix} = Db,

    {ConfigStart, ConfigEnd} = erlfdb_tuple:range({?DB_CONFIG}, DbPrefix),
    Rows = erlfdb:wait(erlfdb:get_range(Tx, ConfigStart, ConfigEnd)),

    ApplyRow = fun({PackedKey, Value}, Acc) ->
        {?DB_CONFIG, Name} = erlfdb_tuple:unpack(PackedKey, DbPrefix),
        case Name of
            <<"uuid">> -> Acc#{uuid := Value};
            <<"revs_limit">> -> Acc#{revs_limit := ?bin2uint(Value)};
            <<"security_doc">> -> Acc#{security_doc := ?JSON_DECODE(Value)}
        end
    end,
    lists:foldl(ApplyRow, Db, Rows).
| |
| |
% Store a config value (`uuid`, `revs_limit` or `security_doc`) for the
% database and bump the db version. Returns {ok, Db} with the new
% db_version and with Key set to Val in the map.
%
% Note that revs_limit is written as max(1, Val) while the returned map
% holds Val unchanged.
set_config(#{} = Db0, Key, Val) when is_atom(Key) ->
    Db = ensure_current(Db0),
    #{tx := Tx, db_prefix := DbPrefix} = Db,
    {StoredName, StoredVal} = case Key of
        uuid -> {<<"uuid">>, Val};
        revs_limit -> {<<"revs_limit">>, ?uint2bin(max(1, Val))};
        security_doc -> {<<"security_doc">>, ?JSON_ENCODE(Val)}
    end,
    ConfigKey = erlfdb_tuple:pack({?DB_CONFIG, StoredName}, DbPrefix),
    erlfdb:set(Tx, ConfigKey, StoredVal),
    {ok, NewVersion} = bump_db_version(Db),
    {ok, Db#{db_version := NewVersion, Key := Val}}.
| |
| |
% Read the stat stored under {?DB_STATS, StatKey} and decode it as an
% unsigned integer.
get_stat(#{} = Db, StatKey) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),

    PackedKey = erlfdb_tuple:pack({?DB_STATS, StatKey}, DbPrefix),
    Raw = erlfdb:wait(erlfdb:get(Tx, PackedKey)),

    % Some sort of type system may be needed here eventually. Values are
    % uints because, for now, every stat is updated with atomic adds.
    ?bin2uint(Raw).
| |
| |
% Atomically add Increment to the stat under {?DB_STATS, StatKey}. A zero
% increment is a no-op that does not touch the database.
incr_stat(_Db, _StatKey, 0) ->
    ok;

incr_stat(#{} = Db, StatKey, Increment) when is_integer(Increment) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),
    PackedKey = erlfdb_tuple:pack({?DB_STATS, StatKey}, DbPrefix),
    erlfdb:add(Tx, PackedKey, Increment).
| |
| |
% Atomically add Increment to the stat under {?DB_STATS, Section, Key}.
% A zero increment is a no-op that does not touch the database.
incr_stat(_Db, _Section, _Key, 0) ->
    ok;

incr_stat(#{} = Db, Section, Key, Increment) when is_integer(Increment) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),
    PackedKey = erlfdb_tuple:pack({?DB_STATS, Section, Key}, DbPrefix),
    erlfdb:add(Tx, PackedKey, Increment).
| |
| |
% Return the rev infos of every revision stored for DocId, inside a
% 'db.get_all_revs' tracing span.
get_all_revs(#{} = Db, DocId) ->
    SpanTags = #{
        'db.name' => maps:get(name, Db, undefined),
        'doc.id' => DocId
    },
    with_span('db.get_all_revs', SpanTags, fun() ->
        get_revs_wait(Db, get_all_revs_future(Db, DocId))
    end).
| |
| |
% Start a range read over all revisions of DocId using the `want_all`
% streaming mode.
get_all_revs_future(#{} = Db, DocId) ->
    get_revs_future(Db, DocId, [{streaming_mode, want_all}]).
| |
| |
% Return up to NumRevs rev infos for DocId, read in reverse key order,
% inside a 'db.get_winning_revs' tracing span.
get_winning_revs(Db, DocId, NumRevs) ->
    SpanTags = #{
        'db.name' => maps:get(name, Db, undefined),
        'doc.id' => DocId
    },
    with_span('db.get_winning_revs', SpanTags, fun() ->
        get_revs_wait(Db, get_winning_revs_future(Db, DocId, NumRevs))
    end).
| |
| |
% Start a reverse range read over the revisions of DocId, limited to
% NumRevs rows.
get_winning_revs_future(#{} = Db, DocId, NumRevs) ->
    get_revs_future(Db, DocId, [{reverse, true}, {limit, NumRevs}]).
| |
| |
% Start a range read over the {?DB_REVS, DocId} subspace with the given
% erlfdb options and return the fold future.
get_revs_future(#{} = Db, DocId, Options) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),
    {RevsStart, RevsEnd} = erlfdb_tuple:range({?DB_REVS, DocId}, DbPrefix),
    erlfdb:fold_range_future(Tx, RevsStart, RevsEnd, Options).
| |
| |
% Wait on a revs range future and return the decoded rev infos in the
% order the rows were read.
get_revs_wait(#{} = Db, RangeFuture) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),

    Decode = fun({PackedKey, PackedVal}, Acc) ->
        KeyTuple = erlfdb_tuple:unpack(PackedKey, DbPrefix),
        ValTuple = erlfdb_tuple:unpack(PackedVal),
        [fdb_to_revinfo(KeyTuple, ValTuple) | Acc]
    end,
    % Rows are consed on, so undo the reversal at the end
    Reversed = erlfdb:fold_range_wait(Tx, RangeFuture, Decode, []),
    lists:reverse(Reversed).
| |
| |
% Look up the rev info stored under {?DB_REVS, DocId, true, RevPos, Rev}.
% Returns `not_found` when there is no such key.
get_non_deleted_rev(#{} = Db, DocId, {RevPos, Rev} = _RevId) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),

    KeyTuple = {?DB_REVS, DocId, true, RevPos, Rev},
    PackedKey = erlfdb_tuple:pack(KeyTuple, DbPrefix),
    case erlfdb:wait(erlfdb:get(Tx, PackedKey)) of
        not_found ->
            not_found;
        PackedVal ->
            fdb_to_revinfo(KeyTuple, erlfdb_tuple:unpack(PackedVal))
    end.
| |
| |
% Read and decode the body of the revision described by RevInfo, inside a
% 'db.get_doc_body' tracing span.
get_doc_body(Db, DocId, RevInfo) ->
    SpanTags = #{
        'db.name' => maps:get(name, Db, undefined),
        'doc.id' => DocId
    },
    with_span('db.get_doc_body', SpanTags, fun() ->
        BodyFuture = get_doc_body_future(Db, DocId, RevInfo),
        get_doc_body_wait(Db, DocId, RevInfo, BodyFuture)
    end).
| |
| |
% Start a range read over the rows stored under
% {?DB_DOCS, DocId, RevPos, Rev} for the revision in RevInfo.
get_doc_body_future(#{} = Db, DocId, RevInfo) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),
    #{rev_id := {RevPos, Rev}} = RevInfo,

    BodyTuple = {?DB_DOCS, DocId, RevPos, Rev},
    {BodyStart, BodyEnd} = erlfdb_tuple:range(BodyTuple, DbPrefix),
    erlfdb:fold_range_future(Tx, BodyStart, BodyEnd, []).
| |
| |
% Wait on a doc body future, collect the row values in read order (the
% fold fun is wrapped by aegis:wrap_fold_fun/2) and convert them to a doc
% via fdb_to_doc/5.
get_doc_body_wait(#{} = Db0, DocId, RevInfo, Future) ->
    Db = ensure_current(Db0),
    #{tx := Tx} = Db,
    #{rev_id := {RevPos, Rev}, rev_path := RevPath} = RevInfo,

    Collect = aegis:wrap_fold_fun(Db, fun({_Key, Value}, Acc) ->
        [Value | Acc]
    end),
    Reversed = erlfdb:fold_range_wait(Tx, Future, Collect, []),

    fdb_to_doc(Db, DocId, RevPos, [Rev | RevPath], lists:reverse(Reversed)).
| |
| |
% Start a read of the value stored under {?DB_LOCAL_DOCS, DocId}.
get_local_doc_rev_future(Db, DocId) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),
    RevKey = erlfdb_tuple:pack({?DB_LOCAL_DOCS, DocId}, DbPrefix),
    erlfdb:get(Tx, RevKey).
| |
| |
% Wait for a future created by get_local_doc_rev_future/2 and return its
% value.
get_local_doc_rev_wait(RevFuture) ->
    erlfdb:wait(RevFuture).
| |
| |
% Start a range read over the rows stored under
% {?DB_LOCAL_DOC_BODIES, DocId}. The revision argument is not used.
get_local_doc_body_future(#{} = Db, DocId, _Rev) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),
    BodyPrefix = erlfdb_tuple:pack({?DB_LOCAL_DOC_BODIES, DocId}, DbPrefix),
    erlfdb:get_range_startswith(Tx, BodyPrefix).
| |
| |
% Wait on a local doc body future, decrypt the rows through aegis and
% build the local doc from the resulting chunks.
get_local_doc_body_wait(#{} = Db0, DocId, Rev, Future) ->
    Db = ensure_current(Db0),
    Rows = aegis:decrypt(Db, erlfdb:wait(Future)),
    {_Keys, Chunks} = lists:unzip(Rows),
    fdb_to_local_doc(Db, DocId, Rev, Chunks).
| |
| |
% Read a local document: first its stored revision value, then its body.
% DocId must start with ?LOCAL_DOC_PREFIX.
get_local_doc(#{} = Db, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId) ->
    Rev = get_local_doc_rev_wait(get_local_doc_rev_future(Db, DocId)),
    BodyFuture = get_local_doc_body_future(Db, DocId, Rev),
    get_local_doc_body_wait(Db, DocId, Rev, BodyFuture).
| |
| |
% Extract the revision from the raw value stored for a local doc.
%
% Three encodings are understood:
%   - <<255, Packed>>: versioned format, Packed is a packed tuple
%     {?CURR_LDOC_FORMAT, Rev, Size}
%   - <<131, ...>>: older format, an external-term-format {Rev, _} tuple
%   - anything else: the value itself, provided it parses as a
%     non-negative integer
%
% Raises error {invalid_local_doc_rev, DocId, Val} for values that do not
% decode under the encoding they appear to use.
get_local_doc_rev(_Db0, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId, Val) ->
    Invalid = fun() ->
        erlang:error({invalid_local_doc_rev, DocId, Val})
    end,
    case Val of
        <<255, Packed/binary>> ->
            % Versioned local docs
            try
                {?CURR_LDOC_FORMAT, VersionedRev, _Size} =
                    erlfdb_tuple:unpack(Packed),
                VersionedRev
            catch _:_ ->
                Invalid()
            end;
        <<131, _/binary>> ->
            % Compatibility clause for an older encoding format
            try binary_to_term(Val, [safe]) of
                {LegacyRev, _} -> LegacyRev;
                _ -> Invalid()
            catch
                error:badarg ->
                    Invalid()
            end;
        <<_/binary>> ->
            try binary_to_integer(Val) of
                Parsed when Parsed >= 0 ->
                    Val;
                _ ->
                    Invalid()
            catch
                error:badarg ->
                    Invalid()
            end
    end.
| |
| |
% Write a document update and all of its bookkeeping in the current
% transaction.
%
%   Doc        - the #doc{} being written
%   NewWinner0 - rev info of the new winning revision
%   OldWinner  - rev info of the previous winner, or `not_found`
%   ToUpdate   - rev infos to rewrite as non-winners
%   ToRemove   - rev infos to clear, together with their bodies
%
% In order: doc body, attachment cleanup, revision entries, the _all_docs
% entry, the _changes entry, db version bump for design docs, document
% counters, the after_doc_write plugin hook and the external size stat.
% Returns ok.
write_doc(#{} = Db0, Doc, NewWinner0, OldWinner, ToUpdate, ToRemove) ->
    Db = ensure_current(Db0),
    #{tx := Tx, db_prefix := DbPrefix} = Db,
    #doc{id = DocId, deleted = Deleted, atts = Atts} = Doc,

    % Doc body

    ok = write_doc_body(Db, Doc),

    % Attachment bookkeeping

    % When a document's attachments changed, a scan is needed for
    % attachments that may have to be deleted. The `>= 2` test is a bit
    % subtle: one of the revisions comes from the new document itself,
    % so at least one more revision with the same hash is required to
    % conclude that the attachments did not change.
    AttHash = fabric2_util:hash_atts(Atts),
    RevsToCheck = [NewWinner0 | ToUpdate ++ ToRemove],
    AttHashCount = lists:foldl(fun(RevInfo, Count) ->
        #{att_hash := RevAttHash} = RevInfo,
        case RevAttHash == AttHash of
            true -> Count + 1;
            false -> Count
        end
    end, 0, RevsToCheck),
    AttsUnchanged =
        AttHashCount == length(RevsToCheck) orelse AttHashCount >= 2,
    case AttsUnchanged of
        true -> ok;
        false -> cleanup_attachments(Db, DocId, Doc, ToRemove)
    end,

    % Revision tree

    NewWinner = NewWinner0#{winner := true},
    NewRevId = maps:get(rev_id, NewWinner),

    {WKey, WVal, WinnerVS} = revinfo_to_fdb(Tx, DbPrefix, DocId, NewWinner),
    ok = erlfdb:set_versionstamped_value(Tx, WKey, WVal),

    lists:foreach(fun(RevInfo) ->
        NonWinner = RevInfo#{winner := false},
        {K, V, undefined} = revinfo_to_fdb(Tx, DbPrefix, DocId, NonWinner),
        ok = erlfdb:set(Tx, K, V)
    end, ToUpdate),

    lists:foreach(fun(RevInfo) ->
        NonWinner = RevInfo#{winner := false},
        {K, _, undefined} = revinfo_to_fdb(Tx, DbPrefix, DocId, NonWinner),
        ok = erlfdb:clear(Tx, K),
        ok = clear_doc_body(Db, DocId, RevInfo)
    end, ToRemove),

    % _all_docs

    UpdateStatus = case {OldWinner, NewWinner} of
        {not_found, #{deleted := false}} -> created;
        {not_found, #{deleted := true}} -> replicate_deleted;
        {#{deleted := true}, #{deleted := false}} -> recreated;
        {#{deleted := false}, #{deleted := false}} -> updated;
        {#{deleted := false}, #{deleted := true}} -> deleted;
        {#{deleted := true}, #{deleted := true}} -> ignore
    end,

    case UpdateStatus of
        replicate_deleted ->
            ok;
        ignore ->
            ok;
        deleted ->
            ClearKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix),
            ok = erlfdb:clear(Tx, ClearKey);
        _ ->
            ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix),
            ADVal = erlfdb_tuple:pack(NewRevId),
            ok = erlfdb:set(Tx, ADKey, ADVal)
    end,

    % _changes

    case OldWinner of
        not_found ->
            ok;
        _ ->
            OldSeq = maps:get(sequence, OldWinner),
            OldSeqKey = erlfdb_tuple:pack({?DB_CHANGES, OldSeq}, DbPrefix),
            erlfdb:clear(Tx, OldSeqKey)
    end,

    NewSeqKey = erlfdb_tuple:pack_vs({?DB_CHANGES, WinnerVS}, DbPrefix),
    NewSeqVal = erlfdb_tuple:pack({DocId, Deleted, NewRevId}),
    erlfdb:set_versionstamped_key(Tx, NewSeqKey, NewSeqVal),

    % Bump db version on design doc changes

    IsDDoc = case Doc#doc.id of
        <<?DESIGN_DOC_PREFIX, _/binary>> -> true;
        _ -> false
    end,

    case IsDDoc of
        true -> bump_db_version(Db);
        false -> ok
    end,

    % Update our document counts

    AdjustDDocCount = fun(Delta) ->
        case IsDDoc of
            true -> incr_stat(Db, <<"doc_design_count">>, Delta);
            false -> ok
        end
    end,

    case UpdateStatus of
        created ->
            AdjustDDocCount(1),
            incr_stat(Db, <<"doc_count">>, 1);
        recreated ->
            AdjustDDocCount(1),
            incr_stat(Db, <<"doc_count">>, 1),
            incr_stat(Db, <<"doc_del_count">>, -1);
        replicate_deleted ->
            incr_stat(Db, <<"doc_del_count">>, 1);
        ignore ->
            ok;
        deleted ->
            AdjustDDocCount(-1),
            incr_stat(Db, <<"doc_count">>, -1),
            incr_stat(Db, <<"doc_del_count">>, 1);
        updated ->
            ok
    end,

    fabric2_db_plugin:after_doc_write(Db, Doc, NewWinner, OldWinner,
        NewRevId, WinnerVS),

    % Update database size
    AddSize = sum_add_rev_sizes([NewWinner | ToUpdate]),
    RemSize = sum_rem_rev_sizes(ToRemove),
    incr_stat(Db, <<"sizes">>, <<"external">>, AddSize - RemSize),

    ok.
| |
| |
% Write or delete a local document and keep the `doc_local_count` and
% external size stats in step. Returns ok.
write_local_doc(#{} = Db0, Doc) ->
    Db = ensure_current(Db0),
    #{tx := Tx, db_prefix := DbPrefix} = Db,

    DocId = Doc#doc.id,
    IsDeleted = Doc#doc.deleted,

    {LDocKey, LDocVal, NewSize, Rows} = local_doc_to_fdb(Db, Doc),

    % Work out whether the doc existed before and how large it was. Only
    % the versioned (<<255, ...>>) format records a size.
    {WasDeleted, PrevSize} = case erlfdb:wait(erlfdb:get(Tx, LDocKey)) of
        <<255, Packed/binary>> ->
            case erlfdb_tuple:unpack(Packed) of
                {?CURR_LDOC_FORMAT, _Rev, StoredSize} ->
                    {false, StoredSize}
            end;
        <<_/binary>> ->
            {false, 0};
        not_found ->
            {true, 0}
    end,

    BodyPrefix = erlfdb_tuple:pack({?DB_LOCAL_DOC_BODIES, DocId}, DbPrefix),

    case IsDeleted of
        true ->
            erlfdb:clear(Tx, LDocKey),
            erlfdb:clear_range_startswith(Tx, BodyPrefix);
        false ->
            erlfdb:set(Tx, LDocKey, LDocVal),
            % Clear the whole range first: a previous, larger body may
            % have left more rows behind than are about to be written
            erlfdb:clear_range_startswith(Tx, BodyPrefix),
            lists:foreach(fun({RowKey, RowVal}) ->
                erlfdb:set(Tx, RowKey, aegis:encrypt(Db, RowKey, RowVal))
            end, Rows)
    end,

    case {WasDeleted, IsDeleted} of
        {true, false} ->
            incr_stat(Db, <<"doc_local_count">>, 1);
        {false, true} ->
            incr_stat(Db, <<"doc_local_count">>, -1);
        _ ->
            ok
    end,

    incr_stat(Db, <<"sizes">>, <<"external">>, NewSize - PrevSize),

    ok.
| |
| |
% Read an attachment's chunks, decrypt and join them, and undo the
% term_to_binary compression when the attachment's info entry says it was
% stored compressed.
%
% Throws {not_found, missing} if the range read yields `not_found`.
read_attachment(#{} = Db, DocId, AttId) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),

    ChunksPrefix = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId}, DbPrefix),
    Stored = case erlfdb:wait(erlfdb:get_range_startswith(Tx, ChunksPrefix)) of
        not_found ->
            throw({not_found, missing});
        Rows ->
            {_Keys, Chunks} = lists:unzip(aegis:decrypt(Db, Rows)),
            iolist_to_binary(Chunks)
    end,

    InfoKey = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId, AttId}, DbPrefix),
    case erlfdb:wait(erlfdb:get(Tx, InfoKey)) of
        <<>> ->
            % Old format, before CURR_ATT_STORAGE_VER = 0
            Stored;
        <<_/binary>> = PackedInfo ->
            {?CURR_ATT_STORAGE_VER, IsCompressed} =
                erlfdb_tuple:unpack(PackedInfo),
            case IsCompressed of
                true -> binary_to_term(Stored, [safe]);
                false -> Stored
            end
    end.
| |
| |
% Store attachment Data for DocId under a freshly generated attachment id
% and return {ok, AttId}.
%
% Data that is not already gzip encoded is run through term_to_binary/2
% with compression; the compressed form is stored only when it is
% actually smaller than the input. The choice is recorded, together with
% ?CURR_ATT_STORAGE_VER, under {?DB_ATT_NAMES, DocId, AttId} so that
% read_attachment/3 knows whether to decompress. The payload is split
% with chunkify_binary/1 and each chunk is encrypted via aegis before
% being written under {?DB_ATTS, DocId, AttId, ChunkId}.
write_attachment(#{} = Db, DocId, Data, Encoding)
        when is_binary(Data), is_atom(Encoding) ->
    #{
        tx := Tx,
        db_prefix := DbPrefix
    } = ensure_current(Db),

    AttId = fabric2_util:uuid(),

    {Data1, Compressed} = case Encoding of
        gzip ->
            % Already compressed by the client, store as is
            {Data, false};
        _ ->
            Opts = [{minor_version, 1}, {compressed, 6}],
            CompressedData = term_to_binary(Data, Opts),
            % Compare byte sizes on both sides. Comparing the size
            % against the binary itself is always true in Erlang term
            % order (numbers sort before binaries), which would store
            % the compressed form even when it is larger.
            case byte_size(CompressedData) < byte_size(Data) of
                true -> {CompressedData, true};
                false -> {Data, false}
            end
    end,

    IdKey = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId, AttId}, DbPrefix),
    InfoVal = erlfdb_tuple:pack({?CURR_ATT_STORAGE_VER, Compressed}),
    ok = erlfdb:set(Tx, IdKey, InfoVal),

    Chunks = chunkify_binary(Data1),

    % Chunks are numbered from 0 so a prefix range read returns them in
    % their original order
    lists:foldl(fun(Chunk, ChunkId) ->
        AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId, ChunkId}, DbPrefix),
        ok = erlfdb:set(Tx, AttKey, aegis:encrypt(Db, AttKey, Chunk)),
        ChunkId + 1
    end, 0, Chunks),
    {ok, AttId}.
| |
| |
% Return the sequence of the last entry in the changes subspace, or the
% sequence for fabric2_util:seq_zero_vs/0 when there are no changes.
get_last_change(#{} = Db) ->
    #{tx := Tx, db_prefix := DbPrefix} = ensure_current(Db),

    {ChangesStart, ChangesEnd} = erlfdb_tuple:range({?DB_CHANGES}, DbPrefix),
    LastRowOpts = [{limit, 1}, {reverse, true}],
    case erlfdb:get_range(Tx, ChangesStart, ChangesEnd, LastRowOpts) of
        [] ->
            vs_to_seq(fabric2_util:seq_zero_vs());
        [{PackedKey, _Value}] ->
            {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(PackedKey, DbPrefix),
            vs_to_seq(SeqVS)
    end.
| |
| |
% Fold UserFun over the keys below RangePrefix.
%
% TxOrDb is either {tx, Tx} or a db map. Returns the final user
% accumulator. The ?PDICT_FOLD_ACC_STATE process dictionary entry is
% erased once the fold finishes, normally or not.
fold_range(TxOrDb, RangePrefix, UserFun, UserAcc, Options) ->
    {Db, Tx} = case TxOrDb of
        {tx, RawTx} ->
            {undefined, RawTx};
        #{} = DbObj ->
            #{tx := DbTx} = CurrentDb = ensure_current(DbObj),
            {CurrentDb, DbTx}
    end,
    case fabric2_util:get_value(limit, Options) of
        0 ->
            % FoundationDB treats a limit of 0 as unlimited, so guard
            % against it by not folding at all
            UserAcc;
        _ ->
            FAcc = get_fold_acc(Db, RangePrefix, UserFun, UserAcc, Options),
            try
                fold_range(Tx, FAcc)
            after
                erase(?PDICT_FOLD_ACC_STATE)
            end
    end.
| |
| |
% Run the fold described by a #fold_acc{} and return its final user
% accumulator.
%
% With restart_tx set, writes are disallowed on the transaction and a
% retryable erlfdb error restarts the fold via restart_fold/2 instead of
% propagating.
fold_range(Tx, FAcc) ->
    #fold_acc{
        start_key = StartKey,
        end_key = EndKey,
        limit = Limit,
        base_opts = BaseOpts,
        restart_tx = RestartTx
    } = FAcc,
    case RestartTx of
        true -> ok = erlfdb:set_option(Tx, disallow_writes);
        false -> ok
    end,
    FoldOpts = [{limit, Limit} | BaseOpts],
    RowCallback = fun fold_range_cb/2,
    try
        #fold_acc{user_acc = FinalUserAcc} =
            erlfdb:fold_range(Tx, StartKey, EndKey, RowCallback, FAcc, FoldOpts),
        FinalUserAcc
    catch error:{erlfdb_error, Code} when
            ?ERLFDB_IS_RETRYABLE(Code) andalso RestartTx ->
        % cluster_version_changed and future_version could possibly be
        % handled here as well, to continue the iteration rather than
        % falling back to transactional and retrying from the start,
        % which is bound to fail when data is being streamed to a socket.
        fold_range(Tx, restart_fold(Tx, FAcc))
    end.
| |
| |
% Convert a versionstamp tuple into its hex encoded sequence binary.
vs_to_seq(VS) when is_tuple(VS) ->
    Packed = erlfdb_tuple:pack({VS}),
    % The leading byte, 51, is the versionstamp type tag; the 12 bytes that
    % follow are the sequence itself.
    <<51, RawSeq:12/binary>> = Packed,
    fabric2_util:to_hex(RawSeq).
| |
| |
% Convert a hex encoded sequence binary back into a versionstamp tuple.
seq_to_vs(Seq) when is_binary(Seq) ->
    RawSeq = fabric2_util:from_hex(Seq),
    % Prepend 51, the versionstamp type tag, so the tuple layer can decode it
    {VS} = erlfdb_tuple:unpack(<<51, RawSeq/binary>>),
    VS.
| |
| |
% Return the versionstamp that immediately follows the given one.
%
% The right-most field is incremented; a field that has reached 16#FFFF is
% reset to 0 and the increment carries into the field to its left.
next_vs({versionstamp, VS, Batch, TxId}) when TxId < 16#FFFF ->
    {versionstamp, VS, Batch, TxId + 1};
next_vs({versionstamp, VS, Batch, _TxId}) when Batch < 16#FFFF ->
    {versionstamp, VS, Batch + 1, 0};
next_vs({versionstamp, VS, _Batch, _TxId}) ->
    {versionstamp, VS + 1, 0, 0};

next_vs({versionstamp, VS, Batch}) when Batch < 16#FFFF ->
    {versionstamp, VS, Batch + 1};
next_vs({versionstamp, VS, _Batch}) ->
    {versionstamp, VS + 1, 0}.
| |
| |
% Build a versionstamp tuple whose version and batch fields are all ones and
% whose last field is the next transaction-local id handed out by erlfdb.
% NOTE(review): the all-ones fields are presumably a placeholder that gets
% replaced at commit time - confirm against erlfdb_tuple:pack_vs/1.
new_versionstamp(Tx) ->
    NextTxId = erlfdb:get_next_tx_id(Tx),
    {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF, NextTxId}.
| |
| |
% Return the approximate size reported by erlfdb for the transaction held in
% the db handle. Raises transaction_required when the handle has none.
get_approximate_tx_size(#{} = TxDb) ->
    require_transaction(TxDb),
    Tx = maps:get(tx, TxDb),
    SizeFuture = erlfdb:get_approximate_size(Tx),
    erlfdb:wait(SizeFuture).
| |
| |
% Split Data into chunks using the configured binary chunk size.
chunkify_binary(Data) ->
    ChunkSize = binary_chunk_size(),
    chunkify_binary(Data, ChunkSize).
| |
| |
% Split Data into a list of binaries of Size bytes each. The last chunk
% holds whatever remains and may be shorter. An empty binary yields [].
%
% Size must be a positive integer. A Size of 0 used to match a zero-length
% head on every step and recurse without bound, so any non-positive or
% non-integer Size is now rejected with a function_clause error whenever
% there is data to split.
chunkify_binary(<<>>, _Size) ->
    [];
chunkify_binary(Data, Size)
        when is_binary(Data), is_integer(Size), Size > 0 ->
    chunkify_binary_int(Data, Size, []).


% Tail-recursive worker for chunkify_binary/2. Acc holds the chunks cut so
% far in reverse order.
chunkify_binary_int(<<>>, _Size, Acc) ->
    lists:reverse(Acc);
chunkify_binary_int(Data, Size, Acc) ->
    case Data of
        <<Chunk:Size/binary, Rest/binary>> ->
            chunkify_binary_int(Rest, Size, [Chunk | Acc]);
        _ ->
            % Fewer than Size bytes left: they form the final chunk
            lists:reverse([Data | Acc])
    end.
| |
| |
% Dump every key/value pair between <<>> and <<16#FE, 16#FF, 16#FF>>.
debug_cluster() ->
    Start = <<>>,
    End = <<16#FE, 16#FF, 16#FF>>,
    debug_cluster(Start, End).
| |
| |
% Print every key/value pair in [Start, End) to standard_error, one row per
% line, with the key column padded to 60 characters.
debug_cluster(Start, End) ->
    PrintRow = fun({Key, Val}) ->
        KeyColumn = string:pad(erlfdb_util:repr(Key), 60),
        ValColumn = erlfdb_util:repr(Val),
        io:format(standard_error, "~s => ~s~n", [KeyColumn, ValColumn])
    end,
    transactional(fun(Tx) ->
        lists:foreach(PrintRow, erlfdb:get_range(Tx, Start, End))
    end).
| |
| |
% Build the initial db handle map for DbName inside transaction Tx. The
% handle records the layer prefix and the metadata version read from Tx,
% and starts with no security fun and no db options.
init_db(Tx, DbName) ->
    LayerPrefix = get_dir(Tx),
    MdVersionFuture = erlfdb:get(Tx, ?METADATA_VERSION_KEY),
    MdVersion = erlfdb:wait(MdVersionFuture),
    #{
        name => DbName,
        tx => Tx,
        layer_prefix => LayerPrefix,
        md_version => MdVersion,

        security_fun => undefined,
        db_options => []
    }.
| |
| |
% Collect the validate_doc_update funs of every non-deleted design doc and
% store them in the db handle under validate_doc_update_funs.
%
% The work is done in passes so that each pass creates all of its futures
% before the next pass waits on any of them.
load_validate_doc_funs(#{} = Db) ->
    CollectIds = fun
        ({row, Row}, Acc) ->
            {ok, [#{id => fabric2_util:get_value(id, Row)} | Acc]};
        (_Event, Acc) ->
            {ok, Acc}
    end,
    DDocRange = [
        {start_key, <<"_design/">>},
        {end_key, <<"_design0">>}
    ],
    {ok, DDocInfos} = fabric2_db:fold_docs(Db, CollectIds, [], DDocRange),

    % Pass 1: request the winning revision of each design doc
    WithRevFutures = [begin
        #{id := <<"_design/", _/binary>> = DDocId} = Info,
        Info#{rev_info => get_winning_revs_future(Db, DDocId, 1)}
    end || Info <- DDocInfos],

    % Pass 2: resolve the revisions, drop deleted design docs and request
    % the bodies of the remaining ones
    RequestBody = fun(Info) ->
        #{
            id := DDocId,
            rev_info := RevInfoFuture
        } = Info,
        [RevInfo] = get_revs_wait(Db, RevInfoFuture),
        case RevInfo of
            #{deleted := true} ->
                [];
            #{deleted := _} ->
                [Info#{
                    rev_info := RevInfo,
                    body => get_doc_body_future(Db, DDocId, RevInfo)
                }]
        end
    end,
    WithBodyFutures = lists:append(lists:map(RequestBody, WithRevFutures)),

    % Pass 3: resolve the bodies and keep the docs that define a VDU fun
    ExtractVDU = fun(Info) ->
        #{
            id := DDocId,
            rev_info := RevInfo,
            body := BodyFuture
        } = Info,
        #doc{} = Doc = get_doc_body_wait(Db, DDocId, RevInfo, BodyFuture),
        case couch_doc:get_validate_doc_fun(Doc) of
            nil -> [];
            VDU -> [VDU]
        end
    end,
    VDUs = lists:append(lists:map(ExtractVDU, WithBodyFutures)),

    Db#{
        validate_doc_update_funs := VDUs
    }.
| |
| |
% Write a versionstamped value to the metadata version key of Tx.
bump_metadata_version(Tx) ->
    % 112 bits is 14 zero bytes. That length is taken from the PR that added
    % the metadata version key. It is not clear why 14 bytes are needed when
    % a versionstamp is only 80 bits, but it is kept as is for now.
    Placeholder = <<0:112>>,
    erlfdb:set_versionstamped_value(Tx, ?METADATA_VERSION_KEY, Placeholder).
| |
| |
% Compare the metadata version cached in the db handle with the one stored
% under the metadata version key.
%
% Returns {current, Db} when they match, or when a match was already
% recorded in the process dictionary. Otherwise returns {stale, Db1} where
% Db1 carries the newly read version.
check_metadata_version(#{} = Db) ->
    #{
        tx := Tx,
        md_version := CachedVersion
    } = Db,

    case get(?PDICT_CHECKED_MD_IS_CURRENT) of
        true ->
            {current, Db};
        _NotCheckedYet ->
            LatestFuture = erlfdb:get_ss(Tx, ?METADATA_VERSION_KEY),
            LatestVersion = erlfdb:wait(LatestFuture),
            case LatestVersion =:= CachedVersion of
                false ->
                    {stale, Db#{md_version := LatestVersion}};
                true ->
                    put(?PDICT_CHECKED_MD_IS_CURRENT, true),
                    % A read conflict is set on the db version key so that
                    % this transaction conflicts with any write to this
                    % particular db. While a db is being created its prefix
                    % may not exist yet, in which case no read conflict is
                    % added.
                    case maps:get(db_prefix, Db, not_found) of
                        <<_/binary>> = DbPrefix ->
                            DbVerKey = erlfdb_tuple:pack(
                                {?DB_VERSION}, DbPrefix
                            ),
                            erlfdb:add_read_conflict_key(Tx, DbVerKey);
                        not_found ->
                            ok
                    end,
                    {current, Db}
            end
    end.
| |
| |
% Store a fresh uuid as the db version and bump the metadata version in the
% same transaction. Returns {ok, NewVersion}.
bump_db_version(#{} = Db) ->
    #{tx := Tx, db_prefix := DbPrefix} = Db,

    VersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix),
    NewVersion = fabric2_util:uuid(),
    ok = erlfdb:set(Tx, VersionKey, NewVersion),
    ok = bump_metadata_version(Tx),
    {ok, NewVersion}.
| |
| |
% Decide whether the db version cached in the handle is still the stored
% one. Returns current or stale.
%
% The stored version is not read when CheckDbVersion is false, or when a
% successful check was already recorded in the process dictionary.
check_db_version(#{} = Db, CheckDbVersion) ->
    #{
        tx := Tx,
        db_prefix := DbPrefix,
        db_version := CachedVersion
    } = Db,

    AlreadyChecked = get(?PDICT_CHECKED_DB_IS_CURRENT),
    case {CheckDbVersion, AlreadyChecked} of
        {false, _} ->
            current;
        {true, true} ->
            current;
        _ ->
            VersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix),
            StoredVersion = erlfdb:wait(erlfdb:get(Tx, VersionKey)),
            case StoredVersion =:= CachedVersion of
                true ->
                    put(?PDICT_CHECKED_DB_IS_CURRENT, true),
                    current;
                false ->
                    stale
            end
    end.
| |
| |
% Move the db from the all-dbs index to the deleted-dbs index, keyed by
% name and the current timestamp, and bump the db version.
%
% Returns ok, or {deletion_frequency_exceeded, DbName} when an entry for
% the same name and timestamp already exists.
soft_delete_db(Db) ->
    #{
        name := DbName,
        tx := Tx,
        layer_prefix := LayerPrefix,
        db_prefix := DbPrefix
    } = ensure_current(Db),

    Timestamp = list_to_binary(fabric2_util:iso8601_timestamp()),
    DeletedDbKey = erlfdb_tuple:pack(
        {?DELETED_DBS, DbName, Timestamp}, LayerPrefix
    ),
    case erlfdb:wait(erlfdb:get(Tx, DeletedDbKey)) of
        not_found ->
            DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
            erlfdb:set(Tx, DeletedDbKey, DbPrefix),
            erlfdb:clear(Tx, DbKey),
            bump_db_version(Db),
            ok;
        _AlreadyDeleted ->
            {deletion_frequency_exceeded, DbName}
    end.
| |
| |
% Remove the db from the all-dbs index, clear every key under its prefix
% and bump the metadata version.
hard_delete_db(Db) ->
    #{
        name := DbName,
        tx := Tx,
        layer_prefix := LayerPrefix,
        db_prefix := DbPrefix
    } = ensure_current(Db),

    AllDbsKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
    erlfdb:clear(Tx, AllDbsKey),
    erlfdb:clear_range_startswith(Tx, DbPrefix),
    bump_metadata_version(Tx),
    ok.
| |
| |
% Encode Doc into key/value rows with doc_to_fdb/2 and write each row,
% encrypted through aegis, into the transaction of the db handle.
write_doc_body(#{} = Db0, #doc{} = Doc) ->
    Db = ensure_current(Db0),
    #{tx := Tx} = Db,

    WriteRow = fun({Key, Value}) ->
        Encrypted = aegis:encrypt(Db, Key, Value),
        ok = erlfdb:set(Tx, Key, Encrypted)
    end,
    lists:foreach(WriteRow, doc_to_fdb(Db, Doc)).
| |
| |
% Clear every body row stored for the revision described by RevInfo. A
% RevInfo of not_found means there is no old body, so nothing is cleared.
clear_doc_body(_Db, _DocId, not_found) ->
    ok;

clear_doc_body(#{} = Db, DocId, #{} = RevInfo) ->
    #{tx := Tx, db_prefix := DbPrefix} = Db,
    #{rev_id := {RevPos, Rev}} = RevInfo,

    {FirstKey, LastKey} = erlfdb_tuple:range(
        {?DB_DOCS, DocId, RevPos, Rev}, DbPrefix
    ),
    ok = erlfdb:clear_range(Tx, FirstKey, LastKey).
| |
| |
% Delete the attachments of DocId that are no longer referenced.
%
% An attachment id is kept when NewDoc or any revision on disk that is not
% listed in ToRemove refers to it. For every other stored id, both its name
% entry and its chunks are cleared.
cleanup_attachments(Db, DocId, NewDoc, ToRemove) ->
    #{
        tx := Tx,
        db_prefix := DbPrefix
    } = Db,

    RemoveRevs = lists:map(fun(#{rev_id := RevId}) -> RevId end, ToRemove),

    % Every known revision of the document, including the one being written
    {ok, DiskDocs} = fabric2_db:open_doc_revs(Db, DocId, all, []),
    AllDocs = [{ok, NewDoc} | DiskDocs],

    % Attachment ids referenced by revisions that are going to stay
    AddAttId = fun(Att, IdSet) ->
        {loc, _AttDb, _AttDocId, AttId} = couch_att:fetch(data, Att),
        sets:add_element(AttId, IdSet)
    end,
    CollectActive = fun({ok, Doc}, IdSet) ->
        #doc{
            revs = {Pos, [Rev | _]}
        } = Doc,
        case lists:member({Pos, Rev}, RemoveRevs) of
            false -> lists:foldl(AddAttId, IdSet, Doc#doc.atts);
            true -> IdSet
        end
    end,
    ActiveIdSet = lists:foldl(CollectActive, sets:new(), AllDocs),

    % Attachment ids currently stored for this document
    AttPrefix = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId}, DbPrefix),
    Future = erlfdb:get_range_startswith(Tx, AttPrefix, [
        {streaming_mode, want_all}
    ]),
    CollectExisting = fun({Key, _}, IdSet) ->
        {?DB_ATT_NAMES, DocId, AttId} = erlfdb_tuple:unpack(Key, DbPrefix),
        sets:add_element(AttId, IdSet)
    end,
    ExistingIdSet = lists:foldl(
        CollectExisting, sets:new(), erlfdb:wait(Future)
    ),

    Unreferenced = sets:subtract(ExistingIdSet, ActiveIdSet),

    RemoveAtt = fun(AttId) ->
        IdKey = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId, AttId}, DbPrefix),
        erlfdb:clear(Tx, IdKey),

        ChunkKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId}, DbPrefix),
        erlfdb:clear_range_startswith(Tx, ChunkKey)
    end,
    lists:foreach(RemoveAtt, sets:to_list(Unreferenced)).
| |
| |
% Encode a revision info map into {KeyBin, ValueBin, Versionstamp}.
%
% A winning revision also stores a new versionstamp and the branch count,
% and its value is packed with pack_vs/1. Any other revision stores only
% the path, attachment hash and size, and returns undefined as the
% versionstamp.
revinfo_to_fdb(Tx, DbPrefix, DocId, #{winner := true} = RevInfo) ->
    #{
        deleted := Deleted,
        rev_id := {RevPos, Rev},
        rev_path := RevPath,
        branch_count := BranchCount,
        att_hash := AttHash,
        rev_size := RevSize
    } = RevInfo,
    VS = new_versionstamp(Tx),
    KeyBin = erlfdb_tuple:pack(
        {?DB_REVS, DocId, not Deleted, RevPos, Rev}, DbPrefix
    ),
    ValBin = erlfdb_tuple:pack_vs({
        ?CURR_REV_FORMAT,
        VS,
        BranchCount,
        list_to_tuple(RevPath),
        AttHash,
        RevSize
    }),
    {KeyBin, ValBin, VS};

revinfo_to_fdb(_Tx, DbPrefix, DocId, #{} = RevInfo) ->
    #{
        deleted := Deleted,
        rev_id := {RevPos, Rev},
        rev_path := RevPath,
        att_hash := AttHash,
        rev_size := RevSize
    } = RevInfo,
    KeyBin = erlfdb_tuple:pack(
        {?DB_REVS, DocId, not Deleted, RevPos, Rev}, DbPrefix
    ),
    ValBin = erlfdb_tuple:pack(
        {?CURR_REV_FORMAT, list_to_tuple(RevPath), AttHash, RevSize}
    ),
    {KeyBin, ValBin, undefined}.
| |
| |
% Decode an unpacked revision key and value into a revision info map.
%
% A six element value in the current format is a winning revision and a
% four element one is a non-winning revision. Values written in formats 0
% and 1 are upgraded one format at a time, filling the fields they lack
% with <<>> for att_hash and 0 for rev_size.
fdb_to_revinfo(Key, {?CURR_REV_FORMAT, Seq, BCount, RPath, AttHash, Size}) ->
    {?DB_REVS, _DocId, NotDeleted, RevPos, Rev} = Key,
    #{
        winner => true,
        exists => true,
        deleted => not NotDeleted,
        rev_id => {RevPos, Rev},
        rev_path => tuple_to_list(RPath),
        sequence => Seq,
        branch_count => BCount,
        att_hash => AttHash,
        rev_size => Size
    };

fdb_to_revinfo(Key, {?CURR_REV_FORMAT, RPath, AttHash, Size}) ->
    {?DB_REVS, _DocId, NotDeleted, RevPos, Rev} = Key,
    #{
        winner => false,
        exists => true,
        deleted => not NotDeleted,
        rev_id => {RevPos, Rev},
        rev_path => tuple_to_list(RPath),
        sequence => undefined,
        branch_count => undefined,
        att_hash => AttHash,
        rev_size => Size
    };

% Format 0 -> 1: no attachment hash was stored
fdb_to_revinfo(Key, {0, Seq, BCount, RPath}) ->
    fdb_to_revinfo(Key, {1, Seq, BCount, RPath, <<>>});

fdb_to_revinfo(Key, {0, RPath}) ->
    fdb_to_revinfo(Key, {1, RPath, <<>>});

% Format 1 -> current: no revision size was stored.
% Don't forget to change ?CURR_REV_FORMAT to 2 in the two clauses below
% when it increments.
fdb_to_revinfo(Key, {1, Seq, BCount, RPath, AttHash}) ->
    fdb_to_revinfo(Key, {?CURR_REV_FORMAT, Seq, BCount, RPath, AttHash, 0});

fdb_to_revinfo(Key, {1, RPath, AttHash}) ->
    fdb_to_revinfo(Key, {?CURR_REV_FORMAT, RPath, AttHash, 0}).
| |
| |
% Serialize the body, attachments and deleted flag of Doc into a list of
% {Key, Chunk} rows. The compressed term is cut with chunkify_binary/1 and
% the rows are keyed by doc id, revision and a chunk index starting at 0.
doc_to_fdb(Db, #doc{} = Doc) ->
    #{db_prefix := DbPrefix} = Db,
    #doc{
        id = Id,
        revs = {Start, [Rev | _]},
        body = Body,
        atts = Atts,
        deleted = Deleted
    } = Doc,

    DiskAtts = [couch_att:to_disk_term(Att) || Att <- Atts],
    Serialized = term_to_binary(
        {Body, DiskAtts, Deleted}, [{minor_version, 1}, {compressed, 6}]
    ),
    Chunks = chunkify_binary(Serialized),

    ChunkIds = lists:seq(0, length(Chunks) - 1),
    lists:zipwith(fun(ChunkId, Chunk) ->
        Key = erlfdb_tuple:pack({?DB_DOCS, Id, Start, Rev, ChunkId}, DbPrefix),
        {Key, Chunk}
    end, ChunkIds, Chunks).
| |
| |
% Rebuild a #doc{} from its stored body chunks. No chunks at all means the
% body is missing. When the db handle carries an after_doc_read fun, that
% fun is applied to the document before it is returned.
fdb_to_doc(_Db, _DocId, _Pos, _Path, []) ->
    {not_found, missing};

fdb_to_doc(Db, DocId, Pos, Path, BinRows) when is_list(BinRows) ->
    Serialized = iolist_to_binary(BinRows),
    {Body, DiskAtts, Deleted} = binary_to_term(Serialized, [safe]),
    Atts = [couch_att:from_disk_term(Db, DocId, Att) || Att <- DiskAtts],
    Doc = #doc{
        id = DocId,
        revs = {Pos, Path},
        body = Body,
        atts = Atts,
        deleted = Deleted
    },

    #{after_doc_read := AfterDocRead} = Db,
    case AfterDocRead of
        undefined -> Doc;
        _ -> AfterDocRead(Doc, Db)
    end.
| |
| |
% Encode a local document into {Key, Value, Size, BodyRows}.
%
% Value holds the format marker, the revision as a binary and the doc size.
% BodyRows are the {Key, Chunk} rows of the compressed body, indexed from 0.
local_doc_to_fdb(Db, #doc{} = Doc) ->
    #{db_prefix := DbPrefix} = Db,
    #doc{
        id = Id,
        revs = {0, [Rev]},
        body = Body
    } = Doc,

    Key = erlfdb_tuple:pack({?DB_LOCAL_DOCS, Id}, DbPrefix),

    % Integer revisions are stored in their binary text form
    StoreRev = case Rev of
        IntRev when is_integer(IntRev) -> integer_to_binary(IntRev);
        BinRev when is_binary(BinRev) -> BinRev
    end,

    BodyBin = term_to_binary(Body, [{minor_version, 1}, {compressed, 6}]),
    Chunks = chunkify_binary(BodyBin),
    ChunkIds = lists:seq(0, length(Chunks) - 1),
    Rows = lists:zipwith(fun(ChunkId, Chunk) ->
        ChunkKey = erlfdb_tuple:pack(
            {?DB_LOCAL_DOC_BODIES, Id, ChunkId}, DbPrefix
        ),
        {ChunkKey, Chunk}
    end, ChunkIds, Chunks),

    NewSize = fabric2_util:ldoc_size(Doc),
    Packed = erlfdb_tuple:pack({?CURR_LDOC_FORMAT, StoreRev, NewSize}),

    % Prefix our tuple encoding to make upgrades easier
    {Key, <<255, Packed/binary>>, NewSize, Rows}.
| |
| |
% Rebuild a local #doc{} from its stored revision value and body rows.
%
% Three encodings of the revision value are understood: a term_to_binary
% blob holding {Rev, Body} (leading byte 131), the current 255-prefixed
% tuple encoding, and any other value, which is treated as a raw revision
% and converted to the current encoding with a size of 0.
fdb_to_local_doc(_Db, _DocId, not_found, []) ->
    {not_found, missing};

fdb_to_local_doc(_Db, DocId, <<131, _/binary>> = OldValue, []) ->
    % Upgrade clause for the old encoding. The old value can still be read;
    % the storage format is upgraded when the document is next updated.
    {Rev, Body} = binary_to_term(OldValue, [safe]),
    #doc{
        id = DocId,
        revs = {0, [Rev]},
        deleted = false,
        body = Body
    };

fdb_to_local_doc(_Db, DocId, <<255, Packed/binary>>, Rows)
        when is_list(Rows) ->
    Rev = case erlfdb_tuple:unpack(Packed) of
        {?CURR_LDOC_FORMAT, StoredRev, _Size} -> StoredRev
    end,

    Body = binary_to_term(iolist_to_binary(Rows), [safe]),

    #doc{
        id = DocId,
        revs = {0, [Rev]},
        deleted = false,
        body = Body
    };

fdb_to_local_doc(Db, DocId, RawRev, Rows) ->
    Packed = erlfdb_tuple:pack({?CURR_LDOC_FORMAT, RawRev, 0}),
    fdb_to_local_doc(Db, DocId, <<255, Packed/binary>>, Rows).
| |
| |
% Sum rev_size over the revisions that do not exist yet. Revisions whose
% exists flag is true contribute nothing.
sum_add_rev_sizes(RevInfos) ->
    AddSize = fun
        (#{exists := true, rev_size := _}, Total) -> Total;
        (#{exists := false, rev_size := Size}, Total) -> Size + Total
    end,
    lists:foldl(AddSize, 0, RevInfos).
| |
| |
% Sum rev_size over the given revisions, all of which must already exist.
sum_rem_rev_sizes(RevInfos) ->
    AddSize = fun(#{exists := true, rev_size := Size}, Total) ->
        Size + Total
    end,
    lists:foldl(AddSize, 0, RevInfos).
| |
| |
% Build the #fold_acc{} that drives fold_range/2 from CouchDB style fold
% options.
%
% Options read here: dir, start_key, end_key, end_key_gt, wrap_keys, skip,
% limit, target_bytes, streaming_mode, snapshot and restart_tx.
get_fold_acc(Db, RangePrefix, UserCallback, UserAcc, Options)
        when is_map(Db) orelse Db =:= undefined ->

    IsReverse = fabric2_util:get_value(dir, Options) =:= rev,

    UserStartKey = fabric2_util:get_value(start_key, Options),
    UserEndKeyGt = fabric2_util:get_value(end_key_gt, Options),
    UserEndKey = fabric2_util:get_value(end_key, Options, UserEndKeyGt),
    InclusiveEnd = UserEndKeyGt == undefined,
    WrapKeys = fabric2_util:get_value(wrap_keys, Options) /= false,

    % Keys are wrapped in a one element tuple before packing unless the
    % caller passed {wrap_keys, false}.
    PackKey = case WrapKeys of
        true -> fun(Key) -> erlfdb_tuple:pack({Key}, RangePrefix) end;
        false -> fun(Key) -> erlfdb_tuple:pack(Key, RangePrefix) end
    end,

    % CouchDB swaps the key meanings based on the direction of the fold.
    % FoundationDB does not, so they are swapped back here.
    {LowKey, HighKey} = case IsReverse of
        true -> {UserEndKey, UserStartKey};
        false -> {UserStartKey, UserEndKey}
    end,

    % A missing bound falls back to the edge of the whole range
    PackedLow = case LowKey of
        undefined -> <<RangePrefix/binary, 16#00>>;
        _ -> PackKey(LowKey)
    end,

    PackedHigh = case {HighKey, IsReverse} of
        {undefined, _} -> <<RangePrefix/binary, 16#FF>>;
        {_, true} -> <<(PackKey(HighKey))/binary, 16#FF>>;
        {_, false} -> PackKey(HighKey)
    end,

    % FoundationDB ranges are applied as SK <= key < EK. By default CouchDB
    % is SK <= key <= EK, with the optional inclusive_end=false option
    % changing that to SK <= key < EK. CouchDB also swaps the meaning of SK
    % and EK based on direction. The two selectors below account for all of
    % those combinations.
    StartSelector = case IsReverse andalso not InclusiveEnd of
        true -> erlfdb_key:first_greater_than(PackedLow);
        false -> PackedLow
    end,

    BumpEnd = IsReverse
        orelse (InclusiveEnd andalso UserEndKey /= undefined),
    EndSelector = case BumpEnd of
        true -> erlfdb_key:first_greater_than(PackedHigh);
        false -> PackedHigh
    end,

    Skip = case fabric2_util:get_value(skip, Options) of
        SkipOpt when is_integer(SkipOpt), SkipOpt >= 0 -> SkipOpt;
        _ -> 0
    end,

    % Skipped rows still have to be read, so they are added to the limit.
    % A limit of 0 means no limit.
    Limit = case fabric2_util:get_value(limit, Options) of
        LimitOpt when is_integer(LimitOpt), LimitOpt >= 0 -> LimitOpt + Skip;
        undefined -> 0
    end,

    TargetBytes = case fabric2_util:get_value(target_bytes, Options) of
        Bytes when is_integer(Bytes), Bytes >= 0 -> [{target_bytes, Bytes}];
        undefined -> []
    end,

    StreamingMode = case fabric2_util:get_value(streaming_mode, Options) of
        undefined -> [];
        Mode when is_atom(Mode) -> [{streaming_mode, Mode}]
    end,

    Snapshot = case fabric2_util:get_value(snapshot, Options) of
        undefined -> [];
        Flag when is_boolean(Flag) -> [{snapshot, Flag}]
    end,

    BaseOpts = lists:append([
        [{reverse, IsReverse}],
        TargetBytes,
        StreamingMode,
        Snapshot
    ]),

    #fold_acc{
        db = Db,
        start_key = StartSelector,
        end_key = EndSelector,
        skip = Skip,
        limit = Limit,
        retries = 0,
        base_opts = BaseOpts,
        restart_tx = fabric2_util:get_value(restart_tx, Options, false),
        user_fun = UserCallback,
        user_acc = UserAcc
    }.
| |
| |
% Per-row callback handed to erlfdb:fold_range by fold_range/2.
%
% While rows remain to be skipped the row is dropped. Otherwise it is passed
% to the user fun. Either way the remaining limit is decremented and the
% range bound on the side the fold is advancing from is moved past this row.
% The updated accumulator is also stored in the process dictionary, where
% restart_fold/2 picks it up to resume after a transaction restart.
fold_range_cb({K, V}, #fold_acc{} = Acc) ->
    #fold_acc{
        skip = Skip,
        limit = Limit,
        user_fun = UserFun,
        user_acc = UserAcc,
        base_opts = Opts
    } = Acc,
    % A limit of 0 means "unlimited", so it must never be decremented below
    % zero. The clamp is applied in the skip branch as well: an unlimited
    % fold that skips rows would otherwise drift to a negative limit, and a
    % restarted fold would then pass that negative value on as its range
    % limit. For a finite limit the clamp changes nothing, because the limit
    % always exceeds the number of rows still to be skipped.
    Remaining = max(0, Limit - 1),
    Acc1 = case Skip =:= 0 of
        true ->
            UserAcc1 = UserFun({K, V}, UserAcc),
            Acc#fold_acc{limit = Remaining, user_acc = UserAcc1};
        false ->
            Acc#fold_acc{skip = Skip - 1, limit = Remaining}
    end,
    Acc2 = case fabric2_util:get_value(reverse, Opts, false) of
        true -> Acc1#fold_acc{end_key = erlfdb_key:last_less_or_equal(K)};
        false -> Acc1#fold_acc{start_key = erlfdb_key:first_greater_than(K)}
    end,
    put(?PDICT_FOLD_ACC_STATE, Acc2),
    Acc2.
| |
| |
% Reset Tx and return the accumulator a restarted fold should continue
% with.
%
% If fold_range_cb/2 stored progress since the last attempt, that state is
% used and the retry counter starts over. Without progress the retry
% counter is incremented, and fold_range_not_progressing is raised once it
% reaches the configured retry limit.
restart_fold(Tx, #fold_acc{} = Acc) ->
    erase(?PDICT_CHECKED_MD_IS_CURRENT),

    ok = erlfdb:reset(Tx),

    % During buggify test runs MaxRetries would be -1
    MaxRetries = fabric2_server:get_retry_limit(),
    case erase(?PDICT_FOLD_ACC_STATE) of
        #fold_acc{db = ProgressDb} = Progressed ->
            Progressed#fold_acc{
                db = check_db_instance(ProgressDb),
                retries = 0
            };
        undefined ->
            Retries = Acc#fold_acc.retries,
            case Retries < MaxRetries orelse MaxRetries =:= -1 of
                true ->
                    Db = check_db_instance(Acc#fold_acc.db),
                    Acc#fold_acc{db = Db, retries = Retries + 1};
                false ->
                    error(fold_range_not_progressing)
            end
    end.
| |
| |
% Return the database handle, reading it from the fabric application env
% on first use and caching it in the process dictionary afterwards.
get_db_handle() ->
    Cached = get(?PDICT_DB_KEY),
    if
        Cached =/= undefined ->
            Cached;
        true ->
            {ok, Db} = application:get_env(fabric, db),
            put(?PDICT_DB_KEY, Db),
            Db
    end.
| |
| |
% Return ok when the db handle holds an erlfdb transaction or snapshot
% under the tx key; raise transaction_required otherwise.
require_transaction(#{tx := {TxType, _}})
        when TxType =:= erlfdb_snapshot; TxType =:= erlfdb_transaction ->
    ok;
require_transaction(#{}) ->
    erlang:error(transaction_required).
| |
| |
% Same as ensure_current/2 with the db version check enabled.
ensure_current(Db) ->
    CheckDbVersion = true,
    ensure_current(Db, CheckDbVersion).
| |
| |
% Validate the db handle against the cluster and return the handle to use.
%
% When the metadata version is stale the db version is checked as well. A
% stale db version removes the handle from the cache and throws
% {?MODULE, reopen}. A pending security_fun, if any, is run once and then
% cleared from the returned handle.
ensure_current(#{} = Db0, CheckDbVersion) ->
    require_transaction(Db0),
    {MdStatus, Db1} = check_metadata_version(Db0),
    Db2 = case MdStatus of
        current ->
            Db1;
        stale ->
            case check_db_version(Db1, CheckDbVersion) of
                stale ->
                    fabric2_server:maybe_remove(Db1),
                    throw({?MODULE, reopen});
                current ->
                    % The db version is current, so update the cache with
                    % the latest metadata. Other requests can then see the
                    % refreshed db handle immediately.
                    Now = erlang:monotonic_time(millisecond),
                    Refreshed = Db1#{check_current_ts := Now},
                    fabric2_server:maybe_update(Refreshed),
                    Refreshed
            end
    end,
    case maps:get(security_fun, Db2) of
        undefined ->
            Db2;
        SecurityFun when is_function(SecurityFun, 2) ->
            #{security_doc := SecDoc} = Db2,
            ok = SecurityFun(Db2, SecDoc),
            Db2#{security_fun := undefined}
    end.
| |
| |
% Verify that the db handle still refers to the same database instance.
%
% When the metadata version is stale, the uuid stored for the db is
% compared with the uuid in the handle; a mismatch raises
% database_does_not_exist. An undefined handle is passed through.
check_db_instance(undefined) ->
    undefined;

check_db_instance(#{} = Db) ->
    require_transaction(Db),
    case check_metadata_version(Db) of
        {current, CurrentDb} ->
            CurrentDb;
        {stale, StaleDb} ->
            #{
                tx := Tx,
                uuid := UUID,
                db_prefix := DbPrefix
            } = StaleDb,
            UUIDKey = erlfdb_tuple:pack({?DB_CONFIG, <<"uuid">>}, DbPrefix),
            StoredUUID = erlfdb:wait(erlfdb:get(Tx, UUIDKey)),
            case StoredUUID =:= UUID of
                true -> StaleDb;
                false -> error(database_does_not_exist)
            end
    end.
| |
| |
% True when the last erlfdb error was commit_unknown_result, a transaction
% id was recorded for this process, and that id is present in Tx.
is_transaction_applied(Tx) ->
    case is_commit_unknown_result() andalso has_transaction_id() of
        true -> transaction_id_exists(Tx);
        false -> false
    end.
| |
| |
% Return the result stored in the process dictionary by
% execute_transaction/3, or undefined when none was stored.
get_previous_transaction_result() ->
    PreviousResult = get(?PDICT_TX_RES_KEY),
    PreviousResult.
| |
| |
% Run Fun(Tx) with the "already checked" markers reset and return its
% result.
%
% When the transaction is not read-only, a transaction id key is written
% into it and the result is stored in the process dictionary.
execute_transaction(Tx, Fun, LayerPrefix) ->
    put(?PDICT_CHECKED_MD_IS_CURRENT, false),
    put(?PDICT_CHECKED_DB_IS_CURRENT, false),
    Result = Fun(Tx),
    case erlfdb:is_read_only(Tx) of
        false ->
            TxIdKey = get_transaction_id(Tx, LayerPrefix),
            erlfdb:set(Tx, TxIdKey, <<>>),
            put(?PDICT_TX_RES_KEY, Result);
        true ->
            ok
    end,
    Result.
| |
| |
% Release the recorded transaction id and erase all per-transaction state
% from the process dictionary. Returns the erased transaction result.
clear_transaction() ->
    TxId = get(?PDICT_TX_ID_KEY),
    fabric2_txids:remove(TxId),
    lists:foreach(fun(PDictKey) -> erase(PDictKey) end, [
        ?PDICT_CHECKED_DB_IS_CURRENT,
        ?PDICT_CHECKED_MD_IS_CURRENT,
        ?PDICT_TX_ID_KEY
    ]),
    erase(?PDICT_TX_RES_KEY).
| |
| |
% True when the last error reported by erlfdb is commit_unknown_result.
is_commit_unknown_result() ->
    LastError = erlfdb:get_last_error(),
    LastError == ?ERLFDB_COMMIT_UNKNOWN_RESULT.
| |
| |
% True when a binary transaction id is stored in the process dictionary.
has_transaction_id() ->
    TxId = get(?PDICT_TX_ID_KEY),
    is_binary(TxId).
| |
| |
% True when the recorded transaction id key reads back as <<>> in Tx, which
% is the value execute_transaction/3 writes for it.
transaction_id_exists(Tx) ->
    TxIdKey = get(?PDICT_TX_ID_KEY),
    Stored = erlfdb:wait(erlfdb:get(Tx, TxIdKey)),
    Stored == <<>>.
| |
| |
% Return the transaction id recorded for this process, creating and
% recording a new one through fabric2_txids when none exists yet.
get_transaction_id(Tx, LayerPrefix) ->
    case get(?PDICT_TX_ID_KEY) of
        Existing when is_binary(Existing) ->
            Existing;
        undefined ->
            Created = fabric2_txids:create(Tx, LayerPrefix),
            put(?PDICT_TX_ID_KEY, Created),
            Created
    end.
| |
| |
% Run Fun inside a ctrace span named Operation when the calling process
% already has a span; otherwise just call Fun.
%
% ExtraTags are merged over the default tags, so they win on conflict.
with_span(Operation, ExtraTags, Fun) ->
    case ctrace:has_span() of
        false ->
            Fun();
        true ->
            DefaultTags = #{
                'span.kind' => <<"client">>,
                component => <<"couchdb.fabric">>,
                'db.instance' => fabric2_server:fdb_cluster(),
                'db.namespace' => fabric2_server:fdb_directory(),
                'db.type' => <<"fdb">>,
                nonce => get(nonce),
                pid => self()
            },
            Tags = maps:merge(DefaultTags, ExtraTags),
            ctrace:with_span(Operation, Tags, Fun)
    end.
| |
| |
% Wait on the futures held in an #info_future{} and return the db info
% proplist: update_seq, uuid, then the stats read from the meta range
% (doc_count, doc_del_count and the sizes object).
get_info_wait_int(#info_future{} = InfoFuture) ->
    #info_future{
        db_prefix = DbPrefix,
        changes_future = ChangesFuture,
        uuid_future = UUIDFuture,
        meta_future = MetaFuture
    } = InfoFuture,

    % No change rows means the db is still at the zero sequence
    LastVS = case erlfdb:wait(ChangesFuture) of
        [{SeqKey, _}] ->
            {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(SeqKey, DbPrefix),
            SeqVS;
        [] ->
            fabric2_util:seq_zero_vs()
    end,
    UpdateSeq = vs_to_seq(LastVS),

    UUID = erlfdb:wait(UUIDFuture),

    AddStat = fun({Key, Val}, Props) ->
        case erlfdb_tuple:unpack(Key, DbPrefix) of
            {?DB_STATS, <<"doc_count">>} ->
                [{doc_count, ?bin2uint(Val)} | Props];
            {?DB_STATS, <<"doc_del_count">>} ->
                [{doc_del_count, ?bin2uint(Val)} | Props];
            {?DB_STATS, <<"sizes">>, Name} ->
                Size = ?bin2uint(Val),
                {_, {Sizes}} = lists:keyfind(sizes, 1, Props),
                NewSizes = lists:keystore(Name, 1, Sizes, {Name, Size}),
                lists:keystore(sizes, 1, Props, {sizes, {NewSizes}});
            {?DB_STATS, _} ->
                % Stats that are not reported are ignored
                Props
        end
    end,
    StatProps = lists:foldl(
        AddStat, [{sizes, {[]}}], erlfdb:wait(MetaFuture)
    ),

    [{update_seq, UpdateSeq}, {uuid, UUID} | StatProps].
| |
| |
% Chunk size read from the "fabric" / "binary_chunk_size" config entry,
% defaulting to ?DEFAULT_BINARY_CHUNK_SIZE.
binary_chunk_size() ->
    Default = ?DEFAULT_BINARY_CHUNK_SIZE,
    config:get_integer("fabric", "binary_chunk_size", Default).
| |
| |
| -ifdef(TEST). |
| -include_lib("eunit/include/eunit.hrl"). |
| |
% Values written in revision format 0 must decode to the same maps as
% current format values, with the fields they lack filled by defaults.
fdb_to_revinfo_version_compatibility_test() ->
    FirstRevFormat = 0,
    RevPos = 1,
    Rev = <<60,84,174,140,210,120,192,18,100,148,9,181,129,165,248,92>>,
    RevPath = {},
    NotDeleted = true,
    Key = {?DB_REVS, <<"doc_id">>, NotDeleted, RevPos, Rev},

    % Winning revision: the value carries sequence and branch count
    Sequence = {versionstamp, 10873034897377, 0, 0},
    BranchCount = 1,
    ?assertEqual(
        expected(
            true, BranchCount, NotDeleted, RevPos, Rev, RevPath, Sequence),
        fdb_to_revinfo(Key, {FirstRevFormat, Sequence, BranchCount, RevPath})
    ),

    % Non-winning revision: the value carries the path only
    ?assertEqual(
        expected(
            false, undefined, NotDeleted, RevPos, Rev, RevPath, undefined),
        fdb_to_revinfo(Key, {FirstRevFormat, RevPath})
    ),
    ok.
| |
| |
% Build the revision info map the compatibility test expects for a value
% upgraded from format 0: empty att_hash and a rev_size of 0.
expected(Winner, BranchCount, NotDeleted, RevPos, Rev, RevPath, Sequence) ->
    Deleted = not NotDeleted,
    Path = tuple_to_list(RevPath),
    #{
        winner => Winner,
        exists => true,
        deleted => Deleted,
        rev_id => {RevPos, Rev},
        rev_path => Path,
        sequence => Sequence,
        branch_count => BranchCount,
        att_hash => <<>>,
        rev_size => 0
    }.
| |
| |
| -endif. |