Merge pull request #55 from DataSketches/kll-cpc-udfs

KLL and CPC UDFs
diff --git a/pom.xml b/pom.xml
index 6c2db9d..675a21a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -145,15 +145,14 @@
     <dependency>
       <groupId>com.yahoo.datasketches</groupId>
       <artifactId>sketches-core</artifactId>
-      <version>0.11.0</version>
+      <version>0.13.0</version>
     </dependency>
 
     <!-- Pig -->
     <dependency>
       <groupId>org.apache.pig</groupId>
       <artifactId>pig</artifactId>
-      <version>0.16.0</version>
-      <classifier>h2</classifier>
+      <version>0.17.0</version>
       <scope>provided</scope>
     </dependency>
 
diff --git a/src/main/java/com/yahoo/sketches/pig/cpc/AlgebraicFinal.java b/src/main/java/com/yahoo/sketches/pig/cpc/AlgebraicFinal.java
new file mode 100644
index 0000000..aacc381
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/cpc/AlgebraicFinal.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+import com.yahoo.sketches.cpc.CpcUnion;
+
+/**
+ * Class used to calculate the final pass of an <i>Algebraic</i> sketch
+ * operation. It will receive a bag of values returned by either the <i>Intermediate</i>
+ * stage or the <i>Initial</i> stages, so it needs to be able to differentiate between and
+ * interpret both types.
+ * 
+ * @author Alexander Saydakov
+ */
+abstract class AlgebraicFinal extends EvalFunc<DataByteArray> {
+
+  private final int lgK_;
+  private final long seed_;
+  private DataByteArray emptySketch_; // this is to cash an empty sketch tuple
+  private boolean isFirstCall_ = true; // for logging
+
+  /**
+   * Constructor with primitives for the final passes of an Algebraic function.
+   *
+   * @param lgK parameter controlling the sketch size and accuracy
+   * @param seed for the hash function
+   */
+  public AlgebraicFinal(final int lgK, final long seed) {
+    lgK_ = lgK;
+    seed_ = seed;
+  }
+
+  @Override
+  public DataByteArray exec(final Tuple inputTuple) throws IOException {
+    if (isFirstCall_) {
+      Logger.getLogger(getClass()).info("Algebraic was used");
+      isFirstCall_ = false;
+    }
+    final DataByteArray dba = process(inputTuple, lgK_, seed_, isInputRaw());
+    if (dba == null) {
+      return getEmptySketch();
+    }
+    return dba;
+  }
+
+  static DataByteArray process(final Tuple inputTuple, final int lgK, final long seed, boolean isInputRaw) throws IOException {
+    if (inputTuple == null || inputTuple.size() == 0) {
+      return null;
+    }
+    CpcSketch sketch = null;
+    CpcUnion union = null;
+    final DataBag outerBag = (DataBag) inputTuple.get(0);
+    if (outerBag == null) {
+      return null;
+    }
+    for (final Tuple dataTuple: outerBag) {
+      final Object f0 = dataTuple.get(0); // inputTuple.bag0.dataTupleN.f0
+      if (f0 == null) {
+        continue;
+      }
+      if (f0 instanceof DataBag) {
+        final DataBag innerBag = (DataBag) f0; // inputTuple.bag0.dataTupleN.f0:bag
+        if (innerBag.size() == 0) { continue; }
+        // If field 0 of a dataTuple is a Bag, all innerTuples of this inner bag
+        // will be passed into the union.
+        // It is due to system bagged outputs from multiple mapper Initial functions.
+        // The Intermediate stage was bypassed.
+        if (isInputRaw) {
+          if (sketch == null) {
+            sketch = new CpcSketch(lgK, seed);
+          }
+          DataToSketch.updateSketch(innerBag, sketch);
+        } else {
+          if (union == null) {
+            union = new CpcUnion(lgK, seed);
+          }
+          UnionSketch.updateUnion(innerBag, union, seed);
+        }
+      } else if (f0 instanceof DataByteArray) { // inputTuple.bag0.dataTupleN.f0:DBA
+        // If field 0 of a dataTuple is a DataByteArray, we assume it is a sketch
+        // due to system bagged outputs from multiple mapper Intermediate functions.
+        // Each dataTuple.DBA:sketch will merged into the union.
+        final DataByteArray dba = (DataByteArray) f0;
+        if (union == null) {
+          union = new CpcUnion(lgK, seed);
+        }
+        union.update(CpcSketch.heapify(dba.get(), seed));
+      } else { // we should never get here
+        throw new IllegalArgumentException("dataTuple.Field0 is not a DataBag or DataByteArray: "
+            + f0.getClass().getName());
+      }
+    }
+    if (sketch != null && union != null) {
+      union.update(sketch);
+      sketch = null;
+    }
+    if (sketch != null) {
+      return new DataByteArray(sketch.toByteArray());
+    } else if (union != null) {
+      return new DataByteArray(union.getResult().toByteArray());
+    }
+    return null;
+  }
+
+  abstract boolean isInputRaw();
+
+  private DataByteArray getEmptySketch() {
+    if (emptySketch_ == null) {
+      emptySketch_ = new DataByteArray(new CpcSketch(lgK_, seed_).toByteArray());
+    }
+    return emptySketch_;
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/cpc/AlgebraicInitial.java b/src/main/java/com/yahoo/sketches/pig/cpc/AlgebraicInitial.java
new file mode 100644
index 0000000..f332dd0
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/cpc/AlgebraicInitial.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Class used to calculate the initial pass of an Algebraic sketch operation.
+ *
+ * <p>The Initial class simply passes through all records unchanged so that they can be
+ * processed by the intermediate processor instead.
+ * 
+ * @author Alexander Saydakov
+ */
+public class AlgebraicInitial extends EvalFunc<Tuple> {
+
+  private boolean isFirstCall_ = true;
+
+  /**
+   * Default constructor for the initial pass of an Algebraic function.
+   */
+  public AlgebraicInitial() {}
+
+  /**
+   * Constructor for the initial pass of an Algebraic function. Pig will call this and pass the
+   * same constructor arguments as the original UDF. In this case the arguments are ignored.
+   *
+   * @param lgK in a form of a String
+   */
+  public AlgebraicInitial(final String lgK) {}
+
+  /**
+   * Constructor for the initial pass of an Algebraic function. Pig will call this and pass the
+   * same constructor arguments as the original UDF. In this case the arguments are ignored.
+   *
+   * @param lgK in a form of a String
+   * @param seed in a form of a String
+   */
+  public AlgebraicInitial(final String lgK, final String seed) {}
+
+  @Override
+  public Tuple exec(final Tuple inputTuple) throws IOException {
+    if (isFirstCall_) {
+      Logger.getLogger(getClass()).info("Algebraic was used");
+      isFirstCall_ = false;
+    }
+    return inputTuple;
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/cpc/AlgebraicIntermediate.java b/src/main/java/com/yahoo/sketches/pig/cpc/AlgebraicIntermediate.java
new file mode 100644
index 0000000..45473ad
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/cpc/AlgebraicIntermediate.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+/**
+ * Class used to calculate the intermediate combiner pass of an <i>Algebraic</i> sketch
+ * operation. This is called from the combiner, and may be called multiple times (from a mapper
+ * and from a reducer). It will receive a bag of values returned by either <i>Intermediate</i>
+ * or <i>Initial</i> stages, so it needs to be able to differentiate between and
+ * interpret both types.
+ * 
+ * @author Alexander Saydakov
+ */
+abstract class AlgebraicIntermediate extends EvalFunc<Tuple> {
+
+  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+
+  private final int lgK_;
+  private final long seed_;
+  private Tuple emptySketchTuple_; // this is to cash an empty sketch tuple
+  private boolean isFirstCall_ = true; // for logging
+
+  /**
+   * Constructor with primitives for the intermediate pass of an Algebraic function.
+   *
+   * @param lgK parameter controlling the sketch size and accuracy
+   * @param seed
+   */
+  public AlgebraicIntermediate(final int lgK, final long seed) {
+    lgK_ = lgK;
+    seed_ = seed;
+  }
+
+  @Override
+  public Tuple exec(final Tuple inputTuple) throws IOException {
+    if (isFirstCall_) {
+      Logger.getLogger(getClass()).info("Algebraic was used");
+      isFirstCall_ = false;
+    }
+    final DataByteArray dba = AlgebraicFinal.process(inputTuple, lgK_, seed_, isInputRaw());
+    if (dba == null) {
+      return getEmptySketchTuple();
+    }
+    return TUPLE_FACTORY.newTuple(dba);
+  }
+
+  abstract boolean isInputRaw();
+
+  private Tuple getEmptySketchTuple() {
+    if (emptySketchTuple_ == null) {
+      emptySketchTuple_ = TUPLE_FACTORY.newTuple(new DataByteArray(
+          new CpcSketch(lgK_, seed_).toByteArray()));
+    }
+    return emptySketchTuple_;
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/cpc/DataToSketch.java b/src/main/java/com/yahoo/sketches/pig/cpc/DataToSketch.java
new file mode 100644
index 0000000..5d8aa10
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/cpc/DataToSketch.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import static com.yahoo.sketches.Util.DEFAULT_UPDATE_SEED;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.pig.Accumulator;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+/**
+ * This is a Pig UDF that builds sketches from data.
+ * This class implements both the <i>Accumulator</i> and <i>Algebraic</i> interfaces.
+ *
+ * @author Alexander Saydakov
+ */
+public class DataToSketch extends EvalFunc<DataByteArray> implements Accumulator<DataByteArray>, Algebraic {
+
+  private DataByteArray emptySketch_; // this is to cash an empty sketch
+
+  private final int lgK_;
+  private final long seed_;
+  private CpcSketch accumSketch_;
+  private boolean isFirstCall_; // for logging
+
+  /**
+   * Constructor with default lgK and seed
+   */
+  public DataToSketch() {
+    this(CpcSketch.DEFAULT_LG_K, DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Constructor with given lgK as string and default seed
+   *
+   * @param lgK in a form of a String
+   */
+  public DataToSketch(final String lgK) {
+    this(Integer.parseInt(lgK), DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Constructor with given lgK and seed as strings
+   *
+   * @param lgK in a form of a String
+   * @param seed in a form of a String
+   */
+  public DataToSketch(final String lgK, final String seed) {
+    this(Integer.parseInt(lgK), Long.parseLong(seed));
+  }
+
+  /**
+   * Base constructor.
+   *
+   * @param lgK parameter controlling the sketch size and accuracy
+   * @param seed parameter to use during hashing
+   */
+  public DataToSketch(final int lgK, final long seed) {
+    super();
+    lgK_ = lgK;
+    seed_ = seed;
+  }
+
+  /**
+   * Top-level exec function.
+   * This method accepts an input Tuple containing a Bag of one or more inner <b>Datum Tuples</b>
+   * and returns a single serialized CpcSketch as a DataByteArray.
+   *
+   * <b>Datum Tuple</b> is a Tuple containing a single field, which can be one of the following
+   * (Java type: Pig type):
+   * <ul>
+   *   <li>Byte: BYTE</li>
+   *   <li>Integer: INTEGER</li>
+   *   <li>Long: LONG</li>
+   *   <li>Float: FLOAT</li>
+   *   <li>Double: DOUBLE</li>
+   *   <li>String: CHARARRAY</li>
+   *   <li>DataByteArray: BYTEARRAY</li>
+   * </ul>
+   *
+   * @param inputTuple A tuple containing a single bag, containing Datum Tuples.
+   * @return serialized CpcSketch
+   * @see "org.apache.pig.EvalFunc.exec(org.apache.pig.data.Tuple)"
+   * @throws IOException from Pig
+   */
+
+  @Override
+  public DataByteArray exec(final Tuple inputTuple) throws IOException {
+    if (isFirstCall_) {
+      Logger.getLogger(getClass()).info("Exec was used");
+      isFirstCall_ = false;
+    }
+    if (inputTuple == null || inputTuple.size() == 0) {
+      if (emptySketch_ == null) {
+        emptySketch_ = new DataByteArray(new CpcSketch(lgK_, seed_).toByteArray());
+      }
+      return emptySketch_;
+    }
+    final CpcSketch sketch = new CpcSketch(lgK_, seed_);
+    final DataBag bag = (DataBag) inputTuple.get(0);
+    updateSketch(bag, sketch);
+    return new DataByteArray(sketch.toByteArray());
+  }
+
+  /**
+   * An <i>Accumulator</i> version of the standard <i>exec()</i> method. Like <i>exec()</i>,
+   * accumulator is called with a bag of Datum Tuples. Unlike <i>exec()</i>, it doesn't serialize the
+   * result at the end. Instead, it can be called multiple times, each time with another bag of
+   * Datum Tuples to be input to the sketch.
+   *
+   * @param inputTuple A tuple containing a single bag, containing Datum Tuples.
+   * @see #exec
+   * @see "org.apache.pig.Accumulator.accumulate(org.apache.pig.data.Tuple)"
+   * @throws IOException by Pig
+   */
+  @Override
+  public void accumulate(final Tuple inputTuple) throws IOException {
+    if (isFirstCall_) {
+      Logger.getLogger(getClass()).info("Accumulator was used");
+      isFirstCall_ = false;
+    }
+    if (inputTuple == null || inputTuple.size() == 0) { return; }
+    final DataBag bag = (DataBag) inputTuple.get(0);
+    if (bag == null) { return; }
+    if (accumSketch_ == null) {
+      accumSketch_ = new CpcSketch(lgK_);
+    }
+    updateSketch(bag, accumSketch_);
+  }
+
+  /**
+   * Returns the sketch that has been built up by multiple calls to {@link #accumulate}.
+   *
+   * @return serialized CpcSketch
+   * @see "org.apache.pig.Accumulator.getValue()"
+   */
+  @Override
+  public DataByteArray getValue() {
+    if (accumSketch_ == null) {
+      if (emptySketch_ == null) {
+        emptySketch_ = new DataByteArray(new CpcSketch(lgK_, seed_).toByteArray());
+      }
+      return emptySketch_;
+    }
+    return new DataByteArray(accumSketch_.toByteArray());
+  }
+
+  /**
+   * Cleans up the UDF state after being called using the {@link Accumulator} interface.
+   *
+   * @see "org.apache.pig.Accumulator.cleanup()"
+   */
+  @Override
+  public void cleanup() {
+    accumSketch_ = null;
+  }
+
+  @Override
+  public String getInitial() {
+    return AlgebraicInitial.class.getName();
+  }
+
+  @Override
+  public String getIntermed() {
+    return DataToSketchAlgebraicIntermediate.class.getName();
+  }
+
+  @Override
+  public String getFinal() {
+    return DataToSketchAlgebraicFinal.class.getName();
+  }
+
+  static void updateSketch(final DataBag bag, final CpcSketch sketch) throws ExecException {
+    // bag is not empty, process each innerTuple in the bag
+    for (final Tuple innerTuple: bag) {
+      final Object f0 = innerTuple.get(0); // consider only field 0
+      if (f0 == null) {
+        continue;
+      }
+      final byte type = innerTuple.getType(0);
+
+      switch (type) {
+        case DataType.NULL:
+          break;
+        case DataType.BYTE:
+          sketch.update((byte) f0);
+          break;
+        case DataType.INTEGER:
+          sketch.update((int) f0);
+          break;
+        case DataType.LONG:
+          sketch.update((long) f0);
+          break;
+        case DataType.FLOAT:
+          sketch.update((float) f0);
+          break;
+        case DataType.DOUBLE:
+          sketch.update((double) f0);
+          break;
+        case DataType.BYTEARRAY: {
+          final DataByteArray dba = (DataByteArray) f0;
+          sketch.update(dba.get());
+          break;
+        }
+        case DataType.CHARARRAY: {
+          final String str = (String) f0;
+          // conversion to char[] avoids costly UTF-8 encoding
+          sketch.update(str.toCharArray());
+          break;
+        }
+        default:
+          throw new IllegalArgumentException("Field 0 of innerTuple must be one of "
+              + "NULL, BYTE, INTEGER, LONG, FLOAT, DOUBLE, BYTEARRAY or CHARARRAY. "
+              + "Given Type = " + DataType.findTypeName(type)
+              + ", Object = " + f0.toString());
+      }
+    }
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/cpc/DataToSketchAlgebraicFinal.java b/src/main/java/com/yahoo/sketches/pig/cpc/DataToSketchAlgebraicFinal.java
new file mode 100644
index 0000000..e425d77
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/cpc/DataToSketchAlgebraicFinal.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import static com.yahoo.sketches.Util.DEFAULT_UPDATE_SEED;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+public class DataToSketchAlgebraicFinal extends AlgebraicFinal {
+
+  /**
+   * Default constructor for the final pass of an Algebraic function.
+   * Assumes default lgK and seed.
+   */
+  public DataToSketchAlgebraicFinal() {
+    super(CpcSketch.DEFAULT_LG_K, DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Constructor for the final pass of an Algebraic function. Pig will call
+   * this and pass the same constructor arguments as the base UDF.
+   * Assumes default seed.
+   *
+   * @param lgK in a form of a String
+   */
+  public DataToSketchAlgebraicFinal(final String lgK) {
+    super(Integer.parseInt(lgK), DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Constructor for the final pass of an Algebraic function. Pig will call
+   * this and pass the same constructor arguments as the base UDF.
+   * 
+   * @param lgK parameter controlling the sketch size and accuracy
+   * @param seed for the hash function
+   */
+  public DataToSketchAlgebraicFinal(final String lgK, final String seed) {
+    super(Integer.parseInt(lgK), Long.parseLong(seed));
+  }
+
+  @Override
+  boolean isInputRaw() {
+    return true;
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/cpc/DataToSketchAlgebraicIntermediate.java b/src/main/java/com/yahoo/sketches/pig/cpc/DataToSketchAlgebraicIntermediate.java
new file mode 100644
index 0000000..dea8591
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/cpc/DataToSketchAlgebraicIntermediate.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import static com.yahoo.sketches.Util.DEFAULT_UPDATE_SEED;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+public class DataToSketchAlgebraicIntermediate extends AlgebraicIntermediate {
+
+  /**
+   * Default constructor for the intermediate pass of an Algebraic function.
+   * Assumes default lgK and seed.
+   */
+  public DataToSketchAlgebraicIntermediate() {
+    super(CpcSketch.DEFAULT_LG_K, DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Constructor for the intermediate pass of an Algebraic function. Pig will call
+   * this and pass the same constructor arguments as the base UDF.
+   * Assumes default seed.
+   *
+   * @param lgK in a form of a String
+   */
+  public DataToSketchAlgebraicIntermediate(final String lgK) {
+    super(Integer.parseInt(lgK), DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Constructor for the intermediate pass of an Algebraic function. Pig will call
+   * this and pass the same constructor arguments as the base UDF.
+   * 
+   * @param lgK parameter controlling the sketch size and accuracy
+   * @param seed for the hash function
+   */
+  public DataToSketchAlgebraicIntermediate(final String lgK, final String seed) {
+    super(Integer.parseInt(lgK), Long.parseLong(seed));
+  }
+
+  @Override
+  boolean isInputRaw() {
+    return true;
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/cpc/GetEstimate.java b/src/main/java/com/yahoo/sketches/pig/cpc/GetEstimate.java
new file mode 100644
index 0000000..88ce2a5
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/cpc/GetEstimate.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import static com.yahoo.sketches.Util.DEFAULT_UPDATE_SEED;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+/**
+ * This is a User Defined Function (UDF) for getting a distinct count estimate from a given CpcdSketch
+ *
+ * @author Alexander Saydakov
+ */
+public class GetEstimate extends EvalFunc<Double> {
+
+  private final long seed_;
+
+  /**
+   * Constructor with default seed
+   */
+  public GetEstimate() {
+    this(DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Constructor with given seed
+   * @param seed in a form of a String
+   */
+  public GetEstimate(final String seed) {
+    this(Long.parseLong(seed));
+  }
+
+  /**
+   * Base constructor
+   * @param seed parameter for the hash function
+   */
+  GetEstimate(final long seed) {
+    seed_ = seed;
+  }
+
+  @Override
+  public Double exec(final Tuple sketchTuple) throws IOException {
+    if ((sketchTuple == null) || (sketchTuple.size() == 0)) {
+      return null;
+    }
+    final DataByteArray dba = (DataByteArray) sketchTuple.get(0);
+    final CpcSketch sketch = CpcSketch.heapify(dba.get(), seed_);
+    return sketch.getEstimate();
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/cpc/GetEstimateAndErrorBounds.java b/src/main/java/com/yahoo/sketches/pig/cpc/GetEstimateAndErrorBounds.java
new file mode 100644
index 0000000..be87f1b
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/cpc/GetEstimateAndErrorBounds.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import static com.yahoo.sketches.Util.DEFAULT_UPDATE_SEED;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+/**
+ * This is a User Defined Function (UDF) for obtaining the distinct count estimate
+ * along with a lower and upper bound from a given CpcSketch.
+ *
+ * <p>The result is a tuple with three double values: estimate, lower bound and upper bound.
+ * The bounds are best estimates for the confidence interval given <i>kappa</i>, which represents
+ * the number of standard deviations from the mean (1, 2 or 3).
+ *
+ * @author Alexander Saydakov
+ */
+public class GetEstimateAndErrorBounds extends EvalFunc<Tuple> {
+
+  private static int DEFAULT_KAPPA = 2;
+
+  private final int kappa_;
+  private final long seed_;
+
+  /**
+   * Constructor with default kappa and seed
+   */
+  public GetEstimateAndErrorBounds() {
+    this(DEFAULT_KAPPA, DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Constructor with given kappa and default seed
+   * @param kappa in a form of a String
+   */
+  public GetEstimateAndErrorBounds(final String kappa) {
+    this(Integer.parseInt(kappa), DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Constructor with given kappa and seed
+   * @param kappa in a form of a String
+   * @param seed in a form of a String
+   */
+  public GetEstimateAndErrorBounds(final String kappa, final String seed) {
+    this(Integer.parseInt(kappa), Long.parseLong(seed));
+  }
+
+  /**
+   * Base constructor
+   * @param kappa the given number of standard deviations from the mean: 1, 2 or 3
+   * @param seed parameter for the hash function
+   */
+  GetEstimateAndErrorBounds(final int kappa, final long seed) {
+    kappa_ = kappa;
+    seed_ = seed;
+  }
+
+  @Override
+  public Tuple exec(final Tuple sketchTuple) throws IOException {
+    if ((sketchTuple == null) || (sketchTuple.size() == 0)) {
+      return null;
+    }
+    final DataByteArray dba = (DataByteArray) sketchTuple.get(0);
+    final CpcSketch sketch = CpcSketch.heapify(dba.get(), seed_);
+    final Tuple outputTuple = TupleFactory.getInstance().newTuple(3);
+    outputTuple.set(0, Double.valueOf(sketch.getEstimate()));
+    outputTuple.set(1, Double.valueOf(sketch.getLowerBound(kappa_)));
+    outputTuple.set(2, Double.valueOf(sketch.getUpperBound(kappa_)));
+    return outputTuple;
+  }
+
+  /**
+   * The output is a Sketch Result Tuple Schema.
+   */
+  @Override
+  public Schema outputSchema(final Schema input) {
+    if (input == null) { return null; }
+    try {
+      final Schema tupleSchema = new Schema();
+      tupleSchema.add(new Schema.FieldSchema("Estimate", DataType.DOUBLE));
+      tupleSchema.add(new Schema.FieldSchema("LowerBound", DataType.DOUBLE));
+      tupleSchema.add(new Schema.FieldSchema("UpperBound", DataType.DOUBLE));
+      return new Schema(new Schema.FieldSchema(getSchemaName(this
+          .getClass().getName().toLowerCase(), input), tupleSchema, DataType.TUPLE));
+    } catch (final FrontendException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/cpc/SketchToString.java b/src/main/java/com/yahoo/sketches/pig/cpc/SketchToString.java
new file mode 100644
index 0000000..b82aa24
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/cpc/SketchToString.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import static com.yahoo.sketches.Util.DEFAULT_UPDATE_SEED;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+/**
+ * This is a User Defined Function (UDF) for printing a human-readable summary of a given CpcSketch
+ * @author Alexander Saydakov
+ */
+public class SketchToString extends EvalFunc<String> {
+
+  private final boolean isDetailed_;
+  private final long seed_;
+
+  /**
+   * Prints only the sketch summary using the default seed.
+   */
+  public SketchToString() {
+    this(false, DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Prints the summary and details using the default seed.
+   *
+   * @param isDetailed flag to print the sketch detail
+   */
+  public SketchToString(final String isDetailed) {
+    this(Boolean.parseBoolean(isDetailed), DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Prints the summary and details using a given seed.
+   *
+   * @param isDetailed flag to print the sketch detail
+   * @param seed parameter for the hash function
+   */
+  public SketchToString(final String isDetailed, final String seed) {
+    this(Boolean.parseBoolean(isDetailed), Long.parseLong(seed));
+  }
+
+  /**
+   * Internal constructor with primitive parameters.
+   *
+   * @param isDetailed flag to print the sketch detail
+   * @param seed parameter for the hash function
+   */
+  private SketchToString(final boolean isDetailed, final long seed) {
+    isDetailed_ = isDetailed;
+    seed_ = seed;
+  }
+
+  @Override
+  public String exec(final Tuple sketchTuple) throws IOException {
+    if ((sketchTuple == null) || (sketchTuple.size() == 0)) {
+      return null;
+    }
+    final DataByteArray dba = (DataByteArray) sketchTuple.get(0);
+    final CpcSketch sketch = CpcSketch.heapify(dba.get(), seed_);
+    return sketch.toString(isDetailed_);
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/cpc/UnionSketch.java b/src/main/java/com/yahoo/sketches/pig/cpc/UnionSketch.java
new file mode 100644
index 0000000..c4a9c9d
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/cpc/UnionSketch.java
@@ -0,0 +1,195 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import static com.yahoo.sketches.Util.DEFAULT_UPDATE_SEED;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.pig.Accumulator;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+import com.yahoo.sketches.cpc.CpcUnion;
+
+/**
+ * This is a Pig UDF that performs the Union operation on CpcSketches.
+ * This class implements both the <i>Accumulator</i> and <i>Algebraic</i> interfaces.
+ *
+ * @author Alexander Saydakov
+ */
+public class UnionSketch extends EvalFunc<DataByteArray> implements Accumulator<DataByteArray>, Algebraic {
+
+  private DataByteArray emptySketch_; // this is to cash an empty sketch
+
+  private final int lgK_;
+  private final long seed_;
+  private CpcUnion accumUnion_;
+  private boolean isFirstCall_;
+
+  /**
+   * Constructor with default lgK and target HLL type
+   */
+  public UnionSketch() {
+    this(CpcSketch.DEFAULT_LG_K, DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Constructor with given lgK as string and default seed.
+   *
+   * @param lgK in a form of a String
+   */
+  public UnionSketch(final String lgK) {
+    this(Integer.parseInt(lgK), DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Constructor with given lgK and seed as strings
+   *
+   * @param lgK in a form of a String
+   * @param seed in a form of a String
+   */
+  public UnionSketch(final String lgK, final String seed) {
+    this(Integer.parseInt(lgK), Long.parseLong(seed));
+  }
+
+  /**
+   * Base constructor.
+   *
+   * @param lgK parameter controlling the sketch size and accuracy
+   * @param seed for the hash function
+   */
+  public UnionSketch(final int lgK, final long seed) {
+    super();
+    lgK_ = lgK;
+    seed_ = seed;
+  }
+
+  /**
+   * Top-level exec function.
+   * This method accepts an input Tuple containing a Bag of one or more inner <b>Sketch Tuples</b>
+   * and returns a single serialized CpcSketch as a DataByteArray.
+   *
+   * <b>Sketch Tuple</b> is a Tuple containing a single DataByteArray (BYTEARRAY in Pig), which
+   * is a serialized HllSketch.
+   *
+   * @param inputTuple A tuple containing a single bag, containing Sketch Tuples.
+   * @return serialized CpcSketch
+   * @see "org.apache.pig.EvalFunc.exec(org.apache.pig.data.Tuple)"
+   * @throws IOException from Pig.
+   */
+  @Override
+  public DataByteArray exec(final Tuple inputTuple) throws IOException {
+    if (isFirstCall_) {
+      Logger.getLogger(getClass()).info("Exec was used");
+      isFirstCall_ = false;
+    }
+    if (inputTuple == null || inputTuple.size() == 0) {
+      if (emptySketch_ == null) {
+        emptySketch_ = new DataByteArray(new CpcSketch(lgK_, seed_).toByteArray());
+      }
+      return emptySketch_;
+    }
+    final CpcUnion union = new CpcUnion(lgK_, seed_);
+    final DataBag bag = (DataBag) inputTuple.get(0);
+    updateUnion(bag, union, seed_);
+    return new DataByteArray(union.getResult().toByteArray());
+  }
+
+  /**
+   * An <i>Accumulator</i> version of the standard <i>exec()</i> method. Like <i>exec()</i>,
+   * accumulator is called with a bag of Sketch Tuples. Unlike <i>exec()</i>, it doesn't serialize the
+   * result at the end. Instead, it can be called multiple times, each time with another bag of
+   * Sketch Tuples to be input to the union.
+   *
+   * @param inputTuple A tuple containing a single bag, containing Sketch Tuples.
+   * @see #exec
+   * @see "org.apache.pig.Accumulator.accumulate(org.apache.pig.data.Tuple)"
+   * @throws IOException by Pig
+   */
+  @Override
+  public void accumulate(final Tuple inputTuple) throws IOException {
+    if (isFirstCall_) {
+      Logger.getLogger(getClass()).info("Accumulator was used");
+      isFirstCall_ = false;
+    }
+    if (inputTuple == null || inputTuple.size() == 0) { return; }
+    final DataBag bag = (DataBag) inputTuple.get(0);
+    if (bag == null) { return; }
+    if (accumUnion_ == null) {
+      accumUnion_ = new CpcUnion(lgK_, seed_);
+    }
+    updateUnion(bag, accumUnion_, seed_);
+  }
+
+  /**
+   * Returns the sketch that has been built up by multiple calls to {@link #accumulate}.
+   *
+   * @return Sketch Tuple. (see {@link #exec} for return tuple format)
+   * @see "org.apache.pig.Accumulator.getValue()"
+   */
+  @Override
+  public DataByteArray getValue() {
+    if (accumUnion_ == null) {
+      if (emptySketch_ == null) {
+        emptySketch_ = new DataByteArray(
+            new CpcSketch(lgK_, seed_).toByteArray());
+      }
+      return emptySketch_;
+    }
+    return new DataByteArray(accumUnion_.getResult().toByteArray());
+  }
+
+  /**
+   * Cleans up the UDF state after being called using the {@link Accumulator} interface.
+   *
+   * @see "org.apache.pig.Accumulator.cleanup()"
+   */
+  @Override
+  public void cleanup() {
+    accumUnion_ = null;
+  }
+
+  @Override
+  public String getInitial() {
+    return AlgebraicInitial.class.getName();
+  }
+
+  @Override
+  public String getIntermed() {
+    return UnionSketchAlgebraicIntermediate.class.getName();
+  }
+
+  @Override
+  public String getFinal() {
+    return UnionSketchAlgebraicFinal.class.getName();
+  }
+
+  static void updateUnion(final DataBag bag, final CpcUnion union, final long seed) throws ExecException {
+    // Bag is not empty. process each innerTuple in the bag
+    for (final Tuple innerTuple : bag) {
+      final Object f0 = innerTuple.get(0); // consider only field 0
+      if (f0 == null) {
+        continue;
+      }
+      final byte type = innerTuple.getType(0);
+      if (type == DataType.BYTEARRAY) {
+        final DataByteArray dba = (DataByteArray) f0;
+        union.update(CpcSketch.heapify(dba.get(), seed));
+      } else {
+        throw new IllegalArgumentException("Field type was not DataType.BYTEARRAY: " + type);
+      }
+    }
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/cpc/UnionSketchAlgebraicFinal.java b/src/main/java/com/yahoo/sketches/pig/cpc/UnionSketchAlgebraicFinal.java
new file mode 100644
index 0000000..9cbd22d
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/cpc/UnionSketchAlgebraicFinal.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import static com.yahoo.sketches.Util.DEFAULT_UPDATE_SEED;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+public class UnionSketchAlgebraicFinal extends AlgebraicFinal {
+
+  /**
+   * Default constructor for the final pass of an Algebraic function.
+   * Assumes default lgK and seed.
+   */
+  public UnionSketchAlgebraicFinal() {
+    super(CpcSketch.DEFAULT_LG_K, DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Constructor for the final pass of an Algebraic function. Pig will call
+   * this and pass the same constructor arguments as the base UDF.
+   * Assumes default seed.
+   *
+   * @param lgK in a form of a String
+   */
+  public UnionSketchAlgebraicFinal(final String lgK) {
+    super(Integer.parseInt(lgK), DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Constructor for the final pass of an Algebraic function. Pig will call
+   * this and pass the same constructor arguments as the base UDF.
+   * 
+   * @param lgK parameter controlling the sketch size and accuracy
+   * @param seed for the hash function
+   */
+  public UnionSketchAlgebraicFinal(final String lgK, final String seed) {
+    super(Integer.parseInt(lgK), Long.parseLong(seed));
+  }
+
+  @Override
+  boolean isInputRaw() {
+    return false;
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/cpc/UnionSketchAlgebraicIntermediate.java b/src/main/java/com/yahoo/sketches/pig/cpc/UnionSketchAlgebraicIntermediate.java
new file mode 100644
index 0000000..f5ab147
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/cpc/UnionSketchAlgebraicIntermediate.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import static com.yahoo.sketches.Util.DEFAULT_UPDATE_SEED;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+public class UnionSketchAlgebraicIntermediate extends AlgebraicIntermediate {
+
+  /**
+   * Default constructor of the intermediate pass of an Algebraic function.
+   * Assumes default lgK and seed.
+   */
+  public UnionSketchAlgebraicIntermediate() {
+    super(CpcSketch.DEFAULT_LG_K, DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Constructor for the intermediate pass of an Algebraic function. Pig will call
+   * this and pass the same constructor arguments as the base UDF.
+   * Assumes default seed.
+   *
+   * @param lgK in a form of a String
+   */
+  public UnionSketchAlgebraicIntermediate(final String lgK) {
+    super(Integer.parseInt(lgK), DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Constructor for the intermediate pass of an Algebraic function. Pig will call
+   * this and pass the same constructor arguments as the base UDF.
+   * 
+   * @param lgK parameter controlling the sketch size and accuracy
+   * @param seed for the hash function
+   */
+  public UnionSketchAlgebraicIntermediate(final String lgK, final String seed) {
+    super(Integer.parseInt(lgK), Long.parseLong(seed));
+  }
+
+  @Override
+  boolean isInputRaw() {
+    return false;
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/cpc/package-info.java b/src/main/java/com/yahoo/sketches/pig/cpc/package-info.java
new file mode 100644
index 0000000..c50fe23
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/cpc/package-info.java
@@ -0,0 +1,16 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+/**
+ * Pig UDFs for CPC sketches.
+ * This is a distinct-counting sketch that implements the
+ * <i>Compressed Probabilistic Counting (CPC, a.k.a FM85)</i> algorithms developed by Kevin Lang in
+ * his paper
+ * <a href="https://arxiv.org/abs/1708.06839">Back to the Future: an Even More Nearly
+ * Optimal Cardinality Estimation Algorithm</a>.
+ *
+ * @author Alexander Saydakov
+ */
+package com.yahoo.sketches.pig.cpc;
diff --git a/src/main/java/com/yahoo/sketches/pig/kll/DataToSketch.java b/src/main/java/com/yahoo/sketches/pig/kll/DataToSketch.java
new file mode 100644
index 0000000..dedf4ec
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/kll/DataToSketch.java
@@ -0,0 +1,345 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.io.IOException;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+/**
+ * This UDF is to build sketches from data.
+ * This class implements both the <i>Accumulator</i> and <i>Algebraic</i> interfaces for performance optimization.
+ */
+public class DataToSketch extends EvalFunc<DataByteArray> implements Accumulator<DataByteArray>, Algebraic {
+
+  private static final TupleFactory TUPLE_FACTORY_ = TupleFactory.getInstance();
+
+  // With the single exception of the Accumulator interface, UDFs are stateless.
+  // All parameters kept at the class level must be final, except for the accumSketch.
+  private final int k_;
+  private KllFloatsSketch accumSketch_;
+
+  // TOP LEVEL API
+
+  /**
+   * Default constructor. Assumes default k.
+   */
+  public DataToSketch() {
+    this(KllFloatsSketch.DEFAULT_K);
+  }
+
+  /**
+   * String constructor.
+   *
+   * @param kStr string representation of k
+   */
+  public DataToSketch(final String kStr) {
+    this(Integer.parseInt(kStr));
+  }
+
+  /**
+   * Base constructor.
+   *
+   * @param k parameter that determines the accuracy and size of the sketch.
+   */
+  private DataToSketch(final int k) {
+    super();
+    k_ = k;
+  }
+
+  //@formatter:off
+  /**
+   * Top-level exec function.
+   * This method accepts an input Tuple containing a Bag of one or more inner <b>Datum Tuples</b>
+   * and returns a single updated <b>Sketch</b> as a DataByteArray.
+   *
+   * <p>Types are in the form: Java data type: Pig DataType
+   *
+   * <p><b>Input Tuple</b>
+   * <ul>
+   *   <li>Tuple: TUPLE (Must contain only one field)
+   *     <ul>
+   *       <li>index 0: DataBag: BAG (May contain 0 or more Inner Tuples)
+   *         <ul>
+   *           <li>index 0: Tuple: TUPLE <b>Datum Tuple</b></li>
+   *           <li>...</li>
+   *           <li>index n-1: Tuple: TUPLE <b>Datum Tuple</b></li>
+   *         </ul>
+   *       </li>
+   *     </ul>
+   *   </li>
+   * </ul>
+   *
+   * <b>Datum Tuple</b>
+   * <ul>
+   *   <li>Tuple: TUPLE (Must contain only one field)
+   *     <ul>
+   *       <li>index 0: Float: FLOAT</li>
+   *     </ul>
+   *   </li>
+   * </ul>
+   *
+   * <b>Sketch Tuple</b>
+   * <ul>
+   *   <li>Tuple: TUPLE (Contains exactly 1 field)
+   *     <ul>
+   *       <li>index 0: DataByteArray: BYTEARRAY = a serialized KllFloatsSketch object.</li>
+   *     </ul>
+   *   </li>
+   * </ul>
+   *
+   * @param inputTuple A tuple containing a single bag, containing Datum Tuples
+   * @return serialized sketch
+   * @see "org.apache.pig.EvalFunc.exec(org.apache.pig.data.Tuple)"
+   * @throws IOException from Pig
+   */
+  // @formatter:on
+
+  @Override // TOP LEVEL EXEC
+  public DataByteArray exec(final Tuple inputTuple) throws IOException {
+    //The exec is a stateless function. It operates on the input and returns a result.
+    final KllFloatsSketch sketch = new KllFloatsSketch(k_);
+    if (inputTuple != null && inputTuple.size() > 0) {
+      final DataBag bag = (DataBag) inputTuple.get(0);
+      for (final Tuple innerTuple: bag) {
+        sketch.update((Float) innerTuple.get(0));
+      }
+    }
+    return new DataByteArray(sketch.toByteArray());
+  }
+
+  // ACCUMULATOR INTERFACE
+
+  /**
+   * An <i>Accumulator</i> version of the standard <i>exec()</i> method. Like <i>exec()</i>,
+   * accumulator is called with a bag of Datum Tuples. Unlike <i>exec()</i>, it doesn't serialize the
+   * sketch at the end. Instead, it can be called multiple times, each time with another bag of Datum Tuples.
+   *
+   * @param inputTuple A tuple containing a single bag, containing Datum Tuples.
+   * @see #exec
+   * @see "org.apache.pig.Accumulator.accumulate(org.apache.pig.data.Tuple)"
+   * @throws IOException by Pig
+   */
+  @Override
+  public void accumulate(final Tuple inputTuple) throws IOException {
+    if (inputTuple == null || inputTuple.size() == 0) { return; }
+    final DataBag bag = (DataBag) inputTuple.get(0);
+    if (bag == null) { return; }
+    if (accumSketch_ == null) {
+      accumSketch_ = new KllFloatsSketch(k_);
+    }
+    for (final Tuple innerTuple: bag) {
+      accumSketch_.update((Float) innerTuple.get(0));
+    }
+  }
+
+  /**
+   * Returns the result that has been built up by multiple calls to {@link #accumulate}.
+   *
+   * @return serialized sketch
+   * @see "org.apache.pig.Accumulator.getValue()"
+   */
+  @Override
+  public DataByteArray getValue() {
+    if (accumSketch_ != null) {
+      return new DataByteArray(accumSketch_.toByteArray());
+    }
+    // return empty sketch
+    return new DataByteArray(new KllFloatsSketch(k_).toByteArray());
+  }
+
+  /**
+   * Cleans up the UDF state after being called using the {@link Accumulator} interface.
+   *
+   * @see "org.apache.pig.Accumulator.cleanup()"
+   */
+  @Override
+  public void cleanup() {
+    accumSketch_ = null;
+  }
+
+  // ALGEBRAIC INTERFACE
+
+  @Override
+  public String getInitial() {
+    return Initial.class.getName();
+  }
+
+  @Override
+  public String getIntermed() {
+    return Intermediate.class.getName();
+  }
+
+  @Override
+  public String getFinal() {
+    return Final.class.getName();
+  }
+
+  // STATIC Initial Class only called by Pig
+
+  /**
+   * Class used to calculate the initial pass of an Algebraic sketch operation.
+   *
+   * <p>
+   * The Initial class simply passes through all records unchanged so that they can be
+   * processed by the intermediate processor instead.</p>
+   */
+  public static class Initial extends EvalFunc<Tuple> {
+    // The Algebraic worker classes (Initial, Intermediate and Final) are static and stateless.
+    // The constructors and parameters must mirror the parent class as there is no linkage
+    // between them.
+    /**
+     * Default constructor.
+     */
+    public Initial() {}
+
+    /**
+     * Constructor with explicit k as string.
+     *
+     * @param kStr string representation of k
+     */
+    public Initial(final String kStr) {}
+
+    @Override
+    public Tuple exec(final Tuple inputTuple) throws IOException {
+      return inputTuple;
+    }
+  }
+
+  // STATIC Intermediate Class only called by Pig
+
+  /**
+   * Class used to calculate the intermediate pass of an <i>Algebraic</i> sketch operation.
+   * It will receive a bag of values returned by either the <i>Intermediate</i>
+   * stage or the <i>Initial</i> stages, so it needs to be able to differentiate between and
+   * interpret both types.
+   */
+  public static class Intermediate extends EvalFunc<Tuple> {
+    // The Algebraic worker classes (Initial, Intermediate and Final) are static and stateless.
+    // The constructors and parameters must mirror the parent class as there is no linkage
+    // between them.
+    private final int k_;
+
+    /**
+     * Default constructor. Assumes default k.
+     */
+    public Intermediate() {
+      this(KllFloatsSketch.DEFAULT_K);
+    }
+
+    /**
+     * Constructor with explicit k as string. Pig will call.
+     * this and pass the same constructor arguments as the base UDF.
+     *
+     * @param kStr string representation of k
+     */
+    public Intermediate(final String kStr) {
+      this(Integer.parseInt(kStr));
+    }
+
+    /**
+     * Constructor with primitive k.
+     *
+     * @param k parameter that determines the accuracy and size of the sketch.
+     */
+    private Intermediate(final int k) {
+      k_ = k;
+    }
+
+    @Override
+    public Tuple exec(final Tuple inputTuple) throws IOException { //throws is in API
+      return TUPLE_FACTORY_.newTuple(process(inputTuple, k_));
+    }
+  }
+
+  // STATIC Final Class only called by Pig
+
+  /**
+   * Class used to calculate the final pass of an <i>Algebraic</i> sketch operation.
+   * It will receive a bag of values returned by either the <i>Intermediate</i>
+   * stage or the <i>Initial</i> stages, so it needs to be able to differentiate between and
+   * interpret both types.
+   */
+  public static class Final extends EvalFunc<DataByteArray> {
+    // The Algebraic worker classes (Initial, Intermediate and Final) are static and stateless.
+    // The constructors and parameters must mirror the parent class as there is no linkage
+    // between them.
+    private final int k_;
+
+    /**
+     * Default constructor. Assumes default k.
+     */
+    public Final() {
+      this(KllFloatsSketch.DEFAULT_K);
+    }
+
+    /**
+     * Constructor with explicit k as string. Pig will call
+     * this and pass the same constructor arguments as the base UDF.
+     *
+     * @param kStr string representation of k
+     */
+    public Final(final String kStr) {
+      this(Integer.parseInt(kStr));
+    }
+
+    /**
+     * Constructor with primitive k.
+     *
+     * @param k parameter that determines the accuracy and size of the sketch.
+     */
+    private Final(final int k) {
+      k_ = k;
+    }
+
+    @Override
+    public DataByteArray exec(final Tuple inputTuple) throws IOException {
+      return process(inputTuple, k_);
+    }
+  }
+
+  private static DataByteArray process(final Tuple inputTuple, final int k) throws IOException {
+    final KllFloatsSketch sketch = new KllFloatsSketch(k);
+    if (inputTuple != null && inputTuple.size() > 0) {
+      final DataBag outerBag = (DataBag) inputTuple.get(0);
+      for (final Tuple dataTuple: outerBag) {
+        final Object f0 = dataTuple.get(0);
+        if (f0 == null) { continue; }
+        if (f0 instanceof DataBag) {
+          final DataBag innerBag = (DataBag) f0; // inputTuple.bag0.dataTupleN.f0:bag
+          if (innerBag.size() == 0) { continue; }
+          // If field 0 of a dataTuple is a Bag all innerTuples of this inner bag
+          // will be passed into the union.
+          // It is due to system bagged outputs from multiple mapper Initial functions.
+          // The Intermediate stage was bypassed.
+          for (final Tuple innerTuple: innerBag) {
+            sketch.update((Float) innerTuple.get(0));
+          }
+        } else if (f0 instanceof DataByteArray) { // inputTuple.bag0.dataTupleN.f0:DBA
+          // If field 0 of a dataTuple is a DataByteArray we assume it is a sketch
+          // due to system bagged outputs from multiple mapper Intermediate functions.
+          // Each dataTuple.DBA:sketch will merged into the union.
+          final DataByteArray dba = (DataByteArray) f0;
+          sketch.merge(KllFloatsSketch.heapify(Memory.wrap(dba.get())));
+        } else {
+          throw new IllegalArgumentException("dataTuple.Field0: Is not a DataByteArray: "
+              + f0.getClass().getName());
+        }
+      }
+    }
+    return new DataByteArray(sketch.toByteArray());
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/kll/GetCdf.java b/src/main/java/com/yahoo/sketches/pig/kll/GetCdf.java
new file mode 100644
index 0000000..1cfe22a
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/kll/GetCdf.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+/**
+ * This UDF is to get an approximation to the Cumulative Distribution Function (CDF) of the input stream
+ * given a sketch and a set of split points - an array of <i>m</i> unique, monotonically increasing
+ * float values that divide the real number line into <i>m+1</i> consecutive disjoint intervals.
+ * The function returns an array of <i>m+1</i> double values, the first <i>m</i> of which are approximations
+ * to the ranks of the corresponding split points (fraction of input stream values that are less than
+ * a split point). The last value is always 1. CDF can also be viewed as a cumulative version of PMF.
+ */
+public class GetCdf extends EvalFunc<Tuple> {
+
+  @Override
+  public Tuple exec(final Tuple input) throws IOException {
+    if (input.size() < 2) {
+      throw new IllegalArgumentException(
+          "expected two or more inputs: sketch and list of split points");
+    }
+
+    if (!(input.get(0) instanceof DataByteArray)) {
+      throw new IllegalArgumentException("expected a DataByteArray as a sketch, got "
+          + input.get(0).getClass().getSimpleName());
+    }
+    final DataByteArray dba = (DataByteArray) input.get(0);
+    final KllFloatsSketch sketch = KllFloatsSketch.heapify(Memory.wrap(dba.get()));
+
+    final float[] splitPoints = new float[input.size() - 1];
+    for (int i = 1; i < input.size(); i++) {
+      if (!(input.get(i) instanceof Float)) {
+        throw new IllegalArgumentException("expected a float value as a split point, got "
+            + input.get(i).getClass().getSimpleName());
+      }
+      splitPoints[i - 1] = (float) input.get(i);
+    }
+    final double[] cdf = sketch.getCDF(splitPoints);
+    if (cdf == null) { return null; }
+    return GetPmf.doubleArrayToTuple(cdf);
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/kll/GetK.java b/src/main/java/com/yahoo/sketches/pig/kll/GetK.java
new file mode 100644
index 0000000..2507325
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/kll/GetK.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+/**
+ * This UDF is to get the parameter K from a given sketch.
+ * This can be useful for debugging a work flow to make sure that resulting sketches
+ * have the intended K, and, therefore, the intended accuracy
+ */
+public class GetK extends EvalFunc<Integer> {
+
+  @Override
+  public Integer exec(final Tuple input) throws IOException {
+    if (input.size() != 1) {
+      throw new IllegalArgumentException("expected one input");
+    }
+
+    if (!(input.get(0) instanceof DataByteArray)) {
+      throw new IllegalArgumentException("expected a DataByteArray as a sketch, got "
+          + input.get(0).getClass().getSimpleName());
+    }
+    final DataByteArray dba = (DataByteArray) input.get(0);
+    final KllFloatsSketch sketch = KllFloatsSketch.heapify(Memory.wrap(dba.get()));
+
+    return sketch.getK();
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/kll/GetPmf.java b/src/main/java/com/yahoo/sketches/pig/kll/GetPmf.java
new file mode 100644
index 0000000..59b9014
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/kll/GetPmf.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+/**
+ * This UDF is to get an approximation to the Probability Mass Function (PMF) of the input stream
+ * given a sketch and a set of split points - an array of <i>m</i> unique, monotonically increasing
+ * float values that divide the real number line into <i>m+1</i> consecutive disjoint intervals.
+ * The function returns an array of m+1 doubles each of which is an approximation to the fraction
+ * of the input stream values that fell into one of those intervals. Intervals are inclusive of
+ * the left split point and exclusive of the right split point.
+ */
+public class GetPmf extends EvalFunc<Tuple> {
+
+  @Override
+  public Tuple exec(final Tuple input) throws IOException {
+    if (input.size() < 2) {
+      throw new IllegalArgumentException(
+          "expected two or more inputs: sketch and list of split points");
+    }
+
+    if (!(input.get(0) instanceof DataByteArray)) {
+      throw new IllegalArgumentException("expected a DataByteArray as a sketch, got "
+          + input.get(0).getClass().getSimpleName());
+    }
+    final DataByteArray dba = (DataByteArray) input.get(0);
+    final KllFloatsSketch sketch = KllFloatsSketch.heapify(Memory.wrap(dba.get()));
+
+    final float[] splitPoints = new float[input.size() - 1];
+    for (int i = 1; i < input.size(); i++) {
+      if (!(input.get(i) instanceof Float)) {
+        throw new IllegalArgumentException("expected a float value as a split point, got "
+            + input.get(i).getClass().getSimpleName());
+      }
+      splitPoints[i - 1] = (float) input.get(i);
+    }
+    final double[] pmf = sketch.getPMF(splitPoints);
+    if (pmf == null) { return null; }
+    return doubleArrayToTuple(pmf);
+  }
+
+  static Tuple doubleArrayToTuple(final double[] array) throws ExecException {
+    final Tuple tuple = TupleFactory.getInstance().newTuple(array.length);
+    for (int i = 0; i < array.length; i++) {
+      tuple.set(i, array[i]);
+    }
+    return tuple;
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/kll/GetQuantile.java b/src/main/java/com/yahoo/sketches/pig/kll/GetQuantile.java
new file mode 100644
index 0000000..cb58cbe
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/kll/GetQuantile.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+/**
+ * This UDF is to get a quantile value from a given sketch. A single value for a
+ * given fraction is returned. The fraction represents a normalized rank and must be
+ * from 0 to 1 inclusive. For example, the fraction of 0.5 corresponds to 50th percentile,
+ * which is the median value of the distribution (the number separating the higher half
+ * of the probability distribution from the lower half).
+ */
+public class GetQuantile extends EvalFunc<Float> {
+
+  @Override
+  public Float exec(final Tuple input) throws IOException {
+    if (input.size() != 2) {
+      throw new IllegalArgumentException("expected two inputs: sketch and fraction");
+    }
+
+    if (!(input.get(0) instanceof DataByteArray)) {
+      throw new IllegalArgumentException("expected a DataByteArray as a sketch, got "
+          + input.get(0).getClass().getSimpleName());
+    }
+    final DataByteArray dba = (DataByteArray) input.get(0);
+    final KllFloatsSketch sketch = KllFloatsSketch.heapify(Memory.wrap(dba.get()));
+
+    if (!(input.get(1) instanceof Double)) {
+      throw new IllegalArgumentException("expected a double value as a fraction, got "
+          + input.get(1).getClass().getSimpleName());
+    }
+    final double fraction = (double) input.get(1);
+    return sketch.getQuantile(fraction);
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/kll/GetQuantiles.java b/src/main/java/com/yahoo/sketches/pig/kll/GetQuantiles.java
new file mode 100644
index 0000000..59c7d8c
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/kll/GetQuantiles.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+/**
+ * This UDF is to get a list of quantile values from a given sketch and a list of
+ * fractions or a number of evenly spaced intervals. The fractions represent normalized ranks and
+ * must be from 0 to 1 inclusive. For example, the fraction of 0.5 corresponds to 50th percentile,
+ * which is the median value of the distribution (the number separating the higher half
+ * of the probability distribution from the lower half).
+ */
+public class GetQuantiles extends EvalFunc<Tuple> {
+
+  @Override
+  public Tuple exec(final Tuple input) throws IOException {
+    if (input.size() < 2) {
+      throw new IllegalArgumentException("expected two or more inputs: sketch and list of fractions");
+    }
+
+    if (!(input.get(0) instanceof DataByteArray)) {
+      throw new IllegalArgumentException("expected a DataByteArray as a sketch, got "
+          + input.get(0).getClass().getSimpleName());
+    }
+    final DataByteArray dba = (DataByteArray) input.get(0);
+    final KllFloatsSketch sketch = KllFloatsSketch.heapify(Memory.wrap(dba.get()));
+
+    if (input.size() == 2) {
+      final Object arg = input.get(1);
+      if (arg instanceof Integer) { // number of evenly spaced intervals
+        return floatArrayToTuple(sketch.getQuantiles((int) arg));
+      } else if (arg instanceof Double) { // just one fraction
+        return TupleFactory.getInstance().newTuple(Arrays.asList(sketch.getQuantile((double) arg)));
+      } else {
+        throw new IllegalArgumentException("expected a double value as a fraction or an integer value"
+            + " as a number of evenly spaced intervals, got " + arg.getClass().getSimpleName());
+      }
+    }
+    // more than one number - must be double fractions
+    final double[] fractions = new double[input.size() - 1];
+    for (int i = 1; i < input.size(); i++) {
+      if (!(input.get(i) instanceof Double)) {
+        throw new IllegalArgumentException("expected a double value as a fraction, got "
+            + input.get(i).getClass().getSimpleName());
+      }
+      fractions[i - 1] = (double) input.get(i);
+    }
+    return floatArrayToTuple(sketch.getQuantiles(fractions));
+  }
+
+  private static Tuple floatArrayToTuple(final float[] array) throws ExecException {
+    final Tuple tuple = TupleFactory.getInstance().newTuple(array.length);
+    for (int i = 0; i < array.length; i++) {
+      tuple.set(i, array[i]);
+    }
+    return tuple;
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/kll/GetRank.java b/src/main/java/com/yahoo/sketches/pig/kll/GetRank.java
new file mode 100644
index 0000000..4b0a50e
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/kll/GetRank.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+/**
+ * This UDF is to get a normalized rank for a given value from a given sketch. A single
+ * rank for a given value is returned. The normalized rank is a double value
+ * from 0 to 1 inclusive. For example, the rank of 0.5 corresponds to 50th percentile,
+ * which is the median value of the distribution (the number separating the higher half
+ * of the probability distribution from the lower half).
+ */
+public class GetRank extends EvalFunc<Double> {
+
+  @Override
+  public Double exec(final Tuple input) throws IOException {
+    if (input.size() != 2) {
+      throw new IllegalArgumentException("expected two inputs: sketch and value");
+    }
+
+    if (!(input.get(0) instanceof DataByteArray)) {
+      throw new IllegalArgumentException("expected a DataByteArray as a sketch, got "
+          + input.get(0).getClass().getSimpleName());
+    }
+    final DataByteArray dba = (DataByteArray) input.get(0);
+    final KllFloatsSketch sketch = KllFloatsSketch.heapify(Memory.wrap(dba.get()));
+
+    if (!(input.get(1) instanceof Float)) {
+      throw new IllegalArgumentException("expected a float value, got "
+          + input.get(1).getClass().getSimpleName());
+    }
+    final float value = (float) input.get(1);
+    return sketch.getRank(value);
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/kll/SketchToString.java b/src/main/java/com/yahoo/sketches/pig/kll/SketchToString.java
new file mode 100644
index 0000000..6e23a5a
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/kll/SketchToString.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+/**
+ * This UDF is to get a human-readable summary of a given sketch.
+ */
+public class SketchToString extends EvalFunc<String> {
+
+  @Override
+  public String exec(final Tuple input) throws IOException {
+    if (input.size() != 1) {
+      throw new IllegalArgumentException("expected one input");
+    }
+
+    if (!(input.get(0) instanceof DataByteArray)) {
+      throw new IllegalArgumentException("expected a DataByteArray as a sketch, got "
+          + input.get(0).getClass().getSimpleName());
+    }
+    final DataByteArray dba = (DataByteArray) input.get(0);
+    final KllFloatsSketch sketch = KllFloatsSketch.heapify(Memory.wrap(dba.get()));
+
+    return sketch.toString();
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/kll/UnionSketch.java b/src/main/java/com/yahoo/sketches/pig/kll/UnionSketch.java
new file mode 100644
index 0000000..1ce5425
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/kll/UnionSketch.java
@@ -0,0 +1,353 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.io.IOException;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+/**
+ * This UDF is to merge sketches.
+ * This class implements both the <i>Accumulator</i> and <i>Algebraic</i> interfaces for performance optimization.
+ */
+public class UnionSketch extends EvalFunc<DataByteArray> implements Accumulator<DataByteArray>, Algebraic {
+
+  private static final TupleFactory TUPLE_FACTORY_ = TupleFactory.getInstance();
+
+  // With the single exception of the Accumulator interface, UDFs are stateless.
+  // All parameters kept at the class level must be final, except for the accumSketch.
+  private final int k_;
+  private KllFloatsSketch accumSketch_;
+
+  //TOP LEVEL API
+
+  /**
+   * Default constructor. Assumes default k.
+   */
+  public UnionSketch() {
+    this(KllFloatsSketch.DEFAULT_K);
+  }
+
+  /**
+   * Constructor with explicit k as string.
+   *
+   * @param kStr string representation of k
+   */
+  public UnionSketch(final String kStr) {
+    this(Integer.parseInt(kStr));
+  }
+
+  /**
+   * Base constructor.
+   *
+   * @param k parameter that determines the accuracy and size of the sketch.
+   */
+  private UnionSketch(final int k) {
+    super();
+    k_ = k;
+  }
+
+  //@formatter:off
+  /**
+   * Top-level exec function.
+   * This method accepts an input Tuple containing a Bag of one or more inner <b>Sketch Tuples</b>
+   * and returns a single updated <b>Sketch</b> as a DataByteArray.
+   *
+   * <p>Types are in the form: Java data type: Pig DataType
+   *
+   * <p><b>Input Tuple</b>
+   * <ul>
+   *   <li>Tuple: TUPLE (Must contain only one field)
+   *     <ul>
+   *       <li>index 0: DataBag: BAG (May contain 0 or more Inner Tuples)
+   *         <ul>
+   *           <li>index 0: Tuple: TUPLE <b>Sketch Tuple</b></li>
+   *           <li>...</li>
+   *           <li>index n-1: Tuple: TUPLE <b>Sketch Tuple</b></li>
+   *         </ul>
+   *       </li>
+   *     </ul>
+   *   </li>
+   * </ul>
+   *
+   * <b>Sketch Tuple</b>
+   * <ul>
+   *   <li>Tuple: TUPLE (Contains exactly 1 field)
+   *     <ul>
+   *       <li>index 0: DataByteArray: BYTEARRAY = The serialization of a Sketch object.</li>
+   *     </ul>
+   *   </li>
+   * </ul>
+   *
+   * @param inputTuple A tuple containing a single bag, containing Sketch Tuples
+   * @return serialized sketch
+   * @see "org.apache.pig.EvalFunc.exec(org.apache.pig.data.Tuple)"
+   */
+  //@formatter:on
+
+  @Override // TOP LEVEL EXEC
+  public DataByteArray exec(final Tuple inputTuple) throws IOException {
+    //The exec is a stateless function.  It operates on the input and returns a result.
+    final KllFloatsSketch sketch = new KllFloatsSketch(k_);
+    if (inputTuple != null && inputTuple.size() > 0) {
+      final DataBag bag = (DataBag) inputTuple.get(0);
+      updateUnion(bag, sketch);
+    }
+    return new DataByteArray(sketch.toByteArray());
+  }
+
+  //ACCUMULATOR INTERFACE
+
+  /**
+   * An <i>Accumulator</i> version of the standard <i>exec()</i> method. Like <i>exec()</i>,
+   * accumulator is called with a bag of Sketch Tuples. Unlike <i>exec()</i>, it doesn't serialize the
+   * sketch at the end. Instead, it can be called multiple times, each time with another bag of
+   * Sketch Tuples to be input to the Union.
+   *
+   * @param inputTuple A tuple containing a single bag, containing Sketch Tuples.
+   * @see #exec
+   * @see "org.apache.pig.Accumulator.accumulate(org.apache.pig.data.Tuple)"
+   * @throws IOException by Pig
+   */
+  @Override
+  public void accumulate(final Tuple inputTuple) throws IOException {
+    if (inputTuple == null || inputTuple.size() == 0) { return; }
+    final DataBag bag = (DataBag) inputTuple.get(0);
+    if (bag == null) { return; }
+    if (accumSketch_ == null) {
+      accumSketch_ = new KllFloatsSketch(k_);
+    }
+    updateUnion(bag, accumSketch_);
+  }
+
+  /**
+   * Returns the result of the Union that has been built up by multiple calls to {@link #accumulate}.
+   *
+   * @return serialized sketch
+   * @see "org.apache.pig.Accumulator.getValue()"
+   */
+  @Override
+  public DataByteArray getValue() {
+    if (accumSketch_ != null) {
+      return new DataByteArray(accumSketch_.toByteArray());
+    }
+    // return empty sketch
+    return new DataByteArray(new KllFloatsSketch(k_).toByteArray());
+  }
+
+  /**
+   * Cleans up the UDF state after being called using the {@link Accumulator} interface.
+   *
+   * @see "org.apache.pig.Accumulator.cleanup()"
+   */
+  @Override
+  public void cleanup() {
+    accumSketch_ = null;
+  }
+
+  //ALGEBRAIC INTERFACE
+
+  @Override
+  public String getInitial() {
+    return Initial.class.getName();
+  }
+
+  @Override
+  public String getIntermed() {
+    return Intermediate.class.getName();
+  }
+
+  @Override
+  public String getFinal() {
+    return Final.class.getName();
+  }
+
+  //TOP LEVEL PRIVATE STATIC METHODS
+
+  /**
+   * Updates a union from a bag of sketches
+   *
+   * @param bag A bag of sketchTuples.
+   * @param union The union to update
+   */
+  private static void updateUnion(final DataBag bag, final KllFloatsSketch union) throws ExecException {
+    for (Tuple innerTuple: bag) {
+      final Object f0 = innerTuple.get(0);
+      if (f0 == null) { continue; }
+      if (f0 instanceof DataByteArray) {
+        final DataByteArray dba = (DataByteArray) f0;
+        if (dba.size() > 0) {
+          union.merge(KllFloatsSketch.heapify(Memory.wrap(dba.get())));
+        }
+      } else {
+        throw new IllegalArgumentException("Field type was not DataType.BYTEARRAY: " + innerTuple.getType(0));
+      }
+    }
+  }
+
+  //STATIC Initial Class only called by Pig
+
+  /**
+   * Class used to calculate the initial pass of an Algebraic sketch operation.
+   *
+   * <p>
+   * The Initial class simply passes through all records unchanged so that they can be
+   * processed by the intermediate processor instead.</p>
+   */
+  public static class Initial extends EvalFunc<Tuple> {
+    // The Algebraic worker classes (Initial, Intermediate and Final) are static and stateless.
+    // The constructors and parameters must mirror the parent class as there is no linkage
+    // between them.
+    /**
+     * Default constructor.
+     */
+    public Initial() {}
+
+    /**
+     * Constructor for the initial pass of an Algebraic function. Pig will call this and pass the
+     * same constructor arguments as the base UDF. In this case the arguments are ignored.
+     *
+     * @param kStr string representation of k
+     */
+    public Initial(final String kStr) {}
+
+    @Override
+    public Tuple exec(final Tuple inputTuple) throws IOException {
+      return inputTuple;
+    }
+  }
+
+  // STATIC Intermediate Class only called by Pig
+
+  /**
+   * Class used to calculate the intermediate pass of an <i>Algebraic</i> union operation.
+   * It will receive a bag of values returned by either the <i>Intermediate</i>
+   * stage or the <i>Initial</i> stages, so it needs to be able to differentiate between and
+   * interpret both types.
+   */
+  public static class Intermediate extends EvalFunc<Tuple> {
+    // The Algebraic worker classes (Initial, Intermediate and Final) are static and stateless.
+    // The constructors and final parameters must mirror the parent class as there is no linkage
+    // between them.
+    private final int k_;
+
+    /**
+     * Default constructor. Assumes default k.
+     */
+    public Intermediate() {
+      this(KllFloatsSketch.DEFAULT_K);
+    }
+
+    /**
+     * Constructor with explicit k as string.
+     *
+     * @param kStr string representation of k
+     */
+    public Intermediate(final String kStr) {
+      this(Integer.parseInt(kStr));
+    }
+
+    /**
+     * Constructor with primitive k.
+     *
+     * @param k parameter that determines the accuracy and size of the sketch.
+     */
+    private Intermediate(final int k) {
+      k_ = k;
+    }
+
+    @Override
+    public Tuple exec(final Tuple inputTuple) throws IOException {
+      return TUPLE_FACTORY_.newTuple(process(inputTuple, k_));
+    }
+  }
+
+  // STATIC Final Class only called by Pig
+
+  /**
+   * Class used to calculate the final pass of an <i>Algebraic</i> union operation.
+   * It will receive a bag of values returned by either the <i>Intermediate</i>
+   * stage or the <i>Initial</i> stages, so it needs to be able to differentiate between and
+   * interpret both types.
+   */
+  public static class Final extends EvalFunc<DataByteArray> {
+    // The Algebraic worker classes (Initial, Intermediate and Final) are static and stateless.
+    // The constructors and final parameters must mirror the parent class as there is no linkage
+    // between them.
+    private final int k_;
+
+    /**
+     * Default constructor. Assumes default k.
+     */
+    public Final() {
+      this(KllFloatsSketch.DEFAULT_K);
+    }
+
+    /**
+     * Constructor with explicit k as string.
+     *
+     * @param kStr string representation of k
+     */
+    public Final(final String kStr) {
+      this(Integer.parseInt(kStr));
+    }
+
+    /**
+     * Constructor with primitive k.
+     *
+     * @param k parameter that determines the accuracy and size of the sketch.
+     */
+    private Final(final int k) {
+      k_ = k;
+    }
+
+    @Override
+    public DataByteArray exec(final Tuple inputTuple) throws IOException {
+      return process(inputTuple, k_);
+    }
+  }
+
+  private static DataByteArray process(final Tuple inputTuple, final int k) throws IOException {
+    final KllFloatsSketch union = new KllFloatsSketch(k);
+    if (inputTuple != null && inputTuple.size() > 0) {
+      final DataBag outerBag = (DataBag) inputTuple.get(0);
+      for (final Tuple dataTuple: outerBag) {
+        final Object f0 = dataTuple.get(0);
+        if (f0 == null) { continue; }
+        if (f0 instanceof DataBag) {
+          final DataBag innerBag = (DataBag) f0; //inputTuple.bag0.dataTupleN.f0:bag
+          if (innerBag.size() == 0) { continue; }
+          // If field 0 of a dataTuple is again a Bag all tuples of this inner bag
+          // will be passed into the union.
+          // It is due to system bagged outputs from multiple mapper Initial functions.
+          // The Intermediate stage was bypassed.
+          updateUnion(innerBag, union);
+        } else if (f0 instanceof DataByteArray) { //inputTuple.bag0.dataTupleN.f0:DBA
+          // If field 0 of a dataTuple is a DataByteArray we assume it is a sketch from a prior call
+          // It is due to system bagged outputs from multiple mapper Intermediate functions.
+          // Each dataTuple.DBA:sketch will merged into the union.
+          final DataByteArray dba = (DataByteArray) f0;
+          union.merge(KllFloatsSketch.heapify(Memory.wrap(dba.get())));
+        } else {
+          throw new IllegalArgumentException("dataTuple.Field0: Is not a DataByteArray: "
+            + f0.getClass().getName());
+        }
+      }
+    }
+    return new DataByteArray(union.toByteArray());
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/kll/package-info.java b/src/main/java/com/yahoo/sketches/pig/kll/package-info.java
new file mode 100644
index 0000000..9cae236
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/kll/package-info.java
@@ -0,0 +1,12 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+/**
+ * Pig UDFs for KLL quantiles sketches.
+ * See https://datasketches.github.io/docs/Quantiles/KLLSketch.html
+ * 
+ * @author Alexander Saydakov
+ */
+package com.yahoo.sketches.pig.kll;
diff --git a/src/test/java/com/yahoo/sketches/pig/cpc/DataToSketchTest.java b/src/test/java/com/yahoo/sketches/pig/cpc/DataToSketchTest.java
new file mode 100644
index 0000000..4ee2ac0
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/cpc/DataToSketchTest.java
@@ -0,0 +1,326 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import static com.yahoo.sketches.Util.DEFAULT_UPDATE_SEED;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+public class DataToSketchTest {
+
+  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+  private static final BagFactory BAG_FACTORY = BagFactory.getInstance();
+
+  @Test
+  public void execNullInputTuple() throws Exception {
+    final EvalFunc<DataByteArray> func = new DataToSketch();
+    final DataByteArray result = func.exec(null);
+    final CpcSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void execEmptyInputTupleCustomLgK() throws Exception {
+    final EvalFunc<DataByteArray> func = new DataToSketch("10");
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple());
+    final CpcSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+    Assert.assertEquals(sketch.getLgK(), 10);
+  }
+
+  @Test
+  public void execEmptyBag() throws Exception {
+    final EvalFunc<DataByteArray> func = new DataToSketch();
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(BAG_FACTORY.newDefaultBag()));
+    final CpcSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void execUnsupportedType() throws Exception {
+    final EvalFunc<DataByteArray> func = new DataToSketch();
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple(new Object()));
+    func.exec(TUPLE_FACTORY.newTuple(bag));
+  }
+
+  @Test
+  public void execVariousTypesOfInput() throws Exception {
+    final EvalFunc<DataByteArray> func = new DataToSketch();
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    final Tuple tupleWithNull = TUPLE_FACTORY.newTuple(1);
+    tupleWithNull.set(0, null);
+    bag.add(tupleWithNull);
+    bag.add(TUPLE_FACTORY.newTuple(new Byte((byte) 1)));
+    bag.add(TUPLE_FACTORY.newTuple(new Integer(2)));
+    bag.add(TUPLE_FACTORY.newTuple(new Long(3)));
+    bag.add(TUPLE_FACTORY.newTuple(new Float(1)));
+    bag.add(TUPLE_FACTORY.newTuple(new Double(2)));
+    bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(new byte[] {(byte) 1})));
+    bag.add(TUPLE_FACTORY.newTuple("a"));
+    final CpcSketch sketch = getSketch(func.exec(TUPLE_FACTORY.newTuple(bag)));
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 7.0, 0.01);
+  }
+
+  @Test
+  public void accumulator() throws Exception {
+    final Accumulator<DataByteArray> func = new DataToSketch();
+
+    // no input yet
+    DataByteArray result = func.getValue();
+    CpcSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // null input tuple
+    func.accumulate(null);
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // empty input tuple
+    func.accumulate(TUPLE_FACTORY.newTuple());
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // empty bag
+    func.accumulate(TUPLE_FACTORY.newTuple(BAG_FACTORY.newDefaultBag()));
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // normal case
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple("a"));
+    bag.add(TUPLE_FACTORY.newTuple("b"));
+    func.accumulate(TUPLE_FACTORY.newTuple(bag));
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+
+    // cleanup
+    func.cleanup();
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicInitial() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new DataToSketch().getInitial()).newInstance();
+    final Tuple input = TUPLE_FACTORY.newTuple();
+    final Tuple result = func.exec(input);
+    Assert.assertEquals(result, input);
+  }
+
+  @Test
+  public void algebraicInitialWithLgK() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new DataToSketch().getInitial())
+        .getConstructor(String.class).newInstance("10");
+    final Tuple input = TUPLE_FACTORY.newTuple();
+    final Tuple result = func.exec(input);
+    Assert.assertEquals(result, input);
+  }
+
+  @Test
+  public void algebraicInitialWithLgKAndSeed() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new DataToSketch().getInitial())
+        .getConstructor(String.class, String.class).newInstance("10", "123");
+    final Tuple input = TUPLE_FACTORY.newTuple();
+    final Tuple result = func.exec(input);
+    Assert.assertEquals(result, input);
+  }
+
+  @Test
+  public void algebraicIntermediateNullInputTuple() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new DataToSketch().getIntermed()).newInstance();
+    final Tuple result = func.exec(null);
+    final CpcSketch sketch = getSketch((DataByteArray) result.get(0));
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicIntermediateEmptyInputTuple() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new DataToSketch().getIntermed()).newInstance();
+    final Tuple result = func.exec(TUPLE_FACTORY.newTuple());
+    final CpcSketch sketch = getSketch((DataByteArray) result.get(0));
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicIntermediateEmptyBag() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new DataToSketch().getIntermed()).newInstance();
+    Tuple result = func.exec(TUPLE_FACTORY.newTuple(BAG_FACTORY.newDefaultBag()));
+    CpcSketch sketch = getSketch((DataByteArray) result.get(0));
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicIntermediateFromInitial() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new DataToSketch().getIntermed()).newInstance();
+    final DataBag outerBag = BAG_FACTORY.newDefaultBag();
+    final DataBag innerBag = BAG_FACTORY.newDefaultBag();
+    innerBag.add(TUPLE_FACTORY.newTuple("a"));
+    innerBag.add(TUPLE_FACTORY.newTuple("b"));
+    innerBag.add(TUPLE_FACTORY.newTuple("c"));
+    outerBag.add(TUPLE_FACTORY.newTuple(innerBag));
+    final Tuple result = func.exec(TUPLE_FACTORY.newTuple(outerBag));
+    final CpcSketch sketch = getSketch((DataByteArray) result.get(0));
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 3.0, 0.01);
+  }
+
+  @Test
+  public void algebraicIntermediateFromIntermediate() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new DataToSketch().getIntermed()).newInstance();
+    final CpcSketch inputSketch = new CpcSketch();
+    inputSketch.update("a");
+    inputSketch.update("b");
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    final Tuple result = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final CpcSketch sketch = getSketch((DataByteArray) result.get(0));
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+  }
+
+  @Test
+  public void algebraicIntermediateFromIntermediateCustomLgKAndSeed() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new DataToSketch().getIntermed())
+        .getConstructor(String.class, String.class).newInstance("10", "123");
+    final CpcSketch inputSketch = new CpcSketch(10, 123);
+    inputSketch.update("a");
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    final Tuple result = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final CpcSketch sketch = getSketch((DataByteArray) result.get(0), 123);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 1.0, 0.01);
+    Assert.assertEquals(sketch.getLgK(), 10);
+  }
+
+  @Test
+  public void algebraicFinalNullInputTuple() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new DataToSketch().getFinal()).newInstance();
+    final DataByteArray result = func.exec(null);
+    final CpcSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicFinalEmptyInputTuple() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new DataToSketch().getFinal()).newInstance();
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple());
+    final CpcSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicFinalEmptyBag() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new DataToSketch().getFinal()).newInstance();
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(BAG_FACTORY.newDefaultBag()));
+    final CpcSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicFinalFromInitial() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new DataToSketch().getFinal()).newInstance();
+    final DataBag outerBag = BAG_FACTORY.newDefaultBag();
+    final DataBag innerBag = BAG_FACTORY.newDefaultBag();
+    innerBag.add(TUPLE_FACTORY.newTuple("a"));
+    innerBag.add(TUPLE_FACTORY.newTuple("b"));
+    innerBag.add(TUPLE_FACTORY.newTuple("c"));
+    outerBag.add(TUPLE_FACTORY.newTuple(innerBag));
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(outerBag));
+    final CpcSketch sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 3.0, 0.01);
+  }
+
+  @Test
+  public void algebraicFinalFromIntermediate() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new DataToSketch().getFinal()).newInstance();
+    final CpcSketch inputSketch = new CpcSketch();
+    inputSketch.update("a");
+    inputSketch.update("b");
+    DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final CpcSketch sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+  }
+
+  @Test
+  public void algebraicFinalFromIntermediateCustomLgKAndSeed() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new DataToSketch().getFinal())
+        .getConstructor(String.class, String.class).newInstance("10", "123");
+    final CpcSketch inputSketch = new CpcSketch(10, 123);
+    inputSketch.update("a");
+    inputSketch.update("b");
+    DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final CpcSketch sketch = getSketch(result, 123);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+    Assert.assertEquals(sketch.getLgK(), 10);
+  }
+
+  static CpcSketch getSketch(final DataByteArray dba) throws Exception {
+    return getSketch(dba, DEFAULT_UPDATE_SEED);
+  }
+
+  static CpcSketch getSketch(final DataByteArray dba, final long seed) throws Exception {
+    Assert.assertNotNull(dba);
+    Assert.assertTrue(dba.size() > 0);
+    return CpcSketch.heapify(dba.get(), seed); 
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/cpc/GetEstimateAndErrorBoundsTest.java b/src/test/java/com/yahoo/sketches/pig/cpc/GetEstimateAndErrorBoundsTest.java
new file mode 100644
index 0000000..de92ba3
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/cpc/GetEstimateAndErrorBoundsTest.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+public class GetEstimateAndErrorBoundsTest {
+
+  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+
+  @Test
+  public void nullInputTuple() throws Exception {
+    final EvalFunc<Tuple> func = new GetEstimateAndErrorBounds();
+    final Tuple result = func.exec(null);
+    Assert.assertNull(result);
+  }
+
+  @Test
+  public void emptyInputTuple() throws Exception {
+    final EvalFunc<Tuple> func = new GetEstimateAndErrorBounds();
+    final Tuple result = func.exec(TUPLE_FACTORY.newTuple());
+    Assert.assertNull(result);
+  }
+
+  @Test
+  public void normalCase() throws Exception {
+    final EvalFunc<Tuple> func = new GetEstimateAndErrorBounds();
+    final CpcSketch sketch = new CpcSketch();
+    sketch.update(1);
+    sketch.update(2);
+    Tuple result = func.exec(TUPLE_FACTORY.newTuple(new DataByteArray(sketch.toByteArray())));
+    Assert.assertNotNull(result);
+    Assert.assertEquals((Double) result.get(0), 2.0, 0.01);
+    Assert.assertTrue((Double) result.get(1) <= 2.0);
+    Assert.assertTrue((Double) result.get(2) >= 2.0);
+  }
+
+  @Test
+  public void normalCaseWithKappa() throws Exception {
+    final EvalFunc<Tuple> func = new GetEstimateAndErrorBounds("1");
+    final CpcSketch sketch = new CpcSketch();
+    sketch.update(1);
+    sketch.update(2);
+    Tuple result = func.exec(TUPLE_FACTORY.newTuple(new DataByteArray(sketch.toByteArray())));
+    Assert.assertNotNull(result);
+    Assert.assertEquals((Double) result.get(0), 2.0, 0.01);
+    Assert.assertTrue((Double) result.get(1) <= 2.0);
+    Assert.assertTrue((Double) result.get(2) >= 2.0);
+  }
+
+  @Test
+  public void normalCaseWithKappaAndSeed() throws Exception {
+    final EvalFunc<Tuple> func = new GetEstimateAndErrorBounds("3", "123");
+    final CpcSketch sketch = new CpcSketch(12, 123);
+    sketch.update(1);
+    sketch.update(2);
+    Tuple result = func.exec(TUPLE_FACTORY.newTuple(new DataByteArray(sketch.toByteArray())));
+    Assert.assertNotNull(result);
+    Assert.assertEquals((Double) result.get(0), 2.0, 0.01);
+    Assert.assertTrue((Double) result.get(1) <= 2.0);
+    Assert.assertTrue((Double) result.get(2) >= 2.0);
+  }
+
+  @Test
+  public void schema() throws Exception {
+    EvalFunc<Tuple> func = new GetEstimateAndErrorBounds();
+    Schema inputSchema = new Schema(new Schema.FieldSchema("Sketch", DataType.BYTEARRAY));
+    Schema outputSchema = func.outputSchema(inputSchema);
+    Assert.assertNotNull(outputSchema);
+    Assert.assertEquals(outputSchema.size(), 1);
+    Assert.assertEquals(DataType.findTypeName(outputSchema.getField(0).type), "tuple");
+    Schema innerSchema = outputSchema.getField(0).schema;
+    Assert.assertEquals(innerSchema.size(), 3);
+    Assert.assertEquals(DataType.findTypeName(innerSchema.getField(0).type), "double");
+    Assert.assertEquals(DataType.findTypeName(innerSchema.getField(1).type), "double");
+    Assert.assertEquals(DataType.findTypeName(innerSchema.getField(2).type), "double");
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/cpc/GetEstimateTest.java b/src/test/java/com/yahoo/sketches/pig/cpc/GetEstimateTest.java
new file mode 100644
index 0000000..7ed7bb1
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/cpc/GetEstimateTest.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.TupleFactory;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+import junit.framework.Assert;
+
+public class GetEstimateTest {
+
+  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+
+  @Test
+  public void nullInputTuple() throws Exception {
+    final EvalFunc<Double> func = new GetEstimate();
+    final Double result = func.exec(null);
+    Assert.assertNull(result);
+  }
+
+  @Test
+  public void emptyInputTuple() throws Exception {
+    final EvalFunc<Double> func = new GetEstimate();
+    final Double result = func.exec(TUPLE_FACTORY.newTuple());
+    Assert.assertNull(result);
+  }
+
+  @Test
+  public void normalCase() throws Exception {
+    final EvalFunc<Double> func = new GetEstimate();
+    final CpcSketch sketch = new CpcSketch();
+    sketch.update(1);
+    sketch.update(2);
+    Double result = func.exec(TUPLE_FACTORY.newTuple(new DataByteArray(sketch.toByteArray())));
+    Assert.assertNotNull(result);
+    Assert.assertEquals(result, 2.0, 0.01);
+  }
+
+  @Test
+  public void normalCaseCustomSeed() throws Exception {
+    final EvalFunc<Double> func = new GetEstimate("123");
+    final CpcSketch sketch = new CpcSketch(12, 123);
+    sketch.update(1);
+    sketch.update(2);
+    Double result = func.exec(TUPLE_FACTORY.newTuple(new DataByteArray(sketch.toByteArray())));
+    Assert.assertNotNull(result);
+    Assert.assertEquals(result, 2.0, 0.01);
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/cpc/SketchToStringTest.java b/src/test/java/com/yahoo/sketches/pig/cpc/SketchToStringTest.java
new file mode 100644
index 0000000..1783219
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/cpc/SketchToStringTest.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.TupleFactory;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+import junit.framework.Assert;
+
+public class SketchToStringTest {
+
+  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+
+  @Test
+  public void nullInputTuple() throws Exception {
+    final EvalFunc<String> func = new SketchToString();
+    final String result = func.exec(null);
+    Assert.assertNull(result);
+  }
+
+  @Test
+  public void emptyInputTuple() throws Exception {
+    final EvalFunc<String> func = new SketchToString();
+    final String result = func.exec(TUPLE_FACTORY.newTuple());
+    Assert.assertNull(result);
+  }
+
+  @Test
+  public void normalCase() throws Exception {
+    final EvalFunc<String> func = new SketchToString();
+    final CpcSketch sketch = new CpcSketch();
+    final String result = func.exec(TUPLE_FACTORY.newTuple(new DataByteArray(sketch.toByteArray())));
+    Assert.assertNotNull(result);
+    Assert.assertTrue(result.length() > 0);
+  }
+
+  @Test
+  public void normalCaseWithDetail() throws Exception {
+    final EvalFunc<String> func = new SketchToString("true");
+    final CpcSketch sketch = new CpcSketch();
+    final String result = func.exec(TUPLE_FACTORY.newTuple(new DataByteArray(sketch.toByteArray())));
+    Assert.assertNotNull(result);
+    Assert.assertTrue(result.length() > 0);
+  }
+
+  @Test
+  public void normalCaseWithDetailAndSeed() throws Exception {
+    final EvalFunc<String> func = new SketchToString("true", "123");
+    final CpcSketch sketch = new CpcSketch(12, 123);
+    final String result = func.exec(TUPLE_FACTORY.newTuple(new DataByteArray(sketch.toByteArray())));
+    Assert.assertNotNull(result);
+    Assert.assertTrue(result.length() > 0);
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/cpc/UnionSketchTest.java b/src/test/java/com/yahoo/sketches/pig/cpc/UnionSketchTest.java
new file mode 100644
index 0000000..28cb96a
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/cpc/UnionSketchTest.java
@@ -0,0 +1,283 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+public class UnionSketchTest {
+
+  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+  private static final BagFactory BAG_FACTORY = BagFactory.getInstance();
+
+  @Test
+  public void execNullInputTuple() throws Exception {
+    final EvalFunc<DataByteArray> func = new UnionSketch();
+    final DataByteArray result = func.exec(null);
+    final CpcSketch sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void execEmptyInputTuple() throws Exception {
+    final EvalFunc<DataByteArray> func = new UnionSketch();
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple());
+    final CpcSketch sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void execEmptyBag() throws Exception {
+    final EvalFunc<DataByteArray> func = new UnionSketch();
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(BAG_FACTORY.newDefaultBag()));
+    final CpcSketch sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void execNormalCase() throws Exception {
+    final EvalFunc<DataByteArray> func = new UnionSketch();
+    final CpcSketch inputSketch = new CpcSketch();
+    inputSketch.update(1);
+    inputSketch.update(2);
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final CpcSketch sketch = DataToSketchTest.getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+  }
+
+  @Test
+  public void execNormalCaseCustomLgKAndSeed() throws Exception {
+    final EvalFunc<DataByteArray> func = new UnionSketch("10", "123");
+    final CpcSketch inputSketch = new CpcSketch(10, 123);
+    inputSketch.update(1);
+    inputSketch.update(2);
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final CpcSketch sketch = DataToSketchTest.getSketch(result, 123);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+    Assert.assertEquals(sketch.getLgK(), 10);
+  }
+
+  @Test
+  public void accumulator() throws Exception {
+    final Accumulator<DataByteArray> func = new UnionSketch();
+
+    // no input yet
+    DataByteArray result = func.getValue();
+    CpcSketch sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // null input tuple
+    func.accumulate(null);
+    result = func.getValue();
+    sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // empty input tuple
+    func.accumulate(TUPLE_FACTORY.newTuple());
+    result = func.getValue();
+    sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // empty bag
+    func.accumulate(TUPLE_FACTORY.newTuple(BAG_FACTORY.newDefaultBag()));
+    result = func.getValue();
+    sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // normal case
+    CpcSketch inputSketch = new CpcSketch();
+    inputSketch.update(1);
+    inputSketch.update(2);
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    func.accumulate(TUPLE_FACTORY.newTuple(bag));
+    result = func.getValue();
+    sketch = DataToSketchTest.getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+
+    // cleanup
+    func.cleanup();
+    result = func.getValue();
+    sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicInitial() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new UnionSketch().getInitial()).newInstance();
+    final Tuple input = TUPLE_FACTORY.newTuple();
+    final Tuple result = func.exec(input);
+    Assert.assertEquals(result, input);
+  }
+
+  @Test
+  public void algebraicIntermediateNullInputTuple() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new UnionSketch().getIntermed()).newInstance();
+    final Tuple result = func.exec(null);
+    final CpcSketch sketch = DataToSketchTest.getSketch((DataByteArray) result.get(0));
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicIntermediateEmptyInputTuple() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new UnionSketch().getIntermed()).newInstance();
+    final Tuple result = func.exec(TUPLE_FACTORY.newTuple());
+    final CpcSketch sketch = DataToSketchTest.getSketch((DataByteArray) result.get(0));
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicIntermediateFromInitial() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new UnionSketch().getIntermed()).newInstance();
+    final CpcSketch inputSketch = new CpcSketch();
+    inputSketch.update(1);
+    inputSketch.update(2);
+    inputSketch.update(3);
+    final DataBag outerBag = BAG_FACTORY.newDefaultBag();
+    final DataBag innerBag = BAG_FACTORY.newDefaultBag();
+    innerBag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    outerBag.add(TUPLE_FACTORY.newTuple(innerBag));
+    final Tuple result = func.exec(TUPLE_FACTORY.newTuple(outerBag));
+    final CpcSketch sketch = DataToSketchTest.getSketch((DataByteArray) result.get(0));
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 3.0, 0.01);
+  }
+
+  @Test
+  public void algebraicIntermediateFromIntermediate() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new UnionSketch().getIntermed()).newInstance();
+    final CpcSketch inputSketch = new CpcSketch();
+    inputSketch.update("a");
+    inputSketch.update("b");
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    final Tuple result = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final CpcSketch sketch = DataToSketchTest.getSketch((DataByteArray) result.get(0));
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+  }
+
+  @Test
+  public void algebraicIntermediateFromIntermediateCustomLgKAndSeed() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new UnionSketch().getIntermed())
+        .getConstructor(String.class, String.class).newInstance("10", "123");
+    final CpcSketch inputSketch = new CpcSketch(10, 123);
+    inputSketch.update("a");
+    inputSketch.update("b");
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    final Tuple result = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final CpcSketch sketch = DataToSketchTest.getSketch((DataByteArray) result.get(0), 123);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+    Assert.assertEquals(sketch.getLgK(), 10);
+  }
+
+  @Test
+  public void algebraicFinalNullInputTuple() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new UnionSketch().getFinal()).newInstance();
+    final DataByteArray result = func.exec(null);
+    final CpcSketch sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicFinalEmptyInputTuple() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new UnionSketch().getFinal()).newInstance();
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple());
+    final CpcSketch sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicFinalFromInitial() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new UnionSketch().getFinal()).newInstance();
+    final CpcSketch inputSketch = new CpcSketch();
+    inputSketch.update(1);
+    inputSketch.update(2);
+    inputSketch.update(3);
+    final DataBag outerBag = BAG_FACTORY.newDefaultBag();
+    final DataBag innerBag = BAG_FACTORY.newDefaultBag();
+    innerBag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    outerBag.add(TUPLE_FACTORY.newTuple(innerBag));
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(outerBag));
+    final CpcSketch sketch = DataToSketchTest.getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 3.0, 0.01);
+  }
+
+  @Test
+  public void algebraicFinalFromInitialCustomLgKAndSeed() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new UnionSketch().getFinal())
+        .getConstructor(String.class, String.class).newInstance("10", "123");
+    final CpcSketch inputSketch = new CpcSketch(10,123);
+    inputSketch.update(1);
+    inputSketch.update(2);
+    inputSketch.update(3);
+    final DataBag outerBag = BAG_FACTORY.newDefaultBag();
+    final DataBag innerBag = BAG_FACTORY.newDefaultBag();
+    innerBag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    outerBag.add(TUPLE_FACTORY.newTuple(innerBag));
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(outerBag));
+    final CpcSketch sketch = DataToSketchTest.getSketch(result, 123);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 3.0, 0.01);
+    Assert.assertEquals(sketch.getLgK(), 10);
+  }
+
+  @Test
+  public void algebraicFinalFromIntermediate() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new UnionSketch().getFinal()).newInstance();
+    final CpcSketch inputSketch = new CpcSketch();
+    inputSketch.update("a");
+    inputSketch.update("b");
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final CpcSketch sketch = DataToSketchTest.getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/kll/DataToSketchTest.java b/src/test/java/com/yahoo/sketches/pig/kll/DataToSketchTest.java
new file mode 100644
index 0000000..dc342d3
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/kll/DataToSketchTest.java
@@ -0,0 +1,328 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+public class DataToSketchTest {
+  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+  private static final BagFactory BAG_FACTORY = BagFactory.getInstance();
+
+  @Test
+  public void execNullInputTuple() throws Exception {
+    final EvalFunc<DataByteArray> func = new DataToSketch();
+    final DataByteArray result = func.exec(null);
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void execEmptyInputTuple() throws Exception {
+    final EvalFunc<DataByteArray> func = new DataToSketch();
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple());
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void execEmptyBag() throws Exception {
+    final EvalFunc<DataByteArray> func = new DataToSketch();
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(BAG_FACTORY.newDefaultBag()));
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test(expectedExceptions = ClassCastException.class)
+  public void execWrongValueType() throws Exception {
+    final EvalFunc<DataByteArray> func = new DataToSketch();
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple("a"));
+    func.exec(TUPLE_FACTORY.newTuple(bag));
+  }
+
+  @Test
+  public void execNormalCase() throws Exception {
+    final EvalFunc<DataByteArray> func = new DataToSketch();
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple(Float.valueOf(1)));
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 1);
+  }
+  
+  @Test
+  public void execMixedNullCase() throws Exception {
+    final EvalFunc<DataByteArray> func = new DataToSketch();
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple(Float.valueOf(1)));
+    bag.add(null);
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 1);
+  }
+
+  @Test
+  public void accumulator() throws Exception {
+    final Accumulator<DataByteArray> func = new DataToSketch();
+
+    // no input yet
+    DataByteArray result = func.getValue();
+    KllFloatsSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // null input tuple
+    func.accumulate(null);
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // empty input tuple
+    func.accumulate(TUPLE_FACTORY.newTuple());
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // empty bag
+    func.accumulate(TUPLE_FACTORY.newTuple(BAG_FACTORY.newDefaultBag()));
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // normal case
+    DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple(Float.valueOf(1)));
+    func.accumulate(TUPLE_FACTORY.newTuple(bag));
+    func.accumulate(TUPLE_FACTORY.newTuple(bag));
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 2);
+    
+    // mixed null case
+    bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple(Float.valueOf(1)));
+    bag.add(null);
+    func.accumulate(TUPLE_FACTORY.newTuple(bag));
+    func.accumulate(TUPLE_FACTORY.newTuple(bag));
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 4);
+
+    // cleanup
+    func.cleanup();
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void accumulatorCustomK() throws Exception {
+    final Accumulator<DataByteArray> func = new DataToSketch("400");
+    final KllFloatsSketch sketch = getSketch(func.getValue());
+    Assert.assertEquals(sketch.getK(), 400);
+  }
+
+  @Test
+  public void algebraicInitial() throws Exception {
+    final EvalFunc<Tuple> func = new DataToSketch.Initial();
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple());
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(bag));
+    Assert.assertNotNull(resultTuple);
+    Assert.assertEquals(resultTuple.size(), 1);
+    Assert.assertTrue(resultTuple.get(0) instanceof DataBag);
+    Assert.assertEquals(((DataBag) resultTuple.get(0)).size(), 1);
+  }
+
+  @Test
+  public void algebraicIntermediateNullInputTupleCustomK() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func = (EvalFunc<Tuple>) Class.forName(new DataToSketch().getIntermed())
+        .getConstructor(String.class).newInstance("400");
+    final Tuple resultTuple = func.exec(null);
+    final KllFloatsSketch sketch = getSketch(resultTuple);
+    Assert.assertTrue(sketch.isEmpty());
+    Assert.assertEquals(sketch.getK(), 400);
+  }
+
+  @Test
+  public void algebraicIntermediateEmptyInputTuple() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func = (EvalFunc<Tuple>) Class.forName(new DataToSketch().getIntermed()).newInstance();
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple());
+    final KllFloatsSketch sketch = getSketch(resultTuple);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicIntermediateNormalCase() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func = (EvalFunc<Tuple>) Class.forName(new DataToSketch().getIntermed()).newInstance();
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+
+    { // this is to simulate an output from Initial
+      final DataBag innerBag = BAG_FACTORY.newDefaultBag();
+      innerBag.add(TUPLE_FACTORY.newTuple(Float.valueOf(1)));
+      bag.add(TUPLE_FACTORY.newTuple(innerBag));
+    }
+
+    { // this is to simulate an output from a prior call of IntermediateFinal
+      final KllFloatsSketch qs = new KllFloatsSketch();
+      qs.update(2);
+      bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(qs.toByteArray())));
+    }
+
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final KllFloatsSketch sketch = getSketch(resultTuple);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 2);
+  }
+  
+  @Test
+  public void algebraicIntermediateMixedNullCase() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func = (EvalFunc<Tuple>) Class.forName(new DataToSketch().getIntermed()).newInstance();
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+
+    { // this is to simulate an output from Initial
+      final DataBag innerBag = BAG_FACTORY.newDefaultBag();
+      innerBag.add(TUPLE_FACTORY.newTuple(Float.valueOf(1)));
+      innerBag.add(null);
+      bag.add(TUPLE_FACTORY.newTuple(innerBag));
+    }
+
+    { // this is to simulate an output from a prior call of IntermediateFinal
+      final KllFloatsSketch qs = new KllFloatsSketch();
+      qs.update(2);
+      bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(qs.toByteArray())));
+    }
+
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final KllFloatsSketch sketch = getSketch(resultTuple);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 2);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void algebraicIntermediateWrongType() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func = (EvalFunc<Tuple>) Class.forName(new DataToSketch().getIntermed()).newInstance();
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+
+    // this bag must have tuples with either bags or data byte arrays
+    bag.add(TUPLE_FACTORY.newTuple(1.0));
+    func.exec(TUPLE_FACTORY.newTuple(bag));
+  }
+
+  @Test
+  public void algebraicFinalNullInputTupleCustomK() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<DataByteArray> func = (EvalFunc<DataByteArray>) Class.forName(new DataToSketch().getFinal())
+        .getConstructor(String.class).newInstance("400");
+    final DataByteArray result = func.exec(null);
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+    Assert.assertEquals(sketch.getK(), 400);
+  }
+
+  @Test
+  public void algebraicFinalEmptyInputTuple() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<DataByteArray> func = (EvalFunc<DataByteArray>) Class.forName(new DataToSketch().getFinal()).newInstance();
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple());
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicFinalNormalCase() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<DataByteArray> func = (EvalFunc<DataByteArray>) Class.forName(new DataToSketch().getFinal()).newInstance();
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+
+    { // this is to simulate an output from Initial
+      final DataBag innerBag = BAG_FACTORY.newDefaultBag();
+      innerBag.add(TUPLE_FACTORY.newTuple(Float.valueOf(1)));
+      bag.add(TUPLE_FACTORY.newTuple(innerBag));
+    }
+
+    { // this is to simulate an output from a prior call of Intermediate
+      final KllFloatsSketch qs = new KllFloatsSketch();
+      qs.update(2);
+      bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(qs.toByteArray())));
+    }
+
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 2);
+  }
+  
+  @Test
+  public void algebraicFinalMixedNullCase() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<DataByteArray> func = (EvalFunc<DataByteArray>) Class.forName(new DataToSketch().getFinal()).newInstance();
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+
+    { // this is to simulate an output from Initial
+      final DataBag innerBag = BAG_FACTORY.newDefaultBag();
+      innerBag.add(TUPLE_FACTORY.newTuple(Float.valueOf(1)));
+      innerBag.add(null);
+      bag.add(TUPLE_FACTORY.newTuple(innerBag));
+    }
+
+    { // this is to simulate an output from a prior call of Intermediate
+      final KllFloatsSketch qs = new KllFloatsSketch();
+      qs.update(2);
+      bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(qs.toByteArray())));
+    }
+
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 2);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void algebraicFinalWrongType() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<DataByteArray> func = (EvalFunc<DataByteArray>) Class.forName(new DataToSketch().getFinal()).newInstance();
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+
+    // this bag must have tuples with either bags or data byte arrays
+    bag.add(TUPLE_FACTORY.newTuple(1.0));
+    func.exec(TUPLE_FACTORY.newTuple(bag));
+  }
+
+  // end of tests
+
+  private static KllFloatsSketch getSketch(final Tuple tuple) throws Exception {
+    Assert.assertNotNull(tuple);
+    Assert.assertEquals(tuple.size(), 1);
+    final DataByteArray bytes = (DataByteArray) tuple.get(0);
+    return getSketch(bytes);
+  }
+
+  private static KllFloatsSketch getSketch(final DataByteArray bytes) throws Exception {
+    Assert.assertTrue(bytes.size() > 0);
+    return KllFloatsSketch.heapify(Memory.wrap(bytes.get()));
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/kll/GetCdfTest.java b/src/test/java/com/yahoo/sketches/pig/kll/GetCdfTest.java
new file mode 100644
index 0000000..fce1f3f
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/kll/GetCdfTest.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.util.Arrays;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+import org.testng.Assert;
+
+public class GetCdfTest {
+  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+
+  @Test
+  public void emptySketch() throws Exception {
+    final EvalFunc<Tuple> func = new GetCdf();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 0f)));
+    Assert.assertNull(resultTuple);
+  }
+
+  @Test
+  public void normalCase() throws Exception {
+    final EvalFunc<Tuple> func = new GetCdf();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    for (int i = 1; i <= 10; i++) sketch.update(i);
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 2f, 7f)));
+    Assert.assertNotNull(resultTuple);
+    Assert.assertEquals(resultTuple.size(), 3);
+    Assert.assertEquals(((double) resultTuple.get(0)), 0.1);
+    Assert.assertEquals(((double) resultTuple.get(1)), 0.6);
+    Assert.assertEquals(((double) resultTuple.get(2)), 1.0);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongNumberOfInputs() throws Exception {
+    final EvalFunc<Tuple> func = new GetCdf();
+    func.exec(TUPLE_FACTORY.newTuple(1));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForSketch() throws Exception {
+    final EvalFunc<Tuple> func = new GetCdf();
+    func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(1.0, 1.0)));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeOfSplitPoint() throws Exception {
+    final EvalFunc<Tuple> func = new GetCdf();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 1)));
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/kll/GetKTest.java b/src/test/java/com/yahoo/sketches/pig/kll/GetKTest.java
new file mode 100644
index 0000000..e5d65a3
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/kll/GetKTest.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.util.Arrays;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.TupleFactory;
+
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+import org.testng.Assert;
+
+public class GetKTest {
+
+  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+
+  @Test
+  public void defalutK() throws Exception {
+    final EvalFunc<Integer> func = new GetK();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    final Integer result = func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()))));
+    Assert.assertNotNull(result);
+    Assert.assertEquals(result, Integer.valueOf(KllFloatsSketch.DEFAULT_K));
+  }
+
+  @Test
+  public void customK() throws Exception {
+    final EvalFunc<Integer> func = new GetK();
+    final KllFloatsSketch sketch = new KllFloatsSketch(400);
+    final Integer result = func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()))));
+    Assert.assertNotNull(result);
+    Assert.assertEquals(result, Integer.valueOf(400));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void noInputs() throws Exception {
+    final EvalFunc<Integer> func = new GetK();
+    func.exec(TUPLE_FACTORY.newTuple());
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void tooManyInputs() throws Exception {
+    final EvalFunc<Integer> func = new GetK();
+    func.exec(TUPLE_FACTORY.newTuple(2));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForSketch() throws Exception {
+    final EvalFunc<Integer> func = new GetK();
+    func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(1.0)));
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/kll/GetPmfTest.java b/src/test/java/com/yahoo/sketches/pig/kll/GetPmfTest.java
new file mode 100644
index 0000000..4693e84
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/kll/GetPmfTest.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.util.Arrays;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+import org.testng.Assert;
+
+public class GetPmfTest {
+  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+
+  @Test
+  public void emptySketch() throws Exception {
+    final EvalFunc<Tuple> func = new GetPmf();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 0f)));
+    Assert.assertNull(resultTuple);
+  }
+
+  @Test
+  public void normalCase() throws Exception {
+    final EvalFunc<Tuple> func = new GetPmf();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    for (int i = 1; i <= 10; i++) sketch.update(i);
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 2f, 7f)));
+    Assert.assertNotNull(resultTuple);
+    Assert.assertEquals(resultTuple.size(), 3);
+    Assert.assertEquals(((double) resultTuple.get(0)), 0.1);
+    Assert.assertEquals(((double) resultTuple.get(1)), 0.5);
+    Assert.assertEquals(((double) resultTuple.get(2)), 0.4);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongNumberOfInputs() throws Exception {
+    final EvalFunc<Tuple> func = new GetPmf();
+    func.exec(TUPLE_FACTORY.newTuple(1));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForSketch() throws Exception {
+    final EvalFunc<Tuple> func = new GetPmf();
+    func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(1.0, 1.0)));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeOfSplitPoint() throws Exception {
+    final EvalFunc<Tuple> func = new GetPmf();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 1)));
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/kll/GetQuantileTest.java b/src/test/java/com/yahoo/sketches/pig/kll/GetQuantileTest.java
new file mode 100644
index 0000000..9b650e3
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/kll/GetQuantileTest.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.util.Arrays;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.TupleFactory;
+
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+import org.testng.Assert;
+
+public class GetQuantileTest {
+  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+
+  @Test
+  public void emptySketch() throws Exception {
+    final EvalFunc<Float> func = new GetQuantile();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    Float result = func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 0.0)));
+    Assert.assertEquals(result, Float.NaN);
+  }
+
+  @Test
+  public void normalCase() throws Exception {
+    final EvalFunc<Float> func = new GetQuantile();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    sketch.update(1);
+    final Float result = func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 0.5)));
+    Assert.assertEquals(result, 1f);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongNumberOfInputs() throws Exception {
+    final EvalFunc<Float> func = new GetQuantile();
+    func.exec(TUPLE_FACTORY.newTuple(1));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForSketch() throws Exception {
+    final EvalFunc<Float> func = new GetQuantile();
+    func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(1.0, 1.0)));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForFraction() throws Exception {
+    final EvalFunc<Float> func = new GetQuantile();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 1)));
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/kll/GetQuantilesTest.java b/src/test/java/com/yahoo/sketches/pig/kll/GetQuantilesTest.java
new file mode 100644
index 0000000..5dbc463
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/kll/GetQuantilesTest.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.util.Arrays;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+import org.testng.Assert;
+
+public class GetQuantilesTest {
+  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+
+  @Test
+  public void emptySketch() throws Exception {
+    final EvalFunc<Tuple> func = new GetQuantiles();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 0.5)));
+    Assert.assertNotNull(resultTuple);
+    Assert.assertEquals(resultTuple.size(), 1);
+    Assert.assertEquals(((float) resultTuple.get(0)), Float.NaN);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void tooFewInputs() throws Exception {
+    final EvalFunc<Tuple> func = new GetQuantiles();
+    func.exec(TUPLE_FACTORY.newTuple(1));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForSketch() throws Exception {
+    final EvalFunc<Tuple> func = new GetQuantiles();
+    func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(1.0, 1.0)));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForFractionOrNumberOfIntervals() throws Exception {
+    final EvalFunc<Tuple> func = new GetQuantiles();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), "")));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeAmongFractions() throws Exception {
+    final EvalFunc<Tuple> func = new GetQuantiles();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 0.0, 1)));
+  }
+
+  @Test
+  public void oneFraction() throws Exception {
+    final EvalFunc<Tuple> func = new GetQuantiles();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    for (int i = 1; i <= 10; i++) sketch.update(i);
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 0.5)));
+    Assert.assertNotNull(resultTuple);
+    Assert.assertEquals(resultTuple.size(), 1);
+    Assert.assertEquals(((float) resultTuple.get(0)), 6f);
+  }
+
+  @Test
+  public void severalFractions() throws Exception {
+    final EvalFunc<Tuple> func = new GetQuantiles();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    for (int i = 1; i <= 10; i++) sketch.update(i);
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 0.0, 0.5, 1.0)));
+    Assert.assertNotNull(resultTuple);
+    Assert.assertEquals(resultTuple.size(), 3);
+    Assert.assertEquals(((float) resultTuple.get(0)), 1f);
+    Assert.assertEquals(((float) resultTuple.get(1)), 6f);
+    Assert.assertEquals(((float) resultTuple.get(2)), 10f);
+  }
+
+  @Test
+  public void numberOfEvenlySpacedIntervals() throws Exception {
+    final EvalFunc<Tuple> func = new GetQuantiles();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    for (int i = 1; i <= 10; i++) sketch.update(i);
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 3)));
+    Assert.assertNotNull(resultTuple);
+    Assert.assertEquals(resultTuple.size(), 3);
+    Assert.assertEquals(((float) resultTuple.get(0)), 1f);
+    Assert.assertEquals(((float) resultTuple.get(1)), 6f);
+    Assert.assertEquals(((float) resultTuple.get(2)), 10f);
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/kll/GetRankTest.java b/src/test/java/com/yahoo/sketches/pig/kll/GetRankTest.java
new file mode 100644
index 0000000..83d42df
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/kll/GetRankTest.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.util.Arrays;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.TupleFactory;
+
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+import org.testng.Assert;
+
+public class GetRankTest {
+  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+
+  @Test
+  public void emptySketch() throws Exception {
+    final EvalFunc<Double> func = new GetRank();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    final Double result = func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 0f)));
+    Assert.assertEquals(result, Double.NaN);
+  }
+
+  @Test
+  public void normalCase() throws Exception {
+    final EvalFunc<Double> func = new GetRank();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    sketch.update(1);
+    sketch.update(2);
+    sketch.update(3);
+    sketch.update(4);
+    final Double result = func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 3f)));
+    Assert.assertEquals(result, 0.5);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongNumberOfInputs() throws Exception {
+    final EvalFunc<Double> func = new GetRank();
+    func.exec(TUPLE_FACTORY.newTuple(1));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForSketch() throws Exception {
+    final EvalFunc<Double> func = new GetRank();
+    func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(1.0, 1.0)));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForValue() throws Exception {
+    final EvalFunc<Double> func = new GetRank();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 1)));
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/kll/SketchToStringTest.java b/src/test/java/com/yahoo/sketches/pig/kll/SketchToStringTest.java
new file mode 100644
index 0000000..62e1d09
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/kll/SketchToStringTest.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.util.Arrays;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.TupleFactory;
+
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+import org.testng.Assert;
+
+public class SketchToStringTest {
+  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+
+  @Test
+  public void normalCase() throws Exception {
+    final EvalFunc<String> func = new SketchToString();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    final String result = func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()))));
+    Assert.assertNotNull(result);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void noInputs() throws Exception {
+    final EvalFunc<String> func = new SketchToString();
+    func.exec(TUPLE_FACTORY.newTuple());
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void tooManyInputs() throws Exception {
+    final EvalFunc<String> func = new SketchToString();
+    func.exec(TUPLE_FACTORY.newTuple(2));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForSketch() throws Exception {
+    final EvalFunc<String> func = new SketchToString();
+    func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(1.0)));
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/kll/UnionSketchTest.java b/src/test/java/com/yahoo/sketches/pig/kll/UnionSketchTest.java
new file mode 100644
index 0000000..dd87b5a
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/kll/UnionSketchTest.java
@@ -0,0 +1,266 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+public class UnionSketchTest {
+  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+  private static final BagFactory BAG_FACTORY = BagFactory.getInstance();
+
+  @Test
+  public void execNullInputTuple() throws Exception {
+    final EvalFunc<DataByteArray> func = new UnionSketch();
+    final DataByteArray result = func.exec(null);
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void execEmptyInputTuple() throws Exception {
+    final EvalFunc<DataByteArray> func = new UnionSketch();
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple());
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void execEmptyBag() throws Exception {
+    final EvalFunc<DataByteArray> func = new UnionSketch();
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(BAG_FACTORY.newDefaultBag()));
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void execNormalCase() throws Exception {
+    final EvalFunc<DataByteArray> func = new UnionSketch();
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    final KllFloatsSketch inputSketch = new KllFloatsSketch();
+    inputSketch.update(1);
+    bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 1);
+  }
+
+  @Test
+  public void accumulator() throws Exception {
+    final Accumulator<DataByteArray> func = new UnionSketch();
+
+    // no input yet
+    DataByteArray result = func.getValue();
+    KllFloatsSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // null input tuple
+    func.accumulate(null);
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // empty input tuple
+    func.accumulate(TUPLE_FACTORY.newTuple());
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // empty bag
+    func.accumulate(TUPLE_FACTORY.newTuple(BAG_FACTORY.newDefaultBag()));
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // normal case
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    final KllFloatsSketch inputSketch = new KllFloatsSketch();
+    inputSketch.update(1);
+    bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    func.accumulate(TUPLE_FACTORY.newTuple(bag));
+    func.accumulate(TUPLE_FACTORY.newTuple(bag));
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 2);
+
+    // cleanup
+    func.cleanup();
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void accumulatorCustomK() throws Exception {
+    final Accumulator<DataByteArray> func = new UnionSketch("400");
+    final KllFloatsSketch sketch = getSketch(func.getValue());
+    Assert.assertEquals(sketch.getK(), 400);
+  }
+
+  @Test
+  public void algebraicInitial() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new UnionSketch().getInitial())
+        .getConstructor(String.class).newInstance("400");
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple());
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(bag));
+    Assert.assertNotNull(resultTuple);
+    Assert.assertEquals(resultTuple.size(), 1);
+    Assert.assertTrue(resultTuple.get(0) instanceof DataBag);
+    Assert.assertEquals(((DataBag) resultTuple.get(0)).size(), 1);
+  }
+
+  @Test
+  public void algebraicIntermediateNullInputTupleCustomK() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new UnionSketch().getIntermed())
+        .getConstructor(String.class).newInstance("400");
+    final Tuple resultTuple = func.exec(null);
+    final KllFloatsSketch sketch = getSketch(resultTuple);
+    Assert.assertTrue(sketch.isEmpty());
+    Assert.assertEquals(sketch.getK(), 400);
+  }
+
+  @Test
+  public void algebraicIntermediateEmptyInputTuple() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new UnionSketch().getIntermed()).newInstance();
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple());
+    final KllFloatsSketch sketch = getSketch(resultTuple);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicIntermediateNormalCase() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new UnionSketch().getIntermed()).newInstance();
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+
+    { // this is to simulate an output from Initial
+      final DataBag innerBag = BAG_FACTORY.newDefaultBag();
+      final KllFloatsSketch qs = new KllFloatsSketch();
+      qs.update(1);
+      innerBag.add(TUPLE_FACTORY.newTuple(new DataByteArray(qs.toByteArray())));
+      bag.add(TUPLE_FACTORY.newTuple(innerBag));
+    }
+
+    { // this is to simulate an output from a prior call of Intermediate
+      final KllFloatsSketch qs = new KllFloatsSketch();
+      qs.update(2);
+      bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(qs.toByteArray())));
+    }
+
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final KllFloatsSketch sketch = getSketch(resultTuple);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 2);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void algebraicIntermediateWrongType() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new UnionSketch().getIntermed()).newInstance();
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+
+    // this bag must have tuples with either bags or data byte arrays
+    bag.add(TUPLE_FACTORY.newTuple(1.0));
+    func.exec(TUPLE_FACTORY.newTuple(bag));
+  }
+
+  @Test
+  public void algebraicFinalNullInputTupleCustomK() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new UnionSketch().getFinal())
+        .getConstructor(String.class).newInstance("400");
+    final DataByteArray result = func.exec(null);
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+    Assert.assertEquals(sketch.getK(), 400);
+  }
+
+  @Test
+  public void algebraicFinalEmptyInputTuple() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new UnionSketch().getFinal()).newInstance();
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple());
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicFinalNormalCase() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new UnionSketch().getFinal()).newInstance();
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+
+    { // this is to simulate an output from Initial
+      final DataBag innerBag = BAG_FACTORY.newDefaultBag();
+      final KllFloatsSketch qs = new KllFloatsSketch();
+      qs.update(1);
+      innerBag.add(TUPLE_FACTORY.newTuple(new DataByteArray(qs.toByteArray())));
+      bag.add(TUPLE_FACTORY.newTuple(innerBag));
+    }
+
+    { // this is to simulate an output from a prior call of Intermediate
+      final KllFloatsSketch qs = new KllFloatsSketch();
+      qs.update(2);
+      bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(qs.toByteArray())));
+    }
+
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 2);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void algebraicIntermediateFinalWrongType() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new UnionSketch().getFinal()).newInstance();
+    DataBag bag = BAG_FACTORY.newDefaultBag();
+
+    // this bag must have tuples with either bags or data byte arrays
+    bag.add(TUPLE_FACTORY.newTuple(1.0));
+    func.exec(TUPLE_FACTORY.newTuple(bag));
+  }
+
+  // end of tests
+
+  private static KllFloatsSketch getSketch(final Tuple tuple) throws Exception {
+    Assert.assertNotNull(tuple);
+    Assert.assertEquals(tuple.size(), 1);
+    final DataByteArray bytes = (DataByteArray) tuple.get(0);
+    return getSketch(bytes);
+  }
+
+  private static KllFloatsSketch getSketch(final DataByteArray bytes) throws Exception {
+    Assert.assertTrue(bytes.size() > 0);
+    return KllFloatsSketch.heapify(Memory.wrap(bytes.get()));
+  }
+
+}