Implement the DirectoryLayer

This implements a DirectoryLayer intends to behave identically to the
Python version. Soak tests have so far run as long as an hour without
finding issues. However I plan on trying to get that number to upwards
of 24 hours or more.
diff --git a/notes/directory-layer-design.md b/notes/directory-layer-design.md
new file mode 100644
index 0000000..376f800
--- /dev/null
+++ b/notes/directory-layer-design.md
@@ -0,0 +1,184 @@
+Directory Layer Schema-ish
+==========================
+
+
+`'\x16...'` to `'\x1D...'` - Keys that represent contents of the directory
+    tree. This is basically "all possible positive integers". The prefix
+    of each key is a directory node down below.
+
+
+`'\xFE'` - Prefix for all keys related to the directory tree node
+    hierarchy.
+
+
+`'\xFE\x01version\x00'` - Version of the directory tree formatted as:
+    <<V1:32/little-unsigned, V2:32/little-unsigned, V3:32/little-unsigned>>
+
+
+`'\xFE\x01\xFE\x00'` - the root node of the tree
+
+
+`'\xFE\x01\xFE\x00\x01layer\x00'` - the "layer" of the given node. This
+    is set at node creation time and never mutated by the directory layer.
+    If a layer is provided when opening nodes it checks to see that the
+    layer matches nodes that are read. When there's a mismatch an error
+    is thrown. I believe the purpose of this is so that users of the
+    DirectoryLayer can assert a primitive "ownership" over part of the
+    directory tree with some guarantee that they won't accidentally trample
+    over each other. However, if "layer" is the binary string "partition"
+    then this becomes a whole new thing which I'll discuss below.
+
+
+`'\xFE\x01\xFE\x00\x16' - The prefix for all child nodes of the root node. The
+    pattern here is basically `'\xFE'` + erlfdb_tupe:pack({node.key(), 0})
+    although its using erlfdb_subspaces under the hood. I may ixnay subspaces
+    in the directory layer because the syntax really isn't all that useful
+    in Erlang.
+
+
+`'\xFE\x01\xFE\x00\x16\x02foo\x00'` -> `'\x17\x05'` - A child node named "foo"
+    which has a node id of `\x17\x05`.
+
+
+`'\xFE\x17\x05\x16\x02bar\x00'` -> `'\x17\x19'` - A child of "foo" named
+    "bar" with node id `'\x17\x19'`. The tree is made my recursively
+    following these paths.
+
+
+`'\xFE\x17\x19\x01layer\x00'` -> `'partition'` A node id has its layer value
+    set to partition which means that this node is "DirectoryPartition"
+    which basically means its node id is pasted onto the front of
+    every subtree node. This can be useful to do a range scan against an
+    entire subtree. However, directories can not be moved across partition
+    boundaries (because that would require changing all of their key
+    prefixes). Also, it makes key lengths slightly longer by the length
+    of the short node id string.
+
+    Partitions are roughly equivalent to a nested directory tree re-rooted
+    outside the main directory tree. All of the contents and node keys
+    share the same prefix based on the node id of the partition.
+
+
+Some Names
+==========
+
+A warning to future readers, this list of names is used fairly consistently
+by the `erlfdb` DirectoryLayer, however they are not all used in the
+Python implementation. So any comparison between the two requires some
+fairly decent knowledge of how things work. This is mostly because the
+Python implementation doesn't actually bother naming things and instead has
+a number of long function invocations that gloss over where we might need
+to assign something to a variable in Erlang.
+
+  * directory name = human readable string
+  * node_name = shortened binary
+  * root version = node_prefix + {"version"}
+  * root node = node_prefix + {node_prefix}
+  * partition id = node_name + 16#FE
+  * partition version = node_name + {"version"}
+  * partition node = partition_id + {partition_id}
+  * node_id = node_prefix + {node_name}
+  * node_layer_id = node_prefix + {node_name, "layer"}
+  * node_entry_id = node_prefix + {node_name, SUBDIRS, directory name}
+
+
+
+Algorithms
+==========
+
+These are all based on the assumption that my schema is at least relatively
+close to reality and also by staring at the Python implementation. However,
+the Python implementation makes extensive use of OO inheritance as well
+as syntactical sugar via magic methods. These algorithms are my general
+reinterpretation of the Python approach in Erlang terms.
+
+Some common assumptions here:
+
+  * Path is a tuple of UTF-8 encoded binaries.
+
+
+Find Node at Path
+-----------------
+
+  * Get the root node (i.e., `\xFE\x01\xFE\x00`) (though this will
+    obviously be parameterizable).
+  * curr_node = RootNode
+  * For part in path:
+    - node_id = erlfdb:get(Tx, Root + to_string(Path))
+    - if node_id == not_found: return not_found
+    - curr_node = `'\xFE' + node_id`
+  * return curr_node
+
+
+Open Directory at Path
+----------------------
+
+  * Find node at path
+  * If not found, return not_found or throw an error or w/e
+  * Read layer value
+  * Optionally pre-fetch child keys. Would be more efficient as
+    a single range scan, but super wide trees would be ungood
+  * if layer != "partition": Return value representing this directory
+    - root reference
+    - path referecne
+    - layer
+    - children
+  * else: Return record representing this partition
+    - previous root
+    - new root
+    - path to partition root
+
+
+Create New Directory at Path
+----------------------------
+
+  * Find directory at Path[:-1]
+  * If direcotry[Path[-1]] exists: return eexists
+  * Allocate new node id via HCA
+  * Write `'\xFE' + node_id + '\x01layer\x00' = whatever was passed
+  * return samesies as for open path
+
+
+Remove A Directory at Path
+--------------------------
+
+  * Find node at Path
+  * del_children(node_id):
+  *    for name, child_id in children(node_id):
+  *        del_children(child_id)
+  *        clear_range for all content related to child_id
+  *    clear_range for this node's layer and children nodes
+  * delete node_id from Path[:-1]
+
+
+Move a Directory from PathA to PathB
+------------------------------------
+
+  * Bunch of error checking and partition handling
+  * Insert PathA node_id to PathB[:-1]'s list of children
+  * Then remove PathA's node_id from PathA[:-1]
+
+
+List Directory at Path
+----------------------
+
+  * Find Directory at Path
+  * Return children list. Mebbe after a range read if they
+    haven't been pre-loaded.
+
+
+Directory Exists
+----------------
+
+  * Find directory, see if it throws an error
+
+
+Tests
+-----
+
+  * Opening a directory creates parent directories?
+  * Creating parent directories does not set a label?
+  * Opening nested directories does not check intermediate labels?
+
+
+
diff --git a/src/erlfdb_directory.erl b/src/erlfdb_directory.erl
new file mode 100644
index 0000000..d727ad3
--- /dev/null
+++ b/src/erlfdb_directory.erl
@@ -0,0 +1,828 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(erlfdb_directory).
+
+
+-export([
+    root/0,
+    root/1,
+
+    create_or_open/3,
+    create_or_open/4,
+    create/3,
+    create/4,
+    open/3,
+    open/4,
+
+    list/2,
+    list/3,
+
+    exists/2,
+    exists/3,
+
+    move/4,
+    move_to/3,
+
+    remove/2,
+    remove/3,
+    remove_if_exists/2,
+    remove_if_exists/3,
+
+    get_id/1,
+    get_name/1,
+    get_root/1,
+    get_root_for_path/2,
+    get_node_prefix/1,
+    get_path/1,
+    get_layer/1,
+    get_subspace/1,
+
+    subspace/2,
+    key/1,
+    pack/2,
+    pack_vs/2,
+    unpack/2,
+    range/1,
+    range/2,
+    contains/2,
+
+    debug_nodes/2
+]).
+
+
+-include("erlfdb.hrl").
+
+
+-define(LAYER_VERSION, {1, 0, 0}).
+-define(DEFAULT_NODE_PREFIX, <<16#FE>>).
+-define(SUBDIRS, 0).
+
+
+root() ->
+    init_root([]).
+
+
+root(Options) ->
+    init_root(Options).
+
+
+create_or_open(TxObj, Node, Path) ->
+    create_or_open(TxObj, Node, Path, <<>>).
+
+
+create_or_open(TxObj, Node, PathIn, Layer) ->
+    {Root, Path} = adj_path(Node, PathIn),
+    if Path /= [] -> ok; true ->
+        ?ERLFDB_ERROR({open_error, cannot_open_root})
+    end,
+    case create_or_open_int(TxObj, Root, Path, Layer) of
+        #{is_absolute_root := true} ->
+            ?ERLFDB_ERROR({open_error, cannot_open_root});
+        Else ->
+            Else
+    end.
+
+create(TxObj, Node, Path) ->
+    create(TxObj, Node, Path, []).
+
+
+create(TxObj, Node, PathIn, Options) ->
+    {Root, Path} = adj_path(Node, PathIn),
+    check_manual_node_name(Root, Options),
+    erlfdb:transactional(TxObj, fun(Tx) ->
+        Layer = erlfdb_util:get(Options, layer, <<>>),
+        NodeName = erlfdb_util:get(Options, node_name, undefined),
+        create_int(Tx, Root, Path, Layer, NodeName)
+    end).
+
+
+open(TxObj, Node, Path) ->
+    open(TxObj, Node, Path, []).
+
+
+open(TxObj, Node, PathIn, Options) ->
+    {Root, Path} = adj_path(Node, PathIn),
+    if Path /= [] -> ok; true ->
+        ?ERLFDB_ERROR({open_error, cannot_open_root})
+    end,
+    erlfdb:transactional(TxObj, fun(Tx) ->
+        Layer = erlfdb_util:get(Options, layer, <<>>),
+        open_int(Tx, Root, Path, Layer)
+    end).
+
+
+list(TxObj, Node) ->
+    list(TxObj, Node, {}).
+
+
+list(TxObj, Node, PathIn) ->
+    {Root, Path} = adj_path(Node, PathIn),
+    erlfdb:transactional(TxObj, fun(Tx) ->
+        check_version(Tx, Root, read),
+        case find(Tx, Root, Path) of
+            not_found ->
+                ?ERLFDB_ERROR({list_error, missing_path, Path});
+            ListNode ->
+                Subdirs = ?ERLFDB_EXTEND(get_id(ListNode), ?SUBDIRS),
+                SDLen = size(Subdirs),
+                SDStart = <<Subdirs:SDLen/binary, 16#00>>,
+                SDEnd = <<Subdirs:SDLen/binary, 16#FF>>,
+                SubDirKVs = erlfdb:wait(erlfdb:get_range(Tx, SDStart, SDEnd)),
+                lists:map(fun({Key, NodeName}) ->
+                    {DName} = ?ERLFDB_EXTRACT(Subdirs, Key),
+                    ChildNode = init_node(Tx, ListNode, NodeName, DName),
+                    {DName, ChildNode}
+                end, SubDirKVs)
+        end
+    end).
+
+
+exists(TxObj, Node) ->
+    exists(TxObj, Node, {}).
+
+
+exists(TxObj, Node, PathIn) ->
+    %Root = get_root(Node),
+    Root = get_root_for_path(Node, PathIn),
+    {Root, Path} = adj_path(Root, Node, PathIn),
+    erlfdb:transactional(TxObj, fun(Tx) ->
+        check_version(Tx, Root, read),
+        case find(Tx, Root, Path) of
+            not_found ->
+                false;
+            _ChildNode ->
+                true
+        end
+    end).
+
+
+move(TxObj, Node, OldPathIn, NewPathIn) ->
+    {Root, OldPath} = adj_path(Node, OldPathIn),
+    {Root, NewPath} = adj_path(Node, NewPathIn),
+    erlfdb:transactional(TxObj, fun(Tx) ->
+        check_version(Tx, Root, write),
+        check_not_subpath(OldPath, NewPath),
+
+        OldNode = find(Tx, Root, OldPath),
+        NewNode = find(Tx, Root, NewPath),
+
+        if OldNode /= not_found -> ok; true ->
+            ?ERLFDB_ERROR({move_error, missing_source, OldPath})
+        end,
+
+        if NewNode == not_found -> ok; true ->
+            ?ERLFDB_ERROR({move_error, target_exists, NewPath})
+        end,
+
+        {NewParentPath, [NewName]} = lists:split(length(NewPath) - 1, NewPath),
+        case find(Tx, Root, NewParentPath) of
+            not_found ->
+                ?ERLFDB_ERROR({move_error, missing_parent_node, NewParentPath});
+            NewParentNode ->
+                check_same_partition(OldNode, NewParentNode),
+                ParentId = get_id(NewParentNode),
+                NodeEntryId = ?ERLFDB_PACK(ParentId, {?SUBDIRS, NewName}),
+                erlfdb:set(Tx, NodeEntryId, get_name(OldNode)),
+                remove_from_parent(Tx, OldNode),
+                OldNode#{path := get_path(Root) ++ NewPath}
+        end
+    end).
+
+
+move_to(_TxObj, #{is_absolute_root := true}, _NewPath) ->
+    ?ERLFDB_ERROR({move_error, root_cannot_be_moved});
+
+move_to(TxObj, Node, NewAbsPathIn) ->
+    Root = get_root_for_path(Node, []),
+    RootPath = get_path(Root),
+    RootPathLen = length(RootPath),
+    NewAbsPath = path_init(NewAbsPathIn),
+    IsPrefix = lists:prefix(RootPath, NewAbsPath),
+    if IsPrefix -> ok; true ->
+        ?ERLFDB_ERROR({move_error, partition_mismatch, RootPath, NewAbsPath})
+    end,
+    NodePath = get_path(Node),
+    SrcPath = lists:nthtail(RootPathLen, NodePath),
+    TgtPath = lists:nthtail(RootPathLen, NewAbsPath),
+    move(TxObj, Root, SrcPath, TgtPath).
+
+
+remove(TxObj, Node) ->
+    remove_int(TxObj, Node, {}, false).
+
+
+remove(TxObj, Node, Path) ->
+    remove_int(TxObj, Node, Path, false).
+
+
+remove_if_exists(TxObj, Node) ->
+    remove_int(TxObj, Node, {}, true).
+
+
+remove_if_exists(TxObj, Node, Path) ->
+    remove_int(TxObj, Node, Path, true).
+
+
+get_id(Node) ->
+    invoke(Node, get_id, []).
+
+
+get_name(Node) ->
+    invoke(Node, get_name, []).
+
+
+get_root(Node) ->
+    invoke(Node, get_root, []).
+
+
+get_root_for_path(Node, Path) ->
+    invoke(Node, get_root_for_path, [Path]).
+
+
+get_partition(Node) ->
+    invoke(Node, get_partition, []).
+
+
+get_node_prefix(Node) ->
+    invoke(Node, get_node_prefix, []).
+
+
+get_path(Node) ->
+    invoke(Node, get_path, []).
+
+
+get_layer(Node) ->
+    invoke(Node, get_layer, []).
+
+
+get_subspace(Node) ->
+    invoke(Node, get_subspace, []).
+
+
+subspace(Node, Tuple) ->
+    erlfdb_subspace:create(get_subspace(Node), Tuple).
+
+
+key(Node) ->
+    erlfdb_subspace:key(get_subspace(Node)).
+
+
+pack(Node, Tuple) ->
+    erlfdb_subspace:pack(get_subspace(Node), Tuple).
+
+
+pack_vs(Node, Tuple) ->
+    erlfdb_subspace:pack_vs(get_subspace(Node), Tuple).
+
+
+unpack(Node, Key) ->
+    erlfdb_subspace:unpack(get_subspace(Node), Key).
+
+
+range(Node) ->
+    range(Node, {}).
+
+
+range(Node, Tuple) ->
+    erlfdb_subspace:range(get_subspace(Node), Tuple).
+
+
+contains(Node, Key) ->
+    erlfdb_subspace:contains(get_subspace(Node), Key).
+
+
+debug_nodes(TxObj, _Node) ->
+    erlfdb:fold_range(TxObj, <<16#02>>, <<16#FF>>, fun({K, V}, _Acc) ->
+        io:format(standard_error, "~s => ~s~n", [
+                erlfdb_util:repr(K),
+                erlfdb_util:repr(V)
+            ])
+    end, nil).
+
+
+invoke(not_found, _, _) ->
+    erlang:error(broken);
+
+invoke(Node, FunName, Args) ->
+    case Node of
+        #{FunName := Fun} ->
+            erlang:apply(Fun, [Node | Args]);
+        #{} ->
+            ?ERLFDB_ERROR({op_not_supported, FunName, Node})
+    end.
+
+
+init_root(Options) ->
+    DefNodePref = ?DEFAULT_NODE_PREFIX,
+    NodePrefix = erlfdb_util:get(Options, node_prefix, DefNodePref),
+    RootNodeId = ?ERLFDB_EXTEND(NodePrefix, NodePrefix),
+    ContentPrefix = erlfdb_util:get(Options, content_prefix, <<>>),
+    AllowManual = erlfdb_util:get(Options, allow_manual_names, false),
+    Allocator = erlfdb_hca:create(?ERLFDB_EXTEND(RootNodeId, <<"hca">>)),
+    #{
+        id => ?ERLFDB_EXTEND(NodePrefix, NodePrefix),
+        node_prefix => NodePrefix,
+        content_prefix => ContentPrefix,
+        allocator => Allocator,
+        allow_manual_names => AllowManual,
+        is_absolute_root => true,
+
+        get_id => fun(Self) -> maps:get(id, Self) end,
+        get_root => fun(Self) -> Self end,
+        get_root_for_path => fun(Self, _Path) -> Self end,
+        get_partition => fun(Self) -> Self end,
+        get_node_prefix => fun(Self) -> maps:get(node_prefix, Self) end,
+        get_path => fun(_Self) -> [] end,
+        get_layer => fun(_Self) -> <<>> end,
+        get_subspace => fun(_Self) ->
+            ?ERLFDB_ERROR({subspace_error, subspace_unsupported_for_root})
+        end
+    }.
+
+
+init_node(Tx, Node, NodeName, PathName) ->
+    NodePrefix = get_node_prefix(Node),
+    NodeLayerId = ?ERLFDB_PACK(NodePrefix, {NodeName, <<"layer">>}),
+    Layer = case erlfdb:wait(erlfdb:get(Tx, NodeLayerId)) of
+        not_found ->
+            ?ERLFDB_ERROR({internal_error, missing_node_layer, NodeLayerId});
+        LName ->
+            LName
+    end,
+    case Layer of
+        <<"partition">> ->
+            init_partition(Node, NodeName, PathName);
+        _ ->
+            init_directory(Node, NodeName, PathName, Layer)
+    end.
+
+
+init_partition(ParentNode, NodeName, PathName) ->
+    NodeNameLen = size(NodeName),
+    NodePrefix = <<NodeName:NodeNameLen/binary, 16#FE>>,
+    RootNodeId = ?ERLFDB_EXTEND(NodePrefix, NodePrefix),
+    Allocator = erlfdb_hca:create(?ERLFDB_EXTEND(RootNodeId, <<"hca">>)),
+    #{
+        id => RootNodeId,
+        name => NodeName,
+        root => get_root(ParentNode),
+        node_prefix => NodePrefix,
+        content_prefix => NodeName,
+        allocator => Allocator,
+        allow_manual_names => false,
+        path => path_append(get_path(ParentNode), PathName),
+        is_partition => true,
+
+        get_id => fun(Self) -> maps:get(id, Self) end,
+        get_name => fun(Self) -> maps:get(name, Self) end,
+        get_root => fun(Self) -> Self end,
+        get_root_for_path => fun(Self, PathIn) ->
+            case PathIn of
+                {} -> maps:get(root, Self);
+                [] -> maps:get(root, Self);
+                _ -> Self
+            end
+        end,
+        get_partition => fun(Self) -> maps:get(root, Self) end,
+        get_node_prefix => fun(Self) -> maps:get(node_prefix, Self) end,
+        get_path => fun(Self) -> maps:get(path, Self) end,
+        get_layer => fun(_Self) -> <<"partition">> end,
+        get_subspace => fun(_Self) ->
+            ?ERLFDB_ERROR({subspace_error, subspace_unsupported_for_partition})
+        end
+    }.
+
+
+init_directory(ParentNode, NodeName, PathName, Layer) ->
+    NodePrefix = get_node_prefix(ParentNode),
+    ParentPath = get_path(ParentNode),
+    #{
+        id => ?ERLFDB_EXTEND(NodePrefix, NodeName),
+        name => NodeName,
+        root => get_root(ParentNode),
+        path => path_append(ParentPath, PathName),
+        layer => Layer,
+
+        get_id => fun(Self) -> maps:get(id, Self) end,
+        get_name => fun(Self) -> maps:get(name, Self) end,
+        get_root => fun(Self) -> maps:get(root, Self) end,
+        get_root_for_path => fun(Self, Path) ->
+            NewPath = maps:get(path, Self) ++ path_init(Path),
+            get_root_for_path(maps:get(root, Self), NewPath)
+        end,
+        get_partition => fun(Self) -> maps:get(root, Self) end,
+        get_node_prefix => fun(Self) ->
+            Root = maps:get(root, Self),
+            get_node_prefix(Root)
+        end,
+        get_path => fun(Self) -> maps:get(path, Self) end,
+        get_layer => fun(Self) -> maps:get(layer, Self) end,
+        get_subspace => fun(Self) ->
+            erlfdb_subspace:create({}, maps:get(name, Self))
+        end
+    }.
+
+
+find(_Tx, Node, []) ->
+    Node;
+
+find(Tx, Node, [PathName | RestPath]) ->
+    NodeEntryId = ?ERLFDB_PACK(get_id(Node), {?SUBDIRS, PathName}),
+    case erlfdb:wait(erlfdb:get(Tx, NodeEntryId)) of
+        not_found ->
+            not_found;
+        ChildNodeName ->
+            ChildNode = init_node(Tx, Node, ChildNodeName, PathName),
+            find(Tx, ChildNode, RestPath)
+    end.
+
+
+find_deepest(_Tx, Node, []) ->
+    Node;
+
+find_deepest(Tx, Node, [PathName | RestPath]) ->
+    NodeEntryId = ?ERLFDB_PACK(get_id(Node), {?SUBDIRS, PathName}),
+    case erlfdb:wait(erlfdb:get(Tx, NodeEntryId)) of
+        not_found ->
+            Node;
+        ChildNodeName ->
+            ChildNode = init_node(Tx, Node, ChildNodeName, PathName),
+            find_deepest(Tx, ChildNode, RestPath)
+    end.
+
+
+create_or_open_int(TxObj, Node, {}, Layer) ->
+    create_or_open_int(TxObj, Node, [], Layer);
+
+create_or_open_int(_TxObj, Node, [], LayerIn) ->
+    Layer = case LayerIn of
+        <<>> -> <<>>;
+        null -> <<>>;
+        undefined -> <<>>;
+        Else when is_binary(Else) -> Else
+    end,
+    NodeLayer = get_layer(Node),
+    if Layer == <<>> orelse Layer == NodeLayer -> ok; true ->
+        ?ERLFDB_ERROR({open_error, layer_mismatch, Layer, NodeLayer})
+    end,
+    Node;
+
+create_or_open_int(TxObj, Node, PathIn, Layer) ->
+    {Root, Path} = adj_path(Node, PathIn),
+    erlfdb:transactional(TxObj, fun(Tx) ->
+        {ParentPath, [PathName]} = lists:split(length(Path) - 1, Path),
+
+        Parent = lists:foldl(fun(Name, CurrNode) ->
+            try
+                open_int(Tx, CurrNode, Name, <<>>)
+            catch error:{?MODULE, {open_error, path_missing, _}} ->
+                create_int(Tx, CurrNode, Name, <<>>, undefined)
+            end
+        end, Root, ParentPath),
+
+        try
+            open_int(Tx, Parent, PathName, Layer)
+        catch error:{?MODULE, {open_error, path_missing, _}} ->
+            create_int(Tx, Parent, PathName, Layer, undefined)
+        end
+    end).
+
+
+create_int(Tx, Node, PathIn, Layer, NodeNameIn) ->
+    Path = path_init(PathIn),
+    try
+        open_int(Tx, Node, Path, <<>>),
+        ?ERLFDB_ERROR({create_error, path_exists, Path})
+    catch error:{?MODULE, {open_error, path_missing, _}} ->
+        Deepest = find_deepest(Tx, Node, Path),
+        NodeName = create_node_name(Tx, Deepest, NodeNameIn),
+        {ParentPath, [PathName]} = lists:split(length(Path) - 1, Path),
+        case create_or_open_int(Tx, Node, ParentPath, <<>>) of
+            not_found ->
+                ?ERLFDB_ERROR({create_error, missing_parent, ParentPath});
+            Parent ->
+                check_version(Tx, Parent, write),
+                create_node(Tx, Parent, PathName, NodeName, Layer),
+                R = find(Tx, Parent, [PathName]),
+                if R /= not_found -> R; true ->
+                    erlang:error(broken)
+                end
+        end
+    end.
+
+
+create_node(Tx, Parent, PathName, NodeName, LayerIn) ->
+    NodeEntryId = ?ERLFDB_PACK(get_id(Parent), {?SUBDIRS, PathName}),
+    erlfdb:set(Tx, NodeEntryId, NodeName),
+
+    NodePrefix = get_node_prefix(Parent),
+    NodeLayerId = ?ERLFDB_PACK(NodePrefix, {NodeName, <<"layer">>}),
+    Layer = if LayerIn == undefined -> <<>>; true -> LayerIn end,
+    erlfdb:set(Tx, NodeLayerId, Layer).
+
+
+open_int(Tx, Node, PathIn, Layer) ->
+    check_version(Tx, Node, read),
+    Path = path_init(PathIn),
+    case find(Tx, Node, Path) of
+        not_found ->
+            ?ERLFDB_ERROR({open_error, path_missing, Path});
+        #{is_absolute_root := true} ->
+            ?ERLFDB_ERROR({open_error, cannot_open_root});
+        Opened ->
+            NodeLayer = get_layer(Opened),
+            if Layer == <<>> orelse Layer == NodeLayer -> ok; true ->
+                ?ERLFDB_ERROR({open_error, layer_mismatch, Layer, NodeLayer})
+            end,
+            Opened
+    end.
+
+
+remove_int(TxObj, Node, PathIn, IgnoreMissing) ->
+    Root = get_root_for_path(Node, PathIn),
+    {Root, Path} = adj_path(Root, Node, PathIn),
+    erlfdb:transactional(TxObj, fun(Tx) ->
+        check_version(Tx, Root, write),
+        case find(Tx, Root, Path) of
+            not_found when IgnoreMissing ->
+                ok;
+            not_found ->
+                ?ERLFDB_ERROR({remove_error, path_missing, Path});
+            #{is_absolute_root := true} ->
+                ?ERLFDB_ERROR({remove_error, cannot_remove_root});
+            ToRem ->
+                remove_recursive(Tx, ToRem),
+                remove_from_parent(Tx, ToRem)
+        end
+    end).
+
+
+remove_recursive(Tx, Node) ->
+    % Remove all subdirectories
+    lists:foreach(fun({_DirName, ChildNode}) ->
+        remove_recursive(Tx, ChildNode)
+    end, list(Tx, Node)),
+
+    % Delete all content for the node.
+    ContentSS = erlfdb_subspace:create({}, get_name(Node)),
+    {ContentStart, ContentEnd} = erlfdb_subspace:range(ContentSS),
+    erlfdb:clear_range(Tx, ContentStart, ContentEnd),
+
+    % Delete this node from the tree hierarchy
+    NodeSubspace = erlfdb_subspace:create({}, get_id(Node)),
+    {NodeStart, NodeEnd} = erlfdb_subspace:range(NodeSubspace),
+    erlfdb:clear_range(Tx, NodeStart, NodeEnd).
+
+
+remove_from_parent(Tx, Node) ->
+    {Root, Path} = adj_path(get_root_for_path(Node, []), Node, []),
+    {ParentPath, [PathName]} = lists:split(length(Path) - 1, Path),
+    Parent = find(Tx, Root, ParentPath),
+
+    NodeEntryId = ?ERLFDB_PACK(get_id(Parent), {?SUBDIRS, PathName}),
+    erlfdb:clear(Tx, NodeEntryId).
+
+
+check_manual_node_name(Root, Options) ->
+    AllowManual = maps:get(allow_manual_names, Root),
+    IsManual = lists:keyfind(node_name, 1, Options) /= false,
+    if not (IsManual and not AllowManual) -> ok; true ->
+        ?ERLFDB_ERROR({create_error, manual_node_names_prohibited})
+    end.
+
+
+create_node_name(Tx, Parent, NameIn) ->
+    #{
+        content_prefix := ContentPrefix,
+        allow_manual_names := AllowManual,
+        allocator := Allocator
+    } = get_root(Parent),
+    Name = case NameIn of
+        null -> undefined;
+        undefined -> undefined;
+        _ when is_binary(NameIn) -> NameIn
+    end,
+    case Name of
+        _ when Name == undefined ->
+            BaseId = erlfdb_hca:allocate(Allocator, Tx),
+            CPLen = size(ContentPrefix),
+            NewName = <<ContentPrefix:CPLen/binary, BaseId/binary>>,
+
+            KeysExist = erlfdb:get_range_startswith(Tx, NewName, [{limit, 1}]),
+            if KeysExist == [] -> ok; true ->
+                ?ERLFDB_ERROR({
+                        create_error,
+                        keys_exist_for_allocated_name,
+                        NewName
+                    })
+            end,
+
+            IsFree = is_prefix_free(erlfdb:snapshot(Tx), Parent, NewName),
+            if IsFree -> ok; true ->
+                ?ERLFDB_ERROR({
+                        create_error,
+                        manual_names_conflict_with_allocated_name,
+                        NewName
+                    })
+            end,
+
+            NewName;
+        _ when AllowManual andalso is_binary(Name) ->
+            case is_prefix_free(Tx, Parent, NameIn) of
+                true ->
+                    ok;
+                false ->
+                    ?ERLFDB_ERROR({create_error, node_name_in_use, NameIn})
+            end,
+            NameIn;
+        _ ->
+            ?ERLFDB_ERROR({create_error, manual_node_names_prohibited})
+    end.
+
+
+is_prefix_free(Tx, Parent, NodeName) ->
+    % We have to make sure that NodeName does not interact with
+    % anything that currently exists in the tree. This means that
+    % it must not be a prefix of any existing node id and also
+    % that no existing node id is a prefix of this NodeName.
+    %
+    % A motivating example for why is that deletion of nodes
+    % in the tree would end up deleting unrelated portions
+    % of the tree when node ids overlapped. There would also
+    % be other badness if keys overlapped with the layer
+    % or ?SUBDIRS spaces.
+
+    try
+        % An empty name would obviously be kind of bonkers.
+        if NodeName /= <<>> -> ok; true ->
+            throw(false)
+        end,
+
+        Root = get_root(Parent),
+        RootId = get_id(Root),
+        NodePrefix = get_node_prefix(Root),
+        NPLen = size(NodePrefix),
+
+        % First check that the special case of the root node
+        case bin_startswith(NodeName, RootId) of
+            true -> throw(false);
+            false -> ok
+        end,
+
+        % Check if any node id is a prefix of NodeName
+        Start1 = <<NodePrefix:NPLen/binary, 16#00>>,
+        End1 = ?ERLFDB_PACK(NodePrefix, {NodeName, null}),
+        Opts1 = [{reverse, true}, {limit, 1}, {streaming_mode, exact}],
+        Subspace = erlfdb_subspace:create({}, get_node_prefix(Parent)),
+        erlfdb:fold_range(Tx, Start1, End1, fun({Key, _} = _E, _) ->
+            KeyNodeId = element(1, erlfdb_subspace:unpack(Subspace, Key)),
+            case bin_startswith(NodeName, KeyNodeId) of
+                true -> throw(false);
+                false -> ok
+            end
+        end, nil, Opts1),
+
+        % Check if NodeName is a prefix of any existing key
+        Start2 = ?ERLFDB_EXTEND(NodePrefix, NodeName),
+        End2 = ?ERLFDB_EXTEND(NodePrefix, erlfdb_key:strinc(NodeName)),
+        Opts2 = [{limit, 1}, {streaming_mode, exact}],
+        case erlfdb:wait(erlfdb:get_range(Tx, Start2, End2, Opts2)) of
+            [_E | _] -> throw(false);
+            [] -> ok
+        end,
+
+        true
+    catch throw:false ->
+        false
+    end.
+
+
+bin_startswith(Subject, Prefix) ->
+    PrefixLen = size(Prefix),
+    case Subject of
+        <<Prefix:PrefixLen/binary, _/binary>> -> true;
+        _ -> false
+    end.
+
+
+check_version(Tx, Node, PermLevel) ->
+    Root = get_root(Node),
+    VsnKey = ?ERLFDB_EXTEND(get_id(Root), <<"version">>),
+    {LV1, LV2, _LV3} = ?LAYER_VERSION,
+    {Major, Minor, Patch} = case erlfdb:wait(erlfdb:get(Tx, VsnKey)) of
+        not_found when PermLevel == write ->
+            initialize_directory(Tx, VsnKey);
+        not_found ->
+            ?LAYER_VERSION;
+        VsnBin ->
+            <<
+                V1:32/little-unsigned,
+                V2:32/little-unsigned,
+                V3:32/little-unsigned
+            >> = VsnBin,
+            {V1, V2, V3}
+    end,
+
+    Path = get_path(Node),
+
+    if Major =< LV1 -> ok; true ->
+        ?ERLFDB_ERROR({version_error, unreadable, Path, {Major, Minor, Patch}})
+    end,
+
+    if not (Minor > LV2 andalso PermLevel /= read) -> ok; true ->
+        ?ERLFDB_ERROR({version_error, unwritable, Path, {Major, Minor, Patch}})
+    end.
+
+
+initialize_directory(Tx, VsnKey) ->
+    {V1, V2, V3} = ?LAYER_VERSION,
+    Packed = <<
+            V1:32/little-unsigned,
+            V2:32/little-unsigned,
+            V3:32/little-unsigned
+        >>,
+    erlfdb:set(Tx, VsnKey, Packed),
+    ?LAYER_VERSION.
+
+
+check_same_partition(OldNode, NewParentNode) ->
+    OldRoot = get_partition(OldNode),
+    NewRoot = get_root(NewParentNode),
+    if NewRoot == OldRoot -> ok; true ->
+        ?ERLFDB_ERROR({move_error, partition_mismatch, OldRoot, NewRoot})
+    end.
+
+
+adj_path(Node, PathIn) ->
+    adj_path(get_root(Node), Node, PathIn).
+
+
+adj_path(Root, Node, PathIn) ->
+    RootPathLen = length(get_path(Root)),
+    NodePath = get_path(Node),
+    NodeRelPath = lists:nthtail(RootPathLen, NodePath),
+    Path = NodeRelPath ++ path_init(PathIn),
+    {Root, Path}.
+
+
+path_init(<<_/binary>> = Bin) ->
+    check_utf8(0, Bin),
+    [{utf8, Bin}];
+
+path_init({utf8, <<_/binary>> = Bin} = Path) ->
+    check_utf8(0, Bin),
+    [Path];
+
+path_init(Path) when is_list(Path) ->
+    lists:flatmap(fun(Part) ->
+        path_init(Part)
+    end, Path);
+
+path_init(Path) when is_tuple(Path) ->
+    path_init(tuple_to_list(Path));
+
+path_init(Else) ->
+    ?ERLFDB_ERROR({path_error, invalid_path_component, Else}).
+
+
+check_utf8(Offset, Binary) ->
+    case Binary of
+        <<_:Offset/binary>> ->
+            true;
+        <<_:Offset/binary, _/utf8, Rest/binary>> ->
+            % Recalculating offset as a subtraction here is
+            % slightly odd but this is to avoid having to
+            % re-encode the utf8 code point and adding the
+            % size of that new binary.
+            check_utf8(size(Binary) - size(Rest), Binary);
+        <<_:Offset/binary, _/binary>> ->
+            ?ERLFDB_ERROR({path_error, invalid_utf8, Binary})
+    end.
+
+
+path_append(Path, Part) ->
+    Path ++ path_init(Part).
+
+
+check_not_subpath(OldPath, NewPath) ->
+    case lists:prefix(OldPath, NewPath) of
+        true ->
+            ?ERLFDB_ERROR({
+                    move_error,
+                    target_is_subdirectory,
+                    OldPath,
+                    NewPath
+                });
+        false ->
+            ok
+    end.
diff --git a/test/tester.es b/test/tester.es
index e0a28e5..101f535 100755
--- a/test/tester.es
+++ b/test/tester.es
@@ -4,6 +4,18 @@
 -mode(compile).
 
 
