blob: a6d5181a12512ce65ce604422eb89070be684c16 [file] [log] [blame]
%% @doc: Implementation of HyperLogLog with bias correction as
%% described in the Google paper,
%% http://static.googleusercontent.com/external_content/untrusted_dlcp/
%% research.google.com/en//pubs/archive/40671.pdf
-module(hyper).
-include_lib("eunit/include/eunit.hrl").
-export([new/1, insert/2, card/1, union/1, union/2, intersect_card/2]).
-export([to_json/1, from_json/1]).
-compile(native).
-record(hyper, {p, registers}).
-type precision() :: 4..16.
-type value() :: binary().
-type filter() :: #hyper{}.
-export_type([filter/0]).
%%
%% API
%%
-spec new(precision()) -> filter().
new(P) when 4 =< P andalso P =< 16 ->
M = trunc(pow(2, P)),
Registers = array:new([{size, M}, {fixed, true}, {default, 0}]),
#hyper{p = P, registers = Registers}.
-spec insert(value(), filter()) -> filter().
insert(Value, #hyper{registers = Registers, p = P} = Hyper)
when is_binary(Value) ->
Hash = crypto:hash(sha, Value),
<<Index:P, RegisterValue:P/bitstring, _/bitstring>> = Hash,
ZeroCount = run_of_zeroes(RegisterValue) + 1,
case array:get(Index, Hyper#hyper.registers) < ZeroCount of
true ->
Hyper#hyper{registers = array:set(Index, ZeroCount, Registers)};
false ->
Hyper
end;
insert(_Value, _Hyper) ->
error(badarg).
-spec union([filter()]) -> filter().
union(Filters) when is_list(Filters) ->
%% Must have the same P
case lists:usort(lists:map(fun (H) -> H#hyper.p end, Filters)) of
[P] ->
lists:foldl(fun union/2, hyper:new(P), Filters)
end.
union(#hyper{registers = LeftRegisters} = Left,
#hyper{registers = RightRegisters} = Right) when
Left#hyper.p =:= Right#hyper.p ->
NewRegisters = array:sparse_foldl(
fun (Index, LeftValue, Registers) ->
Max = max(LeftValue, array:get(Index, Registers)),
array:set(Index, Max, Registers)
end, RightRegisters, LeftRegisters),
Left#hyper{registers = NewRegisters}.
%% NOTE: use with caution, no guarantees on accuracy.
-spec intersect_card(filter(), filter()) -> float().
intersect_card(Left, Right) when Left#hyper.p =:= Right#hyper.p ->
max(0.0, (card(Left) + card(Right)) - card(union(Left, Right))).
-spec card(filter()) -> float().
card(#hyper{registers = Registers, p = P}) ->
M = trunc(pow(2, P)),
RegisterSum = lists:sum(lists:map(fun (Register) ->
pow(2, -Register)
end, array:to_list(Registers))),
E = alpha(M) * pow(M, 2) / RegisterSum,
Ep = case E =< 5 * M of
true -> E - estimate_bias(E, P);
false -> E
end,
V = length(lists:filter(fun (Register) -> Register =:= 0 end,
array:to_list(Registers))),
H = case V of
0 ->
Ep;
_ ->
M * math:log(M / V)
end,
case H =< hyper_const:threshold(P) of
true ->
H;
false ->
Ep
end.
%%
%% SERIALIZATION
%%
-spec to_json(filter()) -> any().
to_json(Hyper) ->
{[
{<<"p">>, Hyper#hyper.p},
{<<"registers">>, encode_registers(Hyper#hyper.registers)}
]}.
-spec from_json(any()) -> filter().
from_json({Struct}) ->
P = proplists:get_value(<<"p">>, Struct),
M = trunc(math:pow(2, P)),
EmptyArray = array:new([{size, M}, {fixed, true}, {default, 0}]),
{_, Registers} = lists:foldl(fun (0, {I, A}) ->
{I+1, A};
(V, {I, A}) ->
{I+1, array:set(I, V, A)}
end,
{0, EmptyArray},
decode_registers(
proplists:get_value(<<"registers">>, Struct))),
#hyper{p = P, registers = Registers}.
encode_registers(Registers) ->
ByteEncoded = array:foldl(fun (_I, V, A) ->
[<<V:8/integer>> | A]
end, [], array:resize(Registers)),
base64:encode(
zlib:gzip(
iolist_to_binary(
lists:reverse(ByteEncoded)))).
decode_registers(B) ->
decode_registers(zlib:gunzip(base64:decode(B)), []).
decode_registers(<<>>, Acc) ->
lists:reverse(Acc);
decode_registers(<<I:8/integer, Rest/binary>>, Acc) ->
decode_registers(Rest, [I | Acc]).
%%
%% HELPERS
%%
alpha(16) -> 0.673;
alpha(32) -> 0.697;
alpha(64) -> 0.709;
alpha(M) -> 0.7213 / (1 + 1.079 / M).
pow(X, Y) ->
math:pow(X, Y).
run_of_zeroes(B) ->
run_of_zeroes(1, B).
run_of_zeroes(I, B) ->
case B of
<<0:I, _/bitstring>> ->
run_of_zeroes(I + 1, B);
_ ->
I - 1
end.
estimate_bias(E, P) ->
BiasVector = list_to_tuple(hyper_const:bias_data(P)),
NearestNeighbours = nearest_neighbours(E, list_to_tuple(hyper_const:estimate_data(P))),
lists:sum([element(Index, BiasVector) || Index <- NearestNeighbours])
/ length(NearestNeighbours).
nearest_neighbours(E, Vector) ->
Distances = lists:map(fun (Index) ->
V = element(Index, Vector),
{pow((E - V), 2), Index}
end, lists:seq(1, size(Vector))),
SortedDistances = lists:keysort(1, Distances),
{_, Indexes} = lists:unzip(lists:sublist(SortedDistances, 6)),
Indexes.
%%
%% TESTS
%%
basic_test() ->
?assertEqual(1, trunc(card(insert(<<"1">>, new(4))))).
serialization_test() ->
Hyper = insert_many(generate_unique(10), new(14)),
?assertEqual(trunc(card(Hyper)), trunc(card(from_json(to_json(Hyper))))),
?assertEqual(Hyper, from_json(to_json(Hyper))).
encoding_test() ->
Hyper = insert_many(generate_unique(100000), new(14)),
?assertEqual(trunc(card(Hyper)), trunc(card(from_json(to_json(Hyper))))).
error_range_test() ->
Run = fun (Cardinality, P) ->
lists:foldl(fun (V, H) ->
insert(V, H)
end, new(P), generate_unique(Cardinality))
end,
Report = lists:map(
fun (Card) ->
CardString = string:left(integer_to_list(Card), 10, $ ),
Estimate = trunc(card(Run(Card, 14))),
io_lib:format("~s ~p~n", [CardString, Estimate])
end, lists:seq(0, 50000, 1000)),
error_logger:info_msg("~s~n", [Report]).
many_union_test() ->
random:seed(1, 2, 3),
Card = 100,
NumSets = 3,
Sets = [sets:from_list(generate_unique(Card)) || _ <- lists:seq(1, NumSets)],
Filters = lists:map(fun (S) -> insert_many(sets:to_list(S), new(14)) end,
Sets),
?assert(abs(sets:size(sets:union(Sets)) - card(union(Filters)))
< (Card * NumSets) * 0.1).
union_test() ->
random:seed(1, 2, 3),
LeftDistinct = sets:from_list(generate_unique(10000)),
RightDistinct = sets:from_list(generate_unique(5000)
++ lists:sublist(sets:to_list(LeftDistinct),
5000)),
LeftHyper = insert_many(sets:to_list(LeftDistinct), new(13)),
RightHyper = insert_many(sets:to_list(RightDistinct), new(13)),
UnionHyper = union(LeftHyper, RightHyper),
Intersection = card(LeftHyper) + card(RightHyper) - card(UnionHyper),
error_logger:info_msg("left distinct: ~p~n"
"right distinct: ~p~n"
"true union: ~p~n"
"true intersection: ~p~n"
"estimated union: ~p~n"
"estimated intersection: ~p~n",
[sets:size(LeftDistinct),
sets:size(RightDistinct),
sets:size(
sets:union(LeftDistinct, RightDistinct)),
sets:size(
sets:intersection(LeftDistinct, RightDistinct)),
card(UnionHyper),
Intersection
]).
intersect_card_test() ->
random:seed(1, 2, 3),
LeftDistinct = sets:from_list(generate_unique(10000)),
RightDistinct = sets:from_list(generate_unique(5000)
++ lists:sublist(sets:to_list(LeftDistinct),
5000)),
LeftHyper = insert_many(sets:to_list(LeftDistinct), new(13)),
RightHyper = insert_many(sets:to_list(RightDistinct), new(13)),
IntersectCard = intersect_card(LeftHyper, RightHyper),
?assert(IntersectCard =< hyper:card(hyper:union(LeftHyper, RightHyper))),
%% NOTE: we can't really say much about the error here,
%% so just pick something and see if the intersection makes sense
Error = 0.05,
?assert((abs(5000 - IntersectCard) / 5000) =< Error).
%% report_wrapper_test_() ->
%% [{timeout, 600000000, ?_test(estimate_report())}].
estimate_report() ->
random:seed(erlang:now()),
Ps = lists:seq(10, 16, 1),
Cardinalities = [100, 1000, 10000, 100000, 1000000],
Repetitions = 50,
%% Ps = [4, 5],
%% Cardinalities = [100],
%% Repetitions = 100,
Stats = [run_report(P, Card, Repetitions) || P <- Ps,
Card <- Cardinalities],
error_logger:info_msg("~p~n", [Stats]),
Result =
"p,card,mean,p99,p1,bytes~n" ++
lists:map(fun ({P, Card, Mean, P99, P1, Bytes}) ->
io_lib:format("~p,~p,~.2f,~.2f,~.2f,~p~n",
[P, Card, Mean, P99, P1, Bytes])
end, Stats),
error_logger:info_msg(Result),
ok = file:write_file("../data.csv", io_lib:format(Result, [])).
run_report(P, Card, Repetitions) ->
Estimations = lists:map(fun (_) ->
Elements = generate_unique(Card),
abs(Card - card(insert_many(Elements, new(P))))
end, lists:seq(1, Repetitions)),
error_logger:info_msg("p=~p, card=~p, reps=~p~nestimates=~p~n",
[P, Card, Repetitions, Estimations]),
Hist = basho_stats_histogram:update_all(
Estimations,
basho_stats_histogram:new(
0,
lists:max(Estimations),
length(Estimations))),
{_, Mean, _, _, Sd} = basho_stats_histogram:summary_stats(Hist),
P99 = basho_stats_histogram:quantile(0.99, Hist),
P1 = basho_stats_histogram:quantile(0.01, Hist),
{P, Card, Mean, P99, P1, trunc(pow(2, P))}.
generate_unique(N) ->
generate_unique(lists:usort(random_bytes(N)), N).
generate_unique(L, N) ->
case length(L) of
N ->
L;
Less ->
generate_unique(lists:usort(random_bytes(N - Less) ++ L), N)
end.
random_bytes(N) ->
random_bytes([], N).
random_bytes(Acc, 0) -> Acc;
random_bytes(Acc, N) ->
Int = random:uniform(100000000000000),
random_bytes([<<Int:64/integer>> | Acc], N-1).
insert_many(L, Hyper) ->
lists:foldl(fun insert/2, Hyper, L).