Merge pull request #14 from DataSketches/quantiles

Quantiles Sketch UDFs
diff --git a/pom.xml b/pom.xml
index b4bf1a0..0239ee5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,7 +55,7 @@
     <dependency>
       <groupId>com.yahoo.datasketches</groupId>
       <artifactId>sketches-core</artifactId>
-      <version>0.5.1</version>
+      <version>0.5.2</version>
     </dependency> 
 
     <!-- hadoop -->
diff --git a/src/main/java/com/yahoo/sketches/pig/quantiles/DataToSketch.java b/src/main/java/com/yahoo/sketches/pig/quantiles/DataToSketch.java
new file mode 100644
index 0000000..f8e04b3
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/quantiles/DataToSketch.java
@@ -0,0 +1,327 @@
+/*

+ * Copyright 2016, Yahoo! Inc.

+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.

+ */

+package com.yahoo.sketches.pig.quantiles;

+

+import java.io.IOException;

+

+import org.apache.pig.Accumulator;

+import org.apache.pig.Algebraic;

+import org.apache.pig.EvalFunc;

+import org.apache.pig.data.DataBag;

+import org.apache.pig.data.DataByteArray;

+import org.apache.pig.data.DataType;

+import org.apache.pig.data.Tuple;

+import org.apache.pig.data.TupleFactory;

+import org.apache.pig.impl.logicalLayer.FrontendException;

+import org.apache.pig.impl.logicalLayer.schema.Schema;

+

+import com.yahoo.sketches.memory.NativeMemory;

+import com.yahoo.sketches.quantiles.QuantilesSketch;

+import com.yahoo.sketches.quantiles.Union;

+import com.yahoo.sketches.quantiles.UnionBuilder;

+

+/**

+ * This is a Pig UDF that builds Sketches from data. 

+ * To assist Pig, this class implements both the <i>Accumulator</i> and <i>Algebraic</i> interfaces.

+ */