+-define(DIRECTORY_CREATE_OPS, [
+    <<"DIRECTORY_CREATE_SUBSPACE">>,
+    <<"DIRECTORY_CREATE_LAYER">>,
+    <<"DIRECTORY_CREATE_OR_OPEN">>,
+    <<"DIRECTORY_CREATE">>,
+    <<"DIRECTORY_OPEN">>,
+    <<"DIRECTORY_MOVE">>,
+    <<"DIRECTORY_MOVE_TO">>,
+    <<"DIRECTORY_OPEN_SUBSPACE">>
+]).
+
+
 -record(st, {
     db,
     tx_mgr,
@@ -16,10 +28,24 @@
     is_snapshot,
     last_version,
     pids,
-    directory_extension
+
+    % Directory Layer tests
+    is_directory_op,
+    dir_list,
+    dir_index,
+    dir_error_index
 }).
 
 
+init_rand() ->
+    case os:getenv("RANDOM_SEED") of
+        false ->
+            ok;
+        Seed ->
+            rand:seed(exsplus, {list_to_integer(Seed), 0, 0})
+    end.
+
+
 stack_create() ->
     Pid = spawn_link(fun() -> stack_loop([]) end),
     spawn(fun() ->
@@ -192,6 +218,20 @@
     stack_push(Pid, {Idx, Value}).
 
 
+stack_pop_tuples(St) ->
+    {Tuple} = stack_pop_tuples(St, 1),
+    Tuple.
+
+
+stack_pop_tuples(St, Count) ->
+    TupleList = lists:map(fun(_) ->
+        TupleSize = stack_pop(St),
+        TupleElems = stack_pop(St, TupleSize),
+        list_to_tuple(TupleElems)
+    end, lists:seq(1, Count)),
+    list_to_tuple(TupleList).
+
+
 get_transaction(TxName) ->
     get({'$erlfdb_tx', TxName}).
 
@@ -201,6 +241,23 @@
     put({'$erlfdb_tx', TxName}, Tx).
 
 
+switch_transaction(Db, TxName) ->
+    case get_transaction(TxName) of
+        undefined ->
+            new_transaction(Db, TxName);
+        _ ->
+            ok
+    end.
+
+
+has_prefix(Subject, Prefix) ->
+    PrefLen = size(Prefix),
+    case Subject of
+        <<Prefix:PrefLen/binary, _/binary>> -> true;
+        _ -> false
+    end.
+
+
 has_suffix(Subject, Suffix) ->
     SubjSize = size(Subject),
     SuffSize = size(Suffix),
@@ -235,8 +292,8 @@
 
 
 wait_for_empty(Db, Prefix) ->
-    erlfdb:transactional(Db, fun(Tr) ->
-        Future = erlfdb:get_range_startswith(Tr, Prefix, [{limit, 1}]),
+    erlfdb:transactional(Db, fun(Tx) ->
+        Future = erlfdb:get_range_startswith(Tx, Prefix, [{limit, 1}]),
         case erlfdb:wait(Future) of
             [_|_] -> erlang:error({erlfdb_error, 1020});
             [] -> ok
@@ -244,7 +301,21 @@
     end).
 
 
+append_dir(St, Dir) ->
+    case Dir of
+        not_found -> erlang:error(broken);
+        _ -> ok
+    end,
+    #st{
+        dir_list = DirList
+    } = St,
+    St#st{
+        dir_list = DirList ++ [Dir]
+    }.
+
+
 init_run_loop(Db, Prefix) ->
