register datasketches-memory-2.0.0.jar;
register datasketches-java-3.1.0.jar;
register datasketches-pig-1.1.0.jar;

-- very small sketch just for the purpose of this tiny example
DEFINE ReservoirSampling org.apache.datasketches.pig.sampling.ReservoirSampling('4');
DEFINE ReservoirUnion org.apache.datasketches.pig.sampling.ReservoirUnion('4');

raw_data = LOAD 'data.txt' USING PigStorage('\t') AS
  (scale: double, label: chararray);

-- make a few independent sketches from the input data
sketches = FOREACH
    (GROUP raw_data ALL)
  GENERATE
    ReservoirSampling(raw_data) AS sketch0,
    ReservoirSampling(raw_data) AS sketch1,
    ReservoirSampling(raw_data) AS sketch2
  ;

-- collect the three sketches into a single bag
sketchBag = FOREACH
    sketches
  GENERATE
    TOBAG(sketch0, sketch1, sketch2)
  ;

-- union the sketches in the bag into one reservoir sample
result = FOREACH
    sketchBag
  GENERATE
    FLATTEN(ReservoirUnion(*)) AS (n, k, samples:{(scale, label)})
  ;

DUMP result;
DESCRIBE result;
The test data has 2 fields: scale and label. The first step of the query creates several reservoir samples from the input data. We merge the sketches into a bag in the next step, and then union the independent sketches and dump the results.
Results:
From ‘DUMP result’:
(24,4,{(30.0,h),(7.0,g),(6.0,f),(5.0,e)})
Running this script many times, we will see each element appear with equal probability.
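To make the equal-probability claim concrete, here is a rough back-of-the-envelope calculation. It assumes the union behaves as a uniform sample of size k = 4 over the n = 24 items it has seen (three sketches, each fed the same 8 rows), matching the n and k reported in the output above. That uniformity is an assumption made for illustration, not a statement taken from the library documentation.

```latex
\[
  P(\text{a given stream item is in the sample}) = \frac{k}{n} = \frac{4}{24} = \frac{1}{6}
\]
\[
  E[\text{copies of a given row in the sample}] = 3 \cdot \frac{1}{6} = \frac{1}{2}
\]
```

Under that assumption, each of the 8 labels should show up about 0.5 times per run on average, and the 8 expectations sum to k = 4, the size of the sample.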
From ‘DESCRIBE result’:
result: {n: long,k: int,samples: {(scale: double,label: chararray)}}
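Given this schema, the sampled items sit inside a nested bag. If one row per sampled item is more convenient downstream, a follow-on step along these lines could unnest them. This is a minimal sketch; the alias `sample_rows` is a hypothetical name and is not part of the original script.

```pig
-- Hypothetical follow-on step (not in the original script):
-- emit one (scale, label) tuple per item in the samples bag.
sample_rows = FOREACH result GENERATE FLATTEN(samples) AS (scale, label);

DUMP sample_rows;
```

This drops n and k. To keep them alongside each sampled row, they can be listed before the FLATTEN in the GENERATE clause.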
Contents of data.txt (tab-separated):
1.0	a
2.0	b
3.0	c
4.0	d
5.0	e
6.0	f
7.0	g
30.0	h