This article introduces how to find outliers using Local Outlier Detection (LOF) on Hivemall.
create database lof; use lof; create external table hundred_balls ( rowid int, weight double, specific_heat double, reflectance double ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS TEXTFILE LOCATION '/dataset/lof/hundred_balls';
Download hundred_balls.txt that is originally provides in this article.
In this example, Rowid 87
is apparently an outlier.
awk '{FS=" "; OFS=" "; print NR,$0}' hundred_balls.txt | \ hadoop fs -put - /dataset/lof/hundred_balls/hundred_balls.txt
create table train as select rowid, array(concat("weight:", weight), concat("specific_heat:", specific_heat), concat("reflectance:", reflectance)) as features from hundred_balls;
create table train_normalized as WITH fv as ( select rowid, extract_feature(feature) as feature, extract_weight(feature) as value from train LATERAL VIEW explode(features) exploded AS feature ), stats as ( select feature, -- avg(value) as mean, stddev_pop(value) as stddev min(value) as min, max(value) as max from fv group by feature ), norm as ( select rowid, t1.feature, -- zscore(t1.value, t2.mean, t2.stddev) as zscore rescale(t1.value, t2.min, t2.max) as minmax from fv t1 JOIN stats t2 ON (t1.feature = t2.feature) ), norm_fv as ( select rowid, -- concat(feature, ":", zscore) as feature concat(feature, ":", minmax) as feature from norm ) select rowid, collect_list(feature) as features from norm_fv group by rowid ;
hive> select * from train_normalized limit 3; 1 ["reflectance:0.5252967","specific_heat:0.19863537","weight:0.0"] 2 ["reflectance:0.5950446","specific_heat:0.09166764","weight:0.052084323"] 3 ["reflectance:0.6797837","specific_heat:0.12567581","weight:0.13255163"]
-- workaround to deal with a bug in Hive/Tez -- https://issues.apache.org/jira/browse/HIVE-10729 -- set hive.auto.convert.join=false; set hive.mapjoin.optimized.hashtable=false; -- parameter of LoF set hivevar:k=12; -- find topk outliers set hivevar:topk=3;
create table list_neighbours as select each_top_k( -${k}, t1.rowid, euclid_distance(t1.features, t2.features), t1.rowid, t2.rowid ) as (rank, distance, target, neighbour) from train_normalized t1 LEFT OUTER JOIN train_normalized t2 where t1.rowid != t2.rowid ;
Caution
list_neighbours
table SHOULD be created becauselist_neighbours
is used multiple times.
Info
To parallelize a top-k computation, break LEFT-hand table into piece as describe in this page.
WITH k_distance as ( select target, max(distance) as k_distance from list_neighbours group by target ), reach_distance as ( select t1.target, max2(t2.k_distance, t1.distance) as reach_distance from list_neighbours t1 JOIN k_distance t2 ON (t1.neighbour = t2.target) ), lrd as ( select target, 1.0 / avg(reach_distance) as lrd from reach_distance group by target ), neighbours_lrd as ( select t1.target, t2.lrd from list_neighbours t1 JOIN lrd t2 on (t1.neighbour = t2.target) ) select t1.target, sum(t2.lrd / t1.lrd) / count(1) as lof from lrd t1 JOIN neighbours_lrd t2 on (t1.target = t2.target) group by t1.target order by lof desc limit ${topk};
> 87 3.031143749957831 > 16 1.9755564408378874 > 1 1.8415763570939774