+    init_rand(),
     {StartKey, EndKey} = erlfdb_tuple:range({Prefix}),
     St = #st{
         db = Db,
@@ -257,10 +328,13 @@
         is_snapshot = undefined,
         last_version = 0,
         pids = [],
-        directory_extension = undefined
+
+        dir_list = [erlfdb_directory:root()],
+        dir_index = 0,
+        dir_error_index = 0
     },
     %% lists:foreach(fun({K, V}) ->
-    %%     io:format("'~s'~n'~s'~n", [py_repr(K), py_repr(V)])
+    %%     io:format("'~s'~n'~s'~n", [erlfdb_util:repr(K), erlfdb_util:repr(V)])
     %% end, St#st.instructions),
     run_loop(St).
 
@@ -290,6 +364,7 @@
 
     IsDb = has_suffix(Op, <<"_DATABASE">>),
     IsSS = has_suffix(Op, <<"_SNAPSHOT">>),
+    IsDir = has_prefix(Op, <<"DIRECTORY_">>),
 
     OpName = if not (IsDb or IsSS) -> Op; true ->
         binary:part(Op, {0, size(Op) - 9}) % strip off _DATABASE/_SNAPSHOT
@@ -307,7 +382,8 @@
     PreSt = St#st{
         op_tuple = OpTuple,
         is_db = IsDb,
-        is_snapshot = IsSS
+        is_snapshot = IsSS,
+        is_directory_op = IsDir
     },
 
     PostSt = try
