Merge pull request #35 from DataSketches/kll-udfs
KllFloatsSketch UDFs
diff --git a/pom.xml b/pom.xml
index 6126d25..88f90b3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,7 +152,7 @@
<dependency>
<groupId>com.yahoo.datasketches</groupId>
<artifactId>sketches-core</artifactId>
- <version>0.11.0</version>
+ <version>0.12.0</version>
</dependency>
<!-- Hive Dependencies (provided scope) -->
diff --git a/src/main/java/com/yahoo/sketches/hive/kll/DataToSketchUDAF.java b/src/main/java/com/yahoo/sketches/hive/kll/DataToSketchUDAF.java
new file mode 100644
index 0000000..f7348f7
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/hive/kll/DataToSketchUDAF.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+
+/**
+ * Hive UDAF that aggregates FLOAT values into a KllFloatsSketch, returned as a binary blob.
+ * An optional second argument k of type INT is passed to the sketch constructor.
+ */
+@Description(name = "DataToSketch", value = "_FUNC_(value, k) - "
+    + "Returns a KllFloatsSketch in a serialized form as a binary blob."
+    + " Values must be of type float."
+    + " Parameter k controls the accuracy and the size of the sketch."
+    + " If k is omitted, the default is used.")
+public class DataToSketchUDAF extends AbstractGenericUDAFResolver {
+
+  /**
+   * Checks the number and the types of the arguments and creates the evaluator.
+   * @param info provides object inspectors of the arguments: FLOAT value and optional INT k
+   * @return a new DataToSketchEvaluator
+   * @throws SemanticException if there are not one or two arguments or their types do not match
+   */
+  @Override
+  public GenericUDAFEvaluator getEvaluator(final GenericUDAFParameterInfo info)
+      throws SemanticException {
+    final ObjectInspector[] inspectors = info.getParameterObjectInspectors();
+    if (inspectors.length != 1 && inspectors.length != 2) {
+      throw new UDFArgumentException("One or two arguments expected");
+    }
+    ObjectInspectorValidator.validateGivenPrimitiveCategory(inspectors[0], 0,
+        PrimitiveCategory.FLOAT);
+    if (inspectors.length == 2) {
+      ObjectInspectorValidator.validateGivenPrimitiveCategory(inspectors[1], 1,
+          PrimitiveCategory.INT);
+    }
+    return new DataToSketchEvaluator();
+  }
+
+  /** Evaluator whose iterate() feeds raw float values into the sketch kept in the buffer. */
+  static class DataToSketchEvaluator extends SketchEvaluator {
+
+    /**
+     * Updates the sketch in the given buffer with data[0]; a null value is skipped.
+     * The sketch is created lazily on the first non-null value, with k read from data[1]
+     * when the k inspector is set and with the no-argument init() otherwise.
+     */
+    @SuppressWarnings("deprecation")
+    @Override
+    public void iterate(final AggregationBuffer buf, final Object[] data) throws HiveException {
+      if (data[0] == null) { return; }
+      final SketchState state = (SketchState) buf;
+      if (!state.isInitialized()) {
+        if (kInspector_ != null) {
+          state.init(PrimitiveObjectInspectorUtils.getInt(data[1], kInspector_));
+        } else {
+          state.init();
+        }
+      }
+      final float value = (float) inputInspector_.getPrimitiveJavaObject(data[0]);
+      state.update(value);
+    }
+
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/hive/kll/GetPmfFromSketchUDF.java b/src/main/java/com/yahoo/sketches/hive/kll/GetPmfFromSketchUDF.java
new file mode 100644
index 0000000..e3fb773
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/hive/kll/GetPmfFromSketchUDF.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.io.BytesWritable;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+@Description(
+    name = "GetPMF",
+    value = "_FUNC_(sketch, split points...)",
+    extended = "Returns an approximation to the Probability Mass Function (PMF)"
+    + " from a sketch given a set of split points (values)."
+    + " Split points are an array of M unique, monotonically increasing values"
+    + " that divide the real number line into M+1 consecutive disjoint intervals."
+    + " The function returns an array of M+1 doubles, each of which is an approximation"
+    + " to the fraction of the values that fell into one of those intervals."
+    + " The definition of an interval is inclusive of the left split point and exclusive"
+    + " of the right split point")
+public class GetPmfFromSketchUDF extends UDF {
+
+  /**
+   * Computes an approximate PMF of the sketched distribution for the given split points.
+   * @param serializedSketch sketch in a serialized form, may be null
+   * @param splitPoints unique and monotonically increasing values that define the intervals
+   * @return list of fractions, or null if the sketch is null or getPMF() produced no result
+   */
+  public List<Double> evaluate(final BytesWritable serializedSketch, final Float... splitPoints) {
+    if (serializedSketch == null) {
+      return null;
+    }
+    final Memory sketchBytes = Memory.wrap(serializedSketch.getBytes());
+    final KllFloatsSketch sketch = KllFloatsSketch.heapify(sketchBytes);
+    final float[] points = Util.objectsToPrimitives(splitPoints);
+    final double[] fractions = sketch.getPMF(points);
+    return (fractions == null) ? null : Util.primitivesToList(fractions);
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/hive/kll/GetQuantileFromSketchUDF.java b/src/main/java/com/yahoo/sketches/hive/kll/GetQuantileFromSketchUDF.java
new file mode 100644
index 0000000..e05a7a1
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/hive/kll/GetQuantileFromSketchUDF.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.io.BytesWritable;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+@Description(name = "GetQuantile", value = "_FUNC_(sketch, fraction)",
+    extended = " Returns a quantile value from a given KllFloatsSketch."
+    + " A single value for a given fraction is returned."
+    + " The fraction represents a normalized rank, and must be from 0 to 1 inclusive."
+    + " For example, a fraction of 0.5 corresponds to 50th percentile, which is"
+    + " the median value of the distribution (the number separating the higher half"
+    + " of the probability distribution from the lower half).")
+public class GetQuantileFromSketchUDF extends UDF {
+
+  /**
+   * Looks up the quantile that corresponds to the given normalized rank.
+   * @param serializedSketch sketch in a serialized form, may be null
+   * @param fraction normalized rank from 0 to 1 inclusive
+   * @return quantile value, or null if the sketch is null
+   */
+  public Float evaluate(final BytesWritable serializedSketch, final double fraction) {
+    if (serializedSketch == null) {
+      return null;
+    }
+    final Memory sketchBytes = Memory.wrap(serializedSketch.getBytes());
+    return KllFloatsSketch.heapify(sketchBytes).getQuantile(fraction);
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/hive/kll/GetQuantilesFromSketchUDF.java b/src/main/java/com/yahoo/sketches/hive/kll/GetQuantilesFromSketchUDF.java
new file mode 100644
index 0000000..5bc117c
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/hive/kll/GetQuantilesFromSketchUDF.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.io.BytesWritable;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+@Description(
+    name = "GetQuantiles",
+    value = "_FUNC_(sketch, fractions...)",
+    extended = "Returns quantile values from a given KllFloatsSketch based on a given list of fractions."
+    + " The fractions represent normalized ranks, and must be from 0 to 1 inclusive."
+    + " For example, a fraction of 0.5 corresponds to 50th percentile,"
+    + " which is the median value of the distribution (the number separating the higher"
+    + " half of the probability distribution from the lower half).")
+public class GetQuantilesFromSketchUDF extends UDF {
+
+  /**
+   * Returns a list of quantile values from a given sketch
+   * @param serializedSketch serialized sketch, may be null
+   * @param fractions list of values from 0 to 1 inclusive
+   * @return list of quantile values, or null if the sketch is null or getQuantiles() returns null
+   */
+  public List<Float> evaluate(final BytesWritable serializedSketch, final Double... fractions) {
+    if (serializedSketch == null) { return null; }
+    final KllFloatsSketch sketch = KllFloatsSketch.heapify(Memory.wrap(serializedSketch.getBytes()));
+    final float[] quantiles = sketch.getQuantiles(Util.objectsToPrimitives(fractions));
+    // NOTE(review): getQuantiles() presumably returns null for an empty sketch - confirm;
+    // the guard keeps Util.primitivesToList() from failing with a NullPointerException
+    if (quantiles == null) { return null; }
+    return Util.primitivesToList(quantiles);
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/hive/kll/GetRankFromSketchUDF.java b/src/main/java/com/yahoo/sketches/hive/kll/GetRankFromSketchUDF.java
new file mode 100644
index 0000000..c8a2707
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/hive/kll/GetRankFromSketchUDF.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.io.BytesWritable;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+@Description(name = "GetRank", value = "_FUNC_(sketch, value)",
+    extended = " Returns a normalized rank of a given value from a given KllFloatsSketch."
+    + " The returned rank is an approximation to the fraction of values of the distribution"
+    + " that are less than the given value (mass of the distribution below the given value).")
+public class GetRankFromSketchUDF extends UDF {
+
+  /**
+   * Looks up the normalized rank of the given value in the given sketch.
+   * @param serializedSketch sketch in a serialized form, may be null
+   * @param value value to find the rank of
+   * @return normalized rank, or null if the sketch is null
+   */
+  public Double evaluate(final BytesWritable serializedSketch, final float value) {
+    if (serializedSketch == null) {
+      return null;
+    }
+    final Memory sketchBytes = Memory.wrap(serializedSketch.getBytes());
+    return KllFloatsSketch.heapify(sketchBytes).getRank(value);
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/hive/kll/ObjectInspectorValidator.java b/src/main/java/com/yahoo/sketches/hive/kll/ObjectInspectorValidator.java
new file mode 100644
index 0000000..b82e39a
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/hive/kll/ObjectInspectorValidator.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+
+/** Static helpers that validate the types of UDF arguments using their object inspectors. */
+final class ObjectInspectorValidator {
+
+  /**
+   * Checks that the argument is of the PRIMITIVE category.
+   * @param inspector object inspector of the argument
+   * @param index zero-based index of the argument, passed on to the exception
+   * @throws SemanticException UDFArgumentTypeException if the category is not PRIMITIVE
+   */
+  static void validateCategoryPrimitive(
+      final ObjectInspector inspector, final int index) throws SemanticException {
+    if (inspector.getCategory() != ObjectInspector.Category.PRIMITIVE) {
+      throw new UDFArgumentTypeException(index, "Primitive argument expected, but "
+          + inspector.getCategory().name() + " was received");
+    }
+  }
+
+  /**
+   * Checks that the argument is a primitive of the expected primitive category.
+   * @param inspector object inspector of the argument
+   * @param index zero-based index of the argument; the message reports it as index + 1
+   * @param category expected primitive category
+   * @throws SemanticException UDFArgumentTypeException if the argument is not a primitive
+   *     or its primitive category differs from the expected one
+   */
+  static void validateGivenPrimitiveCategory(final ObjectInspector inspector, final int index,
+      final PrimitiveObjectInspector.PrimitiveCategory category) throws SemanticException
+  {
+    validateCategoryPrimitive(inspector, index);
+    final PrimitiveObjectInspector primitiveInspector = (PrimitiveObjectInspector) inspector;
+    if (primitiveInspector.getPrimitiveCategory() != category) {
+      throw new UDFArgumentTypeException(index, category.name() + " value expected as the argument "
+          + (index + 1) + " but " + primitiveInspector.getPrimitiveCategory().name() + " was received");
+    }
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/hive/kll/SketchEvaluator.java b/src/main/java/com/yahoo/sketches/hive/kll/SketchEvaluator.java
new file mode 100644
index 0000000..ba1af04
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/hive/kll/SketchEvaluator.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+/**
+ * Base evaluator for UDAFs that keep a KllFloatsSketch in a SketchState buffer.
+ * Subclasses provide iterate(); merging, termination and buffer creation are implemented here.
+ */
+abstract class SketchEvaluator extends GenericUDAFEvaluator {
+
+  protected PrimitiveObjectInspector inputInspector_;
+  protected PrimitiveObjectInspector kInspector_;
+
+  /**
+   * Remembers the inspector of the first parameter and, if present, the inspector of k.
+   * @param mode evaluation mode
+   * @param parameters object inspectors of the inputs for the given mode
+   * @return writable BINARY object inspector
+   */
+  @Override
+  public ObjectInspector init(final Mode mode, final ObjectInspector[] parameters) throws HiveException {
+    super.init(mode, parameters);
+    inputInspector_ = (PrimitiveObjectInspector) parameters[0];
+
+    // In PARTIAL1 and COMPLETE modes the parameters are the original data, so a second
+    // parameter (k) may be present. In PARTIAL2 and FINAL modes they are partial aggregations.
+    final boolean isOriginalData = (mode == Mode.PARTIAL1) || (mode == Mode.COMPLETE);
+    if (isOriginalData && parameters.length > 1) {
+      kInspector_ = (PrimitiveObjectInspector) parameters[1];
+    }
+
+    return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(PrimitiveCategory.BINARY);
+  }
+
+  /** Resets the state kept in the given aggregation buffer by calling SketchState.reset(). */
+  @SuppressWarnings("deprecation")
+  @Override
+  public void reset(final AggregationBuffer buf) throws HiveException {
+    ((SketchState) buf).reset();
+  }
+
+  /** Produces the partial result, which is whatever terminate() returns for the buffer. */
+  @SuppressWarnings("deprecation")
+  @Override
+  public Object terminatePartial(final AggregationBuffer buf) throws HiveException {
+    return terminate(buf);
+  }
+
+  /**
+   * Merges a serialized sketch into the state kept in the given buffer; null input is ignored.
+   */
+  @SuppressWarnings("deprecation")
+  @Override
+  public void merge(final AggregationBuffer buf, final Object data) throws HiveException {
+    if (data == null) { return; }
+    final SketchState state = (SketchState) buf;
+    final Object writable = inputInspector_.getPrimitiveWritableObject(data);
+    // NOTE(review): getBytes() is the backing array, which may be longer than getLength() -
+    // confirm that heapify() tolerates trailing bytes
+    final byte[] bytes = ((BytesWritable) writable).getBytes();
+    state.update(KllFloatsSketch.heapify(Memory.wrap(bytes)));
+  }
+
+  /**
+   * Serializes the sketch kept in the given buffer.
+   * @return BytesWritable with the serialized sketch, or null if the buffer holds no sketch
+   */
+  @SuppressWarnings("deprecation")
+  @Override
+  public Object terminate(final AggregationBuffer buf) throws HiveException {
+    final KllFloatsSketch resultSketch = ((SketchState) buf).getResult();
+    return (resultSketch == null) ? null : new BytesWritable(resultSketch.toByteArray());
+  }
+
+  /** Creates a new SketchState to serve as the aggregation buffer. */
+  @SuppressWarnings("deprecation")
+  @Override
+  public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+    return new SketchState();
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/hive/kll/SketchState.java b/src/main/java/com/yahoo/sketches/hive/kll/SketchState.java
new file mode 100644
index 0000000..c5feb2d
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/hive/kll/SketchState.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AbstractAggregationBuffer;
+
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+/** Aggregation buffer that holds a KllFloatsSketch, which is created or assigned lazily. */
+class SketchState extends AbstractAggregationBuffer {
+
+  private KllFloatsSketch sketch_;
+
+  /** Creates the sketch with its no-argument constructor; needed in the first phase (iterate) only. */
+  void init() {
+    sketch_ = new KllFloatsSketch();
+  }
+
+  /** Creates the sketch with the given k; needed in the first phase (iterate) only. */
+  void init(final int k) {
+    sketch_ = new KllFloatsSketch(k);
+  }
+
+  /** Tells whether a sketch has been created or assigned. */
+  boolean isInitialized() {
+    return sketch_ != null;
+  }
+
+  /** Updates the sketch with a value; fails if there is no sketch yet. */
+  void update(final float value) {
+    sketch_.update(value);
+  }
+
+  /** Merges the given sketch into the current one, or adopts it if there is no sketch yet. */
+  void update(final KllFloatsSketch sketch) {
+    if (sketch_ != null) {
+      sketch_.merge(sketch);
+      return;
+    }
+    sketch_ = sketch;
+  }
+
+  /** Returns the current sketch, which is null if nothing was initialized or merged. */
+  public KllFloatsSketch getResult() {
+    return sketch_;
+  }
+
+  /** Drops the sketch, so that isInitialized() returns false again. */
+  void reset() {
+    sketch_ = null;
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/hive/kll/SketchToStringUDF.java b/src/main/java/com/yahoo/sketches/hive/kll/SketchToStringUDF.java
new file mode 100644
index 0000000..6bb8cfb
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/hive/kll/SketchToStringUDF.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.io.BytesWritable;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+@Description(name = "SketchToString", value = "_FUNC_(sketch)",
+    extended = " Returns a human-readable summary of a given KllFloatsSketch.")
+public class SketchToStringUDF extends UDF {
+
+  /**
+   * Deserializes the given sketch and returns the result of its toString().
+   * @param serializedSketch sketch in a serialized form, may be null
+   * @return text summary, or null if the sketch is null
+   */
+  public String evaluate(final BytesWritable serializedSketch) {
+    if (serializedSketch == null) {
+      return null;
+    }
+    final Memory sketchBytes = Memory.wrap(serializedSketch.getBytes());
+    return KllFloatsSketch.heapify(sketchBytes).toString();
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/hive/kll/UnionSketchUDAF.java b/src/main/java/com/yahoo/sketches/hive/kll/UnionSketchUDAF.java
new file mode 100644
index 0000000..b3c6b41
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/hive/kll/UnionSketchUDAF.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+
+/**
+ * Hive UDAF that merges serialized KllFloatsSketches into one, returned as a binary blob.
+ * An optional second argument k of type INT is passed to the constructor of the sketch
+ * that the inputs are merged into.
+ */
+@Description(name = "UnionSketch", value = "_FUNC_(sketch, k) - "
+    + "Returns a KllFloatsSketch in a serialized form as a binary blob."
+    + " Input values are also serialized sketches."
+    + " Optional parameter k is used to create the sketch that the input sketches are merged into."
+    + " If k is omitted, the default is used.")
+public class UnionSketchUDAF extends AbstractGenericUDAFResolver {
+
+  /**
+   * Checks the number and the types of the arguments and creates the evaluator.
+   * @param info provides object inspectors of the arguments: BINARY sketch and optional INT k
+   * @return a new UnionEvaluator
+   * @throws SemanticException if there are not one or two arguments or their types do not match
+   */
+  @Override
+  public GenericUDAFEvaluator getEvaluator(final GenericUDAFParameterInfo info) throws SemanticException {
+    final ObjectInspector[] inspectors = info.getParameterObjectInspectors();
+    if (inspectors.length != 1 && inspectors.length != 2) {
+      throw new UDFArgumentException("One or two arguments expected");
+    }
+    ObjectInspectorValidator.validateGivenPrimitiveCategory(inspectors[0], 0, PrimitiveCategory.BINARY);
+    if (inspectors.length == 2) {
+      ObjectInspectorValidator.validateGivenPrimitiveCategory(inspectors[1], 1, PrimitiveCategory.INT);
+    }
+    return new UnionEvaluator();
+  }
+
+  /** Evaluator whose iterate() merges serialized sketches into the sketch kept in the buffer. */
+  static class UnionEvaluator extends SketchEvaluator {
+
+    /**
+     * Merges the serialized sketch in data[0] into the given buffer; a null value is skipped.
+     * The target sketch is created lazily on the first non-null value, with k read from data[1]
+     * when the k inspector is set and with the no-argument init() otherwise.
+     */
+    @SuppressWarnings("deprecation")
+    @Override
+    public void iterate(final AggregationBuffer buf, final Object[] data) throws HiveException {
+      if (data[0] == null) { return; }
+      final SketchState state = (SketchState) buf;
+      if (!state.isInitialized()) {
+        if (kInspector_ != null) {
+          state.init(PrimitiveObjectInspectorUtils.getInt(data[1], kInspector_));
+        } else {
+          state.init();
+        }
+      }
+      merge(buf, data[0]);
+    }
+
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/hive/kll/Util.java b/src/main/java/com/yahoo/sketches/hive/kll/Util.java
new file mode 100644
index 0000000..a7acb14
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/hive/kll/Util.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Conversions between arrays of boxed values, arrays of primitives and lists. */
+final class Util {
+
+  /** Unboxes an array of Float objects; a null element causes a NullPointerException. */
+  static float[] objectsToPrimitives(final Float[] array) {
+    final float[] primitives = new float[array.length];
+    int index = 0;
+    for (final Float boxed : array) {
+      primitives[index++] = boxed;
+    }
+    return primitives;
+  }
+
+  /** Unboxes an array of Double objects; a null element causes a NullPointerException. */
+  static double[] objectsToPrimitives(final Double[] array) {
+    final double[] primitives = new double[array.length];
+    int index = 0;
+    for (final Double boxed : array) {
+      primitives[index++] = boxed;
+    }
+    return primitives;
+  }
+
+  /** Copies an array of floats into a new list of Float objects in the same order. */
+  static List<Float> primitivesToList(final float[] array) {
+    final List<Float> list = new ArrayList<Float>(array.length);
+    for (int i = 0; i < array.length; i++) {
+      list.add(array[i]);
+    }
+    return list;
+  }
+
+  /** Copies an array of doubles into a new list of Double objects in the same order. */
+  static List<Double> primitivesToList(final double[] array) {
+    final List<Double> list = new ArrayList<Double>(array.length);
+    for (int i = 0; i < array.length; i++) {
+      list.add(array[i]);
+    }
+    return list;
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/hive/kll/package-info.java b/src/main/java/com/yahoo/sketches/hive/kll/package-info.java
new file mode 100644
index 0000000..b74f994
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/hive/kll/package-info.java
@@ -0,0 +1,11 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+/**
+ * Hive UDFs for KllFloatsSketch sketch.
+ *
+ * @author Alexander Saydakov
+ */
+package com.yahoo.sketches.hive.kll;
diff --git a/src/test/java/com/yahoo/sketches/hive/kll/DataToSketchUDAFTest.java b/src/test/java/com/yahoo/sketches/hive/kll/DataToSketchUDAFTest.java
new file mode 100644
index 0000000..94b7c2b
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/hive/kll/DataToSketchUDAFTest.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
+import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+/** Unit tests of DataToSketchUDAF: argument validation and each of the evaluation modes. */
+public class DataToSketchUDAFTest {
+
+  static final ObjectInspector floatInspector =
+      PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(PrimitiveCategory.FLOAT);
+
+  static final ObjectInspector intInspector =
+      PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(PrimitiveCategory.INT);
+
+  static final ObjectInspector binaryInspector =
+      PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(PrimitiveCategory.BINARY);
+
+  static final ObjectInspector structInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+    Arrays.asList("a"),
+    Arrays.asList(intInspector)
+  );
+
+  @Test(expectedExceptions = UDFArgumentException.class)
+  public void getEvaluatorTooFewInspectors() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    new DataToSketchUDAF().getEvaluator(info);
+  }
+
+  @Test(expectedExceptions = UDFArgumentException.class)
+  public void getEvaluatorTooManyInspectors() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { floatInspector, intInspector, intInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    new DataToSketchUDAF().getEvaluator(info);
+  }
+
+  @Test(expectedExceptions = UDFArgumentException.class)
+  public void getEvaluatorWrongCategoryArg1() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { structInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    new DataToSketchUDAF().getEvaluator(info);
+  }
+
+  @Test(expectedExceptions = UDFArgumentException.class)
+  public void getEvaluatorWrongTypeArg1() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { intInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    new DataToSketchUDAF().getEvaluator(info);
+  }
+
+  @Test(expectedExceptions = UDFArgumentException.class)
+  public void getEvaluatorWrongCategoryArg2() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { floatInspector, structInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    new DataToSketchUDAF().getEvaluator(info);
+  }
+
+  @Test(expectedExceptions = UDFArgumentException.class)
+  public void getEvaluatorWrongTypeArg2() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { floatInspector, floatInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    new DataToSketchUDAF().getEvaluator(info);
+  }
+
+  // PARTIAL1 mode (Map phase in Map-Reduce): iterate + terminatePartial
+  @Test
+  public void partial1ModeDefaultK() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { floatInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    try (GenericUDAFEvaluator eval = new DataToSketchUDAF().getEvaluator(info)) {
+      ObjectInspector resultInspector = eval.init(Mode.PARTIAL1, inspectors);
+      checkResultInspector(resultInspector);
+
+      SketchState state = (SketchState) eval.getNewAggregationBuffer();
+      eval.iterate(state, new Object[] { new FloatWritable(1) });
+      eval.iterate(state, new Object[] { new FloatWritable(2) });
+
+      BytesWritable bytes = (BytesWritable) eval.terminatePartial(state);
+      KllFloatsSketch resultSketch = KllFloatsSketch.heapify(Memory.wrap(bytes.getBytes()));
+      Assert.assertEquals(resultSketch.getNormalizedRankError(false), KllFloatsSketch.getNormalizedRankError(200, false));
+      Assert.assertEquals(resultSketch.getNumRetained(), 2);
+      Assert.assertEquals(resultSketch.getMinValue(), 1f);
+      Assert.assertEquals(resultSketch.getMaxValue(), 2f);
+    }
+  }
+
+  @Test
+  public void partial1ModeGivenK() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { floatInspector, intInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    try (GenericUDAFEvaluator eval = new DataToSketchUDAF().getEvaluator(info)) {
+      ObjectInspector resultInspector = eval.init(Mode.PARTIAL1, inspectors);
+      checkResultInspector(resultInspector);
+
+      SketchState state = (SketchState) eval.getNewAggregationBuffer();
+      eval.iterate(state, new Object[] { new FloatWritable(1), new IntWritable(400) });
+      eval.iterate(state, new Object[] { new FloatWritable(2), new IntWritable(400) });
+
+      BytesWritable bytes = (BytesWritable) eval.terminatePartial(state);
+      KllFloatsSketch resultSketch = KllFloatsSketch.heapify(Memory.wrap(bytes.getBytes()));
+      Assert.assertEquals(resultSketch.getNormalizedRankError(false), KllFloatsSketch.getNormalizedRankError(400, false));
+      Assert.assertEquals(resultSketch.getNumRetained(), 2);
+      Assert.assertEquals(resultSketch.getMinValue(), 1f);
+      Assert.assertEquals(resultSketch.getMaxValue(), 2f);
+    }
+  }
+
+  // PARTIAL2 mode (Combine phase in Map-Reduce): merge + terminatePartial
+  @Test
+  public void partial2Mode() throws Exception {
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(new ObjectInspector[] { floatInspector }, false, false);
+    try (GenericUDAFEvaluator eval = new DataToSketchUDAF().getEvaluator(info)) {
+      ObjectInspector resultInspector = eval.init(Mode.PARTIAL2, new ObjectInspector[] {binaryInspector});
+      checkResultInspector(resultInspector);
+
+      SketchState state = (SketchState) eval.getNewAggregationBuffer();
+
+      KllFloatsSketch sketch1 = new KllFloatsSketch();
+      sketch1.update(1);
+      eval.merge(state, new BytesWritable(sketch1.toByteArray()));
+
+      KllFloatsSketch sketch2 = new KllFloatsSketch();
+      sketch2.update(2);
+      eval.merge(state, new BytesWritable(sketch2.toByteArray()));
+
+      BytesWritable bytes = (BytesWritable) eval.terminatePartial(state);
+      KllFloatsSketch resultSketch = KllFloatsSketch.heapify(Memory.wrap(bytes.getBytes()));
+      Assert.assertEquals(resultSketch.getNumRetained(), 2);
+      Assert.assertEquals(resultSketch.getMinValue(), 1f);
+      Assert.assertEquals(resultSketch.getMaxValue(), 2f);
+    }
+  }
+
+  // FINAL mode (Reduce phase in Map-Reduce): merge + terminate
+  @Test
+  public void finalMode() throws Exception {
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(new ObjectInspector[] { floatInspector }, false, false);
+    try (GenericUDAFEvaluator eval = new DataToSketchUDAF().getEvaluator(info)) {
+      ObjectInspector resultInspector = eval.init(Mode.FINAL, new ObjectInspector[] {binaryInspector});
+      checkResultInspector(resultInspector);
+
+      SketchState state = (SketchState) eval.getNewAggregationBuffer();
+
+      KllFloatsSketch sketch1 = new KllFloatsSketch(400); // to check if K is preserved
+      sketch1.update(1);
+      eval.merge(state, new BytesWritable(sketch1.toByteArray()));
+
+      KllFloatsSketch sketch2 = new KllFloatsSketch(400);
+      sketch2.update(2);
+      eval.merge(state, new BytesWritable(sketch2.toByteArray()));
+
+      BytesWritable bytes = (BytesWritable) eval.terminate(state);
+      KllFloatsSketch resultSketch = KllFloatsSketch.heapify(Memory.wrap(bytes.getBytes()));
+      Assert.assertEquals(resultSketch.getNormalizedRankError(false), KllFloatsSketch.getNormalizedRankError(400, false));
+      Assert.assertEquals(resultSketch.getNumRetained(), 2);
+      Assert.assertEquals(resultSketch.getMinValue(), 1f);
+      Assert.assertEquals(resultSketch.getMaxValue(), 2f);
+    }
+  }
+
+  // COMPLETE mode (single mode, alternative to MapReduce): iterate + terminate
+  @Test
+  public void completeModeDefaultK() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { floatInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    try (GenericUDAFEvaluator eval = new DataToSketchUDAF().getEvaluator(info)) {
+      ObjectInspector resultInspector = eval.init(Mode.COMPLETE, inspectors);
+      checkResultInspector(resultInspector);
+
+      SketchState state = (SketchState) eval.getNewAggregationBuffer();
+      eval.iterate(state, new Object[] { new FloatWritable(1) });
+      eval.iterate(state, new Object[] { new FloatWritable(2) });
+
+      BytesWritable bytes = (BytesWritable) eval.terminate(state);
+      KllFloatsSketch resultSketch = KllFloatsSketch.heapify(Memory.wrap(bytes.getBytes()));
+      Assert.assertEquals(resultSketch.getNormalizedRankError(false), KllFloatsSketch.getNormalizedRankError(200, false));
+      Assert.assertEquals(resultSketch.getNumRetained(), 2);
+      Assert.assertEquals(resultSketch.getMinValue(), 1f);
+      Assert.assertEquals(resultSketch.getMaxValue(), 2f);
+    }
+  }
+
+  @Test
+  public void completeModeGivenK() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { floatInspector, intInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    try (GenericUDAFEvaluator eval = new DataToSketchUDAF().getEvaluator(info)) {
+      ObjectInspector resultInspector = eval.init(Mode.COMPLETE, inspectors);
+      checkResultInspector(resultInspector);
+
+      SketchState state = (SketchState) eval.getNewAggregationBuffer();
+      eval.iterate(state, new Object[] { new FloatWritable(1), new IntWritable(400) });
+      eval.iterate(state, new Object[] { new FloatWritable(2), new IntWritable(400) });
+
+      BytesWritable bytes = (BytesWritable) eval.terminate(state);
+      KllFloatsSketch resultSketch = KllFloatsSketch.heapify(Memory.wrap(bytes.getBytes()));
+      Assert.assertEquals(resultSketch.getNormalizedRankError(false), KllFloatsSketch.getNormalizedRankError(400, false));
+      Assert.assertEquals(resultSketch.getNumRetained(), 2);
+      Assert.assertEquals(resultSketch.getMinValue(), 1f);
+      Assert.assertEquals(resultSketch.getMaxValue(), 2f);
+    }
+  }
+
+  /** Asserts that the given inspector is a non-null primitive inspector of the BINARY category. */
+  static void checkResultInspector(ObjectInspector resultInspector) {
+    Assert.assertNotNull(resultInspector);
+    Assert.assertEquals(resultInspector.getCategory(), ObjectInspector.Category.PRIMITIVE);
+    Assert.assertEquals(
+      ((PrimitiveObjectInspector) resultInspector).getPrimitiveCategory(),
+      PrimitiveObjectInspector.PrimitiveCategory.BINARY
+    );
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/hive/kll/GetPmfFromSketchUDFTest.java b/src/test/java/com/yahoo/sketches/hive/kll/GetPmfFromSketchUDFTest.java
new file mode 100644
index 0000000..33b3a00
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/hive/kll/GetPmfFromSketchUDFTest.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import java.util.List;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+public class GetPmfFromSketchUDFTest {
+
+  @Test
+  public void nullSketch() {
+    // a null sketch argument is expected to produce a null result
+    Assert.assertNull(new GetPmfFromSketchUDF().evaluate(null, 0f));
+  }
+
+  @Test
+  public void emptyListOfSplitPoints() {
+    KllFloatsSketch input = new KllFloatsSketch();
+    for (int value = 1; value <= 3; value++) {
+      input.update(value);
+    }
+    List<Double> pmf = new GetPmfFromSketchUDF().evaluate(new BytesWritable(input.toByteArray()));
+    Assert.assertNotNull(pmf);
+    Assert.assertEquals(pmf.size(), 1);
+    Assert.assertEquals(pmf.get(0), 1.0); // no split points: a single bucket holds all the mass
+  }
+
+  @Test
+  public void emptySketch() {
+    byte[] emptyBytes = new KllFloatsSketch().toByteArray();
+    List<Double> pmf = new GetPmfFromSketchUDF().evaluate(new BytesWritable(emptyBytes), 0f);
+    Assert.assertNull(pmf); // a sketch without any updates is expected to yield null
+  }
+
+  @Test
+  public void normalCase() {
+    KllFloatsSketch input = new KllFloatsSketch();
+    for (int value = 1; value <= 4; value++) {
+      input.update(value);
+    }
+    // three split points (1, 3, 5) are expected to produce four buckets
+    List<Double> pmf = new GetPmfFromSketchUDF().evaluate(new BytesWritable(input.toByteArray()), 1f, 3f, 5f);
+    Assert.assertNotNull(pmf);
+    Assert.assertEquals(pmf.size(), 4);
+    Assert.assertEquals(pmf.get(0), 0.0);
+    Assert.assertEquals(pmf.get(1), 0.5);
+    Assert.assertEquals(pmf.get(2), 0.5);
+    Assert.assertEquals(pmf.get(3), 0.0);
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/hive/kll/GetQuantileFromSektchUDFTest.java b/src/test/java/com/yahoo/sketches/hive/kll/GetQuantileFromSektchUDFTest.java
new file mode 100644
index 0000000..ee2d1f8
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/hive/kll/GetQuantileFromSektchUDFTest.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+public class GetQuantileFromSektchUDFTest {
+
+  @Test
+  public void nullSketch() {
+    // no sketch supplied: the UDF is expected to answer with null
+    Assert.assertNull(new GetQuantileFromSketchUDF().evaluate(null, 0));
+  }
+
+  @Test
+  public void normalCase() {
+    KllFloatsSketch input = new KllFloatsSketch();
+    for (int value = 1; value <= 3; value++) {
+      input.update(value);
+    }
+    final Float median = new GetQuantileFromSketchUDF().evaluate(new BytesWritable(input.toByteArray()), 0.5);
+    Assert.assertNotNull(median);
+    Assert.assertEquals(median, 2f); // fraction 0.5 over the values 1, 2, 3
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/hive/kll/GetQuantilesFromSketchUDFTest.java b/src/test/java/com/yahoo/sketches/hive/kll/GetQuantilesFromSketchUDFTest.java
new file mode 100644
index 0000000..08fa45f
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/hive/kll/GetQuantilesFromSketchUDFTest.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import java.util.List;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+public class GetQuantilesFromSketchUDFTest {
+
+  @Test
+  public void nullSketch() {
+    // a null sketch argument is expected to produce a null list
+    Assert.assertNull(new GetQuantilesFromSketchUDF().evaluate(null, 0.0));
+  }
+
+  @Test
+  public void emptyListOfFractions() {
+    final KllFloatsSketch input = new KllFloatsSketch();
+    for (int value = 1; value <= 3; value++) {
+      input.update(value);
+    }
+    final List<Float> quantiles = new GetQuantilesFromSketchUDF().evaluate(new BytesWritable(input.toByteArray()));
+    Assert.assertNotNull(quantiles);
+    Assert.assertEquals(quantiles.size(), 0); // no fractions requested: empty, non-null list
+  }
+
+  @Test
+  public void fractionsNormalCase() {
+    final KllFloatsSketch input = new KllFloatsSketch();
+    for (int value = 1; value <= 3; value++) {
+      input.update(value);
+    }
+    final List<Float> quantiles = new GetQuantilesFromSketchUDF().evaluate(new BytesWritable(input.toByteArray()), 0.0, 0.5, 1.0);
+    Assert.assertNotNull(quantiles);
+    Assert.assertEquals(quantiles.size(), 3); // one quantile per requested fraction
+    Assert.assertEquals(quantiles.get(0), 1f);
+    Assert.assertEquals(quantiles.get(1), 2f);
+    Assert.assertEquals(quantiles.get(2), 3f);
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/hive/kll/GetRankFromSektchUDFTest.java b/src/test/java/com/yahoo/sketches/hive/kll/GetRankFromSektchUDFTest.java
new file mode 100644
index 0000000..30011fa
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/hive/kll/GetRankFromSektchUDFTest.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+public class GetRankFromSektchUDFTest {
+
+  @Test
+  public void nullSketch() {
+    // a null sketch argument is expected to produce a null rank
+    Assert.assertNull(new GetRankFromSketchUDF().evaluate(null, 0));
+  }
+
+  @Test
+  public void normalCase() {
+    KllFloatsSketch input = new KllFloatsSketch();
+    for (int value = 1; value <= 4; value++) {
+      input.update(value);
+    }
+    // with the values 1..4 in the sketch, the rank of 3 is expected to be 0.5
+    final Double rank = new GetRankFromSketchUDF().evaluate(new BytesWritable(input.toByteArray()), 3f);
+    Assert.assertNotNull(rank);
+    Assert.assertEquals(rank, 0.5);
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/hive/kll/SektchToStringUDFTest.java b/src/test/java/com/yahoo/sketches/hive/kll/SektchToStringUDFTest.java
new file mode 100644
index 0000000..e2c383e
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/hive/kll/SektchToStringUDFTest.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+public class SektchToStringUDFTest {
+
+  @Test
+  public void nullSketch() {
+    // a null sketch argument is expected to produce a null string
+    Assert.assertNull(new SketchToStringUDF().evaluate(null));
+  }
+
+  @Test
+  public void normalCase() {
+    KllFloatsSketch input = new KllFloatsSketch();
+    for (int value = 1; value <= 4; value++) {
+      input.update(value);
+    }
+    // only the presence of a summary is checked here, not its text
+    final String summary = new SketchToStringUDF().evaluate(new BytesWritable(input.toByteArray()));
+    Assert.assertNotNull(summary);
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/hive/kll/UnionSketchUDAFTest.java b/src/test/java/com/yahoo/sketches/hive/kll/UnionSketchUDAFTest.java
new file mode 100644
index 0000000..6552389
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/hive/kll/UnionSketchUDAFTest.java
@@ -0,0 +1,225 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
+import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+public class UnionSketchUDAFTest {
+
+ static final ObjectInspector binaryInspector = // writable inspector for the BINARY primitive category
+ PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(PrimitiveCategory.BINARY);
+
+ static final ObjectInspector intInspector = // writable inspector for the INT primitive category
+ PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(PrimitiveCategory.INT);
+
+ static final ObjectInspector structInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+ Arrays.asList("a"), // a single field named "a"
+ Arrays.asList(binaryInspector) // inspected with binaryInspector
+ ); // struct inspector, passed by the getEvaluatorWrongCategory* tests
+
+ @Test(expectedExceptions = UDFArgumentException.class)
+ public void getEvaluatorTooFewInspectors() throws Exception {
+   // zero arguments: getEvaluator is expected to throw
+   final ObjectInspector[] noArgs = new ObjectInspector[0];
+   new UnionSketchUDAF().getEvaluator(new SimpleGenericUDAFParameterInfo(noArgs, false, false));
+ }
+
+ @Test(expectedExceptions = UDFArgumentException.class)
+ public void getEvaluatorTooManyInspectors() throws Exception {
+   // binary + int are valid argument types, so only the extra third argument can cause the failure
+   ObjectInspector[] inspectors = new ObjectInspector[] { binaryInspector, intInspector, intInspector };
+   new UnionSketchUDAF().getEvaluator(new SimpleGenericUDAFParameterInfo(inspectors, false, false));
+ }
+
+ @Test(expectedExceptions = UDFArgumentTypeException.class)
+ public void getEvaluatorWrongCategoryArg1() throws Exception {
+   // a struct inspector in the first position is expected to be rejected
+   final ObjectInspector[] args = { structInspector };
+   new UnionSketchUDAF().getEvaluator(new SimpleGenericUDAFParameterInfo(args, false, false));
+ }
+
+ @Test(expectedExceptions = UDFArgumentTypeException.class)
+ public void getEvaluatorWrongTypeArg1() throws Exception {
+   // an int inspector in the first position is expected to be rejected
+   final ObjectInspector[] args = { intInspector };
+   new UnionSketchUDAF().getEvaluator(new SimpleGenericUDAFParameterInfo(args, false, false));
+ }
+
+ @Test(expectedExceptions = UDFArgumentTypeException.class)
+ public void getEvaluatorWrongCategoryArg2() throws Exception {
+   // first argument is a binary inspector; the struct in the second position is expected to be rejected
+   final ObjectInspector[] args = { binaryInspector, structInspector };
+   new UnionSketchUDAF().getEvaluator(new SimpleGenericUDAFParameterInfo(args, false, false));
+ }
+
+ @Test(expectedExceptions = UDFArgumentTypeException.class)
+ public void getEvaluatorWrongTypeArg2() throws Exception {
+   // first argument is a binary inspector; a binary inspector in the second position is expected to be rejected
+   final ObjectInspector[] args = { binaryInspector, binaryInspector };
+   new UnionSketchUDAF().getEvaluator(new SimpleGenericUDAFParameterInfo(args, false, false));
+ }
+
+ // PARTIAL1 mode (Map phase in Map-Reduce): iterate + terminatePartial
+ @Test
+ public void partia1ModelDefaultKDowsizeInput() throws Exception {
+   ObjectInspector[] argInspectors = new ObjectInspector[] { binaryInspector };
+   GenericUDAFParameterInfo paramInfo = new SimpleGenericUDAFParameterInfo(argInspectors, false, false);
+   try (GenericUDAFEvaluator evaluator = new UnionSketchUDAF().getEvaluator(paramInfo)) {
+     ObjectInspector outputInspector = evaluator.init(Mode.PARTIAL1, argInspectors);
+     DataToSketchUDAFTest.checkResultInspector(outputInspector);
+
+     SketchState buffer = (SketchState) evaluator.getNewAggregationBuffer();
+
+     // feed two single-item sketches built with k=400 into a union that was given no k
+     for (int value = 1; value <= 2; value++) {
+       KllFloatsSketch input = new KllFloatsSketch(400);
+       input.update(value);
+       evaluator.iterate(buffer, new Object[] { new BytesWritable(input.toByteArray()) });
+     }
+
+     BytesWritable serialized = (BytesWritable) evaluator.terminatePartial(buffer);
+     KllFloatsSketch merged = KllFloatsSketch.heapify(Memory.wrap(serialized.getBytes()));
+     // the result is expected to report the rank error of k=200 rather than of the inputs' k=400
+     Assert.assertEquals(merged.getNormalizedRankError(false), KllFloatsSketch.getNormalizedRankError(200, false));
+     Assert.assertEquals(merged.getNumRetained(), 2);
+     Assert.assertEquals(merged.getMinValue(), 1f);
+     Assert.assertEquals(merged.getMaxValue(), 2f);
+   }
+ }
+
+ @Test
+ public void partia1ModelGivenK() throws Exception {
+   ObjectInspector[] argInspectors = new ObjectInspector[] { binaryInspector, intInspector };
+   GenericUDAFParameterInfo paramInfo = new SimpleGenericUDAFParameterInfo(argInspectors, false, false);
+   try (GenericUDAFEvaluator evaluator = new UnionSketchUDAF().getEvaluator(paramInfo)) {
+     ObjectInspector outputInspector = evaluator.init(Mode.PARTIAL1, argInspectors);
+     DataToSketchUDAFTest.checkResultInspector(outputInspector);
+
+     SketchState buffer = (SketchState) evaluator.getNewAggregationBuffer();
+
+     // each row carries a single-item k=400 sketch together with an explicit k of 400
+     for (float value : new float[] { 1f, 2f }) {
+       KllFloatsSketch input = new KllFloatsSketch(400);
+       input.update(value);
+       evaluator.iterate(buffer, new Object[] { new BytesWritable(input.toByteArray()), new IntWritable(400) });
+     }
+
+     BytesWritable serialized = (BytesWritable) evaluator.terminatePartial(buffer);
+     KllFloatsSketch merged = KllFloatsSketch.heapify(Memory.wrap(serialized.getBytes()));
+     // with k given as 400 the result is expected to report the k=400 rank error
+     Assert.assertEquals(merged.getNormalizedRankError(false), KllFloatsSketch.getNormalizedRankError(400, false));
+     Assert.assertEquals(merged.getNumRetained(), 2);
+     Assert.assertEquals(merged.getMinValue(), 1f);
+     Assert.assertEquals(merged.getMaxValue(), 2f);
+   }
+ }
+
+ // PARTIAL2 mode (Combine phase in Map-Reduce): merge + terminatePartial
+ @Test
+ public void partial2Mode() throws Exception {
+   ObjectInspector[] argInspectors = new ObjectInspector[] { binaryInspector };
+   GenericUDAFParameterInfo paramInfo = new SimpleGenericUDAFParameterInfo(argInspectors, false, false);
+   try (GenericUDAFEvaluator evaluator = new UnionSketchUDAF().getEvaluator(paramInfo)) {
+     ObjectInspector outputInspector = evaluator.init(Mode.PARTIAL2, argInspectors);
+     DataToSketchUDAFTest.checkResultInspector(outputInspector);
+
+     SketchState buffer = (SketchState) evaluator.getNewAggregationBuffer();
+
+     // partial results arrive as serialized single-item sketches built with k=400
+     for (int value = 1; value <= 2; value++) {
+       KllFloatsSketch partial = new KllFloatsSketch(400);
+       partial.update(value);
+       evaluator.merge(buffer, new BytesWritable(partial.toByteArray()));
+     }
+
+     BytesWritable serialized = (BytesWritable) evaluator.terminatePartial(buffer);
+     KllFloatsSketch merged = KllFloatsSketch.heapify(Memory.wrap(serialized.getBytes()));
+     // the merged result is expected to report the k=400 rank error of its inputs
+     Assert.assertEquals(merged.getNormalizedRankError(false), KllFloatsSketch.getNormalizedRankError(400, false));
+     Assert.assertEquals(merged.getNumRetained(), 2);
+     Assert.assertEquals(merged.getMinValue(), 1f);
+     Assert.assertEquals(merged.getMaxValue(), 2f);
+   }
+ }
+
+ // FINAL mode (Reduce phase in Map-Reduce): merge + terminate
+ @Test
+ public void finalMode() throws Exception {
+   ObjectInspector[] argInspectors = new ObjectInspector[] { binaryInspector };
+   GenericUDAFParameterInfo paramInfo = new SimpleGenericUDAFParameterInfo(argInspectors, false, false);
+   try (GenericUDAFEvaluator evaluator = new UnionSketchUDAF().getEvaluator(paramInfo)) {
+     ObjectInspector outputInspector = evaluator.init(Mode.FINAL, argInspectors);
+     DataToSketchUDAFTest.checkResultInspector(outputInspector);
+
+     SketchState buffer = (SketchState) evaluator.getNewAggregationBuffer();
+
+     // partial results arrive as serialized single-item sketches built with k=400
+     for (float value : new float[] { 1f, 2f }) {
+       KllFloatsSketch partial = new KllFloatsSketch(400);
+       partial.update(value);
+       evaluator.merge(buffer, new BytesWritable(partial.toByteArray()));
+     }
+
+     BytesWritable serialized = (BytesWritable) evaluator.terminate(buffer);
+     KllFloatsSketch merged = KllFloatsSketch.heapify(Memory.wrap(serialized.getBytes()));
+     // the final result is expected to report the k=400 rank error of its inputs
+     Assert.assertEquals(merged.getNormalizedRankError(false), KllFloatsSketch.getNormalizedRankError(400, false));
+     Assert.assertEquals(merged.getNumRetained(), 2);
+     Assert.assertEquals(merged.getMinValue(), 1f);
+     Assert.assertEquals(merged.getMaxValue(), 2f);
+   }
+ }
+
+ // COMPLETE mode (single mode, alternative to MapReduce): iterate + terminate
+ @Test
+ public void completeModelDefaultK() throws Exception {
+   ObjectInspector[] inspectors = new ObjectInspector[] { binaryInspector };
+   GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+   try (GenericUDAFEvaluator eval = new UnionSketchUDAF().getEvaluator(info)) {
+     ObjectInspector resultInspector = eval.init(Mode.COMPLETE, inspectors);
+     DataToSketchUDAFTest.checkResultInspector(resultInspector);
+
+     SketchState state = (SketchState) eval.getNewAggregationBuffer();
+
+     for (int value = 1; value <= 2; value++) {
+       KllFloatsSketch sketch = new KllFloatsSketch();
+       sketch.update(value);
+       eval.iterate(state, new Object[] { new BytesWritable(sketch.toByteArray()) });
+     }
+
+     // COMPLETE mode takes its result from terminate(), so that is the call verified here
+     BytesWritable bytes = (BytesWritable) eval.terminate(state);
+     KllFloatsSketch resultSketch = KllFloatsSketch.heapify(Memory.wrap(bytes.getBytes()));
+     Assert.assertEquals(resultSketch.getNormalizedRankError(false), KllFloatsSketch.getNormalizedRankError(200, false));
+     Assert.assertEquals(resultSketch.getNumRetained(), 2);
+     Assert.assertEquals(resultSketch.getMinValue(), 1f);
+     Assert.assertEquals(resultSketch.getMaxValue(), 2f);
+
+     // after reset the buffer is expected to produce no result at all
+     eval.reset(state);
+     Assert.assertNull(eval.terminate(state));
+   }
+ }
+
+}