blob: 206c2e3a82f3ccce19f60d33b3642676cef2357d [file] [log] [blame]
/*
* Copyright 2016, Yahoo! Inc.
* Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
*/
package com.yahoo.sketches.pig.tuple;
import org.apache.pig.Accumulator;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
import com.yahoo.memory.Memory;
import com.yahoo.sketches.tuple.Sketch;
import com.yahoo.sketches.tuple.SketchIterator;
import com.yahoo.sketches.tuple.Sketches;
import com.yahoo.sketches.tuple.UpdatableSketch;
import com.yahoo.sketches.tuple.UpdatableSketchBuilder;
import com.yahoo.sketches.tuple.adouble.DoubleSummary;
import com.yahoo.sketches.tuple.adouble.DoubleSummaryDeserializer;
import com.yahoo.sketches.tuple.adouble.DoubleSummaryFactory;
public class DataToDoubleSummarySketchTest {
@Test
public void execNullInputTuple() throws Exception {
EvalFunc<Tuple> func = new DataToDoubleSummarySketch();
Tuple resultTuple = func.exec(null);
Assert.assertNull(resultTuple);
}
@Test
public void execEmptyInputTuple() throws Exception {
EvalFunc<Tuple> func = new DataToDoubleSummarySketch();
Tuple resultTuple = func.exec(TupleFactory.getInstance().newTuple());
Assert.assertNull(resultTuple);
}
@Test
public void execEmptyBag() throws Exception {
EvalFunc<Tuple> func = new DataToDoubleSummarySketch();
Tuple inputTuple = PigUtil.objectsToTuple(BagFactory.getInstance().newDefaultBag());
Tuple resultTuple = func.exec(inputTuple);
Assert.assertNotNull(resultTuple);
Assert.assertEquals(resultTuple.size(), 1);
DataByteArray bytes = (DataByteArray) resultTuple.get(0);
Assert.assertTrue(bytes.size() > 0);
Sketch<DoubleSummary> sketch = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer());
Assert.assertEquals(sketch.getEstimate(), 0.0);
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void execWrongSizeOfInnerTuple() throws Exception {
EvalFunc<Tuple> func = new DataToDoubleSummarySketch();
DataBag bag = BagFactory.getInstance().newDefaultBag();
bag.add(PigUtil.objectsToTuple(1));
Tuple inputTuple = PigUtil.objectsToTuple(bag);
func.exec(inputTuple);
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void execWrongKeyType() throws Exception {
EvalFunc<Tuple> func = new DataToDoubleSummarySketch();
DataBag bag = BagFactory.getInstance().newDefaultBag();
bag.add(PigUtil.objectsToTuple(new Object(), 1.0)); // Object in place of key is not supported
Tuple inputTuple = PigUtil.objectsToTuple(bag);
func.exec(inputTuple);
}
@Test
public void execAllInputTypes() throws Exception {
EvalFunc<Tuple> func = new DataToDoubleSummarySketch();
DataBag bag = BagFactory.getInstance().newDefaultBag();
bag.add(PigUtil.objectsToTuple("a", 1.0));
bag.add(PigUtil.objectsToTuple("b", 1.0));
bag.add(PigUtil.objectsToTuple("a", 2.0));
bag.add(PigUtil.objectsToTuple("b", 2.0));
bag.add(PigUtil.objectsToTuple(1, 3.0));
bag.add(PigUtil.objectsToTuple(2L, 3.0));
bag.add(PigUtil.objectsToTuple(1f, 3.0));
bag.add(PigUtil.objectsToTuple(2.0, 3.0));
bag.add(PigUtil.objectsToTuple((byte)3, 3.0));
bag.add(PigUtil.objectsToTuple(new DataByteArray("c".getBytes()), 3.0));
Tuple inputTuple = PigUtil.objectsToTuple(bag);
Tuple resultTuple = func.exec(inputTuple);
Assert.assertNotNull(resultTuple);
Assert.assertEquals(resultTuple.size(), 1);
DataByteArray bytes = (DataByteArray) resultTuple.get(0);
Assert.assertTrue(bytes.size() > 0);
Sketch<DoubleSummary> sketch = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer());
Assert.assertEquals(sketch.getEstimate(), 8.0, 0.0);
SketchIterator<DoubleSummary> it = sketch.iterator();
while (it.next()) {
Assert.assertEquals(it.getSummary().getValue(), 3.0);
}
}
@Test
public void execMinMode() throws Exception {
EvalFunc<Tuple> func = new DataToDoubleSummarySketch("32", "Min");
DataBag bag = BagFactory.getInstance().newDefaultBag();
bag.add(PigUtil.objectsToTuple("a", 1.0));
bag.add(PigUtil.objectsToTuple("b", 2.0));
bag.add(PigUtil.objectsToTuple("a", 2.0));
bag.add(PigUtil.objectsToTuple("b", 1.0));
Tuple inputTuple = PigUtil.objectsToTuple(bag);
Tuple resultTuple = func.exec(inputTuple);
Assert.assertNotNull(resultTuple);
Assert.assertEquals(resultTuple.size(), 1);
DataByteArray bytes = (DataByteArray) resultTuple.get(0);
Assert.assertTrue(bytes.size() > 0);
Sketch<DoubleSummary> sketch = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer());
Assert.assertEquals(sketch.getEstimate(), 2.0, 0.0);
SketchIterator<DoubleSummary> it = sketch.iterator();
while (it.next()) {
Assert.assertEquals(it.getSummary().getValue(), 1.0);
}
}
@Test
public void accumulator() throws Exception {
Accumulator<Tuple> func = new DataToDoubleSummarySketch("32");
Tuple inputTuple = TupleFactory.getInstance().newTuple(1);
DataBag bag = BagFactory.getInstance().newDefaultBag();
bag.add(PigUtil.objectsToTuple("a", 1.0));
inputTuple.set(0, bag);
func.accumulate(inputTuple);
inputTuple = TupleFactory.getInstance().newTuple(1);
bag = BagFactory.getInstance().newDefaultBag();
bag.add(PigUtil.objectsToTuple("b", 1.0));
bag.add(PigUtil.objectsToTuple("a", 2.0));
bag.add(PigUtil.objectsToTuple("b", 2.0));
inputTuple.set(0, bag);
func.accumulate(inputTuple);
Tuple resultTuple = func.getValue();
Assert.assertNotNull(resultTuple);
Assert.assertEquals(resultTuple.size(), 1);
DataByteArray bytes = (DataByteArray) resultTuple.get(0);
Assert.assertTrue(bytes.size() > 0);
Sketch<DoubleSummary> sketch = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer());
Assert.assertEquals(sketch.getEstimate(), 2.0, 0.0);
SketchIterator<DoubleSummary> it = sketch.iterator();
while (it.next()) {
Assert.assertEquals(it.getSummary().getValue(), 3.0);
}
// after cleanup, the value should always be 0
func.cleanup();
resultTuple = func.getValue();
Assert.assertNotNull(resultTuple);
Assert.assertEquals(resultTuple.size(), 1);
bytes = (DataByteArray) resultTuple.get(0);
Assert.assertTrue(bytes.size() > 0);
Sketch<DoubleSummary> sketch2 = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer());
Assert.assertEquals(sketch2.getEstimate(), 0.0, 0.0);
}
@Test
public void algebraicInitialOneParam() throws Exception {
EvalFunc<Tuple> func = new DataToDoubleSummarySketch.Initial(null);
Tuple inputTuple = TupleFactory.getInstance().newTuple(1);
DataBag bag = BagFactory.getInstance().newDefaultBag();
bag.add(PigUtil.objectsToTuple(null, null));
bag.add(PigUtil.objectsToTuple(null, null));
bag.add(PigUtil.objectsToTuple(null, null));
inputTuple.set(0, bag);
Tuple resultTuple = func.exec(inputTuple);
Assert.assertNotNull(resultTuple);
Assert.assertEquals(resultTuple.size(), 1);
DataBag resultBag = (DataBag) resultTuple.get(0);
Assert.assertEquals(resultBag.size(), 3);
}
@Test
public void algebraicInitialTwoParams() throws Exception {
EvalFunc<Tuple> func = new DataToDoubleSummarySketch.Initial(null, null);
Tuple inputTuple = TupleFactory.getInstance().newTuple(1);
DataBag bag = BagFactory.getInstance().newDefaultBag();
bag.add(PigUtil.objectsToTuple(null, null));
bag.add(PigUtil.objectsToTuple(null, null));
bag.add(PigUtil.objectsToTuple(null, null));
inputTuple.set(0, bag);
Tuple resultTuple = func.exec(inputTuple);
Assert.assertNotNull(resultTuple);
Assert.assertEquals(resultTuple.size(), 1);
DataBag resultBag = (DataBag) resultTuple.get(0);
Assert.assertEquals(resultBag.size(), 3);
}
@Test
public void algebraicIntermediateFinal() throws Exception {
EvalFunc<Tuple> func = new DataToDoubleSummarySketch.IntermediateFinal("32");
Tuple inputTuple = TupleFactory.getInstance().newTuple(1);
DataBag bag = BagFactory.getInstance().newDefaultBag();
inputTuple.set(0, bag);
// this is to simulate the output from Initial
bag.add(PigUtil.objectsToTuple(PigUtil.tuplesToBag(PigUtil.objectsToTuple("a", 1.0))));
// this is to simulate the output from a prior call of IntermediateFinal
UpdatableSketch<Double, DoubleSummary> ts = new UpdatableSketchBuilder<Double, DoubleSummary>(new DoubleSummaryFactory()).build();
ts.update("b", 1.0);
ts.update("a", 2.0);
ts.update("b", 2.0);
Sketch<DoubleSummary> cs = ts.compact();
bag.add(PigUtil.objectsToTuple(new DataByteArray(cs.toByteArray())));
Tuple resultTuple = func.exec(inputTuple);
Assert.assertNotNull(resultTuple);
Assert.assertEquals(resultTuple.size(), 1);
DataByteArray bytes = (DataByteArray) resultTuple.get(0);
Assert.assertTrue(bytes.size() > 0);
Sketch<DoubleSummary> sketch = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer());
Assert.assertEquals(sketch.getEstimate(), 2.0, 0.0);
SketchIterator<DoubleSummary> it = sketch.iterator();
while (it.next()) {
Assert.assertEquals(it.getSummary().getValue(), 3.0);
}
}
@Test
public void algebraicIntermediateFinalMaxMode() throws Exception {
EvalFunc<Tuple> func = new DataToDoubleSummarySketch.IntermediateFinal("32", "Max");
Tuple inputTuple = TupleFactory.getInstance().newTuple(1);
DataBag bag = BagFactory.getInstance().newDefaultBag();
inputTuple.set(0, bag);
// this is to simulate the output from Initial
bag.add(PigUtil.objectsToTuple(PigUtil.tuplesToBag(PigUtil.objectsToTuple("a", 1.0))));
bag.add(PigUtil.objectsToTuple(PigUtil.tuplesToBag(PigUtil.objectsToTuple("b", 1.0))));
bag.add(PigUtil.objectsToTuple(PigUtil.tuplesToBag(PigUtil.objectsToTuple("a", 2.0))));
bag.add(PigUtil.objectsToTuple(PigUtil.tuplesToBag(PigUtil.objectsToTuple("b", 2.0))));
Tuple resultTuple = func.exec(inputTuple);
Assert.assertNotNull(resultTuple);
Assert.assertEquals(resultTuple.size(), 1);
DataByteArray bytes = (DataByteArray) resultTuple.get(0);
Assert.assertTrue(bytes.size() > 0);
Sketch<DoubleSummary> sketch = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer());
Assert.assertEquals(sketch.getEstimate(), 2.0, 0.0);
SketchIterator<DoubleSummary> it = sketch.iterator();
while (it.next()) {
Assert.assertEquals(it.getSummary().getValue(), 2.0);
}
}
}