@@ -324,7 +400,8 @@
         index = Index + 1,
         op_tuple = undefined,
         is_db = undefined,
-        is_snapshot = undefined
+        is_snapshot = undefined,
+        is_directory_op = undefined
     }).
 
 
@@ -382,6 +459,7 @@
 
 execute(_TxObj, St, <<"USE_TRANSACTION">>) ->
     TxName = stack_pop(St),
+    switch_transaction(St#st.db, TxName),
     St#st{
         tx_name = TxName
     };
@@ -664,10 +742,263 @@
     % TODO
     St;
 
+execute(TxObj, #st{is_directory_op = true} = St, Op) ->
+    #st{
+        dir_list = DirList,
+        dir_index = DirIdx
+    } = St,
+    Dir = lists:nth(DirIdx + 1, DirList),
+    try
+        execute_dir(TxObj, St, Dir, Op)
+    catch error:{erlfdb_directory, _} = _R ->
+        NewSt = case lists:member(Op, ?DIRECTORY_CREATE_OPS) of
+            true -> append_dir(St, null);
+            false -> St
+        end,
+        stack_push(St, <<"DIRECTORY_ERROR">>),
+        NewSt
+    end;
+
 execute(_TxObj, _St, UnknownOp) ->
     erlang:error({unknown_op, UnknownOp}).
 
 
