Merge pull request #14 from DataSketches/quantiles
Quantiles Sketch UDFs
diff --git a/pom.xml b/pom.xml
index b4bf1a0..0239ee5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,7 +55,7 @@
<dependency>
<groupId>com.yahoo.datasketches</groupId>
<artifactId>sketches-core</artifactId>
- <version>0.5.1</version>
+ <version>0.5.2</version>
</dependency>
<!-- hadoop -->
diff --git a/src/main/java/com/yahoo/sketches/pig/quantiles/DataToSketch.java b/src/main/java/com/yahoo/sketches/pig/quantiles/DataToSketch.java
new file mode 100644
index 0000000..f8e04b3
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/quantiles/DataToSketch.java
@@ -0,0 +1,327 @@
+/*
+ * Copyright 2016, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+package com.yahoo.sketches.pig.quantiles;
+
+import java.io.IOException;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import com.yahoo.sketches.memory.NativeMemory;
+import com.yahoo.sketches.quantiles.QuantilesSketch;
+import com.yahoo.sketches.quantiles.Union;
+import com.yahoo.sketches.quantiles.UnionBuilder;
+
+/**
+ * This is a Pig UDF that builds Sketches from data.
+ * To assist Pig, this class implements both the <i>Accumulator</i> and <i>Algebraic</i> interfaces.
+ */
+public class DataToSketch extends EvalFunc<Tuple> implements Accumulator<Tuple>, Algebraic {
+
+ private static final TupleFactory tupleFactory_ = TupleFactory.getInstance();
+
+ // With the single exception of the Accumulator interface, UDFs are stateless.
+ // All parameters kept at the class level must be final, except for the accumUnion.
+ private final UnionBuilder unionBuilder_;
+ private Union accumUnion_;
+
+ // TOP LEVEL API
+
+ /**
+ * Default constructor. Delegates to the base constructor with k = 0, which keeps the builder's default k.
+ */
+ public DataToSketch() {
+ this(0);
+ }
+
+ /**
+ * String constructor. Parses kStr with Integer.parseInt and delegates to the base constructor.
+ *
+ * @param kStr string representation of k
+ */
+ public DataToSketch(final String kStr) {
+ this(Integer.parseInt(kStr));
+ }
+
+ /**
+ * Base constructor. Creates the UnionBuilder from which this UDF builds its Unions.
+ *
+ * @param k parameter that determines the accuracy and size of the sketch; applied only when positive.
+ */
+ public DataToSketch(final int k) {
+ super();
+ unionBuilder_ = Union.builder();
+ if (k > 0) unionBuilder_.setK(k); // k <= 0 leaves the builder's own default untouched
+ }
+
+ //@formatter:off
+ /**
+ * Top-level exec function.
+ * This method accepts an input Tuple containing a Bag of one or more inner <b>Datum Tuples</b>
+ * and returns a single updated <b>Sketch</b> as a <b>Sketch Tuple</b>.
+ *
+ * <p>
+ * If a large number of calls are anticipated, leveraging either the <i>Algebraic</i> or
+ * <i>Accumulator</i> interfaces is recommended. Pig normally handles this automatically.
+ *
+ * <p>
+ * Internally, this method presents the inner <b>Datum Tuples</b> to a new <b>Union</b>,
+ * which is returned as a <b>Sketch Tuple</b>
+ *
+ * <p>
+ * Types are in the form: Java data type: Pig DataType
+ *
+ * <p>
+ *
+ * <b>Input Tuple</b>
+ * <ul>
+ * <li>Tuple: TUPLE (Must contain only one field)
+ * <ul>
+ * <li>index 0: DataBag: BAG (May contain 0 or more Inner Tuples)
+ * <ul>
+ * <li>index 0: Tuple: TUPLE <b>Datum Tuple</b></li>
+ * <li>...</li>
+ * <li>index n-1: Tuple: TUPLE <b>Datum Tuple</b></li>
+ * </ul>
+ * </li>
+ * </ul>
+ * </li>
+ * </ul>
+ *
+ * <b>Datum Tuple</b>
+ * <ul>
+ * <li>Tuple: TUPLE (Must contain only one field)
+ * <ul>
+ * <li>index 0: Double: DOUBLE</li>
+ * </ul>
+ * </li>
+ * </ul>
+ *
+ * <b>Sketch Tuple</b>
+ * <ul>
+ * <li>Tuple: TUPLE (Contains exactly 1 field)
+ * <ul>
+ * <li>index 0: DataByteArray: BYTEARRAY = a serialized QuantilesSketch object.</li>
+ * </ul>
+ * </li>
+ * </ul>
+ *
+ * @param inputTuple A tuple containing a single bag, containing Datum Tuples.
+ * @return Sketch Tuple. If inputTuple is null or empty, returns empty sketch.
+ * @see "org.apache.pig.EvalFunc.exec(org.apache.pig.data.Tuple)"
+ * @throws IOException from Pig.
+ */
+ // @formatter:on
+
+  @Override // TOP LEVEL EXEC
+  public Tuple exec(final Tuple inputTuple) throws IOException {
+    // exec is stateless: a new Union per call. A null bag is handled the way accumulate() handles it: no data.
+    final DataBag bag = (inputTuple == null || inputTuple.size() == 0) ? null : (DataBag) inputTuple.get(0);
+    if (bag != null) {
+      final Union union = unionBuilder_.build();
+      for (final Tuple innerTuple: bag) union.update((Double) innerTuple.get(0));
+      final QuantilesSketch resultSketch = union.getResultAndReset();
+      if (resultSketch != null) return tupleFactory_.newTuple(new DataByteArray(resultSketch.toByteArray()));
+    }
+    // null/empty input tuple, null bag, or no union result: return an empty sketch
+    return tupleFactory_.newTuple(new DataByteArray(unionBuilder_.build().getResult().toByteArray()));
+  }
+
+  @Override
+  public Schema outputSchema(final Schema input) {
+    if (input == null) { return null; } // without an input schema no output schema is produced
+    try {
+      final Schema sketchFields = new Schema();
+      sketchFields.add(new Schema.FieldSchema("Sketch", DataType.BYTEARRAY));
+      final String alias = getSchemaName(this.getClass().getName().toLowerCase(), input);
+      return new Schema(new Schema.FieldSchema(alias, sketchFields, DataType.TUPLE));
+    } catch (FrontendException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+ // ACCUMULATOR INTERFACE
+
+ /**
+ * An <i>Accumulator</i> version of the standard <i>exec()</i> method. Like <i>exec()</i>,
+ * accumulator is called with a bag of Datum Tuples. Unlike <i>exec()</i>, it doesn't serialize the
+ * sketch at the end. Instead, it can be called multiple times, each time with another bag of
+ * Datum Tuples to be input to the Union.
+ *
+ * @param inputTuple A tuple containing a single bag, containing Datum Tuples.
+ * @see #exec
+ * @see "org.apache.pig.Accumulator.accumulate(org.apache.pig.data.Tuple)"
+ * @throws IOException by Pig
+ */
+  @Override
+  public void accumulate(final Tuple inputTuple) throws IOException {
+    final boolean hasField = inputTuple != null && inputTuple.size() != 0;
+    final DataBag data = hasField ? (DataBag) inputTuple.get(0) : null;
+    if (data == null) { return; } // nothing to add; the accumulating union is left as it is
+    if (accumUnion_ == null) { accumUnion_ = unionBuilder_.build(); } // created lazily on first data
+    for (final Tuple datum : data) { accumUnion_.update((Double) datum.get(0)); }
+  }
+
+ /**
+ * Returns the result of the Union that has been built up by multiple calls to {@link #accumulate}.
+ *
+ * @return Sketch Tuple. (see {@link #exec} for return tuple format)
+ * @see "org.apache.pig.Accumulator.getValue()"
+ */
+  @Override
+  public Tuple getValue() {
+    // Use the accumulated result when there is one; otherwise fall back to a freshly built, empty sketch.
+    QuantilesSketch sketch = (accumUnion_ == null) ? null : accumUnion_.getResultAndReset();
+    if (sketch == null) {
+      sketch = unionBuilder_.build().getResult();
+    }
+    return tupleFactory_.newTuple(new DataByteArray(sketch.toByteArray()));
+  }
+
+ /**
+ * Cleans up the UDF state after being called using the {@link Accumulator} interface.
+ *
+ * @see "org.apache.pig.Accumulator.cleanup()"
+ */
+ @Override
+ public void cleanup() {
+ accumUnion_ = null; // drop the accumulated union; accumulate() builds a new one when it next sees a bag
+ }
+
+ // ALGEBRAIC INTERFACE
+
+ @Override
+ public String getInitial() {
+ return Initial.class.getName(); // Initial.exec hands its input tuple back unchanged
+ }
+
+ @Override
+ public String getIntermed() {
+ return IntermediateFinal.class.getName(); // same worker class as the final stage
+ }
+
+ @Override
+ public String getFinal() {
+ return IntermediateFinal.class.getName(); // same worker class as the intermediate stage
+ }
+
+ // STATIC Initial Class only called by Pig
+
+ /**
+ * Class used to calculate the initial pass of an Algebraic sketch operation.
+ *
+ * <p>
+ * The Initial class simply passes through all records unchanged so that they can be
+ * processed by the intermediate processor instead.</p>
+ */
+ public static class Initial extends EvalFunc<Tuple> {
+ // The Algebraic worker classes (Initial, IntermediateFinal) are static and stateless.
+ // Their constructors must mirror those of the parent class because there is no linkage
+ // between them.
+ /**
+ * Default constructor. Does nothing: this class keeps no state.
+ */
+ public Initial() {}
+
+ /**
+ * Constructor for the initial pass of an Algebraic function. Pig will call this and pass the
+ * same constructor arguments as the base UDF. In this case the argument is ignored.
+ *
+ * @param kStr string representation of k (unused)
+ */
+ public Initial(final String kStr) {}
+
+ @Override // Initial exec
+ public Tuple exec(final Tuple inputTuple) throws IOException {
+ return inputTuple; // pass-through: the same tuple instance is returned, null included
+ }
+ }
+
+ // STATIC IntermediateFinal Class only called by Pig
+
+ /**
+ * Class used to calculate the intermediate or final combiner pass of an <i>Algebraic</i> sketch
+ * operation. This is called from the combiner, and may be called multiple times (from the mapper
+ * and from the reducer). It will receive a bag of values returned by either the <i>Intermediate</i>
+ * stage or the <i>Initial</i> stages, so it needs to be able to differentiate between and
+ * interpret both types.
+ */
+ public static class IntermediateFinal extends EvalFunc<Tuple> {
+ // The Algebraic worker classes (Initial, IntermediateFinal) are static and stateless.
+ // The constructors and final parameters must mirror the parent class as there is no linkage
+ // between them.
+ private final UnionBuilder unionBuilder_;
+
+ /**
+ * Default constructor. Delegates with k = 0, which keeps the builder's default k.
+ */
+ public IntermediateFinal() {
+ this(0);
+ }
+
+ /**
+ * Constructor for the intermediate and final passes of an Algebraic function. Pig will call
+ * this and pass the same constructor arguments as the base UDF. kStr is parsed with Integer.parseInt.
+ *
+ * @param kStr string representation of k
+ */
+ public IntermediateFinal(final String kStr) {
+ this(Integer.parseInt(kStr));
+ }
+
+ /**
+ * Constructor with primitives for the intermediate and final passes of an Algebraic function.
+ *
+ * @param k parameter that determines the accuracy and size of the sketch; applied only when positive.
+ */
+ public IntermediateFinal(final int k) {
+ unionBuilder_ = Union.builder();
+ if (k > 0) unionBuilder_.setK(k); // k <= 0 leaves the builder's own default untouched
+ }
+
+    @Override // IntermediateFinal exec
+    public Tuple exec(final Tuple inputTuple) throws IOException { // IOException is part of the Pig API
+      QuantilesSketch merged = null;
+      if (inputTuple != null && inputTuple.size() > 0) {
+        final Union union = unionBuilder_.build();
+        final DataBag outerBag = (DataBag) inputTuple.get(0);
+        for (final Tuple dataTuple : outerBag) {
+          final Object field = dataTuple.get(0); // a null field matches no branch below and is skipped
+          if (field instanceof DataBag) {
+            // Field 0 is itself a bag: this is the system-bagged output of mapper Initial
+            // functions (the Intermediate stage was bypassed), so every Datum Tuple of the
+            // inner bag is passed into the union.
+            final DataBag datumBag = (DataBag) field;
+            if (datumBag.size() == 0) { continue; }
+            for (final Tuple datum : datumBag) { union.update((Double) datum.get(0)); }
+          } else if (field instanceof DataByteArray) {
+            // Field 0 is a DataByteArray: it is assumed to be a serialized sketch produced by a
+            // mapper Intermediate function, and it is merged into the union.
+            final DataByteArray serialized = (DataByteArray) field;
+            union.update(new NativeMemory(serialized.get()));
+          } else if (field != null) {
+            throw new IllegalArgumentException("dataTuple.Field0: Is not a DataByteArray: "
+                + field.getClass().getName());
+          }
+        }
+        merged = union.getResultAndReset();
+      }
+      if (merged == null) {
+        // no usable input, or the union produced no result: return an empty sketch
+        merged = unionBuilder_.build().getResult();
+      }
+      return tupleFactory_.newTuple(new DataByteArray(merged.toByteArray()));
+    }
+ } // end IntermediateFinal
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/quantiles/GetPMF.java b/src/main/java/com/yahoo/sketches/pig/quantiles/GetPMF.java
new file mode 100644
index 0000000..fde4be0
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/quantiles/GetPMF.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2016, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+package com.yahoo.sketches.pig.quantiles;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.sketches.memory.NativeMemory;
+import com.yahoo.sketches.quantiles.QuantilesSketch;
+
+/**
+ * This UDF is to get an approximation to the Probability Mass Function (PMF) of the input stream
+ * given a sketch and a set of split points - an array of <i>m</i> unique, monotonically increasing
+ * double values that divide the real number line into <i>m+1</i> consecutive disjoint intervals.
+ * The function returns an array of m+1 doubles each of which is an approximation to the fraction
+ * of the input stream values that fell into one of those intervals. Intervals are inclusive of
+ * the left split point and exclusive of the right split point.
+ */
+public class GetPMF extends EvalFunc<Tuple> {
+
+  /** Expects field 0 to hold a serialized sketch (DataByteArray) and every later field a Double split point. */
+  @Override
+  public Tuple exec(final Tuple input) throws IOException {
+    if (input == null || input.size() < 2) throw new IllegalArgumentException("expected two or more inputs: sketch and list of split points");
+    // instanceof is false for null, so the "got ..." text must not call getClass() on a null field
+    final Object f0 = input.get(0);
+    if (!(f0 instanceof DataByteArray)) throw new IllegalArgumentException("expected a DataByteArray as a sketch, got " + (f0 == null ? "null" : f0.getClass().getSimpleName()));
+    final QuantilesSketch sketch = QuantilesSketch.heapify(new NativeMemory(((DataByteArray) f0).get()));
+    final double[] splitPoints = new double[input.size() - 1];
+    for (int i = 1; i < input.size(); i++) {
+      final Object f = input.get(i);
+      if (!(f instanceof Double)) throw new IllegalArgumentException("expected a double value as a split point, got " + (f == null ? "null" : f.getClass().getSimpleName()));
+      splitPoints[i - 1] = (Double) f;
+    }
+    return Util.doubleArrayToTuple(sketch.getPMF(splitPoints));
+  }
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/quantiles/GetQuantile.java b/src/main/java/com/yahoo/sketches/pig/quantiles/GetQuantile.java
new file mode 100644
index 0000000..be58b6f
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/quantiles/GetQuantile.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2016, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+package com.yahoo.sketches.pig.quantiles;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.sketches.memory.NativeMemory;
+import com.yahoo.sketches.quantiles.QuantilesSketch;
+
+/**
+ * This UDF is to get a quantile value from a QuantilesSketch. A single value for a
+ * given fraction is returned. The fraction represents a normalized rank and must be
+ * from 0 to 1 inclusive. For example, the fraction of 0.5 corresponds to 50th percentile,
+ * which is the median value of the distribution (the number separating the higher half
+ * of the probability distribution from the lower half).
+ */
+public class GetQuantile extends EvalFunc<Double> {
+
+  /** Expects exactly two fields: a serialized sketch (DataByteArray) followed by a Double fraction. */
+  @Override
+  public Double exec(final Tuple input) throws IOException {
+    if (input == null || input.size() != 2) throw new IllegalArgumentException("expected two inputs: sketch and fraction");
+    // instanceof is false for null, so the "got ..." text must not call getClass() on a null field
+    final Object f0 = input.get(0);
+    if (!(f0 instanceof DataByteArray)) throw new IllegalArgumentException("expected a DataByteArray as a sketch, got " + (f0 == null ? "null" : f0.getClass().getSimpleName()));
+    final QuantilesSketch sketch = QuantilesSketch.heapify(new NativeMemory(((DataByteArray) f0).get()));
+    final Object f1 = input.get(1);
+    if (!(f1 instanceof Double)) throw new IllegalArgumentException("expected a double value as a fraction, got " + (f1 == null ? "null" : f1.getClass().getSimpleName()));
+    return sketch.getQuantile((Double) f1);
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/quantiles/GetQuantiles.java b/src/main/java/com/yahoo/sketches/pig/quantiles/GetQuantiles.java
new file mode 100644
index 0000000..6f3efe9
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/quantiles/GetQuantiles.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2016, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+package com.yahoo.sketches.pig.quantiles;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.sketches.memory.NativeMemory;
+import com.yahoo.sketches.quantiles.QuantilesSketch;
+
+/**
+ * This UDF is to get a list of quantile values from a QuantilesSketch given a list of fractions.
+ * The fractions represent normalized ranks and must be from 0 to 1 inclusive. For example,
+ * the fraction of 0.5 corresponds to 50th percentile, which is the median value of the
+ * distribution (the number separating the higher half of the probability distribution
+ * from the lower half).
+ */
+public class GetQuantiles extends EvalFunc<Tuple> {
+
+  /** Expects field 0 to hold a serialized sketch (DataByteArray) and every later field a Double fraction. */
+  @Override
+  public Tuple exec(final Tuple input) throws IOException {
+    if (input == null || input.size() < 2) throw new IllegalArgumentException("expected two or more inputs: sketch and list of fractions");
+    // instanceof is false for null, so the "got ..." text must not call getClass() on a null field
+    final Object f0 = input.get(0);
+    if (!(f0 instanceof DataByteArray)) throw new IllegalArgumentException("expected a DataByteArray as a sketch, got " + (f0 == null ? "null" : f0.getClass().getSimpleName()));
+    final QuantilesSketch sketch = QuantilesSketch.heapify(new NativeMemory(((DataByteArray) f0).get()));
+    final double[] fractions = new double[input.size() - 1];
+    for (int i = 1; i < input.size(); i++) {
+      final Object f = input.get(i);
+      if (!(f instanceof Double)) throw new IllegalArgumentException("expected a double value as a fraction, got " + (f == null ? "null" : f.getClass().getSimpleName()));
+      fractions[i - 1] = (Double) f;
+    }
+    return Util.doubleArrayToTuple(sketch.getQuantiles(fractions));
+  }
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/quantiles/Merge.java b/src/main/java/com/yahoo/sketches/pig/quantiles/Merge.java
new file mode 100644
index 0000000..fbd18ea
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/quantiles/Merge.java
@@ -0,0 +1,341 @@
+/*
+ * Copyright 2016, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+package com.yahoo.sketches.pig.quantiles;
+
+import java.io.IOException;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import com.yahoo.sketches.memory.NativeMemory;
+import com.yahoo.sketches.quantiles.QuantilesSketch;
+import com.yahoo.sketches.quantiles.Union;
+import com.yahoo.sketches.quantiles.UnionBuilder;
+
+/**
+ * This is a Pig UDF that merges Quantiles Sketches.
+ * To assist Pig, this class implements both the <i>Accumulator</i> and <i>Algebraic</i> interfaces.
+ */
+public class Merge extends EvalFunc<Tuple> implements Accumulator<Tuple>, Algebraic {
+
+ private static final TupleFactory tupleFactory_ = TupleFactory.getInstance();
+
+ // With the single exception of the Accumulator interface, UDFs are stateless.
+ // All parameters kept at the class level must be final, except for the accumUnion.
+ private final UnionBuilder unionBuilder_;
+ private Union accumUnion_;
+
+ //TOP LEVEL API
+
+ /**
+ * Default constructor. Delegates to the base constructor with k = 0, which keeps the builder's default k.
+ */
+ public Merge() {
+ this(0);
+ }
+
+ /**
+ * String constructor. Parses kStr with Integer.parseInt and delegates to the base constructor.
+ *
+ * @param kStr string representation of k
+ */
+ public Merge(final String kStr) {
+ this(Integer.parseInt(kStr));
+ }
+
+ /**
+ * Base constructor. Creates the UnionBuilder from which this UDF builds its Unions.
+ *
+ * @param k parameter that determines the accuracy and size of the sketch; applied only when positive.
+ */
+ public Merge(final int k) {
+ super();
+ unionBuilder_ = Union.builder();
+ if (k > 0) unionBuilder_.setK(k); // k <= 0 leaves the builder's own default untouched
+ }
+
+ //@formatter:off
+ /**
+ * Top-level exec function.
+ * This method accepts an input Tuple containing a Bag of one or more inner <b>Sketch Tuples</b>
+ * and returns a single updated <b>Sketch</b> as a <b>Sketch Tuple</b>.
+ *
+ * <p>
+ * If a large number of calls are anticipated, leveraging either the <i>Algebraic</i> or
+ * <i>Accumulator</i> interfaces is recommended. Pig normally handles this automatically.
+ *
+ * <p>
+ * Internally, this method presents the inner <b>Sketch Tuples</b> to a new <b>Union</b>.
+ * The result is returned as a <b>Sketch Tuple</b>
+ *
+ * <p>
+ * Types are in the form: Java data type: Pig DataType
+ *
+ * <p>
+ *
+ * <b>Input Tuple</b>
+ * <ul>
+ * <li>Tuple: TUPLE (Must contain only one field)
+ * <ul>
+ * <li>index 0: DataBag: BAG (May contain 0 or more Inner Tuples)
+ * <ul>
+ * <li>index 0: Tuple: TUPLE <b>Sketch Tuple</b></li>
+ * <li>...</li>
+ * <li>index n-1: Tuple: TUPLE <b>Sketch Tuple</b></li>
+ * </ul>
+ * </li>
+ * </ul>
+ * </li>
+ * </ul>
+ *
+ * <b>Sketch Tuple</b>
+ * <ul>
+ * <li>Tuple: TUPLE (Contains exactly 1 field)
+ * <ul>
+ * <li>index 0: DataByteArray: BYTEARRAY = The serialization of a Sketch object.</li>
+ * </ul>
+ * </li>
+ * </ul>
+ *
+ * @param inputTuple A tuple containing a single bag, containing Sketch Tuples.
+ * @return Sketch Tuple. If inputTuple is null or empty, returns empty sketch.
+ * @see "org.apache.pig.EvalFunc.exec(org.apache.pig.data.Tuple)"
+ */
+ //@formatter:on
+
+  @Override // TOP LEVEL EXEC
+  public Tuple exec(final Tuple inputTuple) throws IOException {
+    // exec is stateless: a new Union per call. A null bag is handled the way accumulate() handles it: no data.
+    final DataBag bag = (inputTuple == null || inputTuple.size() == 0) ? null : (DataBag) inputTuple.get(0);
+    if (bag != null) {
+      final Union union = unionBuilder_.build();
+      updateUnion(bag, union);
+      final QuantilesSketch resultSketch = union.getResultAndReset();
+      if (resultSketch != null) return tupleFactory_.newTuple(new DataByteArray(resultSketch.toByteArray()));
+    }
+    // null/empty input tuple, null bag, or no union result: return an empty sketch
+    return tupleFactory_.newTuple(new DataByteArray(unionBuilder_.build().getResult().toByteArray()));
+  }
+
+  @Override
+  public Schema outputSchema(final Schema input) {
+    if (input == null) { return null; } // without an input schema no output schema is produced
+    try {
+      final Schema sketchFields = new Schema();
+      sketchFields.add(new Schema.FieldSchema("Sketch", DataType.BYTEARRAY));
+      final String alias = getSchemaName(this.getClass().getName().toLowerCase(), input);
+      return new Schema(new Schema.FieldSchema(alias, sketchFields, DataType.TUPLE));
+    } catch (FrontendException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+ //ACCUMULATOR INTERFACE
+
+ /**
+ * An <i>Accumulator</i> version of the standard <i>exec()</i> method. Like <i>exec()</i>,
+ * accumulator is called with a bag of Sketch Tuples. Unlike <i>exec()</i>, it doesn't serialize the
+ * sketch at the end. Instead, it can be called multiple times, each time with another bag of
+ * Sketch Tuples to be input to the Union.
+ *
+ * @param inputTuple A tuple containing a single bag, containing Sketch Tuples.
+ * @see #exec
+ * @see "org.apache.pig.Accumulator.accumulate(org.apache.pig.data.Tuple)"
+ * @throws IOException by Pig
+ */
+  @Override
+  public void accumulate(final Tuple inputTuple) throws IOException {
+    final boolean hasField = inputTuple != null && inputTuple.size() != 0;
+    final DataBag sketches = hasField ? (DataBag) inputTuple.get(0) : null;
+    if (sketches == null) { return; } // nothing to merge; the accumulating union is left as it is
+    if (accumUnion_ == null) { accumUnion_ = unionBuilder_.build(); } // created lazily on first data
+    updateUnion(sketches, accumUnion_);
+  }
+
+ /**
+ * Returns the result of the Union that has been built up by multiple calls to {@link #accumulate}.
+ *
+ * @return Sketch Tuple. (see {@link #exec} for return tuple format)
+ * @see "org.apache.pig.Accumulator.getValue()"
+ */
+  @Override
+  public Tuple getValue() {
+    // Use the accumulated result when there is one; otherwise fall back to a freshly built, empty sketch.
+    QuantilesSketch sketch = (accumUnion_ == null) ? null : accumUnion_.getResultAndReset();
+    if (sketch == null) {
+      sketch = unionBuilder_.build().getResult();
+    }
+    return tupleFactory_.newTuple(new DataByteArray(sketch.toByteArray()));
+  }
+
+ /**
+ * Cleans up the UDF state after being called using the {@link Accumulator} interface.
+ *
+ * @see "org.apache.pig.Accumulator.cleanup()"
+ */
+ @Override
+ public void cleanup() {
+ accumUnion_ = null; // drop the accumulated union; accumulate() builds a new one when it next sees a bag
+ }
+
+ //ALGEBRAIC INTERFACE
+
+ @Override
+ public String getInitial() {
+ return Initial.class.getName(); // Initial.exec hands its input tuple back unchanged
+ }
+
+ @Override
+ public String getIntermed() {
+ return IntermediateFinal.class.getName(); // same worker class as the final stage
+ }
+
+ @Override
+ public String getFinal() {
+ return IntermediateFinal.class.getName(); // same worker class as the intermediate stage
+ }
+
+ //TOP LEVEL PRIVATE STATIC METHODS
+
+ /**
+ * Updates a union from a bag of sketches
+ *
+ * @param bag A bag of sketchTuples.
+ * @param union The union to update
+ */
+  private static void updateUnion(DataBag bag, Union union) throws ExecException {
+    for (Tuple sketchTuple : bag) {
+      final Object field = sketchTuple.get(0);
+      if (field == null) { continue; } // a null field contributes nothing
+      if (!(field instanceof DataByteArray)) {
+        throw new IllegalArgumentException("Field type was not DataType.BYTEARRAY: " + sketchTuple.getType(0));
+      }
+      final DataByteArray serialized = (DataByteArray) field;
+      // zero-length byte arrays are ignored; anything else is wrapped in NativeMemory and merged
+      if (serialized.size() > 0) {
+        union.update(new NativeMemory(serialized.get()));
+      }
+    }
+  }
+
+ //STATIC Initial Class only called by Pig
+
+ /**
+ * Class used to calculate the initial pass of an Algebraic sketch operation.
+ *
+ * <p>
+ * The Initial class simply passes through all records unchanged so that they can be
+ * processed by the intermediate processor instead.</p>
+ */
+ public static class Initial extends EvalFunc<Tuple> {
+ // The Algebraic worker classes (Initial, IntermediateFinal) are static and stateless.
+ // Their constructors must mirror those of the parent class because there is no linkage
+ // between them.
+ /**
+ * Default constructor. Does nothing: this class keeps no state.
+ */
+ public Initial() {}
+
+ /**
+ * Constructor for the initial pass of an Algebraic function. Pig will call this and pass the
+ * same constructor arguments as the base UDF. In this case the argument is ignored.
+ *
+ * @param kStr string representation of k (unused)
+ */
+ public Initial(final String kStr) {}
+
+ @Override // Initial exec
+ public Tuple exec(final Tuple inputTuple) throws IOException {
+ return inputTuple; // pass-through: the same tuple instance is returned, null included
+ }
+ }
+
+ // STATIC IntermediateFinal Class only called by Pig
+
+ /**
+ * Class used to calculate the intermediate or final combiner pass of an <i>Algebraic</i> union
+ * operation. This is called from the combiner, and may be called multiple times (from the mapper
+ * and from the reducer). It will receive a bag of values returned by either the <i>Intermediate</i>
+ * stage or the <i>Initial</i> stages, so it needs to be able to differentiate between and
+ * interpret both types.
+ */
+ public static class IntermediateFinal extends EvalFunc<Tuple> {
+ // The Algebraic worker classes (Initial, IntermediateFinal) are static and stateless.
+ // The constructors and final parameters must mirror the parent class as there is no linkage
+ // between them.
+ private final UnionBuilder unionBuilder_;
+
+ /**
+ * Default constructor. Delegates with k = 0, which keeps the builder's default k.
+ */
+ public IntermediateFinal() {
+ this(0);
+ }
+
+ /**
+ * Constructor for the intermediate and final passes of an Algebraic function. Pig will call
+ * this and pass the same constructor arguments as the base UDF. kStr is parsed with Integer.parseInt.
+ *
+ * @param kStr string representation of k
+ */
+ public IntermediateFinal(final String kStr) {
+ this(Integer.parseInt(kStr));
+ }
+
+ /**
+ * Constructor with primitives for the intermediate and final passes of an Algebraic function.
+ *
+ * @param k parameter that determines the accuracy and size of the sketch; applied only when positive.
+ */
+ public IntermediateFinal(final int k) {
+ unionBuilder_ = Union.builder();
+ if (k > 0) unionBuilder_.setK(k); // k <= 0 leaves the builder's own default untouched
+ }
+
+    @Override // IntermediateFinal exec
+    public Tuple exec(final Tuple inputTuple) throws IOException {
+      QuantilesSketch merged = null;
+      if (inputTuple != null && inputTuple.size() > 0) {
+        final Union union = unionBuilder_.build();
+        final DataBag outerBag = (DataBag) inputTuple.get(0);
+        for (final Tuple dataTuple : outerBag) {
+          final Object field = dataTuple.get(0); // a null field matches no branch below and is skipped
+          if (field instanceof DataBag) {
+            // Field 0 is itself a bag: this is the system-bagged output of mapper Initial
+            // functions (the Intermediate stage was bypassed), so every Sketch Tuple of the
+            // inner bag is passed into the union.
+            final DataBag sketchBag = (DataBag) field;
+            if (sketchBag.size() == 0) { continue; }
+            updateUnion(sketchBag, union);
+          } else if (field instanceof DataByteArray) {
+            // Field 0 is a DataByteArray: it is assumed to be a sketch from a prior call
+            // (bagged output of mapper Intermediate functions), and it is merged into the union.
+            final DataByteArray serialized = (DataByteArray) field;
+            union.update(new NativeMemory(serialized.get()));
+          } else if (field != null) {
+            throw new IllegalArgumentException("dataTuple.Field0: Is not a DataByteArray: "
+                + field.getClass().getName());
+          }
+        }
+        merged = union.getResultAndReset();
+      }
+      if (merged == null) {
+        // no usable input, or the union produced no result: return an empty sketch
+        merged = unionBuilder_.build().getResult();
+      }
+      return tupleFactory_.newTuple(new DataByteArray(merged.toByteArray()));
+    }
+ } // end IntermediateFinal
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/quantiles/Util.java b/src/main/java/com/yahoo/sketches/pig/quantiles/Util.java
new file mode 100644
index 0000000..9922a5e
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/quantiles/Util.java
@@ -0,0 +1,17 @@
+package com.yahoo.sketches.pig.quantiles;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+public class Util {
+
+  /** Copies each element of the array, in order, into a new Tuple of the same length. */
+  static Tuple doubleArrayToTuple(double[] array) throws ExecException {
+    final int length = array.length;
+    final Tuple result = TupleFactory.getInstance().newTuple(length);
+    int index = 0;
+    for (final double value : array) { result.set(index++, value); }
+    return result;
+  }
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/quantiles/DataToSketchTest.java b/src/test/java/com/yahoo/sketches/pig/quantiles/DataToSketchTest.java
new file mode 100644
index 0000000..0e0423c
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/quantiles/DataToSketchTest.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2016, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+package com.yahoo.sketches.pig.quantiles;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import com.yahoo.sketches.memory.NativeMemory;
+import com.yahoo.sketches.quantiles.QuantilesSketch;
+
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+public class DataToSketchTest { // covers exec(), the Accumulator calls and the Algebraic stages of DataToSketch
+  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+  private static final BagFactory bagFactory = BagFactory.getInstance();
+
+  @Test
+  public void execNullInputTuple() throws Exception {
+    // A null input tuple must still come back as a serialized, empty sketch.
+    final EvalFunc<Tuple> udf = new DataToSketch();
+    final QuantilesSketch decoded = getSketch(udf.exec(null));
+    Assert.assertTrue(decoded.isEmpty());
+  }
+
+  @Test
+  public void execEmptyInputTuple() throws Exception {
+    // A tuple without any fields must also produce an empty sketch.
+    final EvalFunc<Tuple> udf = new DataToSketch();
+    final QuantilesSketch decoded = getSketch(udf.exec(tupleFactory.newTuple()));
+    Assert.assertTrue(decoded.isEmpty());
+  }
+
+  @Test
+  public void execEmptyBag() throws Exception {
+    // A bag that holds no tuples must produce an empty sketch as well.
+    final EvalFunc<Tuple> udf = new DataToSketch();
+    final QuantilesSketch decoded = getSketch(udf.exec(tupleFactory.newTuple(bagFactory.newDefaultBag())));
+    Assert.assertTrue(decoded.isEmpty());
+  }
+
+  @Test
+  public void execNormalCase() throws Exception {
+    // A bag with a single value must yield a non-empty sketch that has seen one item.
+    final EvalFunc<Tuple> udf = new DataToSketch();
+    final DataBag values = bagFactory.newDefaultBag();
+    values.add(tupleFactory.newTuple(1.0));
+    final QuantilesSketch decoded = getSketch(udf.exec(tupleFactory.newTuple(values)));
+    Assert.assertFalse(decoded.isEmpty());
+    Assert.assertEquals(decoded.getN(), 1);
+  }
+
+  @Test
+  public void accumulator() throws Exception {
+    final Accumulator<Tuple> acc = new DataToSketch();
+
+    // Stage 1: nothing has been accumulated yet,
+    // so getValue() is expected to return an empty sketch.
+    QuantilesSketch snapshot = getSketch(acc.getValue());
+    Assert.assertTrue(snapshot.isEmpty());
+
+    // Stage 2: a null input tuple
+    // must leave the accumulated sketch empty.
+    acc.accumulate(null);
+    snapshot = getSketch(acc.getValue());
+    Assert.assertTrue(snapshot.isEmpty());
+
+    // Stage 3: a tuple without fields
+    // must leave the accumulated sketch empty.
+    acc.accumulate(tupleFactory.newTuple());
+    snapshot = getSketch(acc.getValue());
+    Assert.assertTrue(snapshot.isEmpty());
+
+    // Stage 4: a tuple wrapping an empty bag
+    // must leave the accumulated sketch empty.
+    acc.accumulate(tupleFactory.newTuple(bagFactory.newDefaultBag()));
+    snapshot = getSketch(acc.getValue());
+    Assert.assertTrue(snapshot.isEmpty());
+
+    // Stage 5: the same one-value bag is accumulated twice,
+    // so the sketch must report two items.
+    final DataBag values = bagFactory.newDefaultBag();
+    values.add(tupleFactory.newTuple(1.0));
+    acc.accumulate(tupleFactory.newTuple(values));
+    acc.accumulate(tupleFactory.newTuple(values));
+    snapshot = getSketch(acc.getValue());
+    Assert.assertFalse(snapshot.isEmpty());
+    Assert.assertEquals(snapshot.getN(), 2);
+
+    // Stage 6: after cleanup() the accumulated data must be gone,
+    // so getValue() returns an empty sketch again.
+    acc.cleanup();
+    snapshot = getSketch(acc.getValue());
+    Assert.assertTrue(snapshot.isEmpty());
+  }
+
+  @Test
+  public void algebraicInitial() throws Exception {
+    final EvalFunc<Tuple> initial = new DataToSketch.Initial();
+    final DataBag input = bagFactory.newDefaultBag();
+    input.add(tupleFactory.newTuple());
+    final Tuple output = initial.exec(tupleFactory.newTuple(input));
+    Assert.assertNotNull(output);
+    Assert.assertEquals(output.size(), 1); // exactly one field ...
+    Assert.assertTrue(output.get(0) instanceof DataBag); // ... which is a bag ...
+    Assert.assertEquals(((DataBag) output.get(0)).size(), 1); // ... holding a single tuple
+  }
+
+  @Test
+  public void algebraicIntermediateFinalNullInputTuple() throws Exception {
+    // IntermediateFinal must answer a null input tuple with an empty sketch.
+    final EvalFunc<Tuple> stage = new DataToSketch.IntermediateFinal();
+    final QuantilesSketch decoded = getSketch(stage.exec(null));
+    Assert.assertTrue(decoded.isEmpty());
+  }
+
+  @Test
+  public void algebraicIntermediateFinalEmptyInputTuple() throws Exception {
+    // IntermediateFinal must answer a tuple without fields with an empty sketch.
+    final EvalFunc<Tuple> stage = new DataToSketch.IntermediateFinal();
+    final QuantilesSketch decoded = getSketch(stage.exec(tupleFactory.newTuple()));
+    Assert.assertTrue(decoded.isEmpty());
+  }
+
+  @Test
+  public void algebraicIntermediateFinalNormalCase() throws Exception {
+    final EvalFunc<Tuple> stage = new DataToSketch.IntermediateFinal();
+    final DataBag outerBag = bagFactory.newDefaultBag();
+
+    // First entry simulates an output from Initial:
+    // a tuple whose only field is a bag of raw values.
+    final DataBag rawValues = bagFactory.newDefaultBag();
+    rawValues.add(tupleFactory.newTuple(1.0));
+    outerBag.add(tupleFactory.newTuple(rawValues));
+
+    // Second entry simulates an output from a prior call of IntermediateFinal:
+    // a tuple whose only field is a serialized sketch.
+    final QuantilesSketch partial = QuantilesSketch.builder().build();
+    partial.update(2.0);
+    outerBag.add(tupleFactory.newTuple(new DataByteArray(partial.toByteArray())));
+
+    // Both kinds of entries must be merged into one sketch that has seen two items.
+    final QuantilesSketch merged = getSketch(stage.exec(tupleFactory.newTuple(outerBag)));
+    Assert.assertFalse(merged.isEmpty());
+    Assert.assertEquals(merged.getN(), 2);
+  }
+
+  // end of tests
+  /** Checks that the tuple has exactly one field, reads it as a non-empty DataByteArray and heapifies it. */
+  private static QuantilesSketch getSketch(Tuple tuple) throws Exception {
+    Assert.assertNotNull(tuple);
+    Assert.assertEquals(tuple.size(), 1);
+    final DataByteArray serialized = (DataByteArray) tuple.get(0);
+    Assert.assertTrue(serialized.size() > 0);
+    return QuantilesSketch.heapify(new NativeMemory(serialized.get()));
+  }
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/quantiles/GetPMFTest.java b/src/test/java/com/yahoo/sketches/pig/quantiles/GetPMFTest.java
new file mode 100644
index 0000000..85f8618
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/quantiles/GetPMFTest.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2016, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+package com.yahoo.sketches.pig.quantiles;
+
+import com.yahoo.sketches.quantiles.QuantilesSketch;
+
+import java.util.Arrays;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+public class GetPMFTest { // covers GetPMF results and the inputs it is expected to reject
+  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+
+  @Test
+  public void emptySketch() throws Exception {
+    final EvalFunc<Tuple> udf = new GetPMF();
+    final DataByteArray serialized = new DataByteArray(QuantilesSketch.builder().build().toByteArray());
+    final Tuple pmf = udf.exec(tupleFactory.newTuple(Arrays.asList(serialized, 0.5)));
+    Assert.assertNotNull(pmf);
+    Assert.assertEquals(pmf.size(), 2); // one split point was passed, two entries are expected
+    Assert.assertEquals(((double) pmf.get(0)), Double.NaN); // no data in the sketch: NaN is expected
+    Assert.assertEquals(((double) pmf.get(1)), Double.NaN);
+  }
+
+  @Test
+  public void normalCase() throws Exception {
+    final EvalFunc<Tuple> udf = new GetPMF();
+    final QuantilesSketch input = QuantilesSketch.builder().build();
+    for (int value = 1; value <= 10; value++) input.update(value);
+    final Tuple pmf = udf.exec(tupleFactory.newTuple(Arrays.asList(new DataByteArray(input.toByteArray()), 2.0, 7.0)));
+    Assert.assertNotNull(pmf);
+    Assert.assertEquals(pmf.size(), 3); // two split points were passed, three entries are expected
+    Assert.assertEquals(((double) pmf.get(0)), 0.1); // 1 of the 10 values expected in the first entry
+    Assert.assertEquals(((double) pmf.get(1)), 0.5); // 5 of the 10 values expected in the second entry
+    Assert.assertEquals(((double) pmf.get(2)), 0.4); // 4 of the 10 values expected in the third entry
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongNumberOfInputs() throws Exception {
+    final EvalFunc<Tuple> udf = new GetPMF();
+    udf.exec(tupleFactory.newTuple(1)); // neither a serialized sketch nor a split point is supplied
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForSketch() throws Exception {
+    final EvalFunc<Tuple> udf = new GetPMF();
+    udf.exec(tupleFactory.newTuple(Arrays.asList(1.0, 1.0))); // field 0 is a Double instead of a DataByteArray
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForFraction() throws Exception {
+    final EvalFunc<Tuple> udf = new GetPMF();
+    final DataByteArray serialized = new DataByteArray(QuantilesSketch.builder().build().toByteArray());
+    udf.exec(tupleFactory.newTuple(Arrays.asList(serialized, 1))); // field 1 is an Integer instead of a Double
+  }
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/quantiles/GetQuantileTest.java b/src/test/java/com/yahoo/sketches/pig/quantiles/GetQuantileTest.java
new file mode 100644
index 0000000..e5ee1a1
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/quantiles/GetQuantileTest.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2016, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+package com.yahoo.sketches.pig.quantiles;
+
+import com.yahoo.sketches.quantiles.QuantilesSketch;
+
+import java.util.Arrays;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.TupleFactory;
+
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+public class GetQuantileTest { // covers GetQuantile results and the inputs it is expected to reject
+  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+
+  @Test
+  public void emptySketch() throws Exception {
+    final EvalFunc<Double> udf = new GetQuantile();
+    final DataByteArray serialized = new DataByteArray(QuantilesSketch.builder().build().toByteArray());
+    final Double quantile = udf.exec(tupleFactory.newTuple(Arrays.asList(serialized, 0.0)));
+    Assert.assertEquals(quantile, Double.POSITIVE_INFINITY); // expected for fraction 0.0 on an empty sketch
+  }
+
+  @Test
+  public void normalCase() throws Exception {
+    final EvalFunc<Double> udf = new GetQuantile();
+    final QuantilesSketch input = QuantilesSketch.builder().build();
+    input.update(1.0);
+    final Double quantile = udf.exec(tupleFactory.newTuple(Arrays.asList(new DataByteArray(input.toByteArray()), 0.5)));
+    Assert.assertEquals(quantile, 1.0); // 1.0 is the only value the sketch has seen
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongNumberOfInputs() throws Exception {
+    final EvalFunc<Double> udf = new GetQuantile();
+    udf.exec(tupleFactory.newTuple(1)); // neither a serialized sketch nor a fraction is supplied
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForSketch() throws Exception {
+    final EvalFunc<Double> udf = new GetQuantile();
+    udf.exec(tupleFactory.newTuple(Arrays.asList(1.0, 1.0))); // field 0 is a Double instead of a DataByteArray
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForFraction() throws Exception {
+    final EvalFunc<Double> udf = new GetQuantile();
+    final DataByteArray serialized = new DataByteArray(QuantilesSketch.builder().build().toByteArray());
+    udf.exec(tupleFactory.newTuple(Arrays.asList(serialized, 1))); // field 1 is an Integer instead of a Double
+  }
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/quantiles/GetQuantilesTest.java b/src/test/java/com/yahoo/sketches/pig/quantiles/GetQuantilesTest.java
new file mode 100644
index 0000000..cba2c42
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/quantiles/GetQuantilesTest.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2016, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+package com.yahoo.sketches.pig.quantiles;
+
+import com.yahoo.sketches.quantiles.QuantilesSketch;
+
+import java.util.Arrays;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+public class GetQuantilesTest { // covers GetQuantiles results and the inputs it is expected to reject
+  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+
+  @Test
+  public void emptySketch() throws Exception {
+    final EvalFunc<Tuple> udf = new GetQuantiles();
+    final DataByteArray serialized = new DataByteArray(QuantilesSketch.builder().build().toByteArray());
+    final Tuple quantiles = udf.exec(tupleFactory.newTuple(Arrays.asList(serialized, 0.5)));
+    Assert.assertNotNull(quantiles);
+    Assert.assertEquals(quantiles.size(), 1); // one fraction was passed, one entry is expected
+    Assert.assertEquals(((double) quantiles.get(0)), Double.NaN); // no data in the sketch: NaN is expected
+  }
+
+  @Test
+  public void normalCase() throws Exception {
+    final EvalFunc<Tuple> udf = new GetQuantiles();
+    final QuantilesSketch input = QuantilesSketch.builder().build();
+    for (int value = 1; value <= 10; value++) input.update(value);
+    final Tuple quantiles = udf.exec(tupleFactory.newTuple(Arrays.asList(new DataByteArray(input.toByteArray()), 0.0, 0.5, 1.0)));
+    Assert.assertNotNull(quantiles);
+    Assert.assertEquals(quantiles.size(), 3); // three fractions were passed, three entries are expected
+    Assert.assertEquals(((double) quantiles.get(0)), 1.0); // fraction 0.0: smallest value fed in
+    Assert.assertEquals(((double) quantiles.get(1)), 6.0); // fraction 0.5
+    Assert.assertEquals(((double) quantiles.get(2)), 10.0); // fraction 1.0: largest value fed in
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongNumberOfInputs() throws Exception {
+    final EvalFunc<Tuple> udf = new GetQuantiles();
+    udf.exec(tupleFactory.newTuple(1)); // neither a serialized sketch nor a fraction is supplied
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForSketch() throws Exception {
+    final EvalFunc<Tuple> udf = new GetQuantiles();
+    udf.exec(tupleFactory.newTuple(Arrays.asList(1.0, 1.0))); // field 0 is a Double instead of a DataByteArray
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForFraction() throws Exception {
+    final EvalFunc<Tuple> udf = new GetQuantiles();
+    final DataByteArray serialized = new DataByteArray(QuantilesSketch.builder().build().toByteArray());
+    udf.exec(tupleFactory.newTuple(Arrays.asList(serialized, 1))); // field 1 is an Integer instead of a Double
+  }
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/quantiles/MergeTest.java b/src/test/java/com/yahoo/sketches/pig/quantiles/MergeTest.java
new file mode 100644
index 0000000..98fc699
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/quantiles/MergeTest.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2016, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+package com.yahoo.sketches.pig.quantiles;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import com.yahoo.sketches.memory.NativeMemory;
+import com.yahoo.sketches.quantiles.QuantilesSketch;
+
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+public class MergeTest { // covers exec(), the Accumulator calls and the Algebraic stages of Merge
+  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+  private static final BagFactory bagFactory = BagFactory.getInstance();
+
+  @Test
+  public void execNullInputTuple() throws Exception {
+    // A null input tuple must still come back as a serialized, empty sketch.
+    final EvalFunc<Tuple> udf = new Merge();
+    final QuantilesSketch decoded = getSketch(udf.exec(null));
+    Assert.assertTrue(decoded.isEmpty());
+  }
+
+  @Test
+  public void execEmptyInputTuple() throws Exception {
+    // A tuple without any fields must also produce an empty sketch.
+    final EvalFunc<Tuple> udf = new Merge();
+    final QuantilesSketch decoded = getSketch(udf.exec(tupleFactory.newTuple()));
+    Assert.assertTrue(decoded.isEmpty());
+  }
+
+  @Test
+  public void execEmptyBag() throws Exception {
+    // A bag that holds no tuples must produce an empty sketch as well.
+    final EvalFunc<Tuple> udf = new Merge();
+    final QuantilesSketch decoded = getSketch(udf.exec(tupleFactory.newTuple(bagFactory.newDefaultBag())));
+    Assert.assertTrue(decoded.isEmpty());
+  }
+
+  @Test
+  public void execNormalCase() throws Exception {
+    // A bag with one serialized single-value sketch must merge into a sketch that has seen one item.
+    final EvalFunc<Tuple> udf = new Merge();
+    final DataBag sketches = bagFactory.newDefaultBag();
+    final QuantilesSketch source = QuantilesSketch.builder().build();
+    source.update(1.0);
+    sketches.add(tupleFactory.newTuple(new DataByteArray(source.toByteArray())));
+    final QuantilesSketch decoded = getSketch(udf.exec(tupleFactory.newTuple(sketches)));
+    Assert.assertFalse(decoded.isEmpty());
+    Assert.assertEquals(decoded.getN(), 1);
+  }
+
+  @Test
+  public void accumulator() throws Exception {
+    final Accumulator<Tuple> acc = new Merge();
+
+    // Stage 1: nothing has been accumulated yet,
+    // so getValue() is expected to return an empty sketch.
+    QuantilesSketch snapshot = getSketch(acc.getValue());
+    Assert.assertTrue(snapshot.isEmpty());
+
+    // Stage 2: a null input tuple
+    // must leave the accumulated sketch empty.
+    acc.accumulate(null);
+    snapshot = getSketch(acc.getValue());
+    Assert.assertTrue(snapshot.isEmpty());
+
+    // Stage 3: a tuple without fields
+    // must leave the accumulated sketch empty.
+    acc.accumulate(tupleFactory.newTuple());
+    snapshot = getSketch(acc.getValue());
+    Assert.assertTrue(snapshot.isEmpty());
+
+    // Stage 4: a tuple wrapping an empty bag
+    // must leave the accumulated sketch empty.
+    acc.accumulate(tupleFactory.newTuple(bagFactory.newDefaultBag()));
+    snapshot = getSketch(acc.getValue());
+    Assert.assertTrue(snapshot.isEmpty());
+
+    // Stage 5: the same bag with one single-value sketch is accumulated twice,
+    // so the merged sketch must report two items.
+    final DataBag sketches = bagFactory.newDefaultBag();
+    final QuantilesSketch source = QuantilesSketch.builder().build();
+    source.update(1.0);
+    sketches.add(tupleFactory.newTuple(new DataByteArray(source.toByteArray())));
+    acc.accumulate(tupleFactory.newTuple(sketches));
+    acc.accumulate(tupleFactory.newTuple(sketches));
+    snapshot = getSketch(acc.getValue());
+    Assert.assertFalse(snapshot.isEmpty());
+    Assert.assertEquals(snapshot.getN(), 2);
+
+    // Stage 6: after cleanup() the accumulated data must be gone,
+    // so getValue() returns an empty sketch again.
+    acc.cleanup();
+    snapshot = getSketch(acc.getValue());
+    Assert.assertTrue(snapshot.isEmpty());
+  }
+
+  @Test
+  public void algebraicInitial() throws Exception {
+    final EvalFunc<Tuple> initial = new Merge.Initial();
+    final DataBag input = bagFactory.newDefaultBag();
+    input.add(tupleFactory.newTuple());
+    final Tuple output = initial.exec(tupleFactory.newTuple(input));
+    Assert.assertNotNull(output);
+    Assert.assertEquals(output.size(), 1); // exactly one field ...
+    Assert.assertTrue(output.get(0) instanceof DataBag); // ... which is a bag ...
+    Assert.assertEquals(((DataBag) output.get(0)).size(), 1); // ... holding a single tuple
+  }
+
+  @Test
+  public void algebraicIntermediateFinalNullInputTuple() throws Exception {
+    // IntermediateFinal must answer a null input tuple with an empty sketch.
+    final EvalFunc<Tuple> stage = new Merge.IntermediateFinal();
+    final QuantilesSketch decoded = getSketch(stage.exec(null));
+    Assert.assertTrue(decoded.isEmpty());
+  }
+
+  @Test
+  public void algebraicIntermediateFinalEmptyInputTuple() throws Exception {
+    // IntermediateFinal must answer a tuple without fields with an empty sketch.
+    final EvalFunc<Tuple> stage = new Merge.IntermediateFinal();
+    final QuantilesSketch decoded = getSketch(stage.exec(tupleFactory.newTuple()));
+    Assert.assertTrue(decoded.isEmpty());
+  }
+
+  @Test
+  public void algebraicIntermediateFinalNormalCase() throws Exception {
+    final EvalFunc<Tuple> stage = new Merge.IntermediateFinal();
+    final DataBag outerBag = bagFactory.newDefaultBag();
+
+    // First entry simulates an output from Initial:
+    // a tuple whose only field is a bag holding one serialized sketch.
+    final DataBag innerBag = bagFactory.newDefaultBag();
+    final QuantilesSketch fromInitial = QuantilesSketch.builder().build();
+    fromInitial.update(1.0);
+    innerBag.add(tupleFactory.newTuple(new DataByteArray(fromInitial.toByteArray())));
+    outerBag.add(tupleFactory.newTuple(innerBag));
+
+    // Second entry simulates an output from a prior call of IntermediateFinal:
+    // a tuple whose only field is a serialized sketch.
+    final QuantilesSketch fromPrior = QuantilesSketch.builder().build();
+    fromPrior.update(2.0);
+    outerBag.add(tupleFactory.newTuple(new DataByteArray(fromPrior.toByteArray())));
+
+    // Both kinds of entries must be merged into one sketch that has seen two items.
+    final QuantilesSketch merged = getSketch(stage.exec(tupleFactory.newTuple(outerBag)));
+    Assert.assertFalse(merged.isEmpty());
+    Assert.assertEquals(merged.getN(), 2);
+  }
+
+  // end of tests
+  /** Checks that the tuple has exactly one field, reads it as a non-empty DataByteArray and heapifies it. */
+  private static QuantilesSketch getSketch(Tuple tuple) throws Exception {
+    Assert.assertNotNull(tuple);
+    Assert.assertEquals(tuple.size(), 1);
+    final DataByteArray serialized = (DataByteArray) tuple.get(0);
+    Assert.assertTrue(serialized.size() > 0);
+    return QuantilesSketch.heapify(new NativeMemory(serialized.get()));
+  }
+}