each_top_k(int k, ANY group, double value, arg1, arg2, ..., argN) returns the top-k records for each group. It returns a relation consisting of (int rank, double value, arg1, arg2, ..., argN).

This function is particularly useful for applying a similarity/distance function where the computational complexity is O(nm).

each_top_k is very fast when compared to other methods running top-k queries (e.g., rank/distribute by) in Hive.

Caution

  • each_top_k is supported in Hivemall v0.3.2-3 and later.
  • This UDTF assumes that input records are sorted by group. Use DISTRIBUTE BY group SORT BY group to ensure this; in certain cases, a LEFT OUTER JOIN can be used instead.
  • It takes a variable number of trailing arguments (arg1, ..., argN).
  • The third argument, value, is used for the comparison.
  • Any numeric type or timestamp is accepted as the type of value.
  • If k is less than 0, reverse order is used and tail-K records are returned for each group.
  • Note that this function returns a pseudo ranking for top-k. It always returns at most k records for each group. The ranking scheme is similar to dense_rank but differs slightly in certain cases.

Usage

Efficient Top-k Query Processing using each_top_k

Efficient processing of top-k queries is a crucial requirement in many interactive environments that involve massive amounts of data. Our Hive extension each_top_k helps run top-k queries efficiently.

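  • Suppose the following table is the input:

| student | class | score |
|---------|-------|-------|
| 1 | b | 70 |
| 2 | a | 80 |
| 3 | a | 90 |
| 4 | b | 50 |
| 5 | a | 70 |
| 6 | b | 60 |

  • The goal is to list the top-2 students for each class:

| student | class | score | rank |
|---------|-------|-------|------|
| 3 | a | 90 | 1 |
| 2 | a | 80 | 2 |
| 1 | b | 70 | 1 |
| 6 | b | 60 | 2 |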

The standard way, using a SQL window function, is as follows:

SELECT 
  student, class, score, rank
FROM (
  SELECT
    student, class, score, 
    rank() over (PARTITION BY class ORDER BY score DESC) as rank
  FROM
    table
) t
WHERE rank <= 2

An alternative and efficient way to compute top-k items using each_top_k is as follows:

SELECT 
  each_top_k(
    2, class, score,
    class, student -- output columns in addition to rank and score
  ) as (rank, score, class, student)
FROM (
  SELECT * FROM table
  CLUSTER BY class -- Mandatory for `each_top_k`
) t

Note

CLUSTER BY x is a synonym of DISTRIBUTE BY x SORT BY x and is required when using each_top_k.
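
To make the synonym concrete, here is a sketch of the same top-2 query written with the explicit form. It assumes the input table shown earlier is stored under the hypothetical name students; substitute your own table name.

```sql
-- Sketch only: `students` is a hypothetical name for the input table above.
SELECT
  each_top_k(
    2, class, score,
    class, student
  ) as (rank, score, class, student)
FROM (
  SELECT * FROM students
  DISTRIBUTE BY class SORT BY class -- explicit form of CLUSTER BY class
) t;
```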

The function signature of each_top_k is each_top_k(int k, ANY group, double value, arg1, arg2, ..., argN) and it returns a relation (int rank, double value, arg1, arg2, ..., argN).

Any numeric type or timestamp is accepted as the type of value, but it MUST NOT be NULL. Handle nulls with an expression such as if(value is null, -1, value).
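
As a sketch of the null handling described above, the query below replaces a NULL score before it reaches each_top_k. The table name students is hypothetical, and the sentinel -1 is an assumption that only makes sense when real scores are non-negative, so that rows with missing scores rank last.

```sql
-- Sketch only: `students` is a hypothetical table name; -1 is an assumed
-- sentinel that is lower than any real score.
SELECT
  each_top_k(
    2, class, if(score is null, -1, score),
    class, student
  ) as (rank, score, class, student)
FROM (
  SELECT * FROM students
  CLUSTER BY class
) t;
```

If rows with a missing score should not appear in the result at all, filtering them out in the inner query with WHERE score IS NOT NULL is an alternative.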

If k is less than 0, reverse order is used and tail-K records are returned for each group.
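
For example, a tail-2 query over the student table could be sketched as follows. The table name students is hypothetical, and the only change from the top-2 query is the negative k, so the query should return the two lowest-scoring students in each class.

```sql
-- Sketch only: `students` is a hypothetical name for the input table above.
SELECT
  each_top_k(
    -2, class, score, -- negative k selects tail-k instead of top-k
    class, student
  ) as (rank, score, class, student)
FROM (
  SELECT * FROM students
  CLUSTER BY class
) t;
```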

The ranking semantics of each_top_k follow SQL's dense_rank, and the results are then limited by k.

Caution

each_top_k is beneficial when the number of grouping keys is large. If the number of grouping keys is small (e.g., fewer than 100), consider using rank() over instead.

top-k clicks

https://stackoverflow.com/questions/9390698/hive-getting-top-n-records-in-group-by-query/32559050#32559050

set hivevar:k=5;

-- Column names use underscores here: an unquoted hyphenated name such as
-- page-id is parsed as a subtraction in Hive.
select
  page_id, 
  user_id,
  clicks
from (
  select
    each_top_k(${k}, page_id, clicks, page_id, user_id)
      as (rank, clicks, page_id, user_id)
  from (
    select
      page_id, user_id, clicks
    from
      mytable
    DISTRIBUTE BY page_id SORT BY page_id
  ) t1
) t2
order by page_id ASC, clicks DESC;

Top-k similarity computation

set hivevar:k=10;

SELECT
  each_top_k(
    ${k}, t2.id, angular_similarity(t2.features, t1.features), 
    t2.id, 
    t1.id,  
    t1.y
  ) as (rank, similarity, base_id, neighbor_id, y)
FROM
  test_hivemall t2 
  LEFT OUTER JOIN train_hivemall t1;
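
| rank | similarity | base_id | neighbor_id | y |
|------|------------|---------|-------------|---|
| 1 | 0.8594650626182556 | 12 | 10514 | 0 |
| 2 | 0.8585299849510193 | 12 | 11719 | 0 |
| 3 | 0.856602132320404 | 12 | 21009 | 0 |
| 4 | 0.8562054634094238 | 12 | 17582 | 0 |
| 5 | 0.8516314029693604 | 12 | 22006 | 0 |
| 6 | 0.8499397039413452 | 12 | 25364 | 0 |
| 7 | 0.8467264771461487 | 12 | 900 | 0 |
| 8 | 0.8463355302810669 | 12 | 8018 | 0 |
| 9 | 0.8439178466796875 | 12 | 7041 | 0 |
| 10 | 0.8438876867294312 | 12 | 21595 | 0 |
| 1 | 0.8390793800354004 | 25 | 21125 | 0 |
| 2 | 0.8344510793685913 | 25 | 14073 | 0 |
| 3 | 0.8340602517127991 | 25 | 9008 | 0 |
| 4 | 0.8328862190246582 | 25 | 6598 | 0 |
| 5 | 0.8301891088485718 | 25 | 943 | 0 |
| 6 | 0.8271955251693726 | 25 | 20400 | 0 |
| 7 | 0.8255619406700134 | 25 | 10922 | 0 |
| 8 | 0.8241575956344604 | 25 | 8477 | 0 |
| 9 | 0.822281539440155 | 25 | 25977 | 0 |
| 10 | 0.8205751180648804 | 25 | 21115 | 0 |
| 1 | 0.9761330485343933 | 34 | 2513 | 0 |
| 2 | 0.9536819458007812 | 34 | 8697 | 0 |
| 3 | 0.9531533122062683 | 34 | 7326 | 0 |
| 4 | 0.9493276476860046 | 34 | 15173 | 0 |
| 5 | 0.9480557441711426 | 34 | 19468 | 0 |
| ... | ... | ... | ... | ... |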

Explicit grouping using distribute by and sort by

SELECT
  each_top_k(
    10, id1, angular_similarity(features1, features2), 
    id1, 
    id2,  
    y
  ) as (rank, similarity, id, other_id, y)
FROM (
select
  t1.id as id1,
  t2.id as id2,
  t1.features as features1,
  t2.features as features2,
  t1.y
from
  train_hivemall t1
  CROSS JOIN test_hivemall t2
DISTRIBUTE BY id1 SORT BY id1
) t;

Parallelization of similarity computation using WITH clause

create table similarities
as
WITH test_rnd as (
select
  rand(31) as rnd,
  id,
  features
from
  test_hivemall
),
t01 as (
select
 id,
 features
from
 test_rnd
where
 rnd < 0.2
),
t02 as (
select
 id,
 features
from
 test_rnd
where
 rnd >= 0.2 and rnd < 0.4
),
t03 as (
select
 id,
 features
from
 test_rnd
where
 rnd >= 0.4 and rnd < 0.6
),
t04 as (
select
 id,
 features
from
 test_rnd
where
 rnd >= 0.6 and rnd < 0.8
),
t05 as (
select
 id,
 features
from
 test_rnd
where
 rnd >= 0.8
),
s01 as (
SELECT
  each_top_k(
    10, t2.id, angular_similarity(t2.features, t1.features), 
    t2.id, 
    t1.id,  
    t1.y
  ) as (rank, similarity, base_id, neighbor_id, y)
FROM
  t01 t2 
  LEFT OUTER JOIN train_hivemall t1
),
s02 as (
SELECT
  each_top_k(
    10, t2.id, angular_similarity(t2.features, t1.features), 
    t2.id, 
    t1.id,  
    t1.y
  ) as (rank, similarity, base_id, neighbor_id, y)
FROM
  t02 t2 
  LEFT OUTER JOIN train_hivemall t1
),
s03 as (
SELECT
  each_top_k(
    10, t2.id, angular_similarity(t2.features, t1.features), 
    t2.id, 
    t1.id,  
    t1.y
  ) as (rank, similarity, base_id, neighbor_id, y)
FROM
  t03 t2 
  LEFT OUTER JOIN train_hivemall t1
),
s04 as (
SELECT
  each_top_k(
    10, t2.id, angular_similarity(t2.features, t1.features), 
    t2.id, 
    t1.id,  
    t1.y
  ) as (rank, similarity, base_id, neighbor_id, y)
FROM
  t04 t2 
  LEFT OUTER JOIN train_hivemall t1
),
s05 as (
SELECT
  each_top_k(
    10, t2.id, angular_similarity(t2.features, t1.features), 
    t2.id, 
    t1.id,  
    t1.y
  ) as (rank, similarity, base_id, neighbor_id, y)
FROM
  t05 t2 
  LEFT OUTER JOIN train_hivemall t1
)
select * from s01
union all
select * from s02
union all
select * from s03
union all
select * from s04
union all
select * from s05;

tail-K

set hivevar:k=-10;

SELECT
  each_top_k(
    ${k}, t2.id, angular_similarity(t2.features, t1.features), 
    t2.id, 
    t1.id,  
    t1.y
  ) as (rank, similarity, base_id, neighbor_id, y)
FROM
  test_hivemall t2 
  LEFT OUTER JOIN train_hivemall t1
-- limit 25
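
| rank | similarity | base_id | neighbor_id | y |
|------|------------|---------|-------------|---|
| 1 | 0.4383084177970886 | 1 | 7503 | 0 |
| 2 | 0.44166821241378784 | 1 | 10143 | 0 |
| 3 | 0.4424300789833069 | 1 | 11073 | 0 |
| 4 | 0.44254064559936523 | 1 | 17782 | 0 |
| 5 | 0.4442034363746643 | 1 | 18556 | 0 |
| 6 | 0.45163780450820923 | 1 | 3786 | 0 |
| 7 | 0.45244503021240234 | 1 | 10242 | 0 |
| 8 | 0.4525672197341919 | 1 | 21657 | 0 |
| 9 | 0.4527127146720886 | 1 | 17218 | 0 |
| 10 | 0.45314133167266846 | 1 | 25141 | 0 |
| 1 | 0.44030147790908813 | 2 | 3786 | 0 |
| 2 | 0.4408798813819885 | 2 | 23386 | 0 |
| 3 | 0.44112563133239746 | 2 | 11073 | 0 |
| 4 | 0.4415401816368103 | 2 | 22853 | 0 |
| 5 | 0.4422193765640259 | 2 | 21657 | 0 |
| 6 | 0.4429032802581787 | 2 | 10143 | 0 |
| 7 | 0.4435907006263733 | 2 | 24413 | 0 |
| 8 | 0.44569307565689087 | 2 | 7503 | 0 |
| 9 | 0.4460843801498413 | 2 | 25141 | 0 |
| 10 | 0.4464914798736572 | 2 | 24289 | 0 |
| 1 | 0.43862903118133545 | 3 | 23150 | 1 |
| 2 | 0.4398220181465149 | 3 | 9881 | 1 |
| 3 | 0.44283604621887207 | 3 | 27121 | 0 |
| 4 | 0.4432108402252197 | 3 | 26220 | 1 |
| 5 | 0.44323229789733887 | 3 | 18541 | 0 |
| ... | ... | ... | ... | ... |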

Alternative approaches

In order to utilize mapper-side aggregation and reduce computational cost of shuffling, you can use to_ordered_map or to_ordered_list to get top/tail-k elements instead of each_top_k.

As long as key is unique within each id, the following queries return the same result:

with t as (
  select
    each_top_k(
      10, id, key,
      id, value
    ) as (rank, key, id, value)
  from (
    select
      *
    from 
      test
    cluster by 
      id
  ) t
)
select 
  id, collect_list(value) as topk
from 
  t
group by
  id;

with t as (
  select
    id, to_ordered_map(key, value, 10) as m
  from 
    test
  group by
    id
)
select 
  id, collect_list(value) as topk
from 
  t
lateral view explode(m) t as key, value
group by
  id;

select 
  id, to_ordered_list(value, key, '-k 10') as topk
from 
  test
group by
  id;

Caution

If key can be duplicated within an id, to_ordered_map behaves differently because a key K is always unique in Map<K, V>.

As with each_top_k, tail-k can also be expressed as to_ordered_map(key, value, -10) and to_ordered_list(value, key, '-k -10').
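
Put into a full query, the tail-k variant of the to_ordered_list form might look like the sketch below. It reuses the test table and the key and value columns from the queries above; the output alias tailk is only an illustrative name.

```sql
-- Sketch only: same shape as the top-k to_ordered_list query above,
-- with a negative k passed in the option string.
select
  id, to_ordered_list(value, key, '-k -10') as tailk
from
  test
group by
  id;
```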