Merge pull request #35 from DataSketches/kll-udfs

KllFloatsSketch UDFs
diff --git a/pom.xml b/pom.xml
index 6126d25..88f90b3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,7 +152,7 @@
     <dependency>
       <groupId>com.yahoo.datasketches</groupId>
       <artifactId>sketches-core</artifactId>
-      <version>0.11.0</version>
+      <version>0.12.0</version>
     </dependency>
 
     <!-- Hive Dependencies (provided scope) -->
diff --git a/src/main/java/com/yahoo/sketches/hive/kll/DataToSketchUDAF.java b/src/main/java/com/yahoo/sketches/hive/kll/DataToSketchUDAF.java
new file mode 100644
index 0000000..f7348f7
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/hive/kll/DataToSketchUDAF.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+
+@Description(name = "DataToSketch", value = "_FUNC_(value, k) - "
+  + "Returns a KllFloatsSketch in a serialized form as a binary blob."
+  + " Values must be of type float."
+  + " Parameter k controls the accuracy and the size of the sketch."
+  + " If k is ommitted, the default is used.")
+public class DataToSketchUDAF extends AbstractGenericUDAFResolver {
+
+  @Override
+  public GenericUDAFEvaluator getEvaluator(final GenericUDAFParameterInfo info)
+      throws SemanticException {
+    final ObjectInspector[] inspectors = info.getParameterObjectInspectors();
+    if (inspectors.length != 1 && inspectors.length != 2) {
+      throw new UDFArgumentException("One or two arguments expected");
+    }
+    ObjectInspectorValidator.validateGivenPrimitiveCategory(inspectors[0], 0,
+        PrimitiveCategory.FLOAT);
+    if (inspectors.length == 2) {
+      ObjectInspectorValidator.validateGivenPrimitiveCategory(inspectors[1], 1,
+          PrimitiveCategory.INT);
+    }
+    return new DataToSketchEvaluator();
+  }
+
+  static class DataToSketchEvaluator extends SketchEvaluator {
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public void iterate(final AggregationBuffer buf, final Object[] data) throws HiveException {
+      if (data[0] == null) { return; }
+      final SketchState state = (SketchState) buf;
+      if (!state.isInitialized()) {
+        if (kInspector_ != null) {
+          state.init(PrimitiveObjectInspectorUtils.getInt(data[1], kInspector_));
+        } else {
+          state.init();
+        }
+      }
+      final float value = (float) inputInspector_.getPrimitiveJavaObject(data[0]);
+      state.update(value);
+    }
+
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/hive/kll/GetPmfFromSketchUDF.java b/src/main/java/com/yahoo/sketches/hive/kll/GetPmfFromSketchUDF.java
new file mode 100644
index 0000000..e3fb773
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/hive/kll/GetPmfFromSketchUDF.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.io.BytesWritable;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+@Description(
+  name = "GetPMF",
+  value = "_FUNC_(sketch, split points...)",
+  extended = "Returns an approximation to the Probability Mass Function (PMF)"
+  + " from a sketch given a set of split points (values)."
+  + " Split points are an array of M unique, monotonically increasing values"
+  + " that divide the real number line into M+1 consecutive disjoint intervals."
+  + " The function returns an array of M+1 doubles, each of which is an approximation"
+  + " to the fraction of the values that fell into one of those intervals."
+  + " The definition of an interval is inclusive of the left split point and exclusive"
+  + " of the right split point")
+public class GetPmfFromSketchUDF extends UDF {
+
+  /**
+   * Returns a list of fractions (PMF) from a given sketch
+   * @param serializedSketch serialized sketch
+   * @param splitPoints list of unique and monotonically increasing values
+   * @return list of fractions from 0 to 1
+   */
+  public List<Double> evaluate(final BytesWritable serializedSketch, final Float... splitPoints) {
+    if (serializedSketch == null) { return null; }
+    final KllFloatsSketch sketch = KllFloatsSketch.heapify(Memory.wrap(serializedSketch.getBytes()));
+    final double[] pmf = sketch.getPMF(Util.objectsToPrimitives(splitPoints));
+    if (pmf == null) { return null; }
+    return Util.primitivesToList(pmf);
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/hive/kll/GetQuantileFromSketchUDF.java b/src/main/java/com/yahoo/sketches/hive/kll/GetQuantileFromSketchUDF.java
new file mode 100644
index 0000000..e05a7a1
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/hive/kll/GetQuantileFromSketchUDF.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.io.BytesWritable;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+@Description(name = "GetQuantile", value = "_FUNC_(sketch, fraction)",
+extended = " Returns a quantile value from a given KllFloatsSketch."
++ " A single value for a given fraction is returned."
++ " The fraction represents a normalized rank, and must be from 0 to 1 inclusive."
++ " For example, a fraction of 0.5 corresponds to 50th percentile, which is"
++ " the median value of the distribution (the number separating the higher half"
++ " of the probability distribution from the lower half).")
+public class GetQuantileFromSketchUDF extends UDF {
+
+  /**
+   * Returns a quantile value from a given sketch
+   * @param serializedSketch serialized sketch
+   * @param fraction value from 0 to 1 inclusive
+   * @return quantile value
+   */
+  public Float evaluate(final BytesWritable serializedSketch, final double fraction) {
+    if (serializedSketch == null) { return null; }
+    final KllFloatsSketch sketch = KllFloatsSketch.heapify(Memory.wrap(serializedSketch.getBytes()));
+    return sketch.getQuantile(fraction);
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/hive/kll/GetQuantilesFromSketchUDF.java b/src/main/java/com/yahoo/sketches/hive/kll/GetQuantilesFromSketchUDF.java
new file mode 100644
index 0000000..5bc117c
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/hive/kll/GetQuantilesFromSketchUDF.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.io.BytesWritable;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+@Description(
+  name = "GetQuantiles",
+  value = "_FUNC_(sketch, fractions...)",
+  extended = "Returns quantile values from a given KllFloatsSketch based on a given list of fractions."
+  + " The fractions represent normalized ranks, and must be from 0 to 1 inclusive."
+  + " For example, a fraction of 0.5 corresponds to 50th percentile,"
+  + " which is the median value of the distribution (the number separating the higher"
+  + " half of the probability distribution from the lower half).")
+public class GetQuantilesFromSketchUDF extends UDF {
+
+  /**
+   * Returns a list of quantile values from a given sketch
+   * @param serializedSketch serialized sketch
+   * @param fractions list of values from 0 to 1 inclusive
+   * @return list of quantile values
+   */
+  public List<Float> evaluate(final BytesWritable serializedSketch, final Double... fractions) {
+    if (serializedSketch == null) { return null; }
+    final KllFloatsSketch sketch = KllFloatsSketch.heapify(Memory.wrap(serializedSketch.getBytes()));
+    return Util.primitivesToList(sketch.getQuantiles(Util.objectsToPrimitives(fractions)));
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/hive/kll/GetRankFromSketchUDF.java b/src/main/java/com/yahoo/sketches/hive/kll/GetRankFromSketchUDF.java
new file mode 100644
index 0000000..c8a2707
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/hive/kll/GetRankFromSketchUDF.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.io.BytesWritable;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+@Description(name = "GetRank", value = "_FUNC_(sketch, value)",
+extended = " Returns a normalized rank of a given value from a given KllFloatsSketch."
++ " The returned rank is an approximation to the fraction of values of the distribution"
++ " that are less than the given value (mass of the distribution below the given value).")
+public class GetRankFromSketchUDF extends UDF {
+
+  /**
+   * Returns a normalized rank of a given value from a given sketch
+   * @param serializedSketch serialized sketch
+   * @param value
+   * @return rank
+   */
+  public Double evaluate(final BytesWritable serializedSketch, final float value) {
+    if (serializedSketch == null) { return null; }
+    final KllFloatsSketch sketch = KllFloatsSketch.heapify(Memory.wrap(serializedSketch.getBytes()));
+    return sketch.getRank(value);
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/hive/kll/ObjectInspectorValidator.java b/src/main/java/com/yahoo/sketches/hive/kll/ObjectInspectorValidator.java
new file mode 100644
index 0000000..b82e39a
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/hive/kll/ObjectInspectorValidator.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+
+final class ObjectInspectorValidator {
+
+  static void validateCategoryPrimitive(
+      final ObjectInspector inspector, final int index) throws SemanticException {
+    if (inspector.getCategory() != ObjectInspector.Category.PRIMITIVE) {
+      throw new UDFArgumentTypeException(index, "Primitive argument expected, but "
+          + inspector.getCategory().name() + " was recieved");
+    }
+  }
+
+  static void validateGivenPrimitiveCategory(final ObjectInspector inspector, final int index,
+      final PrimitiveObjectInspector.PrimitiveCategory category) throws SemanticException
+  {
+    validateCategoryPrimitive(inspector, index);
+    final PrimitiveObjectInspector primitiveInspector = (PrimitiveObjectInspector) inspector;
+    if (primitiveInspector.getPrimitiveCategory() != category) {
+      throw new UDFArgumentTypeException(index, category.name() + " value expected as the argument "
+          + (index + 1) + " but " + primitiveInspector.getPrimitiveCategory().name() + " was received");
+    }
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/hive/kll/SketchEvaluator.java b/src/main/java/com/yahoo/sketches/hive/kll/SketchEvaluator.java
new file mode 100644
index 0000000..ba1af04
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/hive/kll/SketchEvaluator.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+abstract class SketchEvaluator extends GenericUDAFEvaluator {
+
+  protected PrimitiveObjectInspector inputInspector_;
+  protected PrimitiveObjectInspector kInspector_;
+
+  @Override
+  public ObjectInspector init(final Mode mode, final ObjectInspector[] parameters) throws HiveException {
+    super.init(mode, parameters);
+    inputInspector_ = (PrimitiveObjectInspector) parameters[0];
+
+    // Parameters:
+    // In PARTIAL1 and COMPLETE mode, the parameters are original data.
+    // In PARTIAL2 and FINAL mode, the parameters are partial aggregations.
+    if (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE) {
+      if (parameters.length > 1) {
+        kInspector_ = (PrimitiveObjectInspector) parameters[1];
+      }
+    }
+
+    return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(PrimitiveCategory.BINARY);
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public void reset(final AggregationBuffer buf) throws HiveException {
+    final SketchState state = (SketchState) buf;
+    state.reset();
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public Object terminatePartial(final AggregationBuffer buf) throws HiveException {
+    return terminate(buf);
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public void merge(final AggregationBuffer buf, final Object data) throws HiveException {
+    if (data == null) { return; }
+    final SketchState state = (SketchState) buf;
+    final BytesWritable serializedSketch =
+        (BytesWritable) inputInspector_.getPrimitiveWritableObject(data);
+    state.update(KllFloatsSketch.heapify(Memory.wrap(serializedSketch.getBytes())));
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public Object terminate(final AggregationBuffer buf) throws HiveException {
+    final SketchState state = (SketchState) buf;
+    final KllFloatsSketch resultSketch = state.getResult();
+    if (resultSketch == null) { return null; }
+    return new BytesWritable(resultSketch.toByteArray());
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+    return new SketchState();
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/hive/kll/SketchState.java b/src/main/java/com/yahoo/sketches/hive/kll/SketchState.java
new file mode 100644
index 0000000..c5feb2d
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/hive/kll/SketchState.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AbstractAggregationBuffer;
+
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+class SketchState extends AbstractAggregationBuffer {
+
+  private KllFloatsSketch state_;
+
+  // initialization is needed in the first phase (iterate) only
+  void init() {
+    state_ = new KllFloatsSketch();
+  }
+
+  void init(final int k) {
+    state_ = new KllFloatsSketch(k);
+  }
+
+  boolean isInitialized() {
+    return state_ != null;
+  }
+
+  void update(final float value) {
+    state_.update(value);
+  }
+
+  void update(final KllFloatsSketch sketch) {
+    if (state_ == null) {
+      state_ = sketch;
+    } else {
+      state_.merge(sketch);
+    }
+  }
+
+  public KllFloatsSketch getResult() {
+    return state_;
+  }
+
+  void reset() {
+    state_ = null;
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/hive/kll/SketchToStringUDF.java b/src/main/java/com/yahoo/sketches/hive/kll/SketchToStringUDF.java
new file mode 100644
index 0000000..6bb8cfb
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/hive/kll/SketchToStringUDF.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.io.BytesWritable;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+@Description(name = "SketchToString", value = "_FUNC_(sketch)",
+extended = " Returns a human-readable summary of a given KllFloatsSketch.")
+public class SketchToStringUDF extends UDF {
+
+  /**
+   * Returns a summary of a given sketch
+   * @param serializedSketch serialized sketch
+   * @return text summary
+   */
+  public String evaluate(final BytesWritable serializedSketch) {
+    if (serializedSketch == null) { return null; }
+    final KllFloatsSketch sketch = KllFloatsSketch.heapify(Memory.wrap(serializedSketch.getBytes()));
+    return sketch.toString();
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/hive/kll/UnionSketchUDAF.java b/src/main/java/com/yahoo/sketches/hive/kll/UnionSketchUDAF.java
new file mode 100644
index 0000000..b3c6b41
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/hive/kll/UnionSketchUDAF.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+
+@Description(name = "UnionSketch", value = "_FUNC_(sketch) - "
+    + "Returns a KllFloatsSketch in a serialized form as a binary blob."
+    + " Input values are also serialized sketches.")
+public class UnionSketchUDAF extends AbstractGenericUDAFResolver {
+
+  @Override
+  public GenericUDAFEvaluator getEvaluator(final GenericUDAFParameterInfo info) throws SemanticException {
+    final ObjectInspector[] inspectors = info.getParameterObjectInspectors();
+    if (inspectors.length != 1 && inspectors.length != 2) {
+      throw new UDFArgumentException("One or two arguments expected");
+    }
+    ObjectInspectorValidator.validateGivenPrimitiveCategory(inspectors[0], 0, PrimitiveCategory.BINARY);
+    if (inspectors.length == 2) {
+      ObjectInspectorValidator.validateGivenPrimitiveCategory(inspectors[1], 1, PrimitiveCategory.INT);
+    }
+    return new UnionEvaluator();
+  }
+
+  static class UnionEvaluator extends SketchEvaluator {
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public void iterate(final AggregationBuffer buf, final Object[] data) throws HiveException {
+      if (data[0] == null) { return; }
+      final SketchState state = (SketchState) buf;
+      if (!state.isInitialized()) {
+        if (kInspector_ != null) {
+          state.init(PrimitiveObjectInspectorUtils.getInt(data[1], kInspector_));
+        } else {
+          state.init();
+        }
+      }
+      merge(buf, data[0]);
+    }
+
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/hive/kll/Util.java b/src/main/java/com/yahoo/sketches/hive/kll/Util.java
new file mode 100644
index 0000000..a7acb14
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/hive/kll/Util.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import java.util.ArrayList;
+import java.util.List;
+
+final class Util {
+
+  static float[] objectsToPrimitives(final Float[] array) {
+    final float[] result = new float[array.length];
+    for (int i = 0; i < array.length; i++) {
+      result[i] = array[i];
+    }
+    return result;
+  }
+
+  static double[] objectsToPrimitives(final Double[] array) {
+    final double[] result = new double[array.length];
+    for (int i = 0; i < array.length; i++) {
+      result[i] = array[i];
+    }
+    return result;
+  }
+
+  static List<Float> primitivesToList(final float[] array) {
+    final List<Float> result = new ArrayList<Float>(array.length);
+    for (float item: array) { result.add(item); }
+    return result;
+  }
+
+  static List<Double> primitivesToList(final double[] array) {
+    final List<Double> result = new ArrayList<Double>(array.length);
+    for (double item: array) { result.add(item); }
+    return result;
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/hive/kll/package-info.java b/src/main/java/com/yahoo/sketches/hive/kll/package-info.java
new file mode 100644
index 0000000..b74f994
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/hive/kll/package-info.java
@@ -0,0 +1,11 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+/**
+ * Hive UDFs for KllFloatsSketch sketch.
+ * 
+ * @author Alexander Saydakov
+ */
+package com.yahoo.sketches.hive.kll;
diff --git a/src/test/java/com/yahoo/sketches/hive/kll/DataToSketchUDAFTest.java b/src/test/java/com/yahoo/sketches/hive/kll/DataToSketchUDAFTest.java
new file mode 100644
index 0000000..94b7c2b
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/hive/kll/DataToSketchUDAFTest.java
@@ -0,0 +1,235 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
+import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+public class DataToSketchUDAFTest {
+
+  static final ObjectInspector floatInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(PrimitiveCategory.FLOAT);
+
+  static final ObjectInspector intInspector =
+      PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(PrimitiveCategory.INT);
+
+  static final ObjectInspector binaryInspector =
+      PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(PrimitiveCategory.BINARY);
+
+  static final ObjectInspector structInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+      Arrays.asList("a"),
+      Arrays.asList(intInspector)
+    );
+
+  @Test(expectedExceptions = UDFArgumentException.class)
+  public void getEvaluatorTooFewInspectors() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    new DataToSketchUDAF().getEvaluator(info);
+  }
+
+  @Test(expectedExceptions = UDFArgumentException.class)
+  public void getEvaluatorTooManyInspectors() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { floatInspector, intInspector, intInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    new DataToSketchUDAF().getEvaluator(info);
+  }
+
+  @Test(expectedExceptions = UDFArgumentException.class)
+  public void getEvaluatorWrongCategoryArg1() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { structInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    new DataToSketchUDAF().getEvaluator(info);
+  }
+
+  @Test(expectedExceptions = UDFArgumentException.class)
+  public void getEvaluatorWrongTypeArg1() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { intInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    new DataToSketchUDAF().getEvaluator(info);
+  }
+
+  @Test(expectedExceptions = UDFArgumentException.class)
+  public void getEvaluatorWrongCategoryArg2() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { floatInspector, structInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    new DataToSketchUDAF().getEvaluator(info);
+  }
+
+  @Test(expectedExceptions = UDFArgumentException.class)
+  public void getEvaluatorWrongTypeArg2() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { floatInspector, floatInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    new DataToSketchUDAF().getEvaluator(info);
+  }
+
+  // PARTIAL1 mode (Map phase in Map-Reduce): iterate + terminatePartial
+  @Test
+  public void partial1ModeDefaultK() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { floatInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    try (GenericUDAFEvaluator eval = new DataToSketchUDAF().getEvaluator(info)) {
+      ObjectInspector resultInspector = eval.init(Mode.PARTIAL1, inspectors);
+      checkResultInspector(resultInspector);
+  
+      SketchState state = (SketchState) eval.getNewAggregationBuffer();
+      eval.iterate(state, new Object[] { new FloatWritable(1) });
+      eval.iterate(state, new Object[] { new FloatWritable(2) });
+  
+      BytesWritable bytes = (BytesWritable) eval.terminatePartial(state);
+      KllFloatsSketch resultSketch = KllFloatsSketch.heapify(Memory.wrap(bytes.getBytes()));
+      Assert.assertEquals(resultSketch.getNormalizedRankError(false), KllFloatsSketch.getNormalizedRankError(200, false));
+      Assert.assertEquals(resultSketch.getNumRetained(), 2);
+      Assert.assertEquals(resultSketch.getMinValue(), 1f);
+      Assert.assertEquals(resultSketch.getMaxValue(), 2f);
+    }
+  }
+
+  @Test
+  public void partial1ModeGivenK() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { floatInspector, intInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    try (GenericUDAFEvaluator eval = new DataToSketchUDAF().getEvaluator(info)) {
+      ObjectInspector resultInspector = eval.init(Mode.PARTIAL1, inspectors);
+      checkResultInspector(resultInspector);
+  
+      SketchState state = (SketchState) eval.getNewAggregationBuffer();
+      eval.iterate(state, new Object[] { new FloatWritable(1), new IntWritable(400) });
+      eval.iterate(state, new Object[] { new FloatWritable(2), new IntWritable(400) });
+  
+      BytesWritable bytes = (BytesWritable) eval.terminatePartial(state);
+      KllFloatsSketch resultSketch = KllFloatsSketch.heapify(Memory.wrap(bytes.getBytes()));
+      Assert.assertEquals(resultSketch.getNormalizedRankError(false), KllFloatsSketch.getNormalizedRankError(400, false));
+      Assert.assertEquals(resultSketch.getNumRetained(), 2);
+      Assert.assertEquals(resultSketch.getMinValue(), 1f);
+      Assert.assertEquals(resultSketch.getMaxValue(), 2f);
+    }
+  }
+
+  // PARTIAL2 mode (Combine phase in Map-Reduce): merge + terminatePartial
+  @Test
+  public void partial2Mode() throws Exception {
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(new ObjectInspector[] { floatInspector }, false, false);
+    try (GenericUDAFEvaluator eval = new DataToSketchUDAF().getEvaluator(info)) {
+      ObjectInspector resultInspector = eval.init(Mode.PARTIAL2, new ObjectInspector[] {binaryInspector});
+      checkResultInspector(resultInspector);
+  
+      SketchState state = (SketchState) eval.getNewAggregationBuffer();
+  
+      KllFloatsSketch sketch1 = new KllFloatsSketch();
+      sketch1.update(1);
+      eval.merge(state, new BytesWritable(sketch1.toByteArray()));
+  
+      KllFloatsSketch sketch2 = new KllFloatsSketch();
+      sketch2.update(2);
+      eval.merge(state, new BytesWritable(sketch2.toByteArray()));
+  
+      BytesWritable bytes = (BytesWritable) eval.terminate(state);
+      KllFloatsSketch resultSketch = KllFloatsSketch.heapify(Memory.wrap(bytes.getBytes()));
+      Assert.assertEquals(resultSketch.getNumRetained(), 2);
+      Assert.assertEquals(resultSketch.getMinValue(), 1f);
+      Assert.assertEquals(resultSketch.getMaxValue(), 2f);
+    }
+  }
+
+  // FINAL mode (Reduce phase in Map-Reduce): merge + terminate
+  @Test
+  public void finalMode() throws Exception {
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(new ObjectInspector[] { floatInspector }, false, false);
+    try (GenericUDAFEvaluator eval = new DataToSketchUDAF().getEvaluator(info)) {
+      ObjectInspector resultInspector = eval.init(Mode.FINAL, new ObjectInspector[] {binaryInspector});
+      checkResultInspector(resultInspector);
+  
+      SketchState state = (SketchState) eval.getNewAggregationBuffer();
+  
+      KllFloatsSketch sketch1 = new KllFloatsSketch(400); // to check if K is preserved
+      sketch1.update(1);
+      eval.merge(state, new BytesWritable(sketch1.toByteArray()));
+  
+      KllFloatsSketch sketch2 = new KllFloatsSketch(400);
+      sketch2.update(2);
+      eval.merge(state, new BytesWritable(sketch2.toByteArray()));
+  
+      BytesWritable bytes = (BytesWritable) eval.terminate(state);
+      KllFloatsSketch resultSketch = KllFloatsSketch.heapify(Memory.wrap(bytes.getBytes()));
+      Assert.assertEquals(resultSketch.getNormalizedRankError(false), KllFloatsSketch.getNormalizedRankError(400, false));
+      Assert.assertEquals(resultSketch.getNumRetained(), 2);
+      Assert.assertEquals(resultSketch.getMinValue(), 1f);
+      Assert.assertEquals(resultSketch.getMaxValue(), 2f);
+    }
+  }
+
+  // COMPLETE mode (single mode, alternative to MapReduce): iterate + terminate
+  @Test
+  public void completeModeDefaultK() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { floatInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    try (GenericUDAFEvaluator eval = new DataToSketchUDAF().getEvaluator(info)) {
+      ObjectInspector resultInspector = eval.init(Mode.COMPLETE, inspectors);
+      checkResultInspector(resultInspector);
+  
+      SketchState state = (SketchState) eval.getNewAggregationBuffer();
+      eval.iterate(state, new Object[] { new FloatWritable(1) });
+      eval.iterate(state, new Object[] { new FloatWritable(2) });
+  
+      BytesWritable bytes = (BytesWritable) eval.terminate(state);
+      KllFloatsSketch resultSketch = KllFloatsSketch.heapify(Memory.wrap(bytes.getBytes()));
+      Assert.assertEquals(resultSketch.getNormalizedRankError(false), KllFloatsSketch.getNormalizedRankError(200, false));
+      Assert.assertEquals(resultSketch.getNumRetained(), 2);
+      Assert.assertEquals(resultSketch.getMinValue(), 1f);
+      Assert.assertEquals(resultSketch.getMaxValue(), 2f);
+    }
+  }
+
+  @Test
+  public void completeModeGivenK() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { floatInspector, intInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    try (GenericUDAFEvaluator eval = new DataToSketchUDAF().getEvaluator(info)) {
+      ObjectInspector resultInspector = eval.init(Mode.COMPLETE, inspectors);
+      checkResultInspector(resultInspector);
+  
+      SketchState state = (SketchState) eval.getNewAggregationBuffer();
+      eval.iterate(state, new Object[] { new FloatWritable(1), new IntWritable(400) });
+      eval.iterate(state, new Object[] { new FloatWritable(2), new IntWritable(400) });
+  
+      BytesWritable bytes = (BytesWritable) eval.terminate(state);
+      KllFloatsSketch resultSketch = KllFloatsSketch.heapify(Memory.wrap(bytes.getBytes()));
+      Assert.assertEquals(resultSketch.getNormalizedRankError(false), KllFloatsSketch.getNormalizedRankError(400, false));
+      Assert.assertEquals(resultSketch.getNumRetained(), 2);
+      Assert.assertEquals(resultSketch.getMinValue(), 1f);
+      Assert.assertEquals(resultSketch.getMaxValue(), 2f);
+    }
+  }
+
+  static void checkResultInspector(ObjectInspector resultInspector) {
+    Assert.assertNotNull(resultInspector);
+    Assert.assertEquals(resultInspector.getCategory(), ObjectInspector.Category.PRIMITIVE);
+    Assert.assertEquals(
+      ((PrimitiveObjectInspector) resultInspector).getPrimitiveCategory(),
+      PrimitiveObjectInspector.PrimitiveCategory.BINARY
+    );
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/hive/kll/GetPmfFromSketchUDFTest.java b/src/test/java/com/yahoo/sketches/hive/kll/GetPmfFromSketchUDFTest.java
new file mode 100644
index 0000000..33b3a00
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/hive/kll/GetPmfFromSketchUDFTest.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import java.util.List;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+public class GetPmfFromSketchUDFTest {
+
+  @Test
+  public void nullSketch() {
+    List<Double> result = new GetPmfFromSketchUDF().evaluate(null, 0f);
+    Assert.assertNull(result);
+  }
+
+  @Test
+  public void emptyListOfSplitPoints() {
+    KllFloatsSketch sketch = new KllFloatsSketch();
+    sketch.update(1);
+    sketch.update(2);
+    sketch.update(3);
+    List<Double> result = new GetPmfFromSketchUDF().evaluate(new BytesWritable(sketch.toByteArray()));
+    Assert.assertNotNull(result);
+    Assert.assertEquals(result.size(), 1);
+    Assert.assertEquals(result.get(0), 1.0);
+  }
+
+  @Test
+  public void emptySketch() {
+    KllFloatsSketch sketch = new KllFloatsSketch();
+    List<Double> result = new GetPmfFromSketchUDF().evaluate(new BytesWritable(sketch.toByteArray()), 0f);
+    Assert.assertNull(result);
+  }
+
+  @Test
+  public void normalCase() {
+    KllFloatsSketch sketch = new KllFloatsSketch();
+    sketch.update(1);
+    sketch.update(2);
+    sketch.update(3);
+    sketch.update(4);
+    List<Double> result = new GetPmfFromSketchUDF().evaluate(new BytesWritable(sketch.toByteArray()), 1f, 3f, 5f);
+    Assert.assertNotNull(result);
+    Assert.assertEquals(result.size(), 4);
+    Assert.assertEquals(result.get(0), 0.0);
+    Assert.assertEquals(result.get(1), 0.5);
+    Assert.assertEquals(result.get(2), 0.5);
+    Assert.assertEquals(result.get(3), 0.0);
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/hive/kll/GetQuantileFromSektchUDFTest.java b/src/test/java/com/yahoo/sketches/hive/kll/GetQuantileFromSektchUDFTest.java
new file mode 100644
index 0000000..ee2d1f8
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/hive/kll/GetQuantileFromSektchUDFTest.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+public class GetQuantileFromSektchUDFTest {
+
+  @Test
+  public void nullSketch() {
+    final Float result = new GetQuantileFromSketchUDF().evaluate(null, 0);
+    Assert.assertNull(result);
+  }
+
+  @Test
+  public void normalCase() {
+    KllFloatsSketch sketch = new KllFloatsSketch();
+    sketch.update(1);
+    sketch.update(2);
+    sketch.update(3);
+    final Float result = new GetQuantileFromSketchUDF().evaluate(new BytesWritable(sketch.toByteArray()), 0.5);
+    Assert.assertNotNull(result);
+    Assert.assertEquals(result, 2f);
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/hive/kll/GetQuantilesFromSketchUDFTest.java b/src/test/java/com/yahoo/sketches/hive/kll/GetQuantilesFromSketchUDFTest.java
new file mode 100644
index 0000000..08fa45f
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/hive/kll/GetQuantilesFromSketchUDFTest.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import java.util.List;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+public class GetQuantilesFromSketchUDFTest {
+
+  @Test
+  public void nullSketch() {
+    final List<Float> result = new GetQuantilesFromSketchUDF().evaluate(null, 0.0);
+    Assert.assertNull(result);
+  }
+
+  @Test
+  public void emptyListOfFractions() {
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    sketch.update(1);
+    sketch.update(2);
+    sketch.update(3);
+    final List<Float> result = new GetQuantilesFromSketchUDF().evaluate(new BytesWritable(sketch.toByteArray()));
+    Assert.assertNotNull(result);
+    Assert.assertEquals(result.size(), 0);
+  }
+
+  @Test
+  public void fractionsNormalCase() {
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    sketch.update(1);
+    sketch.update(2);
+    sketch.update(3);
+    final List<Float> result = new GetQuantilesFromSketchUDF().evaluate(new BytesWritable(sketch.toByteArray()), 0.0, 0.5, 1.0);
+    Assert.assertNotNull(result);
+    Assert.assertEquals(result.size(), 3);
+    Assert.assertEquals(result.get(0), 1f);
+    Assert.assertEquals(result.get(1), 2f);
+    Assert.assertEquals(result.get(2), 3f);
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/hive/kll/GetRankFromSektchUDFTest.java b/src/test/java/com/yahoo/sketches/hive/kll/GetRankFromSektchUDFTest.java
new file mode 100644
index 0000000..30011fa
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/hive/kll/GetRankFromSektchUDFTest.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+public class GetRankFromSektchUDFTest {
+
+  @Test
+  public void nullSketch() {
+    final Double result = new GetRankFromSketchUDF().evaluate(null, 0);
+    Assert.assertNull(result);
+  }
+
+  @Test
+  public void normalCase() {
+    KllFloatsSketch sketch = new KllFloatsSketch();
+    sketch.update(1);
+    sketch.update(2);
+    sketch.update(3);
+    sketch.update(4);
+    final Double result = new GetRankFromSketchUDF().evaluate(new BytesWritable(sketch.toByteArray()), 3f);
+    Assert.assertNotNull(result);
+    Assert.assertEquals(result, 0.5);
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/hive/kll/SektchToStringUDFTest.java b/src/test/java/com/yahoo/sketches/hive/kll/SektchToStringUDFTest.java
new file mode 100644
index 0000000..e2c383e
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/hive/kll/SektchToStringUDFTest.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+public class SektchToStringUDFTest {
+
+  @Test
+  public void nullSketch() {
+    final String result = new SketchToStringUDF().evaluate(null);
+    Assert.assertNull(result);
+  }
+
+  @Test
+  public void normalCase() {
+    KllFloatsSketch sketch = new KllFloatsSketch();
+    sketch.update(1);
+    sketch.update(2);
+    sketch.update(3);
+    sketch.update(4);
+    final String result = new SketchToStringUDF().evaluate(new BytesWritable(sketch.toByteArray()));
+    Assert.assertNotNull(result);
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/hive/kll/UnionSketchUDAFTest.java b/src/test/java/com/yahoo/sketches/hive/kll/UnionSketchUDAFTest.java
new file mode 100644
index 0000000..6552389
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/hive/kll/UnionSketchUDAFTest.java
@@ -0,0 +1,225 @@
+/*
+ * Copyright 2018, Oath Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.hive.kll;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
+import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+public class UnionSketchUDAFTest {
+
+  static final ObjectInspector binaryInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(PrimitiveCategory.BINARY);
+
+  static final  ObjectInspector intInspector =
+      PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(PrimitiveCategory.INT);
+
+  static final ObjectInspector structInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+    Arrays.asList("a"),
+    Arrays.asList(binaryInspector)
+  );
+
+  @Test(expectedExceptions = UDFArgumentException.class)
+  public void getEvaluatorTooFewInspectors() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    new UnionSketchUDAF().getEvaluator(info);
+  }
+
+  @Test(expectedExceptions = UDFArgumentException.class)
+  public void getEvaluatorTooManyInspectors() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { binaryInspector, binaryInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    new UnionSketchUDAF().getEvaluator(info);
+  }
+
+  @Test(expectedExceptions = UDFArgumentTypeException.class)
+  public void getEvaluatorWrongCategoryArg1() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { structInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    new UnionSketchUDAF().getEvaluator(info);
+  }
+
+  @Test(expectedExceptions = UDFArgumentTypeException.class)
+  public void getEvaluatorWrongTypeArg1() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { intInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    new UnionSketchUDAF().getEvaluator(info);
+  }
+
+  @Test(expectedExceptions = UDFArgumentTypeException.class)
+  public void getEvaluatorWrongCategoryArg2() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { binaryInspector, structInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    new UnionSketchUDAF().getEvaluator(info);
+  }
+
+  @Test(expectedExceptions = UDFArgumentTypeException.class)
+  public void getEvaluatorWrongTypeArg2() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { binaryInspector, binaryInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    new UnionSketchUDAF().getEvaluator(info);
+  }
+
+  // PARTIAL1 mode (Map phase in Map-Reduce): iterate + terminatePartial
+  @Test
+  public void partia1ModelDefaultKDowsizeInput() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { binaryInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    try (GenericUDAFEvaluator eval = new UnionSketchUDAF().getEvaluator(info)) {
+      ObjectInspector resultInspector = eval.init(Mode.PARTIAL1, inspectors);
+      DataToSketchUDAFTest.checkResultInspector(resultInspector);
+  
+      SketchState state = (SketchState) eval.getNewAggregationBuffer();
+  
+      KllFloatsSketch sketch1 = new KllFloatsSketch(400);
+      sketch1.update(1);
+      eval.iterate(state, new Object[] { new BytesWritable(sketch1.toByteArray()) });
+  
+      KllFloatsSketch sketch2 = new KllFloatsSketch(400);
+      sketch2.update(2);
+      eval.iterate(state, new Object[] { new BytesWritable(sketch2.toByteArray()) });
+  
+      BytesWritable bytes = (BytesWritable) eval.terminatePartial(state);
+      KllFloatsSketch resultSketch = KllFloatsSketch.heapify(Memory.wrap(bytes.getBytes()));
+      Assert.assertEquals(resultSketch.getNormalizedRankError(false), KllFloatsSketch.getNormalizedRankError(200, false));
+      Assert.assertEquals(resultSketch.getNumRetained(), 2);
+      Assert.assertEquals(resultSketch.getMinValue(), 1f);
+      Assert.assertEquals(resultSketch.getMaxValue(), 2f);
+    }
+  }
+
+  @Test
+  public void partia1ModelGivenK() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { binaryInspector, intInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    try (GenericUDAFEvaluator eval = new UnionSketchUDAF().getEvaluator(info)) {
+      ObjectInspector resultInspector = eval.init(Mode.PARTIAL1, inspectors);
+      DataToSketchUDAFTest.checkResultInspector(resultInspector);
+  
+      SketchState state = (SketchState) eval.getNewAggregationBuffer();
+  
+      KllFloatsSketch sketch1 = new KllFloatsSketch(400);
+      sketch1.update(1);
+      eval.iterate(state, new Object[] { new BytesWritable(sketch1.toByteArray()), new IntWritable(400) });
+  
+      KllFloatsSketch sketch2 = new KllFloatsSketch(400);
+      sketch2.update(2);
+      eval.iterate(state, new Object[] { new BytesWritable(sketch2.toByteArray()), new IntWritable(400) });
+  
+      BytesWritable bytes = (BytesWritable) eval.terminatePartial(state);
+      KllFloatsSketch resultSketch = KllFloatsSketch.heapify(Memory.wrap(bytes.getBytes()));
+      Assert.assertEquals(resultSketch.getNormalizedRankError(false), KllFloatsSketch.getNormalizedRankError(400, false));
+      Assert.assertEquals(resultSketch.getNumRetained(), 2);
+      Assert.assertEquals(resultSketch.getMinValue(), 1f);
+      Assert.assertEquals(resultSketch.getMaxValue(), 2f);
+    }
+  }
+
+  // PARTIAL2 mode (Combine phase in Map-Reduce): merge + terminatePartial
+  @Test
+  public void partial2Mode() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { binaryInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    try (GenericUDAFEvaluator eval = new UnionSketchUDAF().getEvaluator(info)) {
+      ObjectInspector resultInspector = eval.init(Mode.PARTIAL2, inspectors);
+      DataToSketchUDAFTest.checkResultInspector(resultInspector);
+  
+      SketchState state = (SketchState) eval.getNewAggregationBuffer();
+  
+      KllFloatsSketch sketch1 = new KllFloatsSketch(400);
+      sketch1.update(1);
+      eval.merge(state, new BytesWritable(sketch1.toByteArray()));
+  
+      KllFloatsSketch sketch2 = new KllFloatsSketch(400);
+      sketch2.update(2);
+      eval.merge(state, new BytesWritable(sketch2.toByteArray()));
+  
+      BytesWritable bytes = (BytesWritable) eval.terminatePartial(state);
+      KllFloatsSketch resultSketch = KllFloatsSketch.heapify(Memory.wrap(bytes.getBytes()));
+      Assert.assertEquals(resultSketch.getNormalizedRankError(false), KllFloatsSketch.getNormalizedRankError(400, false));
+      Assert.assertEquals(resultSketch.getNumRetained(), 2);
+      Assert.assertEquals(resultSketch.getMinValue(), 1f);
+      Assert.assertEquals(resultSketch.getMaxValue(), 2f);
+    }
+  }
+
+  // FINAL mode (Reduce phase in Map-Reduce): merge + terminate
+  @Test
+  public void finalMode() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { binaryInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    try (GenericUDAFEvaluator eval = new UnionSketchUDAF().getEvaluator(info)) {
+      ObjectInspector resultInspector = eval.init(Mode.FINAL, inspectors);
+      DataToSketchUDAFTest.checkResultInspector(resultInspector);
+  
+      SketchState state = (SketchState) eval.getNewAggregationBuffer();
+  
+      KllFloatsSketch sketch1 = new KllFloatsSketch(400);
+      sketch1.update(1);
+      eval.merge(state, new BytesWritable(sketch1.toByteArray()));
+  
+      KllFloatsSketch sketch2 = new KllFloatsSketch(400);
+      sketch2.update(2);
+      eval.merge(state, new BytesWritable(sketch2.toByteArray()));
+  
+      BytesWritable bytes = (BytesWritable) eval.terminate(state);
+      KllFloatsSketch resultSketch = KllFloatsSketch.heapify(Memory.wrap(bytes.getBytes()));
+      Assert.assertEquals(resultSketch.getNormalizedRankError(false), KllFloatsSketch.getNormalizedRankError(400, false));
+      Assert.assertEquals(resultSketch.getNumRetained(), 2);
+      Assert.assertEquals(resultSketch.getMinValue(), 1f);
+      Assert.assertEquals(resultSketch.getMaxValue(), 2f);
+    }
+  }
+
+  // COMPLETE mode (single mode, alternative to MapReduce): iterate + terminate
+  @Test
+  public void completeModelDefaultK() throws Exception {
+    ObjectInspector[] inspectors = new ObjectInspector[] { binaryInspector };
+    GenericUDAFParameterInfo info = new SimpleGenericUDAFParameterInfo(inspectors, false, false);
+    try (GenericUDAFEvaluator eval = new UnionSketchUDAF().getEvaluator(info)) {
+      ObjectInspector resultInspector = eval.init(Mode.COMPLETE, inspectors);
+      DataToSketchUDAFTest.checkResultInspector(resultInspector);
+  
+      SketchState state = (SketchState) eval.getNewAggregationBuffer();
+  
+      KllFloatsSketch sketch1 = new KllFloatsSketch();
+      sketch1.update(1);
+      eval.iterate(state, new Object[] { new BytesWritable(sketch1.toByteArray()) });
+  
+      KllFloatsSketch sketch2 = new KllFloatsSketch();
+      sketch2.update(2);
+      eval.iterate(state, new Object[] { new BytesWritable(sketch2.toByteArray()) });
+  
+      BytesWritable bytes = (BytesWritable) eval.terminatePartial(state);
+      KllFloatsSketch resultSketch = KllFloatsSketch.heapify(Memory.wrap(bytes.getBytes()));
+      Assert.assertEquals(resultSketch.getNormalizedRankError(false), KllFloatsSketch.getNormalizedRankError(200, false));
+      Assert.assertEquals(resultSketch.getNumRetained(), 2);
+      Assert.assertEquals(resultSketch.getMinValue(), 1f);
+      Assert.assertEquals(resultSketch.getMaxValue(), 2f);
+  
+      eval.reset(state);
+      Assert.assertNull(eval.terminate(state));
+    }
+  }
+
+}