each_top_k(int k, ANY group, double value, arg1, arg2, ..., argN)
returns a top-k records for each group
. It returns a relation consists of (int rank, double value, arg1, arg2, .., argN)
.
This function is particularly useful for applying a similarity/distance function where the computation complexity is O(nm).
each_top_k
is very fast when compared to other methods running top-k queries (e.g., rank/distribute by
) in Hive.
each_top_k
is supported from Hivemall v0.3.2-3 or later.group
. Use DISTRIBUTE BY group SORT BY group
to ensure that. Or, you can use LEFT OUTER JOIN
for certain cases.argN
.value
is used for the comparison.Any number types
or timestamp
are accepted for the type of value
.tail-K
records are returned for each group
.at-most K
records for each group. The ranking scheme is similar to dense_rank
but slightly different in certain cases.each_top_k
Efficient processing of Top-k queries is a crucial requirement in many interactive environments that involve massive amounts of data. Our Hive extension each_top_k
helps running Top-k processing efficiently.
student | class | score |
---|---|---|
1 | b | 70 |
2 | a | 80 |
3 | a | 90 |
4 | b | 50 |
5 | a | 70 |
6 | b | 60 |
student | class | score | rank |
---|---|---|---|
3 | a | 90 | 1 |
2 | a | 80 | 2 |
1 | b | 70 | 1 |
6 | b | 60 | 2 |
The standard way using SQL window function would be as follows:
SELECT student, class, score, rank FROM ( SELECT student, class, score, rank() over (PARTITION BY class ORDER BY score DESC) as rank FROM table ) t WHRE rank <= 2
An alternative and efficient way to compute top-k items using each_top_k
is as follows:
SELECT each_top_k( 2, class, score, class, student -- output columns other in addition to rank and score ) as (rank, score, class, student) FROM ( SELECT * FROM table CLUSTER BY class -- Mandatory for `each_top_k` ) t
Note
CLUSTER BY x
is a synonym ofDISTRIBUTE BY x CLASS SORT BY x
and required when usingeach_top_k
.
The function signature of each_top_k
is each_top_k(int k, ANY group, double value, arg1, arg2, ..., argN)
and it returns a relation (int rank, double value, arg1, arg2, .., argN)
.
Any number types or timestamp are accepted for the type of value
but it MUST be not NULL. Do null hanlding like if(value is null, -1, value)
to avoid null.
If k
is less than 0, reverse order is used and tail-K records are returned for each group
.
The ranking semantics of each_top_k
follows SQL's dense_rank
and then limits results by k
.
Caution
each_top_k
is benefical where the number of grouping keys are large. If the number of grouping keys are not so large (e.g., less than 100), consider usingrank() over
instead.
set hivevar:k=5; select page-id, user-id, clicks from ( select each_top_k(${k}, page-id, clicks, page-id, user-id) as (rank, clicks, page-id, user-id) from ( select page-id, user-id, clicks from mytable DISTRIBUTE BY page-id SORT BY page-id ) t1 ) t2 order by page-id ASC, clicks DESC;
set hivevar:k=10; SELECT each_top_k( ${k}, t2.id, angular_similarity(t2.features, t1.features), t2.id, t1.id, t1.y ) as (rank, similarity, base_id, neighbor_id, y) FROM test_hivemall t2 LEFT OUTER JOIN train_hivemall t1;
rank | similarity | base_id | neighbor_id | y |
---|---|---|---|---|
1 | 0.8594650626182556 | 12 | 10514 | 0 |
2 | 0.8585299849510193 | 12 | 11719 | 0 |
3 | 0.856602132320404 | 12 | 21009 | 0 |
4 | 0.8562054634094238 | 12 | 17582 | 0 |
5 | 0.8516314029693604 | 12 | 22006 | 0 |
6 | 0.8499397039413452 | 12 | 25364 | 0 |
7 | 0.8467264771461487 | 12 | 900 | 0 |
8 | 0.8463355302810669 | 12 | 8018 | 0 |
9 | 0.8439178466796875 | 12 | 7041 | 0 |
10 | 0.8438876867294312 | 12 | 21595 | 0 |
1 | 0.8390793800354004 | 25 | 21125 | 0 |
2 | 0.8344510793685913 | 25 | 14073 | 0 |
3 | 0.8340602517127991 | 25 | 9008 | 0 |
4 | 0.8328862190246582 | 25 | 6598 | 0 |
5 | 0.8301891088485718 | 25 | 943 | 0 |
6 | 0.8271955251693726 | 25 | 20400 | 0 |
7 | 0.8255619406700134 | 25 | 10922 | 0 |
8 | 0.8241575956344604 | 25 | 8477 | 0 |
9 | 0.822281539440155 | 25 | 25977 | 0 |
10 | 0.8205751180648804 | 25 | 21115 | 0 |
1 | 0.9761330485343933 | 34 | 2513 | 0 |
2 | 0.9536819458007812 | 34 | 8697 | 0 |
3 | 0.9531533122062683 | 34 | 7326 | 0 |
4 | 0.9493276476860046 | 34 | 15173 | 0 |
5 | 0.9480557441711426 | 34 | 19468 | 0 |
.. | .. | .. | .. | .. |
distribute by
and sort by
SELECT each_top_k( 10, id1, angular_similarity(features1, features2), id1, id2, y ) as (rank, similarity, id, other_id, y) FROM ( select t1.id as id1, t2.id as id2, t1.features as features1, t2.features as features2, t1.y from train_hivemall t1 CROSS JOIN test_hivemall t2 DISTRIBUTE BY id1 SORT BY id1 ) t;
create table similarities as WITH test_rnd as ( select rand(31) as rnd, id, features from test_hivemall ), t01 as ( select id, features from test_rnd where rnd < 0.2 ), t02 as ( select id, features from test_rnd where rnd >= 0.2 and rnd < 0.4 ), t03 as ( select id, features from test_rnd where rnd >= 0.4 and rnd < 0.6 ), t04 as ( select id, features from test_rnd where rnd >= 0.6 and rnd < 0.8 ), t05 as ( select id, features from test_rnd where rnd >= 0.8 ), s01 as ( SELECT each_top_k( 10, t2.id, angular_similarity(t2.features, t1.features), t2.id, t1.id, t1.y ) as (rank, similarity, base_id, neighbor_id, y) FROM t01 t2 LEFT OUTER JOIN train_hivemall t1 ), s02 as ( SELECT each_top_k( 10, t2.id, angular_similarity(t2.features, t1.features), t2.id, t1.id, t1.y ) as (rank, similarity, base_id, neighbor_id, y) FROM t02 t2 LEFT OUTER JOIN train_hivemall t1 ), s03 as ( SELECT each_top_k( 10, t2.id, angular_similarity(t2.features, t1.features), t2.id, t1.id, t1.y ) as (rank, similarity, base_id, neighbor_id, y) FROM t03 t2 LEFT OUTER JOIN train_hivemall t1 ), s04 as ( SELECT each_top_k( 10, t2.id, angular_similarity(t2.features, t1.features), t2.id, t1.id, t1.y ) as (rank, similarity, base_id, neighbor_id, y) FROM t04 t2 LEFT OUTER JOIN train_hivemall t1 ), s05 as ( SELECT each_top_k( 10, t2.id, angular_similarity(t2.features, t1.features), t2.id, t1.id, t1.y ) as (rank, similarity, base_id, neighbor_id, y) FROM t05 t2 LEFT OUTER JOIN train_hivemall t1 ) select * from s01 union all select * from s02 union all select * from s03 union all select * from s04 union all select * from s05;
set hivevar:k=-10; SELECT each_top_k( ${k}, t2.id, angular_similarity(t2.features, t1.features), t2.id, t1.id, t1.y ) as (rank, similarity, base_id, neighbor_id, y) FROM test_hivemall t2 LEFT OUTER JOIN train_hivemall t1 -- limit 25
rank | similarity | base_id | neighbor_id | y |
---|---|---|---|---|
1 | 0.4383084177970886 | 1 | 7503 | 0 |
2 | 0.44166821241378784 | 1 | 10143 | 0 |
3 | 0.4424300789833069 | 1 | 11073 | 0 |
4 | 0.44254064559936523 | 1 | 17782 | 0 |
5 | 0.4442034363746643 | 1 | 18556 | 0 |
6 | 0.45163780450820923 | 1 | 3786 | 0 |
7 | 0.45244503021240234 | 1 | 10242 | 0 |
8 | 0.4525672197341919 | 1 | 21657 | 0 |
9 | 0.4527127146720886 | 1 | 17218 | 0 |
10 | 0.45314133167266846 | 1 | 25141 | 0 |
1 | 0.44030147790908813 | 2 | 3786 | 0 |
2 | 0.4408798813819885 | 2 | 23386 | 0 |
3 | 0.44112563133239746 | 2 | 11073 | 0 |
4 | 0.4415401816368103 | 2 | 22853 | 0 |
5 | 0.4422193765640259 | 2 | 21657 | 0 |
6 | 0.4429032802581787 | 2 | 10143 | 0 |
7 | 0.4435907006263733 | 2 | 24413 | 0 |
8 | 0.44569307565689087 | 2 | 7503 | 0 |
9 | 0.4460843801498413 | 2 | 25141 | 0 |
10 | 0.4464914798736572 | 2 | 24289 | 0 |
1 | 0.43862903118133545 | 3 | 23150 | 1 |
2 | 0.4398220181465149 | 3 | 9881 | 1 |
3 | 0.44283604621887207 | 3 | 27121 | 0 |
4 | 0.4432108402252197 | 3 | 26220 | 1 |
5 | 0.44323229789733887 | 3 | 18541 | 0 |
.. | .. | .. | .. | .. |
In order to utilize mapper-side aggregation and reduce computational cost of shuffling, you can use to_ordered_map
or to_ordered_list
to get top/tail-k elements instead of each_top_k
.
As long as key
is unique in each id
, the following queries return same result:
with t as ( select each_top_k( 10, id, key, id, value ) as (rank, key, id, value) from ( select * from test cluster by id ) t ) select id, collect_list(value) as topk from t group by id
with t as ( select id, to_ordered_map(key, value, 10) as m from test group by id ) select id, collect_list(value) as topk from t lateral view explode(m) t as key, value group by id
select id, to_ordered_list(value, key, '-k 10') as topk from test group by id
Caution
In case that
key
could duplicate inid
,to_ordered_map
behaves differently because keyK
is always unique inMap<K, V>
.
Similarly to each_top_k
, tail-k can also be represented as: to_ordered_map(key, value, -10)
and to_ordered_list(value, key, '-k -10')
.