Merge pull request #55 from DataSketches/kll-cpc-udfs
KLL and CPC UDFs
diff --git a/pom.xml b/pom.xml
index 6c2db9d..675a21a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -145,15 +145,14 @@
<dependency>
<groupId>com.yahoo.datasketches</groupId>
<artifactId>sketches-core</artifactId>
- <version>0.11.0</version>
+ <version>0.13.0</version>
</dependency>
<!-- Pig -->
<dependency>
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>
- <version>0.16.0</version>
- <classifier>h2</classifier>
+ <version>0.17.0</version>
<scope>provided</scope>
</dependency>
diff --git a/src/main/java/com/yahoo/sketches/pig/cpc/AlgebraicFinal.java b/src/main/java/com/yahoo/sketches/pig/cpc/AlgebraicFinal.java
new file mode 100644
index 0000000..aacc381
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/cpc/AlgebraicFinal.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+import com.yahoo.sketches.cpc.CpcUnion;
+
+/**
+ * Final stage of an <i>Algebraic</i> sketch operation.
+ *
+ * <p>The bag handed to this stage holds values emitted by the <i>Initial</i> stage and/or by
+ * the <i>Intermediate</i> stage, so both shapes are recognized and folded into one result.
+ *
+ * @author Alexander Saydakov
+ */
+abstract class AlgebraicFinal extends EvalFunc<DataByteArray> {
+
+  private final int lgK_;
+  private final long seed_;
+  private DataByteArray emptySketch_; // lazily built, cached serialized empty sketch
+  private boolean isFirstCall_ = true; // lets exec() log only once
+
+  /**
+   * Constructor with primitives for the final pass of an Algebraic function.
+   *
+   * @param lgK parameter controlling the sketch size and accuracy
+   * @param seed for the hash function
+   */
+  public AlgebraicFinal(final int lgK, final long seed) {
+    lgK_ = lgK;
+    seed_ = seed;
+  }
+
+  /**
+   * Combines the input via {@link #process} and falls back to a serialized empty sketch
+   * when {@code process} yields nothing.
+   */
+  @Override
+  public DataByteArray exec(final Tuple inputTuple) throws IOException {
+    if (isFirstCall_) {
+      Logger.getLogger(getClass()).info("Algebraic was used");
+      isFirstCall_ = false;
+    }
+    final DataByteArray result = process(inputTuple, lgK_, seed_, isInputRaw());
+    return (result != null) ? result : getEmptySketch();
+  }
+
+  /**
+   * Folds every entry of the bag in field 0 of the input tuple into one serialized sketch.
+   * Shared with {@link AlgebraicIntermediate}.
+   *
+   * @param inputTuple tuple whose field 0 is a bag of tuples; field 0 of each of those is
+   *     either a bag (output of the Initial stage) or a DataByteArray (a serialized sketch)
+   * @param lgK parameter controlling the sketch size and accuracy
+   * @param seed for the hash function
+   * @param isInputRaw if true, inner bags hold raw values to be fed into a sketch;
+   *     otherwise they hold serialized sketches to be merged
+   * @return serialized sketch, or null if there was nothing to process
+   * @throws IOException from Pig
+   */
+  static DataByteArray process(final Tuple inputTuple, final int lgK, final long seed,
+      final boolean isInputRaw) throws IOException {
+    final boolean noInput = (inputTuple == null) || (inputTuple.size() == 0);
+    if (noInput) { return null; }
+    final DataBag outerBag = (DataBag) inputTuple.get(0);
+    if (outerBag == null) { return null; }
+
+    CpcSketch rawSketch = null; // receives raw values
+    CpcUnion merged = null; // receives serialized sketches
+    for (final Tuple entry : outerBag) {
+      final Object field = entry.get(0); // inputTuple.bag0.entryN.f0
+      if (field == null) { continue; }
+      if (field instanceof DataBag) {
+        // A bag in field 0 is the bagged output of Initial functions,
+        // i.e. the Intermediate stage was bypassed. All of its tuples are consumed.
+        final DataBag innerBag = (DataBag) field;
+        if (innerBag.size() == 0) { continue; }
+        if (isInputRaw) {
+          if (rawSketch == null) { rawSketch = new CpcSketch(lgK, seed); }
+          DataToSketch.updateSketch(innerBag, rawSketch);
+        } else {
+          if (merged == null) { merged = new CpcUnion(lgK, seed); }
+          UnionSketch.updateUnion(innerBag, merged, seed);
+        }
+      } else if (field instanceof DataByteArray) {
+        // A DataByteArray in field 0 is assumed to be a sketch produced by an
+        // Intermediate function; it will be merged into the union.
+        if (merged == null) { merged = new CpcUnion(lgK, seed); }
+        final byte[] bytes = ((DataByteArray) field).get();
+        merged.update(CpcSketch.heapify(bytes, seed));
+      } else { // we should never get here
+        throw new IllegalArgumentException("dataTuple.Field0 is not a DataBag or DataByteArray: "
+            + field.getClass().getName());
+      }
+    }
+
+    if (merged != null) {
+      // when both exist, the raw-value sketch is absorbed by the union
+      if (rawSketch != null) { merged.update(rawSketch); }
+      return new DataByteArray(merged.getResult().toByteArray());
+    }
+    return (rawSketch == null) ? null : new DataByteArray(rawSketch.toByteArray());
+  }
+
+  /**
+   * @return true if inner bags carry raw values, false if they carry serialized sketches
+   */
+  abstract boolean isInputRaw();
+
+  private DataByteArray getEmptySketch() {
+    DataByteArray cached = emptySketch_;
+    if (cached == null) {
+      cached = new DataByteArray(new CpcSketch(lgK_, seed_).toByteArray());
+      emptySketch_ = cached;
+    }
+    return cached;
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/cpc/AlgebraicInitial.java b/src/main/java/com/yahoo/sketches/pig/cpc/AlgebraicInitial.java
new file mode 100644
index 0000000..f332dd0
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/cpc/AlgebraicInitial.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Initial stage of an Algebraic sketch operation.
+ *
+ * <p>This stage does no work of its own: every input tuple is handed back unchanged, leaving
+ * the actual processing to the stages that follow.
+ *
+ * @author Alexander Saydakov
+ */
+public class AlgebraicInitial extends EvalFunc<Tuple> {
+
+  private boolean isFirstCall_ = true; // lets exec() log only once
+
+  /**
+   * Default constructor for the initial pass of an Algebraic function.
+   */
+  public AlgebraicInitial() {}
+
+  /**
+   * Constructor for the initial pass of an Algebraic function. Pig invokes it with the same
+   * constructor arguments as the original UDF; this stage has no use for them.
+   *
+   * @param lgK in a form of a String (ignored)
+   */
+  public AlgebraicInitial(final String lgK) {}
+
+  /**
+   * Constructor for the initial pass of an Algebraic function. Pig invokes it with the same
+   * constructor arguments as the original UDF; this stage has no use for them.
+   *
+   * @param lgK in a form of a String (ignored)
+   * @param seed in a form of a String (ignored)
+   */
+  public AlgebraicInitial(final String lgK, final String seed) {}
+
+  /**
+   * Returns the given tuple as is.
+   */
+  @Override
+  public Tuple exec(final Tuple inputTuple) throws IOException {
+    logFirstCall();
+    return inputTuple;
+  }
+
+  // Emits the usage message on the first invocation only.
+  private void logFirstCall() {
+    if (!isFirstCall_) { return; }
+    Logger.getLogger(getClass()).info("Algebraic was used");
+    isFirstCall_ = false;
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/cpc/AlgebraicIntermediate.java b/src/main/java/com/yahoo/sketches/pig/cpc/AlgebraicIntermediate.java
new file mode 100644
index 0000000..45473ad
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/cpc/AlgebraicIntermediate.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+/**
+ * Intermediate (combiner) stage of an <i>Algebraic</i> sketch operation. It is called from
+ * the combiner, possibly several times (from a mapper and from a reducer). Its input bag
+ * holds values returned by either the <i>Intermediate</i> or the <i>Initial</i> stage, so
+ * both kinds must be told apart and interpreted.
+ *
+ * @author Alexander Saydakov
+ */
+abstract class AlgebraicIntermediate extends EvalFunc<Tuple> {
+
+  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+
+  private final int lgK_;
+  private final long seed_;
+  private Tuple emptySketchTuple_; // lazily built, cached tuple holding an empty sketch
+  private boolean isFirstCall_ = true; // lets exec() log only once
+
+  /**
+   * Constructor with primitives for the intermediate pass of an Algebraic function.
+   *
+   * @param lgK parameter controlling the sketch size and accuracy
+   * @param seed for the hash function
+   */
+  public AlgebraicIntermediate(final int lgK, final long seed) {
+    lgK_ = lgK;
+    seed_ = seed;
+  }
+
+  /**
+   * Delegates to {@link AlgebraicFinal#process} and wraps the serialized sketch in a tuple;
+   * a tuple with an empty sketch is returned when there is no result.
+   */
+  @Override
+  public Tuple exec(final Tuple inputTuple) throws IOException {
+    if (isFirstCall_) {
+      Logger.getLogger(getClass()).info("Algebraic was used");
+      isFirstCall_ = false;
+    }
+    final DataByteArray serialized =
+        AlgebraicFinal.process(inputTuple, lgK_, seed_, isInputRaw());
+    return (serialized == null) ? getEmptySketchTuple() : TUPLE_FACTORY.newTuple(serialized);
+  }
+
+  /**
+   * @return true if inner bags carry raw values, false if they carry serialized sketches
+   */
+  abstract boolean isInputRaw();
+
+  private Tuple getEmptySketchTuple() {
+    Tuple cached = emptySketchTuple_;
+    if (cached == null) {
+      final DataByteArray emptyBytes = new DataByteArray(new CpcSketch(lgK_, seed_).toByteArray());
+      cached = TUPLE_FACTORY.newTuple(emptyBytes);
+      emptySketchTuple_ = cached;
+    }
+    return cached;
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/cpc/DataToSketch.java b/src/main/java/com/yahoo/sketches/pig/cpc/DataToSketch.java
new file mode 100644
index 0000000..5d8aa10
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/cpc/DataToSketch.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import static com.yahoo.sketches.Util.DEFAULT_UPDATE_SEED;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.pig.Accumulator;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+/**
+ * This is a Pig UDF that builds sketches from data.
+ * This class implements both the <i>Accumulator</i> and <i>Algebraic</i> interfaces.
+ *
+ * @author Alexander Saydakov
+ */
+public class DataToSketch extends EvalFunc<DataByteArray> implements Accumulator<DataByteArray>, Algebraic {
+
+  private DataByteArray emptySketch_; // this is to cache an empty sketch
+
+  private final int lgK_;
+  private final long seed_;
+  private CpcSketch accumSketch_; // Accumulator state; null until the first non-empty accumulate()
+  private boolean isFirstCall_ = true; // for logging; must start as true or nothing is ever logged
+
+  /**
+   * Constructor with default lgK and seed
+   */
+  public DataToSketch() {
+    this(CpcSketch.DEFAULT_LG_K, DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Constructor with given lgK as string and default seed
+   *
+   * @param lgK in a form of a String
+   */
+  public DataToSketch(final String lgK) {
+    this(Integer.parseInt(lgK), DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Constructor with given lgK and seed as strings
+   *
+   * @param lgK in a form of a String
+   * @param seed in a form of a String
+   */
+  public DataToSketch(final String lgK, final String seed) {
+    this(Integer.parseInt(lgK), Long.parseLong(seed));
+  }
+
+  /**
+   * Base constructor.
+   *
+   * @param lgK parameter controlling the sketch size and accuracy
+   * @param seed parameter to use during hashing
+   */
+  public DataToSketch(final int lgK, final long seed) {
+    super();
+    lgK_ = lgK;
+    seed_ = seed;
+  }
+
+  /**
+   * Top-level exec function.
+   * This method accepts an input Tuple containing a Bag of one or more inner <b>Datum Tuples</b>
+   * and returns a single serialized CpcSketch as a DataByteArray.
+   *
+   * <b>Datum Tuple</b> is a Tuple containing a single field, which can be one of the following
+   * (Java type: Pig type):
+   * <ul>
+   *   <li>Byte: BYTE</li>
+   *   <li>Integer: INTEGER</li>
+   *   <li>Long: LONG</li>
+   *   <li>Float: FLOAT</li>
+   *   <li>Double: DOUBLE</li>
+   *   <li>String: CHARARRAY</li>
+   *   <li>DataByteArray: BYTEARRAY</li>
+   * </ul>
+   *
+   * <p>A null or empty input tuple, or a null bag, yields a serialized empty sketch.
+   *
+   * @param inputTuple A tuple containing a single bag, containing Datum Tuples.
+   * @return serialized CpcSketch
+   * @see "org.apache.pig.EvalFunc.exec(org.apache.pig.data.Tuple)"
+   * @throws IOException from Pig
+   */
+  @Override
+  public DataByteArray exec(final Tuple inputTuple) throws IOException {
+    if (isFirstCall_) {
+      Logger.getLogger(getClass()).info("Exec was used");
+      isFirstCall_ = false;
+    }
+    if (inputTuple == null || inputTuple.size() == 0) {
+      return getEmptySketch();
+    }
+    final DataBag bag = (DataBag) inputTuple.get(0);
+    if (bag == null) { // same guard as accumulate(); iterating a null bag would throw
+      return getEmptySketch();
+    }
+    final CpcSketch sketch = new CpcSketch(lgK_, seed_);
+    updateSketch(bag, sketch);
+    return new DataByteArray(sketch.toByteArray());
+  }
+
+  /**
+   * An <i>Accumulator</i> version of the standard <i>exec()</i> method. Like <i>exec()</i>,
+   * accumulator is called with a bag of Datum Tuples. Unlike <i>exec()</i>, it doesn't serialize the
+   * result at the end. Instead, it can be called multiple times, each time with another bag of
+   * Datum Tuples to be input to the sketch.
+   *
+   * @param inputTuple A tuple containing a single bag, containing Datum Tuples.
+   * @see #exec
+   * @see "org.apache.pig.Accumulator.accumulate(org.apache.pig.data.Tuple)"
+   * @throws IOException by Pig
+   */
+  @Override
+  public void accumulate(final Tuple inputTuple) throws IOException {
+    if (isFirstCall_) {
+      Logger.getLogger(getClass()).info("Accumulator was used");
+      isFirstCall_ = false;
+    }
+    if (inputTuple == null || inputTuple.size() == 0) { return; }
+    final DataBag bag = (DataBag) inputTuple.get(0);
+    if (bag == null) { return; }
+    if (accumSketch_ == null) {
+      // the configured seed must be used here as well, as in exec() and the Algebraic stages
+      accumSketch_ = new CpcSketch(lgK_, seed_);
+    }
+    updateSketch(bag, accumSketch_);
+  }
+
+  /**
+   * Returns the sketch that has been built up by multiple calls to {@link #accumulate}.
+   * If nothing has been accumulated, a serialized empty sketch is returned.
+   *
+   * @return serialized CpcSketch
+   * @see "org.apache.pig.Accumulator.getValue()"
+   */
+  @Override
+  public DataByteArray getValue() {
+    if (accumSketch_ == null) {
+      return getEmptySketch();
+    }
+    return new DataByteArray(accumSketch_.toByteArray());
+  }
+
+  /**
+   * Cleans up the UDF state after being called using the {@link Accumulator} interface.
+   *
+   * @see "org.apache.pig.Accumulator.cleanup()"
+   */
+  @Override
+  public void cleanup() {
+    accumSketch_ = null;
+  }
+
+  @Override
+  public String getInitial() {
+    return AlgebraicInitial.class.getName();
+  }
+
+  @Override
+  public String getIntermed() {
+    return DataToSketchAlgebraicIntermediate.class.getName();
+  }
+
+  @Override
+  public String getFinal() {
+    return DataToSketchAlgebraicFinal.class.getName();
+  }
+
+  /**
+   * Feeds field 0 of every tuple in the bag into the given sketch. Null fields are skipped.
+   *
+   * @param bag non-null bag of Datum Tuples
+   * @param sketch the sketch to update
+   * @throws ExecException from Pig when a field cannot be read
+   * @throws IllegalArgumentException if field 0 has an unsupported type
+   */
+  static void updateSketch(final DataBag bag, final CpcSketch sketch) throws ExecException {
+    for (final Tuple innerTuple: bag) {
+      final Object f0 = innerTuple.get(0); // consider only field 0
+      if (f0 == null) {
+        continue;
+      }
+      final byte type = innerTuple.getType(0);
+
+      switch (type) {
+        case DataType.NULL:
+          break;
+        case DataType.BYTE:
+          sketch.update((byte) f0);
+          break;
+        case DataType.INTEGER:
+          sketch.update((int) f0);
+          break;
+        case DataType.LONG:
+          sketch.update((long) f0);
+          break;
+        case DataType.FLOAT:
+          sketch.update((float) f0);
+          break;
+        case DataType.DOUBLE:
+          sketch.update((double) f0);
+          break;
+        case DataType.BYTEARRAY: {
+          final DataByteArray dba = (DataByteArray) f0;
+          sketch.update(dba.get());
+          break;
+        }
+        case DataType.CHARARRAY: {
+          final String str = (String) f0;
+          // conversion to char[] avoids costly UTF-8 encoding
+          sketch.update(str.toCharArray());
+          break;
+        }
+        default:
+          throw new IllegalArgumentException("Field 0 of innerTuple must be one of "
+              + "NULL, BYTE, INTEGER, LONG, FLOAT, DOUBLE, BYTEARRAY or CHARARRAY. "
+              + "Given Type = " + DataType.findTypeName(type)
+              + ", Object = " + f0.toString());
+      }
+    }
+  }
+
+  // Returns the cached serialized empty sketch, creating it on first use.
+  private DataByteArray getEmptySketch() {
+    if (emptySketch_ == null) {
+      emptySketch_ = new DataByteArray(new CpcSketch(lgK_, seed_).toByteArray());
+    }
+    return emptySketch_;
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/cpc/DataToSketchAlgebraicFinal.java b/src/main/java/com/yahoo/sketches/pig/cpc/DataToSketchAlgebraicFinal.java
new file mode 100644
index 0000000..e425d77
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/cpc/DataToSketchAlgebraicFinal.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import static com.yahoo.sketches.Util.DEFAULT_UPDATE_SEED;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+/**
+ * Final stage of the Algebraic form of {@link DataToSketch}. Inner bags are treated as raw values.
+ */
+public class DataToSketchAlgebraicFinal extends AlgebraicFinal {
+
+  /**
+   * No-argument constructor for the final pass of an Algebraic function:
+   * default lgK and default seed are used.
+   */
+  public DataToSketchAlgebraicFinal() {
+    super(CpcSketch.DEFAULT_LG_K, DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Constructor for the final pass of an Algebraic function. Pig calls it with the
+   * same constructor arguments as the base UDF. The default seed is used.
+   *
+   * @param lgK in a form of a String
+   */
+  public DataToSketchAlgebraicFinal(final String lgK) {
+    super(Integer.parseInt(lgK), DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Constructor for the final pass of an Algebraic function. Pig calls it with the
+   * same constructor arguments as the base UDF.
+   *
+   * @param lgK in a form of a String
+   * @param seed in a form of a String
+   */
+  public DataToSketchAlgebraicFinal(final String lgK, final String seed) {
+    super(Integer.parseInt(lgK), Long.parseLong(seed));
+  }
+
+  /** Always true for this stage. */
+  @Override
+  boolean isInputRaw() {
+    return true;
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/cpc/DataToSketchAlgebraicIntermediate.java b/src/main/java/com/yahoo/sketches/pig/cpc/DataToSketchAlgebraicIntermediate.java
new file mode 100644
index 0000000..dea8591
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/cpc/DataToSketchAlgebraicIntermediate.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import static com.yahoo.sketches.Util.DEFAULT_UPDATE_SEED;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+/**
+ * Intermediate stage of the Algebraic form of {@link DataToSketch}. Inner bags are treated
+ * as raw values.
+ */
+public class DataToSketchAlgebraicIntermediate extends AlgebraicIntermediate {
+
+  /**
+   * No-argument constructor for the intermediate pass of an Algebraic function:
+   * default lgK and default seed are used.
+   */
+  public DataToSketchAlgebraicIntermediate() {
+    super(CpcSketch.DEFAULT_LG_K, DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Constructor for the intermediate pass of an Algebraic function. Pig calls it with the
+   * same constructor arguments as the base UDF. The default seed is used.
+   *
+   * @param lgK in a form of a String
+   */
+  public DataToSketchAlgebraicIntermediate(final String lgK) {
+    super(Integer.parseInt(lgK), DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Constructor for the intermediate pass of an Algebraic function. Pig calls it with the
+   * same constructor arguments as the base UDF.
+   *
+   * @param lgK in a form of a String
+   * @param seed in a form of a String
+   */
+  public DataToSketchAlgebraicIntermediate(final String lgK, final String seed) {
+    super(Integer.parseInt(lgK), Long.parseLong(seed));
+  }
+
+  /** Always true for this stage. */
+  @Override
+  boolean isInputRaw() {
+    return true;
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/cpc/GetEstimate.java b/src/main/java/com/yahoo/sketches/pig/cpc/GetEstimate.java
new file mode 100644
index 0000000..88ce2a5
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/cpc/GetEstimate.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import static com.yahoo.sketches.Util.DEFAULT_UPDATE_SEED;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+/**
+ * This is a User Defined Function (UDF) for getting a distinct count estimate from a given CpcSketch
+ *
+ * @author Alexander Saydakov
+ */
+public class GetEstimate extends EvalFunc<Double> {
+
+  private final long seed_;
+
+  /**
+   * Constructor with default seed
+   */
+  public GetEstimate() {
+    this(DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Constructor with given seed
+   * @param seed in a form of a String
+   */
+  public GetEstimate(final String seed) {
+    this(Long.parseLong(seed));
+  }
+
+  /**
+   * Base constructor
+   * @param seed parameter for the hash function
+   */
+  GetEstimate(final long seed) {
+    seed_ = seed;
+  }
+
+  /**
+   * Deserializes the sketch in field 0 of the given tuple and returns its estimate.
+   *
+   * @param sketchTuple tuple whose field 0 is a serialized CpcSketch (DataByteArray)
+   * @return the estimate, or null if the tuple is null or empty, or if field 0 is null
+   * @throws IOException from Pig
+   */
+  @Override
+  public Double exec(final Tuple sketchTuple) throws IOException {
+    if ((sketchTuple == null) || (sketchTuple.size() == 0)) {
+      return null;
+    }
+    final DataByteArray dba = (DataByteArray) sketchTuple.get(0);
+    if (dba == null) { // a null field would otherwise fail with NullPointerException below
+      return null;
+    }
+    final CpcSketch sketch = CpcSketch.heapify(dba.get(), seed_);
+    return sketch.getEstimate();
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/cpc/GetEstimateAndErrorBounds.java b/src/main/java/com/yahoo/sketches/pig/cpc/GetEstimateAndErrorBounds.java
new file mode 100644
index 0000000..be87f1b
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/cpc/GetEstimateAndErrorBounds.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import static com.yahoo.sketches.Util.DEFAULT_UPDATE_SEED;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+/**
+ * This is a User Defined Function (UDF) for obtaining the distinct count estimate
+ * along with a lower and upper bound from a given CpcSketch.
+ *
+ * <p>The result is a tuple with three double values: estimate, lower bound and upper bound.
+ * The bounds are best estimates for the confidence interval given <i>kappa</i>, which represents
+ * the number of standard deviations from the mean (1, 2 or 3).
+ *
+ * @author Alexander Saydakov
+ */
+public class GetEstimateAndErrorBounds extends EvalFunc<Tuple> {
+
+  private static final int DEFAULT_KAPPA = 2; // constant: must not be reassignable
+
+  private final int kappa_;
+  private final long seed_;
+
+  /**
+   * Constructor with default kappa and seed
+   */
+  public GetEstimateAndErrorBounds() {
+    this(DEFAULT_KAPPA, DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Constructor with given kappa and default seed
+   * @param kappa in a form of a String
+   */
+  public GetEstimateAndErrorBounds(final String kappa) {
+    this(Integer.parseInt(kappa), DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Constructor with given kappa and seed
+   * @param kappa in a form of a String
+   * @param seed in a form of a String
+   */
+  public GetEstimateAndErrorBounds(final String kappa, final String seed) {
+    this(Integer.parseInt(kappa), Long.parseLong(seed));
+  }
+
+  /**
+   * Base constructor
+   * @param kappa the given number of standard deviations from the mean: 1, 2 or 3
+   * @param seed parameter for the hash function
+   */
+  GetEstimateAndErrorBounds(final int kappa, final long seed) {
+    kappa_ = kappa;
+    seed_ = seed;
+  }
+
+  /**
+   * Deserializes the sketch in field 0 of the given tuple and returns its estimate and bounds.
+   *
+   * @param sketchTuple tuple whose field 0 is a serialized CpcSketch (DataByteArray)
+   * @return tuple of (estimate, lower bound, upper bound), or null if the input tuple is
+   *     null or empty, or if field 0 is null
+   * @throws IOException from Pig
+   */
+  @Override
+  public Tuple exec(final Tuple sketchTuple) throws IOException {
+    if ((sketchTuple == null) || (sketchTuple.size() == 0)) {
+      return null;
+    }
+    final DataByteArray dba = (DataByteArray) sketchTuple.get(0);
+    if (dba == null) { // a null field would otherwise fail with NullPointerException below
+      return null;
+    }
+    final CpcSketch sketch = CpcSketch.heapify(dba.get(), seed_);
+    final Tuple outputTuple = TupleFactory.getInstance().newTuple(3);
+    outputTuple.set(0, Double.valueOf(sketch.getEstimate()));
+    outputTuple.set(1, Double.valueOf(sketch.getLowerBound(kappa_)));
+    outputTuple.set(2, Double.valueOf(sketch.getUpperBound(kappa_)));
+    return outputTuple;
+  }
+
+  /**
+   * The output is a Sketch Result Tuple Schema: a tuple of three doubles named
+   * Estimate, LowerBound and UpperBound.
+   */
+  @Override
+  public Schema outputSchema(final Schema input) {
+    if (input == null) { return null; }
+    try {
+      final Schema tupleSchema = new Schema();
+      tupleSchema.add(new Schema.FieldSchema("Estimate", DataType.DOUBLE));
+      tupleSchema.add(new Schema.FieldSchema("LowerBound", DataType.DOUBLE));
+      tupleSchema.add(new Schema.FieldSchema("UpperBound", DataType.DOUBLE));
+      return new Schema(new Schema.FieldSchema(getSchemaName(this
+          .getClass().getName().toLowerCase(), input), tupleSchema, DataType.TUPLE));
+    } catch (final FrontendException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/cpc/SketchToString.java b/src/main/java/com/yahoo/sketches/pig/cpc/SketchToString.java
new file mode 100644
index 0000000..b82aa24
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/cpc/SketchToString.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import static com.yahoo.sketches.Util.DEFAULT_UPDATE_SEED;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+/**
+ * This is a User Defined Function (UDF) for printing a human-readable summary of a given CpcSketch
+ * @author Alexander Saydakov
+ */
+public class SketchToString extends EvalFunc<String> {
+
+  private final boolean isDetailed_;
+  private final long seed_;
+
+  /**
+   * Prints only the sketch summary using the default seed.
+   */
+  public SketchToString() {
+    this(false, DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Prints the summary and, if requested, the details using the default seed.
+   *
+   * @param isDetailed flag to print the sketch detail, in a form of a String
+   */
+  public SketchToString(final String isDetailed) {
+    this(Boolean.parseBoolean(isDetailed), DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Prints the summary and, if requested, the details using a given seed.
+   *
+   * @param isDetailed flag to print the sketch detail, in a form of a String
+   * @param seed parameter for the hash function, in a form of a String
+   */
+  public SketchToString(final String isDetailed, final String seed) {
+    this(Boolean.parseBoolean(isDetailed), Long.parseLong(seed));
+  }
+
+  /**
+   * Internal constructor with primitive parameters.
+   *
+   * @param isDetailed flag to print the sketch detail
+   * @param seed parameter for the hash function
+   */
+  private SketchToString(final boolean isDetailed, final long seed) {
+    isDetailed_ = isDetailed;
+    seed_ = seed;
+  }
+
+  /**
+   * Deserializes the sketch in field 0 of the given tuple and renders it as text.
+   *
+   * @param sketchTuple tuple whose field 0 is a serialized CpcSketch (DataByteArray)
+   * @return the textual form of the sketch, or null if the tuple is null or empty,
+   *     or if field 0 is null
+   * @throws IOException from Pig
+   */
+  @Override
+  public String exec(final Tuple sketchTuple) throws IOException {
+    if ((sketchTuple == null) || (sketchTuple.size() == 0)) {
+      return null;
+    }
+    final DataByteArray dba = (DataByteArray) sketchTuple.get(0);
+    if (dba == null) { // a null field would otherwise fail with NullPointerException below
+      return null;
+    }
+    final CpcSketch sketch = CpcSketch.heapify(dba.get(), seed_);
+    return sketch.toString(isDetailed_);
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/cpc/UnionSketch.java b/src/main/java/com/yahoo/sketches/pig/cpc/UnionSketch.java
new file mode 100644
index 0000000..c4a9c9d
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/cpc/UnionSketch.java
@@ -0,0 +1,195 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import static com.yahoo.sketches.Util.DEFAULT_UPDATE_SEED;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.pig.Accumulator;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+import com.yahoo.sketches.cpc.CpcUnion;
+
+/**
+ * This is a Pig UDF that performs the Union operation on CpcSketches.
+ * This class implements both the <i>Accumulator</i> and <i>Algebraic</i> interfaces.
+ *
+ * @author Alexander Saydakov
+ */
+public class UnionSketch extends EvalFunc<DataByteArray> implements Accumulator<DataByteArray>, Algebraic {
+
+  private DataByteArray emptySketch_; // this is to cache an empty sketch
+
+  private final int lgK_;
+  private final long seed_;
+  private CpcUnion accumUnion_; // Accumulator state; null until the first non-empty accumulate()
+  private boolean isFirstCall_ = true; // for logging; must start as true or nothing is ever logged
+
+  /**
+   * Constructor with default lgK and seed
+   */
+  public UnionSketch() {
+    this(CpcSketch.DEFAULT_LG_K, DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Constructor with given lgK as string and default seed.
+   *
+   * @param lgK in a form of a String
+   */
+  public UnionSketch(final String lgK) {
+    this(Integer.parseInt(lgK), DEFAULT_UPDATE_SEED);
+  }
+
+  /**
+   * Constructor with given lgK and seed as strings
+   *
+   * @param lgK in a form of a String
+   * @param seed in a form of a String
+   */
+  public UnionSketch(final String lgK, final String seed) {
+    this(Integer.parseInt(lgK), Long.parseLong(seed));
+  }
+
+  /**
+   * Base constructor.
+   *
+   * @param lgK parameter controlling the sketch size and accuracy
+   * @param seed for the hash function
+   */
+  public UnionSketch(final int lgK, final long seed) {
+    super();
+    lgK_ = lgK;
+    seed_ = seed;
+  }
+
+  /**
+   * Top-level exec function.
+   * This method accepts an input Tuple containing a Bag of one or more inner <b>Sketch Tuples</b>
+   * and returns a single serialized CpcSketch as a DataByteArray.
+   *
+   * <b>Sketch Tuple</b> is a Tuple containing a single DataByteArray (BYTEARRAY in Pig), which
+   * is a serialized CpcSketch.
+   *
+   * <p>A null or empty input tuple, or a null bag, yields a serialized empty sketch.
+   *
+   * @param inputTuple A tuple containing a single bag, containing Sketch Tuples.
+   * @return serialized CpcSketch
+   * @see "org.apache.pig.EvalFunc.exec(org.apache.pig.data.Tuple)"
+   * @throws IOException from Pig.
+   */
+  @Override
+  public DataByteArray exec(final Tuple inputTuple) throws IOException {
+    if (isFirstCall_) {
+      Logger.getLogger(getClass()).info("Exec was used");
+      isFirstCall_ = false;
+    }
+    if (inputTuple == null || inputTuple.size() == 0) {
+      return getEmptySketch();
+    }
+    final DataBag bag = (DataBag) inputTuple.get(0);
+    if (bag == null) { // same guard as accumulate(); iterating a null bag would throw
+      return getEmptySketch();
+    }
+    final CpcUnion union = new CpcUnion(lgK_, seed_);
+    updateUnion(bag, union, seed_);
+    return new DataByteArray(union.getResult().toByteArray());
+  }
+
+  /**
+   * An <i>Accumulator</i> version of the standard <i>exec()</i> method. Like <i>exec()</i>,
+   * accumulator is called with a bag of Sketch Tuples. Unlike <i>exec()</i>, it doesn't serialize the
+   * result at the end. Instead, it can be called multiple times, each time with another bag of
+   * Sketch Tuples to be input to the union.
+   *
+   * @param inputTuple A tuple containing a single bag, containing Sketch Tuples.
+   * @see #exec
+   * @see "org.apache.pig.Accumulator.accumulate(org.apache.pig.data.Tuple)"
+   * @throws IOException by Pig
+   */
+  @Override
+  public void accumulate(final Tuple inputTuple) throws IOException {
+    if (isFirstCall_) {
+      Logger.getLogger(getClass()).info("Accumulator was used");
+      isFirstCall_ = false;
+    }
+    if (inputTuple == null || inputTuple.size() == 0) { return; }
+    final DataBag bag = (DataBag) inputTuple.get(0);
+    if (bag == null) { return; }
+    if (accumUnion_ == null) {
+      accumUnion_ = new CpcUnion(lgK_, seed_);
+    }
+    updateUnion(bag, accumUnion_, seed_);
+  }
+
+  /**
+   * Returns the sketch that has been built up by multiple calls to {@link #accumulate}.
+   * If nothing has been accumulated, a serialized empty sketch is returned.
+   *
+   * @return serialized CpcSketch
+   * @see "org.apache.pig.Accumulator.getValue()"
+   */
+  @Override
+  public DataByteArray getValue() {
+    if (accumUnion_ == null) {
+      return getEmptySketch();
+    }
+    return new DataByteArray(accumUnion_.getResult().toByteArray());
+  }
+
+  /**
+   * Cleans up the UDF state after being called using the {@link Accumulator} interface.
+   *
+   * @see "org.apache.pig.Accumulator.cleanup()"
+   */
+  @Override
+  public void cleanup() {
+    accumUnion_ = null;
+  }
+
+  @Override
+  public String getInitial() {
+    return AlgebraicInitial.class.getName();
+  }
+
+  @Override
+  public String getIntermed() {
+    return UnionSketchAlgebraicIntermediate.class.getName();
+  }
+
+  @Override
+  public String getFinal() {
+    return UnionSketchAlgebraicFinal.class.getName();
+  }
+
+  /**
+   * Merges the serialized sketch in field 0 of every tuple in the bag into the given union.
+   * Null fields are skipped.
+   *
+   * @param bag non-null bag of Sketch Tuples
+   * @param union the union to update
+   * @param seed for the hash function; used to deserialize each sketch
+   * @throws ExecException from Pig when a field cannot be read
+   * @throws IllegalArgumentException if field 0 is not a BYTEARRAY
+   */
+  static void updateUnion(final DataBag bag, final CpcUnion union, final long seed) throws ExecException {
+    for (final Tuple innerTuple : bag) {
+      final Object f0 = innerTuple.get(0); // consider only field 0
+      if (f0 == null) {
+        continue;
+      }
+      final byte type = innerTuple.getType(0);
+      if (type == DataType.BYTEARRAY) {
+        final DataByteArray dba = (DataByteArray) f0;
+        union.update(CpcSketch.heapify(dba.get(), seed));
+      } else {
+        throw new IllegalArgumentException("Field type was not DataType.BYTEARRAY: " + type);
+      }
+    }
+  }
+
+  // Returns the cached serialized empty sketch, creating it on first use.
+  private DataByteArray getEmptySketch() {
+    if (emptySketch_ == null) {
+      emptySketch_ = new DataByteArray(new CpcSketch(lgK_, seed_).toByteArray());
+    }
+    return emptySketch_;
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/cpc/UnionSketchAlgebraicFinal.java b/src/main/java/com/yahoo/sketches/pig/cpc/UnionSketchAlgebraicFinal.java
new file mode 100644
index 0000000..9cbd22d
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/cpc/UnionSketchAlgebraicFinal.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import static com.yahoo.sketches.Util.DEFAULT_UPDATE_SEED;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+public class UnionSketchAlgebraicFinal extends AlgebraicFinal { // final pass of the Algebraic union of CPC sketches
+
+ /**
+ * Default constructor for the final pass of an Algebraic function.
+ * Assumes default lgK and seed.
+ */
+ public UnionSketchAlgebraicFinal() {
+ super(CpcSketch.DEFAULT_LG_K, DEFAULT_UPDATE_SEED);
+ }
+
+ /**
+ * Constructor for the final pass of an Algebraic function. Pig will call
+ * this and pass the same constructor arguments as the base UDF.
+ * Assumes default seed.
+ *
+ * @param lgK string representation of lgK, parsed with {@link Integer#parseInt(String)}
+ */
+ public UnionSketchAlgebraicFinal(final String lgK) {
+ super(Integer.parseInt(lgK), DEFAULT_UPDATE_SEED);
+ }
+
+ /**
+ * Constructor for the final pass of an Algebraic function. Pig will call
+ * this and pass the same constructor arguments as the base UDF.
+ *
+ * @param lgK string representation of the parameter controlling the sketch size and accuracy
+ * @param seed string representation of the seed for the hash function, parsed as a long
+ */
+ public UnionSketchAlgebraicFinal(final String lgK, final String seed) {
+ super(Integer.parseInt(lgK), Long.parseLong(seed));
+ }
+
+ @Override
+ boolean isInputRaw() {
+ return false; // NOTE(review): presumably the input holds serialized sketches rather than raw values - confirm
+ }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/cpc/UnionSketchAlgebraicIntermediate.java b/src/main/java/com/yahoo/sketches/pig/cpc/UnionSketchAlgebraicIntermediate.java
new file mode 100644
index 0000000..f5ab147
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/cpc/UnionSketchAlgebraicIntermediate.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import static com.yahoo.sketches.Util.DEFAULT_UPDATE_SEED;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+public class UnionSketchAlgebraicIntermediate extends AlgebraicIntermediate { // intermediate pass of the CPC union
+
+ /**
+ * Default constructor of the intermediate pass of an Algebraic function.
+ * Assumes default lgK and seed.
+ */
+ public UnionSketchAlgebraicIntermediate() {
+ super(CpcSketch.DEFAULT_LG_K, DEFAULT_UPDATE_SEED);
+ }
+
+ /**
+ * Constructor for the intermediate pass of an Algebraic function. Pig will call
+ * this and pass the same constructor arguments as the base UDF.
+ * Assumes default seed.
+ *
+ * @param lgK string representation of lgK, parsed with {@link Integer#parseInt(String)}
+ */
+ public UnionSketchAlgebraicIntermediate(final String lgK) {
+ super(Integer.parseInt(lgK), DEFAULT_UPDATE_SEED);
+ }
+
+ /**
+ * Constructor for the intermediate pass of an Algebraic function. Pig will call
+ * this and pass the same constructor arguments as the base UDF.
+ *
+ * @param lgK string representation of the parameter controlling the sketch size and accuracy
+ * @param seed string representation of the seed for the hash function, parsed as a long
+ */
+ public UnionSketchAlgebraicIntermediate(final String lgK, final String seed) {
+ super(Integer.parseInt(lgK), Long.parseLong(seed));
+ }
+
+ @Override
+ boolean isInputRaw() {
+ return false; // NOTE(review): presumably the input holds serialized sketches rather than raw values - confirm
+ }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/cpc/package-info.java b/src/main/java/com/yahoo/sketches/pig/cpc/package-info.java
new file mode 100644
index 0000000..c50fe23
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/cpc/package-info.java
@@ -0,0 +1,16 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+/**
+ * Pig UDFs for CPC sketches.
+ * This is a distinct-counting sketch that implements the
+ * <i>Compressed Probabilistic Counting (CPC, a.k.a. FM85)</i> algorithms developed by Kevin Lang in
+ * his paper
+ * <a href="https://arxiv.org/abs/1708.06839">Back to the Future: an Even More Nearly
+ * Optimal Cardinality Estimation Algorithm</a>.
+ *
+ * @author Alexander Saydakov
+ */
+package com.yahoo.sketches.pig.cpc;
diff --git a/src/main/java/com/yahoo/sketches/pig/kll/DataToSketch.java b/src/main/java/com/yahoo/sketches/pig/kll/DataToSketch.java
new file mode 100644
index 0000000..dedf4ec
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/kll/DataToSketch.java
@@ -0,0 +1,345 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.io.IOException;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+/**
+ * This UDF is to build sketches from data.
+ * This class implements both the <i>Accumulator</i> and <i>Algebraic</i> interfaces for performance optimization.
+ */
+public class DataToSketch extends EvalFunc<DataByteArray> implements Accumulator<DataByteArray>, Algebraic {
+
+ private static final TupleFactory TUPLE_FACTORY_ = TupleFactory.getInstance();
+
+ // With the single exception of the Accumulator interface, UDFs are stateless.
+ // All parameters kept at the class level must be final, except for the accumSketch.
+ private final int k_;
+ private KllFloatsSketch accumSketch_;
+
+ // TOP LEVEL API
+
+ /**
+ * Default constructor. Assumes default k ({@link KllFloatsSketch#DEFAULT_K}).
+ */
+ public DataToSketch() {
+ this(KllFloatsSketch.DEFAULT_K);
+ }
+
+ /**
+ * String constructor.
+ *
+ * @param kStr string representation of k, must be parsable by {@link Integer#parseInt(String)}
+ */
+ public DataToSketch(final String kStr) {
+ this(Integer.parseInt(kStr));
+ }
+
+ /**
+ * Base constructor that the public constructors delegate to.
+ *
+ * @param k parameter that determines the accuracy and size of the sketch.
+ */
+ private DataToSketch(final int k) {
+ super();
+ k_ = k;
+ }
+
+ //@formatter:off
+ /**
+ * Top-level exec function.
+ * This method accepts an input Tuple containing a Bag of one or more inner <b>Datum Tuples</b>
+ * and returns a single updated <b>Sketch</b> as a DataByteArray.
+ *
+ * <p>Types are in the form: Java data type: Pig DataType
+ *
+ * <p><b>Input Tuple</b>
+ * <ul>
+ * <li>Tuple: TUPLE (Must contain only one field)
+ * <ul>
+ * <li>index 0: DataBag: BAG (May contain 0 or more Inner Tuples)
+ * <ul>
+ * <li>index 0: Tuple: TUPLE <b>Datum Tuple</b></li>
+ * <li>...</li>
+ * <li>index n-1: Tuple: TUPLE <b>Datum Tuple</b></li>
+ * </ul>
+ * </li>
+ * </ul>
+ * </li>
+ * </ul>
+ *
+ * <b>Datum Tuple</b>
+ * <ul>
+ * <li>Tuple: TUPLE (Must contain only one field)
+ * <ul>
+ * <li>index 0: Float: FLOAT</li>
+ * </ul>
+ * </li>
+ * </ul>
+ *
+ * <b>Sketch Tuple</b>
+ * <ul>
+ * <li>Tuple: TUPLE (Contains exactly 1 field)
+ * <ul>
+ * <li>index 0: DataByteArray: BYTEARRAY = a serialized KllFloatsSketch object.</li>
+ * </ul>
+ * </li>
+ * </ul>
+ *
+ * @param inputTuple A tuple containing a single bag, containing Datum Tuples
+ * @return serialized sketch
+ * @see "org.apache.pig.EvalFunc.exec(org.apache.pig.data.Tuple)"
+ * @throws IOException from Pig
+ */
+ // @formatter:on
+
+ @Override // TOP LEVEL EXEC
+ public DataByteArray exec(final Tuple inputTuple) throws IOException {
+ final KllFloatsSketch sketch = new KllFloatsSketch(k_); // exec is stateless: a fresh sketch per call
+ if (inputTuple != null && inputTuple.size() > 0 && inputTuple.get(0) != null) { // null bag: empty sketch
+ final DataBag bag = (DataBag) inputTuple.get(0);
+ for (final Tuple innerTuple: bag) {
+ final Float value = (Float) innerTuple.get(0);
+ if (value != null) { sketch.update(value); } // skip null values instead of failing on unboxing
+ }
+ }
+ return new DataByteArray(sketch.toByteArray());
+ }
+
+ // ACCUMULATOR INTERFACE
+
+ /**
+ * An <i>Accumulator</i> version of the standard <i>exec()</i> method. Like <i>exec()</i>,
+ * accumulator is called with a bag of Datum Tuples. Unlike <i>exec()</i>, it doesn't serialize the
+ * sketch at the end. Instead, it can be called multiple times, each time with another bag of Datum Tuples.
+ *
+ * @param inputTuple A tuple containing a single bag, containing Datum Tuples.
+ * @see #exec
+ * @see "org.apache.pig.Accumulator.accumulate(org.apache.pig.data.Tuple)"
+ * @throws IOException by Pig
+ */
+ @Override
+ public void accumulate(final Tuple inputTuple) throws IOException {
+ if (inputTuple == null || inputTuple.size() == 0) { return; }
+ final DataBag bag = (DataBag) inputTuple.get(0);
+ if (bag == null) { return; }
+ // the sketch is created lazily on the first non-null bag and kept until cleanup()
+ if (accumSketch_ == null) { accumSketch_ = new KllFloatsSketch(k_); }
+ for (final Tuple innerTuple: bag) {
+ final Float value = (Float) innerTuple.get(0);
+ if (value != null) { accumSketch_.update(value); } // skip null values instead of failing on unboxing
+ }
+ }
+
+ /**
+ * Returns the result that has been built up by multiple calls to {@link #accumulate}.
+ *
+ * @return serialized sketch
+ * @see "org.apache.pig.Accumulator.getValue()"
+ */
+ @Override
+ public DataByteArray getValue() {
+ // Serialize whatever has been accumulated so far.
+ // If accumulate() never created a sketch, an empty sketch with the configured k is returned instead.
+ final KllFloatsSketch result =
+ (accumSketch_ == null) ? new KllFloatsSketch(k_) : accumSketch_;
+ return new DataByteArray(result.toByteArray());
+ }
+
+ /**
+ * Cleans up the UDF state after being called using the {@link Accumulator} interface.
+ *
+ * @see "org.apache.pig.Accumulator.cleanup()"
+ */
+ @Override
+ public void cleanup() {
+ accumSketch_ = null; // drop the accumulated sketch; accumulate() creates a new one when needed
+ }
+
+ // ALGEBRAIC INTERFACE
+
+ @Override
+ public String getInitial() {
+ return Initial.class.getName(); // name of the nested pass-through Initial class
+ }
+
+ @Override
+ public String getIntermed() {
+ return Intermediate.class.getName(); // name of the nested Intermediate class, whose exec returns a Tuple
+ }
+
+ @Override
+ public String getFinal() {
+ return Final.class.getName(); // name of the nested Final class, whose exec returns the serialized sketch
+ }
+
+ // STATIC Initial Class only called by Pig
+
+ /**
+ * Class used to calculate the initial pass of an Algebraic sketch operation.
+ *
+ * <p>
+ * The Initial class simply passes through all records unchanged so that they can be
+ * processed by the intermediate processor instead.</p>
+ */
+ public static class Initial extends EvalFunc<Tuple> {
+ // The Algebraic worker classes (Initial, Intermediate and Final) are static and stateless.
+ // The constructors and parameters must mirror the parent class as there is no linkage
+ // between them.
+ /**
+ * Default constructor.
+ */
+ public Initial() {}
+
+ /**
+ * Constructor with explicit k as string. The value is not used by this pass-through stage.
+ *
+ * @param kStr string representation of k
+ */
+ public Initial(final String kStr) {}
+
+ @Override
+ public Tuple exec(final Tuple inputTuple) throws IOException {
+ return inputTuple; // pass-through: the input tuple is returned as is
+ }
+ }
+
+ // STATIC Intermediate Class only called by Pig
+
+ /**
+ * Class used to calculate the intermediate pass of an <i>Algebraic</i> sketch operation.
+ * It will receive a bag of values returned by either the <i>Intermediate</i>
+ * stage or the <i>Initial</i> stages, so it needs to be able to differentiate between and
+ * interpret both types.
+ */
+ public static class Intermediate extends EvalFunc<Tuple> {
+ // The Algebraic worker classes (Initial, Intermediate and Final) are static and stateless.
+ // The constructors and parameters must mirror the parent class as there is no linkage
+ // between them.
+ private final int k_;
+
+ /**
+ * Default constructor. Assumes default k.
+ */
+ public Intermediate() {
+ this(KllFloatsSketch.DEFAULT_K);
+ }
+
+ /**
+ * Constructor with explicit k as string. Pig will call
+ * this and pass the same constructor arguments as the base UDF.
+ *
+ * @param kStr string representation of k
+ */
+ public Intermediate(final String kStr) {
+ this(Integer.parseInt(kStr));
+ }
+
+ /**
+ * Constructor with primitive k.
+ *
+ * @param k parameter that determines the accuracy and size of the sketch.
+ */
+ private Intermediate(final int k) {
+ k_ = k;
+ }
+
+ @Override
+ public Tuple exec(final Tuple inputTuple) throws IOException { //throws is in API
+ return TUPLE_FACTORY_.newTuple(process(inputTuple, k_)); // serialized sketch wrapped in a tuple
+ }
+ }
+
+ // STATIC Final Class only called by Pig
+
+ /**
+ * Class used to calculate the final pass of an <i>Algebraic</i> sketch operation.
+ * It will receive a bag of values returned by either the <i>Intermediate</i>
+ * stage or the <i>Initial</i> stages, so it needs to be able to differentiate between and
+ * interpret both types.
+ */
+ public static class Final extends EvalFunc<DataByteArray> {
+ // The Algebraic worker classes (Initial, Intermediate and Final) are static and stateless.
+ // The constructors and parameters must mirror the parent class as there is no linkage
+ // between them.
+ private final int k_;
+
+ /**
+ * Default constructor. Assumes default k.
+ */
+ public Final() {
+ this(KllFloatsSketch.DEFAULT_K);
+ }
+
+ /**
+ * Constructor with explicit k as string. Pig will call
+ * this and pass the same constructor arguments as the base UDF.
+ *
+ * @param kStr string representation of k, parsed with Integer.parseInt
+ */
+ public Final(final String kStr) {
+ this(Integer.parseInt(kStr));
+ }
+
+ /**
+ * Constructor with primitive k.
+ *
+ * @param k parameter that determines the accuracy and size of the sketch.
+ */
+ private Final(final int k) {
+ k_ = k;
+ }
+
+ @Override
+ public DataByteArray exec(final Tuple inputTuple) throws IOException {
+ return process(inputTuple, k_); // same processing as Intermediate, returned without the tuple wrapper
+ }
+ }
+
+ private static DataByteArray process(final Tuple inputTuple, final int k) throws IOException {
+ final KllFloatsSketch sketch = new KllFloatsSketch(k); // result: every input is folded into this sketch
+ if (inputTuple != null && inputTuple.size() > 0 && inputTuple.get(0) != null) {
+ final DataBag outerBag = (DataBag) inputTuple.get(0);
+ for (final Tuple dataTuple: outerBag) {
+ final Object f0 = dataTuple.get(0);
+ if (f0 == null) { continue; }
+ if (f0 instanceof DataBag) {
+ final DataBag innerBag = (DataBag) f0; // inputTuple.bag0.dataTupleN.f0:bag
+ // If field 0 of a dataTuple is a Bag, all innerTuples of this inner bag
+ // will be passed into the sketch as raw values.
+ // It is due to system bagged outputs from multiple mapper Initial functions.
+ // The Intermediate stage was bypassed.
+ for (final Tuple innerTuple: innerBag) { // an empty inner bag simply contributes nothing
+ final Float value = (Float) innerTuple.get(0);
+ if (value != null) { sketch.update(value); } // skip null values instead of failing on unboxing
+ }
+ } else if (f0 instanceof DataByteArray) { // inputTuple.bag0.dataTupleN.f0:DBA
+ // If field 0 of a dataTuple is a DataByteArray we assume it is a sketch
+ // due to system bagged outputs from multiple mapper Intermediate functions.
+ // Each dataTuple.DBA:sketch will be merged into the result sketch.
+ final DataByteArray dba = (DataByteArray) f0;
+ sketch.merge(KllFloatsSketch.heapify(Memory.wrap(dba.get())));
+ } else {
+ throw new IllegalArgumentException("dataTuple.Field0: Is not a DataByteArray: "
+ + f0.getClass().getName());
+ }
+ }
+ }
+ return new DataByteArray(sketch.toByteArray());
+ }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/kll/GetCdf.java b/src/main/java/com/yahoo/sketches/pig/kll/GetCdf.java
new file mode 100644
index 0000000..1cfe22a
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/kll/GetCdf.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+/**
+ * This UDF is to get an approximation to the Cumulative Distribution Function (CDF) of the input stream
+ * given a sketch and a set of split points - an array of <i>m</i> unique, monotonically increasing
+ * float values that divide the real number line into <i>m+1</i> consecutive disjoint intervals.
+ * The function returns an array of <i>m+1</i> double values, the first <i>m</i> of which are approximations
+ * to the ranks of the corresponding split points (fraction of input stream values that are less than
+ * a split point). The last value is always 1. CDF can also be viewed as a cumulative version of PMF.
+ */
+public class GetCdf extends EvalFunc<Tuple> {
+
+ @Override
+ public Tuple exec(final Tuple input) throws IOException {
+ if (input.size() < 2) {
+ throw new IllegalArgumentException("expected two or more inputs: sketch and list of split points");
+ }
+
+ final Object sketchArg = input.get(0); // may be null, which the instanceof check below rejects
+ if (!(sketchArg instanceof DataByteArray)) {
+ throw new IllegalArgumentException("expected a DataByteArray as a sketch, got "
+ + (sketchArg == null ? "null" : sketchArg.getClass().getSimpleName()));
+ }
+ final KllFloatsSketch sketch = KllFloatsSketch.heapify(Memory.wrap(((DataByteArray) sketchArg).get()));
+
+ final float[] splitPoints = new float[input.size() - 1]; // every field after the sketch is a split point
+ for (int i = 1; i < input.size(); i++) {
+ final Object point = input.get(i);
+ if (!(point instanceof Float)) { // also rejects null, reported as "null" rather than an NPE
+ throw new IllegalArgumentException("expected a float value as a split point, got "
+ + (point == null ? "null" : point.getClass().getSimpleName()));
+ }
+ splitPoints[i - 1] = (Float) point;
+ }
+ final double[] cdf = sketch.getCDF(splitPoints);
+ if (cdf == null) { return null; } // the sketch produced no CDF: return a Pig null
+ return GetPmf.doubleArrayToTuple(cdf);
+ }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/kll/GetK.java b/src/main/java/com/yahoo/sketches/pig/kll/GetK.java
new file mode 100644
index 0000000..2507325
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/kll/GetK.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+/**
+ * This UDF is to get the parameter K from a given sketch.
+ * This can be useful for debugging a work flow to make sure that resulting sketches
+ * have the intended K, and, therefore, the intended accuracy
+ */
+public class GetK extends EvalFunc<Integer> {
+
+ @Override
+ public Integer exec(final Tuple input) throws IOException {
+ if (input.size() != 1) {
+ throw new IllegalArgumentException("expected one input");
+ }
+
+ final Object sketchArg = input.get(0); // may be null, which the instanceof check below rejects
+ if (!(sketchArg instanceof DataByteArray)) {
+ throw new IllegalArgumentException("expected a DataByteArray as a sketch, got "
+ + (sketchArg == null ? "null" : sketchArg.getClass().getSimpleName()));
+ }
+ final KllFloatsSketch sketch = KllFloatsSketch.heapify(Memory.wrap(((DataByteArray) sketchArg).get()));
+
+ return sketch.getK();
+ }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/kll/GetPmf.java b/src/main/java/com/yahoo/sketches/pig/kll/GetPmf.java
new file mode 100644
index 0000000..59b9014
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/kll/GetPmf.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+/**
+ * This UDF is to get an approximation to the Probability Mass Function (PMF) of the input stream
+ * given a sketch and a set of split points - an array of <i>m</i> unique, monotonically increasing
+ * float values that divide the real number line into <i>m+1</i> consecutive disjoint intervals.
+ * The function returns an array of m+1 doubles each of which is an approximation to the fraction
+ * of the input stream values that fell into one of those intervals. Intervals are inclusive of
+ * the left split point and exclusive of the right split point.
+ */
+public class GetPmf extends EvalFunc<Tuple> {
+
+ @Override
+ public Tuple exec(final Tuple input) throws IOException {
+ if (input.size() < 2) {
+ throw new IllegalArgumentException("expected two or more inputs: sketch and list of split points");
+ }
+
+ final Object sketchArg = input.get(0); // may be null, which the instanceof check below rejects
+ if (!(sketchArg instanceof DataByteArray)) {
+ throw new IllegalArgumentException("expected a DataByteArray as a sketch, got "
+ + (sketchArg == null ? "null" : sketchArg.getClass().getSimpleName()));
+ }
+ final KllFloatsSketch sketch = KllFloatsSketch.heapify(Memory.wrap(((DataByteArray) sketchArg).get()));
+
+ final float[] splitPoints = new float[input.size() - 1]; // every field after the sketch is a split point
+ for (int i = 1; i < input.size(); i++) {
+ final Object point = input.get(i);
+ if (!(point instanceof Float)) { // also rejects null, reported as "null" rather than an NPE
+ throw new IllegalArgumentException("expected a float value as a split point, got "
+ + (point == null ? "null" : point.getClass().getSimpleName()));
+ }
+ splitPoints[i - 1] = (Float) point;
+ }
+ final double[] pmf = sketch.getPMF(splitPoints);
+ if (pmf == null) { return null; } // the sketch produced no PMF: return a Pig null
+ return doubleArrayToTuple(pmf);
+ }
+
+ static Tuple doubleArrayToTuple(final double[] array) throws ExecException {
+ final Tuple tuple = TupleFactory.getInstance().newTuple(array.length); // one field per array element
+ for (int i = 0; i < array.length; i++) {
+ tuple.set(i, array[i]);
+ }
+ return tuple;
+ }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/kll/GetQuantile.java b/src/main/java/com/yahoo/sketches/pig/kll/GetQuantile.java
new file mode 100644
index 0000000..cb58cbe
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/kll/GetQuantile.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+/**
+ * This UDF is to get a quantile value from a given sketch. A single value for a
+ * given fraction is returned. The fraction represents a normalized rank and must be
+ * from 0 to 1 inclusive. For example, the fraction of 0.5 corresponds to 50th percentile,
+ * which is the median value of the distribution (the number separating the higher half
+ * of the probability distribution from the lower half).
+ */
+public class GetQuantile extends EvalFunc<Float> {
+
+ @Override
+ public Float exec(final Tuple input) throws IOException {
+ if (input.size() != 2) {
+ throw new IllegalArgumentException("expected two inputs: sketch and fraction");
+ }
+
+ final Object sketchArg = input.get(0); // may be null, which the instanceof check below rejects
+ if (!(sketchArg instanceof DataByteArray)) {
+ throw new IllegalArgumentException("expected a DataByteArray as a sketch, got "
+ + (sketchArg == null ? "null" : sketchArg.getClass().getSimpleName()));
+ }
+ final KllFloatsSketch sketch = KllFloatsSketch.heapify(Memory.wrap(((DataByteArray) sketchArg).get()));
+
+ final Object fractionArg = input.get(1);
+ if (!(fractionArg instanceof Double)) { // also rejects null, reported as "null" rather than an NPE
+ throw new IllegalArgumentException("expected a double value as a fraction, got "
+ + (fractionArg == null ? "null" : fractionArg.getClass().getSimpleName()));
+ }
+ return sketch.getQuantile((Double) fractionArg);
+ }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/kll/GetQuantiles.java b/src/main/java/com/yahoo/sketches/pig/kll/GetQuantiles.java
new file mode 100644
index 0000000..59c7d8c
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/kll/GetQuantiles.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+/**
+ * This UDF is to get a list of quantile values from a given sketch and a list of
+ * fractions or a number of evenly spaced intervals. The fractions represent normalized ranks and
+ * must be from 0 to 1 inclusive. For example, the fraction of 0.5 corresponds to 50th percentile,
+ * which is the median value of the distribution (the number separating the higher half
+ * of the probability distribution from the lower half).
+ */
+public class GetQuantiles extends EvalFunc<Tuple> {
+
+ @Override
+ public Tuple exec(final Tuple input) throws IOException {
+ if (input.size() < 2) {
+ throw new IllegalArgumentException("expected two or more inputs: sketch and list of fractions");
+ }
+
+ final Object sketchArg = input.get(0); // may be null, which the instanceof check below rejects
+ if (!(sketchArg instanceof DataByteArray)) {
+ throw new IllegalArgumentException("expected a DataByteArray as a sketch, got "
+ + (sketchArg == null ? "null" : sketchArg.getClass().getSimpleName()));
+ }
+ final KllFloatsSketch sketch = KllFloatsSketch.heapify(Memory.wrap(((DataByteArray) sketchArg).get()));
+
+ if (input.size() == 2) {
+ final Object arg = input.get(1);
+ if (arg instanceof Integer) { // number of evenly spaced intervals
+ return floatArrayToTuple(sketch.getQuantiles((int) arg));
+ } else if (arg instanceof Double) { // just one fraction
+ return TupleFactory.getInstance().newTuple(Arrays.asList(sketch.getQuantile((double) arg)));
+ } else {
+ throw new IllegalArgumentException("expected a double value as a fraction or an integer value"
+ + " as a number of evenly spaced intervals, got " + (arg == null ? "null" : arg.getClass().getSimpleName()));
+ }
+ }
+ // more than one number - must be double fractions
+ final double[] fractions = new double[input.size() - 1];
+ for (int i = 1; i < input.size(); i++) {
+ final Object fractionArg = input.get(i);
+ if (!(fractionArg instanceof Double)) { // also rejects null, reported as "null" rather than an NPE
+ throw new IllegalArgumentException("expected a double value as a fraction, got "
+ + (fractionArg == null ? "null" : fractionArg.getClass().getSimpleName()));
+ }
+ fractions[i - 1] = (Double) fractionArg;
+ }
+ return floatArrayToTuple(sketch.getQuantiles(fractions));
+ }
+
+ private static Tuple floatArrayToTuple(final float[] array) throws ExecException {
+ if (array == null) { return null; } // no quantiles (presumably an empty sketch): Pig null, as GetPmf and GetCdf do
+ final Tuple tuple = TupleFactory.getInstance().newTuple(array.length);
+ for (int i = 0; i < array.length; i++) { tuple.set(i, array[i]); }
+ return tuple;
+ }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/kll/GetRank.java b/src/main/java/com/yahoo/sketches/pig/kll/GetRank.java
new file mode 100644
index 0000000..4b0a50e
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/kll/GetRank.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+/**
+ * This UDF is to get a normalized rank for a given value from a given sketch. A single
+ * rank for a given value is returned. The normalized rank is a double value
+ * from 0 to 1 inclusive. For example, the rank of 0.5 corresponds to 50th percentile,
+ * which is the median value of the distribution (the number separating the higher half
+ * of the probability distribution from the lower half).
+ */
+public class GetRank extends EvalFunc<Double> {
+
+ @Override
+ public Double exec(final Tuple input) throws IOException {
+ if (input.size() != 2) {
+ throw new IllegalArgumentException("expected two inputs: sketch and value");
+ }
+
+ final Object sketchArg = input.get(0); // may be null, which the instanceof check below rejects
+ if (!(sketchArg instanceof DataByteArray)) {
+ throw new IllegalArgumentException("expected a DataByteArray as a sketch, got "
+ + (sketchArg == null ? "null" : sketchArg.getClass().getSimpleName()));
+ }
+ final KllFloatsSketch sketch = KllFloatsSketch.heapify(Memory.wrap(((DataByteArray) sketchArg).get()));
+
+ final Object valueArg = input.get(1);
+ if (!(valueArg instanceof Float)) { // also rejects null, reported as "null" rather than an NPE
+ throw new IllegalArgumentException("expected a float value, got "
+ + (valueArg == null ? "null" : valueArg.getClass().getSimpleName()));
+ }
+ return sketch.getRank((Float) valueArg);
+ }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/kll/SketchToString.java b/src/main/java/com/yahoo/sketches/pig/kll/SketchToString.java
new file mode 100644
index 0000000..6e23a5a
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/kll/SketchToString.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+/**
+ * This UDF is to get a human-readable summary of a given sketch.
+ */
+public class SketchToString extends EvalFunc<String> {
+
+ @Override
+ public String exec(final Tuple input) throws IOException {
+ if (input.size() != 1) {
+ throw new IllegalArgumentException("expected one input");
+ }
+
+ final Object sketchArg = input.get(0); // may be null, which the instanceof check below rejects
+ if (!(sketchArg instanceof DataByteArray)) {
+ throw new IllegalArgumentException("expected a DataByteArray as a sketch, got "
+ + (sketchArg == null ? "null" : sketchArg.getClass().getSimpleName()));
+ }
+ final KllFloatsSketch sketch = KllFloatsSketch.heapify(Memory.wrap(((DataByteArray) sketchArg).get()));
+
+ return sketch.toString();
+ }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/kll/UnionSketch.java b/src/main/java/com/yahoo/sketches/pig/kll/UnionSketch.java
new file mode 100644
index 0000000..1ce5425
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/kll/UnionSketch.java
@@ -0,0 +1,353 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.io.IOException;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+/**
+ * This UDF is to merge sketches.
+ * This class implements both the <i>Accumulator</i> and <i>Algebraic</i> interfaces for performance optimization.
+ */
+public class UnionSketch extends EvalFunc<DataByteArray> implements Accumulator<DataByteArray>, Algebraic {
+
+ private static final TupleFactory TUPLE_FACTORY_ = TupleFactory.getInstance();
+
+ // With the single exception of the Accumulator interface, UDFs are stateless.
+ // All parameters kept at the class level must be final, except for the accumSketch.
+ private final int k_; // k handed to every KllFloatsSketch created by this UDF
+ private KllFloatsSketch accumSketch_; // Accumulator state: null until accumulate() sees a bag
+
+ //TOP LEVEL API
+
+ /**
+ * Default constructor. Uses KllFloatsSketch.DEFAULT_K as k.
+ */
+ public UnionSketch() {
+ this(KllFloatsSketch.DEFAULT_K);
+ }
+
+ /**
+ * Constructor with explicit k as string; delegates to the base constructor.
+ *
+ * @param kStr string representation of k, parsed with Integer.parseInt
+ */
+ public UnionSketch(final String kStr) {
+ this(Integer.parseInt(kStr));
+ }
+
+ /**
+ * Base constructor; both public constructors delegate to it.
+ *
+ * @param k parameter that determines the accuracy and size of the sketch; stored in k_.
+ */
+ private UnionSketch(final int k) {
+ super();
+ k_ = k;
+ }
+
+  //@formatter:off
+  /**
+   * Top-level exec function.
+   * This method accepts an input Tuple containing a Bag of zero or more inner <b>Sketch Tuples</b>
+   * and returns a single updated <b>Sketch</b> as a DataByteArray.
+   *
+   * <p>Types are in the form: Java data type: Pig DataType
+   *
+   * <p><b>Input Tuple</b>
+   * <ul>
+   *   <li>Tuple: TUPLE (Must contain only one field)
+   *     <ul>
+   *       <li>index 0: DataBag: BAG (May contain 0 or more Inner Tuples; null is treated as empty)
+   *         <ul>
+   *           <li>index 0: Tuple: TUPLE <b>Sketch Tuple</b></li>
+   *           <li>...</li>
+   *           <li>index n-1: Tuple: TUPLE <b>Sketch Tuple</b></li>
+   *         </ul>
+   *       </li>
+   *     </ul>
+   *   </li>
+   * </ul>
+   *
+   * <b>Sketch Tuple</b>
+   * <ul>
+   *   <li>Tuple: TUPLE (Contains exactly 1 field)
+   *     <ul>
+   *       <li>index 0: DataByteArray: BYTEARRAY = The serialization of a Sketch object.</li>
+   *     </ul>
+   *   </li>
+   * </ul>
+   *
+   * @param inputTuple A tuple containing a single bag, containing Sketch Tuples
+   * @return serialized sketch; an empty one if the tuple is null or empty, or the bag is null
+   * @see "org.apache.pig.EvalFunc.exec(org.apache.pig.data.Tuple)"
+   */
+  //@formatter:on
+
+  @Override // TOP LEVEL EXEC
+  public DataByteArray exec(final Tuple inputTuple) throws IOException {
+    //The exec is a stateless function. It operates on the input and returns a result.
+    final KllFloatsSketch sketch = new KllFloatsSketch(k_);
+    if (inputTuple != null && inputTuple.size() > 0) {
+      final DataBag bag = (DataBag) inputTuple.get(0);
+      if (bag != null) { updateUnion(bag, sketch); } // a null bag is ignored, as in accumulate()
+    }
+    return new DataByteArray(sketch.toByteArray());
+  }
+
+ //ACCUMULATOR INTERFACE
+
+ /**
+ * An <i>Accumulator</i> version of the standard <i>exec()</i> method. Like <i>exec()</i>,
+ * accumulate is called with a bag of Sketch Tuples. Unlike <i>exec()</i>, it doesn't serialize the
+ * sketch at the end. Instead, it can be called multiple times, each time with another bag of
+ * Sketch Tuples to be input to the Union.
+ *
+ * @param inputTuple A tuple containing a single bag, containing Sketch Tuples (null is ignored).
+ * @see #exec
+ * @see "org.apache.pig.Accumulator.accumulate(org.apache.pig.data.Tuple)"
+ * @throws IOException by Pig
+ */
+ @Override
+ public void accumulate(final Tuple inputTuple) throws IOException {
+ if (inputTuple == null || inputTuple.size() == 0) { return; } // nothing to accumulate
+ final DataBag bag = (DataBag) inputTuple.get(0);
+ if (bag == null) { return; } // a null bag leaves the accumulated state untouched
+ if (accumSketch_ == null) { // created lazily on the first non-null bag
+ accumSketch_ = new KllFloatsSketch(k_);
+ }
+ updateUnion(bag, accumSketch_);
+ }
+
+ /**
+ * Returns the result of the Union that has been built up by multiple calls to {@link #accumulate}.
+ *
+ * @return serialized sketch; a serialized empty sketch with k_ if nothing has been accumulated
+ * @see "org.apache.pig.Accumulator.getValue()"
+ */
+ @Override
+ public DataByteArray getValue() {
+ if (accumSketch_ != null) {
+ return new DataByteArray(accumSketch_.toByteArray());
+ }
+ // no bag accumulated yet, or cleanup() was called: return a new empty sketch
+ return new DataByteArray(new KllFloatsSketch(k_).toByteArray());
+ }
+
+ /**
+ * Cleans up the UDF state after being called using the {@link Accumulator} interface.
+ *
+ * @see "org.apache.pig.Accumulator.cleanup()"
+ */
+ @Override
+ public void cleanup() {
+ accumSketch_ = null; // getValue() returns an empty sketch until accumulate() creates a new one
+ }
+
+ //ALGEBRAIC INTERFACE
+
+ @Override
+ public String getInitial() {
+ return Initial.class.getName(); // class name of the nested Initial worker
+ }
+
+ @Override
+ public String getIntermed() {
+ return Intermediate.class.getName(); // class name of the nested Intermediate worker
+ }
+
+ @Override
+ public String getFinal() {
+ return Final.class.getName(); // class name of the nested Final worker
+ }
+
+ //TOP LEVEL PRIVATE STATIC METHODS
+
+ /**
+ * Merges the serialized sketch found in field 0 of every tuple of the bag into the given union.
+ *
+ * @param bag A bag of sketchTuples; null or zero-length fields are skipped.
+ * @param union The union to update; any other field type raises IllegalArgumentException.
+ */
+ private static void updateUnion(final DataBag bag, final KllFloatsSketch union) throws ExecException {
+ for (Tuple innerTuple: bag) {
+ final Object f0 = innerTuple.get(0);
+ if (f0 == null) { continue; } // null field: nothing to merge
+ if (f0 instanceof DataByteArray) {
+ final DataByteArray dba = (DataByteArray) f0;
+ if (dba.size() > 0) { // zero-length byte arrays are skipped rather than heapified
+ union.merge(KllFloatsSketch.heapify(Memory.wrap(dba.get())));
+ }
+ } else {
+ throw new IllegalArgumentException("Field type was not DataType.BYTEARRAY: " + innerTuple.getType(0));
+ }
+ }
+ }
+
+ //STATIC Initial Class only called by Pig
+
+ /**
+ * Class used to calculate the initial pass of an Algebraic sketch operation.
+ *
+ * <p>
+ * The Initial class simply passes through all records unchanged so that they can be
+ * processed by the Intermediate or Final stage instead.</p>
+ */
+ public static class Initial extends EvalFunc<Tuple> {
+ // The Algebraic worker classes (Initial, Intermediate and Final) are static and stateless.
+ // The constructors and parameters must mirror the parent class as there is no linkage
+ // between them.
+ /**
+ * Default constructor.
+ */
+ public Initial() {}
+
+ /**
+ * Constructor for the initial pass of an Algebraic function. Pig will call this and pass the
+ * same constructor arguments as the base UDF. In this case the arguments are ignored.
+ *
+ * @param kStr string representation of k (unused)
+ */
+ public Initial(final String kStr) {}
+
+ @Override
+ public Tuple exec(final Tuple inputTuple) throws IOException {
+ return inputTuple; // pass-through: the same tuple instance is returned
+ }
+ }
+
+ // STATIC Intermediate Class only called by Pig
+
+ /**
+ * Class used to calculate the intermediate pass of an <i>Algebraic</i> union operation.
+ * It will receive a bag of values returned by either the <i>Intermediate</i>
+ * stage or the <i>Initial</i> stages, so it needs to be able to differentiate between and
+ * interpret both types. The merged sketch is returned serialized, wrapped in a Tuple.
+ */
+ public static class Intermediate extends EvalFunc<Tuple> {
+ // The Algebraic worker classes (Initial, Intermediate and Final) are static and stateless.
+ // The constructors and final parameters must mirror the parent class as there is no linkage
+ // between them.
+ private final int k_;
+
+ /**
+ * Default constructor. Uses KllFloatsSketch.DEFAULT_K as k.
+ */
+ public Intermediate() {
+ this(KllFloatsSketch.DEFAULT_K);
+ }
+
+ /**
+ * Constructor with explicit k as string.
+ *
+ * @param kStr string representation of k, parsed with Integer.parseInt
+ */
+ public Intermediate(final String kStr) {
+ this(Integer.parseInt(kStr));
+ }
+
+ /**
+ * Constructor with primitive k.
+ *
+ * @param k parameter that determines the accuracy and size of the sketch.
+ */
+ private Intermediate(final int k) {
+ k_ = k;
+ }
+
+ @Override
+ public Tuple exec(final Tuple inputTuple) throws IOException {
+ return TUPLE_FACTORY_.newTuple(process(inputTuple, k_)); // serialized union as the tuple's field
+ }
+ }
+
+ // STATIC Final Class only called by Pig
+
+ /**
+ * Class used to calculate the final pass of an <i>Algebraic</i> union operation.
+ * It will receive a bag of values returned by either the <i>Intermediate</i>
+ * stage or the <i>Initial</i> stages, so it needs to be able to differentiate between and
+ * interpret both types. The merged sketch is returned serialized as a DataByteArray.
+ */
+ public static class Final extends EvalFunc<DataByteArray> {
+ // The Algebraic worker classes (Initial, Intermediate and Final) are static and stateless.
+ // The constructors and final parameters must mirror the parent class as there is no linkage
+ // between them.
+ private final int k_;
+
+ /**
+ * Default constructor. Uses KllFloatsSketch.DEFAULT_K as k.
+ */
+ public Final() {
+ this(KllFloatsSketch.DEFAULT_K);
+ }
+
+ /**
+ * Constructor with explicit k as string.
+ *
+ * @param kStr string representation of k, parsed with Integer.parseInt
+ */
+ public Final(final String kStr) {
+ this(Integer.parseInt(kStr));
+ }
+
+ /**
+ * Constructor with primitive k.
+ *
+ * @param k parameter that determines the accuracy and size of the sketch.
+ */
+ private Final(final int k) {
+ k_ = k;
+ }
+
+ @Override
+ public DataByteArray exec(final Tuple inputTuple) throws IOException {
+ return process(inputTuple, k_); // same merge as Intermediate, without the Tuple wrapper
+ }
+ }
+
+  /**
+   * Merges field 0 of each tuple of the outer bag (a bag of Sketch Tuples or a serialized sketch)
+   * into a new sketch of the given k; a null or empty tuple or a null bag yields an empty sketch.
+   */
+  private static DataByteArray process(final Tuple inputTuple, final int k) throws IOException {
+    final KllFloatsSketch union = new KllFloatsSketch(k);
+    if (inputTuple != null && inputTuple.size() > 0 && inputTuple.get(0) != null) {
+      final DataBag outerBag = (DataBag) inputTuple.get(0);
+      for (final Tuple dataTuple: outerBag) {
+        final Object f0 = dataTuple.get(0);
+        if (f0 == null) { continue; }
+        if (f0 instanceof DataBag) {
+          final DataBag innerBag = (DataBag) f0; //inputTuple.bag0.dataTupleN.f0:bag
+          if (innerBag.size() == 0) { continue; }
+          // Field 0 is again a bag: system bagged outputs from multiple mapper Initial functions
+          // (the Intermediate stage was bypassed). All its tuples are passed into the union.
+          updateUnion(innerBag, union);
+        } else if (f0 instanceof DataByteArray) { //inputTuple.bag0.dataTupleN.f0:DBA
+          // Field 0 is a sketch from a prior Intermediate call: it will be merged into the union.
+          final DataByteArray dba = (DataByteArray) f0;
+          union.merge(KllFloatsSketch.heapify(Memory.wrap(dba.get())));
+        } else {
+          throw new IllegalArgumentException("dataTuple.Field0: Is not a DataByteArray: "
+              + f0.getClass().getName());
+        }
+      }
+    }
+    return new DataByteArray(union.toByteArray());
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/kll/package-info.java b/src/main/java/com/yahoo/sketches/pig/kll/package-info.java
new file mode 100644
index 0000000..9cae236
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/kll/package-info.java
@@ -0,0 +1,12 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+/**
+ * Pig UDFs for KLL quantiles sketches.
+ * See https://datasketches.github.io/docs/Quantiles/KLLSketch.html
+ *
+ * @author Alexander Saydakov
+ */
+package com.yahoo.sketches.pig.kll;
diff --git a/src/test/java/com/yahoo/sketches/pig/cpc/DataToSketchTest.java b/src/test/java/com/yahoo/sketches/pig/cpc/DataToSketchTest.java
new file mode 100644
index 0000000..4ee2ac0
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/cpc/DataToSketchTest.java
@@ -0,0 +1,326 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import static com.yahoo.sketches.Util.DEFAULT_UPDATE_SEED;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+public class DataToSketchTest { // covers the exec, Accumulator and Algebraic paths of the CPC DataToSketch
+
+ private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+ private static final BagFactory BAG_FACTORY = BagFactory.getInstance();
+
+ @Test
+ public void execNullInputTuple() throws Exception {
+ final EvalFunc<DataByteArray> func = new DataToSketch();
+ final DataByteArray result = func.exec(null);
+ final CpcSketch sketch = getSketch(result);
+ Assert.assertTrue(sketch.isEmpty());
+ }
+
+ @Test
+ public void execEmptyInputTupleCustomLgK() throws Exception {
+ final EvalFunc<DataByteArray> func = new DataToSketch("10");
+ final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple());
+ final CpcSketch sketch = getSketch(result);
+ Assert.assertTrue(sketch.isEmpty());
+ Assert.assertEquals(sketch.getLgK(), 10); // the "10" constructor argument is expected to be lgK
+ }
+
+ @Test
+ public void execEmptyBag() throws Exception {
+ final EvalFunc<DataByteArray> func = new DataToSketch();
+ final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(BAG_FACTORY.newDefaultBag()));
+ final CpcSketch sketch = getSketch(result);
+ Assert.assertTrue(sketch.isEmpty());
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void execUnsupportedType() throws Exception {
+ final EvalFunc<DataByteArray> func = new DataToSketch();
+ final DataBag bag = BAG_FACTORY.newDefaultBag();
+ bag.add(TUPLE_FACTORY.newTuple(new Object())); // a plain Object is the unsupported field type
+ func.exec(TUPLE_FACTORY.newTuple(bag));
+ }
+
+ @Test
+ public void execVariousTypesOfInput() throws Exception {
+ final EvalFunc<DataByteArray> func = new DataToSketch();
+ final DataBag bag = BAG_FACTORY.newDefaultBag();
+ final Tuple tupleWithNull = TUPLE_FACTORY.newTuple(1);
+ tupleWithNull.set(0, null);
+ bag.add(tupleWithNull);
+ bag.add(TUPLE_FACTORY.newTuple(new Byte((byte) 1)));
+ bag.add(TUPLE_FACTORY.newTuple(new Integer(2)));
+ bag.add(TUPLE_FACTORY.newTuple(new Long(3)));
+ bag.add(TUPLE_FACTORY.newTuple(new Float(1)));
+ bag.add(TUPLE_FACTORY.newTuple(new Double(2)));
+ bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(new byte[] {(byte) 1})));
+ bag.add(TUPLE_FACTORY.newTuple("a"));
+ final CpcSketch sketch = getSketch(func.exec(TUPLE_FACTORY.newTuple(bag)));
+ Assert.assertFalse(sketch.isEmpty());
+ Assert.assertEquals(sketch.getEstimate(), 7.0, 0.01); // 7 non-null fields were added above
+ }
+
+ @Test
+ public void accumulator() throws Exception {
+ final Accumulator<DataByteArray> func = new DataToSketch();
+
+ // no input yet
+ DataByteArray result = func.getValue();
+ CpcSketch sketch = getSketch(result);
+ Assert.assertTrue(sketch.isEmpty());
+
+ // null input tuple
+ func.accumulate(null);
+ result = func.getValue();
+ sketch = getSketch(result);
+ Assert.assertTrue(sketch.isEmpty());
+
+ // empty input tuple
+ func.accumulate(TUPLE_FACTORY.newTuple());
+ result = func.getValue();
+ sketch = getSketch(result);
+ Assert.assertTrue(sketch.isEmpty());
+
+ // empty bag
+ func.accumulate(TUPLE_FACTORY.newTuple(BAG_FACTORY.newDefaultBag()));
+ result = func.getValue();
+ sketch = getSketch(result);
+ Assert.assertTrue(sketch.isEmpty());
+
+ // normal case
+ final DataBag bag = BAG_FACTORY.newDefaultBag();
+ bag.add(TUPLE_FACTORY.newTuple("a"));
+ bag.add(TUPLE_FACTORY.newTuple("b"));
+ func.accumulate(TUPLE_FACTORY.newTuple(bag));
+ result = func.getValue();
+ sketch = getSketch(result);
+ Assert.assertFalse(sketch.isEmpty());
+ Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+
+ // cleanup: the accumulated state must be gone afterwards
+ func.cleanup();
+ result = func.getValue();
+ sketch = getSketch(result);
+ Assert.assertTrue(sketch.isEmpty());
+ }
+
+ @Test
+ public void algebraicInitial() throws Exception {
+ @SuppressWarnings("unchecked")
+ final EvalFunc<Tuple> func =
+ (EvalFunc<Tuple>) Class.forName(new DataToSketch().getInitial()).newInstance();
+ final Tuple input = TUPLE_FACTORY.newTuple();
+ final Tuple result = func.exec(input);
+ Assert.assertEquals(result, input);
+ }
+
+ @Test
+ public void algebraicInitialWithLgK() throws Exception {
+ @SuppressWarnings("unchecked")
+ final EvalFunc<Tuple> func =
+ (EvalFunc<Tuple>) Class.forName(new DataToSketch().getInitial())
+ .getConstructor(String.class).newInstance("10");
+ final Tuple input = TUPLE_FACTORY.newTuple();
+ final Tuple result = func.exec(input);
+ Assert.assertEquals(result, input);
+ }
+
+ @Test
+ public void algebraicInitialWithLgKAndSeed() throws Exception {
+ @SuppressWarnings("unchecked")
+ final EvalFunc<Tuple> func =
+ (EvalFunc<Tuple>) Class.forName(new DataToSketch().getInitial())
+ .getConstructor(String.class, String.class).newInstance("10", "123");
+ final Tuple input = TUPLE_FACTORY.newTuple();
+ final Tuple result = func.exec(input);
+ Assert.assertEquals(result, input);
+ }
+
+ @Test
+ public void algebraicIntermediateNullInputTuple() throws Exception {
+ @SuppressWarnings("unchecked")
+ final EvalFunc<Tuple> func =
+ (EvalFunc<Tuple>) Class.forName(new DataToSketch().getIntermed()).newInstance();
+ final Tuple result = func.exec(null);
+ final CpcSketch sketch = getSketch((DataByteArray) result.get(0));
+ Assert.assertTrue(sketch.isEmpty());
+ }
+
+ @Test
+ public void algebraicIntermediateEmptyInputTuple() throws Exception {
+ @SuppressWarnings("unchecked")
+ final EvalFunc<Tuple> func =
+ (EvalFunc<Tuple>) Class.forName(new DataToSketch().getIntermed()).newInstance();
+ final Tuple result = func.exec(TUPLE_FACTORY.newTuple());
+ final CpcSketch sketch = getSketch((DataByteArray) result.get(0));
+ Assert.assertTrue(sketch.isEmpty());
+ }
+
+ @Test
+ public void algebraicIntermediateEmptyBag() throws Exception {
+ @SuppressWarnings("unchecked")
+ final EvalFunc<Tuple> func =
+ (EvalFunc<Tuple>) Class.forName(new DataToSketch().getIntermed()).newInstance();
+ Tuple result = func.exec(TUPLE_FACTORY.newTuple(BAG_FACTORY.newDefaultBag()));
+ CpcSketch sketch = getSketch((DataByteArray) result.get(0));
+ Assert.assertTrue(sketch.isEmpty());
+ }
+
+ @Test
+ public void algebraicIntermediateFromInitial() throws Exception {
+ @SuppressWarnings("unchecked")
+ final EvalFunc<Tuple> func =
+ (EvalFunc<Tuple>) Class.forName(new DataToSketch().getIntermed()).newInstance();
+ final DataBag outerBag = BAG_FACTORY.newDefaultBag();
+ final DataBag innerBag = BAG_FACTORY.newDefaultBag();
+ innerBag.add(TUPLE_FACTORY.newTuple("a"));
+ innerBag.add(TUPLE_FACTORY.newTuple("b"));
+ innerBag.add(TUPLE_FACTORY.newTuple("c"));
+ outerBag.add(TUPLE_FACTORY.newTuple(innerBag)); // bag nested in a bag, as passed on by Initial
+ final Tuple result = func.exec(TUPLE_FACTORY.newTuple(outerBag));
+ final CpcSketch sketch = getSketch((DataByteArray) result.get(0));
+ Assert.assertFalse(sketch.isEmpty());
+ Assert.assertEquals(sketch.getEstimate(), 3.0, 0.01);
+ }
+
+ @Test
+ public void algebraicIntermediateFromIntermediate() throws Exception {
+ @SuppressWarnings("unchecked")
+ final EvalFunc<Tuple> func =
+ (EvalFunc<Tuple>) Class.forName(new DataToSketch().getIntermed()).newInstance();
+ final CpcSketch inputSketch = new CpcSketch();
+ inputSketch.update("a");
+ inputSketch.update("b");
+ final DataBag bag = BAG_FACTORY.newDefaultBag();
+ bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+ final Tuple result = func.exec(TUPLE_FACTORY.newTuple(bag));
+ final CpcSketch sketch = getSketch((DataByteArray) result.get(0));
+ Assert.assertFalse(sketch.isEmpty());
+ Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+ }
+
+ @Test
+ public void algebraicIntermediateFromIntermediateCustomLgKAndSeed() throws Exception {
+ @SuppressWarnings("unchecked")
+ final EvalFunc<Tuple> func =
+ (EvalFunc<Tuple>) Class.forName(new DataToSketch().getIntermed())
+ .getConstructor(String.class, String.class).newInstance("10", "123");
+ final CpcSketch inputSketch = new CpcSketch(10, 123);
+ inputSketch.update("a");
+ final DataBag bag = BAG_FACTORY.newDefaultBag();
+ bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+ final Tuple result = func.exec(TUPLE_FACTORY.newTuple(bag));
+ final CpcSketch sketch = getSketch((DataByteArray) result.get(0), 123); // heapify with the same seed
+ Assert.assertFalse(sketch.isEmpty());
+ Assert.assertEquals(sketch.getEstimate(), 1.0, 0.01);
+ Assert.assertEquals(sketch.getLgK(), 10);
+ }
+
+ @Test
+ public void algebraicFinalNullInputTuple() throws Exception {
+ @SuppressWarnings("unchecked")
+ final EvalFunc<DataByteArray> func =
+ (EvalFunc<DataByteArray>) Class.forName(new DataToSketch().getFinal()).newInstance();
+ final DataByteArray result = func.exec(null);
+ final CpcSketch sketch = getSketch(result);
+ Assert.assertTrue(sketch.isEmpty());
+ }
+
+ @Test
+ public void algebraicFinalEmptyInputTuple() throws Exception {
+ @SuppressWarnings("unchecked")
+ final EvalFunc<DataByteArray> func =
+ (EvalFunc<DataByteArray>) Class.forName(new DataToSketch().getFinal()).newInstance();
+ final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple());
+ final CpcSketch sketch = getSketch(result);
+ Assert.assertTrue(sketch.isEmpty());
+ }
+
+ @Test
+ public void algebraicFinalEmptyBag() throws Exception {
+ @SuppressWarnings("unchecked")
+ final EvalFunc<DataByteArray> func =
+ (EvalFunc<DataByteArray>) Class.forName(new DataToSketch().getFinal()).newInstance();
+ final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(BAG_FACTORY.newDefaultBag()));
+ final CpcSketch sketch = getSketch(result);
+ Assert.assertTrue(sketch.isEmpty());
+ }
+
+ @Test
+ public void algebraicFinalFromInitial() throws Exception {
+ @SuppressWarnings("unchecked")
+ final EvalFunc<DataByteArray> func =
+ (EvalFunc<DataByteArray>) Class.forName(new DataToSketch().getFinal()).newInstance();
+ final DataBag outerBag = BAG_FACTORY.newDefaultBag();
+ final DataBag innerBag = BAG_FACTORY.newDefaultBag();
+ innerBag.add(TUPLE_FACTORY.newTuple("a"));
+ innerBag.add(TUPLE_FACTORY.newTuple("b"));
+ innerBag.add(TUPLE_FACTORY.newTuple("c"));
+ outerBag.add(TUPLE_FACTORY.newTuple(innerBag)); // bag nested in a bag, as passed on by Initial
+ final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(outerBag));
+ final CpcSketch sketch = getSketch(result);
+ Assert.assertFalse(sketch.isEmpty());
+ Assert.assertEquals(sketch.getEstimate(), 3.0, 0.01);
+ }
+
+ @Test
+ public void algebraicFinalFromIntermediate() throws Exception {
+ @SuppressWarnings("unchecked")
+ final EvalFunc<DataByteArray> func =
+ (EvalFunc<DataByteArray>) Class.forName(new DataToSketch().getFinal()).newInstance();
+ final CpcSketch inputSketch = new CpcSketch();
+ inputSketch.update("a");
+ inputSketch.update("b");
+ DataBag bag = BAG_FACTORY.newDefaultBag();
+ bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+ DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(bag));
+ final CpcSketch sketch = getSketch(result);
+ Assert.assertFalse(sketch.isEmpty());
+ Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+ }
+
+ @Test
+ public void algebraicFinalFromIntermediateCustomLgKAndSeed() throws Exception {
+ @SuppressWarnings("unchecked")
+ final EvalFunc<DataByteArray> func =
+ (EvalFunc<DataByteArray>) Class.forName(new DataToSketch().getFinal())
+ .getConstructor(String.class, String.class).newInstance("10", "123");
+ final CpcSketch inputSketch = new CpcSketch(10, 123);
+ inputSketch.update("a");
+ inputSketch.update("b");
+ DataBag bag = BAG_FACTORY.newDefaultBag();
+ bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+ DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(bag));
+ final CpcSketch sketch = getSketch(result, 123); // heapify with the same seed
+ Assert.assertFalse(sketch.isEmpty());
+ Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+ Assert.assertEquals(sketch.getLgK(), 10);
+ }
+
+ static CpcSketch getSketch(final DataByteArray dba) throws Exception { // also used by UnionSketchTest
+ return getSketch(dba, DEFAULT_UPDATE_SEED);
+ }
+
+ static CpcSketch getSketch(final DataByteArray dba, final long seed) throws Exception {
+ Assert.assertNotNull(dba);
+ Assert.assertTrue(dba.size() > 0); // expects a non-zero-length serialization even for an empty sketch
+ return CpcSketch.heapify(dba.get(), seed);
+ }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/cpc/GetEstimateAndErrorBoundsTest.java b/src/test/java/com/yahoo/sketches/pig/cpc/GetEstimateAndErrorBoundsTest.java
new file mode 100644
index 0000000..de92ba3
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/cpc/GetEstimateAndErrorBoundsTest.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+public class GetEstimateAndErrorBoundsTest { // result tuple is checked as (estimate, lower, upper)
+
+ private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+
+ @Test
+ public void nullInputTuple() throws Exception {
+ final EvalFunc<Tuple> func = new GetEstimateAndErrorBounds();
+ final Tuple result = func.exec(null);
+ Assert.assertNull(result);
+ }
+
+ @Test
+ public void emptyInputTuple() throws Exception {
+ final EvalFunc<Tuple> func = new GetEstimateAndErrorBounds();
+ final Tuple result = func.exec(TUPLE_FACTORY.newTuple());
+ Assert.assertNull(result);
+ }
+
+ @Test
+ public void normalCase() throws Exception {
+ final EvalFunc<Tuple> func = new GetEstimateAndErrorBounds();
+ final CpcSketch sketch = new CpcSketch();
+ sketch.update(1);
+ sketch.update(2);
+ Tuple result = func.exec(TUPLE_FACTORY.newTuple(new DataByteArray(sketch.toByteArray())));
+ Assert.assertNotNull(result);
+ Assert.assertEquals((Double) result.get(0), 2.0, 0.01); // field 0: estimate
+ Assert.assertTrue((Double) result.get(1) <= 2.0); // field 1: must not exceed the true count
+ Assert.assertTrue((Double) result.get(2) >= 2.0); // field 2: must not be below the true count
+ }
+
+ @Test
+ public void normalCaseWithKappa() throws Exception {
+ final EvalFunc<Tuple> func = new GetEstimateAndErrorBounds("1"); // presumably kappa, per the test name
+ final CpcSketch sketch = new CpcSketch();
+ sketch.update(1);
+ sketch.update(2);
+ Tuple result = func.exec(TUPLE_FACTORY.newTuple(new DataByteArray(sketch.toByteArray())));
+ Assert.assertNotNull(result);
+ Assert.assertEquals((Double) result.get(0), 2.0, 0.01);
+ Assert.assertTrue((Double) result.get(1) <= 2.0);
+ Assert.assertTrue((Double) result.get(2) >= 2.0);
+ }
+
+ @Test
+ public void normalCaseWithKappaAndSeed() throws Exception {
+ final EvalFunc<Tuple> func = new GetEstimateAndErrorBounds("3", "123");
+ final CpcSketch sketch = new CpcSketch(12, 123); // second argument matches the UDF's "123"
+ sketch.update(1);
+ sketch.update(2);
+ Tuple result = func.exec(TUPLE_FACTORY.newTuple(new DataByteArray(sketch.toByteArray())));
+ Assert.assertNotNull(result);
+ Assert.assertEquals((Double) result.get(0), 2.0, 0.01);
+ Assert.assertTrue((Double) result.get(1) <= 2.0);
+ Assert.assertTrue((Double) result.get(2) >= 2.0);
+ }
+
+ @Test
+ public void schema() throws Exception {
+ EvalFunc<Tuple> func = new GetEstimateAndErrorBounds();
+ Schema inputSchema = new Schema(new Schema.FieldSchema("Sketch", DataType.BYTEARRAY));
+ Schema outputSchema = func.outputSchema(inputSchema);
+ Assert.assertNotNull(outputSchema);
+ Assert.assertEquals(outputSchema.size(), 1); // a single field of type tuple ...
+ Assert.assertEquals(DataType.findTypeName(outputSchema.getField(0).type), "tuple");
+ Schema innerSchema = outputSchema.getField(0).schema;
+ Assert.assertEquals(innerSchema.size(), 3); // ... holding three doubles
+ Assert.assertEquals(DataType.findTypeName(innerSchema.getField(0).type), "double");
+ Assert.assertEquals(DataType.findTypeName(innerSchema.getField(1).type), "double");
+ Assert.assertEquals(DataType.findTypeName(innerSchema.getField(2).type), "double");
+ }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/cpc/GetEstimateTest.java b/src/test/java/com/yahoo/sketches/pig/cpc/GetEstimateTest.java
new file mode 100644
index 0000000..7ed7bb1
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/cpc/GetEstimateTest.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.TupleFactory;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+import junit.framework.Assert;
+
+public class GetEstimateTest { // NOTE(review): Assert is junit.framework.Assert here; sibling tests use org.testng.Assert
+
+ private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+
+ @Test
+ public void nullInputTuple() throws Exception {
+ final EvalFunc<Double> func = new GetEstimate();
+ final Double result = func.exec(null);
+ Assert.assertNull(result);
+ }
+
+ @Test
+ public void emptyInputTuple() throws Exception {
+ final EvalFunc<Double> func = new GetEstimate();
+ final Double result = func.exec(TUPLE_FACTORY.newTuple());
+ Assert.assertNull(result);
+ }
+
+ @Test
+ public void normalCase() throws Exception {
+ final EvalFunc<Double> func = new GetEstimate();
+ final CpcSketch sketch = new CpcSketch();
+ sketch.update(1);
+ sketch.update(2);
+ Double result = func.exec(TUPLE_FACTORY.newTuple(new DataByteArray(sketch.toByteArray())));
+ Assert.assertNotNull(result);
+ Assert.assertEquals(result, 2.0, 0.01); // NOTE(review): JUnit order is (expected, actual, delta) - confirm
+ }
+
+ @Test
+ public void normalCaseCustomSeed() throws Exception {
+ final EvalFunc<Double> func = new GetEstimate("123");
+ final CpcSketch sketch = new CpcSketch(12, 123); // second argument matches the UDF's "123"
+ sketch.update(1);
+ sketch.update(2);
+ Double result = func.exec(TUPLE_FACTORY.newTuple(new DataByteArray(sketch.toByteArray())));
+ Assert.assertNotNull(result);
+ Assert.assertEquals(result, 2.0, 0.01); // NOTE(review): JUnit order is (expected, actual, delta) - confirm
+ }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/cpc/SketchToStringTest.java b/src/test/java/com/yahoo/sketches/pig/cpc/SketchToStringTest.java
new file mode 100644
index 0000000..1783219
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/cpc/SketchToStringTest.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.TupleFactory;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+import junit.framework.Assert;
+
+public class SketchToStringTest { // NOTE(review): Assert is junit.framework.Assert here; sibling tests use org.testng.Assert
+
+ private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+
+ @Test
+ public void nullInputTuple() throws Exception {
+ final EvalFunc<String> func = new SketchToString();
+ final String result = func.exec(null);
+ Assert.assertNull(result);
+ }
+
+ @Test
+ public void emptyInputTuple() throws Exception {
+ final EvalFunc<String> func = new SketchToString();
+ final String result = func.exec(TUPLE_FACTORY.newTuple());
+ Assert.assertNull(result);
+ }
+
+ @Test
+ public void normalCase() throws Exception {
+ final EvalFunc<String> func = new SketchToString();
+ final CpcSketch sketch = new CpcSketch();
+ final String result = func.exec(TUPLE_FACTORY.newTuple(new DataByteArray(sketch.toByteArray())));
+ Assert.assertNotNull(result);
+ Assert.assertTrue(result.length() > 0); // only non-emptiness is checked, not the content
+ }
+
+ @Test
+ public void normalCaseWithDetail() throws Exception {
+ final EvalFunc<String> func = new SketchToString("true"); // presumably the detail flag, per the test name
+ final CpcSketch sketch = new CpcSketch();
+ final String result = func.exec(TUPLE_FACTORY.newTuple(new DataByteArray(sketch.toByteArray())));
+ Assert.assertNotNull(result);
+ Assert.assertTrue(result.length() > 0);
+ }
+
+ @Test
+ public void normalCaseWithDetailAndSeed() throws Exception {
+ final EvalFunc<String> func = new SketchToString("true", "123");
+ final CpcSketch sketch = new CpcSketch(12, 123); // second argument matches the UDF's "123"
+ final String result = func.exec(TUPLE_FACTORY.newTuple(new DataByteArray(sketch.toByteArray())));
+ Assert.assertNotNull(result);
+ Assert.assertTrue(result.length() > 0);
+ }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/cpc/UnionSketchTest.java b/src/test/java/com/yahoo/sketches/pig/cpc/UnionSketchTest.java
new file mode 100644
index 0000000..28cb96a
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/cpc/UnionSketchTest.java
@@ -0,0 +1,283 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.cpc;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.cpc.CpcSketch;
+
+/**
+ * Tests for the CPC {@code UnionSketch} UDF: direct {@code exec} calls, the
+ * {@code Accumulator} interface and the Initial, Intermediate and Final Algebraic stages.
+ */
+public class UnionSketchTest {
+
+  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+  private static final BagFactory BAG_FACTORY = BagFactory.getInstance();
+
+  @Test
+  public void execNullInputTuple() throws Exception {
+    final EvalFunc<DataByteArray> func = new UnionSketch();
+    final DataByteArray result = func.exec(null);
+    final CpcSketch sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void execEmptyInputTuple() throws Exception {
+    final EvalFunc<DataByteArray> func = new UnionSketch();
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple());
+    final CpcSketch sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void execEmptyBag() throws Exception {
+    final EvalFunc<DataByteArray> func = new UnionSketch();
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(BAG_FACTORY.newDefaultBag()));
+    final CpcSketch sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void execNormalCase() throws Exception {
+    final EvalFunc<DataByteArray> func = new UnionSketch();
+    final CpcSketch inputSketch = new CpcSketch();
+    inputSketch.update(1);
+    inputSketch.update(2);
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final CpcSketch sketch = DataToSketchTest.getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+  }
+
+  @Test
+  public void execNormalCaseCustomLgKAndSeed() throws Exception {
+    final EvalFunc<DataByteArray> func = new UnionSketch("10", "123");
+    final CpcSketch inputSketch = new CpcSketch(10, 123);
+    inputSketch.update(1);
+    inputSketch.update(2);
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final CpcSketch sketch = DataToSketchTest.getSketch(result, 123);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+    Assert.assertEquals(sketch.getLgK(), 10);
+  }
+
+  @Test
+  public void accumulator() throws Exception {
+    final Accumulator<DataByteArray> func = new UnionSketch();
+
+    // no input yet
+    DataByteArray result = func.getValue();
+    CpcSketch sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // null input tuple
+    func.accumulate(null);
+    result = func.getValue();
+    sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // empty input tuple
+    func.accumulate(TUPLE_FACTORY.newTuple());
+    result = func.getValue();
+    sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // empty bag
+    func.accumulate(TUPLE_FACTORY.newTuple(BAG_FACTORY.newDefaultBag()));
+    result = func.getValue();
+    sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // normal case
+    final CpcSketch inputSketch = new CpcSketch();
+    inputSketch.update(1);
+    inputSketch.update(2);
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    func.accumulate(TUPLE_FACTORY.newTuple(bag));
+    result = func.getValue();
+    sketch = DataToSketchTest.getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+
+    // cleanup
+    func.cleanup();
+    result = func.getValue();
+    sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicInitial() throws Exception {
+    final EvalFunc<Tuple> func = newStage(new UnionSketch().getInitial());
+    final Tuple input = TUPLE_FACTORY.newTuple();
+    final Tuple result = func.exec(input);
+    Assert.assertEquals(result, input);
+  }
+
+  @Test
+  public void algebraicIntermediateNullInputTuple() throws Exception {
+    final EvalFunc<Tuple> func = newStage(new UnionSketch().getIntermed());
+    final Tuple result = func.exec(null);
+    final CpcSketch sketch = DataToSketchTest.getSketch((DataByteArray) result.get(0));
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicIntermediateEmptyInputTuple() throws Exception {
+    final EvalFunc<Tuple> func = newStage(new UnionSketch().getIntermed());
+    final Tuple result = func.exec(TUPLE_FACTORY.newTuple());
+    final CpcSketch sketch = DataToSketchTest.getSketch((DataByteArray) result.get(0));
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicIntermediateFromInitial() throws Exception {
+    final EvalFunc<Tuple> func = newStage(new UnionSketch().getIntermed());
+    final CpcSketch inputSketch = new CpcSketch();
+    inputSketch.update(1);
+    inputSketch.update(2);
+    inputSketch.update(3);
+    final DataBag outerBag = BAG_FACTORY.newDefaultBag();
+    final DataBag innerBag = BAG_FACTORY.newDefaultBag();
+    innerBag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    outerBag.add(TUPLE_FACTORY.newTuple(innerBag));
+    final Tuple result = func.exec(TUPLE_FACTORY.newTuple(outerBag));
+    final CpcSketch sketch = DataToSketchTest.getSketch((DataByteArray) result.get(0));
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 3.0, 0.01);
+  }
+
+  @Test
+  public void algebraicIntermediateFromIntermediate() throws Exception {
+    final EvalFunc<Tuple> func = newStage(new UnionSketch().getIntermed());
+    final CpcSketch inputSketch = new CpcSketch();
+    inputSketch.update("a");
+    inputSketch.update("b");
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    final Tuple result = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final CpcSketch sketch = DataToSketchTest.getSketch((DataByteArray) result.get(0));
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+  }
+
+  @Test
+  public void algebraicIntermediateFromIntermediateCustomLgKAndSeed() throws Exception {
+    final EvalFunc<Tuple> func = newStage(new UnionSketch().getIntermed(), "10", "123");
+    final CpcSketch inputSketch = new CpcSketch(10, 123);
+    inputSketch.update("a");
+    inputSketch.update("b");
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    final Tuple result = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final CpcSketch sketch = DataToSketchTest.getSketch((DataByteArray) result.get(0), 123);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+    Assert.assertEquals(sketch.getLgK(), 10);
+  }
+
+  @Test
+  public void algebraicFinalNullInputTuple() throws Exception {
+    final EvalFunc<DataByteArray> func = newStage(new UnionSketch().getFinal());
+    final DataByteArray result = func.exec(null);
+    final CpcSketch sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicFinalEmptyInputTuple() throws Exception {
+    final EvalFunc<DataByteArray> func = newStage(new UnionSketch().getFinal());
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple());
+    final CpcSketch sketch = DataToSketchTest.getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicFinalFromInitial() throws Exception {
+    final EvalFunc<DataByteArray> func = newStage(new UnionSketch().getFinal());
+    final CpcSketch inputSketch = new CpcSketch();
+    inputSketch.update(1);
+    inputSketch.update(2);
+    inputSketch.update(3);
+    final DataBag outerBag = BAG_FACTORY.newDefaultBag();
+    final DataBag innerBag = BAG_FACTORY.newDefaultBag();
+    innerBag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    outerBag.add(TUPLE_FACTORY.newTuple(innerBag));
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(outerBag));
+    final CpcSketch sketch = DataToSketchTest.getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 3.0, 0.01);
+  }
+
+  @Test
+  public void algebraicFinalFromInitialCustomLgKAndSeed() throws Exception {
+    final EvalFunc<DataByteArray> func = newStage(new UnionSketch().getFinal(), "10", "123");
+    final CpcSketch inputSketch = new CpcSketch(10, 123);
+    inputSketch.update(1);
+    inputSketch.update(2);
+    inputSketch.update(3);
+    final DataBag outerBag = BAG_FACTORY.newDefaultBag();
+    final DataBag innerBag = BAG_FACTORY.newDefaultBag();
+    innerBag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    outerBag.add(TUPLE_FACTORY.newTuple(innerBag));
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(outerBag));
+    final CpcSketch sketch = DataToSketchTest.getSketch(result, 123);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 3.0, 0.01);
+    Assert.assertEquals(sketch.getLgK(), 10);
+  }
+
+  @Test
+  public void algebraicFinalFromIntermediate() throws Exception {
+    final EvalFunc<DataByteArray> func = newStage(new UnionSketch().getFinal());
+    final CpcSketch inputSketch = new CpcSketch();
+    inputSketch.update("a");
+    inputSketch.update("b");
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final CpcSketch sketch = DataToSketchTest.getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+  }
+
+  /**
+   * Reflectively instantiates an Algebraic stage class by name.
+   * {@code getDeclaredConstructor(...).newInstance(...)} is used instead of
+   * {@code Class.newInstance()}, which is deprecated since Java 9 and propagates checked
+   * exceptions thrown by the constructor without declaring them.
+   * @param <T> value type produced by the stage
+   * @param className fully qualified class name of the stage to create
+   * @param args string constructor arguments; none selects the no-argument constructor
+   * @return the new stage instance
+   * @throws Exception if the class cannot be loaded or its constructor fails
+   */
+  @SuppressWarnings("unchecked")
+  private static <T> EvalFunc<T> newStage(final String className, final String... args)
+      throws Exception {
+    final Class<?>[] argTypes = new Class<?>[args.length];
+    java.util.Arrays.fill(argTypes, String.class);
+    return (EvalFunc<T>) Class.forName(className)
+        .getDeclaredConstructor(argTypes).newInstance((Object[]) args);
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/kll/DataToSketchTest.java b/src/test/java/com/yahoo/sketches/pig/kll/DataToSketchTest.java
new file mode 100644
index 0000000..dc342d3
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/kll/DataToSketchTest.java
@@ -0,0 +1,328 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+public class DataToSketchTest {
+  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+  private static final BagFactory BAG_FACTORY = BagFactory.getInstance();
+
+  @Test
+  public void execNullInputTuple() throws Exception {
+    final EvalFunc<DataByteArray> func = new DataToSketch();
+    final DataByteArray result = func.exec(null);
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void execEmptyInputTuple() throws Exception {
+    final EvalFunc<DataByteArray> func = new DataToSketch();
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple());
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void execEmptyBag() throws Exception {
+    final EvalFunc<DataByteArray> func = new DataToSketch();
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(BAG_FACTORY.newDefaultBag()));
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test(expectedExceptions = ClassCastException.class)
+  public void execWrongValueType() throws Exception {
+    final EvalFunc<DataByteArray> func = new DataToSketch();
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple("a"));
+    func.exec(TUPLE_FACTORY.newTuple(bag));
+  }
+
+  @Test
+  public void execNormalCase() throws Exception {
+    final EvalFunc<DataByteArray> func = new DataToSketch();
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple(Float.valueOf(1)));
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 1);
+  }
+
+  @Test
+  public void execMixedNullCase() throws Exception {
+    final EvalFunc<DataByteArray> func = new DataToSketch();
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple(Float.valueOf(1)));
+    bag.add(null);
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 1);
+  }
+
+  @Test
+  public void accumulator() throws Exception {
+    final Accumulator<DataByteArray> func = new DataToSketch();
+
+    // no input yet
+    DataByteArray result = func.getValue();
+    KllFloatsSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // null input tuple
+    func.accumulate(null);
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // empty input tuple
+    func.accumulate(TUPLE_FACTORY.newTuple());
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // empty bag
+    func.accumulate(TUPLE_FACTORY.newTuple(BAG_FACTORY.newDefaultBag()));
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // normal case
+    DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple(Float.valueOf(1)));
+    func.accumulate(TUPLE_FACTORY.newTuple(bag));
+    func.accumulate(TUPLE_FACTORY.newTuple(bag));
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 2);
+
+    // mixed null case
+    bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple(Float.valueOf(1)));
+    bag.add(null);
+    func.accumulate(TUPLE_FACTORY.newTuple(bag));
+    func.accumulate(TUPLE_FACTORY.newTuple(bag));
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 4);
+
+    // cleanup
+    func.cleanup();
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void accumulatorCustomK() throws Exception {
+    final Accumulator<DataByteArray> func = new DataToSketch("400");
+    final KllFloatsSketch sketch = getSketch(func.getValue());
+    Assert.assertEquals(sketch.getK(), 400);
+  }
+
+  @Test
+  public void algebraicInitial() throws Exception {
+    final EvalFunc<Tuple> func = new DataToSketch.Initial();
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple());
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(bag));
+    Assert.assertNotNull(resultTuple);
+    Assert.assertEquals(resultTuple.size(), 1);
+    Assert.assertTrue(resultTuple.get(0) instanceof DataBag);
+    Assert.assertEquals(((DataBag) resultTuple.get(0)).size(), 1);
+  }
+
+  @Test
+  public void algebraicIntermediateNullInputTupleCustomK() throws Exception {
+    final EvalFunc<Tuple> func = newStage(new DataToSketch().getIntermed(), "400");
+    final Tuple resultTuple = func.exec(null);
+    final KllFloatsSketch sketch = getSketch(resultTuple);
+    Assert.assertTrue(sketch.isEmpty());
+    Assert.assertEquals(sketch.getK(), 400);
+  }
+
+  @Test
+  public void algebraicIntermediateEmptyInputTuple() throws Exception {
+    final EvalFunc<Tuple> func = newStage(new DataToSketch().getIntermed());
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple());
+    final KllFloatsSketch sketch = getSketch(resultTuple);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicIntermediateNormalCase() throws Exception {
+    final EvalFunc<Tuple> func = newStage(new DataToSketch().getIntermed());
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+
+    { // this is to simulate an output from Initial
+      final DataBag innerBag = BAG_FACTORY.newDefaultBag();
+      innerBag.add(TUPLE_FACTORY.newTuple(Float.valueOf(1)));
+      bag.add(TUPLE_FACTORY.newTuple(innerBag));
+    }
+
+    { // this is to simulate an output from a prior call of Intermediate
+      final KllFloatsSketch qs = new KllFloatsSketch();
+      qs.update(2);
+      bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(qs.toByteArray())));
+    }
+
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final KllFloatsSketch sketch = getSketch(resultTuple);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 2);
+  }
+
+  @Test
+  public void algebraicIntermediateMixedNullCase() throws Exception {
+    final EvalFunc<Tuple> func = newStage(new DataToSketch().getIntermed());
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+
+    { // this is to simulate an output from Initial
+      final DataBag innerBag = BAG_FACTORY.newDefaultBag();
+      innerBag.add(TUPLE_FACTORY.newTuple(Float.valueOf(1)));
+      innerBag.add(null);
+      bag.add(TUPLE_FACTORY.newTuple(innerBag));
+    }
+
+    { // this is to simulate an output from a prior call of Intermediate
+      final KllFloatsSketch qs = new KllFloatsSketch();
+      qs.update(2);
+      bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(qs.toByteArray())));
+    }
+
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final KllFloatsSketch sketch = getSketch(resultTuple);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 2);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void algebraicIntermediateWrongType() throws Exception {
+    final EvalFunc<Tuple> func = newStage(new DataToSketch().getIntermed());
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+
+    // this bag must have tuples with either bags or data byte arrays
+    bag.add(TUPLE_FACTORY.newTuple(1.0));
+    func.exec(TUPLE_FACTORY.newTuple(bag));
+  }
+
+  @Test
+  public void algebraicFinalNullInputTupleCustomK() throws Exception {
+    final EvalFunc<DataByteArray> func = newStage(new DataToSketch().getFinal(), "400");
+    final DataByteArray result = func.exec(null);
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+    Assert.assertEquals(sketch.getK(), 400);
+  }
+
+  @Test
+  public void algebraicFinalEmptyInputTuple() throws Exception {
+    final EvalFunc<DataByteArray> func = newStage(new DataToSketch().getFinal());
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple());
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicFinalNormalCase() throws Exception {
+    final EvalFunc<DataByteArray> func = newStage(new DataToSketch().getFinal());
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+
+    { // this is to simulate an output from Initial
+      final DataBag innerBag = BAG_FACTORY.newDefaultBag();
+      innerBag.add(TUPLE_FACTORY.newTuple(Float.valueOf(1)));
+      bag.add(TUPLE_FACTORY.newTuple(innerBag));
+    }
+
+    { // this is to simulate an output from a prior call of Intermediate
+      final KllFloatsSketch qs = new KllFloatsSketch();
+      qs.update(2);
+      bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(qs.toByteArray())));
+    }
+
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 2);
+  }
+
+  @Test
+  public void algebraicFinalMixedNullCase() throws Exception {
+    final EvalFunc<DataByteArray> func = newStage(new DataToSketch().getFinal());
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+
+    { // this is to simulate an output from Initial
+      final DataBag innerBag = BAG_FACTORY.newDefaultBag();
+      innerBag.add(TUPLE_FACTORY.newTuple(Float.valueOf(1)));
+      innerBag.add(null);
+      bag.add(TUPLE_FACTORY.newTuple(innerBag));
+    }
+
+    { // this is to simulate an output from a prior call of Intermediate
+      final KllFloatsSketch qs = new KllFloatsSketch();
+      qs.update(2);
+      bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(qs.toByteArray())));
+    }
+
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 2);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void algebraicFinalWrongType() throws Exception {
+    final EvalFunc<DataByteArray> func = newStage(new DataToSketch().getFinal());
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+
+    // this bag must have tuples with either bags or data byte arrays
+    bag.add(TUPLE_FACTORY.newTuple(1.0));
+    func.exec(TUPLE_FACTORY.newTuple(bag));
+  }
+
+  // end of tests
+
+  /**
+   * Reflectively creates an Algebraic stage by class name, passing {@code args} to its String
+   * constructor. Replaces {@code Class.newInstance()}, which is deprecated since Java 9 and
+   * rethrows checked constructor exceptions undeclared.
+   */
+  @SuppressWarnings("unchecked")
+  private static <T> EvalFunc<T> newStage(final String className, final String... args) throws Exception {
+    final Class<?>[] argTypes = new Class<?>[args.length];
+    java.util.Arrays.fill(argTypes, String.class);
+    return (EvalFunc<T>) Class.forName(className).getDeclaredConstructor(argTypes).newInstance((Object[]) args);
+  }
+
+  // Checks that the tuple holds exactly one field and heapifies it as a serialized sketch.
+  private static KllFloatsSketch getSketch(final Tuple tuple) throws Exception {
+    Assert.assertNotNull(tuple);
+    Assert.assertEquals(tuple.size(), 1);
+    return getSketch((DataByteArray) tuple.get(0));
+  }
+
+  private static KllFloatsSketch getSketch(final DataByteArray bytes) throws Exception {
+    Assert.assertTrue(bytes.size() > 0);
+    return KllFloatsSketch.heapify(Memory.wrap(bytes.get()));
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/kll/GetCdfTest.java b/src/test/java/com/yahoo/sketches/pig/kll/GetCdfTest.java
new file mode 100644
index 0000000..fce1f3f
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/kll/GetCdfTest.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.util.Arrays;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+import org.testng.Assert;
+
+/** Tests for the KLL {@code GetCdf} UDF. */
+public class GetCdfTest {
+  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+  private static final double DELTA = 1e-9; // tolerance for comparing floating-point ranks
+
+  @Test
+  public void emptySketch() throws Exception {
+    final EvalFunc<Tuple> func = new GetCdf();
+    final DataByteArray bytes = new DataByteArray(new KllFloatsSketch().toByteArray());
+    Assert.assertNull(func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(bytes, 0f))));
+  }
+
+  @Test
+  public void normalCase() throws Exception {
+    final EvalFunc<Tuple> func = new GetCdf();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    for (int i = 1; i <= 10; i++) { sketch.update(i); }
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 2f, 7f)));
+    Assert.assertNotNull(resultTuple);
+    Assert.assertEquals(resultTuple.size(), 3);
+    Assert.assertEquals((double) resultTuple.get(0), 0.1, DELTA);
+    Assert.assertEquals((double) resultTuple.get(1), 0.6, DELTA);
+    Assert.assertEquals((double) resultTuple.get(2), 1.0, DELTA);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongNumberOfInputs() throws Exception {
+    final EvalFunc<Tuple> func = new GetCdf();
+    func.exec(TUPLE_FACTORY.newTuple(1));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForSketch() throws Exception {
+    final EvalFunc<Tuple> func = new GetCdf();
+    func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(1.0, 1.0)));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeOfSplitPoint() throws Exception {
+    final EvalFunc<Tuple> func = new GetCdf();
+    func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(new KllFloatsSketch().toByteArray()), 1)));
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/kll/GetKTest.java b/src/test/java/com/yahoo/sketches/pig/kll/GetKTest.java
new file mode 100644
index 0000000..e5d65a3
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/kll/GetKTest.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.util.Arrays;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.TupleFactory;
+
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+import org.testng.Assert;
+
+/** Tests for the KLL {@code GetK} UDF. */
+public class GetKTest {
+
+  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+
+  // NOTE(review): "defalut" is a typo for "default"; the method name is kept unchanged here.
+  @Test
+  public void defalutK() throws Exception {
+    final EvalFunc<Integer> udf = new GetK();
+    final DataByteArray serialized = new DataByteArray(new KllFloatsSketch().toByteArray());
+    final Integer k = udf.exec(TUPLE_FACTORY.newTuple(Arrays.asList(serialized)));
+    Assert.assertNotNull(k);
+    Assert.assertEquals(k, Integer.valueOf(KllFloatsSketch.DEFAULT_K));
+  }
+
+  @Test
+  public void customK() throws Exception {
+    final EvalFunc<Integer> udf = new GetK();
+    final DataByteArray serialized = new DataByteArray(new KllFloatsSketch(400).toByteArray());
+    final Integer k = udf.exec(TUPLE_FACTORY.newTuple(Arrays.asList(serialized)));
+    Assert.assertNotNull(k);
+    Assert.assertEquals(k, Integer.valueOf(400));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void noInputs() throws Exception {
+    new GetK().exec(TUPLE_FACTORY.newTuple());
+  }
+
+  // newTuple(int) takes a field count, so the input here is a two-field tuple
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void tooManyInputs() throws Exception {
+    new GetK().exec(TUPLE_FACTORY.newTuple(2));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForSketch() throws Exception {
+    new GetK().exec(TUPLE_FACTORY.newTuple(Arrays.asList(1.0)));
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/kll/GetPmfTest.java b/src/test/java/com/yahoo/sketches/pig/kll/GetPmfTest.java
new file mode 100644
index 0000000..4693e84
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/kll/GetPmfTest.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.util.Arrays;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+import org.testng.Assert;
+
+/** Tests for the KLL {@code GetPmf} UDF. */
+public class GetPmfTest {
+  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+  private static final double DELTA = 1e-9; // tolerance for comparing floating-point masses
+
+  @Test
+  public void emptySketch() throws Exception {
+    final EvalFunc<Tuple> func = new GetPmf();
+    final DataByteArray bytes = new DataByteArray(new KllFloatsSketch().toByteArray());
+    Assert.assertNull(func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(bytes, 0f))));
+  }
+
+  @Test
+  public void normalCase() throws Exception {
+    final EvalFunc<Tuple> func = new GetPmf();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    for (int i = 1; i <= 10; i++) { sketch.update(i); }
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 2f, 7f)));
+    Assert.assertNotNull(resultTuple);
+    Assert.assertEquals(resultTuple.size(), 3);
+    Assert.assertEquals((double) resultTuple.get(0), 0.1, DELTA);
+    Assert.assertEquals((double) resultTuple.get(1), 0.5, DELTA);
+    Assert.assertEquals((double) resultTuple.get(2), 0.4, DELTA);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongNumberOfInputs() throws Exception {
+    final EvalFunc<Tuple> func = new GetPmf();
+    func.exec(TUPLE_FACTORY.newTuple(1));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForSketch() throws Exception {
+    final EvalFunc<Tuple> func = new GetPmf();
+    func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(1.0, 1.0)));
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeOfSplitPoint() throws Exception {
+    final EvalFunc<Tuple> func = new GetPmf();
+    func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(new KllFloatsSketch().toByteArray()), 1)));
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/kll/GetQuantileTest.java b/src/test/java/com/yahoo/sketches/pig/kll/GetQuantileTest.java
new file mode 100644
index 0000000..9b650e3
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/kll/GetQuantileTest.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.util.Arrays;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.TupleFactory;
+
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+import org.testng.Assert;
+
+public class GetQuantileTest { // exercises the KLL GetQuantile UDF through exec()
+  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+
+  @Test
+  public void emptySketch() throws Exception {
+    final EvalFunc<Float> getQuantile = new GetQuantile();
+    final DataByteArray emptySketchBytes = new DataByteArray(new KllFloatsSketch().toByteArray());
+    final Float quantile = getQuantile.exec(TUPLE_FACTORY.newTuple(Arrays.asList(emptySketchBytes, 0.0)));
+    Assert.assertEquals(quantile, Float.NaN); // NaN is the expected answer when the sketch holds no values
+  }
+
+  @Test
+  public void normalCase() throws Exception {
+    final EvalFunc<Float> getQuantile = new GetQuantile();
+    final KllFloatsSketch singleValueSketch = new KllFloatsSketch();
+    singleValueSketch.update(1);
+    final Float median = getQuantile.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(singleValueSketch.toByteArray()), 0.5)));
+    Assert.assertEquals(median, 1f); // the only value in the sketch is returned for fraction 0.5
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongNumberOfInputs() throws Exception {
+    final Tuple input = TUPLE_FACTORY.newTuple(1); // NOTE(review): int argument, presumably the sized-tuple factory overload — confirm
+    new GetQuantile().exec(input);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForSketch() throws Exception {
+    final Tuple doublesOnly = TUPLE_FACTORY.newTuple(Arrays.asList(1.0, 1.0)); // a Double sits where the sketch bytes belong
+    new GetQuantile().exec(doublesOnly);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForFraction() throws Exception {
+    final DataByteArray sketchBytes = new DataByteArray(new KllFloatsSketch().toByteArray());
+    final Tuple input = TUPLE_FACTORY.newTuple(Arrays.asList(sketchBytes, 1)); // Integer fraction; the passing tests use a Double
+    new GetQuantile().exec(input);
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/kll/GetQuantilesTest.java b/src/test/java/com/yahoo/sketches/pig/kll/GetQuantilesTest.java
new file mode 100644
index 0000000..5dbc463
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/kll/GetQuantilesTest.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.util.Arrays;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+import org.testng.Assert;
+
+public class GetQuantilesTest { // exercises the KLL GetQuantiles UDF through exec()
+  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+
+  @Test
+  public void emptySketch() throws Exception {
+    final EvalFunc<Tuple> func = new GetQuantiles();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 0.5)));
+    Assert.assertNotNull(resultTuple);
+    Assert.assertEquals(resultTuple.size(), 1); // one fraction requested, one quantile returned
+    Assert.assertEquals((float) resultTuple.get(0), Float.NaN); // an empty sketch answers NaN
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void tooFewInputs() throws Exception {
+    final EvalFunc<Tuple> func = new GetQuantiles();
+    func.exec(TUPLE_FACTORY.newTuple(1)); // NOTE(review): int argument, presumably the sized-tuple factory overload — confirm
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForSketch() throws Exception {
+    final EvalFunc<Tuple> func = new GetQuantiles();
+    func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(1.0, 1.0))); // a Double sits where the sketch bytes belong
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForFractionOrNumberOfIntervals() throws Exception {
+    final EvalFunc<Tuple> func = new GetQuantiles();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), ""))); // a String is neither a fraction nor a count
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeAmongFractions() throws Exception {
+    final EvalFunc<Tuple> func = new GetQuantiles();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 0.0, 1))); // Double followed by an Integer
+  }
+
+  @Test
+  public void oneFraction() throws Exception {
+    final EvalFunc<Tuple> func = new GetQuantiles();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    for (int i = 1; i <= 10; i++) { sketch.update(i); } // ten values, 1 through 10
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 0.5)));
+    Assert.assertNotNull(resultTuple);
+    Assert.assertEquals(resultTuple.size(), 1);
+    Assert.assertEquals((float) resultTuple.get(0), 6f);
+  }
+
+  @Test
+  public void severalFractions() throws Exception {
+    final EvalFunc<Tuple> func = new GetQuantiles();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    for (int i = 1; i <= 10; i++) { sketch.update(i); } // ten values, 1 through 10
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 0.0, 0.5, 1.0)));
+    Assert.assertNotNull(resultTuple);
+    Assert.assertEquals(resultTuple.size(), 3); // one quantile per requested fraction
+    Assert.assertEquals((float) resultTuple.get(0), 1f); // fraction 0.0: smallest value fed in
+    Assert.assertEquals((float) resultTuple.get(1), 6f);
+    Assert.assertEquals((float) resultTuple.get(2), 10f); // fraction 1.0: largest value fed in
+  }
+
+  @Test
+  public void numberOfEvenlySpacedIntervals() throws Exception {
+    final EvalFunc<Tuple> func = new GetQuantiles();
+    final KllFloatsSketch sketch = new KllFloatsSketch();
+    for (int i = 1; i <= 10; i++) { sketch.update(i); } // ten values, 1 through 10
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 3))); // Integer, not fractions
+    Assert.assertNotNull(resultTuple);
+    Assert.assertEquals(resultTuple.size(), 3); // same three quantiles as severalFractions expects for 0.0, 0.5, 1.0
+    Assert.assertEquals((float) resultTuple.get(0), 1f);
+    Assert.assertEquals((float) resultTuple.get(1), 6f);
+    Assert.assertEquals((float) resultTuple.get(2), 10f);
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/kll/GetRankTest.java b/src/test/java/com/yahoo/sketches/pig/kll/GetRankTest.java
new file mode 100644
index 0000000..83d42df
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/kll/GetRankTest.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.util.Arrays;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.TupleFactory;
+
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+import org.testng.Assert;
+
+public class GetRankTest { // exercises the KLL GetRank UDF through exec()
+  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+
+  @Test
+  public void emptySketch() throws Exception {
+    final EvalFunc<Double> getRank = new GetRank();
+    final DataByteArray emptySketchBytes = new DataByteArray(new KllFloatsSketch().toByteArray());
+    final Double rank = getRank.exec(TUPLE_FACTORY.newTuple(Arrays.asList(emptySketchBytes, 0f)));
+    Assert.assertEquals(rank, Double.NaN); // NaN is the expected answer when the sketch holds no values
+  }
+
+  @Test
+  public void normalCase() throws Exception {
+    final EvalFunc<Double> getRank = new GetRank();
+    final KllFloatsSketch fourValueSketch = new KllFloatsSketch();
+    // feed 1, 2, 3, 4; the expected rank of 3 is 0.5, i.e. two of the four values
+    for (int value = 1; value <= 4; value++) {
+      fourValueSketch.update(value);
+    }
+    final Double rank = getRank.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(fourValueSketch.toByteArray()), 3f)));
+    Assert.assertEquals(rank, 0.5);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongNumberOfInputs() throws Exception {
+    final Tuple input = TUPLE_FACTORY.newTuple(1); // NOTE(review): int argument, presumably the sized-tuple factory overload — confirm
+    new GetRank().exec(input);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForSketch() throws Exception {
+    final Tuple doublesOnly = TUPLE_FACTORY.newTuple(Arrays.asList(1.0, 1.0)); // a Double sits where the sketch bytes belong
+    new GetRank().exec(doublesOnly);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForValue() throws Exception {
+    final DataByteArray sketchBytes = new DataByteArray(new KllFloatsSketch().toByteArray());
+    final Tuple input = TUPLE_FACTORY.newTuple(Arrays.asList(sketchBytes, 1)); // Integer value; the passing tests use a Float
+    new GetRank().exec(input);
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/kll/SketchToStringTest.java b/src/test/java/com/yahoo/sketches/pig/kll/SketchToStringTest.java
new file mode 100644
index 0000000..62e1d09
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/kll/SketchToStringTest.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import java.util.Arrays;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.TupleFactory;
+
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+import org.testng.Assert;
+
+public class SketchToStringTest { // exercises the KLL SketchToString UDF through exec()
+  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+
+  @Test
+  public void normalCase() throws Exception {
+    final EvalFunc<String> toStringUdf = new SketchToString();
+    final DataByteArray sketchBytes = new DataByteArray(new KllFloatsSketch().toByteArray());
+    final String summary = toStringUdf.exec(TUPLE_FACTORY.newTuple(Arrays.asList(sketchBytes)));
+    Assert.assertNotNull(summary); // only presence is checked; the text itself is not pinned
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void noInputs() throws Exception {
+    final Tuple noFields = TUPLE_FACTORY.newTuple();
+    new SketchToString().exec(noFields);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void tooManyInputs() throws Exception {
+    final Tuple input = TUPLE_FACTORY.newTuple(2); // NOTE(review): int argument, presumably the sized-tuple factory overload — confirm
+    new SketchToString().exec(input);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void wrongTypeForSketch() throws Exception {
+    final Tuple doubleOnly = TUPLE_FACTORY.newTuple(Arrays.asList(1.0)); // a Double sits where the sketch bytes belong
+    new SketchToString().exec(doubleOnly);
+  }
+
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/kll/UnionSketchTest.java b/src/test/java/com/yahoo/sketches/pig/kll/UnionSketchTest.java
new file mode 100644
index 0000000..dd87b5a
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/kll/UnionSketchTest.java
@@ -0,0 +1,266 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.kll;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.kll.KllFloatsSketch;
+
+public class UnionSketchTest { // covers UnionSketch through exec(), the Accumulator calls and the three Algebraic stage classes
+  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+  private static final BagFactory BAG_FACTORY = BagFactory.getInstance();
+
+  @Test
+  public void execNullInputTuple() throws Exception {
+    final EvalFunc<DataByteArray> func = new UnionSketch();
+    final DataByteArray result = func.exec(null);
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty()); // a null input still produces a serialized (empty) sketch
+  }
+
+  @Test
+  public void execEmptyInputTuple() throws Exception {
+    final EvalFunc<DataByteArray> func = new UnionSketch();
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple());
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void execEmptyBag() throws Exception {
+    final EvalFunc<DataByteArray> func = new UnionSketch();
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(BAG_FACTORY.newDefaultBag()));
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void execNormalCase() throws Exception {
+    final EvalFunc<DataByteArray> func = new UnionSketch();
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    final KllFloatsSketch inputSketch = new KllFloatsSketch();
+    inputSketch.update(1);
+    bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 1); // the bag held one sketch with one value
+  }
+
+  @Test
+  public void accumulator() throws Exception {
+    final Accumulator<DataByteArray> func = new UnionSketch();
+
+    // no input yet
+    DataByteArray result = func.getValue();
+    KllFloatsSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // null input tuple
+    func.accumulate(null);
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // empty input tuple
+    func.accumulate(TUPLE_FACTORY.newTuple());
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // empty bag
+    func.accumulate(TUPLE_FACTORY.newTuple(BAG_FACTORY.newDefaultBag()));
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // normal case: the same one-value bag is accumulated twice
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    final KllFloatsSketch inputSketch = new KllFloatsSketch();
+    inputSketch.update(1);
+    bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(inputSketch.toByteArray())));
+    func.accumulate(TUPLE_FACTORY.newTuple(bag));
+    func.accumulate(TUPLE_FACTORY.newTuple(bag));
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 2); // one value per accumulate() call
+
+    // cleanup: getValue() is expected to report an empty sketch again
+    func.cleanup();
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void accumulatorCustomK() throws Exception {
+    final Accumulator<DataByteArray> func = new UnionSketch("400");
+    final KllFloatsSketch sketch = getSketch(func.getValue());
+    Assert.assertEquals(sketch.getK(), 400); // the constructor string is expected to become the sketch's K
+  }
+
+  @Test
+  public void algebraicInitial() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new UnionSketch().getInitial())
+        .getConstructor(String.class).newInstance("400");
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple());
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(bag));
+    Assert.assertNotNull(resultTuple);
+    Assert.assertEquals(resultTuple.size(), 1);
+    Assert.assertTrue(resultTuple.get(0) instanceof DataBag); // the result wraps a bag, not sketch bytes
+    Assert.assertEquals(((DataBag) resultTuple.get(0)).size(), 1);
+  }
+
+  @Test
+  public void algebraicIntermediateNullInputTupleCustomK() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new UnionSketch().getIntermed())
+        .getConstructor(String.class).newInstance("400");
+    final Tuple resultTuple = func.exec(null);
+    final KllFloatsSketch sketch = getSketch(resultTuple);
+    Assert.assertTrue(sketch.isEmpty());
+    Assert.assertEquals(sketch.getK(), 400);
+  }
+
+  @Test
+  public void algebraicIntermediateEmptyInputTuple() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new UnionSketch().getIntermed()).getDeclaredConstructor().newInstance();
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple());
+    final KllFloatsSketch sketch = getSketch(resultTuple);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicIntermediateNormalCase() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new UnionSketch().getIntermed()).getDeclaredConstructor().newInstance();
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+
+    { // this is to simulate an output from Initial
+      final DataBag innerBag = BAG_FACTORY.newDefaultBag();
+      final KllFloatsSketch qs = new KllFloatsSketch();
+      qs.update(1);
+      innerBag.add(TUPLE_FACTORY.newTuple(new DataByteArray(qs.toByteArray())));
+      bag.add(TUPLE_FACTORY.newTuple(innerBag));
+    }
+
+    { // this is to simulate an output from a prior call of Intermediate
+      final KllFloatsSketch qs = new KllFloatsSketch();
+      qs.update(2);
+      bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(qs.toByteArray())));
+    }
+
+    final Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final KllFloatsSketch sketch = getSketch(resultTuple);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 2); // one value from each of the two simulated inputs
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void algebraicIntermediateWrongType() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new UnionSketch().getIntermed()).getDeclaredConstructor().newInstance();
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+
+    // this bag must have tuples with either bags or data byte arrays
+    bag.add(TUPLE_FACTORY.newTuple(1.0));
+    func.exec(TUPLE_FACTORY.newTuple(bag));
+  }
+
+  @Test
+  public void algebraicFinalNullInputTupleCustomK() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new UnionSketch().getFinal())
+        .getConstructor(String.class).newInstance("400");
+    final DataByteArray result = func.exec(null);
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+    Assert.assertEquals(sketch.getK(), 400);
+  }
+
+  @Test
+  public void algebraicFinalEmptyInputTuple() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new UnionSketch().getFinal()).getDeclaredConstructor().newInstance();
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple());
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test
+  public void algebraicFinalNormalCase() throws Exception {
+    @SuppressWarnings("unchecked")
+    final EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new UnionSketch().getFinal()).getDeclaredConstructor().newInstance();
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+
+    { // this is to simulate an output from Initial
+      final DataBag innerBag = BAG_FACTORY.newDefaultBag();
+      final KllFloatsSketch qs = new KllFloatsSketch();
+      qs.update(1);
+      innerBag.add(TUPLE_FACTORY.newTuple(new DataByteArray(qs.toByteArray())));
+      bag.add(TUPLE_FACTORY.newTuple(innerBag));
+    }
+
+    { // this is to simulate an output from a prior call of Intermediate
+      final KllFloatsSketch qs = new KllFloatsSketch();
+      qs.update(2);
+      bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(qs.toByteArray())));
+    }
+
+    final DataByteArray result = func.exec(TUPLE_FACTORY.newTuple(bag));
+    final KllFloatsSketch sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 2); // one value from each of the two simulated inputs
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void algebraicIntermediateFinalWrongType() throws Exception { // NOTE(review): instantiates getFinal(); "Intermediate" in the name looks like a leftover
+    @SuppressWarnings("unchecked")
+    final EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new UnionSketch().getFinal()).getDeclaredConstructor().newInstance();
+    final DataBag bag = BAG_FACTORY.newDefaultBag();
+
+    // this bag must have tuples with either bags or data byte arrays
+    bag.add(TUPLE_FACTORY.newTuple(1.0));
+    func.exec(TUPLE_FACTORY.newTuple(bag));
+  }
+
+  // end of tests
+
+  private static KllFloatsSketch getSketch(final Tuple tuple) throws Exception { // unwraps a one-field tuple holding sketch bytes
+    Assert.assertNotNull(tuple);
+    Assert.assertEquals(tuple.size(), 1);
+    return getSketch((DataByteArray) tuple.get(0));
+  }
+
+  private static KllFloatsSketch getSketch(final DataByteArray bytes) throws Exception { // heapifies non-empty sketch bytes
+    Assert.assertNotNull(bytes); // report a null UDF result as an assertion failure rather than an NPE
+    Assert.assertTrue(bytes.size() > 0);
+    return KllFloatsSketch.heapify(Memory.wrap(bytes.get()));
+  }
+
+}