+public class DataToSketch extends EvalFunc<Tuple> implements Accumulator<Tuple>, Algebraic {

+

+  private static final TupleFactory tupleFactory_ = TupleFactory.getInstance();

+

+  // With the single exception of the Accumulator interface, UDFs are stateless.

+  // All parameters kept at the class level must be final, except for the accumUnion.

+  private final UnionBuilder unionBuilder_;

+  private Union accumUnion_;

+

+  // TOP LEVEL API

+

+  /**

+   * Default constructor. Assumes default k.

+   */

+  public DataToSketch() {

+    this(0);

+  }

+

+  /**

+   * String constructor.

+   * 

+   * @param kStr string representation of k

+   */

+  public DataToSketch(final String kStr) {

+    this(Integer.parseInt(kStr));

+  }

+

+  /**

+   * Base constructor.

+   * 

+   * @param k parameter that determines the accuracy and size of the sketch.

+   */

+  public DataToSketch(final int k) {

+    super();

+    unionBuilder_ = Union.builder();

+    if (k > 0) unionBuilder_.setK(k);

+  }

+

+  //@formatter:off

+  /**

+   * Top-level exec function.

+   * This method accepts an input Tuple containing a Bag of zero or more inner <b>Datum Tuples</b>

+   * and returns a single updated <b>Sketch</b> as a <b>Sketch Tuple</b>.

+   * 

+   * <p>

+   * If a large number of calls are anticipated, leveraging either the <i>Algebraic</i> or

+   * <i>Accumulator</i> interfaces is recommended. Pig normally handles this automatically.

+   * 

+   * <p>

+   * Internally, this method presents the inner <b>Datum Tuples</b> to a new <b>Union</b>,

+   * which is returned as a <b>Sketch Tuple</b>

+   * 

+   * <p>

+   * Types are in the form: Java data type: Pig DataType

+   * 

+   * <p>

+   * 

+   * <b>Input Tuple</b>

+   * <ul>

+   *   <li>Tuple: TUPLE (Must contain only one field)

+   *     <ul>

+   *       <li>index 0: DataBag: BAG (May contain 0 or more Inner Tuples)

+   *         <ul>

+   *           <li>index 0: Tuple: TUPLE <b>Datum Tuple</b></li>

+   *           <li>...</li>

+   *           <li>index n-1: Tuple: TUPLE <b>Datum Tuple</b></li>

+   *         </ul>

+   *       </li>

+   *     </ul>

+   *   </li>

+   * </ul>

+   * 

+   * <b>Datum Tuple</b>

+   * <ul>

+   *   <li>Tuple: TUPLE (Must contain only one field)

+   *     <ul>

+   *       <li>index 0: Double: DOUBLE</li>

+   *     </ul>

+   *   </li>

+   * </ul>

+   * 

+   * <b>Sketch Tuple</b>

+   * <ul>

+   *   <li>Tuple: TUPLE (Contains exactly 1 field)

+   *     <ul>

+   *       <li>index 0: DataByteArray: BYTEARRAY = a serialized QuantilesSketch object.</li>

+   *     </ul>

+   *   </li>

+   * </ul>

+   * 

+   * @param inputTuple A tuple containing a single bag, containing Datum Tuples.

+   * @return Sketch Tuple. If inputTuple is null or empty, returns empty sketch.

+   * @see "org.apache.pig.EvalFunc.exec(org.apache.pig.data.Tuple)"

+   * @throws IOException from Pig.

+   */

+  // @formatter:on

+

+  @Override // TOP LEVEL EXEC

+  public Tuple exec(final Tuple inputTuple) throws IOException {

+    //The exec is a stateless function. It operates on the input and returns a result.

+    if (inputTuple != null && inputTuple.size() > 0) {

+      final Union union = unionBuilder_.build();

+      final DataBag bag = (DataBag) inputTuple.get(0);

+      for (final Tuple innerTuple: bag) union.update((Double) innerTuple.get(0));

+      final QuantilesSketch resultSketch = union.getResultAndReset();

+      if (resultSketch != null) return tupleFactory_.newTuple(new DataByteArray(resultSketch.toByteArray()));

+    }

+    // return empty sketch

+    return tupleFactory_.newTuple(new DataByteArray(unionBuilder_.build().getResult().toByteArray()));

+  }

+

+  @Override

+  public Schema outputSchema(final Schema input) {

+    if (input == null) return null;

+    try {

+      final Schema tupleSchema = new Schema();

+      tupleSchema.add(new Schema.FieldSchema("Sketch", DataType.BYTEARRAY));

+      return new Schema(new Schema.FieldSchema(getSchemaName(

+          this.getClass().getName().toLowerCase(), input), tupleSchema, DataType.TUPLE));

+    } catch (FrontendException e) {

+      throw new RuntimeException(e);

+    }

+  }

+

+  // ACCUMULATOR INTERFACE

+

+  /**

+   * An <i>Accumulator</i> version of the standard <i>exec()</i> method. Like <i>exec()</i>,

+   * accumulate is called with a bag of Datum Tuples. Unlike <i>exec()</i>, it doesn't serialize the

+   * sketch at the end. Instead, it can be called multiple times, each time with another bag of

+   * Datum Tuples to be input to the Union.

+   * 

+   * @param inputTuple A tuple containing a single bag, containing Datum Tuples.

+   * @see #exec

+   * @see "org.apache.pig.Accumulator.accumulate(org.apache.pig.data.Tuple)"

+   * @throws IOException by Pig

+   */

+  @Override

+  public void accumulate(final Tuple inputTuple) throws IOException {

+    if (inputTuple == null || inputTuple.size() == 0) return;

+    final DataBag bag = (DataBag) inputTuple.get(0);

+    if (bag == null) return;

+    if (accumUnion_ == null) accumUnion_ = unionBuilder_.build();

+    for (final Tuple innerTuple: bag) accumUnion_.update((Double) innerTuple.get(0));

+  }

+

+  /**

+   * Returns the result of the Union that has been built up by multiple calls to {@link #accumulate}.

+   * 

+   * @return Sketch Tuple. (see {@link #exec} for return tuple format)

+   * @see "org.apache.pig.Accumulator.getValue()"

+   */

+  @Override

+  public Tuple getValue() {

+    if (accumUnion_ != null) {

+      final QuantilesSketch resultSketch = accumUnion_.getResultAndReset();

+      if (resultSketch != null) return tupleFactory_.newTuple(new DataByteArray(resultSketch.toByteArray()));

+    }

+    // return empty sketch

+    return tupleFactory_.newTuple(new DataByteArray(unionBuilder_.build().getResult().toByteArray()));

+  }

+

+  /**

+   * Cleans up the UDF state after being called using the {@link Accumulator} interface.

+   * 

+   * @see "org.apache.pig.Accumulator.cleanup()"

+   */

+  @Override

+  public void cleanup() {

+    accumUnion_ = null;

+  }

+

+  // ALGEBRAIC INTERFACE

+

+  @Override

+  public String getInitial() {

+    return Initial.class.getName();

+  }

+

+  @Override

+  public String getIntermed() {

+    return IntermediateFinal.class.getName();

+  }

+

+  @Override

+  public String getFinal() {

+    return IntermediateFinal.class.getName();

+  }

+

+  // STATIC Initial Class only called by Pig

+

+  /**

+   * Class used to calculate the initial pass of an Algebraic sketch operation. 

+   * 

+   * <p>

+   * The Initial class simply passes through all records unchanged so that they can be

+   * processed by the intermediate processor instead.</p>

+   */

+  public static class Initial extends EvalFunc<Tuple> {

+    // The Algebraic worker classes (Initial, IntermediateFinal) are static and stateless. 

+    // The constructors and final parameters must mirror the parent class as there is no linkage

+    // between them.

+    /**

+     * Default constructor.

+     */

+    public Initial() {}

+

+    /**

+     * Constructor for the initial pass of an Algebraic function. Pig will call this and pass the 

+     * same constructor arguments as the base UDF. In this case the arguments are ignored.

+     * 

+     * @param kStr string representation of k

+     */

+    public Initial(final String kStr) {}

+

+    @Override // Initial exec

+    public Tuple exec(final Tuple inputTuple) throws IOException {

+      return inputTuple;

+    }

+  }

+

+  // STATIC IntermediateFinal Class only called by Pig

+

+  /**

+   * Class used to calculate the intermediate or final combiner pass of an <i>Algebraic</i> sketch 

+   * operation. This is called from the combiner, and may be called multiple times (from the mapper 

+   * and from the reducer). It will receive a bag of values returned by either the <i>Intermediate</i> 

+   * stage or the <i>Initial</i> stages, so it needs to be able to differentiate between and 

+   * interpret both types.

+   */

+  public static class IntermediateFinal extends EvalFunc<Tuple> {

+    // The Algebraic worker classes (Initial, IntermediateFinal) are static and stateless. 

+    // The constructors and final parameters must mirror the parent class as there is no linkage

+    // between them.

+    private final UnionBuilder unionBuilder_;

+

+    /**

+     * Default constructor. Assumes default k.

+     */

+    public IntermediateFinal() {

+      this(0);

+    }

+

+    /**

+     * Constructor for the intermediate and final passes of an Algebraic function. Pig will call 

+     * this and pass the same constructor arguments as the base UDF.

+     *

+     * @param kStr string representation of k

+     */

+    public IntermediateFinal(final String kStr) {

+      this(Integer.parseInt(kStr));

+    }

+

+    /**

+     * Constructor with primitives for the intermediate and final passes of an Algebraic function. 

+     * 

+     * @param k parameter that determines the accuracy and size of the sketch.

+     */

+    public IntermediateFinal(final int k) {

+      unionBuilder_ = Union.builder();

+      if (k > 0) unionBuilder_.setK(k);

+    }

+

+    @Override // IntermediateFinal exec

+    public Tuple exec(final Tuple inputTuple) throws IOException { //throws is in API

+      if (inputTuple != null && inputTuple.size() > 0) {

+        final Union union = unionBuilder_.build();

+        final DataBag outerBag = (DataBag) inputTuple.get(0);

+        for (final Tuple dataTuple: outerBag) {

+          final Object f0 = dataTuple.get(0);

+          if (f0 == null) continue;

+          if (f0 instanceof DataBag) {

+            final DataBag innerBag = (DataBag) f0; // inputTuple.bag0.dataTupleN.f0:bag

+            if (innerBag.size() == 0) continue;

+            // If field 0 of a dataTuple is a Bag all innerTuples of this inner bag

+            // will be passed into the union.

+            // It is due to system bagged outputs from multiple mapper Initial functions.  

+            // The Intermediate stage was bypassed.

+            for (final Tuple innerTuple: innerBag) union.update((Double) innerTuple.get(0));

+          } else if (f0 instanceof DataByteArray) { // inputTuple.bag0.dataTupleN.f0:DBA

+            // If field 0 of a dataTuple is a DataByteArray we assume it is a sketch

+            // due to system bagged outputs from multiple mapper Intermediate functions.

+            // Each dataTuple.DBA:sketch will be merged into the union.

+            final DataByteArray dba = (DataByteArray) f0;

+            union.update(new NativeMemory(dba.get()));

+          } else {

+            throw new IllegalArgumentException("dataTuple.Field0: Is not a DataByteArray: "

+                + f0.getClass().getName());

+          }

+        }

+        final QuantilesSketch resultSketch = union.getResultAndReset();

+        if (resultSketch != null) return tupleFactory_.newTuple(new DataByteArray(resultSketch.toByteArray()));

+      }

+      // return empty sketch

+      return tupleFactory_.newTuple(new DataByteArray(unionBuilder_.build().getResult().toByteArray()));

+    }

+  } // end IntermediateFinal

+

+}

diff --git a/src/main/java/com/yahoo/sketches/pig/quantiles/GetPMF.java b/src/main/java/com/yahoo/sketches/pig/quantiles/GetPMF.java
new file mode 100644
index 0000000..fde4be0
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/quantiles/GetPMF.java
@@ -0,0 +1,42 @@
+/*

+ * Copyright 2016, Yahoo! Inc.

+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.

+ */

