blob: d2ddb1fdad9997d06b432b890bac5689797534f8 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(erlfdb).
-compile({no_auto_import, [get/1]}).
-export([
open/0,
open/1,
create_transaction/1,
transactional/2,
snapshot/1,
% Db/Tx configuration
set_option/2,
set_option/3,
% Lifecycle Management
commit/1,
reset/1,
cancel/1,
cancel/2,
% Future Specific functions
is_ready/1,
get/1,
get_error/1,
block_until_ready/1,
wait/1,
wait/2,
wait_for_any/1,
wait_for_any/2,
wait_for_all/1,
wait_for_all/2,
% Data retrieval
get/2,
get_ss/2,
get_key/2,
get_key_ss/2,
get_range/3,
get_range/4,
get_range_startswith/2,
get_range_startswith/3,
fold_range/5,
fold_range/6,
fold_range_future/4,
fold_range_wait/4,
% Data modifications
set/3,
clear/2,
clear_range/3,
clear_range_startswith/2,
% Atomic operations
add/3,
bit_and/3,
bit_or/3,
bit_xor/3,
min/3,
max/3,
byte_min/3,
byte_max/3,
set_versionstamped_key/3,
set_versionstamped_value/3,
atomic_op/4,
% Watches
watch/2,
get_and_watch/2,
set_and_watch/3,
clear_and_watch/2,
% Conflict ranges
add_read_conflict_key/2,
add_read_conflict_range/3,
add_write_conflict_key/2,
add_write_conflict_range/3,
add_conflict_range/4,
% Transaction versioning
set_read_version/2,
get_read_version/1,
get_committed_version/1,
get_versionstamp/1,
% Transaction size info
get_approximate_size/1,
% Transaction status
get_next_tx_id/1,
is_read_only/1,
has_watches/1,
get_writes_allowed/1,
% Locality
get_addresses_for_key/2,
% Get conflict information
get_conflicting_keys/1,
% Misc
on_error/2,
error_predicate/2,
get_last_error/0,
get_error_string/1
]).
-define(IS_FUTURE, {erlfdb_future, _, _}).
-define(IS_FOLD_FUTURE, {fold_info, _, _}).
-define(IS_DB, {erlfdb_database, _}).
-define(IS_TX, {erlfdb_transaction, _}).
-define(IS_SS, {erlfdb_snapshot, _}).
-define(GET_TX(SS), element(2, SS)).
-define(ERLFDB_ERROR, '$erlfdb_error').
-record(fold_st, {
start_key,
end_key,
limit,
target_bytes,
streaming_mode,
iteration,
snapshot,
reverse
}).
open() ->
open(<<>>).
open(ClusterFile) ->
erlfdb_nif:create_database(ClusterFile).
create_transaction(?IS_DB = Db) ->
erlfdb_nif:database_create_transaction(Db).
transactional(?IS_DB = Db, UserFun) when is_function(UserFun, 1) ->
clear_erlfdb_error(),
Tx = create_transaction(Db),
do_transaction(Tx, UserFun);
transactional(?IS_TX = Tx, UserFun) when is_function(UserFun, 1) ->
UserFun(Tx);
transactional(?IS_SS = SS, UserFun) when is_function(UserFun, 1) ->
UserFun(SS).
snapshot(?IS_TX = Tx) ->
{erlfdb_snapshot, Tx};
snapshot(?IS_SS = SS) ->
SS.
set_option(DbOrTx, Option) ->
set_option(DbOrTx, Option, <<>>).
set_option(?IS_DB = Db, DbOption, Value) ->
erlfdb_nif:database_set_option(Db, DbOption, Value);
set_option(?IS_TX = Tx, TxOption, Value) ->
erlfdb_nif:transaction_set_option(Tx, TxOption, Value).
commit(?IS_TX = Tx) ->
erlfdb_nif:transaction_commit(Tx).
reset(?IS_TX = Tx) ->
ok = erlfdb_nif:transaction_reset(Tx).
cancel(?IS_FOLD_FUTURE = FoldInfo) ->
cancel(FoldInfo, []);
cancel(?IS_FUTURE = Future) ->
cancel(Future, []);
cancel(?IS_TX = Tx) ->
ok = erlfdb_nif:transaction_cancel(Tx).
cancel(?IS_FOLD_FUTURE = FoldInfo, Options) ->
{fold_info, _St, Future} = FoldInfo,
cancel(Future, Options);
cancel(?IS_FUTURE = Future, Options) ->
ok = erlfdb_nif:future_cancel(Future),
case erlfdb_util:get(Options, flush, false) of
true -> flush_future_message(Future);
false -> ok
end.
is_ready(?IS_FUTURE = Future) ->
erlfdb_nif:future_is_ready(Future).
get_error(?IS_FUTURE = Future) ->
erlfdb_nif:future_get_error(Future).
get(?IS_FUTURE = Future) ->
erlfdb_nif:future_get(Future).
block_until_ready(?IS_FUTURE = Future) ->
{erlfdb_future, MsgRef, _FRef} = Future,
receive
{MsgRef, ready} -> ok
end.
wait(?IS_FUTURE = Future) ->
wait(Future, []);
wait(Ready) ->
Ready.
wait(?IS_FUTURE = Future, Options) ->
case is_ready(Future) of
true ->
Result = get(Future),
% Flush ready message if already sent
flush_future_message(Future),
Result;
false ->
Timeout = erlfdb_util:get(Options, timeout, 5000),
{erlfdb_future, MsgRef, _Res} = Future,
receive
{MsgRef, ready} -> get(Future)
after Timeout ->
erlang:error({timeout, Future})
end
end;
wait(Ready, _) ->
Ready.
wait_for_any(Futures) ->
wait_for_any(Futures, []).
wait_for_any(Futures, Options) ->
wait_for_any(Futures, Options, []).
wait_for_any(Futures, Options, ResendQ) ->
Timeout = erlfdb_util:get(Options, timeout, 5000),
receive
{MsgRef, ready} = Msg ->
case lists:keyfind(MsgRef, 2, Futures) of
?IS_FUTURE = Future ->
lists:foreach(fun(M) ->
self() ! M
end, ResendQ),
Future;
_ ->
wait_for_any(Futures, Options, [Msg | ResendQ])
end
after Timeout ->
lists:foreach(fun(M) ->
self() ! M
end, ResendQ),
erlang:error({timeout, Futures})
end.
wait_for_all(Futures) ->
wait_for_all(Futures, []).
wait_for_all(Futures, Options) ->
% Same as wait for all. We might want to
% handle timeouts here so we have a single
% timeout for all future waiting.
lists:map(fun(Future) ->
wait(Future, Options)
end, Futures).
get(?IS_DB = Db, Key) ->
transactional(Db, fun(Tx) ->
wait(get(Tx, Key))
end);
get(?IS_TX = Tx, Key) ->
erlfdb_nif:transaction_get(Tx, Key, false);
get(?IS_SS = SS, Key) ->
get_ss(?GET_TX(SS), Key).
get_ss(?IS_TX = Tx, Key) ->
erlfdb_nif:transaction_get(Tx, Key, true);
get_ss(?IS_SS = SS, Key) ->
get_ss(?GET_TX(SS), Key).
get_key(?IS_DB = Db, Key) ->
transactional(Db, fun(Tx) ->
wait(get_key(Tx, Key))
end);
get_key(?IS_TX = Tx, Key) ->
erlfdb_nif:transaction_get_key(Tx, Key, false);
get_key(?IS_SS = SS, Key) ->
get_key_ss(?GET_TX(SS), Key).
get_key_ss(?IS_TX = Tx, Key) ->
erlfdb_nif:transaction_get_key(Tx, Key, true).
get_range(DbOrTx, StartKey, EndKey) ->
get_range(DbOrTx, StartKey, EndKey, []).
get_range(?IS_DB = Db, StartKey, EndKey, Options) ->
transactional(Db, fun(Tx) ->
get_range(Tx, StartKey, EndKey, Options)
end);
get_range(?IS_TX = Tx, StartKey, EndKey, Options) ->
Fun = fun(Rows, Acc) -> [Rows | Acc] end,
Chunks = fold_range_int(Tx, StartKey, EndKey, Fun, [], Options),
lists:flatten(lists:reverse(Chunks));
get_range(?IS_SS = SS, StartKey, EndKey, Options) ->
get_range(?GET_TX(SS), StartKey, EndKey, [{snapshot, true} | Options]).
get_range_startswith(DbOrTx, Prefix) ->
get_range_startswith(DbOrTx, Prefix, []).
get_range_startswith(DbOrTx, Prefix, Options) ->
StartKey = Prefix,
EndKey = erlfdb_key:strinc(Prefix),
get_range(DbOrTx, StartKey, EndKey, Options).
fold_range(DbOrTx, StartKey, EndKey, Fun, Acc) ->
fold_range(DbOrTx, StartKey, EndKey, Fun, Acc, []).
fold_range(?IS_DB = Db, StartKey, EndKey, Fun, Acc, Options) ->
transactional(Db, fun(Tx) ->
fold_range(Tx, StartKey, EndKey, Fun, Acc, Options)
end);
fold_range(?IS_TX = Tx, StartKey, EndKey, Fun, Acc, Options) ->
fold_range_int(Tx, StartKey, EndKey, fun(Rows, InnerAcc) ->
lists:foldl(Fun, InnerAcc, Rows)
end, Acc, Options);
fold_range(?IS_SS = SS, StartKey, EndKey, Fun, Acc, Options) ->
SSOptions = [{snapshot, true} | Options],
fold_range(?GET_TX(SS), StartKey, EndKey, Fun, Acc, SSOptions).
fold_range_future(?IS_TX = Tx, StartKey, EndKey, Options) ->
St = options_to_fold_st(StartKey, EndKey, Options),
fold_range_future_int(Tx, St);
fold_range_future(?IS_SS = SS, StartKey, EndKey, Options) ->
SSOptions = [{snapshot, true} | Options],
fold_range_future(?GET_TX(SS), StartKey, EndKey, SSOptions).
fold_range_wait(?IS_TX = Tx, ?IS_FOLD_FUTURE = FI, Fun, Acc) ->
fold_range_int(Tx, FI, fun(Rows, InnerAcc) ->
lists:foldl(Fun, InnerAcc, Rows)
end, Acc).
set(?IS_DB = Db, Key, Value) ->
transactional(Db, fun(Tx) ->
set(Tx, Key, Value)
end);
set(?IS_TX = Tx, Key, Value) ->
erlfdb_nif:transaction_set(Tx, Key, Value);
set(?IS_SS = SS, Key, Value) ->
set(?GET_TX(SS), Key, Value).
clear(?IS_DB = Db, Key) ->
transactional(Db, fun(Tx) ->
clear(Tx, Key)
end);
clear(?IS_TX = Tx, Key) ->
erlfdb_nif:transaction_clear(Tx, Key);
clear(?IS_SS = SS, Key) ->
clear(?GET_TX(SS), Key).
clear_range(?IS_DB = Db, StartKey, EndKey) ->
transactional(Db, fun(Tx) ->
clear_range(Tx, StartKey, EndKey)
end);
clear_range(?IS_TX = Tx, StartKey, EndKey) ->
erlfdb_nif:transaction_clear_range(Tx, StartKey, EndKey);
clear_range(?IS_SS = SS, StartKey, EndKey) ->
clear_range(?GET_TX(SS), StartKey, EndKey).
clear_range_startswith(?IS_DB = Db, Prefix) ->
transactional(Db, fun(Tx) ->
clear_range_startswith(Tx, Prefix)
end);
clear_range_startswith(?IS_TX = Tx, Prefix) ->
EndKey = erlfdb_key:strinc(Prefix),
erlfdb_nif:transaction_clear_range(Tx, Prefix, EndKey);
clear_range_startswith(?IS_SS = SS, Prefix) ->
clear_range_startswith(?GET_TX(SS), Prefix).
add(DbOrTx, Key, Param) ->
atomic_op(DbOrTx, Key, Param, add).
bit_and(DbOrTx, Key, Param) ->
atomic_op(DbOrTx, Key, Param, bit_and).
bit_or(DbOrTx, Key, Param) ->
atomic_op(DbOrTx, Key, Param, bit_or).
bit_xor(DbOrTx, Key, Param) ->
atomic_op(DbOrTx, Key, Param, bit_xor).
min(DbOrTx, Key, Param) ->
atomic_op(DbOrTx, Key, Param, min).
max(DbOrTx, Key, Param) ->
atomic_op(DbOrTx, Key, Param, max).
byte_min(DbOrTx, Key, Param) ->
atomic_op(DbOrTx, Key, Param, byte_min).
byte_max(DbOrTx, Key, Param) ->
atomic_op(DbOrTx, Key, Param, byte_max).
set_versionstamped_key(DbOrTx, Key, Param) ->
atomic_op(DbOrTx, Key, Param, set_versionstamped_key).
set_versionstamped_value(DbOrTx, Key, Param) ->
atomic_op(DbOrTx, Key, Param, set_versionstamped_value).
atomic_op(?IS_DB = Db, Key, Param, Op) ->
transactional(Db, fun(Tx) ->
atomic_op(Tx, Key, Param, Op)
end);
atomic_op(?IS_TX = Tx, Key, Param, Op) ->
erlfdb_nif:transaction_atomic_op(Tx, Key, Param, Op);
atomic_op(?IS_SS = SS, Key, Param, Op) ->
atomic_op(?GET_TX(SS), Key, Param, Op).
watch(?IS_DB = Db, Key) ->
transactional(Db, fun(Tx) ->
watch(Tx, Key)
end);
watch(?IS_TX = Tx, Key) ->
erlfdb_nif:transaction_watch(Tx, Key);
watch(?IS_SS = SS, Key) ->
watch(?GET_TX(SS), Key).
get_and_watch(?IS_DB = Db, Key) ->
transactional(Db, fun(Tx) ->
KeyFuture = get(Tx, Key),
WatchFuture = watch(Tx, Key),
{wait(KeyFuture), WatchFuture}
end).
set_and_watch(?IS_DB = Db, Key, Value) ->
transactional(Db, fun(Tx) ->
set(Tx, Key, Value),
watch(Tx, Key)
end).
clear_and_watch(?IS_DB = Db, Key) ->
transactional(Db, fun(Tx) ->
clear(Tx, Key),
watch(Tx, Key)
end).
add_read_conflict_key(TxObj, Key) ->
add_read_conflict_range(TxObj, Key, <<Key/binary, 16#00>>).
add_read_conflict_range(TxObj, StartKey, EndKey) ->
add_conflict_range(TxObj, StartKey, EndKey, read).
add_write_conflict_key(TxObj, Key) ->
add_write_conflict_range(TxObj, Key, <<Key/binary, 16#00>>).
add_write_conflict_range(TxObj, StartKey, EndKey) ->
add_conflict_range(TxObj, StartKey, EndKey, write).
add_conflict_range(?IS_TX = Tx, StartKey, EndKey, Type) ->
erlfdb_nif:transaction_add_conflict_range(Tx, StartKey, EndKey, Type);
add_conflict_range(?IS_SS = SS, StartKey, EndKey, Type) ->
add_conflict_range(?GET_TX(SS), StartKey, EndKey, Type).
set_read_version(?IS_TX = Tx, Version) ->
erlfdb_nif:transaction_set_read_version(Tx, Version);
set_read_version(?IS_SS = SS, Version) ->
set_read_version(?GET_TX(SS), Version).
get_read_version(?IS_TX = Tx) ->
erlfdb_nif:transaction_get_read_version(Tx);
get_read_version(?IS_SS = SS) ->
get_read_version(?GET_TX(SS)).
get_committed_version(?IS_TX = Tx) ->
erlfdb_nif:transaction_get_committed_version(Tx);
get_committed_version(?IS_SS = SS) ->
get_committed_version(?GET_TX(SS)).
get_versionstamp(?IS_TX = Tx) ->
erlfdb_nif:transaction_get_versionstamp(Tx);
get_versionstamp(?IS_SS = SS) ->
get_versionstamp(?GET_TX(SS)).
get_approximate_size(?IS_TX = Tx) ->
erlfdb_nif:transaction_get_approximate_size(Tx);
get_approximate_size(?IS_SS = SS) ->
get_approximate_size(?GET_TX(SS)).
get_next_tx_id(?IS_TX = Tx) ->
erlfdb_nif:transaction_get_next_tx_id(Tx);
get_next_tx_id(?IS_SS = SS) ->
get_next_tx_id(?GET_TX(SS)).
is_read_only(?IS_TX = Tx) ->
erlfdb_nif:transaction_is_read_only(Tx);
is_read_only(?IS_SS = SS) ->
is_read_only(?GET_TX(SS)).
has_watches(?IS_TX = Tx) ->
erlfdb_nif:transaction_has_watches(Tx);
has_watches(?IS_SS = SS) ->
has_watches(?GET_TX(SS)).
get_writes_allowed(?IS_TX = Tx) ->
erlfdb_nif:transaction_get_writes_allowed(Tx);
get_writes_allowed(?IS_SS = SS) ->
get_writes_allowed(?GET_TX(SS)).
get_addresses_for_key(?IS_DB = Db, Key) ->
transactional(Db, fun(Tx) ->
wait(get_addresses_for_key(Tx, Key))
end);
get_addresses_for_key(?IS_TX = Tx, Key) ->
erlfdb_nif:transaction_get_addresses_for_key(Tx, Key);
get_addresses_for_key(?IS_SS = SS, Key) ->
get_addresses_for_key(?GET_TX(SS), Key).
get_conflicting_keys(?IS_TX = Tx) ->
StartKey = <<16#FF, 16#FF, "/transaction/conflicting_keys/">>,
EndKey = <<16#FF, 16#FF, "/transaction/conflicting_keys/", 16#FF>>,
get_range(Tx, StartKey, EndKey).
on_error(?IS_TX = Tx, {erlfdb_error, ErrorCode}) ->
on_error(Tx, ErrorCode);
on_error(?IS_TX = Tx, ErrorCode) ->
erlfdb_nif:transaction_on_error(Tx, ErrorCode);
on_error(?IS_SS = SS, Error) ->
on_error(?GET_TX(SS), Error).
error_predicate(Predicate, {erlfdb_error, ErrorCode}) ->
error_predicate(Predicate, ErrorCode);
error_predicate(Predicate, ErrorCode) ->
erlfdb_nif:error_predicate(Predicate, ErrorCode).
get_last_error() ->
erlang:get(?ERLFDB_ERROR).
get_error_string(ErrorCode) when is_integer(ErrorCode) ->
erlfdb_nif:get_error(ErrorCode).
clear_erlfdb_error() ->
put(?ERLFDB_ERROR, undefined).
do_transaction(?IS_TX = Tx, UserFun) ->
try
Ret = UserFun(Tx),
case is_read_only(Tx) andalso not has_watches(Tx) of
true -> ok;
false -> wait(commit(Tx), [{timeout, infinity}])
end,
Ret
catch error:{erlfdb_error, Code} ->
put(?ERLFDB_ERROR, Code),
wait(on_error(Tx, Code), [{timeout, infinity}]),
do_transaction(Tx, UserFun)
end.
fold_range_int(?IS_TX = Tx, StartKey, EndKey, Fun, Acc, Options) ->
St = options_to_fold_st(StartKey, EndKey, Options),
fold_range_int(Tx, St, Fun, Acc).
fold_range_int(Tx, #fold_st{} = St, Fun, Acc) ->
RangeFuture = fold_range_future_int(Tx, St),
fold_range_int(Tx, RangeFuture, Fun, Acc);
fold_range_int(Tx, ?IS_FOLD_FUTURE = FI, Fun, Acc) ->
{fold_info, St, Future} = FI,
#fold_st{
start_key = StartKey,
end_key = EndKey,
limit = Limit,
iteration = Iteration,
reverse = Reverse
} = St,
{RawRows, Count, HasMore} = wait(Future),
Count = length(RawRows),
% If our limit is within the current set of
% rows we need to truncate the list
Rows = if Limit == 0 orelse Limit > Count -> RawRows; true ->
lists:sublist(RawRows, Limit)
end,
% Invoke our callback to update the accumulator
NewAcc = if Rows == [] -> Acc; true ->
Fun(Rows, Acc)
end,
% Determine if we have more rows to iterate
Recurse = (Rows /= []) and (Limit == 0 orelse Limit > Count) and HasMore,
if not Recurse -> NewAcc; true ->
LastKey = element(1, lists:last(Rows)),
{NewStartKey, NewEndKey} = case Reverse /= 0 of
true ->
{StartKey, erlfdb_key:first_greater_or_equal(LastKey)};
false ->
{erlfdb_key:first_greater_than(LastKey), EndKey}
end,
NewSt = St#fold_st{
start_key = NewStartKey,
end_key = NewEndKey,
limit = if Limit == 0 -> 0; true -> Limit - Count end,
iteration = Iteration + 1
},
fold_range_int(Tx, NewSt, Fun, NewAcc)
end.
fold_range_future_int(?IS_TX = Tx, #fold_st{} = St) ->
#fold_st{
start_key = StartKey,
end_key = EndKey,
limit = Limit,
target_bytes = TargetBytes,
streaming_mode = StreamingMode,
iteration = Iteration,
snapshot = Snapshot,
reverse = Reverse
} = St,
Future = erlfdb_nif:transaction_get_range(
Tx,
StartKey,
EndKey,
Limit,
TargetBytes,
StreamingMode,
Iteration,
Snapshot,
Reverse
),
{fold_info, St, Future}.
options_to_fold_st(StartKey, EndKey, Options) ->
Reverse = case erlfdb_util:get(Options, reverse, false) of
true -> 1;
false -> 0;
I when is_integer(I) -> I
end,
#fold_st{
start_key = erlfdb_key:to_selector(StartKey),
end_key = erlfdb_key:to_selector(EndKey),
limit = erlfdb_util:get(Options, limit, 0),
target_bytes = erlfdb_util:get(Options, target_bytes, 0),
streaming_mode = erlfdb_util:get(Options, streaming_mode, want_all),
iteration = erlfdb_util:get(Options, iteration, 1),
snapshot = erlfdb_util:get(Options, snapshot, false),
reverse = Reverse
}.
flush_future_message(?IS_FUTURE = Future) ->
{erlfdb_future, MsgRef, _Res} = Future,
receive
{MsgRef, ready} -> ok
after
0 -> ok
end.