+execute_dir(_TxObj, St, _Dir, <<"DIRECTORY_CREATE_SUBSPACE">>) ->
+    Path = stack_pop_tuples(St),
+    RawPrefix = stack_pop(St),
+    Subspace = erlfdb_subspace:create(Path, RawPrefix),
+    append_dir(St, Subspace);
+
+execute_dir(_TxObj, St, _Dir, <<"DIRECTORY_CREATE_LAYER">>) ->
+    #st{
+        dir_list = DirList
+    } = St,
+    [Index1, Index2, AllowManual] = stack_pop(St, 3),
+    NodeSS = lists:nth(Index1 + 1, DirList),
+    ContentSS = lists:nth(Index2 + 1, DirList),
+    case (NodeSS == null orelse ContentSS == null) of
+        true ->
+            append_dir(St, null);
+        false ->
+            Opts = [
+                {node_prefix, erlfdb_subspace:key(NodeSS)},
+                {content_prefix, erlfdb_subspace:key(ContentSS)},
+                {allow_manual_names, AllowManual == 1}
+            ],
+            append_dir(St, erlfdb_directory:root(Opts))
+    end;
+
+execute_dir(_TxObj, St, _Dir, <<"DIRECTORY_CHANGE">>) ->
+    #st{
+        dir_list = DirList,
+        dir_error_index = ErrIdx
+    } = St,
+    DirIdx1 = stack_pop(St),
+    DirIdx2 = case lists:nth(DirIdx1 + 1, DirList) of
+        null -> ErrIdx;
+        _ -> DirIdx1
+    end,
+    St#st{
+        dir_index = DirIdx2
+    };
+
+execute_dir(_TxObj, St, _Dir, <<"DIRECTORY_SET_ERROR_INDEX">>) ->
+    St#st{
+        dir_error_index = stack_pop(St)
+    };
+
+execute_dir(TxObj, St, Dir, <<"DIRECTORY_CREATE_OR_OPEN">>) ->
+    Path = stack_pop_tuples(St),
+    Layer = stack_pop(St),
+    NewDir = erlfdb_directory:create_or_open(TxObj, Dir, Path, Layer),
+    append_dir(St, NewDir);
+
+execute_dir(TxObj, St, Dir, <<"DIRECTORY_CREATE">>) ->
+    Path = stack_pop_tuples(St),
+    [Layer, Prefix] = stack_pop(St, 2),
+    Opts = [{layer, Layer}] ++ case Prefix of
+        null -> [];
+        _ -> [{node_name, Prefix}]
+    end,
+    NewDir = erlfdb_directory:create(TxObj, Dir, Path, Opts),
+    append_dir(St, NewDir);
+
+execute_dir(TxObj, St, Dir, <<"DIRECTORY_OPEN">>) ->
+    Path = stack_pop_tuples(St),
+    Layer = stack_pop(St),
+    Opts = [{layer, Layer}],
+    NewDir = erlfdb_directory:open(TxObj, Dir, Path, Opts),
+    append_dir(St, NewDir);
+
+execute_dir(TxObj, St, Dir, <<"DIRECTORY_MOVE">>) ->
+    {OldPath, NewPath} = stack_pop_tuples(St, 2),
+    NewDir = erlfdb_directory:move(TxObj, Dir, OldPath, NewPath),
+    append_dir(St, NewDir);
+
+execute_dir(TxObj, St, Dir, <<"DIRECTORY_MOVE_TO">>) ->
+    NewAbsPath = stack_pop_tuples(St),
+    NewDir = erlfdb_directory:move_to(TxObj, Dir, NewAbsPath),
+    append_dir(St, NewDir);
+
+execute_dir(TxObj, St, Dir, <<"DIRECTORY_REMOVE">>) ->
+    Count = stack_pop(St),
+    case Count == 0 of
+        true ->
+            erlfdb_directory:remove(TxObj, Dir);
+        false ->
+            Path = stack_pop_tuples(St),
+            erlfdb_directory:remove(TxObj, Dir, Path)
+    end,
+    St;
+
+execute_dir(TxObj, St, Dir, <<"DIRECTORY_REMOVE_IF_EXISTS">>) ->
+    Count = stack_pop(St),
+    case Count == 0 of
+        true ->
+            erlfdb_directory:remove_if_exists(TxObj, Dir);
+        false ->
+            Path = stack_pop_tuples(St),
+            erlfdb_directory:remove_if_exists(TxObj, Dir, Path)
+    end,
+    St;
+
+execute_dir(TxObj, St, Dir, <<"DIRECTORY_LIST">>) ->
+    Count = stack_pop(St),
+    Results = case Count == 0 of
+        true ->
+            erlfdb_directory:list(TxObj, Dir);
+        false ->
+            Path = stack_pop_tuples(St),
+            erlfdb_directory:list(TxObj, Dir, Path)
+    end,
+    Names = lists:map(fun({N, _}) -> N end, Results),
+    stack_push(St, erlfdb_tuple:pack(list_to_tuple(Names))),
+    St;
+
+execute_dir(TxObj, St, Dir, <<"DIRECTORY_EXISTS">>) ->
+    Count = stack_pop(St),
+    Result = case Count == 0 of
+        true ->
+            erlfdb_directory:exists(TxObj, Dir);
+        false ->
+            Path = stack_pop_tuples(St),
+            erlfdb_directory:exists(TxObj, Dir, Path)
+    end,
+    case Result of
+        true -> stack_push(St, 1);
+        false -> stack_push(St, 0)
+    end,
+    St;
+
+execute_dir(_TxObj, St, Dir, <<"DIRECTORY_PACK_KEY">>) ->
+    Tuple = stack_pop_tuples(St),
+    Mod = get_dir_or_ss_mod(Dir),
+    Result = Mod:pack(Dir, Tuple),
+    stack_push(St, Result),
+    St;
+
+execute_dir(_TxObj, St, Dir, <<"DIRECTORY_UNPACK_KEY">>) ->
+    Key = stack_pop(St),
+    Mod = get_dir_or_ss_mod(Dir),
+    Tuple = Mod:unpack(Dir, Key),
+    lists:foreach(fun(Elem) ->
+        stack_push(St, Elem)
+    end, tuple_to_list(Tuple)),
+    St;
+
+execute_dir(_TxObj, St, Dir, <<"DIRECTORY_RANGE">>) ->
+    Tuple = stack_pop_tuples(St),
+    Mod = get_dir_or_ss_mod(Dir),
+    {Start, End} = Mod:range(Dir, Tuple),
+    stack_push(St, Start),
+    stack_push(St, End),
+    St;
+
+execute_dir(_TxObj, St, Dir, <<"DIRECTORY_CONTAINS">>) ->
+    Key = stack_pop(St),
+    Mod = get_dir_or_ss_mod(Dir),
+    Result = Mod:contains(Dir, Key),
+    case Result of
+        true -> stack_push(St, 1);
+        false -> stack_push(St, 0)
+    end,
+    St;
+
+execute_dir(_TxObj, St, Dir, <<"DIRECTORY_OPEN_SUBSPACE">>) ->
+    Path = stack_pop_tuples(St),
+    Mod = get_dir_or_ss_mod(Dir),
+    Subspace = Mod:subspace(Dir, Path),
+    append_dir(St, Subspace);
+
+execute_dir(TxObj, St, Dir, <<"DIRECTORY_LOG_SUBSPACE">>) ->
+    #st{
+        dir_index = DirIdx
+    } = St,
+    Prefix = stack_pop(St),
+    LogKey = erlfdb_tuple:pack({DirIdx}, Prefix),
+    Mod = get_dir_or_ss_mod(Dir),
+    erlfdb:set(TxObj, LogKey, Mod:key(Dir)),
+    St;
+
+execute_dir(TxObj, St, Dir, <<"DIRECTORY_LOG_DIRECTORY">>) ->
+    #st{
+        dir_index = DirIdx
+    } = St,
+    Prefix = stack_pop(St),
+    LogPrefix = erlfdb_tuple:pack({DirIdx}, Prefix),
+
+    Exists = erlfdb_directory:exists(TxObj, Dir),
+    Children = case Exists of
+        true ->
+            ListResult = erlfdb_directory:list(TxObj, Dir),
+            Names = lists:map(fun({N, _}) -> N end, ListResult),
+            list_to_tuple(Names);
+        false ->
+            {}
+    end,
+
+    PathKey = erlfdb_tuple:pack({{utf8, <<"path">>}}, LogPrefix),
+    Path = erlfdb_tuple:pack(list_to_tuple(erlfdb_directory:get_path(Dir))),
+    erlfdb:set(TxObj, PathKey, Path),
+
+    LayerKey = erlfdb_tuple:pack({{utf8, <<"layer">>}}, LogPrefix),
+    Layer = erlfdb_tuple:pack({erlfdb_directory:get_layer(Dir)}),
+    erlfdb:set(TxObj, LayerKey, Layer),
+
+    ExistsKey = erlfdb_tuple:pack({{utf8, <<"exists">>}}, LogPrefix),
+    ExistsVal = erlfdb_tuple:pack({if Exists -> 1; true -> 0 end}),
+    erlfdb:set(TxObj, ExistsKey, ExistsVal),
+
+    ChildrenKey = erlfdb_tuple:pack({{utf8, <<"children">>}}, LogPrefix),
+    ChildrenVal = erlfdb_tuple:pack(Children),
+    erlfdb:set(TxObj, ChildrenKey, ChildrenVal),
+
+    St;
+
+execute_dir(_TxObj, St, Dir, <<"DIRECTORY_STRIP_PREFIX">>) ->
+    ToStrip = stack_pop(St),
+    Mod = get_dir_or_ss_mod(Dir),
+    DirKey = Mod:key(Dir),
+    DKLen = size(DirKey),
+    case ToStrip of
+        _ when not is_binary(ToStrip) ->
+            erlang:error({erlfdb_directory, prefix_not_a_binary});
+        <<DirKey:DKLen/binary, Rest/binary>> ->
+            stack_push(St, Rest);
+        _ ->
+            erlang:error({erlfdb_directory, {invalid_prefix_strip, ToStrip, DirKey}})
+    end,
+    St;
+
+execute_dir(_TxObj, _St, _Dir, UnknownOp) ->
+    erlang:error({unknown_directory_op, UnknownOp}).
+
+
+get_dir_or_ss_mod(Subspace) when element(1, Subspace) == erlfdb_subspace ->
+    erlfdb_subspace;
+get_dir_or_ss_mod(#{}) ->
+    erlfdb_directory.
+
 
 main([Prefix, APIVsn]) ->
     main([Prefix, APIVsn, ""]);