/*
 * Copyright 2016, Yahoo! Inc.
 * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
 */
| package com.yahoo.sketches.pig.tuple; |
| |
import java.nio.charset.StandardCharsets;

import org.apache.pig.Accumulator;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

import com.yahoo.memory.Memory;
import com.yahoo.sketches.tuple.ArrayOfDoublesSketch;
import com.yahoo.sketches.tuple.ArrayOfDoublesSketches;
import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketch;
import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder;
| |
| public class DataToArrayOfDoublesSketchTest { |
| |
| @Test |
| public void execNullInputTuple() throws Exception { |
| EvalFunc<Tuple> func = new DataToArrayOfDoublesSketch(); |
| Tuple resultTuple = func.exec(null); |
| Assert.assertNull(resultTuple); |
| } |
| |
| @Test |
| public void execEmptyInputTuple() throws Exception { |
| EvalFunc<Tuple> func = new DataToArrayOfDoublesSketch(); |
| Tuple resultTuple = func.exec(TupleFactory.getInstance().newTuple()); |
| Assert.assertNull(resultTuple); |
| } |
| |
| @Test |
| public void execEmptyBag() throws Exception { |
| EvalFunc<Tuple> func = new DataToArrayOfDoublesSketch("1"); |
| Tuple inputTuple = PigUtil.objectsToTuple(BagFactory.getInstance().newDefaultBag()); |
| Tuple resultTuple = func.exec(inputTuple); |
| Assert.assertNotNull(resultTuple); |
| Assert.assertEquals(resultTuple.size(), 1); |
| DataByteArray bytes = (DataByteArray) resultTuple.get(0); |
| Assert.assertTrue(bytes.size() > 0); |
| ArrayOfDoublesSketch sketch = ArrayOfDoublesSketches.heapifySketch(Memory.wrap(bytes.get())); |
| Assert.assertEquals(sketch.getEstimate(), 0.0); |
| } |
| |
| @Test(expectedExceptions = IllegalArgumentException.class) |
| public void execWrongSizeOfInnerTuple() throws Exception { |
| EvalFunc<Tuple> func = new DataToArrayOfDoublesSketch("32", "1"); |
| DataBag bag = BagFactory.getInstance().newDefaultBag(); |
| bag.add(PigUtil.objectsToTuple(1)); |
| Tuple inputTuple = PigUtil.objectsToTuple(bag); |
| func.exec(inputTuple); |
| } |
| |
| @Test(expectedExceptions = IllegalArgumentException.class) |
| public void execWrongKeyType() throws Exception { |
| EvalFunc<Tuple> func = new DataToArrayOfDoublesSketch("32", "1"); |
| DataBag bag = BagFactory.getInstance().newDefaultBag(); |
| bag.add(PigUtil.objectsToTuple(new Object(), 1.0)); // Object in place of key is not supported |
| Tuple inputTuple = PigUtil.objectsToTuple(bag); |
| func.exec(inputTuple); |
| } |
| |
| @Test |
| public void execAllInputTypes() throws Exception { |
| EvalFunc<Tuple> func = new DataToArrayOfDoublesSketch("32", "1"); |
| DataBag bag = BagFactory.getInstance().newDefaultBag(); |
| bag.add(PigUtil.objectsToTuple("a", 1.0)); |
| bag.add(PigUtil.objectsToTuple("b", 1.0)); |
| bag.add(PigUtil.objectsToTuple("a", 2.0)); |
| bag.add(PigUtil.objectsToTuple("b", 2.0)); |
| |
| bag.add(PigUtil.objectsToTuple(1, 3.0)); |
| bag.add(PigUtil.objectsToTuple(2L, 3.0)); |
| bag.add(PigUtil.objectsToTuple(1f, 3.0)); |
| bag.add(PigUtil.objectsToTuple(2.0, 3.0)); |
| bag.add(PigUtil.objectsToTuple((byte) 3, 3.0)); |
| bag.add(PigUtil.objectsToTuple(new DataByteArray("c".getBytes()), 3.0)); |
| |
| Tuple inputTuple = PigUtil.objectsToTuple(bag); |
| Tuple resultTuple = func.exec(inputTuple); |
| Assert.assertNotNull(resultTuple); |
| Assert.assertEquals(resultTuple.size(), 1); |
| DataByteArray bytes = (DataByteArray) resultTuple.get(0); |
| Assert.assertTrue(bytes.size() > 0); |
| ArrayOfDoublesSketch sketch = ArrayOfDoublesSketches.heapifySketch(Memory.wrap(bytes.get())); |
| Assert.assertEquals(sketch.getEstimate(), 8.0, 0.0); |
| |
| for (double[] values: sketch.getValues()) { |
| Assert.assertEquals(values[0], 3.0); |
| } |
| } |
| |
| @Test |
| public void execWithSampling() throws Exception { |
| EvalFunc<Tuple> func = new DataToArrayOfDoublesSketch("1024", "0.5", "1"); |
| DataBag bag = BagFactory.getInstance().newDefaultBag(); |
| int uniques = 10000; |
| for (int i = 0; i < uniques; i++) bag.add(PigUtil.objectsToTuple(i, 1.0)); |
| Tuple resultTuple = func.exec(PigUtil.objectsToTuple(bag)); |
| Assert.assertNotNull(resultTuple); |
| Assert.assertEquals(resultTuple.size(), 1); |
| DataByteArray bytes = (DataByteArray) resultTuple.get(0); |
| Assert.assertTrue(bytes.size() > 0); |
| ArrayOfDoublesSketch sketch = ArrayOfDoublesSketches.heapifySketch(Memory.wrap(bytes.get())); |
| Assert.assertEquals(sketch.getEstimate(), uniques, uniques * 0.01); |
| } |
| |
| @Test |
| public void accumulator() throws Exception { |
| Accumulator<Tuple> func = new DataToArrayOfDoublesSketch("32", "1"); |
| |
| Tuple inputTuple = TupleFactory.getInstance().newTuple(1); |
| DataBag bag = BagFactory.getInstance().newDefaultBag(); |
| bag.add(PigUtil.objectsToTuple("a", 1.0)); |
| inputTuple.set(0, bag); |
| |
| func.accumulate(inputTuple); |
| |
| inputTuple = TupleFactory.getInstance().newTuple(1); |
| bag = BagFactory.getInstance().newDefaultBag(); |
| bag.add(PigUtil.objectsToTuple("b", 1.0)); |
| bag.add(PigUtil.objectsToTuple("a", 2.0)); |
| bag.add(PigUtil.objectsToTuple("b", 2.0)); |
| inputTuple.set(0, bag); |
| |
| func.accumulate(inputTuple); |
| |
| Tuple resultTuple = func.getValue(); |
| Assert.assertNotNull(resultTuple); |
| Assert.assertEquals(resultTuple.size(), 1); |
| DataByteArray bytes = (DataByteArray) resultTuple.get(0); |
| Assert.assertTrue(bytes.size() > 0); |
| ArrayOfDoublesSketch sketch = ArrayOfDoublesSketches.heapifySketch(Memory.wrap(bytes.get())); |
| Assert.assertEquals(sketch.getEstimate(), 2.0, 0.0); |
| |
| for (double[] values: sketch.getValues()) { |
| Assert.assertEquals(values[0], 3.0); |
| } |
| |
| // after cleanup, the value should always be 0 |
| func.cleanup(); |
| resultTuple = func.getValue(); |
| Assert.assertNotNull(resultTuple); |
| Assert.assertEquals(resultTuple.size(), 1); |
| bytes = (DataByteArray) resultTuple.get(0); |
| Assert.assertTrue(bytes.size() > 0); |
| ArrayOfDoublesSketch sketch2 = ArrayOfDoublesSketches.heapifySketch(Memory.wrap(bytes.get())); |
| Assert.assertEquals(sketch2.getEstimate(), 0.0, 0.0); |
| } |
| |
| @Test |
| public void algebraicInitial() throws Exception { |
| EvalFunc<Tuple> func = new DataToArrayOfDoublesSketch.Initial(null); |
| Tuple inputTuple = TupleFactory.getInstance().newTuple(1); |
| DataBag bag = BagFactory.getInstance().newDefaultBag(); |
| bag.add(PigUtil.objectsToTuple(null, null)); |
| bag.add(PigUtil.objectsToTuple(null, null)); |
| bag.add(PigUtil.objectsToTuple(null, null)); |
| inputTuple.set(0, bag); |
| |
| Tuple resultTuple = func.exec(inputTuple); |
| Assert.assertNotNull(resultTuple); |
| Assert.assertEquals(resultTuple.size(), 1); |
| DataBag resultBag = (DataBag) resultTuple.get(0); |
| Assert.assertEquals(resultBag.size(), 3); |
| } |
| |
| @Test |
| public void algebraicIntermediateFinal() throws Exception { |
| EvalFunc<Tuple> func = new DataToArrayOfDoublesSketch.IntermediateFinal("1"); |
| Tuple inputTuple = TupleFactory.getInstance().newTuple(1); |
| DataBag bag = BagFactory.getInstance().newDefaultBag(); |
| inputTuple.set(0, bag); |
| |
| // this is to simulate the output from Initial |
| bag.add(PigUtil.objectsToTuple(PigUtil.tuplesToBag(PigUtil.objectsToTuple("a", 1.0)))); |
| |
| // this is to simulate the output from a prior call of IntermediateFinal |
| { |
| ArrayOfDoublesUpdatableSketch sketch = new ArrayOfDoublesUpdatableSketchBuilder().build(); |
| sketch.update("b", new double[] {1.0}); |
| sketch.update("a", new double[] {2.0}); |
| sketch.update("b", new double[] {2.0}); |
| bag.add(PigUtil.objectsToTuple(new DataByteArray(sketch.compact().toByteArray()))); |
| } |
| |
| Tuple resultTuple = func.exec(inputTuple); |
| Assert.assertNotNull(resultTuple); |
| Assert.assertEquals(resultTuple.size(), 1); |
| DataByteArray bytes = (DataByteArray) resultTuple.get(0); |
| Assert.assertTrue(bytes.size() > 0); |
| ArrayOfDoublesSketch sketch = ArrayOfDoublesSketches.heapifySketch(Memory.wrap(bytes.get())); |
| Assert.assertEquals(sketch.getEstimate(), 2.0, 0.0); |
| |
| for (double[] values: sketch.getValues()) { |
| Assert.assertEquals(values[0], 3.0); |
| } |
| } |
| |
| @Test |
| public void algebraicIntermediateFinalWithSampling() throws Exception { |
| EvalFunc<Tuple> func = new DataToArrayOfDoublesSketch.IntermediateFinal("1024", "0.5", "1"); |
| |
| DataBag bag = BagFactory.getInstance().newDefaultBag(); |
| int uniques = 10000; |
| for (int i = 0; i < uniques; i++) bag.add(PigUtil.objectsToTuple(i, 1.0)); |
| |
| Tuple resultTuple = func.exec(PigUtil.objectsToTuple(PigUtil.tuplesToBag(PigUtil.objectsToTuple(bag)))); |
| Assert.assertNotNull(resultTuple); |
| Assert.assertEquals(resultTuple.size(), 1); |
| DataByteArray bytes = (DataByteArray) resultTuple.get(0); |
| Assert.assertTrue(bytes.size() > 0); |
| ArrayOfDoublesSketch sketch = ArrayOfDoublesSketches.heapifySketch(Memory.wrap(bytes.get())); |
| Assert.assertEquals(sketch.getEstimate(), uniques, uniques * 0.01); |
| } |
| |
| @Test(expectedExceptions = IllegalArgumentException.class) |
| public void algebraicIntermediateFinalNullBag() throws Exception { |
| EvalFunc<Tuple> func = new DataToArrayOfDoublesSketch.IntermediateFinal("32", "1"); |
| func.exec(TupleFactory.getInstance().newTuple(1)); |
| } |
| |
| @Test(expectedExceptions = IllegalArgumentException.class) |
| public void algebraicIntermediateFinalWrongType() throws Exception { |
| EvalFunc<Tuple> func = new DataToArrayOfDoublesSketch.IntermediateFinal("32", "1"); |
| DataBag bag = BagFactory.getInstance().newDefaultBag(); |
| bag.add(TupleFactory.getInstance().newTuple(1.0)); |
| func.exec(TupleFactory.getInstance().newTuple(bag)); |
| } |
| } |