+package com.yahoo.sketches.pig.quantiles;

+

+import java.io.IOException;

+

+import org.apache.pig.EvalFunc;

+import org.apache.pig.data.DataByteArray;

+import org.apache.pig.data.Tuple;

+

+import com.yahoo.sketches.memory.NativeMemory;

+import com.yahoo.sketches.quantiles.QuantilesSketch;

+

+/**

+ * This UDF is to get an approximation to the Probability Mass Function (PMF) of the input stream 

+ * given a sketch and a set of split points - an array of <i>m</i> unique, monotonically increasing

+ * double values that divide the real number line into <i>m+1</i> consecutive disjoint intervals.

+ * The function returns an array of m+1 doubles each of which is an approximation to the fraction

+ * of the input stream values that fell into one of those intervals. Intervals are inclusive of

+ * the left split point and exclusive of the right split point.

+ */

+public class GetPMF extends EvalFunc<Tuple> {

+

+  @Override

+  public Tuple exec(final Tuple input) throws IOException {

+    if (input.size() < 2) throw new IllegalArgumentException("expected two or more inputs: sketch and list of split points");

+

+    if (!(input.get(0) instanceof DataByteArray)) throw new IllegalArgumentException("expected a DataByteArray as a sketch, got " + input.get(0).getClass().getSimpleName());

+    final DataByteArray dba = (DataByteArray) input.get(0);

+    final QuantilesSketch sketch = QuantilesSketch.heapify(new NativeMemory(dba.get()));

+

+    double[] splitPoints = new double[input.size() - 1];

+    for (int i = 1; i < input.size(); i++) {

+      if (!(input.get(i) instanceof Double)) throw new IllegalArgumentException("expected a double value as a split point, got " + input.get(i).getClass().getSimpleName());

+      splitPoints[i - 1] = (double) input.get(i);

+    }

+    return Util.doubleArrayToTuple(sketch.getPMF(splitPoints));

+  }

+

+}

diff --git a/src/main/java/com/yahoo/sketches/pig/quantiles/GetQuantile.java b/src/main/java/com/yahoo/sketches/pig/quantiles/GetQuantile.java
new file mode 100644
index 0000000..be58b6f
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/quantiles/GetQuantile.java
@@ -0,0 +1,38 @@
+/*

+ * Copyright 2016, Yahoo! Inc.

+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.

+ */

+package com.yahoo.sketches.pig.quantiles;

+

+import java.io.IOException;

+

+import org.apache.pig.EvalFunc;

+import org.apache.pig.data.DataByteArray;

+import org.apache.pig.data.Tuple;

+

+import com.yahoo.sketches.memory.NativeMemory;

+import com.yahoo.sketches.quantiles.QuantilesSketch;

+

+/**

+ * This UDF is to get a quantile value from a QuantilesSketch. A single value for a

+ * given fraction is returned. The fraction represents a normalized rank and must be

+ * from 0 to 1 inclusive. For example, the fraction of 0.5 corresponds to 50th percentile,

+ * which is the median value of the distribution (the number separating the higher half

+ * of the probability distribution from the lower half).

+ */

+public class GetQuantile extends EvalFunc<Double> {

+

+  @Override

+  public Double exec(final Tuple input) throws IOException {

+    if (input.size() != 2) throw new IllegalArgumentException("expected two inputs: sketch and fraction");

+

+    if (!(input.get(0) instanceof DataByteArray)) throw new IllegalArgumentException("expected a DataByteArray as a sketch, got " + input.get(0).getClass().getSimpleName());

+    final DataByteArray dba = (DataByteArray) input.get(0);

+    final QuantilesSketch sketch = QuantilesSketch.heapify(new NativeMemory(dba.get()));

+

+    if (!(input.get(1) instanceof Double)) throw new IllegalArgumentException("expected a double value as a fraction, got " + input.get(1).getClass().getSimpleName());

+    final double fraction = (double) input.get(1);

+    return sketch.getQuantile(fraction);

+  }

+

+}

diff --git a/src/main/java/com/yahoo/sketches/pig/quantiles/GetQuantiles.java b/src/main/java/com/yahoo/sketches/pig/quantiles/GetQuantiles.java
new file mode 100644
index 0000000..6f3efe9
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/quantiles/GetQuantiles.java
@@ -0,0 +1,41 @@
+/*

+ * Copyright 2016, Yahoo! Inc.

+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.

+ */

+package com.yahoo.sketches.pig.quantiles;

+

+import java.io.IOException;

+

+import org.apache.pig.EvalFunc;

+import org.apache.pig.data.DataByteArray;

+import org.apache.pig.data.Tuple;

+

+import com.yahoo.sketches.memory.NativeMemory;

+import com.yahoo.sketches.quantiles.QuantilesSketch;

+

+/**

+ * This UDF is to get a list of quantile values from a QuantilesSketch given a list of fractions.

+ * The fractions represent normalized ranks and must be from 0 to 1 inclusive. For example,

+ * the fraction of 0.5 corresponds to 50th percentile, which is the median value of the

+ * distribution (the number separating the higher half of the probability distribution

+ * from the lower half).

+ */

+public class GetQuantiles extends EvalFunc<Tuple> {

+

+  @Override

+  public Tuple exec(final Tuple input) throws IOException {

+    if (input.size() < 2) throw new IllegalArgumentException("expected two or more inputs: sketch and list of fractions");

+

+    if (!(input.get(0) instanceof DataByteArray)) throw new IllegalArgumentException("expected a DataByteArray as a sketch, got " + input.get(0).getClass().getSimpleName());

+    final DataByteArray dba = (DataByteArray) input.get(0);

+    final QuantilesSketch sketch = QuantilesSketch.heapify(new NativeMemory(dba.get()));

+

+    double[] fractions = new double[input.size() - 1];

+    for (int i = 1; i < input.size(); i++) {

+      if (!(input.get(i) instanceof Double)) throw new IllegalArgumentException("expected a double value as a fraction, got " + input.get(i).getClass().getSimpleName());

+      fractions[i - 1] = (double) input.get(i);

+    }

+    return Util.doubleArrayToTuple(sketch.getQuantiles(fractions));

+  }

+

+}

