Implement the DirectoryLayer
This implements a DirectoryLayer intends to behave identically to the
Python version. Soak tests have so far run as long as an hour without
finding issues. However I plan on trying to get that number to upwards
of 24 hours or more.
diff --git a/notes/directory-layer-design.md b/notes/directory-layer-design.md
new file mode 100644
index 0000000..376f800
--- /dev/null
+++ b/notes/directory-layer-design.md
@@ -0,0 +1,184 @@
+Directory Layer Schema-ish
+==========================
+
+
+`'\x16...'` to `'\x1D...'` - Keys that represent contents of the directory
+ tree. This is basically "all possible positive integers". The prefix
+ of each key is a directory node down below.
+
+
+`'\xFE'` - Prefix for all keys related to the directory tree node
+ hierarchy.
+
+
+`'\xFE\x01version\x00'` - Version of the directory tree formatted as:
+ <<V1:32/little-unsigned, V2:32/little-unsigned, V3:32/little-unsigned>>
+
+
+`'\xFE\x01\xFE\x00'` - the root node of the tree
+
+
+`'\xFE\x01\xFE\x00\x01layer\x00'` - the "layer" of the given node. This
+ is set at node creation time and never mutated by the directory layer.
+ If a layer is provided when opening nodes it checks to see that the
+ layer matches nodes that are read. When there's a mismatch an error
+ is thrown. I believe the purpose of this is so that users of the
+ DirectoryLayer can assert a primitive "ownership" over part of the
+ directory tree with some guarantee that they won't accidentally trample
+ over each other. However, if "layer" is the binary string "partition"
+ then this becomes a whole new thing which I'll discuss below.
+
+
+`'\xFE\x01\xFE\x00\x16' - The prefix for all child nodes of the root node. The
+ pattern here is basically `'\xFE'` + erlfdb_tupe:pack({node.key(), 0})
+ although its using erlfdb_subspaces under the hood. I may ixnay subspaces
+ in the directory layer because the syntax really isn't all that useful
+ in Erlang.
+
+
+`'\xFE\x01\xFE\x00\x16\x02foo\x00'` -> `'\x17\x05'` - A child node named "foo"
+ which has a node id of `\x17\x05`.
+
+
+`'\xFE\x17\x05\x16\x02bar\x00'` -> `'\x17\x19'` - A child of "foo" named
+ "bar" with node id `'\x17\x19'`. The tree is made my recursively
+ following these paths.
+
+
+`'\xFE\x17\x19\x01layer\x00'` -> `'partition'` A node id has its layer value
+ set to partition which means that this node is "DirectoryPartition"
+ which basically means its node id is pasted onto the front of
+ every subtree node. This can be useful to do a range scan against an
+ entire subtree. However, directories can not be moved across partition
+ boundaries (because that would require changing all of their key
+ prefixes). Also, it makes key lengths slightly longer by the length
+ of the short node id string.
+
+ Partitions are roughly equivalent to a nested directory tree re-rooted
+ outside the main directory tree. All of the contents and node keys
+ share the same prefix based on the node id of the partition.
+
+
+Some Names
+==========
+
+A warning to future readers, this list of names is used fairly consistently
+by the `erlfdb` DirectoryLayer, however they are not all used in the
+Python implementation. So any comparison between the two requires some
+fairly decent knowledge of how things work. This is mostly because the
+Python implementation doesn't actually bother naming things and instead has
+a number of long function invocations that gloss over where we might need
+to assign something to a variable in Erlang.
+
+ * directory name = human readable string
+ * node_name = shortened binary
+ * root version = node_prefix + {"version"}
+ * root node = node_prefix + {node_prefix}
+ * partition id = node_name + 16#FE
+ * partition version = node_name + {"version"}
+ * partition node = partition_id + {partition_id}
+ * node_id = node_prefix + {node_name}
+ * node_layer_id = node_prefix + {node_name, "layer"}
+ * node_entry_id = node_prefix + {node_name, SUBDIRS, directory name}
+
+
+
+Algorithms
+==========
+
+These are all based on the assumption that my schema is at least relatively
+close to reality and also by staring at the Python implementation. However,
+the Python implementation makes extensive use of OO inheritance as well
+as syntactical sugar via magic methods. These algorithms are my general
+reinterpretation of the Python approach in Erlang terms.
+
+Some common assumptions here:
+
+ * Path is a tuple of UTF-8 encoded binaries.
+
+
+Find Node at Path
+-----------------
+
+ * Get the root node (i.e., `\xFE\x01\xFE\x00`) (though this will
+ obviously be parameterizable).
+ * curr_node = RootNode
+ * For part in path:
+ - node_id = erlfdb:get(Tx, Root + to_string(Path))
+ - if node_id == not_found: return not_found
+ - curr_node = `'\xFE' + node_id`
+ * return curr_node
+
+
+Open Directory at Path
+----------------------
+
+ * Find node at path
+ * If not found, return not_found or throw an error or w/e
+ * Read layer value
+ * Optionally pre-fetch child keys. Would be more efficient as
+ a single range scan, but super wide trees would be ungood
+ * if layer != "partition": Return value representing this directory
+ - root reference
+ - path referecne
+ - layer
+ - children
+ * else: Return record representing this partition
+ - previous root
+ - new root
+ - path to partition root
+
+
+Create New Directory at Path
+----------------------------
+
+ * Find directory at Path[:-1]
+ * If direcotry[Path[-1]] exists: return eexists
+ * Allocate new node id via HCA
+ * Write `'\xFE' + node_id + '\x01layer\x00' = whatever was passed
+ * return samesies as for open path
+
+
+Remove A Directory at Path
+--------------------------
+
+ * Find node at Path
+ * del_children(node_id):
+ * for name, child_id in children(node_id):
+ * del_children(child_id)
+ * clear_range for all content related to child_id
+ * clear_range for this node's layer and children nodes
+ * delete node_id from Path[:-1]
+
+
+Move a Directory from PathA to PathB
+------------------------------------
+
+ * Bunch of error checking and partition handling
+ * Insert PathA node_id to PathB[:-1]'s list of children
+ * Then remove PathA's node_id from PathA[:-1]
+
+
+List Directory at Path
+----------------------
+
+ * Find Directory at Path
+ * Return children list. Mebbe after a range read if they
+ haven't been pre-loaded.
+
+
+Directory Exists
+----------------
+
+ * Find directory, see if it throws an error
+
+
+Tests
+-----
+
+ * Opening a directory creates parent directories?
+ * Creating parent directories does not set a label?
+ * Opening nested directories does not check intermediate labels?
+
+
+
diff --git a/src/erlfdb_directory.erl b/src/erlfdb_directory.erl
new file mode 100644
index 0000000..d727ad3
--- /dev/null
+++ b/src/erlfdb_directory.erl
@@ -0,0 +1,828 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(erlfdb_directory).
+
+
+-export([
+ root/0,
+ root/1,
+
+ create_or_open/3,
+ create_or_open/4,
+ create/3,
+ create/4,
+ open/3,
+ open/4,
+
+ list/2,
+ list/3,
+
+ exists/2,
+ exists/3,
+
+ move/4,
+ move_to/3,
+
+ remove/2,
+ remove/3,
+ remove_if_exists/2,
+ remove_if_exists/3,
+
+ get_id/1,
+ get_name/1,
+ get_root/1,
+ get_root_for_path/2,
+ get_node_prefix/1,
+ get_path/1,
+ get_layer/1,
+ get_subspace/1,
+
+ subspace/2,
+ key/1,
+ pack/2,
+ pack_vs/2,
+ unpack/2,
+ range/1,
+ range/2,
+ contains/2,
+
+ debug_nodes/2
+]).
+
+
+-include("erlfdb.hrl").
+
+
+-define(LAYER_VERSION, {1, 0, 0}).
+-define(DEFAULT_NODE_PREFIX, <<16#FE>>).
+-define(SUBDIRS, 0).
+
+
+root() ->
+ init_root([]).
+
+
+root(Options) ->
+ init_root(Options).
+
+
+create_or_open(TxObj, Node, Path) ->
+ create_or_open(TxObj, Node, Path, <<>>).
+
+
+create_or_open(TxObj, Node, PathIn, Layer) ->
+ {Root, Path} = adj_path(Node, PathIn),
+ if Path /= [] -> ok; true ->
+ ?ERLFDB_ERROR({open_error, cannot_open_root})
+ end,
+ case create_or_open_int(TxObj, Root, Path, Layer) of
+ #{is_absolute_root := true} ->
+ ?ERLFDB_ERROR({open_error, cannot_open_root});
+ Else ->
+ Else
+ end.
+
+create(TxObj, Node, Path) ->
+ create(TxObj, Node, Path, []).
+
+
+create(TxObj, Node, PathIn, Options) ->
+ {Root, Path} = adj_path(Node, PathIn),
+ check_manual_node_name(Root, Options),
+ erlfdb:transactional(TxObj, fun(Tx) ->
+ Layer = erlfdb_util:get(Options, layer, <<>>),
+ NodeName = erlfdb_util:get(Options, node_name, undefined),
+ create_int(Tx, Root, Path, Layer, NodeName)
+ end).
+
+
+open(TxObj, Node, Path) ->
+ open(TxObj, Node, Path, []).
+
+
+open(TxObj, Node, PathIn, Options) ->
+ {Root, Path} = adj_path(Node, PathIn),
+ if Path /= [] -> ok; true ->
+ ?ERLFDB_ERROR({open_error, cannot_open_root})
+ end,
+ erlfdb:transactional(TxObj, fun(Tx) ->
+ Layer = erlfdb_util:get(Options, layer, <<>>),
+ open_int(Tx, Root, Path, Layer)
+ end).
+
+
+list(TxObj, Node) ->
+ list(TxObj, Node, {}).
+
+
+list(TxObj, Node, PathIn) ->
+ {Root, Path} = adj_path(Node, PathIn),
+ erlfdb:transactional(TxObj, fun(Tx) ->
+ check_version(Tx, Root, read),
+ case find(Tx, Root, Path) of
+ not_found ->
+ ?ERLFDB_ERROR({list_error, missing_path, Path});
+ ListNode ->
+ Subdirs = ?ERLFDB_EXTEND(get_id(ListNode), ?SUBDIRS),
+ SDLen = size(Subdirs),
+ SDStart = <<Subdirs:SDLen/binary, 16#00>>,
+ SDEnd = <<Subdirs:SDLen/binary, 16#FF>>,
+ SubDirKVs = erlfdb:wait(erlfdb:get_range(Tx, SDStart, SDEnd)),
+ lists:map(fun({Key, NodeName}) ->
+ {DName} = ?ERLFDB_EXTRACT(Subdirs, Key),
+ ChildNode = init_node(Tx, ListNode, NodeName, DName),
+ {DName, ChildNode}
+ end, SubDirKVs)
+ end
+ end).
+
+
+exists(TxObj, Node) ->
+ exists(TxObj, Node, {}).
+
+
+exists(TxObj, Node, PathIn) ->
+ %Root = get_root(Node),
+ Root = get_root_for_path(Node, PathIn),
+ {Root, Path} = adj_path(Root, Node, PathIn),
+ erlfdb:transactional(TxObj, fun(Tx) ->
+ check_version(Tx, Root, read),
+ case find(Tx, Root, Path) of
+ not_found ->
+ false;
+ _ChildNode ->
+ true
+ end
+ end).
+
+
+move(TxObj, Node, OldPathIn, NewPathIn) ->
+ {Root, OldPath} = adj_path(Node, OldPathIn),
+ {Root, NewPath} = adj_path(Node, NewPathIn),
+ erlfdb:transactional(TxObj, fun(Tx) ->
+ check_version(Tx, Root, write),
+ check_not_subpath(OldPath, NewPath),
+
+ OldNode = find(Tx, Root, OldPath),
+ NewNode = find(Tx, Root, NewPath),
+
+ if OldNode /= not_found -> ok; true ->
+ ?ERLFDB_ERROR({move_error, missing_source, OldPath})
+ end,
+
+ if NewNode == not_found -> ok; true ->
+ ?ERLFDB_ERROR({move_error, target_exists, NewPath})
+ end,
+
+ {NewParentPath, [NewName]} = lists:split(length(NewPath) - 1, NewPath),
+ case find(Tx, Root, NewParentPath) of
+ not_found ->
+ ?ERLFDB_ERROR({move_error, missing_parent_node, NewParentPath});
+ NewParentNode ->
+ check_same_partition(OldNode, NewParentNode),
+ ParentId = get_id(NewParentNode),
+ NodeEntryId = ?ERLFDB_PACK(ParentId, {?SUBDIRS, NewName}),
+ erlfdb:set(Tx, NodeEntryId, get_name(OldNode)),
+ remove_from_parent(Tx, OldNode),
+ OldNode#{path := get_path(Root) ++ NewPath}
+ end
+ end).
+
+
+move_to(_TxObj, #{is_absolute_root := true}, _NewPath) ->
+ ?ERLFDB_ERROR({move_error, root_cannot_be_moved});
+
+move_to(TxObj, Node, NewAbsPathIn) ->
+ Root = get_root_for_path(Node, []),
+ RootPath = get_path(Root),
+ RootPathLen = length(RootPath),
+ NewAbsPath = path_init(NewAbsPathIn),
+ IsPrefix = lists:prefix(RootPath, NewAbsPath),
+ if IsPrefix -> ok; true ->
+ ?ERLFDB_ERROR({move_error, partition_mismatch, RootPath, NewAbsPath})
+ end,
+ NodePath = get_path(Node),
+ SrcPath = lists:nthtail(RootPathLen, NodePath),
+ TgtPath = lists:nthtail(RootPathLen, NewAbsPath),
+ move(TxObj, Root, SrcPath, TgtPath).
+
+
+remove(TxObj, Node) ->
+ remove_int(TxObj, Node, {}, false).
+
+
+remove(TxObj, Node, Path) ->
+ remove_int(TxObj, Node, Path, false).
+
+
+remove_if_exists(TxObj, Node) ->
+ remove_int(TxObj, Node, {}, true).
+
+
+remove_if_exists(TxObj, Node, Path) ->
+ remove_int(TxObj, Node, Path, true).
+
+
+get_id(Node) ->
+ invoke(Node, get_id, []).
+
+
+get_name(Node) ->
+ invoke(Node, get_name, []).
+
+
+get_root(Node) ->
+ invoke(Node, get_root, []).
+
+
+get_root_for_path(Node, Path) ->
+ invoke(Node, get_root_for_path, [Path]).
+
+
+get_partition(Node) ->
+ invoke(Node, get_partition, []).
+
+
+get_node_prefix(Node) ->
+ invoke(Node, get_node_prefix, []).
+
+
+get_path(Node) ->
+ invoke(Node, get_path, []).
+
+
+get_layer(Node) ->
+ invoke(Node, get_layer, []).
+
+
+get_subspace(Node) ->
+ invoke(Node, get_subspace, []).
+
+
+subspace(Node, Tuple) ->
+ erlfdb_subspace:create(get_subspace(Node), Tuple).
+
+
+key(Node) ->
+ erlfdb_subspace:key(get_subspace(Node)).
+
+
+pack(Node, Tuple) ->
+ erlfdb_subspace:pack(get_subspace(Node), Tuple).
+
+
+pack_vs(Node, Tuple) ->
+ erlfdb_subspace:pack_vs(get_subspace(Node), Tuple).
+
+
+unpack(Node, Key) ->
+ erlfdb_subspace:unpack(get_subspace(Node), Key).
+
+
+range(Node) ->
+ range(Node, {}).
+
+
+range(Node, Tuple) ->
+ erlfdb_subspace:range(get_subspace(Node), Tuple).
+
+
+contains(Node, Key) ->
+ erlfdb_subspace:contains(get_subspace(Node), Key).
+
+
+debug_nodes(TxObj, _Node) ->
+ erlfdb:fold_range(TxObj, <<16#02>>, <<16#FF>>, fun({K, V}, _Acc) ->
+ io:format(standard_error, "~s => ~s~n", [
+ erlfdb_util:repr(K),
+ erlfdb_util:repr(V)
+ ])
+ end, nil).
+
+
+invoke(not_found, _, _) ->
+ erlang:error(broken);
+
+invoke(Node, FunName, Args) ->
+ case Node of
+ #{FunName := Fun} ->
+ erlang:apply(Fun, [Node | Args]);
+ #{} ->
+ ?ERLFDB_ERROR({op_not_supported, FunName, Node})
+ end.
+
+
+init_root(Options) ->
+ DefNodePref = ?DEFAULT_NODE_PREFIX,
+ NodePrefix = erlfdb_util:get(Options, node_prefix, DefNodePref),
+ RootNodeId = ?ERLFDB_EXTEND(NodePrefix, NodePrefix),
+ ContentPrefix = erlfdb_util:get(Options, content_prefix, <<>>),
+ AllowManual = erlfdb_util:get(Options, allow_manual_names, false),
+ Allocator = erlfdb_hca:create(?ERLFDB_EXTEND(RootNodeId, <<"hca">>)),
+ #{
+ id => ?ERLFDB_EXTEND(NodePrefix, NodePrefix),
+ node_prefix => NodePrefix,
+ content_prefix => ContentPrefix,
+ allocator => Allocator,
+ allow_manual_names => AllowManual,
+ is_absolute_root => true,
+
+ get_id => fun(Self) -> maps:get(id, Self) end,
+ get_root => fun(Self) -> Self end,
+ get_root_for_path => fun(Self, _Path) -> Self end,
+ get_partition => fun(Self) -> Self end,
+ get_node_prefix => fun(Self) -> maps:get(node_prefix, Self) end,
+ get_path => fun(_Self) -> [] end,
+ get_layer => fun(_Self) -> <<>> end,
+ get_subspace => fun(_Self) ->
+ ?ERLFDB_ERROR({subspace_error, subspace_unsupported_for_root})
+ end
+ }.
+
+
+init_node(Tx, Node, NodeName, PathName) ->
+ NodePrefix = get_node_prefix(Node),
+ NodeLayerId = ?ERLFDB_PACK(NodePrefix, {NodeName, <<"layer">>}),
+ Layer = case erlfdb:wait(erlfdb:get(Tx, NodeLayerId)) of
+ not_found ->
+ ?ERLFDB_ERROR({internal_error, missing_node_layer, NodeLayerId});
+ LName ->
+ LName
+ end,
+ case Layer of
+ <<"partition">> ->
+ init_partition(Node, NodeName, PathName);
+ _ ->
+ init_directory(Node, NodeName, PathName, Layer)
+ end.
+
+
+init_partition(ParentNode, NodeName, PathName) ->
+ NodeNameLen = size(NodeName),
+ NodePrefix = <<NodeName:NodeNameLen/binary, 16#FE>>,
+ RootNodeId = ?ERLFDB_EXTEND(NodePrefix, NodePrefix),
+ Allocator = erlfdb_hca:create(?ERLFDB_EXTEND(RootNodeId, <<"hca">>)),
+ #{
+ id => RootNodeId,
+ name => NodeName,
+ root => get_root(ParentNode),
+ node_prefix => NodePrefix,
+ content_prefix => NodeName,
+ allocator => Allocator,
+ allow_manual_names => false,
+ path => path_append(get_path(ParentNode), PathName),
+ is_partition => true,
+
+ get_id => fun(Self) -> maps:get(id, Self) end,
+ get_name => fun(Self) -> maps:get(name, Self) end,
+ get_root => fun(Self) -> Self end,
+ get_root_for_path => fun(Self, PathIn) ->
+ case PathIn of
+ {} -> maps:get(root, Self);
+ [] -> maps:get(root, Self);
+ _ -> Self
+ end
+ end,
+ get_partition => fun(Self) -> maps:get(root, Self) end,
+ get_node_prefix => fun(Self) -> maps:get(node_prefix, Self) end,
+ get_path => fun(Self) -> maps:get(path, Self) end,
+ get_layer => fun(_Self) -> <<"partition">> end,
+ get_subspace => fun(_Self) ->
+ ?ERLFDB_ERROR({subspace_error, subspace_unsupported_for_partition})
+ end
+ }.
+
+
+init_directory(ParentNode, NodeName, PathName, Layer) ->
+ NodePrefix = get_node_prefix(ParentNode),
+ ParentPath = get_path(ParentNode),
+ #{
+ id => ?ERLFDB_EXTEND(NodePrefix, NodeName),
+ name => NodeName,
+ root => get_root(ParentNode),
+ path => path_append(ParentPath, PathName),
+ layer => Layer,
+
+ get_id => fun(Self) -> maps:get(id, Self) end,
+ get_name => fun(Self) -> maps:get(name, Self) end,
+ get_root => fun(Self) -> maps:get(root, Self) end,
+ get_root_for_path => fun(Self, Path) ->
+ NewPath = maps:get(path, Self) ++ path_init(Path),
+ get_root_for_path(maps:get(root, Self), NewPath)
+ end,
+ get_partition => fun(Self) -> maps:get(root, Self) end,
+ get_node_prefix => fun(Self) ->
+ Root = maps:get(root, Self),
+ get_node_prefix(Root)
+ end,
+ get_path => fun(Self) -> maps:get(path, Self) end,
+ get_layer => fun(Self) -> maps:get(layer, Self) end,
+ get_subspace => fun(Self) ->
+ erlfdb_subspace:create({}, maps:get(name, Self))
+ end
+ }.
+
+
+find(_Tx, Node, []) ->
+ Node;
+
+find(Tx, Node, [PathName | RestPath]) ->
+ NodeEntryId = ?ERLFDB_PACK(get_id(Node), {?SUBDIRS, PathName}),
+ case erlfdb:wait(erlfdb:get(Tx, NodeEntryId)) of
+ not_found ->
+ not_found;
+ ChildNodeName ->
+ ChildNode = init_node(Tx, Node, ChildNodeName, PathName),
+ find(Tx, ChildNode, RestPath)
+ end.
+
+
+find_deepest(_Tx, Node, []) ->
+ Node;
+
+find_deepest(Tx, Node, [PathName | RestPath]) ->
+ NodeEntryId = ?ERLFDB_PACK(get_id(Node), {?SUBDIRS, PathName}),
+ case erlfdb:wait(erlfdb:get(Tx, NodeEntryId)) of
+ not_found ->
+ Node;
+ ChildNodeName ->
+ ChildNode = init_node(Tx, Node, ChildNodeName, PathName),
+ find_deepest(Tx, ChildNode, RestPath)
+ end.
+
+
+create_or_open_int(TxObj, Node, {}, Layer) ->
+ create_or_open_int(TxObj, Node, [], Layer);
+
+create_or_open_int(_TxObj, Node, [], LayerIn) ->
+ Layer = case LayerIn of
+ <<>> -> <<>>;
+ null -> <<>>;
+ undefined -> <<>>;
+ Else when is_binary(Else) -> Else
+ end,
+ NodeLayer = get_layer(Node),
+ if Layer == <<>> orelse Layer == NodeLayer -> ok; true ->
+ ?ERLFDB_ERROR({open_error, layer_mismatch, Layer, NodeLayer})
+ end,
+ Node;
+
+create_or_open_int(TxObj, Node, PathIn, Layer) ->
+ {Root, Path} = adj_path(Node, PathIn),
+ erlfdb:transactional(TxObj, fun(Tx) ->
+ {ParentPath, [PathName]} = lists:split(length(Path) - 1, Path),
+
+ Parent = lists:foldl(fun(Name, CurrNode) ->
+ try
+ open_int(Tx, CurrNode, Name, <<>>)
+ catch error:{?MODULE, {open_error, path_missing, _}} ->
+ create_int(Tx, CurrNode, Name, <<>>, undefined)
+ end
+ end, Root, ParentPath),
+
+ try
+ open_int(Tx, Parent, PathName, Layer)
+ catch error:{?MODULE, {open_error, path_missing, _}} ->
+ create_int(Tx, Parent, PathName, Layer, undefined)
+ end
+ end).
+
+
+create_int(Tx, Node, PathIn, Layer, NodeNameIn) ->
+ Path = path_init(PathIn),
+ try
+ open_int(Tx, Node, Path, <<>>),
+ ?ERLFDB_ERROR({create_error, path_exists, Path})
+ catch error:{?MODULE, {open_error, path_missing, _}} ->
+ Deepest = find_deepest(Tx, Node, Path),
+ NodeName = create_node_name(Tx, Deepest, NodeNameIn),
+ {ParentPath, [PathName]} = lists:split(length(Path) - 1, Path),
+ case create_or_open_int(Tx, Node, ParentPath, <<>>) of
+ not_found ->
+ ?ERLFDB_ERROR({create_error, missing_parent, ParentPath});
+ Parent ->
+ check_version(Tx, Parent, write),
+ create_node(Tx, Parent, PathName, NodeName, Layer),
+ R = find(Tx, Parent, [PathName]),
+ if R /= not_found -> R; true ->
+ erlang:error(broken)
+ end
+ end
+ end.
+
+
+create_node(Tx, Parent, PathName, NodeName, LayerIn) ->
+ NodeEntryId = ?ERLFDB_PACK(get_id(Parent), {?SUBDIRS, PathName}),
+ erlfdb:set(Tx, NodeEntryId, NodeName),
+
+ NodePrefix = get_node_prefix(Parent),
+ NodeLayerId = ?ERLFDB_PACK(NodePrefix, {NodeName, <<"layer">>}),
+ Layer = if LayerIn == undefined -> <<>>; true -> LayerIn end,
+ erlfdb:set(Tx, NodeLayerId, Layer).
+
+
+open_int(Tx, Node, PathIn, Layer) ->
+ check_version(Tx, Node, read),
+ Path = path_init(PathIn),
+ case find(Tx, Node, Path) of
+ not_found ->
+ ?ERLFDB_ERROR({open_error, path_missing, Path});
+ #{is_absolute_root := true} ->
+ ?ERLFDB_ERROR({open_error, cannot_open_root});
+ Opened ->
+ NodeLayer = get_layer(Opened),
+ if Layer == <<>> orelse Layer == NodeLayer -> ok; true ->
+ ?ERLFDB_ERROR({open_error, layer_mismatch, Layer, NodeLayer})
+ end,
+ Opened
+ end.
+
+
+remove_int(TxObj, Node, PathIn, IgnoreMissing) ->
+ Root = get_root_for_path(Node, PathIn),
+ {Root, Path} = adj_path(Root, Node, PathIn),
+ erlfdb:transactional(TxObj, fun(Tx) ->
+ check_version(Tx, Root, write),
+ case find(Tx, Root, Path) of
+ not_found when IgnoreMissing ->
+ ok;
+ not_found ->
+ ?ERLFDB_ERROR({remove_error, path_missing, Path});
+ #{is_absolute_root := true} ->
+ ?ERLFDB_ERROR({remove_error, cannot_remove_root});
+ ToRem ->
+ remove_recursive(Tx, ToRem),
+ remove_from_parent(Tx, ToRem)
+ end
+ end).
+
+
+remove_recursive(Tx, Node) ->
+ % Remove all subdirectories
+ lists:foreach(fun({_DirName, ChildNode}) ->
+ remove_recursive(Tx, ChildNode)
+ end, list(Tx, Node)),
+
+ % Delete all content for the node.
+ ContentSS = erlfdb_subspace:create({}, get_name(Node)),
+ {ContentStart, ContentEnd} = erlfdb_subspace:range(ContentSS),
+ erlfdb:clear_range(Tx, ContentStart, ContentEnd),
+
+ % Delete this node from the tree hierarchy
+ NodeSubspace = erlfdb_subspace:create({}, get_id(Node)),
+ {NodeStart, NodeEnd} = erlfdb_subspace:range(NodeSubspace),
+ erlfdb:clear_range(Tx, NodeStart, NodeEnd).
+
+
+remove_from_parent(Tx, Node) ->
+ {Root, Path} = adj_path(get_root_for_path(Node, []), Node, []),
+ {ParentPath, [PathName]} = lists:split(length(Path) - 1, Path),
+ Parent = find(Tx, Root, ParentPath),
+
+ NodeEntryId = ?ERLFDB_PACK(get_id(Parent), {?SUBDIRS, PathName}),
+ erlfdb:clear(Tx, NodeEntryId).
+
+
+check_manual_node_name(Root, Options) ->
+ AllowManual = maps:get(allow_manual_names, Root),
+ IsManual = lists:keyfind(node_name, 1, Options) /= false,
+ if not (IsManual and not AllowManual) -> ok; true ->
+ ?ERLFDB_ERROR({create_error, manual_node_names_prohibited})
+ end.
+
+
+create_node_name(Tx, Parent, NameIn) ->
+ #{
+ content_prefix := ContentPrefix,
+ allow_manual_names := AllowManual,
+ allocator := Allocator
+ } = get_root(Parent),
+ Name = case NameIn of
+ null -> undefined;
+ undefined -> undefined;
+ _ when is_binary(NameIn) -> NameIn
+ end,
+ case Name of
+ _ when Name == undefined ->
+ BaseId = erlfdb_hca:allocate(Allocator, Tx),
+ CPLen = size(ContentPrefix),
+ NewName = <<ContentPrefix:CPLen/binary, BaseId/binary>>,
+
+ KeysExist = erlfdb:get_range_startswith(Tx, NewName, [{limit, 1}]),
+ if KeysExist == [] -> ok; true ->
+ ?ERLFDB_ERROR({
+ create_error,
+ keys_exist_for_allocated_name,
+ NewName
+ })
+ end,
+
+ IsFree = is_prefix_free(erlfdb:snapshot(Tx), Parent, NewName),
+ if IsFree -> ok; true ->
+ ?ERLFDB_ERROR({
+ create_error,
+ manual_names_conflict_with_allocated_name,
+ NewName
+ })
+ end,
+
+ NewName;
+ _ when AllowManual andalso is_binary(Name) ->
+ case is_prefix_free(Tx, Parent, NameIn) of
+ true ->
+ ok;
+ false ->
+ ?ERLFDB_ERROR({create_error, node_name_in_use, NameIn})
+ end,
+ NameIn;
+ _ ->
+ ?ERLFDB_ERROR({create_error, manual_node_names_prohibited})
+ end.
+
+
+is_prefix_free(Tx, Parent, NodeName) ->
+ % We have to make sure that NodeName does not interact with
+ % anything that currently exists in the tree. This means that
+ % it must not be a prefix of any existing node id and also
+ % that no existing node id is a prefix of this NodeName.
+ %
+ % A motivating example for why is that deletion of nodes
+ % in the tree would end up deleting unrelated portions
+ % of the tree when node ids overlapped. There would also
+ % be other badness if keys overlapped with the layer
+ % or ?SUBDIRS spaces.
+
+ try
+ % An empty name would obviously be kind of bonkers.
+ if NodeName /= <<>> -> ok; true ->
+ throw(false)
+ end,
+
+ Root = get_root(Parent),
+ RootId = get_id(Root),
+ NodePrefix = get_node_prefix(Root),
+ NPLen = size(NodePrefix),
+
+ % First check that the special case of the root node
+ case bin_startswith(NodeName, RootId) of
+ true -> throw(false);
+ false -> ok
+ end,
+
+ % Check if any node id is a prefix of NodeName
+ Start1 = <<NodePrefix:NPLen/binary, 16#00>>,
+ End1 = ?ERLFDB_PACK(NodePrefix, {NodeName, null}),
+ Opts1 = [{reverse, true}, {limit, 1}, {streaming_mode, exact}],
+ Subspace = erlfdb_subspace:create({}, get_node_prefix(Parent)),
+ erlfdb:fold_range(Tx, Start1, End1, fun({Key, _} = _E, _) ->
+ KeyNodeId = element(1, erlfdb_subspace:unpack(Subspace, Key)),
+ case bin_startswith(NodeName, KeyNodeId) of
+ true -> throw(false);
+ false -> ok
+ end
+ end, nil, Opts1),
+
+ % Check if NodeName is a prefix of any existing key
+ Start2 = ?ERLFDB_EXTEND(NodePrefix, NodeName),
+ End2 = ?ERLFDB_EXTEND(NodePrefix, erlfdb_key:strinc(NodeName)),
+ Opts2 = [{limit, 1}, {streaming_mode, exact}],
+ case erlfdb:wait(erlfdb:get_range(Tx, Start2, End2, Opts2)) of
+ [_E | _] -> throw(false);
+ [] -> ok
+ end,
+
+ true
+ catch throw:false ->
+ false
+ end.
+
+
+bin_startswith(Subject, Prefix) ->
+ PrefixLen = size(Prefix),
+ case Subject of
+ <<Prefix:PrefixLen/binary, _/binary>> -> true;
+ _ -> false
+ end.
+
+
+check_version(Tx, Node, PermLevel) ->
+ Root = get_root(Node),
+ VsnKey = ?ERLFDB_EXTEND(get_id(Root), <<"version">>),
+ {LV1, LV2, _LV3} = ?LAYER_VERSION,
+ {Major, Minor, Patch} = case erlfdb:wait(erlfdb:get(Tx, VsnKey)) of
+ not_found when PermLevel == write ->
+ initialize_directory(Tx, VsnKey);
+ not_found ->
+ ?LAYER_VERSION;
+ VsnBin ->
+ <<
+ V1:32/little-unsigned,
+ V2:32/little-unsigned,
+ V3:32/little-unsigned
+ >> = VsnBin,
+ {V1, V2, V3}
+ end,
+
+ Path = get_path(Node),
+
+ if Major =< LV1 -> ok; true ->
+ ?ERLFDB_ERROR({version_error, unreadable, Path, {Major, Minor, Patch}})
+ end,
+
+ if not (Minor > LV2 andalso PermLevel /= read) -> ok; true ->
+ ?ERLFDB_ERROR({version_error, unwritable, Path, {Major, Minor, Patch}})
+ end.
+
+
+initialize_directory(Tx, VsnKey) ->
+ {V1, V2, V3} = ?LAYER_VERSION,
+ Packed = <<
+ V1:32/little-unsigned,
+ V2:32/little-unsigned,
+ V3:32/little-unsigned
+ >>,
+ erlfdb:set(Tx, VsnKey, Packed),
+ ?LAYER_VERSION.
+
+
+check_same_partition(OldNode, NewParentNode) ->
+ OldRoot = get_partition(OldNode),
+ NewRoot = get_root(NewParentNode),
+ if NewRoot == OldRoot -> ok; true ->
+ ?ERLFDB_ERROR({move_error, partition_mismatch, OldRoot, NewRoot})
+ end.
+
+
+adj_path(Node, PathIn) ->
+ adj_path(get_root(Node), Node, PathIn).
+
+
+adj_path(Root, Node, PathIn) ->
+ RootPathLen = length(get_path(Root)),
+ NodePath = get_path(Node),
+ NodeRelPath = lists:nthtail(RootPathLen, NodePath),
+ Path = NodeRelPath ++ path_init(PathIn),
+ {Root, Path}.
+
+
+path_init(<<_/binary>> = Bin) ->
+ check_utf8(0, Bin),
+ [{utf8, Bin}];
+
+path_init({utf8, <<_/binary>> = Bin} = Path) ->
+ check_utf8(0, Bin),
+ [Path];
+
+path_init(Path) when is_list(Path) ->
+ lists:flatmap(fun(Part) ->
+ path_init(Part)
+ end, Path);
+
+path_init(Path) when is_tuple(Path) ->
+ path_init(tuple_to_list(Path));
+
+path_init(Else) ->
+ ?ERLFDB_ERROR({path_error, invalid_path_component, Else}).
+
+
+check_utf8(Offset, Binary) ->
+ case Binary of
+ <<_:Offset/binary>> ->
+ true;
+ <<_:Offset/binary, _/utf8, Rest/binary>> ->
+ % Recalculating offset as a subtraction here is
+ % slightly odd but this is to avoid having to
+ % re-encode the utf8 code point and adding the
+ % size of that new binary.
+ check_utf8(size(Binary) - size(Rest), Binary);
+ <<_:Offset/binary, _/binary>> ->
+ ?ERLFDB_ERROR({path_error, invalid_utf8, Binary})
+ end.
+
+
+path_append(Path, Part) ->
+ Path ++ path_init(Part).
+
+
+check_not_subpath(OldPath, NewPath) ->
+ case lists:prefix(OldPath, NewPath) of
+ true ->
+ ?ERLFDB_ERROR({
+ move_error,
+ target_is_subdirectory,
+ OldPath,
+ NewPath
+ });
+ false ->
+ ok
+ end.
diff --git a/test/tester.es b/test/tester.es
index e0a28e5..101f535 100755
--- a/test/tester.es
+++ b/test/tester.es
@@ -4,6 +4,18 @@
-mode(compile).
+-define(DIRECTORY_CREATE_OPS, [
+ <<"DIRECTORY_CREATE_SUBSPACE">>,
+ <<"DIRECTORY_CREATE_LAYER">>,
+ <<"DIRECTORY_CREATE_OR_OPEN">>,
+ <<"DIRECTORY_CREATE">>,
+ <<"DIRECTORY_OPEN">>,
+ <<"DIRECTORY_MOVE">>,
+ <<"DIRECTORY_MOVE_TO">>,
+ <<"DIRECTORY_OPEN_SUBSPACE">>
+]).
+
+
-record(st, {
db,
tx_mgr,
@@ -16,10 +28,24 @@
is_snapshot,
last_version,
pids,
- directory_extension
+
+ % Directory Layer tests
+ is_directory_op,
+ dir_list,
+ dir_index,
+ dir_error_index
}).
+init_rand() ->
+ case os:getenv("RANDOM_SEED") of
+ false ->
+ ok;
+ Seed ->
+ rand:seed(exsplus, {list_to_integer(Seed), 0, 0})
+ end.
+
+
stack_create() ->
Pid = spawn_link(fun() -> stack_loop([]) end),
spawn(fun() ->
@@ -192,6 +218,20 @@
stack_push(Pid, {Idx, Value}).
+stack_pop_tuples(St) ->
+ {Tuple} = stack_pop_tuples(St, 1),
+ Tuple.
+
+
+stack_pop_tuples(St, Count) ->
+ TupleList = lists:map(fun(_) ->
+ TupleSize = stack_pop(St),
+ TupleElems = stack_pop(St, TupleSize),
+ list_to_tuple(TupleElems)
+ end, lists:seq(1, Count)),
+ list_to_tuple(TupleList).
+
+
get_transaction(TxName) ->
get({'$erlfdb_tx', TxName}).
@@ -201,6 +241,23 @@
put({'$erlfdb_tx', TxName}, Tx).
+switch_transaction(Db, TxName) ->
+ case get_transaction(TxName) of
+ undefined ->
+ new_transaction(Db, TxName);
+ _ ->
+ ok
+ end.
+
+
+has_prefix(Subject, Prefix) ->
+ PrefLen = size(Prefix),
+ case Subject of
+ <<Prefix:PrefLen/binary, _/binary>> -> true;
+ _ -> false
+ end.
+
+
has_suffix(Subject, Suffix) ->
SubjSize = size(Subject),
SuffSize = size(Suffix),
@@ -235,8 +292,8 @@
wait_for_empty(Db, Prefix) ->
- erlfdb:transactional(Db, fun(Tr) ->
- Future = erlfdb:get_range_startswith(Tr, Prefix, [{limit, 1}]),
+ erlfdb:transactional(Db, fun(Tx) ->
+ Future = erlfdb:get_range_startswith(Tx, Prefix, [{limit, 1}]),
case erlfdb:wait(Future) of
[_|_] -> erlang:error({erlfdb_error, 1020});
[] -> ok
@@ -244,7 +301,21 @@
end).
+append_dir(St, Dir) ->
+ case Dir of
+ not_found -> erlang:error(broken);
+ _ -> ok
+ end,
+ #st{
+ dir_list = DirList
+ } = St,
+ St#st{
+ dir_list = DirList ++ [Dir]
+ }.
+
+
init_run_loop(Db, Prefix) ->
+ init_rand(),
{StartKey, EndKey} = erlfdb_tuple:range({Prefix}),
St = #st{
db = Db,
@@ -257,10 +328,13 @@
is_snapshot = undefined,
last_version = 0,
pids = [],
- directory_extension = undefined
+
+ dir_list = [erlfdb_directory:root()],
+ dir_index = 0,
+ dir_error_index = 0
},
%% lists:foreach(fun({K, V}) ->
- %% io:format("'~s'~n'~s'~n", [py_repr(K), py_repr(V)])
+ %% io:format("'~s'~n'~s'~n", [erlfdb_util:repr(K), erlfdb_util:repr(V)])
%% end, St#st.instructions),
run_loop(St).
@@ -290,6 +364,7 @@
IsDb = has_suffix(Op, <<"_DATABASE">>),
IsSS = has_suffix(Op, <<"_SNAPSHOT">>),
+ IsDir = has_prefix(Op, <<"DIRECTORY_">>),
OpName = if not (IsDb or IsSS) -> Op; true ->
binary:part(Op, {0, size(Op) - 9}) % strip off _DATABASE/_SNAPSHOT
@@ -307,7 +382,8 @@
PreSt = St#st{
op_tuple = OpTuple,
is_db = IsDb,
- is_snapshot = IsSS
+ is_snapshot = IsSS,
+ is_directory_op = IsDir
},
PostSt = try
@@ -324,7 +400,8 @@
index = Index + 1,
op_tuple = undefined,
is_db = undefined,
- is_snapshot = undefined
+ is_snapshot = undefined,
+ is_directory_op = undefined
}).
@@ -382,6 +459,7 @@
execute(_TxObj, St, <<"USE_TRANSACTION">>) ->
TxName = stack_pop(St),
+ switch_transaction(St#st.db, TxName),
St#st{
tx_name = TxName
};
@@ -664,10 +742,263 @@
% TODO
St;
+execute(TxObj, #st{is_directory_op = true} = St, Op) ->
+ #st{
+ dir_list = DirList,
+ dir_index = DirIdx
+ } = St,
+ Dir = lists:nth(DirIdx + 1, DirList),
+ try
+ execute_dir(TxObj, St, Dir, Op)
+ catch error:{erlfdb_directory, _} = _R ->
+ NewSt = case lists:member(Op, ?DIRECTORY_CREATE_OPS) of
+ true -> append_dir(St, null);
+ false -> St
+ end,
+ stack_push(St, <<"DIRECTORY_ERROR">>),
+ NewSt
+ end;
+
execute(_TxObj, _St, UnknownOp) ->
erlang:error({unknown_op, UnknownOp}).
+execute_dir(_TxObj, St, _Dir, <<"DIRECTORY_CREATE_SUBSPACE">>) ->
+ Path = stack_pop_tuples(St),
+ RawPrefix = stack_pop(St),
+ Subspace = erlfdb_subspace:create(Path, RawPrefix),
+ append_dir(St, Subspace);
+
+execute_dir(_TxObj, St, _Dir, <<"DIRECTORY_CREATE_LAYER">>) ->
+ #st{
+ dir_list = DirList
+ } = St,
+ [Index1, Index2, AllowManual] = stack_pop(St, 3),
+ NodeSS = lists:nth(Index1 + 1, DirList),
+ ContentSS = lists:nth(Index2 + 1, DirList),
+ case (NodeSS == null orelse ContentSS == null) of
+ true ->
+ append_dir(St, null);
+ false ->
+ Opts = [
+ {node_prefix, erlfdb_subspace:key(NodeSS)},
+ {content_prefix, erlfdb_subspace:key(ContentSS)},
+ {allow_manual_names, AllowManual == 1}
+ ],
+ append_dir(St, erlfdb_directory:root(Opts))
+ end;
+
+execute_dir(_TxObj, St, _Dir, <<"DIRECTORY_CHANGE">>) ->
+ #st{
+ dir_list = DirList,
+ dir_error_index = ErrIdx
+ } = St,
+ DirIdx1 = stack_pop(St),
+ DirIdx2 = case lists:nth(DirIdx1 + 1, DirList) of
+ null -> ErrIdx;
+ _ -> DirIdx1
+ end,
+ St#st{
+ dir_index = DirIdx2
+ };
+
+execute_dir(_TxObj, St, _Dir, <<"DIRECTORY_SET_ERROR_INDEX">>) ->
+ St#st{
+ dir_error_index = stack_pop(St)
+ };
+
+execute_dir(TxObj, St, Dir, <<"DIRECTORY_CREATE_OR_OPEN">>) ->
+ Path = stack_pop_tuples(St),
+ Layer = stack_pop(St),
+ NewDir = erlfdb_directory:create_or_open(TxObj, Dir, Path, Layer),
+ append_dir(St, NewDir);
+
+execute_dir(TxObj, St, Dir, <<"DIRECTORY_CREATE">>) ->
+ Path = stack_pop_tuples(St),
+ [Layer, Prefix] = stack_pop(St, 2),
+ Opts = [{layer, Layer}] ++ case Prefix of
+ null -> [];
+ _ -> [{node_name, Prefix}]
+ end,
+ NewDir = erlfdb_directory:create(TxObj, Dir, Path, Opts),
+ append_dir(St, NewDir);
+
+execute_dir(TxObj, St, Dir, <<"DIRECTORY_OPEN">>) ->
+ Path = stack_pop_tuples(St),
+ Layer = stack_pop(St),
+ Opts = [{layer, Layer}],
+ NewDir = erlfdb_directory:open(TxObj, Dir, Path, Opts),
+ append_dir(St, NewDir);
+
+execute_dir(TxObj, St, Dir, <<"DIRECTORY_MOVE">>) ->
+ {OldPath, NewPath} = stack_pop_tuples(St, 2),
+ NewDir = erlfdb_directory:move(TxObj, Dir, OldPath, NewPath),
+ append_dir(St, NewDir);
+
+execute_dir(TxObj, St, Dir, <<"DIRECTORY_MOVE_TO">>) ->
+ NewAbsPath = stack_pop_tuples(St),
+ NewDir = erlfdb_directory:move_to(TxObj, Dir, NewAbsPath),
+ append_dir(St, NewDir);
+
+execute_dir(TxObj, St, Dir, <<"DIRECTORY_REMOVE">>) ->
+ Count = stack_pop(St),
+ case Count == 0 of
+ true ->
+ erlfdb_directory:remove(TxObj, Dir);
+ false ->
+ Path = stack_pop_tuples(St),
+ erlfdb_directory:remove(TxObj, Dir, Path)
+ end,
+ St;
+
+execute_dir(TxObj, St, Dir, <<"DIRECTORY_REMOVE_IF_EXISTS">>) ->
+ Count = stack_pop(St),
+ case Count == 0 of
+ true ->
+ erlfdb_directory:remove_if_exists(TxObj, Dir);
+ false ->
+ Path = stack_pop_tuples(St),
+ erlfdb_directory:remove_if_exists(TxObj, Dir, Path)
+ end,
+ St;
+
+execute_dir(TxObj, St, Dir, <<"DIRECTORY_LIST">>) ->
+ Count = stack_pop(St),
+ Results = case Count == 0 of
+ true ->
+ erlfdb_directory:list(TxObj, Dir);
+ false ->
+ Path = stack_pop_tuples(St),
+ erlfdb_directory:list(TxObj, Dir, Path)
+ end,
+ Names = lists:map(fun({N, _}) -> N end, Results),
+ stack_push(St, erlfdb_tuple:pack(list_to_tuple(Names))),
+ St;
+
+execute_dir(TxObj, St, Dir, <<"DIRECTORY_EXISTS">>) ->
+ Count = stack_pop(St),
+ Result = case Count == 0 of
+ true ->
+ erlfdb_directory:exists(TxObj, Dir);
+ false ->
+ Path = stack_pop_tuples(St),
+ erlfdb_directory:exists(TxObj, Dir, Path)
+ end,
+ case Result of
+ true -> stack_push(St, 1);
+ false -> stack_push(St, 0)
+ end,
+ St;
+
+execute_dir(_TxObj, St, Dir, <<"DIRECTORY_PACK_KEY">>) ->
+ Tuple = stack_pop_tuples(St),
+ Mod = get_dir_or_ss_mod(Dir),
+ Result = Mod:pack(Dir, Tuple),
+ stack_push(St, Result),
+ St;
+
+execute_dir(_TxObj, St, Dir, <<"DIRECTORY_UNPACK_KEY">>) ->
+ Key = stack_pop(St),
+ Mod = get_dir_or_ss_mod(Dir),
+ Tuple = Mod:unpack(Dir, Key),
+ lists:foreach(fun(Elem) ->
+ stack_push(St, Elem)
+ end, tuple_to_list(Tuple)),
+ St;
+
+execute_dir(_TxObj, St, Dir, <<"DIRECTORY_RANGE">>) ->
+ Tuple = stack_pop_tuples(St),
+ Mod = get_dir_or_ss_mod(Dir),
+ {Start, End} = Mod:range(Dir, Tuple),
+ stack_push(St, Start),
+ stack_push(St, End),
+ St;
+
+execute_dir(_TxObj, St, Dir, <<"DIRECTORY_CONTAINS">>) ->
+ Key = stack_pop(St),
+ Mod = get_dir_or_ss_mod(Dir),
+ Result = Mod:contains(Dir, Key),
+ case Result of
+ true -> stack_push(St, 1);
+ false -> stack_push(St, 0)
+ end,
+ St;
+
+execute_dir(_TxObj, St, Dir, <<"DIRECTORY_OPEN_SUBSPACE">>) ->
+ Path = stack_pop_tuples(St),
+ Mod = get_dir_or_ss_mod(Dir),
+ Subspace = Mod:subspace(Dir, Path),
+ append_dir(St, Subspace);
+
+execute_dir(TxObj, St, Dir, <<"DIRECTORY_LOG_SUBSPACE">>) ->
+ #st{
+ dir_index = DirIdx
+ } = St,
+ Prefix = stack_pop(St),
+ LogKey = erlfdb_tuple:pack({DirIdx}, Prefix),
+ Mod = get_dir_or_ss_mod(Dir),
+ erlfdb:set(TxObj, LogKey, Mod:key(Dir)),
+ St;
+
+execute_dir(TxObj, St, Dir, <<"DIRECTORY_LOG_DIRECTORY">>) ->
+ #st{
+ dir_index = DirIdx
+ } = St,
+ Prefix = stack_pop(St),
+ LogPrefix = erlfdb_tuple:pack({DirIdx}, Prefix),
+
+ Exists = erlfdb_directory:exists(TxObj, Dir),
+ Children = case Exists of
+ true ->
+ ListResult = erlfdb_directory:list(TxObj, Dir),
+ Names = lists:map(fun({N, _}) -> N end, ListResult),
+ list_to_tuple(Names);
+ false ->
+ {}
+ end,
+
+ PathKey = erlfdb_tuple:pack({{utf8, <<"path">>}}, LogPrefix),
+ Path = erlfdb_tuple:pack(list_to_tuple(erlfdb_directory:get_path(Dir))),
+ erlfdb:set(TxObj, PathKey, Path),
+
+ LayerKey = erlfdb_tuple:pack({{utf8, <<"layer">>}}, LogPrefix),
+ Layer = erlfdb_tuple:pack({erlfdb_directory:get_layer(Dir)}),
+ erlfdb:set(TxObj, LayerKey, Layer),
+
+ ExistsKey = erlfdb_tuple:pack({{utf8, <<"exists">>}}, LogPrefix),
+ ExistsVal = erlfdb_tuple:pack({if Exists -> 1; true -> 0 end}),
+ erlfdb:set(TxObj, ExistsKey, ExistsVal),
+
+ ChildrenKey = erlfdb_tuple:pack({{utf8, <<"children">>}}, LogPrefix),
+ ChildrenVal = erlfdb_tuple:pack(Children),
+ erlfdb:set(TxObj, ChildrenKey, ChildrenVal),
+
+ St;
+
+execute_dir(_TxObj, St, Dir, <<"DIRECTORY_STRIP_PREFIX">>) ->
+ ToStrip = stack_pop(St),
+ Mod = get_dir_or_ss_mod(Dir),
+ DirKey = Mod:key(Dir),
+ DKLen = size(DirKey),
+ case ToStrip of
+ _ when not is_binary(ToStrip) ->
+ erlang:error({erlfdb_directory, prefix_not_a_binary});
+ <<DirKey:DKLen/binary, Rest/binary>> ->
+ stack_push(St, Rest);
+ _ ->
+ erlang:error({erlfdb_directory, {invalid_prefix_strip, ToStrip, DirKey}})
+ end,
+ St;
+
+execute_dir(_TxObj, _St, _Dir, UnknownOp) ->
+ erlang:error({unknown_directory_op, UnknownOp}).
+
+
+get_dir_or_ss_mod(Subspace) when element(1, Subspace) == erlfdb_subspace ->
+ erlfdb_subspace;
+get_dir_or_ss_mod(#{}) ->
+ erlfdb_directory.
+
main([Prefix, APIVsn]) ->
main([Prefix, APIVsn, ""]);