%% blob: 2982473f1a5252e5de6e29986aa3dbe6e18dd91f [file] [log] [blame]
%% @doc Implementation of HyperLogLog with bias correction, as
%% described in the Google paper "HyperLogLog in Practice":
%% http://static.googleusercontent.com/external_content/untrusted_dlcp/research.google.com/en//pubs/archive/40671.pdf
-module(hyper).
%%-compile(native).
%% EUnit is included unconditionally: the test generator and the `*_t'
%% test bodies further down live in this same module.
-include_lib("eunit/include/eunit.hrl").
%% Core filter API: construction, insertion, cardinality and set operations.
-export([new/1, new/2, insert/2, card/1, union/1, union/2, intersect_card/2]).
%% Serialization to/from a JSON-friendly term, plus register compaction.
-export([to_json/1, from_json/1, from_json/2, compact/1]).
%% Diagnostics: a printed performance table and the size reported by the backend.
-export([perf_report/0, bytes/1]).
%% Number of leading hash bits used as the register index; new/2 only
%% accepts values in this range.
-type precision() :: 4..16.
%% Backend-specific register storage; this module never inspects it and
%% only passes it back to the backend module that created it.
-type registers() :: any().
%% A filter: its precision plus the backend module paired with that
%% backend's register storage.
-record(hyper, {p :: precision(),
registers :: {module(), registers()}}).
%% Values accepted by insert/2 (guarded there with is_binary/1).
-type value() :: binary().
-type filter() :: #hyper{}.
%% NOTE(review): value/0 is used in the insert/2 spec but is not listed
%% here -- confirm whether it should be exported as well.
-export_type([filter/0, precision/0, registers/0]).
%%
%% API
%%
%% @doc Create an empty filter of the given precision using the default
%% register backend, `hyper_binary'.
-spec new(precision()) -> filter().
new(Precision) ->
    DefaultBackend = hyper_binary,
    new(Precision, DefaultBackend).
