% blob: 2f2fba687a01b136c01f9603cdcf4aac6b4a7a7e [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(smoosh_priority_queue).
-export([
new/1,
name/1,
in/4,
out/1,
info/1,
flush/1,
to_map/1,
from_map/3
]).
% Queue state. Every queued key is tracked twice: once in `map` (to find its
% current tree key) and once in `tree` (to pop items in priority order).
-record(priority_queue, {
% Name passed to new/1 or from_map/3; returned unchanged by name/1.
name,
% #{Key => {Priority, Ref}}: the tree key under which Key is currently stored.
map,
% gb_trees mapping {Priority, Ref} => Key; the largest tree key is popped first.
tree
}).
% Create an empty queue called Name: no keys in the lookup map and an empty
% priority tree.
new(Name) ->
    EmptyTree = gb_trees:empty(),
    #priority_queue{name = Name, map = #{}, tree = EmptyTree}.
% Return the name the queue was created with.
name(#priority_queue{} = Q) ->
    Q#priority_queue.name.
% Discard every queued item. The queue keeps its name; both the lookup map and
% the priority tree are reset to empty.
flush(Q = #priority_queue{}) ->
    EmptyTree = gb_trees:empty(),
    Q#priority_queue{tree = EmptyTree, map = #{}}.
% Add Key with the given Priority, or change the priority of a Key that is
% already queued. After an insertion the queue is cut back to at most Capacity
% items (see truncate/2). Returns the updated queue.
in(Key, Priority, Capacity, #priority_queue{map = Map} = Q) ->
    case maps:find(Key, Map) of
        error ->
            % Key is not queued yet.
            insert(Key, Priority, Capacity, Q);
        {ok, {Priority, _Ref}} ->
            % Already queued with exactly this priority: leave the queue
            % untouched. This might be the case for upgrade channels, for
            % instance, where priority is 1.
            Q;
        {ok, StaleTreeKey} ->
            % Queued under a different priority: remove the old tree entry
            % first, then insert under the new priority. insert/4 overwrites
            % the map entry for Key.
            Pruned = gb_trees:delete(StaleTreeKey, Q#priority_queue.tree),
            insert(Key, Priority, Capacity, Q#priority_queue{tree = Pruned})
    end.
% Pop the item with the largest tree key, i.e. the highest priority.
% Returns `false` when the queue is empty, otherwise {Key, NewQueue} where
% NewQueue no longer contains Key in either the tree or the lookup map.
out(#priority_queue{map = Map, tree = Tree} = Q) ->
    case gb_trees:is_empty(Tree) of
        false ->
            {_TreeKey, Key, Remaining} = gb_trees:take_largest(Tree),
            Popped = Q#priority_queue{map = maps:remove(Key, Map), tree = Remaining},
            {Key, Popped};
        true ->
            false
    end.
% Number of items currently queued, taken from the size of the priority tree.
qsize(#priority_queue{} = Q) ->
    gb_trees:size(Q#priority_queue.tree).
% Summarize the queue as #{size => N, min => MinPriority, max => MaxPriority}.
% An empty queue reports 0 for both min and max.
info(#priority_queue{tree = Tree} = Q) ->
    {Min, Max} =
        case gb_trees:is_empty(Tree) of
            true ->
                {0, 0};
            false ->
                % Tree keys are {Priority, Ref}; only the priority is reported.
                {{Lowest, _}, _} = gb_trees:smallest(Tree),
                {{Highest, _}, _} = gb_trees:largest(Tree),
                {Lowest, Highest}
        end,
    #{size => qsize(Q), min => Min, max => Max}.
% Unconditionally store Key under Priority, then enforce Capacity.
% The caller must ensure Key has no entry in the tree; in/4 deletes the old
% one before calling here.
insert(Key, Priority, Capacity, #priority_queue{tree = Tree, map = Map} = Q) ->
    % Pairing the priority with a fresh reference makes the tree key unique, so
    % several items can share one priority.
    TreeKey = {Priority, make_ref()},
    Grown = Q#priority_queue{
        tree = gb_trees:insert(TreeKey, Key, Tree),
        map = Map#{Key => TreeKey}
    },
    truncate(Capacity, Grown).
% Shrink the queue to at most Capacity items by evicting the lowest-priority
% ones. Capacity must be a positive integer; anything else fails with
% function_clause.
truncate(Capacity, Q) when is_integer(Capacity), Capacity > 0 ->
    CurrentSize = qsize(Q),
    truncate(Capacity, CurrentSize, Q).
% Worker loop for truncate/2. Size is the current number of queued items.
% Evicts the smallest tree key (lowest priority) one at a time until the queue
% fits within Capacity.
truncate(Capacity, Size, Q) when is_integer(Capacity), Size =< Capacity ->
    Q;
truncate(Capacity, Size, Q) when is_integer(Capacity), Size > 0 ->
    #priority_queue{map = Map, tree = Tree} = Q,
    {_TreeKey, Evicted, Rest} = gb_trees:take_smallest(Tree),
    % Keep the lookup map in sync with the tree.
    Smaller = Q#priority_queue{tree = Rest, map = maps:remove(Evicted, Map)},
    truncate(Capacity, qsize(Smaller), Smaller).
% Serialize the queue to/from simple maps which look like #{Key => Priority}.
% The intent is for these to be used by the smoosh persistence facility.
%
% Export the queue as a plain #{Key => Priority} map. The internal reference
% stored next to each priority is dropped.
to_map(#priority_queue{map = KeyToTreeKey}) ->
    maps:map(fun(_Key, {Priority, _Ref}) -> Priority end, KeyToTreeKey).
% Rebuild a queue called Name from a #{Key => Priority} map as produced by
% to_map/1. Capacity is enforced after every insertion, so only the
% highest-priority items survive when the map holds more than Capacity entries.
from_map(Name, Capacity, #{} = SerializedMap) ->
    maps:fold(
        fun(Key, Priority, QAcc) -> insert(Key, Priority, Capacity, QAcc) end,
        new(Name),
        SerializedMap
    ).
-ifdef(TEST).
-include_lib("couch/include/couch_eunit.hrl").
-define(K1, <<"db1">>).
-define(K2, {<<"db1">>, <<"design/_doc1">>}).
-define(K3, {index_cleanup, <<"db1">>}).
-define(P1, 1).
-define(P2, 2.4).
-define(P3, infinity).
% A new queue is a #priority_queue{} record, remembers its name and is empty.
basics_test() ->
    Empty = new("foo"),
    ?assertMatch(#priority_queue{}, Empty),
    ?assertEqual("foo", name(Empty)),
    Size = maps:get(size, info(Empty)),
    ?assertEqual(0, Size).
% Every operation on an empty queue either reports emptiness or returns the
% queue unchanged.
empty_test() ->
    Empty = new("foo"),
    % Nothing to pop.
    ?assertEqual(false, out(Empty)),
    % Truncating and flushing are no-ops.
    ?assertEqual(Empty, truncate(1, Empty)),
    ?assertEqual(Empty, flush(Empty)),
    % Serialization round trip of the empty queue.
    ?assertEqual(#{}, to_map(Empty)),
    ?assertEqual(Empty, from_map("foo", 1, #{})).
% Exercise the whole API on a queue holding exactly one item.
one_element_test() ->
    Empty = new("foo"),
    Single = in(?K1, ?P1, 1, Empty),
    ?assertMatch(#priority_queue{}, Single),
    ?assertEqual(#{max => 1, min => 1, size => 1}, info(Single)),
    % Already within capacity, so truncation changes nothing.
    ?assertEqual(Single, truncate(1, Single)),
    % Popping the only item yields an empty queue again.
    ?assertMatch({?K1, #priority_queue{}}, out(Single)),
    {?K1, AfterOut} = out(Single),
    ?assertEqual(AfterOut, Empty),
    % Serialize and restore.
    Serialized = to_map(Single),
    ?assertEqual(#{?K1 => ?P1}, Serialized),
    Restored = from_map("foo", 1, Serialized),
    ?assertEqual("foo", name(Restored)),
    ?assertEqual(#{max => ?P1, min => ?P1, size => 1}, info(Restored)),
    ?assertEqual(Serialized, to_map(Restored)),
    ?assertEqual(Empty, flush(Single)).
% Three items with distinct priorities come out highest first. ?P3 is the atom
% `infinity`, which sorts above any number in Erlang term order.
multiple_elements_basics_test() ->
    Items = [{?K1, ?P1}, {?K2, ?P2}, {?K3, ?P3}],
    Enqueue = fun({Key, Priority}, Acc) -> in(Key, Priority, 10, Acc) end,
    Full = lists:foldl(Enqueue, new("foo"), Items),
    ?assertEqual(#{max => ?P3, min => ?P1, size => 3}, info(Full)),
    ?assertEqual([?K3, ?K2, ?K1], drain(Full)).
% Re-adding a key with the priority it already has returns the very same queue.
update_element_same_priority_test() ->
    Queued = in(?K1, ?P1, 10, new("foo")),
    Requeued = in(?K1, ?P1, 10, Queued),
    ?assertEqual(Queued, Requeued).
% Re-adding a key with a different priority moves it instead of duplicating
% it: the size stays at two and ?K1 now outranks ?K2.
update_element_new_priority_test() ->
    Initial = in(?K2, ?P2, 10, in(?K1, ?P1, 10, new("foo"))),
    Bumped = in(?K1, ?P3, 10, Initial),
    ?assertEqual(#{max => ?P3, min => ?P2, size => 2}, info(Bumped)),
    ?assertEqual([?K1, ?K2], drain(Bumped)).
% Inserting beyond capacity evicts the lowest-priority items.
capacity_test() ->
    OnlyK1 = in(?K1, ?P1, 2, new("foo")),
    % Capacity = 1, one element only remains
    CappedAtOne = in(?K2, ?P2, 1, OnlyK1),
    ?assertEqual([?K2], drain(CappedAtOne)),
    % Capacity = 2, only top two elements remain
    WithK2 = in(?K2, ?P2, 2, OnlyK1),
    WithK3 = in(?K3, ?P3, 2, WithK2),
    ?assertEqual([?K3, ?K2], drain(WithK3)).
% Load N items with random priorities in 1..100 and check that draining the
% queue yields them in non-increasing priority order. Each key is {I, P}, so
% the priority can be read back from the drained keys.
a_lot_of_elements_test() ->
    N = 100000,
    KVs = [{{I, P}, P} || I <- lists:seq(1, N), P <- [rand:uniform(100)]],
    Q = from_map("foo", N, maps:from_list(KVs)),
    ?assertMatch(N, maps:get(size, info(Q))),
    Priorities = [P || {_I, P} <- drain(Q)],
    Descending = lists:reverse(lists:sort(Priorities)),
    ?assertEqual(Descending, Priorities).
% Test helper: pop everything from Q and return the keys in the order they
% came out (highest priority first).
drain(Q) ->
    LastPoppedFirst = drain(out(Q), []),
    lists:reverse(LastPoppedFirst).
% Accumulator loop for drain/1. The first argument is the result of out/1;
% keys are prepended, so the accumulator ends up in reverse pop order.
drain({Key, Rest}, Popped) ->
    drain(out(Rest), [Key | Popped]);
drain(false, Popped) ->
    Popped.
-endif.