diff --git a/src/main/java/com/yahoo/sketches/pig/quantiles/Merge.java b/src/main/java/com/yahoo/sketches/pig/quantiles/Merge.java
new file mode 100644
index 0000000..fbd18ea
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/quantiles/Merge.java
@@ -0,0 +1,341 @@
+/*
+ * Copyright 2016, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+package com.yahoo.sketches.pig.quantiles;
+
+import java.io.IOException;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import com.yahoo.sketches.memory.NativeMemory;
+import com.yahoo.sketches.quantiles.QuantilesSketch;
+import com.yahoo.sketches.quantiles.Union;
+import com.yahoo.sketches.quantiles.UnionBuilder;
+
+/**
+ * This is a Pig UDF that merges Quantiles Sketches.
+ * To assist Pig, this class implements both the <i>Accumulator</i> and <i>Algebraic</i> interfaces.
+ */
+public class Merge extends EvalFunc<Tuple> implements Accumulator<Tuple>, Algebraic {
+
+  private static final TupleFactory tupleFactory_ = TupleFactory.getInstance();
+
+  // With the single exception of the Accumulator interface, UDFs are stateless.
+  // All parameters kept at the class level must be final, except for the accumUnion.
+  private final UnionBuilder unionBuilder_;
+  private Union accumUnion_;
+
+  //TOP LEVEL API
+
+  /**
+   * Default constructor. Assumes default k.
+   */
+  public Merge() {
+    this(0);
+  }
+
+  /**
+   * String constructor.
+   * 
+   * @param kStr string representation of k
+   */
+  public Merge(final String kStr) {
+    this(Integer.parseInt(kStr));
+  }
+
+  /**
+   * Base constructor.
+   * 
+   * @param k parameter that determines the accuracy and size of the sketch.
+   */
+  public Merge(final int k) {
+    super();
+    unionBuilder_ = Union.builder();
+    if (k > 0) unionBuilder_.setK(k);
+  }
+
+  //@formatter:off
+  /**
+   * Top-level exec function.
+   * This method accepts an input Tuple containing a Bag of zero or more inner <b>Sketch Tuples</b>
+   * and returns a single updated <b>Sketch</b> as a <b>Sketch Tuple</b>.
+   * 
+   * <p>
+   * If a large number of calls are anticipated, leveraging either the <i>Algebraic</i> or
+   * <i>Accumulator</i> interfaces is recommended. Pig normally handles this automatically.
+   * 
+   * <p>
+   * Internally, this method presents the inner <b>Sketch Tuples</b> to a new <b>Union</b>. 
+   * The result is returned as a <b>Sketch Tuple</b>
+   * 
+   * <p>
+   * Types are in the form: Java data type: Pig DataType
+   * 
+   * <p>
+   * 
+   * <b>Input Tuple</b>
+   * <ul>
+   *   <li>Tuple: TUPLE (Must contain only one field)
+   *     <ul>
+   *       <li>index 0: DataBag: BAG (May contain 0 or more Inner Tuples)
+   *         <ul>
+   *           <li>index 0: Tuple: TUPLE <b>Sketch Tuple</b></li>
+   *           <li>...</li>
+   *           <li>index n-1: Tuple: TUPLE <b>Sketch Tuple</b></li>
+   *         </ul>
+   *       </li>
+   *     </ul>
+   *   </li>
+   * </ul>
+   * 
+   * <b>Sketch Tuple</b>
+   * <ul>
+   *   <li>Tuple: TUPLE (Contains exactly 1 field)
+   *     <ul>
+   *       <li>index 0: DataByteArray: BYTEARRAY = The serialization of a Sketch object.</li>
+   *     </ul>
+   *   </li>
+   * </ul>
+   * 
+   * @param inputTuple A tuple containing a single bag, containing Sketch Tuples.
+   * @return Sketch Tuple. If inputTuple is null or empty, returns empty sketch.
+   * @see "org.apache.pig.EvalFunc.exec(org.apache.pig.data.Tuple)"
+   */
+  //@formatter:on
+
+  @Override // TOP LEVEL EXEC
+  public Tuple exec(final Tuple inputTuple) throws IOException {
+    //The exec is a stateless function.  It operates on the input and returns a result.
+    if (inputTuple != null && inputTuple.size() > 0) {
+      final Union union = unionBuilder_.build();
+      final DataBag bag = (DataBag) inputTuple.get(0);
+      updateUnion(bag, union);
+      final QuantilesSketch resultSketch = union.getResultAndReset();
+      if (resultSketch != null) return tupleFactory_.newTuple(new DataByteArray(resultSketch.toByteArray()));
+    }
+    // return empty sketch
+    return tupleFactory_.newTuple(new DataByteArray(unionBuilder_.build().getResult().toByteArray()));
+  }
+
+  @Override
+  public Schema outputSchema(final Schema input) {
+    if (input == null) return null;
+    try {
+      final Schema tupleSchema = new Schema();
+      tupleSchema.add(new Schema.FieldSchema("Sketch", DataType.BYTEARRAY));
+      return new Schema(new Schema.FieldSchema(getSchemaName(
+          this.getClass().getName().toLowerCase(), input), tupleSchema, DataType.TUPLE));
+    } catch (FrontendException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  //ACCUMULATOR INTERFACE
+
+  /**
+   * An <i>Accumulator</i> version of the standard <i>exec()</i> method. Like <i>exec()</i>,
+   * accumulate is called with a bag of Sketch Tuples. Unlike <i>exec()</i>, it doesn't serialize the
+   * sketch at the end. Instead, it can be called multiple times, each time with another bag of
+   * Sketch Tuples to be input to the Union.
+   * 
+   * @param inputTuple A tuple containing a single bag, containing Sketch Tuples.
+   * @see #exec
+   * @see "org.apache.pig.Accumulator.accumulate(org.apache.pig.data.Tuple)"
+   * @throws IOException by Pig
+   */
+  @Override
+  public void accumulate(final Tuple inputTuple) throws IOException {
+    if (inputTuple == null || inputTuple.size() == 0) return;
+    final DataBag bag = (DataBag) inputTuple.get(0);
+    if (bag == null) return;
+    if (accumUnion_ == null) accumUnion_ = unionBuilder_.build();
+    updateUnion(bag, accumUnion_);
+  }
+
+  /**
+   * Returns the result of the Union that has been built up by multiple calls to {@link #accumulate}.
+   * 
+   * @return Sketch Tuple. (see {@link #exec} for return tuple format)
+   * @see "org.apache.pig.Accumulator.getValue()"
+   */
+  @Override
+  public Tuple getValue() {
+    if (accumUnion_ != null) {
+      final QuantilesSketch resultSketch = accumUnion_.getResultAndReset();
+      if (resultSketch != null) return tupleFactory_.newTuple(new DataByteArray(resultSketch.toByteArray()));
+    }
+    // return empty sketch
+    return tupleFactory_.newTuple(new DataByteArray(unionBuilder_.build().getResult().toByteArray()));
+  }
+
+  /**
+   * Cleans up the UDF state after being called using the {@link Accumulator} interface.
+   * 
+   * @see "org.apache.pig.Accumulator.cleanup()"
+   */
+  @Override
+  public void cleanup() {
+    accumUnion_ = null;
+  }
+
+  //ALGEBRAIC INTERFACE
+
+  @Override
+  public String getInitial() {
+    return Initial.class.getName();
+  }
+
+  @Override
+  public String getIntermed() {
+    return IntermediateFinal.class.getName();
+  }
+
+  @Override
+  public String getFinal() {
+    return IntermediateFinal.class.getName();
+  }
+  
+  //TOP LEVEL PRIVATE STATIC METHODS  
+  
+  /**
+   * Updates a union from a bag of sketches
+   * 
+   * @param bag A bag of sketchTuples.
+   * @param union The union to update
+   */
+  private static void updateUnion(DataBag bag, Union union) throws ExecException {
+    for (Tuple innerTuple: bag) {
+      final Object f0 = innerTuple.get(0);
+      if (f0 == null) continue;
+      if (f0 instanceof DataByteArray) {
+        final DataByteArray dba = (DataByteArray) f0;
+        if (dba.size() > 0) {
+          union.update(new NativeMemory(dba.get()));
+        }
+      } else {
+        throw new IllegalArgumentException("Field type was not DataType.BYTEARRAY: " + innerTuple.getType(0));
+      }
+    }
+  }
+
+  //STATIC Initial Class only called by Pig
+
+  /**
+   * Class used to calculate the initial pass of an Algebraic sketch operation. 
+   * 
+   * <p>
+   * The Initial class simply passes through all records unchanged so that they can be
+   * processed by the intermediate processor instead.</p>
+   */
+  public static class Initial extends EvalFunc<Tuple> {
+    // The Algebraic worker classes (Initial, IntermediateFinal) are static and stateless. 
+    // The constructors and final parameters must mirror the parent class as there is no linkage
+    // between them.
+    /**
+     * Default constructor.
+     */
+    public Initial() {}
+
+    /**
+     * Constructor for the initial pass of an Algebraic function. Pig will call this and pass the 
+     * same constructor arguments as the base UDF. In this case the arguments are ignored.
+     * 
+     * @param kStr string representation of k
+     */
+    public Initial(final String kStr) {}
+
+    @Override // Initial exec
+    public Tuple exec(final Tuple inputTuple) throws IOException {
+      return inputTuple;
+    }
+  }
+
+  // STATIC IntermediateFinal Class only called by Pig
+
+  /**
+   * Class used to calculate the intermediate or final combiner pass of an <i>Algebraic</i> union 
+   * operation. This is called from the combiner, and may be called multiple times (from the mapper 
+   * and from the reducer). It will receive a bag of values returned by either the <i>Intermediate</i> 
+   * stage or the <i>Initial</i> stages, so it needs to be able to differentiate between and 
+   * interpret both types.
+   */
+  public static class IntermediateFinal extends EvalFunc<Tuple> {
+    // The Algebraic worker classes (Initial, IntermediateFinal) are static and stateless. 
+    // The constructors and final parameters must mirror the parent class as there is no linkage
+    // between them.
+    private final UnionBuilder unionBuilder_;
+
+    /**
+     * Default constructor. Assumes default k.
+     */
+    public IntermediateFinal() {
+      this(0);
+    }
+
+    /**
+     * Constructor for the intermediate and final passes of an Algebraic function. Pig will call 
+     * this and pass the same constructor arguments as the base UDF.
+     * 
+     * @param kStr string representation of k
+     */
+    public IntermediateFinal(final String kStr) {
+      this(Integer.parseInt(kStr));
+    }
+
+    /**
+     * Constructor with primitives for the intermediate and final passes of an Algebraic function.
+     * 
+     * @param k parameter that determines the accuracy and size of the sketch.
+     */
+    public IntermediateFinal(final int k) {
+      unionBuilder_ = Union.builder();
+      if (k > 0) unionBuilder_.setK(k);
+    }
+
+    @Override // IntermediateFinal exec
+    public Tuple exec(final Tuple inputTuple) throws IOException {
+      if (inputTuple != null && inputTuple.size() > 0) {
+        final Union union = unionBuilder_.build();
+        final DataBag outerBag = (DataBag) inputTuple.get(0);
+        for (final Tuple dataTuple: outerBag) {
+          final Object f0 = dataTuple.get(0);
+          if (f0 == null) continue;
+          if (f0 instanceof DataBag) {
+            final DataBag innerBag = (DataBag) f0; //inputTuple.bag0.dataTupleN.f0:bag
+            if (innerBag.size() == 0) continue;
+            // If field 0 of a dataTuple is again a Bag all tuples of this inner bag
+            // will be passed into the union.
+            // It is due to system bagged outputs from multiple mapper Initial functions.  
+            // The Intermediate stage was bypassed.
+            updateUnion(innerBag, union);
+          } else if (f0 instanceof DataByteArray) { //inputTuple.bag0.dataTupleN.f0:DBA
+            // If field 0 of a dataTuple is a DataByteArray we assume it is a sketch from a prior call
+            // It is due to system bagged outputs from multiple mapper Intermediate functions.
+            // Each dataTuple.DBA:sketch will be merged into the union.
+            final DataByteArray dba = (DataByteArray) f0;
+            union.update(new NativeMemory(dba.get()));
+          } else {
+            throw new IllegalArgumentException("dataTuple.Field0: Is not a DataByteArray: "
+              + f0.getClass().getName());
+          }
+        }
+        final QuantilesSketch resultSketch = union.getResultAndReset();
+        if (resultSketch != null) return tupleFactory_.newTuple(new DataByteArray(resultSketch.toByteArray()));
+      }
+      // return empty sketch
+      return tupleFactory_.newTuple(new DataByteArray(unionBuilder_.build().getResult().toByteArray()));
+    }
+  } // end IntermediateFinal
+  
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/quantiles/Util.java b/src/main/java/com/yahoo/sketches/pig/quantiles/Util.java
new file mode 100644
index 0000000..9922a5e
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/quantiles/Util.java
@@ -0,0 +1,17 @@
+package com.yahoo.sketches.pig.quantiles;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+public class Util {
+  /** Boxes each element of {@code array} into the same-index field of a new Tuple of equal size. */
+  static Tuple doubleArrayToTuple(double[] array) throws ExecException {
+    final Tuple result = TupleFactory.getInstance().newTuple(array.length);
+    int field = 0;
+    for (final double value : array) {
+      result.set(field++, value);
+    }
+    return result;
+  }
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/quantiles/DataToSketchTest.java b/src/test/java/com/yahoo/sketches/pig/quantiles/DataToSketchTest.java
new file mode 100644
index 0000000..0e0423c
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/quantiles/DataToSketchTest.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2016, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+package com.yahoo.sketches.pig.quantiles;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import com.yahoo.sketches.memory.NativeMemory;
+import com.yahoo.sketches.quantiles.QuantilesSketch;
+
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+/** Tests DataToSketch via exec, the Accumulator calls and the Initial/IntermediateFinal classes. */
+public class DataToSketchTest {
+  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+  private static final BagFactory bagFactory = BagFactory.getInstance();
+
+  /** exec(null) must still return a serialized sketch, and that sketch must be empty. */
+  @Test
+  public void execNullInputTuple() throws Exception {
+    final EvalFunc<Tuple> udf = new DataToSketch();
+    final QuantilesSketch sketch = getSketch(udf.exec(null));
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  /** The tuple from the no-argument newTuple() must produce an empty sketch. */
+  @Test
+  public void execEmptyInputTuple() throws Exception {
+    final EvalFunc<Tuple> udf = new DataToSketch();
+    final QuantilesSketch sketch = getSketch(udf.exec(tupleFactory.newTuple()));
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  /** A tuple wrapping an empty bag must produce an empty sketch. */
+  @Test
+  public void execEmptyBag() throws Exception {
+    final EvalFunc<Tuple> udf = new DataToSketch();
+    final QuantilesSketch sketch = getSketch(udf.exec(tupleFactory.newTuple(bagFactory.newDefaultBag())));
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  /** A bag holding the single value 1.0 must produce a non-empty sketch with N == 1. */
+  @Test
+  public void execNormalCase() throws Exception {
+    final EvalFunc<Tuple> udf = new DataToSketch();
+    final DataBag values = bagFactory.newDefaultBag();
+    values.add(tupleFactory.newTuple(1.0));
+    final QuantilesSketch sketch = getSketch(udf.exec(tupleFactory.newTuple(values)));
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 1);
+  }
+
+  /**
+   * Walks one Accumulator instance through: no input, null tuple, empty tuple, empty bag,
+   * two accumulations of a one-value bag, and cleanup. getValue() must report an empty sketch
+   * at every stage except after the two real accumulations, where N must be 2.
+   */
+  @Test
+  public void accumulator() throws Exception {
+    final Accumulator<Tuple> udf = new DataToSketch();
+
+    // before anything has been accumulated
+    Assert.assertTrue(getSketch(udf.getValue()).isEmpty());
+
+    // a null tuple
+    udf.accumulate(null);
+    Assert.assertTrue(getSketch(udf.getValue()).isEmpty());
+
+    // an empty tuple
+    udf.accumulate(tupleFactory.newTuple());
+    Assert.assertTrue(getSketch(udf.getValue()).isEmpty());
+
+    // a tuple wrapping an empty bag
+    udf.accumulate(tupleFactory.newTuple(bagFactory.newDefaultBag()));
+    Assert.assertTrue(getSketch(udf.getValue()).isEmpty());
+
+    // the same one-value bag accumulated twice
+    final DataBag values = bagFactory.newDefaultBag();
+    values.add(tupleFactory.newTuple(1.0));
+    udf.accumulate(tupleFactory.newTuple(values));
+    udf.accumulate(tupleFactory.newTuple(values));
+    final QuantilesSketch sketch = getSketch(udf.getValue());
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 2);
+
+    // after cleanup() the value must be an empty sketch again
+    udf.cleanup();
+    Assert.assertTrue(getSketch(udf.getValue()).isEmpty());
+  }
+
+  /** Given a bag of one empty tuple, Initial.exec must return one field: a bag of size 1. */
+  @Test
+  public void algebraicInitial() throws Exception {
+    final EvalFunc<Tuple> initial = new DataToSketch.Initial();
+    final DataBag input = bagFactory.newDefaultBag();
+    input.add(tupleFactory.newTuple());
+    final Tuple output = initial.exec(tupleFactory.newTuple(input));
+    Assert.assertNotNull(output);
+    Assert.assertEquals(output.size(), 1);
+    Assert.assertTrue(output.get(0) instanceof DataBag);
+    Assert.assertEquals(((DataBag) output.get(0)).size(), 1);
+  }
+
+  /** IntermediateFinal.exec(null) must still return a serialized sketch, and it must be empty. */
+  @Test
+  public void algebraicIntermediateFinalNullInputTuple() throws Exception {
+    final EvalFunc<Tuple> udf = new DataToSketch.IntermediateFinal();
+    final QuantilesSketch sketch = getSketch(udf.exec(null));
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  /** IntermediateFinal given the no-argument newTuple() must produce an empty sketch. */
+  @Test
+  public void algebraicIntermediateFinalEmptyInputTuple() throws Exception {
+    final EvalFunc<Tuple> udf = new DataToSketch.IntermediateFinal();
+    final QuantilesSketch sketch = getSketch(udf.exec(tupleFactory.newTuple()));
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  /**
+   * IntermediateFinal must combine both input shapes: a nested bag of raw values (simulating
+   * Initial output) and a serialized sketch (simulating an earlier IntermediateFinal output).
+   */
+  @Test
+  public void algebraicIntermediateFinalNormalCase() throws Exception {
+    final EvalFunc<Tuple> udf = new DataToSketch.IntermediateFinal();
+    final DataBag outerBag = bagFactory.newDefaultBag();
+
+    // first input simulates an output from Initial: a bag nested inside the tuple
+    final DataBag fromInitial = bagFactory.newDefaultBag();
+    fromInitial.add(tupleFactory.newTuple(1.0));
+    outerBag.add(tupleFactory.newTuple(fromInitial));
+
+    // second input simulates an output from a prior call of IntermediateFinal: sketch bytes
+    final QuantilesSketch priorSketch = QuantilesSketch.builder().build();
+    priorSketch.update(2.0);
+    outerBag.add(tupleFactory.newTuple(new DataByteArray(priorSketch.toByteArray())));
+
+    final QuantilesSketch sketch = getSketch(udf.exec(tupleFactory.newTuple(outerBag)));
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 2);
+  }
+
+  // end of tests
+
+  /**
+   * Asserts the tuple holds exactly one non-empty DataByteArray field and heapifies its bytes.
+   */
+  private static QuantilesSketch getSketch(Tuple tuple) throws Exception {
+    Assert.assertNotNull(tuple);
+    Assert.assertEquals(tuple.size(), 1);
+    final DataByteArray serialized = (DataByteArray) tuple.get(0);
+    Assert.assertTrue(serialized.size() > 0);
+    return QuantilesSketch.heapify(new NativeMemory(serialized.get()));
+  }
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/quantiles/GetPMFTest.java b/src/test/java/com/yahoo/sketches/pig/quantiles/GetPMFTest.java
new file mode 100644
index 0000000..85f8618
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/quantiles/GetPMFTest.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2016, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+package com.yahoo.sketches.pig.quantiles;
+
+import com.yahoo.sketches.quantiles.QuantilesSketch;
+
+import java.util.Arrays;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+public class GetPMFTest {
+  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+
+  /** An empty sketch plus the single argument 0.5 must yield a 2-field tuple holding NaN twice. */
+  @Test
+  public void emptySketch() throws Exception {
+    final DataByteArray emptySketch = new DataByteArray(QuantilesSketch.builder().build().toByteArray());
+    final Tuple pmf = new GetPMF().exec(tupleFactory.newTuple(Arrays.asList(emptySketch, 0.5)));
+    Assert.assertNotNull(pmf);
+    Assert.assertEquals(pmf.size(), 2);
+    Assert.assertEquals(((double) pmf.get(0)), Double.NaN);
+    Assert.assertEquals(((double) pmf.get(1)), Double.NaN);
+  }
+
+  /** Values 1..10 queried with 2.0 and 7.0 must yield the 3-field tuple (0.1, 0.5, 0.4). */
+  @Test
+  public void normalCase() throws Exception {
+    final EvalFunc<Tuple> udf = new GetPMF();
+    final QuantilesSketch sketch = QuantilesSketch.builder().build();
+    for (int value = 1; value <= 10; value++) sketch.update(value);
+    final Tuple pmf = udf.exec(tupleFactory.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 2.0, 7.0)));
+    Assert.assertNotNull(pmf);
+    final double[] expected = {0.1, 0.5, 0.4};
+    Assert.assertEquals(pmf.size(), expected.length);
+    for (int i = 0; i < expected.length; i++) Assert.assertEquals(((double) pmf.get(i)), expected[i]);
+  }
+
+  /** The tuple returned by {@code newTuple(1)} must be rejected with IllegalArgumentException. */
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongNumberOfInputs() throws Exception {
+    new GetPMF().exec(tupleFactory.newTuple(1));
+  }
+
+  /** A Double in field 0, where the other tests pass a DataByteArray, must be rejected. */
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForSketch() throws Exception {
+    new GetPMF().exec(tupleFactory.newTuple(Arrays.asList(1.0, 1.0)));
+  }
+
+  /** An Integer after the sketch, where the other tests pass Doubles, must be rejected. */
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForFraction() throws Exception {
+    final DataByteArray emptySketch = new DataByteArray(QuantilesSketch.builder().build().toByteArray());
+    new GetPMF().exec(tupleFactory.newTuple(Arrays.asList(emptySketch, 1)));
+  }
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/quantiles/GetQuantileTest.java b/src/test/java/com/yahoo/sketches/pig/quantiles/GetQuantileTest.java
new file mode 100644
index 0000000..e5ee1a1
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/quantiles/GetQuantileTest.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2016, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+package com.yahoo.sketches.pig.quantiles;
+
+import com.yahoo.sketches.quantiles.QuantilesSketch;
+
+import java.util.Arrays;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.TupleFactory;
+
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+public class GetQuantileTest {
+  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+
+  /** An empty sketch queried with the argument 0.0 must return Double.POSITIVE_INFINITY. */
+  @Test
+  public void emptySketch() throws Exception {
+    final DataByteArray emptySketch = new DataByteArray(QuantilesSketch.builder().build().toByteArray());
+    final Double quantile = new GetQuantile().exec(tupleFactory.newTuple(Arrays.asList(emptySketch, 0.0)));
+    Assert.assertEquals(quantile, Double.POSITIVE_INFINITY);
+  }
+
+  /** A sketch updated only with 1.0, queried with the argument 0.5, must return 1.0. */
+  @Test
+  public void normalCase() throws Exception {
+    final EvalFunc<Double> udf = new GetQuantile();
+    final QuantilesSketch sketch = QuantilesSketch.builder().build();
+    sketch.update(1.0);
+    Assert.assertEquals(udf.exec(tupleFactory.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 0.5))), 1.0);
+  }
+
+  /** The tuple returned by {@code newTuple(1)} must be rejected with IllegalArgumentException. */
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongNumberOfInputs() throws Exception {
+    new GetQuantile().exec(tupleFactory.newTuple(1));
+  }
+
+  /** A Double in field 0, where the other tests pass a DataByteArray, must be rejected. */
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForSketch() throws Exception {
+    new GetQuantile().exec(tupleFactory.newTuple(Arrays.asList(1.0, 1.0)));
+  }
+
+  /** An Integer after the sketch, where the other tests pass a Double, must be rejected. */
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForFraction() throws Exception {
+    final DataByteArray emptySketch = new DataByteArray(QuantilesSketch.builder().build().toByteArray());
+    new GetQuantile().exec(tupleFactory.newTuple(Arrays.asList(emptySketch, 1)));
+  }
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/quantiles/GetQuantilesTest.java b/src/test/java/com/yahoo/sketches/pig/quantiles/GetQuantilesTest.java
new file mode 100644
index 0000000..cba2c42
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/quantiles/GetQuantilesTest.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2016, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+package com.yahoo.sketches.pig.quantiles;
+
+import com.yahoo.sketches.quantiles.QuantilesSketch;
+
+import java.util.Arrays;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+public class GetQuantilesTest {
+  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+
+  /** An empty sketch plus the single argument 0.5 must yield a 1-field tuple holding NaN. */
+  @Test
+  public void emptySketch() throws Exception {
+    final DataByteArray emptySketch = new DataByteArray(QuantilesSketch.builder().build().toByteArray());
+    final Tuple quantiles = new GetQuantiles().exec(tupleFactory.newTuple(Arrays.asList(emptySketch, 0.5)));
+    Assert.assertNotNull(quantiles);
+    Assert.assertEquals(quantiles.size(), 1);
+    Assert.assertEquals(((double) quantiles.get(0)), Double.NaN);
+  }
+
+  /** Values 1..10 queried with 0.0, 0.5 and 1.0 must yield the 3-field tuple (1.0, 6.0, 10.0). */
+  @Test
+  public void normalCase() throws Exception {
+    final EvalFunc<Tuple> udf = new GetQuantiles();
+    final QuantilesSketch sketch = QuantilesSketch.builder().build();
+    for (int value = 1; value <= 10; value++) sketch.update(value);
+    final Tuple quantiles = udf.exec(tupleFactory.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 0.0, 0.5, 1.0)));
+    Assert.assertNotNull(quantiles);
+    final double[] expected = {1.0, 6.0, 10.0};
+    Assert.assertEquals(quantiles.size(), expected.length);
+    for (int i = 0; i < expected.length; i++) Assert.assertEquals(((double) quantiles.get(i)), expected[i]);
+  }
+
+  /** The tuple returned by {@code newTuple(1)} must be rejected with IllegalArgumentException. */
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongNumberOfInputs() throws Exception {
+    new GetQuantiles().exec(tupleFactory.newTuple(1));
+  }
+
+  /** A Double in field 0, where the other tests pass a DataByteArray, must be rejected. */
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForSketch() throws Exception {
+    new GetQuantiles().exec(tupleFactory.newTuple(Arrays.asList(1.0, 1.0)));
+  }
+
+  /** An Integer after the sketch, where the other tests pass Doubles, must be rejected. */
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForFraction() throws Exception {
+    final DataByteArray emptySketch = new DataByteArray(QuantilesSketch.builder().build().toByteArray());
+    new GetQuantiles().exec(tupleFactory.newTuple(Arrays.asList(emptySketch, 1)));
+  }
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/quantiles/MergeTest.java b/src/test/java/com/yahoo/sketches/pig/quantiles/MergeTest.java
new file mode 100644
index 0000000..98fc699
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/quantiles/MergeTest.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2016, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+package com.yahoo.sketches.pig.quantiles;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import com.yahoo.sketches.memory.NativeMemory;
+import com.yahoo.sketches.quantiles.QuantilesSketch;
+
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+/** Tests Merge via exec, the Accumulator calls and the Initial/IntermediateFinal classes. */
+public class MergeTest {
+  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+  private static final BagFactory bagFactory = BagFactory.getInstance();
+
+  /** exec(null) must still return a serialized sketch, and that sketch must be empty. */
+  @Test
+  public void execNullInputTuple() throws Exception {
+    final EvalFunc<Tuple> udf = new Merge();
+    final QuantilesSketch merged = getSketch(udf.exec(null));
+    Assert.assertTrue(merged.isEmpty());
+  }
+
+  /** The tuple from the no-argument newTuple() must produce an empty sketch. */
+  @Test
+  public void execEmptyInputTuple() throws Exception {
+    final EvalFunc<Tuple> udf = new Merge();
+    final QuantilesSketch merged = getSketch(udf.exec(tupleFactory.newTuple()));
+    Assert.assertTrue(merged.isEmpty());
+  }
+
+  /** A tuple wrapping an empty bag must produce an empty sketch. */
+  @Test
+  public void execEmptyBag() throws Exception {
+    final EvalFunc<Tuple> udf = new Merge();
+    final QuantilesSketch merged = getSketch(udf.exec(tupleFactory.newTuple(bagFactory.newDefaultBag())));
+    Assert.assertTrue(merged.isEmpty());
+  }
+
+  /** A bag holding one serialized sketch of the single value 1.0 must merge to N == 1. */
+  @Test
+  public void execNormalCase() throws Exception {
+    final EvalFunc<Tuple> udf = new Merge();
+    final DataBag sketches = bagFactory.newDefaultBag();
+    final QuantilesSketch inputSketch = QuantilesSketch.builder().build();
+    inputSketch.update(1.0);
+    sketches.add(tupleFactory.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    final QuantilesSketch merged = getSketch(udf.exec(tupleFactory.newTuple(sketches)));
+    Assert.assertFalse(merged.isEmpty());
+    Assert.assertEquals(merged.getN(), 1);
+  }
+
+  /**
+   * Walks one Accumulator instance through: no input, null tuple, empty tuple, empty bag,
+   * two accumulations of a bag holding one single-value sketch, and cleanup. getValue() must
+   * report an empty sketch at every stage except after the two real accumulations (N == 2).
+   */
+  @Test
+  public void accumulator() throws Exception {
+    final Accumulator<Tuple> udf = new Merge();
+
+    // before anything has been accumulated
+    Assert.assertTrue(getSketch(udf.getValue()).isEmpty());
+
+    // a null tuple
+    udf.accumulate(null);
+    Assert.assertTrue(getSketch(udf.getValue()).isEmpty());
+
+    // an empty tuple
+    udf.accumulate(tupleFactory.newTuple());
+    Assert.assertTrue(getSketch(udf.getValue()).isEmpty());
+
+    // a tuple wrapping an empty bag
+    udf.accumulate(tupleFactory.newTuple(bagFactory.newDefaultBag()));
+    Assert.assertTrue(getSketch(udf.getValue()).isEmpty());
+
+    // the same one-sketch bag accumulated twice
+    final DataBag sketches = bagFactory.newDefaultBag();
+    final QuantilesSketch inputSketch = QuantilesSketch.builder().build();
+    inputSketch.update(1.0);
+    sketches.add(tupleFactory.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    udf.accumulate(tupleFactory.newTuple(sketches));
+    udf.accumulate(tupleFactory.newTuple(sketches));
+    final QuantilesSketch merged = getSketch(udf.getValue());
+    Assert.assertFalse(merged.isEmpty());
+    Assert.assertEquals(merged.getN(), 2);
+
+    // after cleanup() the value must be an empty sketch again
+    udf.cleanup();
+    Assert.assertTrue(getSketch(udf.getValue()).isEmpty());
+  }
+
+  /** Given a bag of one empty tuple, Initial.exec must return one field: a bag of size 1. */
+  @Test
+  public void algebraicInitial() throws Exception {
+    final EvalFunc<Tuple> initial = new Merge.Initial();
+    final DataBag input = bagFactory.newDefaultBag();
+    input.add(tupleFactory.newTuple());
+    final Tuple output = initial.exec(tupleFactory.newTuple(input));
+    Assert.assertNotNull(output);
+    Assert.assertEquals(output.size(), 1);
+    Assert.assertTrue(output.get(0) instanceof DataBag);
+    Assert.assertEquals(((DataBag) output.get(0)).size(), 1);
+  }
+
+  /** IntermediateFinal.exec(null) must still return a serialized sketch, and it must be empty. */
+  @Test
+  public void algebraicIntermediateFinalNullInputTuple() throws Exception {
+    final EvalFunc<Tuple> udf = new Merge.IntermediateFinal();
+    final QuantilesSketch merged = getSketch(udf.exec(null));
+    Assert.assertTrue(merged.isEmpty());
+  }
+
+  /** IntermediateFinal given the no-argument newTuple() must produce an empty sketch. */
+  @Test
+  public void algebraicIntermediateFinalEmptyInputTuple() throws Exception {
+    final EvalFunc<Tuple> udf = new Merge.IntermediateFinal();
+    final QuantilesSketch merged = getSketch(udf.exec(tupleFactory.newTuple()));
+    Assert.assertTrue(merged.isEmpty());
+  }
+
+  /**
+   * IntermediateFinal must combine both input shapes: a nested bag of serialized sketches
+   * (simulating Initial output) and a bare serialized sketch (simulating an earlier output).
+   */
+  @Test
+  public void algebraicIntermediateFinalNormalCase() throws Exception {
+    final EvalFunc<Tuple> udf = new Merge.IntermediateFinal();
+    final DataBag outerBag = bagFactory.newDefaultBag();
+
+    // first input simulates an output from Initial: a sketch wrapped in a nested bag
+    final DataBag fromInitial = bagFactory.newDefaultBag();
+    final QuantilesSketch nestedSketch = QuantilesSketch.builder().build();
+    nestedSketch.update(1.0);
+    fromInitial.add(tupleFactory.newTuple(new DataByteArray(nestedSketch.toByteArray())));
+    outerBag.add(tupleFactory.newTuple(fromInitial));
+
+    // second input simulates an output from a prior call of IntermediateFinal: a bare sketch
+    final QuantilesSketch priorSketch = QuantilesSketch.builder().build();
+    priorSketch.update(2.0);
+    outerBag.add(tupleFactory.newTuple(new DataByteArray(priorSketch.toByteArray())));
+
+    final QuantilesSketch merged = getSketch(udf.exec(tupleFactory.newTuple(outerBag)));
+    Assert.assertFalse(merged.isEmpty());
+    Assert.assertEquals(merged.getN(), 2);
+  }
+
+  // end of tests
+
+  /**
+   * Asserts the tuple holds exactly one non-empty DataByteArray field and heapifies its bytes.
+   */
+  private static QuantilesSketch getSketch(Tuple tuple) throws Exception {
+    Assert.assertNotNull(tuple);
+    Assert.assertEquals(tuple.size(), 1);
+    final DataByteArray serialized = (DataByteArray) tuple.get(0);
+    Assert.assertTrue(serialized.size() > 0);
+    return QuantilesSketch.heapify(new NativeMemory(serialized.get()));
+  }
+}