%% @doc Create an empty filter of the given precision whose registers are
%% managed by the backend module `Backend'.
%%
%% Fails with `function_clause' unless the precision is between 4 and 16
%% (inclusive) and the backend is an atom. The initial register storage
%% comes from `Backend:new/1'.
-spec new(precision(), module()) -> filter().
new(Precision, Backend)
  when is_atom(Backend), Precision >= 4, Precision =< 16 ->
    EmptyRegisters = Backend:new(Precision),
    #hyper{p = Precision, registers = {Backend, EmptyRegisters}}.
%% @doc Add `Value' to the filter and return the updated filter.
%%
%% The value is hashed with SHA-1. The first `P' bits of the digest select
%% the register; the rank is one more than the number of leading zero bits
%% in the following `P' bits. The register is written only when it is
%% unset or currently holds a smaller rank. Any call that does not match
%% the first clause (e.g. a non-binary value) raises `badarg'.
%%
%% NOTE(review): only `P' bits are inspected for the rank, so it can never
%% exceed `P + 1' -- confirm this window is intentional.
-spec insert(value(), filter()) -> filter().
insert(Value, #hyper{p = P, registers = {Mod, Registers}} = Hyper)
  when is_binary(Value) ->
    <<Index:P, Window:P/bitstring, _/bitstring>> = crypto:hash(sha, Value),
    Rank = run_of_zeroes(Window) + 1,
    ShouldStore = case Mod:get(Index, Registers) of
                      undefined -> true;
                      {ok, Current} -> Current < Rank
                  end,
    case ShouldStore of
        true ->
            Hyper#hyper{registers = {Mod, Mod:set(Index, Rank, Registers)}};
        false ->
            Hyper
    end;
insert(_Value, _Hyper) ->
    error(badarg).
%% @doc Merge a list of filters into one.
%%
%% Every filter must share the same precision and the same backend module;
%% otherwise (or for an empty list) the call fails with `case_clause'.
%% The merge itself is delegated to the backend's `max_merge/1', and the
%% result reuses the first filter as the template record.
-spec union([filter()]) -> filter().
union(Filters) when is_list(Filters) ->
    Signature = fun (#hyper{p = Precision, registers = {Backend, _}}) ->
                        {Precision, Backend}
                end,
    case lists:usort(lists:map(Signature, Filters)) of
        [{_P, Mod}] ->
            Payloads = lists:map(fun (#hyper{registers = {_, R}}) -> R end,
                                 Filters),
            Template = hd(Filters),
            Template#hyper{registers = {Mod, Mod:max_merge(Payloads)}}
    end.
%% @doc Merge two filters into one.
%%
%% Both filters must use the same backend module and the same precision;
%% anything else fails with `function_clause'. The registers are combined
%% by the backend's `max_merge/2' (first filter's registers as the first
%% argument), and the result is built from the second filter's record.
-spec union(filter(), filter()) -> filter().
union(#hyper{p = P, registers = {Mod, SmallRegisters}},
      #hyper{p = P, registers = {Mod, BigRegisters}} = Big) ->
    Merged = Mod:max_merge(SmallRegisters, BigRegisters),
    Big#hyper{registers = {Mod, Merged}}.
%% @doc Estimate the cardinality of the intersection of two filters by
%% inclusion-exclusion: `card(Left) + card(Right) - card(union)', clamped
%% so the result is never negative. Both filters must have the same
%% precision.
%%
%% NOTE: use with caution, no guarantees on accuracy.
-spec intersect_card(filter(), filter()) -> float().
intersect_card(#hyper{p = P} = Left, #hyper{p = P} = Right) ->
    LeftCard = card(Left),
    RightCard = card(Right),
    UnionCard = card(union(Left, Right)),
    max(0.0, (LeftCard + RightCard) - UnionCard).
%% @doc Estimate the number of distinct values inserted into the filter.
%%
%% Steps, in order:
%%   1. compute the raw estimate `alpha(M) * M^2 / RegisterSum', M = 2^P;
%%   2. subtract the interpolated bias when the raw estimate is at most 5M;
%%   3. if the backend reports any zero-valued registers, compute the
%%      linear-counting estimate `M * ln(M / Zeroes)' instead;
%%   4. return the value from step 3 when it does not exceed
%%      `hyper_const:threshold(P)', otherwise the value from step 2.
-spec card(filter()) -> float().
card(#hyper{p = P, registers = {Mod, Registers0}}) ->
    M = trunc(pow(2, P)),
    Registers = Mod:compact(Registers0),
    RawEstimate = alpha(M) * pow(M, 2) / Mod:register_sum(Registers),
    Corrected = if
                    RawEstimate =< 5 * M ->
                        RawEstimate - estimate_bias(RawEstimate, P);
                    true ->
                        RawEstimate
                end,
    Candidate = case Mod:zero_count(Registers) of
                    0 -> Corrected;
                    Zeroes -> M * math:log(M / Zeroes)
                end,
    Threshold = hyper_const:threshold(P),
    if
        Candidate =< Threshold -> Candidate;
        true -> Corrected
    end.
%% @doc Return the size of the filter's register storage as reported by
%% the backend's `bytes/1'.
bytes(#hyper{registers = {Backend, Storage}}) ->
    Backend:bytes(Storage).
%% @doc Return the filter with its registers passed through the backend's
%% `compact/1'. Precision and backend module are left unchanged.
-spec compact(filter()) -> filter().
compact(#hyper{registers = {Mod, Registers}} = Hyper) ->
    Compacted = Mod:compact(Registers),
    Hyper#hyper{registers = {Mod, Compacted}}.
%%
%% SERIALIZATION
%%
%% @doc Serialize a filter into a `{Proplist}' term with two binary keys:
%% `<<"p">>' (the precision) and `<<"registers">>' (the backend-encoded
%% registers, gzipped and then base64-encoded). The registers are
%% compacted by the backend before encoding.
-spec to_json(filter()) -> any().
to_json(#hyper{p = P, registers = {Mod, Registers}}) ->
    Encoded = Mod:encode_registers(Mod:compact(Registers)),
    Payload = base64:encode(zlib:gzip(Encoded)),
    {[{<<"p">>, P},
      {<<"registers">>, Payload}]}.
%% @doc Rebuild a filter from the term produced by to_json/1, using the
%% `hyper_gb' backend. Note that this differs from new/1, which defaults
%% to `hyper_binary'.
-spec from_json(any()) -> filter().
from_json(Struct) ->
    DefaultBackend = hyper_gb,
    from_json(Struct, DefaultBackend).
%% @doc Rebuild a filter from the `{Proplist}' term produced by to_json/1,
%% storing the registers with backend module `Mod'.
%%
%% The `<<"registers">>' entry is base64-decoded, gunzipped and handed to
%% `Mod:decode_registers/2' together with the precision read from `<<"p">>'.
-spec from_json(any(), module()) -> filter().
from_json({Fields}, Mod) ->
    Precision = proplists:get_value(<<"p">>, Fields),
    Payload = proplists:get_value(<<"registers">>, Fields),
    Raw = zlib:gunzip(base64:decode(Payload)),
    #hyper{p = Precision,
           registers = {Mod, Mod:decode_registers(Raw, Precision)}}.
%%
%% HELPERS
%%
%% Scaling constant used by card/1 for a filter with `M' registers.
%% Fixed values are used for 16, 32 and 64 registers; every other size
%% uses the closed-form approximation.
alpha(M) ->
    case M of
        16 -> 0.673;
        32 -> 0.697;
        64 -> 0.709;
        _ -> 0.7213 / (1 + 1.079 / M)
    end.
%% Raise `Base' to `Exponent'; thin wrapper around math:pow/2, so the
%% result is always a float.
pow(Base, Exponent) ->
    math:pow(Base, Exponent).
%% Count the consecutive zero bits at the start of bitstring `B'.
%% Returns 0 for an empty bitstring or when the first bit is set, and the
%% full bit length when every bit is zero.
run_of_zeroes(B) ->
    leading_zero_bits(B, 0).

%% Consume one zero bit per step so the scan is linear in the length of
%% the run, instead of re-matching the whole prefix from bit 0 each time.
leading_zero_bits(<<0:1, Rest/bitstring>>, Count) ->
    leading_zero_bits(Rest, Count + 1);
leading_zero_bits(_, Count) ->
    Count.
%% Estimate the bias of raw estimate `E' at precision `P': find the
%% entries of `hyper_const:estimate_data(P)' closest to `E' and return the
%% mean of the entries at the same positions in `hyper_const:bias_data(P)'.
estimate_bias(E, P) ->
    Biases = list_to_tuple(hyper_const:bias_data(P)),
    Estimates = list_to_tuple(hyper_const:estimate_data(P)),
    Neighbours = nearest_neighbours(E, Estimates),
    Total = lists:foldl(fun (Pos, Sum) -> Sum + element(Pos, Biases) end,
                        0, Neighbours),
    Total / length(Neighbours).
%% Return the 1-based positions of the (at most) six elements of the tuple
%% `Vector' that are closest to `E', nearest first. Closeness is the
%% squared difference; ties keep their original left-to-right order
%% because lists:keysort/2 is stable.
nearest_neighbours(E, Vector) ->
    %% tuple_size/1 rather than the generic size/1: Vector is always a tuple.
    Keyed = [{math:pow(E - element(Pos, Vector), 2), Pos}
             || Pos <- lists:seq(1, tuple_size(Vector))],
    Closest = lists:sublist(lists:keysort(1, Keyed), 6),
    [Pos || {_SquaredDistance, Pos} <- Closest].
%%
%% TESTS
%%
%% EUnit generator: runs every `*_t' test body in this module inside a
%% `foreach' fixture whose setup and cleanup do nothing. The error-range
%% test gets a 10 second timeout.
hyper_test_() ->
    NoSetup = fun () -> ok end,
    NoCleanup = fun (_) -> ok end,
    Cases = [?_test(basic_t()),
             ?_test(serialization_t()),
             ?_test(backend_t()),
             ?_test(encoding_t()),
             ?_test(register_sum_t()),
             {timeout, 10, ?_test(error_range_t())},
             ?_test(many_union_t()),
             ?_test(union_t()),
             ?_test(small_big_union_t()),
             ?_test(intersect_card_t())],
    {foreach, NoSetup, NoCleanup, Cases}.
%% A single inserted value must yield an estimate that truncates to 1,
%% for every backend.
basic_t() ->
    Backends = [hyper_bisect, hyper_binary, hyper_gb, hyper_array],
    AssertSingleton =
        fun (Backend) ->
                ?assertEqual(1, trunc(card(insert(<<"1">>, new(4, Backend)))))
        end,
    lists:foreach(AssertSingleton, Backends).
%% Round-tripping a `hyper_binary' filter through to_json/1 and
%% from_json/2 must preserve the estimate, the precision and the
%% compacted registers.
serialization_t() ->
    Mod = hyper_binary,
    Hyper = compact(insert_many(generate_unique(10), new(5, Mod))),
    {hyper_binary, Before} = Hyper#hyper.registers,
    Restored = from_json(to_json(Hyper), Mod),
    {hyper_binary, After} = (compact(Restored))#hyper.registers,
    ?assertEqual(trunc(card(Hyper)), trunc(card(Restored))),
    ?assertEqual(Hyper#hyper.p, Restored#hyper.p),
    ?assertEqual(Before, After).
%% All four backends, fed the same values, must agree on the estimate, on
%% the encoded registers and on the JSON form, and a filter serialized
%% from one backend must load correctly into another.
backend_t() ->
    Values = generate_unique(15),
    Build = fun (Backend) ->
                    compact(insert_many(Values, new(7, Backend)))
            end,
    Gb = Build(hyper_gb),
    Array = Build(hyper_array),
    Bisect = Build(hyper_bisect),
    Binary = Build(hyper_binary),

    %% Same estimate everywhere.
    GbCard = card(Gb),
    ?assertEqual(GbCard, card(Array)),
    ?assertEqual(GbCard, card(Bisect)),
    ?assertEqual(GbCard, card(Binary)),

    %% Same encoded registers everywhere.
    {hyper_gb, GbRegisters} = Gb#hyper.registers,
    {hyper_array, ArrayRegisters} = Array#hyper.registers,
    {hyper_bisect, BisectRegisters} = Bisect#hyper.registers,
    {hyper_binary, BinaryRegisters} = Binary#hyper.registers,
    GbEncoded = hyper_gb:encode_registers(GbRegisters),
    ?assertEqual(GbEncoded, hyper_array:encode_registers(ArrayRegisters)),
    ?assertEqual(GbEncoded, hyper_bisect:encode_registers(BisectRegisters)),
    ?assertEqual(GbEncoded, hyper_binary:encode_registers(BinaryRegisters)),

    %% Cross-backend deserialization.
    ArrayJson = to_json(Array),
    ArrayAsGb = from_json(ArrayJson, hyper_gb),
    {_, {GbSerialized, _}} = ArrayAsGb#hyper.registers,
    ?assertEqual(gb_trees:to_list(element(1, GbRegisters)),
                 gb_trees:to_list(GbSerialized)),
    ?assertEqual(GbCard, card(ArrayAsGb)),
    ?assertEqual(Array, from_json(ArrayJson, hyper_array)),
    ?assertEqual(Array, from_json(to_json(Bisect), hyper_array)),
    ?assertEqual(Bisect, from_json(ArrayJson, hyper_bisect)),

    %% Same JSON everywhere.
    GbJson = to_json(Gb),
    ?assertEqual(GbJson, ArrayJson),
    ?assertEqual(GbJson, to_json(Bisect)),
    ?assertEqual(GbJson, to_json(Binary)).
%% A filter built with the default backend keeps its (truncated) estimate
%% after a to_json/1 -> from_json/1 round trip.
encoding_t() ->
    Original = insert_many(generate_unique(10), new(4)),
    Decoded = from_json(to_json(Original)),
    ?assertEqual(trunc(card(Original)), trunc(card(Decoded))).
%% With three of sixteen registers set to 3, every backend's
%% register_sum/1 must equal 13 * 2^0 + 3 * 2^-3.
register_sum_t() ->
    Mods = [hyper_array, hyper_gb, hyper_bisect, hyper_binary],
    P = 4,
    M = trunc(math:pow(2, P)),
    SetRegisters = [1, 5, 10],
    RegisterValue = 3,
    Touched = length(SetRegisters),
    AllEmpty = math:pow(2, -0) * M,
    EmptyShare = math:pow(2, -0) * Touched,
    FilledShare = math:pow(2, -RegisterValue) * Touched,
    ExpectedSum = AllEmpty - EmptyShare + FilledShare,
    Check = fun (Mod) ->
                    Store = fun (I, Acc) -> Mod:set(I, RegisterValue, Acc) end,
                    Registers = lists:foldl(Store, Mod:new(P), SetRegisters),
                    ?assertEqual({Mod, ExpectedSum},
                                 {Mod, Mod:register_sum(Registers)})
            end,
    lists:map(Check, Mods).
%% At precision 14, every backend must estimate cardinalities from 1000 to
%% 46000 (step 5000) within 2% of the true value. The RNG is seeded so the
%% generated values are reproducible.
error_range_t() ->
    Mods = [hyper_gb, hyper_array, hyper_bisect, hyper_binary],
    Build = fun (Cardinality, Precision, Backend) ->
                    Empty = new(Precision, Backend),
                    lists:foldl(fun (Value, Filter) ->
                                        insert(Value, Filter)
                                end, Empty, generate_unique(Cardinality))
            end,
    ExpectedError = 0.02,
    P = 14,
    random:seed(1, 2, 3),
    Cards = lists:seq(1000, 50000, 5000),
    [begin
         Estimate = trunc(card(Build(Card, P, Mod))),
         ?assert(abs(Estimate - Card) < Card * ExpectedError)
     end || Card <- Cards, Mod <- Mods].
%% The list form of union must estimate the size of the true union of
%% three 1000-element sets to within 10% of the combined input size.
many_union_t() ->
    random:seed(1, 2, 3),
    Card = 1000,
    NumSets = 3,
    Sets = [sets:from_list(generate_unique(Card))
            || _ <- lists:seq(1, NumSets)],
    Filters = [insert_many(sets:to_list(Set), new(10, hyper_bisect))
               || Set <- Sets],
    TrueSize = sets:size(sets:union(Sets)),
    Tolerance = (Card * NumSets) * 0.1,
    ?assert(abs(TrueSize - card(union(Filters))) < Tolerance).
%% Two overlapping sets (10000 elements, and 5000 fresh elements plus 5000
%% shared ones): both the union estimate and the inclusion-exclusion
%% intersection estimate must be within 200 of the true sizes.
union_t() ->
    random:seed(1, 2, 3),
    LeftDistinct = sets:from_list(generate_unique(10000)),
    Fresh = generate_unique(5000),
    Shared = lists:sublist(sets:to_list(LeftDistinct), 5000),
    RightDistinct = sets:from_list(Fresh ++ Shared),

    LeftHyper = insert_many(sets:to_list(LeftDistinct), new(13)),
    RightHyper = insert_many(sets:to_list(RightDistinct), new(13)),
    UnionHyper = union(LeftHyper, RightHyper),
    Intersection = card(LeftHyper) + card(RightHyper) - card(UnionHyper),

    TrueUnion = sets:size(sets:union(LeftDistinct, RightDistinct)),
    ?assert(abs(card(UnionHyper) - TrueUnion) < 200),
    TrueIntersection =
        sets:size(sets:intersection(LeftDistinct, RightDistinct)),
    ?assert(abs(Intersection - TrueIntersection) < 200).
%% Union of a sparse and a dense `hyper_bisect' filter: after checking
%% that each filter is in the expected representation, the union estimate
%% must be within 1% of the true union size.
small_big_union_t() ->
    random:seed(1, 2, 3),
    SmallCard = 100,
    BigCard = 15000, % switches to dense at 10922 items
    SmallSet = sets:from_list(generate_unique(SmallCard)),
    BigSet = sets:from_list(generate_unique(BigCard)),
    Build = fun (Set) ->
                    insert_many(sets:to_list(Set), new(15, hyper_bisect))
            end,
    SmallHyper = Build(SmallSet),
    BigHyper = Build(BigSet),
    ?assertMatch({hyper_bisect, {sparse, _, _, _}}, SmallHyper#hyper.registers),
    ?assertMatch({hyper_bisect, {dense, _}}, BigHyper#hyper.registers),
    Estimate = card(union(SmallHyper, BigHyper)),
    TrueUnion = sets:size(sets:union(SmallSet, BigSet)),
    ?assert(abs(Estimate - TrueUnion) < TrueUnion * 0.01).
%% intersect_card/2 on two sets sharing 5000 elements: the result must not
%% exceed the union estimate and must be within 5% of 5000.
intersect_card_t() ->
    random:seed(1, 2, 3),
    LeftDistinct = sets:from_list(generate_unique(10000)),
    Fresh = generate_unique(5000),
    Shared = lists:sublist(sets:to_list(LeftDistinct), 5000),
    RightDistinct = sets:from_list(Fresh ++ Shared),

    LeftHyper = insert_many(sets:to_list(LeftDistinct), new(13)),
    RightHyper = insert_many(sets:to_list(RightDistinct), new(13)),
    IntersectCard = intersect_card(LeftHyper, RightHyper),
    ?assert(IntersectCard =< hyper:card(hyper:union(LeftHyper, RightHyper))),

    %% NOTE: there is no firm error bound for the intersection, so this
    %% only checks that the estimate is plausible.
    Error = 0.05,
    ?assert((abs(5000 - IntersectCard) / 5000) =< Error).
%% report_wrapper_test_() ->
%% [{timeout, 600000000, ?_test(estimate_report())}].
%% Run the accuracy benchmark for precisions 10..16 over a range of
%% cardinalities (50 repetitions each), log the collected statistics and
%% write them as CSV to "../data.csv".
%%
%% For a quick smoke run, shrink the inputs, e.g. Ps = [4, 5],
%% Cardinalities = [100], Repetitions = 100.
estimate_report() ->
    %% os:timestamp/0 returns the same {MegaSecs, Secs, MicroSecs} triple
    %% as the deprecated erlang:now/0, which is all random:seed/1 needs.
    random:seed(os:timestamp()),
    Ps = lists:seq(10, 16, 1),
    Cardinalities = [100, 1000, 10000, 100000, 1000000],
    Repetitions = 50,
    Stats = [run_report(P, Card, Repetitions) || P <- Ps,
                                                 Card <- Cardinalities],
    error_logger:info_msg("~p~n", [Stats]),
    Header = "p,card,mean,p99,p1,bytes\n",
    Rows = lists:map(fun ({P, Card, Mean, P99, P1, Bytes}) ->
                             io_lib:format("~p,~p,~.2f,~.2f,~.2f,~p~n",
                                           [P, Card, Mean, P99, P1, Bytes])
                     end, Stats),
    Csv = [Header | Rows],
    %% The CSV is data, not a format string: log it through "~s" and write
    %% the iolist as-is rather than feeding it back into a formatter.
    error_logger:info_msg("~s", [Csv]),
    ok = file:write_file("../data.csv", Csv).
%% Measure the absolute estimation error at precision `P' for `Card'
%% unique values, `Repetitions' times, and summarize the errors with a
%% basho_stats histogram.
%%
%% Returns `{P, Card, Mean, P99, P1, 2^P}'. Note that the last element is
%% the register count 2^P, although estimate_report/0 prints it under the
%% "bytes" column.
run_report(P, Card, Repetitions) ->
    Estimations =
        [abs(Card - card(insert_many(generate_unique(Card), new(P))))
         || _ <- lists:seq(1, Repetitions)],
    error_logger:info_msg("p=~p, card=~p, reps=~p~nestimates=~p~n",
                          [P, Card, Repetitions, Estimations]),
    EmptyHist = basho_stats_histogram:new(0,
                                          lists:max(Estimations),
                                          length(Estimations)),
    Hist = basho_stats_histogram:update_all(Estimations, EmptyHist),
    {_, Mean, _, _, _} = basho_stats_histogram:summary_stats(Hist),
    P99 = basho_stats_histogram:quantile(0.99, Hist),
    P1 = basho_stats_histogram:quantile(0.01, Hist),
    {P, Card, Mean, P99, P1, trunc(pow(2, P))}.
%% Return a sorted list of exactly `N' distinct random 8-byte binaries.
generate_unique(N) ->
    generate_unique(lists:usort(random_bytes(N)), N).

%% `Sorted' holds the distinct values collected so far. While it is short
%% of `N' (duplicates were dropped by usort), draw the missing amount and
%% deduplicate again.
generate_unique(Sorted, N) ->
    Have = length(Sorted),
    if
        Have =:= N ->
            Sorted;
        true ->
            Extra = random_bytes(N - Have),
            generate_unique(lists:usort(Extra ++ Sorted), N)
    end.
%% Return a list of `N' random 8-byte binaries (not necessarily distinct).
random_bytes(N) ->
    random_bytes([], N).

%% Accumulator loop: each step draws an integer in 1..10^14, encodes it as
%% a 64-bit big-endian binary and prepends it to `Acc'.
random_bytes(Acc, 0) ->
    Acc;
random_bytes(Acc, Remaining) ->
    Sample = random:uniform(100000000000000),
    Encoded = <<Sample:64/integer>>,
    random_bytes([Encoded | Acc], Remaining - 1).
%% Insert every value of list `L' into the filter, left to right, and
%% return the resulting filter.
insert_many(L, Hyper) ->
    AddOne = fun (Value, Filter) -> insert(Value, Filter) end,
    lists:foldl(AddOne, Hyper, L).
%%
%% REPORTS
%%
%% @doc Print a timing table to standard output: one row per backend
%% module, precision and cardinality, showing register fill ratio, size in
%% bytes, per-element insert time and the time of one union, one card and
%% one to_json call.
%%
%% The RNG is seeded with a fixed seed, and every measurement builds its
%% own input with generate_unique/1, so the order of the calls below
%% determines which random values each measurement sees.
perf_report() ->
Ps = [15],
Cards = [1, 100, 1000, 5000, 10000, 15000, 25000, 50000, 100000, 1000000],
Mods = [hyper_gb, hyper_array, hyper_bisect, hyper_binary],
Repeats = 1,
random:seed(1, 2, 3),
%% Time(F, Args): run apply(F, Args) under timer:tc/2 in a freshly spawned,
%% linked process and return the mean elapsed microseconds over `Repeats'
%% runs. NOTE(review): the separate process presumably keeps each run's
%% heap isolated from the caller -- confirm.
Time = fun (F, Args) ->
Run = fun () ->
Parent = self(),
Pid = spawn_link(
fun () ->
{ElapsedUs, _} = timer:tc(F, Args),
Parent ! {self(), ElapsedUs}
end),
receive {Pid, ElapsedUs} -> ElapsedUs end
end,
lists:sum([Run() || _ <- lists:seq(1, Repeats)]) / Repeats
end,
%% Collect one result tuple per {Mod, P, Card} combination.
R = [begin
InsertUs = Time(fun (Values, H) ->
insert_many(Values, H)
end,
[generate_unique(Card), new(P, Mod)]),
%% Union of a filter holding Card div 10 values with one holding Card values.
UnionUs = Time(fun union/2,
[insert_many(generate_unique(Card div 10), new(P, Mod)),
insert_many(generate_unique(Card), new(P, Mod))]),
CardUs = Time(fun card/1,
[insert_many(generate_unique(Card), new(P, Mod))]),
ToJsonUs = Time(fun to_json/1,
[insert_many(generate_unique(Card), new(P, Mod))]),
Filter = insert_many(generate_unique(Card), new(P, Mod)),
{Mod, Registers} = Filter#hyper.registers,
%% Fill = number of registers holding a value greater than zero.
Fill = Mod:fold(fun (_, V, Acc) when V > 0 -> Acc+1;
(_, _, Acc) -> Acc
end, 0, Registers),
Bytes = bytes(Filter),
%% Insert time is reported per element; the other timings are per call.
{Mod, P, Card, Fill, Bytes,
InsertUs / Card, UnionUs, CardUs, ToJsonUs}
end || Mod <- Mods,
P <- Ps,
Card <- Cards],
%% Header row; the column widths match the data rows printed below.
io:format("~s ~s ~s ~s ~s ~s ~s ~s ~s~n",
[string:left("module" , 12, $ ),
string:left("P" , 4, $ ),
string:right("card" , 8, $ ),
string:right("fill" , 6, $ ),
string:right("bytes" , 10, $ ),
string:right("insert us" , 10, $ ),
string:right("union ms" , 10, $ ),
string:right("card ms" , 10, $ ),
string:right("json ms" , 10, $ )
]),
lists:foreach(fun ({Mod, P, Card, Fill, Bytes,
AvgInsertUs, AvgUnionUs, AvgCardUs, AvgToJsonUs}) ->
M = trunc(math:pow(2, P)),
%% Fill ratio: non-zero registers divided by the register count 2^P.
Filled = lists:flatten(io_lib:format("~.2f", [Fill / M])),
AvgInsertUsL = lists:flatten(
io_lib:format("~.2f", [AvgInsertUs])),
%% Dividing by 1000 converts microseconds to the milliseconds shown
%% in the "ms" columns.
UnionMs = lists:flatten(
io_lib:format("~.2f", [AvgUnionUs / 1000])),
CardMs = lists:flatten(
io_lib:format("~.2f", [AvgCardUs / 1000])),
ToJsonMs = lists:flatten(
io_lib:format("~.2f", [AvgToJsonUs / 1000])),
io:format("~s ~s ~s ~s ~s ~s ~s ~s ~s~n",
[
string:left(atom_to_list(Mod) , 12, $ ),
string:left(integer_to_list(P) , 4, $ ),
string:right(integer_to_list(Card) , 8, $ ),
string:right(Filled , 6, $ ),
string:right(integer_to_list(Bytes), 10, $ ),
string:right(AvgInsertUsL , 10, $ ),
string:right(UnionMs , 10, $ ),
string:right(CardMs , 10, $ ),
string:right(ToJsonMs , 10, $ )
])
end, R).