blob: 2d64ea4bad178f2b8bbe812d6e94246877816fc6 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_btree).
-export([open/2, open/3, query_modify/4, add/2, add_remove/3]).
-export([fold/4, full_reduce/1, final_reduce/2, size/1, foldl/3, foldl/4]).
-export([fold_reduce/4, lookup/2, get_state/1, set_options/2]).
-export([extract/2, assemble/3, less/3]).
-include_lib("couch/include/couch_db.hrl").
-define(FILL_RATIO, 0.5).
% Splits a stored term into the {Key, Value} pair used inside the tree.
% When no `split` callback is configured (extract_kv is undefined) the term
% is returned unchanged; otherwise the callback's result is returned.
extract(#btree{extract_kv = SplitFun}, Term) ->
    case SplitFun of
        undefined -> Term;
        _ -> SplitFun(Term)
    end.
% Rebuilds the caller-facing term from a Key and Value held in the tree.
% Without a `join` callback (assemble_kv is undefined) this is simply
% {Key, Value}; otherwise the callback decides the shape.
assemble(#btree{assemble_kv = JoinFun}, Key, Value) ->
    case JoinFun of
        undefined -> {Key, Value};
        _ -> JoinFun(Key, Value)
    end.
% Key ordering used throughout the tree. Falls back to Erlang term order
% (A < B) when no custom `less` callback is configured.
less(#btree{less = Cmp}, A, B) ->
    case Cmp of
        undefined -> A < B;
        _ -> Cmp(A, B)
    end.
% Builds a btree record over file descriptor Fd whose root is State.
% Pass 'nil' as State for a new, empty btree. All options keep their
% record defaults. Returns {ok, Btree}.
open(State, Fd) ->
    Bt = #btree{root = State, fd = Fd},
    {ok, Bt}.
% Applies a list of {Option, Value} pairs to the btree record, left to right,
% and returns the updated record. Recognised options:
%   {split, Fun}       -> extract_kv
%   {join, Fun}        -> assemble_kv
%   {less, Fun}        -> less
%   {reduce, Fun}      -> reduce
%   {compression, Val} -> compression
% Any other option crashes with function_clause.
set_options(Bt, Options) ->
    lists:foldl(
        fun
            ({split, Extract}, Acc) -> Acc#btree{extract_kv = Extract};
            ({join, Assemble}, Acc) -> Acc#btree{assemble_kv = Assemble};
            ({less, Less}, Acc) -> Acc#btree{less = Less};
            ({reduce, Reduce}, Acc) -> Acc#btree{reduce = Reduce};
            ({compression, Comp}, Acc) -> Acc#btree{compression = Comp}
        end,
        Bt,
        Options
    ).
% Same as open/2, but additionally applies Options (see set_options/2) to the
% freshly built record. Returns {ok, Btree}.
open(State, Fd, Options) ->
    Bt0 = #btree{root = State, fd = Fd},
    {ok, set_options(Bt0, Options)}.
% Returns the btree's root state: the value that can later be handed back to
% open/2,3 as State.
get_state(#btree{root = RootState}) ->
    RootState.
% Collapses a {KVs, Reductions} pair (as produced by fold/4 and fold_reduce/4)
% into one reduction value. The first argument is either a btree record, whose
% reduce fun is used, or the reduce fun itself.
%   - no KVs and no reductions: Reduce(reduce, [])
%   - no KVs and exactly one reduction: that reduction, untouched
%   - no KVs and several reductions: Reduce(rereduce, Reductions)
%   - otherwise the KVs are reduced first and the result is prepended to the
%     reductions before applying the rules above.
final_reduce(#btree{reduce = ReduceFun}, KVsAndReds) ->
    final_reduce(ReduceFun, KVsAndReds);
final_reduce(ReduceFun, {[], []}) ->
    ReduceFun(reduce, []);
final_reduce(_ReduceFun, {[], [OnlyRed]}) ->
    OnlyRed;
final_reduce(ReduceFun, {[], Reds}) ->
    ReduceFun(rereduce, Reds);
final_reduce(ReduceFun, {KVs, Reds}) ->
    final_reduce(ReduceFun, {[], [ReduceFun(reduce, KVs) | Reds]}).
% Streams the tree in `dir` order (default fwd), optionally starting at
% `start_key` and bounded by `end_key`/`end_key_gt`, grouping consecutive keys
% with the group function selected by get_group_fun/2.
%
% For every completed group Fun(GroupKey, {KVs, Reductions}, Acc) is called
% and must return {ok, Acc2} or {stop, Acc2}. A stop requested while streaming
% arrives here as a thrown {stop, Acc} and ends the fold early.
%
% Always returns {ok, FinalAcc}.
fold_reduce(#btree{root = Root} = Bt, Fun, Acc, Options) ->
    Dir = couch_util:get_value(dir, Options, fwd),
    StartKey = couch_util:get_value(start_key, Options),
    InEndRangeFun = make_key_in_end_range_function(Bt, Dir, Options),
    KeyGroupFun = get_group_fun(Bt, Options),
    try
        {ok, Acc2, PendingReds, PendingKVs, PendingKey} =
            reduce_stream_node(
                Bt, Dir, Root, StartKey, InEndRangeFun, undefined, [], [], KeyGroupFun, Fun, Acc
            ),
        case PendingKey == undefined of
            true ->
                % nothing was left open, so there is no trailing group to emit
                {ok, Acc2};
            false ->
                % flush the last, still-open group
                case Fun(PendingKey, {PendingKVs, PendingReds}, Acc2) of
                    {ok, Acc3} -> {ok, Acc3};
                    {stop, Acc3} -> {ok, Acc3}
                end
        end
    catch
        throw:{stop, AccDone} -> {ok, AccDone}
    end.
% Returns {ok, Reduction} for the whole tree. For an empty tree (root is nil)
% the reduce fun is applied to an empty KV list; otherwise the reduction
% already stored as the second element of the root is returned.
full_reduce(#btree{root = Root, reduce = Reduce}) ->
    case Root of
        nil -> {ok, Reduce(reduce, [])};
        _ -> {ok, element(2, Root)}
    end.
% Size recorded in the root of the tree.
%   - empty tree (root nil): 0
%   - two-element root (pre 1.2 format, no size stored): nil
%   - three-element root: its third element
size(#btree{root = nil}) ->
    0;
size(#btree{root = {_Pointer, _Reduction}}) ->
    % pre 1.2 format
    nil;
size(#btree{root = {_Pointer, _Reduction, TreeSize}}) ->
    TreeSize.
% Selects the key grouping predicate for fold_reduce/4 from Options.
%   key_group_level = exact   -> keys must compare equal
%   key_group_level = 0       -> everything falls into a single group
%   key_group_level = N > 0   -> see make_group_fun/2
%   key_group_level undefined -> `key_group_fun` option, defaulting to a
%                                single group
get_group_fun(Bt, Options) ->
    GroupAll = fun(_, _) -> true end,
    case couch_util:get_value(key_group_level, Options) of
        exact -> make_group_fun(Bt, exact);
        0 -> GroupAll;
        Level when is_integer(Level), Level > 0 -> make_group_fun(Bt, Level);
        undefined -> couch_util:get_value(key_group_fun, Options, GroupAll)
    end.
% Builds a two-argument predicate telling whether two {Key, _} entries belong
% to the same group.
%   exact      -> the keys themselves must be equal under the btree ordering
%   GroupLevel -> a {p, Partition, Key} wrapper is unwrapped when both sides
%                 carry the same Partition; non-empty list keys are compared
%                 on their first GroupLevel elements; anything else is
%                 compared whole.
make_group_fun(Bt, exact) ->
    fun({Key1, _}, {Key2, _}) -> group_keys_equal(Bt, Key1, Key2) end;
make_group_fun(Bt, GroupLevel) when is_integer(GroupLevel), GroupLevel > 0 ->
    fun
        GF({{p, Partition, Key1}, Val1}, {{p, Partition, Key2}, Val2}) ->
            GF({Key1, Val1}, {Key2, Val2});
        GF({[_ | _] = Key1, _}, {[_ | _] = Key2, _}) ->
            Prefix1 = lists:sublist(Key1, GroupLevel),
            Prefix2 = lists:sublist(Key2, GroupLevel),
            group_keys_equal(Bt, Prefix1, Prefix2);
        GF({Key1, _}, {Key2, _}) ->
            group_keys_equal(Bt, Key1, Key2)
    end.

% Two keys are equal when neither sorts before the other. Each key is wrapped
% as {Key, nil} before being handed to less/3. The second comparison only runs
% when the first one returned false.
group_keys_equal(Bt, KeyA, KeyB) ->
    less(Bt, {KeyA, nil}, {KeyB, nil}) =:= false andalso
        less(Bt, {KeyB, nil}, {KeyA, nil}) =:= false.
% Normalises a user fold callback to the 4-arity form
% Fun(visit | traverse, KeyOrKV, Reductions, Acc) used by the stream_* code.
%   - arity 2: called as Fun(KV, Acc) on visit
%   - arity 3: called as Fun(KV, Reds, Acc) on visit
%   - arity 4: returned as-is
% Wrapped callbacks answer every `traverse` event with {ok, Acc}, so no
% subtree is ever skipped on their behalf.
convert_fun_arity(UserFun) when is_function(UserFun, 2) ->
    fun
        (visit, KV, _Reds, AccIn) -> UserFun(KV, AccIn);
        (traverse, _Key, _Red, AccIn) -> {ok, AccIn}
    end;
convert_fun_arity(UserFun) when is_function(UserFun, 3) ->
    fun
        (visit, KV, Reds, AccIn) -> UserFun(KV, Reds, AccIn);
        (traverse, _Key, _Red, AccIn) -> {ok, AccIn}
    end;
convert_fun_arity(UserFun) when is_function(UserFun, 4) ->
    UserFun.
% Builds a one-argument predicate telling whether a key is still inside the
% end of the requested range, for the given direction.
%   end_key_gt set -> exclusive bound: the key must sort strictly before it
%                     (fwd) or strictly after it (rev)
%   end_key set    -> inclusive bound
%   neither        -> every key is in range
% end_key_gt takes precedence; end_key is only read when end_key_gt is absent.
make_key_in_end_range_function(Bt, fwd, Options) ->
    case couch_util:get_value(end_key_gt, Options) of
        undefined ->
            case couch_util:get_value(end_key, Options) of
                undefined -> fun(_Key) -> true end;
                LastKey -> fun(Key) -> not less(Bt, LastKey, Key) end
            end;
        EndKey ->
            fun(Key) -> less(Bt, Key, EndKey) end
    end;
make_key_in_end_range_function(Bt, rev, Options) ->
    case couch_util:get_value(end_key_gt, Options) of
        undefined ->
            case couch_util:get_value(end_key, Options) of
                undefined -> fun(_Key) -> true end;
                LastKey -> fun(Key) -> not less(Bt, Key, LastKey) end
            end;
        EndKey ->
            fun(Key) -> less(Bt, EndKey, Key) end
    end.
% Folds over the whole tree with default options; shorthand for fold/4 with
% an empty option list.
foldl(Bt, Fun, Acc) ->
    NoOptions = [],
    fold(Bt, Fun, Acc, NoOptions).
% Alias of fold/4; the arguments are passed through unchanged.
foldl(Bt, Fun, Acc, Options) ->
    fold(Bt, Fun, Acc, Options).
% Folds Fun over the tree's key/values.
%
% Options read here: `dir` (fwd | rev, default fwd), `start_key`, and the end
% bounds understood by make_key_in_end_range_function/3. Fun may have arity
% 2, 3 or 4 (see convert_fun_arity/1).
%
% Returns {ok, {KVs, Reductions}, Acc}:
%   - empty tree: {ok, {[], []}, Acc}
%   - ran to completion: the reduction stored in the root, as {[], [Red]}
%   - stopped early: whatever reduction state the streaming code reported
fold(#btree{root = nil}, _Fun, Acc, _Options) ->
    {ok, {[], []}, Acc};
fold(#btree{root = Root} = Bt, Fun, Acc, Options) ->
    Dir = couch_util:get_value(dir, Options, fwd),
    InRange = make_key_in_end_range_function(Bt, Dir, Options),
    Fun4 = convert_fun_arity(Fun),
    Streamed =
        case couch_util:get_value(start_key, Options) of
            undefined ->
                stream_node(Bt, [], Root, InRange, Dir, Fun4, Acc);
            StartKey ->
                stream_node(Bt, [], Root, StartKey, InRange, Dir, Fun4, Acc)
        end,
    case Streamed of
        {ok, Acc2} ->
            {ok, {[], [element(2, Root)]}, Acc2};
        {stop, LastReduction, Acc2} ->
            {ok, LastReduction, Acc2}
    end.
% Inserts InsertKeyValues into the tree without removing anything.
% Returns {ok, NewBtree}.
add(Bt, InsertKeyValues) ->
    NoRemovals = [],
    add_remove(Bt, InsertKeyValues, NoRemovals).
% Inserts InsertKeyValues and removes RemoveKeys in a single pass over the
% tree. No lookups are requested, so the query result is asserted to be
% empty. Returns {ok, NewBtree}.
add_remove(Bt, InsertKeyValues, RemoveKeys) ->
    NoLookups = [],
    {ok, [], NewBt} = query_modify(Bt, NoLookups, InsertKeyValues, RemoveKeys),
    {ok, NewBt}.
% Performs lookups, inserts and removals in one traversal.
%
% Every request becomes an {Op, Key, Value} action. Actions are sorted by key
% using the btree ordering; actions on an equal key (==) are ordered
% fetch < remove < insert (see op_order/1), so a fetch observes the state
% before any modification of the same key.
%
% Returns {ok, QueryResults, NewBtree}.
query_modify(Bt, LookupKeys, InsertValues, RemoveKeys) ->
    #btree{root = Root} = Bt,
    Inserts = [
        begin
            {Key, Value} = extract(Bt, KeyValue),
            {insert, Key, Value}
        end
     || KeyValue <- InsertValues
    ],
    Removes = [{remove, Key, nil} || Key <- RemoveKeys],
    Fetches = [{fetch, Key, nil} || Key <- LookupKeys],
    ByKeyThenOp = fun({OpA, KeyA, _}, {OpB, KeyB, _}) ->
        case KeyA == KeyB of
            % same key: order by operation
            true -> op_order(OpA) < op_order(OpB);
            false -> less(Bt, KeyA, KeyB)
        end
    end,
    Actions = lists:sort(ByKeyThenOp, Inserts ++ Removes ++ Fetches),
    {ok, KeyPointers, QueryResults} = modify_node(Bt, Root, Actions, []),
    {ok, NewRoot} = complete_root(Bt, KeyPointers),
    {ok, QueryResults, Bt#btree{root = NewRoot}}.
% Rank used to order different operations that target the same key:
% fetch (1) runs before remove (2), which runs before insert (3).
op_order(fetch) -> 1;
op_order(remove) -> 2;
op_order(insert) -> 3.
% Looks up Keys in the tree. The keys are sorted first (term order, or with
% the custom `less` fun when one is configured) so the tree can be walked in
% a single pass; couch_util:reorder_results/2 then maps the sorted results
% back onto the caller's original key order.
lookup(#btree{root = Root, less = Less} = Bt, Keys) ->
    SortedKeys =
        case Less of
            undefined -> lists:sort(Keys);
            _ -> lists:sort(Less, Keys)
        end,
    {ok, SortedResults} = lookup(Bt, Root, SortedKeys),
    % Sorting may have changed the order, so restore the input order.
    couch_util:reorder_results(Keys, SortedResults).
% Looks up sorted Keys beneath one node (given as its pointer info tuple, or
% nil for an empty subtree). Returns {ok, [{Key, Result}]} in key order, where
% Result is not_found or {ok, AssembledKV}.
lookup(_Bt, nil, Keys) ->
    {ok, [{Key, not_found} || Key <- Keys]};
lookup(Bt, Node, Keys) ->
    {NodeType, NodeList} = get_node(Bt, element(1, Node)),
    % a tuple gives the binary search constant-time element access
    NodeTuple = list_to_tuple(NodeList),
    case NodeType of
        kp_node -> lookup_kpnode(Bt, NodeTuple, 1, Keys, []);
        kv_node -> lookup_kvnode(Bt, NodeTuple, 1, Keys, [])
    end.
% Resolves sorted lookup keys against a kp_node (key/pointer node).
% LowerBound is the first tuple index still worth examining; Output holds the
% results collected so far, in reverse order.
lookup_kpnode(_Bt, _NodeTuple, _LowerBound, [], Output) ->
    {ok, lists:reverse(Output)};
lookup_kpnode(_Bt, NodeTuple, LowerBound, Keys, Output) when
    tuple_size(NodeTuple) < LowerBound
->
    % ran off the end of the node: none of the remaining keys exist
    {ok, lists:reverse(Output, [{Key, not_found} || Key <- Keys])};
lookup_kpnode(Bt, NodeTuple, LowerBound, [FirstLookupKey | _] = LookupKeys, Output) ->
    N = find_first_gteq(Bt, NodeTuple, LowerBound, tuple_size(NodeTuple), FirstLookupKey),
    {ChildLastKey, PointerInfo} = element(N, NodeTuple),
    % every lookup key that does not sort after the child's key belongs to it
    BelongsToChild = fun(LookupKey) -> not less(Bt, ChildLastKey, LookupKey) end,
    case lists:splitwith(BelongsToChild, LookupKeys) of
        {[], GreaterQueries} ->
            lookup_kpnode(Bt, NodeTuple, N + 1, GreaterQueries, Output);
        {LessEqQueries, GreaterQueries} ->
            {ok, Results} = lookup(Bt, PointerInfo, LessEqQueries),
            Output2 = lists:reverse(Results, Output),
            lookup_kpnode(Bt, NodeTuple, N + 1, GreaterQueries, Output2)
    end.
% Resolves sorted lookup keys against a kv_node (leaf). LowerBound is the
% first tuple index still worth examining; Output holds the results collected
% so far, in reverse order.
lookup_kvnode(_Bt, _NodeTuple, _LowerBound, [], Output) ->
    {ok, lists:reverse(Output)};
lookup_kvnode(_Bt, NodeTuple, LowerBound, Keys, Output) when
    tuple_size(NodeTuple) < LowerBound
->
    % keys not found
    {ok, lists:reverse(Output, [{Key, not_found} || Key <- Keys])};
lookup_kvnode(Bt, NodeTuple, LowerBound, [LookupKey | RestLookupKeys], Output) ->
    N = find_first_gteq(Bt, NodeTuple, LowerBound, tuple_size(NodeTuple), LookupKey),
    {Key, Value} = element(N, NodeTuple),
    {NextLowerBound, Result} =
        case less(Bt, LookupKey, Key) of
            true ->
                % LookupKey sorts before Key: absent, and Key may still match
                % a later lookup key
                {N, not_found};
            false ->
                case less(Bt, Key, LookupKey) of
                    true ->
                        % LookupKey sorts after Key: absent, move past Key
                        {N + 1, not_found};
                    false ->
                        % equal: stay on N, the same key may be looked up again
                        {N, {ok, assemble(Bt, LookupKey, Value)}}
                end
        end,
    lookup_kvnode(Bt, NodeTuple, NextLowerBound, RestLookupKeys, [{LookupKey, Result} | Output]).
% Turns the key/pointer list left over after a modification into a root.
%   - nothing left: the tree is empty, root is nil
%   - one pointer left: it is the root
%   - several: write them as a kp_node and repeat on the result until a
%     single pointer remains
% Returns {ok, Root}.
complete_root(_Bt, []) ->
    {ok, nil};
complete_root(_Bt, [{_Key, RootPointerInfo}]) ->
    {ok, RootPointerInfo};
complete_root(Bt, KeyPointers) ->
    {ok, ParentKeyPointers} = write_node(Bt, kp_node, KeyPointers),
    complete_root(Bt, ParentKeyPointers).
%%%%%%%%%%%%% The chunkify function sucks! %%%%%%%%%%%%%
% It is inaccurate because sizes are measured with ?term_size and therefore
% do not account for compression when blocks are written. Measuring the whole
% list and then every element again is probably inefficient too.
%
% Splits a node's entry list into a list of chunks. A list no larger than the
% configured chunk size is returned as a single chunk; otherwise the number
% of chunks is estimated and the list is cut at roughly equal size thresholds.
chunkify(InList) ->
    BaseChunkSize = get_chunk_size(),
    TotalSize = ?term_size(InList),
    case TotalSize > BaseChunkSize of
        true ->
            LikelyChunks = (TotalSize div BaseChunkSize) + 1,
            ChunkThreshold = TotalSize div LikelyChunks,
            chunkify(InList, ChunkThreshold, [], 0, []);
        false ->
            [InList]
    end.
% Worker for chunkify/1. Pending is the chunk being built (reversed),
% PendingSize its accumulated size, Chunks the finished chunks (reversed).
%
% An element that pushes the pending chunk over the threshold is still added
% to it and closes it. A single leftover element at the end is appended to
% the previous chunk instead of becoming a one-element chunk.
chunkify([], _Threshold, [], 0, Chunks) ->
    lists:reverse(Chunks);
chunkify([], _Threshold, [Item], _PendingSize, [PrevChunk | RestChunks]) ->
    lists:reverse(RestChunks, [PrevChunk ++ [Item]]);
chunkify([], _Threshold, Pending, _PendingSize, Chunks) ->
    lists:reverse([lists:reverse(Pending) | Chunks]);
chunkify([Elem | Rest], Threshold, Pending, PendingSize, Chunks) ->
    ElemSize = ?term_size(Elem),
    case (ElemSize + PendingSize) > Threshold andalso Pending /= [] of
        true ->
            Closed = lists:reverse([Elem | Pending]),
            chunkify(Rest, Threshold, [], 0, [Closed | Chunks]);
        false ->
            chunkify(Rest, Threshold, [Elem | Pending], PendingSize + ElemSize, Chunks)
    end.
-compile({inline, [get_chunk_size/0]}).
% Reads the node chunk size from the "couchdb"/"btree_chunk_size" config
% entry. Falls back to the default of 1279 when the setting is not a valid
% integer, or when it is zero or negative: chunkify/1 divides by this value,
% so a configured 0 would otherwise crash every node write with badarith.
get_chunk_size() ->
    Default = 1279,
    try list_to_integer(config:get("couchdb", "btree_chunk_size", "1279")) of
        Size when Size > 0 -> Size;
        _NonPositive -> Default
    catch
        error:badarg ->
            Default
    end.
% Applies sorted Actions to the subtree rooted at RootPointerInfo (nil means
% an empty subtree, treated as an empty kv_node).
%
% Returns {ok, KeyPointers, QueryOutput} where KeyPointers is the list of
% {LastKey, PointerInfo} entries that now represent this subtree:
%   - []                       when the node ended up empty
%   - the original pointer     when the node's contents did not change
%   - newly written pointer(s) otherwise
modify_node(Bt, RootPointerInfo, Actions, QueryOutput) ->
    {NodeType, NodeList} =
        case RootPointerInfo of
            nil -> {kv_node, []};
            _Tuple -> get_node(Bt, element(1, RootPointerInfo))
        end,
    NodeTuple = list_to_tuple(NodeList),
    {ok, NewNodeList, QueryOutput2} =
        case NodeType of
            kp_node -> modify_kpnode(Bt, NodeTuple, 1, Actions, [], QueryOutput);
            kv_node -> modify_kvnode(Bt, NodeTuple, 1, Actions, [], QueryOutput)
        end,
    case NewNodeList of
        [] ->
            % no nodes remain
            {ok, [], QueryOutput2};
        NodeList ->
            % nothing changed (matches the already-bound NodeList): keep the
            % existing on-disk node
            {LastKey, _LastValue} = element(tuple_size(NodeTuple), NodeTuple),
            {ok, [{LastKey, RootPointerInfo}], QueryOutput2};
        _Changed ->
            {ok, ResultList} =
                case RootPointerInfo of
                    nil ->
                        write_node(Bt, NodeType, NewNodeList);
                    _ ->
                        % hand over the old node so write_node/5 may reuse it
                        {LastKey, _LastValue} = element(tuple_size(NodeTuple), NodeTuple),
                        write_node(
                            Bt, {LastKey, RootPointerInfo}, NodeType, NodeList, NewNodeList
                        )
                end,
            {ok, ResultList, QueryOutput2}
    end.
% Computes the reduction stored alongside a node pointer.
%   - no reduce fun configured (nil): []
%   - kp_node: rereduce over the children's stored reductions
%   - kv_node: reduce over the assembled key/values
reduce_node(#btree{reduce = nil}, _NodeType, _NodeList) ->
    [];
reduce_node(#btree{reduce = Reduce}, kp_node, NodeList) ->
    ChildReds = [element(2, PointerInfo) || {_Key, PointerInfo} <- NodeList],
    Reduce(rereduce, ChildReds);
reduce_node(#btree{reduce = Reduce} = Bt, kv_node, NodeList) ->
    Assembled = [assemble(Bt, Key, Value) || {Key, Value} <- NodeList],
    Reduce(reduce, Assembled).
% Size of the subtree rooted at a node whose own size is NodeSize.
%   - kv_node: just the node's own size
%   - kp_node: the node's own size plus the sizes recorded for all children
% If any child pointer is in the pre 1.2 two-element format, or records nil
% as its size, the subtree size is unknown and nil is returned.
reduce_tree_size(kv_node, NodeSize, _KvList) ->
    NodeSize;
reduce_tree_size(kp_node, NodeSize, []) ->
    NodeSize;
reduce_tree_size(kp_node, _NodeSize, [{_Key, {_Pointer, _Red}} | _]) ->
    % pre 1.2 format
    nil;
reduce_tree_size(kp_node, _NodeSize, [{_Key, {_Pointer, _Red, nil}} | _]) ->
    nil;
reduce_tree_size(kp_node, NodeSize, [{_Key, {_Pointer, _Red, ChildSize}} | Rest]) ->
    reduce_tree_size(kp_node, NodeSize + ChildSize, Rest).
% Reads the node stored at NodePos from the btree's file and records the read
% with couch_cost. Returns {NodeType, NodeList}.
get_node(#btree{fd = Fd}, NodePos) ->
    {ok, {NodeType, _NodeList} = Node} = couch_file:pread_term(Fd, NodePos),
    couch_cost:inc_get_node(NodeType),
    Node.
% Writes NodeList as one or more nodes of NodeType. The list is first split
% into chunks (see chunkify/1); all chunks are appended to the file in one
% call using the btree's compression setting. Returns {ok, KeyPointers} with
% one {LastKey, {Pointer, Reduction, SubTreeSize}} entry per written chunk.
write_node(#btree{fd = Fd, compression = Comp} = Bt, NodeType, NodeList) ->
    Chunks = chunkify(NodeList),
    Terms = [{NodeType, Chunk} || Chunk <- Chunks],
    {ok, PtrSizes} = couch_file:append_terms(Fd, Terms, [{compression, Comp}]),
    {ok, group_kps(Bt, NodeType, Chunks, PtrSizes)}.
% Pairs each written chunk with its {Pointer, Size} and builds the key/pointer
% entry for it: {LastKeyOfChunk, {Pointer, Reduction, SubTreeSize}}.
% Both lists must have the same length.
group_kps(_Bt, _NodeType, [], []) ->
    [];
group_kps(Bt, NodeType, [Chunk | RestChunks], [{Ptr, Size} | RestPtrSizes]) ->
    {LastKey, _} = lists:last(Chunk),
    SubTreeSize = reduce_tree_size(NodeType, Size, Chunk),
    Reduction = reduce_node(Bt, NodeType, Chunk),
    [{LastKey, {Ptr, Reduction, SubTreeSize}} | group_kps(Bt, NodeType, RestChunks, RestPtrSizes)].
% Writes the new contents of a modified node, reusing the old on-disk node
% when possible. OldNode is the {LastKey, PointerInfo} entry of the existing
% node and OldList its contents.
%
% Old nodes with zero or one entry are never reused. Otherwise, if the old
% contents appear unchanged inside NewList (see can_reuse_old_node/2), only
% the entries before and after them are written and OldNode is spliced in
% between. Returns {ok, KeyPointers}.
write_node(Bt, _OldNode, NodeType, [], NewList) ->
    write_node(Bt, NodeType, NewList);
write_node(Bt, _OldNode, NodeType, [_], NewList) ->
    write_node(Bt, NodeType, NewList);
write_node(Bt, OldNode, NodeType, OldList, NewList) ->
    case can_reuse_old_node(OldList, NewList) of
        {true, Prefix, Suffix} ->
            % the prefix is written before the suffix
            {ok, PrefixKPs} =
                case Prefix of
                    [] -> {ok, []};
                    _ -> write_node(Bt, NodeType, Prefix)
                end,
            {ok, SuffixKPs} =
                case Suffix of
                    [] -> {ok, []};
                    _ -> write_node(Bt, NodeType, Suffix)
                end,
            {ok, PrefixKPs ++ [OldNode] ++ SuffixKPs};
        false ->
            write_node(Bt, NodeType, NewList)
    end.
% Decides whether the old node can be kept as-is inside the new contents.
% Returns {true, Prefix, Suffix} when NewList equals
% Prefix ++ OldList ++ Suffix and the reused entries are at least
% ?FILL_RATIO of the configured chunk size; false otherwise.
% OldList must be non-empty.
can_reuse_old_node(OldList, NewList) ->
    {Prefix, FromOldHead} = remove_prefix_kvs(hd(OldList), NewList),
    case old_list_is_prefix(OldList, FromOldHead, 0) of
        {true, ReusedSize, Suffix} ->
            ReuseThreshold = get_chunk_size() * ?FILL_RATIO,
            case ReusedSize < ReuseThreshold of
                % too little would be reused to be worth keeping the old node
                true -> false;
                false -> {true, Prefix, Suffix}
            end;
        false ->
            false
    end.
% Splits off the leading entries of the new list that compare lower (in
% Erlang term order) than FirstOld. Returns {Prefix, Rest}.
remove_prefix_kvs(FirstOld, [NewKV | NewRest]) when NewKV < FirstOld ->
    {Prefix, Remaining} = remove_prefix_kvs(FirstOld, NewRest),
    {[NewKV | Prefix], Remaining};
remove_prefix_kvs(_FirstOld, Remaining) ->
    {[], Remaining}.
% Checks whether the old node's entries form an exact prefix of the new list.
% Returns {true, SizeOfMatchedEntries, RemainingNewEntries} or false. The size
% is the sum of ?term_size over the matched entries, added to the initial
% accumulator.
%
% Old node exhausted: it is a prefix, what remains of the new list is the
% suffix.
old_list_is_prefix([], Suffix, SizeAcc) ->
    {true, SizeAcc, Suffix};
% New list exhausted first: some KVs were removed from the old node.
old_list_is_prefix(_OldRest, [], _SizeAcc) ->
    false;
% Identical KV on both sides: account for it and keep going.
old_list_is_prefix([KV | OldRest], [KV | NewRest], SizeAcc) ->
    old_list_is_prefix(OldRest, NewRest, ?term_size(KV) + SizeAcc);
% Mismatch between the old and the new node: not a prefix.
old_list_is_prefix(_OldRest, _NewRest, _SizeAcc) ->
    false.
% Applies sorted Actions to a kp_node held as a tuple of {Key, PointerInfo}.
% LowerBound is the first index not yet processed; ResultNode accumulates the
% new node contents in reverse order. Returns {ok, NewNodeList, QueryOutput}.
%
% Actions are routed to the first child whose key is >= the action key; any
% actions sorting after every key go to the last child.
modify_kpnode(Bt, {}, _LowerBound, Actions, [], QueryOutput) ->
    % empty kp_node: treat the actions as applying to an empty subtree
    modify_node(Bt, nil, Actions, QueryOutput);
modify_kpnode(_Bt, NodeTuple, LowerBound, [], ResultNode, QueryOutput) ->
    % no actions left: the remaining children are carried over untouched
    Untouched = bounded_tuple_to_list(NodeTuple, LowerBound, tuple_size(NodeTuple), []),
    {ok, lists:reverse(ResultNode, Untouched), QueryOutput};
modify_kpnode(
    Bt, NodeTuple, LowerBound, [{_, FirstActionKey, _} | _] = Actions, ResultNode, QueryOutput
) ->
    Sz = tuple_size(NodeTuple),
    N = find_first_gteq(Bt, NodeTuple, LowerBound, Sz, FirstActionKey),
    case N =:= Sz of
        true ->
            % perform remaining actions on last node
            {_, LastPointerInfo} = element(Sz, NodeTuple),
            {ok, ChildKPs, QueryOutput2} =
                modify_node(Bt, LastPointerInfo, Actions, QueryOutput),
            % untouched children LowerBound..Sz-1, followed by the new last child
            Tail = bounded_tuple_to_list(NodeTuple, LowerBound, Sz - 1, ChildKPs),
            {ok, lists:reverse(ResultNode, Tail), QueryOutput2};
        false ->
            {NodeKey, PointerInfo} = element(N, NodeTuple),
            ForThisChild = fun({_ActionType, ActionKey, _ActionValue}) ->
                not less(Bt, NodeKey, ActionKey)
            end,
            {LessEqQueries, GreaterQueries} = lists:splitwith(ForThisChild, Actions),
            {ok, ChildKPs, QueryOutput2} =
                modify_node(Bt, PointerInfo, LessEqQueries, QueryOutput),
            % push the skipped children LowerBound..N-1, then the rewritten child
            Skipped = bounded_tuple_to_revlist(NodeTuple, LowerBound, N - 1, ResultNode),
            ResultNode2 = lists:reverse(ChildKPs, Skipped),
            modify_kpnode(Bt, NodeTuple, N + 1, GreaterQueries, ResultNode2, QueryOutput2)
    end.
% Pushes elements Start..End (inclusive, 1-based) of Tuple onto Tail, so they
% end up in reverse order in front of it. Returns Tail unchanged when
% Start > End.
bounded_tuple_to_revlist(_Tuple, Start, End, Tail) when Start > End ->
    Tail;
bounded_tuple_to_revlist(Tuple, Start, End, Tail) ->
    Pushed = [element(Start, Tuple) | Tail],
    bounded_tuple_to_revlist(Tuple, Start + 1, End, Pushed).
% Returns elements Start..End (inclusive, 1-based) of Tuple, in order,
% followed by Tail.
bounded_tuple_to_list(Tuple, Start, End, Tail) ->
    EmptyAcc = [],
    bounded_tuple_to_list2(Tuple, Start, End, EmptyAcc, Tail).
% Worker for bounded_tuple_to_list/4. Collects elements Start..End into Acc
% (in reverse), then reverses Acc onto Tail once the range is exhausted.
bounded_tuple_to_list2(_Tuple, Start, End, Acc, Tail) when Start > End ->
    lists:reverse(Acc, Tail);
bounded_tuple_to_list2(Tuple, Start, End, Acc, Tail) ->
    Collected = [element(Start, Tuple) | Acc],
    bounded_tuple_to_list2(Tuple, Start + 1, End, Collected, Tail).
% Binary search over Tuple[Start..End] (entries are {Key, _}, sorted by the
% btree ordering) for the first index whose key does not sort before Key.
% When every key in the range sorts before Key, End is returned, so callers
% must still compare against the element they get back.
find_first_gteq(_Bt, _Tuple, Start, End, _Key) when Start == End ->
    End;
find_first_gteq(Bt, Tuple, Start, End, Key) ->
    Mid = Start + ((End - Start) div 2),
    {MidKey, _} = element(Mid, Tuple),
    {NextStart, NextEnd} =
        case less(Bt, MidKey, Key) of
            % MidKey < Key: the answer lies strictly to the right of Mid
            true -> {Mid + 1, End};
            % MidKey >= Key: Mid itself is still a candidate
            false -> {Start, Mid}
        end,
    find_first_gteq(Bt, Tuple, NextStart, NextEnd, Key).
% Applies sorted Actions ({insert | remove | fetch, Key, Value}) to a kv_node
% held as a tuple of {Key, Value}. LowerBound is the first index not yet
% consumed; the fifth argument accumulates the new node contents in reverse
% order. Fetch results are pushed onto QueryOutput as {ok, AssembledKV} or
% {not_found, {Key, nil}}. Returns {ok, NewNodeList, QueryOutput}.
modify_kvnode(_Bt, NodeTuple, LowerBound, [], ResultNode, QueryOutput) ->
    % no actions left: keep every remaining KV as it is
    Remaining = bounded_tuple_to_list(NodeTuple, LowerBound, tuple_size(NodeTuple), []),
    {ok, lists:reverse(ResultNode, Remaining), QueryOutput};
modify_kvnode(
    Bt,
    NodeTuple,
    LowerBound,
    [{ActionType, ActionKey, ActionValue} | RestActions],
    ResultNode,
    QueryOutput
) when LowerBound > tuple_size(NodeTuple) ->
    % the node is exhausted, so the action key cannot already exist
    {ResultNode2, QueryOutput2} =
        case ActionType of
            insert ->
                {[{ActionKey, ActionValue} | ResultNode], QueryOutput};
            remove ->
                % just drop the action
                {ResultNode, QueryOutput};
            fetch ->
                % the key/value must not exist in the tree
                {ResultNode, [{not_found, {ActionKey, nil}} | QueryOutput]}
        end,
    modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, ResultNode2, QueryOutput2);
modify_kvnode(
    Bt,
    NodeTuple,
    LowerBound,
    [{ActionType, ActionKey, ActionValue} | RestActions] = Actions,
    AccNode,
    QueryOutput
) ->
    N = find_first_gteq(Bt, NodeTuple, LowerBound, tuple_size(NodeTuple), ActionKey),
    {Key, Value} = element(N, NodeTuple),
    % carry over the KVs skipped by the search (LowerBound..N-1)
    ResultNode = bounded_tuple_to_revlist(NodeTuple, LowerBound, N - 1, AccNode),
    case less(Bt, ActionKey, Key) of
        true ->
            % ActionKey sorts before Key, so it is not present in the node
            {ResultNode2, QueryOutput2} =
                case ActionType of
                    insert ->
                        {[{ActionKey, ActionValue} | ResultNode], QueryOutput};
                    remove ->
                        % nothing to remove, drop the action
                        {ResultNode, QueryOutput};
                    fetch ->
                        {ResultNode, [{not_found, {ActionKey, nil}} | QueryOutput]}
                end,
            modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode2, QueryOutput2);
        false ->
            % ActionKey and Key are maybe equal.
            case less(Bt, Key, ActionKey) of
                false ->
                    % equal keys
                    case ActionType of
                        insert ->
                            % replace the existing KV
                            modify_kvnode(
                                Bt,
                                NodeTuple,
                                N + 1,
                                RestActions,
                                [{ActionKey, ActionValue} | ResultNode],
                                QueryOutput
                            );
                        remove ->
                            % skip the existing KV
                            modify_kvnode(
                                Bt, NodeTuple, N + 1, RestActions, ResultNode, QueryOutput
                            );
                        fetch ->
                            % report the value but stay on N, since another
                            % action on the identical key can follow
                            modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, [
                                {ok, assemble(Bt, Key, Value)} | QueryOutput
                            ])
                    end;
                true ->
                    % Key sorts before ActionKey: keep Key and retry the same
                    % action against the rest of the node
                    modify_kvnode(
                        Bt, NodeTuple, N + 1, Actions, [{Key, Value} | ResultNode], QueryOutput
                    )
            end
    end.
% Streams one node for fold_reduce/4. A nil node contributes nothing and the
% grouping state is returned unchanged; otherwise the node is read, put in
% traversal order and handed to the kp/kv specific streamer.
% Returns {ok, Acc, GroupedRedsAcc, GroupedKVsAcc, GroupedKey}.
reduce_stream_node(
    _Bt, _Dir, nil, _KeyStart, _InEndRangeFun,
    GroupedKey, GroupedKVsAcc, GroupedRedsAcc, _KeyGroupFun, _Fun, Acc
) ->
    {ok, Acc, GroupedRedsAcc, GroupedKVsAcc, GroupedKey};
reduce_stream_node(
    Bt, Dir, Node, KeyStart, InEndRangeFun,
    GroupedKey, GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc
) ->
    case get_node(Bt, element(1, Node)) of
        {kp_node, KPs} ->
            reduce_stream_kp_node(
                Bt, Dir, adjust_dir(Dir, KPs), KeyStart, InEndRangeFun,
                GroupedKey, GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc
            );
        {kv_node, KVs} ->
            reduce_stream_kv_node(
                Bt, Dir, adjust_dir(Dir, KVs), KeyStart, InEndRangeFun,
                GroupedKey, GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc
            )
    end.
% Streams a kv_node (already in traversal order) for fold_reduce/4: drops the
% entries that come before KeyStart in the traversal direction, keeps entries
% only while they satisfy InEndRangeFun, then feeds what is left into the
% grouping loop.
reduce_stream_kv_node(
    Bt, Dir, KVs, KeyStart, InEndRangeFun,
    GroupedKey, GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc
) ->
    FromStart =
        case KeyStart of
            undefined ->
                KVs;
            _ ->
                BeforeStart =
                    case Dir of
                        fwd -> fun({Key, _}) -> less(Bt, Key, KeyStart) end;
                        rev -> fun({Key, _}) -> less(Bt, KeyStart, Key) end
                    end,
                lists:dropwhile(BeforeStart, KVs)
        end,
    WithinRange = lists:takewhile(fun({Key, _}) -> InEndRangeFun(Key) end, FromStart),
    reduce_stream_kv_node2(
        Bt, WithinRange, GroupedKey, GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc
    ).
% Grouping loop over the in-range KVs of one kv_node.
%   - no open group (GroupedKey undefined): the KV opens one
%   - KV belongs to the open group: its assembled form is added to the group
%   - otherwise the open group is emitted through Fun and the KV opens the
%     next group; a {stop, Acc} answer from Fun is thrown as {stop, Acc}
% Returns {ok, Acc, GroupedRedsAcc, GroupedKVsAcc, GroupedKey} with the group
% that is still open when the KVs run out.
reduce_stream_kv_node2(
    _Bt, [], GroupedKey, GroupedKVsAcc, GroupedRedsAcc, _KeyGroupFun, _Fun, Acc
) ->
    {ok, Acc, GroupedRedsAcc, GroupedKVsAcc, GroupedKey};
reduce_stream_kv_node2(
    Bt, [{Key, Value} | RestKVs], undefined, _GroupedKVsAcc, _GroupedRedsAcc, KeyGroupFun, Fun, Acc
) ->
    reduce_stream_kv_node2(
        Bt, RestKVs, Key, [assemble(Bt, Key, Value)], [], KeyGroupFun, Fun, Acc
    );
reduce_stream_kv_node2(
    Bt, [{Key, Value} | RestKVs], GroupedKey, GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc
) ->
    case KeyGroupFun(GroupedKey, Key) of
        true ->
            reduce_stream_kv_node2(
                Bt,
                RestKVs,
                GroupedKey,
                [assemble(Bt, Key, Value) | GroupedKVsAcc],
                GroupedRedsAcc,
                KeyGroupFun,
                Fun,
                Acc
            );
        false ->
            case Fun(GroupedKey, {GroupedKVsAcc, GroupedRedsAcc}, Acc) of
                {ok, Acc2} ->
                    reduce_stream_kv_node2(
                        Bt, RestKVs, Key, [assemble(Bt, Key, Value)], [], KeyGroupFun, Fun, Acc2
                    );
                {stop, Acc2} ->
                    throw({stop, Acc2})
            end
    end.
% Streams a kp_node (already in traversal order) for fold_reduce/4.
%
% Children that cannot contain KeyStart are dropped first. Then children are
% kept while their key satisfies InEndRangeFun; going forward, the first child
% past that point is kept as well.
reduce_stream_kp_node(
    Bt, Dir, NodeList, KeyStart, InEndRangeFun,
    GroupedKey, GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc
) ->
    SortsBeforeStart = fun({Key, _}) -> less(Bt, Key, KeyStart) end,
    Nodes =
        case {KeyStart, Dir} of
            {undefined, _} ->
                NodeList;
            {_, fwd} ->
                lists:dropwhile(SortsBeforeStart, NodeList);
            {_, rev} ->
                % NodeList is descending here; look at it in ascending order
                case lists:splitwith(SortsBeforeStart, lists:reverse(NodeList)) of
                    {_Before, []} ->
                        NodeList;
                    {Before, [FirstAfter | _]} ->
                        [FirstAfter | lists:reverse(Before)]
                end
        end,
    {InRange, MaybeInRange} =
        lists:splitwith(fun({Key, _}) -> InEndRangeFun(Key) end, Nodes),
    NodesInRange =
        case MaybeInRange of
            [FirstMaybeInRange | _] when Dir =:= fwd ->
                InRange ++ [FirstMaybeInRange];
            _ ->
                InRange
        end,
    reduce_stream_kp_node2(
        Bt, Dir, NodesInRange, KeyStart, InEndRangeFun,
        GroupedKey, GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc
    ).
% Grouping loop over the children of a kp_node.
%
% With no open group and empty accumulators, the first child is streamed to
% establish one. Otherwise, the leading children whose key still belongs to
% the open group are split off: all but the last of them contribute only
% their stored reduction, and the last one is streamed together with the
% children that follow.
% Returns {ok, Acc, GroupedRedsAcc, GroupedKVsAcc, GroupedKey}.
reduce_stream_kp_node2(
    Bt, Dir, [{_Key, NodeInfo} | RestNodeList], KeyStart, InEndRangeFun,
    undefined, [], [], KeyGroupFun, Fun, Acc
) ->
    {ok, Acc2, GroupedRedsAcc2, GroupedKVsAcc2, GroupedKey2} =
        reduce_stream_node(
            Bt, Dir, NodeInfo, KeyStart, InEndRangeFun,
            undefined, [], [], KeyGroupFun, Fun, Acc
        ),
    reduce_stream_kp_node2(
        Bt, Dir, RestNodeList, KeyStart, InEndRangeFun,
        GroupedKey2, GroupedKVsAcc2, GroupedRedsAcc2, KeyGroupFun, Fun, Acc2
    );
reduce_stream_kp_node2(
    Bt, Dir, NodeList, KeyStart, InEndRangeFun,
    GroupedKey, GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc
) ->
    InOpenGroup = fun({Key, _}) -> KeyGroupFun(GroupedKey, Key) end,
    {Grouped0, Ungrouped0} = lists:splitwith(InOpenGroup, NodeList),
    {GroupedNodes, UngroupedNodes} =
        case lists:reverse(Grouped0) of
            [] ->
                {[], Ungrouped0};
            [LastGrouped | RestGrouped] ->
                % the last grouped child is moved back to the children that
                % get streamed
                {RestGrouped, [LastGrouped | Ungrouped0]}
        end,
    GroupedReds = [element(2, Node) || {_, Node} <- GroupedNodes],
    case UngroupedNodes of
        [{_Key, NodeInfo} | RestNodes] ->
            {ok, Acc2, GroupedRedsAcc2, GroupedKVsAcc2, GroupedKey2} =
                reduce_stream_node(
                    Bt, Dir, NodeInfo, KeyStart, InEndRangeFun,
                    GroupedKey, GroupedKVsAcc, GroupedReds ++ GroupedRedsAcc, KeyGroupFun, Fun, Acc
                ),
            reduce_stream_kp_node2(
                Bt, Dir, RestNodes, KeyStart, InEndRangeFun,
                GroupedKey2, GroupedKVsAcc2, GroupedRedsAcc2, KeyGroupFun, Fun, Acc2
            );
        [] ->
            {ok, Acc, GroupedReds ++ GroupedRedsAcc, GroupedKVsAcc, GroupedKey}
    end.
% Puts a node's entries in traversal order: unchanged for fwd, reversed for
% rev.
adjust_dir(fwd, Entries) -> Entries;
adjust_dir(rev, Entries) -> lists:reverse(Entries).
% Streams the subtree under Node for fold/4, starting at StartKey. Reads the
% node, orders its entries for Dir and dispatches on the node type.
stream_node(Bt, Reds, Node, StartKey, InRange, Dir, Fun, Acc) ->
    {NodeType, NodeList} = get_node(Bt, element(1, Node)),
    Ordered = adjust_dir(Dir, NodeList),
    case NodeType of
        kp_node -> stream_kp_node(Bt, Reds, Ordered, StartKey, InRange, Dir, Fun, Acc);
        kv_node -> stream_kv_node(Bt, Reds, Ordered, StartKey, InRange, Dir, Fun, Acc)
    end.
% Streams the subtree under Node for fold/4 when there is no start key. Reads
% the node, orders its entries for Dir and dispatches on the node type; a
% kv_node is visited from its first entry with no previously visited KVs.
stream_node(Bt, Reds, Node, InRange, Dir, Fun, Acc) ->
    {NodeType, NodeList} = get_node(Bt, element(1, Node)),
    Ordered = adjust_dir(Dir, NodeList),
    case NodeType of
        kp_node -> stream_kp_node(Bt, Reds, Ordered, InRange, Dir, Fun, Acc);
        kv_node -> stream_kv_node2(Bt, Reds, [], Ordered, InRange, Dir, Fun, Acc)
    end.
% Streams the children of a kp_node (already in traversal order) for fold/4.
%
% Before descending into a child, Fun(traverse, Key, Red, Acc) is consulted:
%   {ok, Acc2}   -> descend into the child
%   {skip, Acc2} -> do not descend; the child's reduction is still added to
%                   the running reductions
%   {stop, Acc2} -> end the fold here
%
% Returns {ok, Acc} when all children were processed, or
% {stop, {KVs, Reds}, Acc} when the fold was stopped. The stop reduction is
% always a {KVs, Reds} pair: that is the shape every other stop path produces
% and the only one final_reduce/2 accepts. A stop requested on `traverse`
% therefore reports {[], Reds}, since no KV of this child has been visited.
stream_kp_node(_Bt, _Reds, [], _InRange, _Dir, _Fun, Acc) ->
    {ok, Acc};
stream_kp_node(Bt, Reds, [{Key, Node} | Rest], InRange, Dir, Fun, Acc) ->
    Red = element(2, Node),
    case Fun(traverse, Key, Red, Acc) of
        {ok, Acc2} ->
            case stream_node(Bt, Reds, Node, InRange, Dir, Fun, Acc2) of
                {ok, Acc3} ->
                    stream_kp_node(Bt, [Red | Reds], Rest, InRange, Dir, Fun, Acc3);
                {stop, LastReds, Acc3} ->
                    {stop, LastReds, Acc3}
            end;
        {skip, Acc2} ->
            stream_kp_node(Bt, [Red | Reds], Rest, InRange, Dir, Fun, Acc2);
        {stop, Acc2} ->
            {stop, {[], Reds}, Acc2}
    end.
% Skips the leading key/pointer entries whose key sorts before StartKey,
% pushing the reduction of every skipped child onto Reds.
% Returns {Reds2, RemainingKPs}.
drop_nodes(_Bt, Reds, _StartKey, []) ->
    {Reds, []};
drop_nodes(Bt, Reds, StartKey, [{NodeKey, Node} | RestKPs] = KPs) ->
    case less(Bt, NodeKey, StartKey) of
        true -> drop_nodes(Bt, [element(2, Node) | Reds], StartKey, RestKPs);
        false -> {Reds, KPs}
    end.
% Streams the children of a kp_node (already in traversal order) for fold/4,
% starting at StartKey. Children that cannot contain StartKey are skipped and
% their reductions are added to the running reductions. Only the first
% remaining child is streamed with the start key; the rest are streamed in
% full through stream_kp_node/7.
stream_kp_node(Bt, Reds, KPs, StartKey, InRange, Dir, Fun, Acc) ->
    {NewReds, NodesToStream} =
        case Dir of
            fwd ->
                % drop all nodes sorting before the key
                drop_nodes(Bt, Reds, StartKey, KPs);
            rev ->
                % keep all nodes sorting before the key, AND the first node to
                % sort after
                SortsBefore = fun({Key, _Pointer}) -> less(Bt, Key, StartKey) end,
                case lists:splitwith(SortsBefore, lists:reverse(KPs)) of
                    {_RevsBefore, []} ->
                        % everything sorts before it
                        {Reds, KPs};
                    {RevBefore, [FirstAfter | Drop]} ->
                        DroppedReds = [element(2, Dropped) || {_K, Dropped} <- Drop],
                        {DroppedReds ++ Reds, [FirstAfter | lists:reverse(RevBefore)]}
                end
        end,
    case NodesToStream of
        [] ->
            {ok, Acc};
        [{_Key, Node} | Rest] ->
            case stream_node(Bt, NewReds, Node, StartKey, InRange, Dir, Fun, Acc) of
                {ok, Acc2} ->
                    stream_kp_node(
                        Bt, [element(2, Node) | NewReds], Rest, InRange, Dir, Fun, Acc2
                    );
                {stop, LastReds, Acc2} ->
                    {stop, LastReds, Acc2}
            end
    end.
% Streams a kv_node (already in traversal order) for fold/4, starting at
% StartKey. Entries that come before StartKey in the traversal direction are
% not visited, but are passed along in assembled form as the already seen KVs.
stream_kv_node(Bt, Reds, KVs, StartKey, InRange, Dir, Fun, Acc) ->
    BeforeStart =
        case Dir of
            fwd -> fun({Key, _}) -> less(Bt, Key, StartKey) end;
            rev -> fun({Key, _}) -> less(Bt, StartKey, Key) end
        end,
    {Skipped, ToVisit} = lists:splitwith(BeforeStart, KVs),
    SkippedAssembled = [assemble(Bt, K, V) || {K, V} <- Skipped],
    stream_kv_node2(Bt, Reds, SkippedAssembled, ToVisit, InRange, Dir, Fun, Acc).
% Visits the KVs of one kv_node in order. For every KV whose key is still in
% range, the processed-changes counter is bumped and
% Fun(visit, AssembledKV, {PrevKVs, Reds}, Acc) is called, where PrevKVs are
% the KVs of this node seen before the current one.
%
% Returns {ok, Acc} when the node is exhausted, or {stop, {PrevKVs, Reds}, Acc}
% as soon as a key falls out of range or Fun answers {stop, Acc}.
stream_kv_node2(_Bt, _Reds, _PrevKVs, [], _InRange, _Dir, _Fun, Acc) ->
    {ok, Acc};
stream_kv_node2(Bt, Reds, PrevKVs, [{K, V} | RestKVs], InRange, Dir, Fun, Acc) ->
    case InRange(K) of
        true ->
            couch_cost:inc_changes_processed(),
            Assembled = assemble(Bt, K, V),
            case Fun(visit, Assembled, {PrevKVs, Reds}, Acc) of
                {ok, Acc2} ->
                    stream_kv_node2(
                        Bt, Reds, [Assembled | PrevKVs], RestKVs, InRange, Dir, Fun, Acc2
                    );
                {stop, Acc2} ->
                    {stop, {PrevKVs, Reds}, Acc2}
            end;
        false ->
            {stop, {PrevKVs, Reds}, Acc}
    end.