Merge pull request #43 from DataSketches/hll-udfs

HLL sketch UDFs
diff --git a/src/main/java/com/yahoo/sketches/pig/hll/AlgebraicFinal.java b/src/main/java/com/yahoo/sketches/pig/hll/AlgebraicFinal.java
new file mode 100644
index 0000000..edb84b7
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/hll/AlgebraicFinal.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.hll.HllSketch;
+import com.yahoo.sketches.hll.TgtHllType;
+import com.yahoo.sketches.hll.Union;
+
+/**
+ * Class used to calculate the final pass of an <i>Algebraic</i> sketch
+ * operation. It will receive a bag of values returned by either the <i>Intermediate</i>
+ * stage or the <i>Initial</i> stages, so it needs to be able to differentiate between and
+ * interpret both types.
+ * 
+ * @author Alexander Saydakov
+ */
+abstract class AlgebraicFinal extends EvalFunc<DataByteArray> {
+
+  private final int lgK_;
+  private final TgtHllType tgtHllType_;
+  private DataByteArray emptySketch_; // this is to cash an empty sketch tuple
+  private boolean isFirstCall_ = true; // for logging
+
+  /**
+   * Constructor with primitives for the final passes of an Algebraic function.
+   *
+   * @param lgK parameter controlling the sketch size and accuracy
+   * @param tgtHllType HLL type of the resulting sketch
+   */
+  public AlgebraicFinal(final int lgK, final TgtHllType tgtHllType) {
+    lgK_ = lgK;
+    tgtHllType_ = tgtHllType;
+  }
+
+  @Override
+  public DataByteArray exec(final Tuple inputTuple) throws IOException {
+    if (isFirstCall_) {
+      Logger.getLogger(getClass()).info("Algebraic was used");
+      isFirstCall_ = false;
+    }
+    if (inputTuple == null || inputTuple.size() == 0) {
+      return getEmptySketch();
+    }
+    final Union union = new Union(lgK_);
+    final DataBag outerBag = (DataBag) inputTuple.get(0);
+    if (outerBag == null) {
+      return getEmptySketch();
+    }
+    for (final Tuple dataTuple: outerBag) {
+      final Object f0 = dataTuple.get(0); // inputTuple.bag0.dataTupleN.f0
+      if (f0 == null) {
+        continue;
+      }
+      if (f0 instanceof DataBag) {
+        final DataBag innerBag = (DataBag) f0; // inputTuple.bag0.dataTupleN.f0:bag
+        if (innerBag.size() == 0) { continue; }
+        // If field 0 of a dataTuple is a Bag, all innerTuples of this inner bag
+        // will be passed into the union.
+        // It is due to system bagged outputs from multiple mapper Initial functions.
+        // The Intermediate stage was bypassed.
+        updateUnion(innerBag, union);
+      } else if (f0 instanceof DataByteArray) { // inputTuple.bag0.dataTupleN.f0:DBA
+        // If field 0 of a dataTuple is a DataByteArray, we assume it is a sketch
+        // due to system bagged outputs from multiple mapper Intermediate functions.
+        // Each dataTuple.DBA:sketch will merged into the union.
+        final DataByteArray dba = (DataByteArray) f0;
+        union.update(HllSketch.heapify(Memory.wrap(dba.get())));
+      } else { // we should never get here
+        throw new IllegalArgumentException("dataTuple.Field0 is not a DataBag or DataByteArray: "
+            + f0.getClass().getName());
+      }
+    }
+    return new DataByteArray(union.getResult(tgtHllType_).toCompactByteArray());
+  }
+
+  abstract void updateUnion(DataBag bag, Union union) throws ExecException;
+
+  private DataByteArray getEmptySketch() {
+    if (emptySketch_ == null) {
+      emptySketch_ = new DataByteArray(
+          new HllSketch(lgK_, tgtHllType_).toCompactByteArray());
+    }
+    return emptySketch_;
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/hll/AlgebraicInitial.java b/src/main/java/com/yahoo/sketches/pig/hll/AlgebraicInitial.java
new file mode 100644
index 0000000..9cf33f6
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/hll/AlgebraicInitial.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Class used to calculate the initial pass of an Algebraic sketch operation.
+ *
+ * <p>The Initial class simply passes through all records unchanged so that they can be
+ * processed by the intermediate processor instead.
+ * 
+ * @author Alexander Saydakov
+ */
+public class AlgebraicInitial extends EvalFunc<Tuple> {
+
+  private boolean isFirstCall_ = true;
+
+  /**
+   * Default constructor for the initial pass of an Algebraic function.
+   */
+  public AlgebraicInitial() {}
+
+  /**
+   * Constructor for the initial pass of an Algebraic function. Pig will call this and pass the
+   * same constructor arguments as the original UDF. In this case the arguments are ignored.
+   *
+   * @param lgK in a form of a String
+   */
+  public AlgebraicInitial(final String lgK) {}
+
+  /**
+   * Constructor for the initial pass of an Algebraic function. Pig will call this and pass the
+   * same constructor arguments as the original UDF. In this case the arguments are ignored.
+   *
+   * @param lgK in a form of a String
+   * @param tgtHllType in a form of a String
+   */
+  public AlgebraicInitial(final String lgK, final String tgtHllType) {}
+
+  @Override
+  public Tuple exec(final Tuple inputTuple) throws IOException {
+    if (isFirstCall_) {
+      Logger.getLogger(getClass()).info("Algebraic was used");
+      isFirstCall_ = false;
+    }
+    return inputTuple;
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/hll/AlgebraicIntermediate.java b/src/main/java/com/yahoo/sketches/pig/hll/AlgebraicIntermediate.java
new file mode 100644
index 0000000..48b49aa
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/hll/AlgebraicIntermediate.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.hll.HllSketch;
+import com.yahoo.sketches.hll.TgtHllType;
+import com.yahoo.sketches.hll.Union;
+
+/**
+ * Class used to calculate the intermediate combiner pass of an <i>Algebraic</i> sketch
+ * operation. This is called from the combiner, and may be called multiple times (from a mapper
+ * and from a reducer). It will receive a bag of values returned by either the <i>Intermediate</i>
+ * stage or the <i>Initial</i> stages, so it needs to be able to differentiate between and
+ * interpret both types.
+ * 
+ * @author Alexander Saydakov
+ */
+abstract class AlgebraicIntermediate extends EvalFunc<Tuple> {
+
+  private static final TupleFactory tupleFactory_ = TupleFactory.getInstance();
+
+  private final int lgK_;
+  private final TgtHllType tgtHllType_;
+  private Tuple emptySketchTuple_; // this is to cash an empty sketch tuple
+  private boolean isFirstCall_ = true; // for logging
+
+  /**
+   * Constructor with primitives for the intermediate pass of an Algebraic function.
+   *
+   * @param lgK parameter controlling the sketch size and accuracy
+   * @param tgtHllType HLL type of the resulting sketch
+   */
+  public AlgebraicIntermediate(final int lgK, final TgtHllType tgtHllType) {
+    lgK_ = lgK;
+    tgtHllType_ = tgtHllType;
+  }
+
+  @Override
+  public Tuple exec(final Tuple inputTuple) throws IOException {
+    if (isFirstCall_) {
+      Logger.getLogger(getClass()).info("Algebraic was used");
+      isFirstCall_ = false;
+    }
+    if (inputTuple == null || inputTuple.size() == 0) {
+      return getEmptySketchTuple();
+    }
+    final DataBag outerBag = (DataBag) inputTuple.get(0);
+    if (outerBag == null) {
+      return getEmptySketchTuple();
+    }
+    final Union union = new Union(lgK_);
+    for (final Tuple dataTuple: outerBag) {
+      final Object f0 = dataTuple.get(0); // inputTuple.bag0.dataTupleN.f0
+      if (f0 == null) { continue; }
+      if (f0 instanceof DataBag) {
+        final DataBag innerBag = (DataBag) f0; // inputTuple.bag0.dataTupleN.f0:bag
+        if (innerBag.size() == 0) { continue; }
+        // If field 0 of a dataTuple is a Bag, all innerTuples of this inner bag
+        // will be passed into the union.
+        // It is due to system bagged outputs from multiple mapper Initial functions.
+        // The Intermediate stage was bypassed.
+        updateUnion(innerBag, union);
+      } else if (f0 instanceof DataByteArray) { // inputTuple.bag0.dataTupleN.f0:DBA
+        // If field 0 of a dataTuple is a DataByteArray, we assume it is a sketch
+        // due to system bagged outputs from multiple mapper Intermediate functions.
+        // Each dataTuple.DBA:sketch will merged into the union.
+        final DataByteArray dba = (DataByteArray) f0;
+        union.update(HllSketch.heapify(Memory.wrap(dba.get())));
+      } else { // we should never get here
+        throw new IllegalArgumentException("dataTuple.Field0 is not a DataBag or DataByteArray: "
+            + f0.getClass().getName());
+      }
+    }
+    return tupleFactory_.newTuple(new DataByteArray(union.getResult(tgtHllType_).toCompactByteArray()));
+  }
+
+  abstract void updateUnion(DataBag bag, Union union) throws ExecException;
+
+  private Tuple getEmptySketchTuple() {
+    if (emptySketchTuple_ == null) {
+      emptySketchTuple_ = tupleFactory_.newTuple(new DataByteArray(
+          new HllSketch(lgK_, tgtHllType_).toCompactByteArray()));
+    }
+    return emptySketchTuple_;
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/hll/DataToSketch.java b/src/main/java/com/yahoo/sketches/pig/hll/DataToSketch.java
new file mode 100644
index 0000000..8944c51
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/hll/DataToSketch.java
@@ -0,0 +1,236 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.pig.Accumulator;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.sketches.hll.HllSketch;
+import com.yahoo.sketches.hll.TgtHllType;
+import com.yahoo.sketches.hll.Union;
+
+/**
+ * This is a Pig UDF that builds Sketches from data.
+ * This class implements both the <i>Accumulator</i> and <i>Algebraic</i> interfaces.
+ *
+ * @author Alexander Saydakov
+ */
+public class DataToSketch extends EvalFunc<DataByteArray> implements Accumulator<DataByteArray>, Algebraic {
+
+  static final int DEFAULT_LG_K = 12;
+  static final TgtHllType DEFAULT_HLL_TYPE = TgtHllType.HLL_4;
+
+  private DataByteArray emptySketch_; // this is to cash an empty sketch
+
+  private final int lgK_;
+  private final TgtHllType tgtHllType_;
+  private Union accumUnion_;
+  private boolean isFirstCall_;
+
+  /**
+   * Constructor with default lgK and target HLL type
+   */
+  public DataToSketch() {
+    this(DEFAULT_LG_K, DEFAULT_HLL_TYPE);
+  }
+
+  /**
+   * Constructor with given lgK as string and default target HLL type.
+   *
+   * @param lgK in a form of a String
+   */
+  public DataToSketch(final String lgK) {
+    this(Integer.parseInt(lgK), DEFAULT_HLL_TYPE);
+  }
+
+  /**
+   * Constructor with given lgK and target HLL type as strings
+   *
+   * @param lgK in a form of a String
+   * @param tgtHllType in a form of a String
+   */
+  public DataToSketch(final String lgK, final String tgtHllType) {
+    this(Integer.parseInt(lgK), TgtHllType.valueOf(tgtHllType));
+  }
+
+  /**
+   * Base constructor.
+   *
+   * @param lgK parameter controlling the sketch size and accuracy
+   * @param tgtHllType HLL type of the resulting sketch
+   */
+  public DataToSketch(final int lgK, final TgtHllType tgtHllType) {
+    super();
+    lgK_ = lgK;
+    tgtHllType_ = tgtHllType;
+  }
+
+  /**
+   * Top-level exec function.
+   * This method accepts an input Tuple containing a Bag of one or more inner <b>Datum Tuples</b>
+   * and returns a single serialized HllSketch as a DataByteArray.
+   *
+   * <b>Datum Tuple</b> is a Tuple containing a single field, which can be one of the following
+   * (Java type: Pig type):
+   * <ul>
+   *   <li>Byte: BYTE</li>
+   *   <li>Integer: INTEGER</li>
+   *   <li>Long: LONG</li>
+   *   <li>Float: FLOAT</li>
+   *   <li>Double: DOUBLE</li>
+   *   <li>String: CHARARRAY</li>
+   *   <li>DataByteArray: BYTEARRAY</li>
+   * </ul>
+   *
+   * @param inputTuple A tuple containing a single bag, containing Datum Tuples.
+   * @return serialized HllSketch
+   * @see "org.apache.pig.EvalFunc.exec(org.apache.pig.data.Tuple)"
+   * @throws IOException from Pig.
+   */
+
+  @Override
+  public DataByteArray exec(final Tuple inputTuple) throws IOException {
+    if (isFirstCall_) {
+      Logger.getLogger(getClass()).info("Exec was used");
+      isFirstCall_ = false;
+    }
+    if (inputTuple == null || inputTuple.size() == 0) {
+      if (emptySketch_ == null) {
+        emptySketch_ = new DataByteArray(new HllSketch(lgK_, tgtHllType_).toCompactByteArray());
+      }
+      return emptySketch_;
+    }
+    final Union union = new Union(lgK_);
+    final DataBag bag = (DataBag) inputTuple.get(0);
+    updateUnion(bag, union);
+    return new DataByteArray(union.getResult(tgtHllType_).toCompactByteArray());
+  }
+
+  /**
+   * An <i>Accumulator</i> version of the standard <i>exec()</i> method. Like <i>exec()</i>,
+   * accumulator is called with a bag of Datum Tuples. Unlike <i>exec()</i>, it doesn't serialize the
+   * result at the end. Instead, it can be called multiple times, each time with another bag of
+   * Datum Tuples to be input to the sketch.
+   *
+   * @param inputTuple A tuple containing a single bag, containing Datum Tuples.
+   * @see #exec
+   * @see "org.apache.pig.Accumulator.accumulate(org.apache.pig.data.Tuple)"
+   * @throws IOException by Pig
+   */
+  @Override
+  public void accumulate(final Tuple inputTuple) throws IOException {
+    if (isFirstCall_) {
+      Logger.getLogger(getClass()).info("Accumulator was used");
+      isFirstCall_ = false;
+    }
+    if (inputTuple == null || inputTuple.size() == 0) { return; }
+    final DataBag bag = (DataBag) inputTuple.get(0);
+    if (bag == null) { return; }
+    if (accumUnion_ == null) {
+      accumUnion_ = new Union(lgK_);
+    }
+    updateUnion(bag, accumUnion_);
+  }
+
+  /**
+   * Returns the sketch that has been built up by multiple calls to {@link #accumulate}.
+   *
+   * @return serialized HllSketch
+   * @see "org.apache.pig.Accumulator.getValue()"
+   */
+  @Override
+  public DataByteArray getValue() {
+    if (accumUnion_ == null) {
+      if (emptySketch_ == null) {
+        emptySketch_ = new DataByteArray(new HllSketch(lgK_, tgtHllType_).toCompactByteArray());
+      }
+      return emptySketch_;
+    }
+    return new DataByteArray(accumUnion_.getResult(tgtHllType_).toCompactByteArray());
+  }
+
+  /**
+   * Cleans up the UDF state after being called using the {@link Accumulator} interface.
+   *
+   * @see "org.apache.pig.Accumulator.cleanup()"
+   */
+  @Override
+  public void cleanup() {
+    accumUnion_ = null;
+  }
+
+  @Override
+  public String getInitial() {
+    return AlgebraicInitial.class.getName();
+  }
+
+  @Override
+  public String getIntermed() {
+    return DataToSketchAlgebraicIntermediate.class.getName();
+  }
+
+  @Override
+  public String getFinal() {
+    return DataToSketchAlgebraicFinal.class.getName();
+  }
+
+  static void updateUnion(final DataBag bag, final Union union) throws ExecException {
+    //Bag is not empty. process each innerTuple in the bag
+    for (final Tuple innerTuple: bag) {
+      final Object f0 = innerTuple.get(0); // consider only field 0
+      if (f0 == null) {
+        continue;
+      }
+      final byte type = innerTuple.getType(0);
+
+      switch (type) {
+        case DataType.NULL:
+          break;
+        case DataType.BYTE:
+          union.update((byte) f0);
+          break;
+        case DataType.INTEGER:
+          union.update((int) f0);
+          break;
+        case DataType.LONG:
+          union.update((long) f0);
+          break;
+        case DataType.FLOAT:
+          union.update((float) f0);
+          break;
+        case DataType.DOUBLE:
+          union.update((double) f0);
+          break;
+        case DataType.BYTEARRAY: {
+          final DataByteArray dba = (DataByteArray) f0;
+          union.update(dba.get());
+          break;
+        }
+        case DataType.CHARARRAY: {
+          final String str = (String) f0;
+          // conversion to char[] avoids costly UTF-8 encoding
+          union.update(str.toCharArray());
+          break;
+        }
+        default:
+          throw new IllegalArgumentException("Field 0 of innerTuple must be one of "
+              + "NULL, BYTE, INTEGER, LONG, FLOAT, DOUBLE, BYTEARRAY or CHARARRAY. "
+              + "Given Type = " + DataType.findTypeName(type)
+              + ", Object = " + f0.toString());
+      }
+    }
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/hll/DataToSketchAlgebraicFinal.java b/src/main/java/com/yahoo/sketches/pig/hll/DataToSketchAlgebraicFinal.java
new file mode 100644
index 0000000..f34daeb
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/hll/DataToSketchAlgebraicFinal.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import static com.yahoo.sketches.pig.hll.DataToSketch.DEFAULT_HLL_TYPE;
+import static com.yahoo.sketches.pig.hll.DataToSketch.DEFAULT_LG_K;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+
+import com.yahoo.sketches.hll.TgtHllType;
+import com.yahoo.sketches.hll.Union;
+
+public class DataToSketchAlgebraicFinal extends AlgebraicFinal {
+
+  /**
+   * Default constructor for the final pass of an Algebraic function.
+   * Assumes default lgK and target HLL type.
+   */
+  public DataToSketchAlgebraicFinal() {
+    super(DEFAULT_LG_K, DEFAULT_HLL_TYPE);
+  }
+
+  /**
+   * Constructor for the final pass of an Algebraic function. Pig will call
+   * this and pass the same constructor arguments as the base UDF.
+   * Assumes default HLL target type.
+   *
+   * @param lgK in a form of a String
+   */
+  public DataToSketchAlgebraicFinal(final String lgK) {
+    super(Integer.parseInt(lgK), DEFAULT_HLL_TYPE);
+  }
+
+  /**
+   * Constructor for the final pass of an Algebraic function. Pig will call
+   * this and pass the same constructor arguments as the base UDF.
+   * 
+   * @param lgK parameter controlling the sketch size and accuracy
+   * @param tgtHllType HLL type of the resulting sketch
+   */
+  public DataToSketchAlgebraicFinal(final String lgK, final String tgtHllType) {
+    super(Integer.parseInt(lgK), TgtHllType.valueOf(tgtHllType));
+  }
+
+  @Override
+  void updateUnion(final DataBag bag, final Union union) throws ExecException {
+    DataToSketch.updateUnion(bag, union);
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/hll/DataToSketchAlgebraicIntermediate.java b/src/main/java/com/yahoo/sketches/pig/hll/DataToSketchAlgebraicIntermediate.java
new file mode 100644
index 0000000..2fd9cdb
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/hll/DataToSketchAlgebraicIntermediate.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import static com.yahoo.sketches.pig.hll.DataToSketch.DEFAULT_HLL_TYPE;
+import static com.yahoo.sketches.pig.hll.DataToSketch.DEFAULT_LG_K;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+
+import com.yahoo.sketches.hll.TgtHllType;
+import com.yahoo.sketches.hll.Union;
+
+public class DataToSketchAlgebraicIntermediate extends AlgebraicIntermediate {
+
+  /**
+   * Default constructor for the intermediate pass of an Algebraic function.
+   * Assumes default lgK and target HLL type.
+   */
+  public DataToSketchAlgebraicIntermediate() {
+    super(DEFAULT_LG_K, DEFAULT_HLL_TYPE);
+  }
+
+  /**
+   * Constructor for the intermediate pass of an Algebraic function. Pig will call
+   * this and pass the same constructor arguments as the base UDF.
+   * Assumes default HLL target type.
+   *
+   * @param lgK in a form of a String
+   */
+  public DataToSketchAlgebraicIntermediate(final String lgK) {
+    super(Integer.parseInt(lgK), DEFAULT_HLL_TYPE);
+  }
+
+  /**
+   * Constructor for the intermediate pass of an Algebraic function. Pig will call
+   * this and pass the same constructor arguments as the base UDF.
+   * 
+   * @param lgK parameter controlling the sketch size and accuracy
+   * @param tgtHllType HLL type of the resulting sketch
+   */
+  public DataToSketchAlgebraicIntermediate(final String lgK, final String tgtHllType) {
+    super(Integer.parseInt(lgK), TgtHllType.valueOf(tgtHllType));
+  }
+
+  @Override
+  void updateUnion(final DataBag bag, final Union union) throws ExecException {
+    DataToSketch.updateUnion(bag, union);
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/hll/SketchToEstimate.java b/src/main/java/com/yahoo/sketches/pig/hll/SketchToEstimate.java
new file mode 100644
index 0000000..1e4a3c7
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/hll/SketchToEstimate.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.sketches.hll.HllSketch;
+
+/**
+ * This is a User Defined Function (UDF) for getting a unique count estimate from an HllSketch
+ *
+ * @author Alexander Saydakov
+ */
+public class SketchToEstimate extends EvalFunc<Double> {
+
+  @Override
+  public Double exec(final Tuple sketchTuple) throws IOException {
+    if ((sketchTuple == null) || (sketchTuple.size() == 0)) {
+      return null;
+    }
+    final DataByteArray dba = (DataByteArray) sketchTuple.get(0);
+    final HllSketch sketch = HllSketch.heapify(dba.get());
+    return sketch.getEstimate();
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/hll/SketchToEstimateAndErrorBounds.java b/src/main/java/com/yahoo/sketches/pig/hll/SketchToEstimateAndErrorBounds.java
new file mode 100644
index 0000000..798cea2
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/hll/SketchToEstimateAndErrorBounds.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import com.yahoo.sketches.hll.HllSketch;
+
+/**
+ * This is a User Defined Function (UDF) for obtaining the unique count estimate
+ * along with a lower and upper bound from an HllSketch.
+ *
+ * <p>The result is a tuple with three double values: estimate, lower bound and upper bound.
+ * The bounds are given at 95.5% confidence.
+ *
+ * @author Alexander Saydakov
+ */
+public class SketchToEstimateAndErrorBounds extends EvalFunc<Tuple> {
+
+  @Override
+  public Tuple exec(final Tuple sketchTuple) throws IOException {
+    if ((sketchTuple == null) || (sketchTuple.size() == 0)) {
+      return null;
+    }
+    final DataByteArray dba = (DataByteArray) sketchTuple.get(0);
+    final HllSketch sketch = HllSketch.heapify(dba.get());
+    final Tuple outputTuple = TupleFactory.getInstance().newTuple(3);
+    outputTuple.set(0, Double.valueOf(sketch.getEstimate()));
+    outputTuple.set(1, Double.valueOf(sketch.getLowerBound(2)));
+    outputTuple.set(2, Double.valueOf(sketch.getUpperBound(2)));
+    return outputTuple;
+  }
+
+  /**
+   * The output is a Sketch Result Tuple Schema.
+   */
+  @Override
+  public Schema outputSchema(final Schema input) {
+    if (input == null) { return null; }
+    try {
+      final Schema tupleSchema = new Schema();
+      tupleSchema.add(new Schema.FieldSchema("Estimate", DataType.DOUBLE));
+      tupleSchema.add(new Schema.FieldSchema("LowerBound", DataType.DOUBLE));
+      tupleSchema.add(new Schema.FieldSchema("UpperBound", DataType.DOUBLE));
+      return new Schema(new Schema.FieldSchema(getSchemaName(this
+          .getClass().getName().toLowerCase(), input), tupleSchema, DataType.TUPLE));
+    } catch (final FrontendException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/hll/SketchToString.java b/src/main/java/com/yahoo/sketches/pig/hll/SketchToString.java
new file mode 100644
index 0000000..1c6c9de
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/hll/SketchToString.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.sketches.hll.HllSketch;
+
+/**
+ * This is a User Defined Function (UDF) for "pretty printing" the summary of an HllSketch
+ *
+ * @author Alexander Saydakov
+ */
+public class SketchToString extends EvalFunc<String> {
+
+  private final boolean hllDetail_;
+  private final boolean auxDetail_;
+
+  /**
+   * Prints only the sketch summary.
+   */
+  public SketchToString() {
+    this(false, false);
+  }
+
+  /**
+   * Prints the summary and details based on given input parameters.
+   *
+   * @param hllDetail flag to print the HLL sketch detail
+   * @param auxDetail flag to print the auxiliary detail
+   */
+  public SketchToString(final String hllDetail, final String auxDetail) {
+    this(Boolean.parseBoolean(hllDetail), Boolean.parseBoolean(auxDetail));
+  }
+
+  /**
+   * Internal constructor with primitive parameters.
+   *
+   * @param hllDetail flag to print the HLL sketch detail
+   * @param auxDetail flag to print the auxiliary detail
+   */
+  private SketchToString(final boolean hllDetail, final boolean auxDetail) {
+    super();
+    hllDetail_ = hllDetail;
+    auxDetail_ = auxDetail;
+  }
+
+  @Override
+  public String exec(final Tuple sketchTuple) throws IOException {
+    if ((sketchTuple == null) || (sketchTuple.size() == 0)) {
+      return null;
+    }
+    final DataByteArray dba = (DataByteArray) sketchTuple.get(0);
+    final HllSketch sketch = HllSketch.heapify(dba.get());
+    return sketch.toString(true, hllDetail_, auxDetail_);
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/hll/UnionSketch.java b/src/main/java/com/yahoo/sketches/pig/hll/UnionSketch.java
new file mode 100644
index 0000000..814c17b
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/hll/UnionSketch.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import static com.yahoo.sketches.pig.hll.DataToSketch.DEFAULT_HLL_TYPE;
+import static com.yahoo.sketches.pig.hll.DataToSketch.DEFAULT_LG_K;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.pig.Accumulator;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.hll.HllSketch;
+import com.yahoo.sketches.hll.TgtHllType;
+import com.yahoo.sketches.hll.Union;
+
+/**
+ * This is a Pig UDF that performs the Union operation on HllSketches.
+ * This class implements both the <i>Accumulator</i> and <i>Algebraic</i> interfaces.
+ *
+ * @author Alexander Saydakov
+ */
+public class UnionSketch extends EvalFunc<DataByteArray> implements Accumulator<DataByteArray>, Algebraic {
+
+  private DataByteArray emptySketch_; // this is to cash an empty sketch
+
+  private final int lgK_;
+  private final TgtHllType tgtHllType_;
+  private Union accumUnion_;
+  private boolean isFirstCall_;
+
+  /**
+   * Constructor with default lgK and target HLL type
+   */
+  public UnionSketch() {
+    this(DEFAULT_LG_K, DEFAULT_HLL_TYPE);
+  }
+
+  /**
+   * Constructor with given lgK as string and default target HLL type.
+   *
+   * @param lgK in a form of a String
+   */
+  public UnionSketch(final String lgK) {
+    this(Integer.parseInt(lgK), DEFAULT_HLL_TYPE);
+  }
+
+  /**
+   * Constructor with given lgK and target HLL type as strings
+   *
+   * @param lgK in a form of a String
+   * @param tgtHllType in a form of a String
+   */
+  public UnionSketch(final String lgK, final String tgtHllType) {
+    this(Integer.parseInt(lgK), TgtHllType.valueOf(tgtHllType));
+  }
+
+  /**
+   * Base constructor.
+   *
+   * @param lgK parameter controlling the sketch size and accuracy
+   * @param tgtHllType HLL type of the resulting sketch
+   */
+  public UnionSketch(final int lgK, final TgtHllType tgtHllType) {
+    super();
+    lgK_ = lgK;
+    tgtHllType_ = tgtHllType;
+  }
+
+  /**
+   * Top-level exec function.
+   * This method accepts an input Tuple containing a Bag of one or more inner <b>Sketch Tuples</b>
+   * and returns a single serialized HllSketch as a DataByteArray.
+   *
+   * <b>Sketch Tuple</b> is a Tuple containing a single DataByteArray (BYTEARRAY in Pig), which
+   * is a serialized HllSketch.
+   *
+   * @param inputTuple A tuple containing a single bag, containing Sketch Tuples.
+   * @return serialized HllSketch
+   * @see "org.apache.pig.EvalFunc.exec(org.apache.pig.data.Tuple)"
+   * @throws IOException from Pig.
+   */
+  @Override
+  public DataByteArray exec(final Tuple inputTuple) throws IOException {
+    if (isFirstCall_) {
+      Logger.getLogger(getClass()).info("Exec was used");
+      isFirstCall_ = false;
+    }
+    if (inputTuple == null || inputTuple.size() == 0) {
+      if (emptySketch_ == null) {
+        emptySketch_ = new DataByteArray(new HllSketch(lgK_, tgtHllType_).toCompactByteArray());
+      }
+      return emptySketch_;
+    }
+    final Union union = new Union(lgK_);
+    final DataBag bag = (DataBag) inputTuple.get(0);
+    updateUnion(bag, union);
+    return new DataByteArray(union.getResult(tgtHllType_).toCompactByteArray());
+  }
+
+  /**
+   * An <i>Accumulator</i> version of the standard <i>exec()</i> method. Like <i>exec()</i>,
+   * accumulator is called with a bag of Sketch Tuples. Unlike <i>exec()</i>, it doesn't serialize the
+   * result at the end. Instead, it can be called multiple times, each time with another bag of
+   * Sketch Tuples to be input to the union.
+   *
+   * @param inputTuple A tuple containing a single bag, containing Sketch Tuples.
+   * @see #exec
+   * @see "org.apache.pig.Accumulator.accumulate(org.apache.pig.data.Tuple)"
+   * @throws IOException by Pig
+   */
+  @Override
+  public void accumulate(final Tuple inputTuple) throws IOException {
+    if (isFirstCall_) {
+      Logger.getLogger(getClass()).info("Accumulator was used");
+      isFirstCall_ = false;
+    }
+    if (inputTuple == null || inputTuple.size() == 0) { return; }
+    final DataBag bag = (DataBag) inputTuple.get(0);
+    if (bag == null) { return; }
+    if (accumUnion_ == null) {
+      accumUnion_ = new Union(lgK_);
+    }
+    updateUnion(bag, accumUnion_);
+  }
+
+  /**
+   * Returns the sketch that has been built up by multiple calls to {@link #accumulate}.
+   *
+   * @return Sketch Tuple. (see {@link #exec} for return tuple format)
+   * @see "org.apache.pig.Accumulator.getValue()"
+   */
+  @Override
+  public DataByteArray getValue() {
+    if (accumUnion_ == null) {
+      if (emptySketch_ == null) {
+        emptySketch_ = new DataByteArray(
+            new HllSketch(lgK_, tgtHllType_).toCompactByteArray());
+      }
+      return emptySketch_;
+    }
+    return new DataByteArray(
+        accumUnion_.getResult(tgtHllType_).toCompactByteArray());
+  }
+
+  /**
+   * Cleans up the UDF state after being called using the {@link Accumulator} interface.
+   *
+   * @see "org.apache.pig.Accumulator.cleanup()"
+   */
+  @Override
+  public void cleanup() {
+    accumUnion_ = null;
+  }
+
+  @Override
+  public String getInitial() {
+    return AlgebraicInitial.class.getName();
+  }
+
+  @Override
+  public String getIntermed() {
+    return UnionSketchAlgebraicIntermediate.class.getName();
+  }
+
+  @Override
+  public String getFinal() {
+    return UnionSketchAlgebraicFinal.class.getName();
+  }
+
+  static void updateUnion(final DataBag bag, final Union union) throws ExecException {
+    // Bag is not empty. process each innerTuple in the bag
+    for (final Tuple innerTuple : bag) {
+      final Object f0 = innerTuple.get(0); // consider only field 0
+      if (f0 == null) {
+        continue;
+      }
+      final byte type = innerTuple.getType(0);
+      if (type == DataType.BYTEARRAY) {
+        final DataByteArray dba = (DataByteArray) f0;
+        union.update(HllSketch.heapify(Memory.wrap(dba.get())));
+      } else {
+        throw new IllegalArgumentException("Field type was not DataType.BYTEARRAY: " + type);
+      }
+    }
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/hll/UnionSketchAlgebraicFinal.java b/src/main/java/com/yahoo/sketches/pig/hll/UnionSketchAlgebraicFinal.java
new file mode 100644
index 0000000..07d5681
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/hll/UnionSketchAlgebraicFinal.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import static com.yahoo.sketches.pig.hll.DataToSketch.DEFAULT_HLL_TYPE;
+import static com.yahoo.sketches.pig.hll.DataToSketch.DEFAULT_LG_K;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+
+import com.yahoo.sketches.hll.TgtHllType;
+import com.yahoo.sketches.hll.Union;
+
+public class UnionSketchAlgebraicFinal extends AlgebraicFinal {
+
+  /**
+   * Default constructor for the final pass of an Algebraic function.
+   * Assumes default lgK and target HLL type.
+   */
+  public UnionSketchAlgebraicFinal() {
+    super(DEFAULT_LG_K, DEFAULT_HLL_TYPE);
+  }
+
+  /**
+   * Constructor for the final pass of an Algebraic function. Pig will call
+   * this and pass the same constructor arguments as the base UDF.
+   * Assumes default HLL target type.
+   *
+   * @param lgK in a form of a String
+   */
+  public UnionSketchAlgebraicFinal(final String lgK) {
+    super(Integer.parseInt(lgK), DEFAULT_HLL_TYPE);
+  }
+
+  /**
+   * Constructor for the final pass of an Algebraic function. Pig will call
+   * this and pass the same constructor arguments as the base UDF.
+   * 
+   * @param lgK parameter controlling the sketch size and accuracy
+   * @param tgtHllType HLL type of the resulting sketch
+   */
+  public UnionSketchAlgebraicFinal(final String lgK, final String tgtHllType) {
+    super(Integer.parseInt(lgK), TgtHllType.valueOf(tgtHllType));
+  }
+
+  @Override
+  void updateUnion(final DataBag bag, final Union union) throws ExecException {
+    UnionSketch.updateUnion(bag, union);
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/hll/UnionSketchAlgebraicIntermediate.java b/src/main/java/com/yahoo/sketches/pig/hll/UnionSketchAlgebraicIntermediate.java
new file mode 100644
index 0000000..77b6335
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/hll/UnionSketchAlgebraicIntermediate.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import static com.yahoo.sketches.pig.hll.DataToSketch.DEFAULT_HLL_TYPE;
+import static com.yahoo.sketches.pig.hll.DataToSketch.DEFAULT_LG_K;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+
+import com.yahoo.sketches.hll.TgtHllType;
+import com.yahoo.sketches.hll.Union;
+
+public class UnionSketchAlgebraicIntermediate extends AlgebraicIntermediate {
+
+  /**
+   * Default constructor of the intermediate pass of an Algebraic function.
+   * Assumes default lgK and target HLL type.
+   */
+  public UnionSketchAlgebraicIntermediate() {
+    super(DEFAULT_LG_K, DEFAULT_HLL_TYPE);
+  }
+
+  /**
+   * Constructor for the intermediate pass of an Algebraic function. Pig will call
+   * this and pass the same constructor arguments as the base UDF.
+   * Assumes default HLL target type.
+   *
+   * @param lgK in a form of a String
+   */
+  public UnionSketchAlgebraicIntermediate(final String lgK) {
+    super(Integer.parseInt(lgK), DEFAULT_HLL_TYPE);
+  }
+
+  /**
+   * Constructor for the intermediate pass of an Algebraic function. Pig will call
+   * this and pass the same constructor arguments as the base UDF.
+   * 
+   * @param lgK parameter controlling the sketch size and accuracy
+   * @param tgtHllType HLL type of the resulting sketch
+   */
+  public UnionSketchAlgebraicIntermediate(final String lgK, final String tgtHllType) {
+    super(Integer.parseInt(lgK), TgtHllType.valueOf(tgtHllType));
+  }
+
+  @Override
+  void updateUnion(final DataBag bag, final Union union) throws ExecException {
+    UnionSketch.updateUnion(bag, union);
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/hll/package-info.java b/src/main/java/com/yahoo/sketches/pig/hll/package-info.java
new file mode 100644
index 0000000..f5f2f21
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/hll/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2017, Yahoo! Inc. Licensed under the terms of the Apache License 2.0. See LICENSE file
+ * at the project root for terms.
+ */
+/**
+ * Pig UDFs for HLL sketches.
+ *
+ * These UDFs can be used as a replacement of corresponding Theta sketch UDFs.
+ * Notice that intersections and A-not-B operations are not supported by the HLL sketch.
+ * Also notice a small difference in the output type of DataToSketch and UnionSketch:
+ * HLL sketch UDFs return DataByteArray (BYTEARRAY in Pig), but corresponding Theta sketch
+ * UDFs return a Tuple with single DataByteArray inside. This was a historical accident,
+ * and we are reluctant to break the compatibility with existing scripts. HLL sketch UDFs
+ * don't have to keep this compatibility. As a result, HLL sketch UDFs don't need
+ * flatten() around them to remove the Tuple, and internally they don't have to spend extra
+ * resources to wrap every output DataByteArray into a Tuple.
+ *
+ * @author Alexander Saydakov
+ */
+package com.yahoo.sketches.pig.hll;
diff --git a/src/test/java/com/yahoo/sketches/pig/hll/DataToSketchTest.java b/src/test/java/com/yahoo/sketches/pig/hll/DataToSketchTest.java
new file mode 100644
index 0000000..2bf393d
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/hll/DataToSketchTest.java
@@ -0,0 +1,286 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.hll.HllSketch;
+import com.yahoo.sketches.hll.TgtHllType;
+
+public class DataToSketchTest {
+
+  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+  private static final BagFactory bagFactory = BagFactory.getInstance();
+
+  @Test
+  public void execNullInputTuple() throws Exception {
+    EvalFunc<DataByteArray> func = new DataToSketch();
+    DataByteArray result = func.exec(null);
+    HllSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void execEmptyInputTuple() throws Exception {
+    EvalFunc<DataByteArray> func = new DataToSketch("10");
+    DataByteArray result = func.exec(tupleFactory.newTuple());
+    HllSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+    Assert.assertEquals(sketch.getLgConfigK(), 10);
+  }
+
+  @Test
+  public void execEmptyBag() throws Exception {
+    EvalFunc<DataByteArray> func = new DataToSketch("10", "HLL_6");
+    DataByteArray result = func.exec(tupleFactory.newTuple(bagFactory.newDefaultBag()));
+    HllSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+    Assert.assertEquals(sketch.getLgConfigK(), 10);
+    Assert.assertEquals(sketch.getTgtHllType(), TgtHllType.HLL_6);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void execUnsupportedType() throws Exception {
+    EvalFunc<DataByteArray> func = new DataToSketch();
+    DataBag bag = bagFactory.newDefaultBag();
+    bag.add(tupleFactory.newTuple(new Object()));
+    func.exec(tupleFactory.newTuple(bag));
+  }
+
+  @Test
+  public void execVariousTypesOfInput() throws Exception {
+    EvalFunc<DataByteArray> func = new DataToSketch();
+    DataBag bag = bagFactory.newDefaultBag();
+    Tuple tupleWithNull = tupleFactory.newTuple(1);
+    tupleWithNull.set(0, null);
+    bag.add(tupleWithNull);
+    bag.add(tupleFactory.newTuple(new Byte((byte) 1)));
+    bag.add(tupleFactory.newTuple(new Integer(2)));
+    bag.add(tupleFactory.newTuple(new Long(3)));
+    bag.add(tupleFactory.newTuple(new Float(1)));
+    bag.add(tupleFactory.newTuple(new Double(2)));
+    bag.add(tupleFactory.newTuple(new DataByteArray(new byte[] {(byte) 1})));
+    bag.add(tupleFactory.newTuple("a"));
+    DataByteArray result = func.exec(tupleFactory.newTuple(bag));
+    HllSketch sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 7.0, 0.01);
+  }
+
+  @Test
+  public void accumulator() throws Exception {
+    Accumulator<DataByteArray> func = new DataToSketch();
+
+    // no input yet
+    DataByteArray result = func.getValue();
+    HllSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // null input tuple
+    func.accumulate(null);
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // empty input tuple
+    func.accumulate(tupleFactory.newTuple());
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // empty bag
+    func.accumulate(tupleFactory.newTuple(bagFactory.newDefaultBag()));
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // normal case
+    DataBag bag = bagFactory.newDefaultBag();
+    bag.add(tupleFactory.newTuple("a"));
+    bag.add(tupleFactory.newTuple("b"));
+    func.accumulate(tupleFactory.newTuple(bag));
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+
+    // cleanup
+    func.cleanup();
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicInitial() throws Exception {
+    EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new DataToSketch().getInitial()).newInstance();
+    Tuple input = tupleFactory.newTuple();
+    Tuple result = func.exec(input);
+    Assert.assertEquals(result, input);
+  }
+
+  @Test
+  public void algebraicInitialWithLgK() throws Exception {
+    EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new DataToSketch().getInitial())
+        .getConstructor(String.class).newInstance("10");
+    Tuple input = tupleFactory.newTuple();
+    Tuple result = func.exec(input);
+    Assert.assertEquals(result, input);
+  }
+
+  @Test
+  public void algebraicInitialWithLgKAndTgtHllType() throws Exception {
+    EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new DataToSketch().getInitial())
+        .getConstructor(String.class, String.class).newInstance("10", "HLL_6");
+    Tuple input = tupleFactory.newTuple();
+    Tuple result = func.exec(input);
+    Assert.assertEquals(result, input);
+  }
+
+  @Test
+  public void algebraicIntermediateNullInputTuple() throws Exception {
+    EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new DataToSketch().getIntermed()).newInstance();
+    Tuple result = func.exec(null);
+    HllSketch sketch = getSketch((DataByteArray) result.get(0));
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicIntermediateEmptyInputTuple() throws Exception {
+    EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new DataToSketch().getIntermed())
+        .getConstructor(String.class).newInstance("10");
+    Tuple result = func.exec(tupleFactory.newTuple());
+    HllSketch sketch = getSketch((DataByteArray) result.get(0));
+    Assert.assertTrue(sketch.isEmpty());
+    Assert.assertEquals(sketch.getLgConfigK(), 10);
+  }
+
+  @Test
+  public void algebraicIntermediateEmptyBag() throws Exception {
+    EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new DataToSketch().getIntermed())
+        .getConstructor(String.class, String.class).newInstance("10", "HLL_6");
+    Tuple result = func.exec(tupleFactory.newTuple(bagFactory.newDefaultBag()));
+    HllSketch sketch = getSketch((DataByteArray) result.get(0));
+    Assert.assertTrue(sketch.isEmpty());
+    Assert.assertEquals(sketch.getLgConfigK(), 10);
+    Assert.assertEquals(sketch.getTgtHllType(), TgtHllType.HLL_6);
+  }
+
+  @Test
+  public void algebraicIntermediateFromInitial() throws Exception {
+    EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new DataToSketch().getIntermed()).newInstance();
+    DataBag outerBag = bagFactory.newDefaultBag();
+    DataBag innerBag = bagFactory.newDefaultBag();
+    innerBag.add(tupleFactory.newTuple("a"));
+    innerBag.add(tupleFactory.newTuple("b"));
+    innerBag.add(tupleFactory.newTuple("c"));
+    outerBag.add(tupleFactory.newTuple(innerBag));
+    Tuple result = func.exec(tupleFactory.newTuple(outerBag));
+    HllSketch sketch = getSketch((DataByteArray) result.get(0));
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 3.0, 0.01);
+  }
+
+  @Test
+  public void algebraicIntermediateFromIntermediate() throws Exception {
+    EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new DataToSketch().getIntermed()).newInstance();
+    HllSketch inputSketch = new HllSketch(12);
+    inputSketch.update("a");
+    inputSketch.update("b");
+    DataBag bag = bagFactory.newDefaultBag();
+    bag.add(tupleFactory.newTuple(new DataByteArray(inputSketch.toCompactByteArray())));
+    Tuple result = func.exec(tupleFactory.newTuple(bag));
+    HllSketch sketch = getSketch((DataByteArray) result.get(0));
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+  }
+
+  @Test
+  public void algebraicFinalNullInputTuple() throws Exception {
+    EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new DataToSketch().getFinal()).newInstance();
+    DataByteArray result = func.exec(null);
+    HllSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicFinalEmptyInputTuple() throws Exception {
+    EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new DataToSketch().getFinal())
+        .getConstructor(String.class).newInstance("10");
+    DataByteArray result = func.exec(tupleFactory.newTuple());
+    HllSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+    Assert.assertEquals(sketch.getLgConfigK(), 10);
+  }
+
+  @Test
+  public void algebraicFinalEmptyBag() throws Exception {
+    EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new DataToSketch().getFinal())
+        .getConstructor(String.class, String.class).newInstance("10", "HLL_6");
+    DataByteArray result = func.exec(tupleFactory.newTuple(bagFactory.newDefaultBag()));
+    HllSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+    Assert.assertEquals(sketch.getLgConfigK(), 10);
+    Assert.assertEquals(sketch.getTgtHllType(), TgtHllType.HLL_6);
+  }
+
+  @Test
+  public void algebraicFinalFromInitial() throws Exception {
+    EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new DataToSketch().getFinal()).newInstance();
+    DataBag outerBag = bagFactory.newDefaultBag();
+    DataBag innerBag = bagFactory.newDefaultBag();
+    innerBag.add(tupleFactory.newTuple("a"));
+    innerBag.add(tupleFactory.newTuple("b"));
+    innerBag.add(tupleFactory.newTuple("c"));
+    outerBag.add(tupleFactory.newTuple(innerBag));
+    DataByteArray result = func.exec(tupleFactory.newTuple(outerBag));
+    HllSketch sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 3.0, 0.01);
+  }
+
+  @Test
+  public void algebraicFinalFromIntermediate() throws Exception {
+    EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new DataToSketch().getFinal()).newInstance();
+    HllSketch inputSketch = new HllSketch(12);
+    inputSketch.update("a");
+    inputSketch.update("b");
+    DataBag bag = bagFactory.newDefaultBag();
+    bag.add(tupleFactory.newTuple(new DataByteArray(inputSketch.toCompactByteArray())));
+    DataByteArray result = func.exec(tupleFactory.newTuple(bag));
+    HllSketch sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+  }
+
+  static HllSketch getSketch(DataByteArray dba) throws Exception {
+    Assert.assertNotNull(dba);
+    Assert.assertTrue(dba.size() > 0);
+    return HllSketch.heapify(dba.get()); 
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/hll/SketchToEstimateAndErrorBoundsTest.java b/src/test/java/com/yahoo/sketches/pig/hll/SketchToEstimateAndErrorBoundsTest.java
new file mode 100644
index 0000000..a59f6bb
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/hll/SketchToEstimateAndErrorBoundsTest.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.hll.HllSketch;
+
+public class SketchToEstimateAndErrorBoundsTest {
+
+  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+
+  @Test
+  public void nullInputTuple() throws Exception {
+    EvalFunc<Tuple> func = new SketchToEstimateAndErrorBounds();
+    Tuple result = func.exec(null);
+    Assert.assertNull(result);
+  }
+
+  @Test
+  public void emptyInputTuple() throws Exception {
+    EvalFunc<Tuple> func = new SketchToEstimateAndErrorBounds();
+    Tuple result = func.exec(tupleFactory.newTuple());
+    Assert.assertNull(result);
+  }
+
+  @Test
+  public void normalCase() throws Exception {
+    EvalFunc<Tuple> func = new SketchToEstimateAndErrorBounds();
+    HllSketch sketch = new HllSketch(12);
+    sketch.update(1);
+    sketch.update(2);
+    Tuple result = func.exec(tupleFactory.newTuple(new DataByteArray(sketch.toCompactByteArray())));
+    Assert.assertNotNull(result);
+    Assert.assertEquals((Double) result.get(0), 2.0, 0.01);
+    Assert.assertTrue((Double) result.get(1) <= 2.0);
+    Assert.assertTrue((Double) result.get(2) >= 2.0);
+  }
+
+  @Test
+  public void schema() throws Exception {
+    EvalFunc<Tuple> func = new SketchToEstimateAndErrorBounds();
+    Schema inputSchema = new Schema(new Schema.FieldSchema("Sketch", DataType.BYTEARRAY));
+    Schema outputSchema = func.outputSchema(inputSchema);
+    Assert.assertNotNull(outputSchema);
+    Assert.assertEquals(outputSchema.size(), 1);
+    Assert.assertEquals(DataType.findTypeName(outputSchema.getField(0).type), "tuple");
+    Schema innerSchema = outputSchema.getField(0).schema;
+    Assert.assertEquals(innerSchema.size(), 3);
+    Assert.assertEquals(DataType.findTypeName(innerSchema.getField(0).type), "double");
+    Assert.assertEquals(DataType.findTypeName(innerSchema.getField(1).type), "double");
+    Assert.assertEquals(DataType.findTypeName(innerSchema.getField(2).type), "double");
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/hll/SketchToEstimateTest.java b/src/test/java/com/yahoo/sketches/pig/hll/SketchToEstimateTest.java
new file mode 100644
index 0000000..7d5a949
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/hll/SketchToEstimateTest.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.TupleFactory;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.hll.HllSketch;
+
+import junit.framework.Assert;
+
+public class SketchToEstimateTest {
+
+  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+
+  @Test
+  public void nullInputTuple() throws Exception {
+    EvalFunc<Double> func = new SketchToEstimate();
+    Double result = func.exec(null);
+    Assert.assertNull(result);
+  }
+
+  @Test
+  public void emptyInputTuple() throws Exception {
+    EvalFunc<Double> func = new SketchToEstimate();
+    Double result = func.exec(tupleFactory.newTuple());
+    Assert.assertNull(result);
+  }
+
+  @Test
+  public void normalCase() throws Exception {
+    EvalFunc<Double> func = new SketchToEstimate();
+    HllSketch sketch = new HllSketch(12);
+    sketch.update(1);
+    sketch.update(2);
+    Double result = func.exec(tupleFactory.newTuple(new DataByteArray(sketch.toCompactByteArray())));
+    Assert.assertNotNull(result);
+    Assert.assertEquals(result, 2.0, 0.01);
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/hll/SketchToStringTest.java b/src/test/java/com/yahoo/sketches/pig/hll/SketchToStringTest.java
new file mode 100644
index 0000000..f6185e8
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/hll/SketchToStringTest.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.TupleFactory;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.hll.HllSketch;
+
+import junit.framework.Assert;
+
+public class SketchToStringTest {
+
+  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+
+  @Test
+  public void nullInputTuple() throws Exception {
+    EvalFunc<String> func = new SketchToString();
+    String result = func.exec(null);
+    Assert.assertNull(result);
+  }
+
+  @Test
+  public void emptyInputTuple() throws Exception {
+    EvalFunc<String> func = new SketchToString();
+    String result = func.exec(tupleFactory.newTuple());
+    Assert.assertNull(result);
+  }
+
+  @Test
+  public void normalCase() throws Exception {
+    EvalFunc<String> func = new SketchToString();
+    HllSketch sketch = new HllSketch(12);
+    String result = func.exec(tupleFactory.newTuple(new DataByteArray(sketch.toCompactByteArray())));
+    Assert.assertNotNull(result);
+    Assert.assertTrue(result.length() > 0);
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/hll/UnionSketchTest.java b/src/test/java/com/yahoo/sketches/pig/hll/UnionSketchTest.java
new file mode 100644
index 0000000..74dd855
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/hll/UnionSketchTest.java
@@ -0,0 +1,234 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.hll.HllSketch;
+import com.yahoo.sketches.hll.TgtHllType;
+
+public class UnionSketchTest {
+
+  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+  private static final BagFactory bagFactory = BagFactory.getInstance();
+
+  @Test
+  public void execNullInputTuple() throws Exception {
+    EvalFunc<DataByteArray> func = new UnionSketch();
+    DataByteArray result = func.exec(null);
+    HllSketch sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void execEmptyInputTuple() throws Exception {
+    EvalFunc<DataByteArray> func = new UnionSketch("10");
+    DataByteArray result = func.exec(tupleFactory.newTuple());
+    HllSketch sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+    Assert.assertEquals(sketch.getLgConfigK(), 10);
+  }
+
+  @Test
+  public void execEmptyBag() throws Exception {
+    EvalFunc<DataByteArray> func = new UnionSketch("10", "HLL_6");
+    DataByteArray result = func.exec(tupleFactory.newTuple(bagFactory.newDefaultBag()));
+    HllSketch sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+    Assert.assertEquals(sketch.getLgConfigK(), 10);
+    Assert.assertEquals(sketch.getTgtHllType(), TgtHllType.HLL_6);
+  }
+
+  @Test
+  public void execNormalCase() throws Exception {
+    EvalFunc<DataByteArray> func = new UnionSketch();
+    HllSketch inputSketch = new HllSketch(12);
+    inputSketch.update(1);
+    inputSketch.update(2);
+    DataBag bag = bagFactory.newDefaultBag();
+    bag.add(tupleFactory.newTuple(new DataByteArray(inputSketch.toCompactByteArray())));
+    DataByteArray result = func.exec(tupleFactory.newTuple(bag));
+    HllSketch sketch = DataToSketchTest.getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+  }
+
+  @Test
+  public void accumulator() throws Exception {
+    Accumulator<DataByteArray> func = new UnionSketch();
+
+    // no input yet
+    DataByteArray result = func.getValue();
+    HllSketch sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // null input tuple
+    func.accumulate(null);
+    result = func.getValue();
+    sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // empty input tuple
+    func.accumulate(tupleFactory.newTuple());
+    result = func.getValue();
+    sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // empty bag
+    func.accumulate(tupleFactory.newTuple(bagFactory.newDefaultBag()));
+    result = func.getValue();
+    sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // normal case
+    HllSketch inputSketch = new HllSketch(12);
+    inputSketch.update(1);
+    inputSketch.update(2);
+    DataBag bag = bagFactory.newDefaultBag();
+    bag.add(tupleFactory.newTuple(new DataByteArray(inputSketch.toCompactByteArray())));
+    func.accumulate(tupleFactory.newTuple(bag));
+    result = func.getValue();
+    sketch = DataToSketchTest.getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+
+    // cleanup
+    func.cleanup();
+    result = func.getValue();
+    sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicInitial() throws Exception {
+    EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new UnionSketch().getInitial()).newInstance();
+    Tuple input = tupleFactory.newTuple();
+    Tuple result = func.exec(input);
+    Assert.assertEquals(result, input);
+  }
+
+  @Test
+  public void algebraicIntermediateNullInputTuple() throws Exception {
+    EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new UnionSketch().getIntermed()).newInstance();
+    Tuple result = func.exec(null);
+    HllSketch sketch = DataToSketchTest.getSketch((DataByteArray) result.get(0));
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicIntermediateEmptyInputTuple() throws Exception {
+    EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new UnionSketch().getIntermed())
+        .getConstructor(String.class).newInstance("10");
+    Tuple result = func.exec(tupleFactory.newTuple());
+    HllSketch sketch = DataToSketchTest.getSketch((DataByteArray) result.get(0));
+    Assert.assertTrue(sketch.isEmpty());
+    Assert.assertEquals(sketch.getLgConfigK(), 10);
+  }
+
+  @Test
+  public void algebraicIntermediateFromInitial() throws Exception {
+    EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new UnionSketch().getIntermed())
+        .getConstructor(String.class, String.class).newInstance("10", "HLL_6");
+    HllSketch inputSketch = new HllSketch(12);
+    inputSketch.update(1);
+    inputSketch.update(2);
+    inputSketch.update(3);
+    DataBag outerBag = bagFactory.newDefaultBag();
+    DataBag innerBag = bagFactory.newDefaultBag();
+    innerBag.add(tupleFactory.newTuple(new DataByteArray(inputSketch.toCompactByteArray())));
+    outerBag.add(tupleFactory.newTuple(innerBag));
+    Tuple result = func.exec(tupleFactory.newTuple(outerBag));
+    HllSketch sketch = DataToSketchTest.getSketch((DataByteArray) result.get(0));
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 3.0, 0.01);
+    Assert.assertEquals(sketch.getLgConfigK(), 10);
+    Assert.assertEquals(sketch.getTgtHllType(), TgtHllType.HLL_6);
+  }
+
+  @Test
+  public void algebraicIntermediateFromIntermediate() throws Exception {
+    EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new UnionSketch().getIntermed()).newInstance();
+    HllSketch inputSketch = new HllSketch(12);
+    inputSketch.update("a");
+    inputSketch.update("b");
+    DataBag bag = bagFactory.newDefaultBag();
+    bag.add(tupleFactory.newTuple(new DataByteArray(inputSketch.toCompactByteArray())));
+    Tuple result = func.exec(tupleFactory.newTuple(bag));
+    HllSketch sketch = DataToSketchTest.getSketch((DataByteArray) result.get(0));
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+  }
+
+  @Test
+  public void algebraicFinalNullInputTuple() throws Exception {
+    EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new UnionSketch().getFinal()).newInstance();
+    DataByteArray result = func.exec(null);
+    HllSketch sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicFinalEmptyInputTuple() throws Exception {
+    EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new UnionSketch().getFinal())
+        .getConstructor(String.class).newInstance("10");
+    DataByteArray result = func.exec(tupleFactory.newTuple());
+    HllSketch sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+    Assert.assertEquals(sketch.getLgConfigK(), 10);
+  }
+
+  @Test
+  public void algebraicFinalFromInitial() throws Exception {
+    EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new UnionSketch().getFinal())
+        .getConstructor(String.class, String.class).newInstance("10", "HLL_6");
+    HllSketch inputSketch = new HllSketch(12);
+    inputSketch.update(1);
+    inputSketch.update(2);
+    inputSketch.update(3);
+    DataBag outerBag = bagFactory.newDefaultBag();
+    DataBag innerBag = bagFactory.newDefaultBag();
+    innerBag.add(tupleFactory.newTuple(new DataByteArray(inputSketch.toCompactByteArray())));
+    outerBag.add(tupleFactory.newTuple(innerBag));
+    DataByteArray result = func.exec(tupleFactory.newTuple(outerBag));
+    HllSketch sketch = DataToSketchTest.getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 3.0, 0.01);
+    Assert.assertEquals(sketch.getLgConfigK(), 10);
+    Assert.assertEquals(sketch.getTgtHllType(), TgtHllType.HLL_6);
+  }
+
+  @Test
+  public void algebraicFinalFromIntermediate() throws Exception {
+    EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new UnionSketch().getFinal()).newInstance();
+    HllSketch inputSketch = new HllSketch(12);
+    inputSketch.update("a");
+    inputSketch.update("b");
+    DataBag bag = bagFactory.newDefaultBag();
+    bag.add(tupleFactory.newTuple(new DataByteArray(inputSketch.toCompactByteArray())));
+    DataByteArray result = func.exec(tupleFactory.newTuple(bag));
+    HllSketch sketch = DataToSketchTest.getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+  }
+
+}