Merge pull request #43 from DataSketches/hll-udfs
HLL sketch UDFs
diff --git a/src/main/java/com/yahoo/sketches/pig/hll/AlgebraicFinal.java b/src/main/java/com/yahoo/sketches/pig/hll/AlgebraicFinal.java
new file mode 100644
index 0000000..edb84b7
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/hll/AlgebraicFinal.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.hll.HllSketch;
+import com.yahoo.sketches.hll.TgtHllType;
+import com.yahoo.sketches.hll.Union;
+
+/**
+ * Class used to calculate the final pass of an <i>Algebraic</i> sketch
+ * operation. It will receive a bag of values returned by either the <i>Intermediate</i>
+ * stage or the <i>Initial</i> stages, so it needs to be able to differentiate between and
+ * interpret both types.
+ *
+ * @author Alexander Saydakov
+ */
+abstract class AlgebraicFinal extends EvalFunc<DataByteArray> {
+
+  private final int lgK_;
+  private final TgtHllType tgtHllType_;
+  private DataByteArray emptySketch_; // lazily created cache of a serialized empty sketch
+  private boolean isFirstCall_ = true; // for logging
+
+  /**
+   * Constructor with primitives for the final passes of an Algebraic function.
+   *
+   * @param lgK parameter controlling the sketch size and accuracy
+   * @param tgtHllType HLL type of the resulting sketch
+   */
+  public AlgebraicFinal(final int lgK, final TgtHllType tgtHllType) {
+    lgK_ = lgK;
+    tgtHllType_ = tgtHllType;
+  }
+
+  /**
+   * Merges everything found in the bag at field 0 of the input into a single result sketch.
+   *
+   * @param inputTuple tuple whose field 0 is a bag of tuples holding either bags or sketches
+   * @return serialized result sketch, or a serialized empty sketch if there is nothing to merge
+   */
+  @Override
+  public DataByteArray exec(final Tuple inputTuple) throws IOException {
+    if (isFirstCall_) {
+      Logger.getLogger(getClass()).info("Algebraic was used");
+      isFirstCall_ = false;
+    }
+    if (inputTuple == null || inputTuple.size() == 0) { return getEmptySketch(); }
+    final DataBag outerBag = (DataBag) inputTuple.get(0);
+    if (outerBag == null) { return getEmptySketch(); }
+    final Union union = new Union(lgK_); // allocated only once the early exits are behind us
+    for (final Tuple dataTuple: outerBag) {
+      final Object f0 = dataTuple.get(0); // inputTuple.bag0.dataTupleN.f0
+      if (f0 == null) { continue; }
+      if (f0 instanceof DataBag) {
+        final DataBag innerBag = (DataBag) f0; // inputTuple.bag0.dataTupleN.f0:bag
+        if (innerBag.size() == 0) { continue; }
+        // If field 0 of a dataTuple is a Bag, all innerTuples of this inner bag
+        // will be passed into the union.
+        // It is due to system bagged outputs from multiple mapper Initial functions.
+        // The Intermediate stage was bypassed.
+        updateUnion(innerBag, union);
+      } else if (f0 instanceof DataByteArray) { // inputTuple.bag0.dataTupleN.f0:DBA
+        // If field 0 of a dataTuple is a DataByteArray, we assume it is a sketch
+        // due to system bagged outputs from multiple mapper Intermediate functions.
+        // Each dataTuple.DBA:sketch will be merged into the union.
+        final DataByteArray dba = (DataByteArray) f0;
+        union.update(HllSketch.heapify(Memory.wrap(dba.get())));
+      } else { // we should never get here
+        throw new IllegalArgumentException("dataTuple.Field0 is not a DataBag or DataByteArray: "
+            + f0.getClass().getName());
+      }
+    }
+    return new DataByteArray(union.getResult(tgtHllType_).toCompactByteArray());
+  }
+
+  abstract void updateUnion(DataBag bag, Union union) throws ExecException;
+
+  private DataByteArray getEmptySketch() {
+    if (emptySketch_ == null) {
+      emptySketch_ = new DataByteArray(
+          new HllSketch(lgK_, tgtHllType_).toCompactByteArray());
+    }
+    return emptySketch_;
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/hll/AlgebraicInitial.java b/src/main/java/com/yahoo/sketches/pig/hll/AlgebraicInitial.java
new file mode 100644
index 0000000..9cf33f6
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/hll/AlgebraicInitial.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Initial stage of the <i>Algebraic</i> HLL sketch UDFs.
+ *
+ * <p>No work is done here: every input tuple is handed back untouched, so that the
+ * records are processed by the later stages instead.
+ *
+ * @author Alexander Saydakov
+ */
+public class AlgebraicInitial extends EvalFunc<Tuple> {
+
+  private boolean logPending_ = true; // true until the one-time usage message is logged
+
+  /**
+   * No-argument constructor for the initial stage.
+   */
+  public AlgebraicInitial() {}
+
+  /**
+   * One-argument constructor for the initial stage. Pig invokes it with the same
+   * constructor arguments that were given to the original UDF; the value is not used here.
+   *
+   * @param lgK lgK in a form of a String (ignored)
+   */
+  public AlgebraicInitial(final String lgK) {}
+
+  /**
+   * Two-argument constructor for the initial stage. Pig invokes it with the same
+   * constructor arguments that were given to the original UDF; the values are not used here.
+   *
+   * @param lgK lgK in a form of a String (ignored)
+   * @param tgtHllType target HLL type in a form of a String (ignored)
+   */
+  public AlgebraicInitial(final String lgK, final String tgtHllType) {}
+
+  @Override
+  public Tuple exec(final Tuple inputTuple) throws IOException {
+    if (logPending_) {
+      Logger.getLogger(getClass()).info("Algebraic was used");
+      logPending_ = false;
+    }
+    return inputTuple;
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/hll/AlgebraicIntermediate.java b/src/main/java/com/yahoo/sketches/pig/hll/AlgebraicIntermediate.java
new file mode 100644
index 0000000..48b49aa
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/hll/AlgebraicIntermediate.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.hll.HllSketch;
+import com.yahoo.sketches.hll.TgtHllType;
+import com.yahoo.sketches.hll.Union;
+
+/**
+ * Intermediate (combiner) stage of an <i>Algebraic</i> sketch operation. It is called from
+ * the combiner and may run more than once (from a mapper and from a reducer). The bag it
+ * is given holds values returned by either the <i>Initial</i> stage or an earlier
+ * <i>Intermediate</i> stage, so both kinds of input have to be told apart and
+ * interpreted correctly.
+ *
+ * @author Alexander Saydakov
+ */
+abstract class AlgebraicIntermediate extends EvalFunc<Tuple> {
+
+  private static final TupleFactory tupleFactory_ = TupleFactory.getInstance();
+
+  private final int lgK_;
+  private final TgtHllType tgtHllType_;
+  private Tuple emptySketchTuple_; // created on first use, then reused
+  private boolean isFirstCall_ = true; // for logging
+
+  /**
+   * Creates the intermediate stage from already parsed parameters.
+   *
+   * @param lgK parameter controlling the sketch size and accuracy
+   * @param tgtHllType HLL type of the resulting sketch
+   */
+  public AlgebraicIntermediate(final int lgK, final TgtHllType tgtHllType) {
+    lgK_ = lgK;
+    tgtHllType_ = tgtHllType;
+  }
+
+  /**
+   * Unions the content of the bag at field 0 and returns the result wrapped in a tuple.
+   * An absent tuple or bag produces a tuple holding a serialized empty sketch.
+   */
+  @Override
+  public Tuple exec(final Tuple inputTuple) throws IOException {
+    if (isFirstCall_) {
+      Logger.getLogger(getClass()).info("Algebraic was used");
+      isFirstCall_ = false;
+    }
+    final boolean noInput = (inputTuple == null) || (inputTuple.size() == 0);
+    final DataBag sketchesAndBags = noInput ? null : (DataBag) inputTuple.get(0);
+    if (sketchesAndBags == null) {
+      return getEmptySketchTuple();
+    }
+    final Union merged = new Union(lgK_);
+    for (final Tuple entry : sketchesAndBags) {
+      final Object field = entry.get(0); // inputTuple.bag0.dataTupleN.f0
+      if (field instanceof DataByteArray) { // inputTuple.bag0.dataTupleN.f0:DBA
+        // A DataByteArray is taken to be a serialized sketch that an earlier
+        // Intermediate stage emitted; it is merged into the union as is.
+        final byte[] bytes = ((DataByteArray) field).get();
+        merged.update(HllSketch.heapify(Memory.wrap(bytes)));
+      } else if (field instanceof DataBag) { // inputTuple.bag0.dataTupleN.f0:bag
+        // A bag means the Intermediate stage was bypassed and these are raw outputs
+        // of the Initial stage; the subclass decides how its tuples feed the union.
+        final DataBag rawBag = (DataBag) field;
+        if (rawBag.size() != 0) {
+          updateUnion(rawBag, merged);
+        }
+      } else if (field != null) { // we should never get here
+        throw new IllegalArgumentException("dataTuple.Field0 is not a DataBag or DataByteArray: "
+            + field.getClass().getName());
+      }
+    }
+    return tupleFactory_.newTuple(
+        new DataByteArray(merged.getResult(tgtHllType_).toCompactByteArray()));
+  }
+
+  abstract void updateUnion(DataBag bag, Union union) throws ExecException;
+
+  private Tuple getEmptySketchTuple() {
+    if (emptySketchTuple_ == null) {
+      final HllSketch empty = new HllSketch(lgK_, tgtHllType_);
+      emptySketchTuple_ = tupleFactory_.newTuple(new DataByteArray(empty.toCompactByteArray()));
+    }
+    return emptySketchTuple_;
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/hll/DataToSketch.java b/src/main/java/com/yahoo/sketches/pig/hll/DataToSketch.java
new file mode 100644
index 0000000..8944c51
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/hll/DataToSketch.java
@@ -0,0 +1,236 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.pig.Accumulator;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.sketches.hll.HllSketch;
+import com.yahoo.sketches.hll.TgtHllType;
+import com.yahoo.sketches.hll.Union;
+
+/**
+ * This is a Pig UDF that builds Sketches from data.
+ * This class implements both the <i>Accumulator</i> and <i>Algebraic</i> interfaces.
+ *
+ * @author Alexander Saydakov
+ */
+public class DataToSketch extends EvalFunc<DataByteArray> implements Accumulator<DataByteArray>, Algebraic {
+
+ static final int DEFAULT_LG_K = 12;
+ static final TgtHllType DEFAULT_HLL_TYPE = TgtHllType.HLL_4;
+
+ private DataByteArray emptySketch_; // this is to cash an empty sketch
+
+ private final int lgK_;
+ private final TgtHllType tgtHllType_;
+ private Union accumUnion_;
+ private boolean isFirstCall_;
+
+ /**
+ * Constructor with default lgK and target HLL type
+ */
+ public DataToSketch() {
+ this(DEFAULT_LG_K, DEFAULT_HLL_TYPE);
+ }
+
+ /**
+ * Constructor with given lgK as string and default target HLL type.
+ *
+ * @param lgK in a form of a String
+ */
+ public DataToSketch(final String lgK) {
+ this(Integer.parseInt(lgK), DEFAULT_HLL_TYPE);
+ }
+
+ /**
+ * Constructor with given lgK and target HLL type as strings
+ *
+ * @param lgK in a form of a String
+ * @param tgtHllType in a form of a String
+ */
+ public DataToSketch(final String lgK, final String tgtHllType) {
+ this(Integer.parseInt(lgK), TgtHllType.valueOf(tgtHllType));
+ }
+
+ /**
+ * Base constructor.
+ *
+ * @param lgK parameter controlling the sketch size and accuracy
+ * @param tgtHllType HLL type of the resulting sketch
+ */
+ public DataToSketch(final int lgK, final TgtHllType tgtHllType) {
+ super();
+ lgK_ = lgK;
+ tgtHllType_ = tgtHllType;
+ }
+
+ /**
+ * Top-level exec function.
+ * This method accepts an input Tuple containing a Bag of one or more inner <b>Datum Tuples</b>
+ * and returns a single serialized HllSketch as a DataByteArray.
+ *
+ * <b>Datum Tuple</b> is a Tuple containing a single field, which can be one of the following
+ * (Java type: Pig type):
+ * <ul>
+ * <li>Byte: BYTE</li>
+ * <li>Integer: INTEGER</li>
+ * <li>Long: LONG</li>
+ * <li>Float: FLOAT</li>
+ * <li>Double: DOUBLE</li>
+ * <li>String: CHARARRAY</li>
+ * <li>DataByteArray: BYTEARRAY</li>
+ * </ul>
+ *
+ * @param inputTuple A tuple containing a single bag, containing Datum Tuples.
+ * @return serialized HllSketch
+ * @see "org.apache.pig.EvalFunc.exec(org.apache.pig.data.Tuple)"
+ * @throws IOException from Pig.
+ */
+
+ @Override
+ public DataByteArray exec(final Tuple inputTuple) throws IOException {
+ if (isFirstCall_) {
+ Logger.getLogger(getClass()).info("Exec was used");
+ isFirstCall_ = false;
+ }
+ if (inputTuple == null || inputTuple.size() == 0) {
+ if (emptySketch_ == null) {
+ emptySketch_ = new DataByteArray(new HllSketch(lgK_, tgtHllType_).toCompactByteArray());
+ }
+ return emptySketch_;
+ }
+ final Union union = new Union(lgK_);
+ final DataBag bag = (DataBag) inputTuple.get(0);
+ updateUnion(bag, union);
+ return new DataByteArray(union.getResult(tgtHllType_).toCompactByteArray());
+ }
+
+ /**
+ * An <i>Accumulator</i> version of the standard <i>exec()</i> method. Like <i>exec()</i>,
+ * accumulator is called with a bag of Datum Tuples. Unlike <i>exec()</i>, it doesn't serialize the
+ * result at the end. Instead, it can be called multiple times, each time with another bag of
+ * Datum Tuples to be input to the sketch.
+ *
+ * @param inputTuple A tuple containing a single bag, containing Datum Tuples.
+ * @see #exec
+ * @see "org.apache.pig.Accumulator.accumulate(org.apache.pig.data.Tuple)"
+ * @throws IOException by Pig
+ */
+ @Override
+ public void accumulate(final Tuple inputTuple) throws IOException {
+ if (isFirstCall_) {
+ Logger.getLogger(getClass()).info("Accumulator was used");
+ isFirstCall_ = false;
+ }
+ if (inputTuple == null || inputTuple.size() == 0) { return; }
+ final DataBag bag = (DataBag) inputTuple.get(0);
+ if (bag == null) { return; }
+ if (accumUnion_ == null) {
+ accumUnion_ = new Union(lgK_);
+ }
+ updateUnion(bag, accumUnion_);
+ }
+
+ /**
+ * Returns the sketch that has been built up by multiple calls to {@link #accumulate}.
+ *
+ * @return serialized HllSketch
+ * @see "org.apache.pig.Accumulator.getValue()"
+ */
+ @Override
+ public DataByteArray getValue() {
+ if (accumUnion_ == null) {
+ if (emptySketch_ == null) {
+ emptySketch_ = new DataByteArray(new HllSketch(lgK_, tgtHllType_).toCompactByteArray());
+ }
+ return emptySketch_;
+ }
+ return new DataByteArray(accumUnion_.getResult(tgtHllType_).toCompactByteArray());
+ }
+
+ /**
+ * Cleans up the UDF state after being called using the {@link Accumulator} interface.
+ *
+ * @see "org.apache.pig.Accumulator.cleanup()"
+ */
+ @Override
+ public void cleanup() {
+ accumUnion_ = null;
+ }
+
+ @Override
+ public String getInitial() {
+ return AlgebraicInitial.class.getName();
+ }
+
+ @Override
+ public String getIntermed() {
+ return DataToSketchAlgebraicIntermediate.class.getName();
+ }
+
+ @Override
+ public String getFinal() {
+ return DataToSketchAlgebraicFinal.class.getName();
+ }
+
+ static void updateUnion(final DataBag bag, final Union union) throws ExecException {
+ //Bag is not empty. process each innerTuple in the bag
+ for (final Tuple innerTuple: bag) {
+ final Object f0 = innerTuple.get(0); // consider only field 0
+ if (f0 == null) {
+ continue;
+ }
+ final byte type = innerTuple.getType(0);
+
+ switch (type) {
+ case DataType.NULL:
+ break;
+ case DataType.BYTE:
+ union.update((byte) f0);
+ break;
+ case DataType.INTEGER:
+ union.update((int) f0);
+ break;
+ case DataType.LONG:
+ union.update((long) f0);
+ break;
+ case DataType.FLOAT:
+ union.update((float) f0);
+ break;
+ case DataType.DOUBLE:
+ union.update((double) f0);
+ break;
+ case DataType.BYTEARRAY: {
+ final DataByteArray dba = (DataByteArray) f0;
+ union.update(dba.get());
+ break;
+ }
+ case DataType.CHARARRAY: {
+ final String str = (String) f0;
+ // conversion to char[] avoids costly UTF-8 encoding
+ union.update(str.toCharArray());
+ break;
+ }
+ default:
+ throw new IllegalArgumentException("Field 0 of innerTuple must be one of "
+ + "NULL, BYTE, INTEGER, LONG, FLOAT, DOUBLE, BYTEARRAY or CHARARRAY. "
+ + "Given Type = " + DataType.findTypeName(type)
+ + ", Object = " + f0.toString());
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/hll/DataToSketchAlgebraicFinal.java b/src/main/java/com/yahoo/sketches/pig/hll/DataToSketchAlgebraicFinal.java
new file mode 100644
index 0000000..f34daeb
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/hll/DataToSketchAlgebraicFinal.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import static com.yahoo.sketches.pig.hll.DataToSketch.DEFAULT_HLL_TYPE;
+import static com.yahoo.sketches.pig.hll.DataToSketch.DEFAULT_LG_K;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+
+import com.yahoo.sketches.hll.TgtHllType;
+import com.yahoo.sketches.hll.Union;
+
+public class DataToSketchAlgebraicFinal extends AlgebraicFinal {
+
+ /**
+ * Default constructor for the final pass of an Algebraic function.
+ * Assumes default lgK and target HLL type.
+ */
+ public DataToSketchAlgebraicFinal() {
+ super(DEFAULT_LG_K, DEFAULT_HLL_TYPE);
+ }
+
+ /**
+ * Constructor for the final pass of an Algebraic function. Pig will call
+ * this and pass the same constructor arguments as the base UDF.
+ * Assumes default HLL target type.
+ *
+ * @param lgK in a form of a String
+ */
+ public DataToSketchAlgebraicFinal(final String lgK) {
+ super(Integer.parseInt(lgK), DEFAULT_HLL_TYPE);
+ }
+
+ /**
+ * Constructor for the final pass of an Algebraic function. Pig will call
+ * this and pass the same constructor arguments as the base UDF.
+ *
+ * @param lgK in a form of a String; parsed as an integer
+ * @param tgtHllType in a form of a String; must be the name of a TgtHllType constant
+ */
+ public DataToSketchAlgebraicFinal(final String lgK, final String tgtHllType) {
+ super(Integer.parseInt(lgK), TgtHllType.valueOf(tgtHllType));
+ }
+
+ @Override
+ void updateUnion(final DataBag bag, final Union union) throws ExecException {
+ DataToSketch.updateUnion(bag, union); // same per-datum update that DataToSketch itself uses
+ }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/hll/DataToSketchAlgebraicIntermediate.java b/src/main/java/com/yahoo/sketches/pig/hll/DataToSketchAlgebraicIntermediate.java
new file mode 100644
index 0000000..2fd9cdb
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/hll/DataToSketchAlgebraicIntermediate.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import static com.yahoo.sketches.pig.hll.DataToSketch.DEFAULT_HLL_TYPE;
+import static com.yahoo.sketches.pig.hll.DataToSketch.DEFAULT_LG_K;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+
+import com.yahoo.sketches.hll.TgtHllType;
+import com.yahoo.sketches.hll.Union;
+
+public class DataToSketchAlgebraicIntermediate extends AlgebraicIntermediate {
+
+ /**
+ * Default constructor for the intermediate pass of an Algebraic function.
+ * Assumes default lgK and target HLL type.
+ */
+ public DataToSketchAlgebraicIntermediate() {
+ super(DEFAULT_LG_K, DEFAULT_HLL_TYPE);
+ }
+
+ /**
+ * Constructor for the intermediate pass of an Algebraic function. Pig will call
+ * this and pass the same constructor arguments as the base UDF.
+ * Assumes default HLL target type.
+ *
+ * @param lgK in a form of a String
+ */
+ public DataToSketchAlgebraicIntermediate(final String lgK) {
+ super(Integer.parseInt(lgK), DEFAULT_HLL_TYPE);
+ }
+
+ /**
+ * Constructor for the intermediate pass of an Algebraic function. Pig will call
+ * this and pass the same constructor arguments as the base UDF.
+ *
+ * @param lgK in a form of a String; parsed as an integer
+ * @param tgtHllType in a form of a String; must be the name of a TgtHllType constant
+ */
+ public DataToSketchAlgebraicIntermediate(final String lgK, final String tgtHllType) {
+ super(Integer.parseInt(lgK), TgtHllType.valueOf(tgtHllType));
+ }
+
+ @Override
+ void updateUnion(final DataBag bag, final Union union) throws ExecException {
+ DataToSketch.updateUnion(bag, union); // same per-datum update that DataToSketch itself uses
+ }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/hll/SketchToEstimate.java b/src/main/java/com/yahoo/sketches/pig/hll/SketchToEstimate.java
new file mode 100644
index 0000000..1e4a3c7
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/hll/SketchToEstimate.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.sketches.hll.HllSketch;
+
+/**
+ * This is a User Defined Function (UDF) for getting a unique count estimate from an HllSketch
+ *
+ * <p>A null or empty input tuple, or a null sketch field, produces a null result.
+ *
+ * @author Alexander Saydakov
+ */
+public class SketchToEstimate extends EvalFunc<Double> {
+
+  @Override
+  public Double exec(final Tuple sketchTuple) throws IOException {
+    if ((sketchTuple == null) || (sketchTuple.size() == 0)) { return null; }
+    final DataByteArray dba = (DataByteArray) sketchTuple.get(0);
+    if (dba == null) { return null; } // null sketch field: no estimate, and no NPE
+    return HllSketch.heapify(dba.get()).getEstimate();
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/hll/SketchToEstimateAndErrorBounds.java b/src/main/java/com/yahoo/sketches/pig/hll/SketchToEstimateAndErrorBounds.java
new file mode 100644
index 0000000..798cea2
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/hll/SketchToEstimateAndErrorBounds.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import com.yahoo.sketches.hll.HllSketch;
+
+/**
+ * This is a User Defined Function (UDF) for obtaining the unique count estimate
+ * along with a lower and upper bound from an HllSketch.
+ *
+ * <p>The result is a tuple with three double values: estimate, lower bound and upper bound.
+ * The bounds are given at 95.5% confidence.
+ * A null or empty input tuple, or a null sketch field, produces a null result.
+ *
+ * @author Alexander Saydakov
+ */
+public class SketchToEstimateAndErrorBounds extends EvalFunc<Tuple> {
+
+  @Override
+  public Tuple exec(final Tuple sketchTuple) throws IOException {
+    if ((sketchTuple == null) || (sketchTuple.size() == 0)) { return null; }
+    final DataByteArray dba = (DataByteArray) sketchTuple.get(0);
+    if (dba == null) { return null; } // null sketch field: no estimate, and no NPE
+    final HllSketch sketch = HllSketch.heapify(dba.get());
+    final Tuple outputTuple = TupleFactory.getInstance().newTuple(3);
+    outputTuple.set(0, Double.valueOf(sketch.getEstimate()));
+    outputTuple.set(1, Double.valueOf(sketch.getLowerBound(2)));
+    outputTuple.set(2, Double.valueOf(sketch.getUpperBound(2)));
+    return outputTuple;
+  }
+
+  /**
+   * The output is a Sketch Result Tuple Schema.
+   */
+  @Override
+  public Schema outputSchema(final Schema input) {
+    if (input == null) { return null; }
+    try {
+      final Schema tupleSchema = new Schema();
+      tupleSchema.add(new Schema.FieldSchema("Estimate", DataType.DOUBLE));
+      tupleSchema.add(new Schema.FieldSchema("LowerBound", DataType.DOUBLE));
+      tupleSchema.add(new Schema.FieldSchema("UpperBound", DataType.DOUBLE));
+      return new Schema(new Schema.FieldSchema(getSchemaName(this
+          .getClass().getName().toLowerCase(), input), tupleSchema, DataType.TUPLE));
+    } catch (final FrontendException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/hll/SketchToString.java b/src/main/java/com/yahoo/sketches/pig/hll/SketchToString.java
new file mode 100644
index 0000000..1c6c9de
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/hll/SketchToString.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.sketches.hll.HllSketch;
+
+/**
+ * This is a User Defined Function (UDF) for "pretty printing" the summary of an HllSketch
+ *
+ * @author Alexander Saydakov
+ */
+public class SketchToString extends EvalFunc<String> {
+
+  private final boolean hllDetail_;
+  private final boolean auxDetail_;
+
+  /**
+   * Prints only the sketch summary.
+   */
+  public SketchToString() {
+    this(false, false);
+  }
+
+  /**
+   * Prints the summary and details based on given input parameters.
+   *
+   * @param hllDetail flag to print the HLL sketch detail
+   * @param auxDetail flag to print the auxiliary detail
+   */
+  public SketchToString(final String hllDetail, final String auxDetail) {
+    this(Boolean.parseBoolean(hllDetail), Boolean.parseBoolean(auxDetail));
+  }
+
+  /**
+   * Internal constructor with primitive parameters.
+   *
+   * @param hllDetail flag to print the HLL sketch detail
+   * @param auxDetail flag to print the auxiliary detail
+   */
+  private SketchToString(final boolean hllDetail, final boolean auxDetail) {
+    super();
+    hllDetail_ = hllDetail;
+    auxDetail_ = auxDetail;
+  }
+
+  @Override
+  public String exec(final Tuple sketchTuple) throws IOException {
+    if ((sketchTuple == null) || (sketchTuple.size() == 0)) { return null; }
+    final DataByteArray dba = (DataByteArray) sketchTuple.get(0);
+    // a null sketch field is reported as a null string rather than failing with an NPE
+    if (dba == null) { return null; }
+    final HllSketch sketch = HllSketch.heapify(dba.get());
+    return sketch.toString(true, hllDetail_, auxDetail_);
+  }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/hll/UnionSketch.java b/src/main/java/com/yahoo/sketches/pig/hll/UnionSketch.java
new file mode 100644
index 0000000..814c17b
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/hll/UnionSketch.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import static com.yahoo.sketches.pig.hll.DataToSketch.DEFAULT_HLL_TYPE;
+import static com.yahoo.sketches.pig.hll.DataToSketch.DEFAULT_LG_K;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.pig.Accumulator;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.hll.HllSketch;
+import com.yahoo.sketches.hll.TgtHllType;
+import com.yahoo.sketches.hll.Union;
+
+/**
+ * This is a Pig UDF that performs the Union operation on HllSketches.
+ * This class implements both the <i>Accumulator</i> and <i>Algebraic</i> interfaces.
+ *
+ * @author Alexander Saydakov
+ */
+public class UnionSketch extends EvalFunc<DataByteArray> implements Accumulator<DataByteArray>, Algebraic {
+
+ private DataByteArray emptySketch_; // this is to cash an empty sketch
+
+ private final int lgK_;
+ private final TgtHllType tgtHllType_;
+ private Union accumUnion_;
+ private boolean isFirstCall_;
+
+ /**
+ * Constructor with default lgK and target HLL type
+ */
+ public UnionSketch() {
+ this(DEFAULT_LG_K, DEFAULT_HLL_TYPE);
+ }
+
+ /**
+ * Constructor with given lgK as string and default target HLL type.
+ *
+ * @param lgK in a form of a String
+ */
+ public UnionSketch(final String lgK) {
+ this(Integer.parseInt(lgK), DEFAULT_HLL_TYPE);
+ }
+
+ /**
+ * Constructor with given lgK and target HLL type as strings
+ *
+ * @param lgK in a form of a String
+ * @param tgtHllType in a form of a String
+ */
+ public UnionSketch(final String lgK, final String tgtHllType) {
+ this(Integer.parseInt(lgK), TgtHllType.valueOf(tgtHllType));
+ }
+
+ /**
+ * Base constructor.
+ *
+ * @param lgK parameter controlling the sketch size and accuracy
+ * @param tgtHllType HLL type of the resulting sketch
+ */
+ public UnionSketch(final int lgK, final TgtHllType tgtHllType) {
+ super();
+ lgK_ = lgK;
+ tgtHllType_ = tgtHllType;
+ }
+
+ /**
+ * Top-level exec function.
+ * This method accepts an input Tuple containing a Bag of one or more inner <b>Sketch Tuples</b>
+ * and returns a single serialized HllSketch as a DataByteArray.
+ *
+ * <b>Sketch Tuple</b> is a Tuple containing a single DataByteArray (BYTEARRAY in Pig), which
+ * is a serialized HllSketch.
+ *
+ * @param inputTuple A tuple containing a single bag, containing Sketch Tuples.
+ * @return serialized HllSketch
+ * @see "org.apache.pig.EvalFunc.exec(org.apache.pig.data.Tuple)"
+ * @throws IOException from Pig.
+ */
+ @Override
+ public DataByteArray exec(final Tuple inputTuple) throws IOException {
+ if (isFirstCall_) {
+ Logger.getLogger(getClass()).info("Exec was used");
+ isFirstCall_ = false;
+ }
+ if (inputTuple == null || inputTuple.size() == 0) {
+ if (emptySketch_ == null) {
+ emptySketch_ = new DataByteArray(new HllSketch(lgK_, tgtHllType_).toCompactByteArray());
+ }
+ return emptySketch_;
+ }
+ final Union union = new Union(lgK_);
+ final DataBag bag = (DataBag) inputTuple.get(0);
+ updateUnion(bag, union);
+ return new DataByteArray(union.getResult(tgtHllType_).toCompactByteArray());
+ }
+
+ /**
+ * An <i>Accumulator</i> version of the standard <i>exec()</i> method. Like <i>exec()</i>,
+ * accumulator is called with a bag of Sketch Tuples. Unlike <i>exec()</i>, it doesn't serialize the
+ * result at the end. Instead, it can be called multiple times, each time with another bag of
+ * Sketch Tuples to be input to the union.
+ *
+ * @param inputTuple A tuple containing a single bag, containing Sketch Tuples.
+ * @see #exec
+ * @see "org.apache.pig.Accumulator.accumulate(org.apache.pig.data.Tuple)"
+ * @throws IOException by Pig
+ */
+ @Override
+ public void accumulate(final Tuple inputTuple) throws IOException {
+ if (isFirstCall_) {
+ Logger.getLogger(getClass()).info("Accumulator was used");
+ isFirstCall_ = false;
+ }
+ if (inputTuple == null || inputTuple.size() == 0) { return; }
+ final DataBag bag = (DataBag) inputTuple.get(0);
+ if (bag == null) { return; }
+ if (accumUnion_ == null) {
+ accumUnion_ = new Union(lgK_);
+ }
+ updateUnion(bag, accumUnion_);
+ }
+
+ /**
+ * Returns the sketch that has been built up by multiple calls to {@link #accumulate}.
+ *
+ * @return Sketch Tuple. (see {@link #exec} for return tuple format)
+ * @see "org.apache.pig.Accumulator.getValue()"
+ */
+ @Override
+ public DataByteArray getValue() {
+ if (accumUnion_ == null) {
+ if (emptySketch_ == null) {
+ emptySketch_ = new DataByteArray(
+ new HllSketch(lgK_, tgtHllType_).toCompactByteArray());
+ }
+ return emptySketch_;
+ }
+ return new DataByteArray(
+ accumUnion_.getResult(tgtHllType_).toCompactByteArray());
+ }
+
+ /**
+ * Cleans up the UDF state after being called using the {@link Accumulator} interface.
+ *
+ * @see "org.apache.pig.Accumulator.cleanup()"
+ */
+ @Override
+ public void cleanup() {
+ accumUnion_ = null;
+ }
+
+ @Override
+ public String getInitial() {
+ return AlgebraicInitial.class.getName();
+ }
+
+ @Override
+ public String getIntermed() {
+ return UnionSketchAlgebraicIntermediate.class.getName();
+ }
+
+ @Override
+ public String getFinal() {
+ return UnionSketchAlgebraicFinal.class.getName();
+ }
+
+ static void updateUnion(final DataBag bag, final Union union) throws ExecException {
+ // Bag is not empty. process each innerTuple in the bag
+ for (final Tuple innerTuple : bag) {
+ final Object f0 = innerTuple.get(0); // consider only field 0
+ if (f0 == null) {
+ continue;
+ }
+ final byte type = innerTuple.getType(0);
+ if (type == DataType.BYTEARRAY) {
+ final DataByteArray dba = (DataByteArray) f0;
+ union.update(HllSketch.heapify(Memory.wrap(dba.get())));
+ } else {
+ throw new IllegalArgumentException("Field type was not DataType.BYTEARRAY: " + type);
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/hll/UnionSketchAlgebraicFinal.java b/src/main/java/com/yahoo/sketches/pig/hll/UnionSketchAlgebraicFinal.java
new file mode 100644
index 0000000..07d5681
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/hll/UnionSketchAlgebraicFinal.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import static com.yahoo.sketches.pig.hll.DataToSketch.DEFAULT_HLL_TYPE;
+import static com.yahoo.sketches.pig.hll.DataToSketch.DEFAULT_LG_K;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+
+import com.yahoo.sketches.hll.TgtHllType;
+import com.yahoo.sketches.hll.Union;
+
+/** Final pass of the Algebraic form of the HLL sketch union UDF ({@link UnionSketch}). */
+public class UnionSketchAlgebraicFinal extends AlgebraicFinal {
+
+  /**
+   * Default constructor for the final pass of an Algebraic function.
+   * Uses the default lgK and the default target HLL type.
+   */
+  public UnionSketchAlgebraicFinal() {
+    super(DEFAULT_LG_K, DEFAULT_HLL_TYPE);
+  }
+
+  /**
+   * Constructor for the final pass of an Algebraic function. Pig will call
+   * this and pass the same constructor arguments as the base UDF.
+   * Uses the default target HLL type.
+   *
+   * @param lgK sketch size and accuracy parameter given as a String; parsed as an int
+   */
+  public UnionSketchAlgebraicFinal(final String lgK) {
+    super(Integer.parseInt(lgK), DEFAULT_HLL_TYPE);
+  }
+
+  /**
+   * Constructor for the final pass of an Algebraic function. Pig will call
+   * this and pass the same constructor arguments as the base UDF.
+   *
+   * @param lgK sketch size and accuracy parameter given as a String; parsed as an int
+   * @param tgtHllType name of the TgtHllType constant for the resulting sketch, e.g. "HLL_6"
+   */
+  public UnionSketchAlgebraicFinal(final String lgK, final String tgtHllType) {
+    super(Integer.parseInt(lgK), TgtHllType.valueOf(tgtHllType));
+  }
+
+  @Override // the inner tuples are handled exactly as in the base UDF: UnionSketch.updateUnion
+  void updateUnion(final DataBag bag, final Union union) throws ExecException {
+    UnionSketch.updateUnion(bag, union);
+  }
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/hll/UnionSketchAlgebraicIntermediate.java b/src/main/java/com/yahoo/sketches/pig/hll/UnionSketchAlgebraicIntermediate.java
new file mode 100644
index 0000000..77b6335
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/hll/UnionSketchAlgebraicIntermediate.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import static com.yahoo.sketches.pig.hll.DataToSketch.DEFAULT_HLL_TYPE;
+import static com.yahoo.sketches.pig.hll.DataToSketch.DEFAULT_LG_K;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+
+import com.yahoo.sketches.hll.TgtHllType;
+import com.yahoo.sketches.hll.Union;
+
+/** Intermediate pass of the Algebraic form of the HLL sketch union UDF ({@link UnionSketch}). */
+public class UnionSketchAlgebraicIntermediate extends AlgebraicIntermediate {
+
+  /**
+   * Default constructor of the intermediate pass of an Algebraic function.
+   * Uses the default lgK and the default target HLL type.
+   */
+  public UnionSketchAlgebraicIntermediate() {
+    super(DEFAULT_LG_K, DEFAULT_HLL_TYPE);
+  }
+
+  /**
+   * Constructor for the intermediate pass of an Algebraic function. Pig will call
+   * this and pass the same constructor arguments as the base UDF.
+   * Uses the default target HLL type.
+   *
+   * @param lgK sketch size and accuracy parameter given as a String; parsed as an int
+   */
+  public UnionSketchAlgebraicIntermediate(final String lgK) {
+    super(Integer.parseInt(lgK), DEFAULT_HLL_TYPE);
+  }
+
+  /**
+   * Constructor for the intermediate pass of an Algebraic function. Pig will call
+   * this and pass the same constructor arguments as the base UDF.
+   *
+   * @param lgK sketch size and accuracy parameter given as a String; parsed as an int
+   * @param tgtHllType name of the TgtHllType constant for the resulting sketch, e.g. "HLL_6"
+   */
+  public UnionSketchAlgebraicIntermediate(final String lgK, final String tgtHllType) {
+    super(Integer.parseInt(lgK), TgtHllType.valueOf(tgtHllType));
+  }
+
+  @Override // the inner tuples are handled exactly as in the base UDF: UnionSketch.updateUnion
+  void updateUnion(final DataBag bag, final Union union) throws ExecException {
+    UnionSketch.updateUnion(bag, union);
+  }
+}
diff --git a/src/main/java/com/yahoo/sketches/pig/hll/package-info.java b/src/main/java/com/yahoo/sketches/pig/hll/package-info.java
new file mode 100644
index 0000000..f5f2f21
--- /dev/null
+++ b/src/main/java/com/yahoo/sketches/pig/hll/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2017, Yahoo! Inc. Licensed under the terms of the Apache License 2.0. See LICENSE file
+ * at the project root for terms.
+ */
+/**
+ * Pig UDFs for HLL sketches.
+ *
+ * These UDFs can be used as replacements for the corresponding Theta sketch UDFs.
+ * Note that intersections and A-not-B operations are not supported by the HLL sketch.
+ * Also note a small difference in the output type of DataToSketch and UnionSketch:
+ * HLL sketch UDFs return a DataByteArray (BYTEARRAY in Pig), but the corresponding Theta sketch
+ * UDFs return a Tuple with a single DataByteArray inside. That was a historical accident,
+ * and we are reluctant to break compatibility with existing Theta scripts. HLL sketch UDFs
+ * don't have to keep this compatibility. As a result, HLL sketch UDFs don't need
+ * flatten() around them to remove the Tuple, and internally they don't have to spend extra
+ * resources wrapping every output DataByteArray in a Tuple.
+ *
+ * @author Alexander Saydakov
+ */
+package com.yahoo.sketches.pig.hll;
diff --git a/src/test/java/com/yahoo/sketches/pig/hll/DataToSketchTest.java b/src/test/java/com/yahoo/sketches/pig/hll/DataToSketchTest.java
new file mode 100644
index 0000000..2bf393d
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/hll/DataToSketchTest.java
@@ -0,0 +1,286 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.hll.HllSketch;
+import com.yahoo.sketches.hll.TgtHllType;
+
+/** Unit tests covering the exec, Accumulator and Algebraic code paths of DataToSketch. */
+public class DataToSketchTest {
+
+  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+  private static final BagFactory bagFactory = BagFactory.getInstance();
+
+  @Test // a null input tuple yields an empty sketch
+  public void execNullInputTuple() throws Exception {
+    EvalFunc<DataByteArray> func = new DataToSketch();
+    DataByteArray result = func.exec(null);
+    HllSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test // an empty input tuple yields an empty sketch with the requested lgK
+  public void execEmptyInputTuple() throws Exception {
+    EvalFunc<DataByteArray> func = new DataToSketch("10");
+    DataByteArray result = func.exec(tupleFactory.newTuple());
+    HllSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+    Assert.assertEquals(sketch.getLgConfigK(), 10);
+  }
+
+  @Test // an empty bag yields an empty sketch with the requested lgK and HLL type
+  public void execEmptyBag() throws Exception {
+    EvalFunc<DataByteArray> func = new DataToSketch("10", "HLL_6");
+    DataByteArray result = func.exec(tupleFactory.newTuple(bagFactory.newDefaultBag()));
+    HllSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+    Assert.assertEquals(sketch.getLgConfigK(), 10);
+    Assert.assertEquals(sketch.getTgtHllType(), TgtHllType.HLL_6);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void execUnsupportedType() throws Exception {
+    EvalFunc<DataByteArray> func = new DataToSketch();
+    DataBag bag = bagFactory.newDefaultBag();
+    bag.add(tupleFactory.newTuple(new Object())); // a plain Object must be rejected
+    func.exec(tupleFactory.newTuple(bag));
+  }
+
+  @Test // one value of each accepted field type plus a null field
+  public void execVariousTypesOfInput() throws Exception {
+    EvalFunc<DataByteArray> func = new DataToSketch();
+    DataBag bag = bagFactory.newDefaultBag();
+    Tuple tupleWithNull = tupleFactory.newTuple(1);
+    tupleWithNull.set(0, null);
+    bag.add(tupleWithNull);
+    bag.add(tupleFactory.newTuple(Byte.valueOf((byte) 1)));
+    bag.add(tupleFactory.newTuple(Integer.valueOf(2)));
+    bag.add(tupleFactory.newTuple(Long.valueOf(3L)));
+    bag.add(tupleFactory.newTuple(Float.valueOf(1f)));
+    bag.add(tupleFactory.newTuple(Double.valueOf(2d)));
+    bag.add(tupleFactory.newTuple(new DataByteArray(new byte[] {(byte) 1})));
+    bag.add(tupleFactory.newTuple("a"));
+    DataByteArray result = func.exec(tupleFactory.newTuple(bag));
+    HllSketch sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 7.0, 0.01); // eight tuples, the null one is skipped
+  }
+
+  @Test // Accumulator interface: getValue() after each kind of input and after cleanup()
+  public void accumulator() throws Exception {
+    Accumulator<DataByteArray> func = new DataToSketch();
+
+    // no input yet
+    DataByteArray result = func.getValue();
+    HllSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // null input tuple
+    func.accumulate(null);
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // empty input tuple
+    func.accumulate(tupleFactory.newTuple());
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // empty bag
+    func.accumulate(tupleFactory.newTuple(bagFactory.newDefaultBag()));
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+
+    // normal case
+    DataBag bag = bagFactory.newDefaultBag();
+    bag.add(tupleFactory.newTuple("a"));
+    bag.add(tupleFactory.newTuple("b"));
+    func.accumulate(tupleFactory.newTuple(bag));
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+
+    // cleanup
+    func.cleanup();
+    result = func.getValue();
+    sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test // the Initial stage hands back a tuple equal to its input
+  public void algebraicInitial() throws Exception {
+    EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new DataToSketch().getInitial()).newInstance();
+    Tuple input = tupleFactory.newTuple();
+    Tuple result = func.exec(input);
+    Assert.assertEquals(result, input);
+  }
+
+  @Test // same check, with the stage constructed from an lgK argument
+  public void algebraicInitialWithLgK() throws Exception {
+    EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new DataToSketch().getInitial())
+        .getConstructor(String.class).newInstance("10");
+    Tuple input = tupleFactory.newTuple();
+    Tuple result = func.exec(input);
+    Assert.assertEquals(result, input);
+  }
+
+  @Test // same check, with the stage constructed from lgK and target HLL type arguments
+  public void algebraicInitialWithLgKAndTgtHllType() throws Exception {
+    EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new DataToSketch().getInitial())
+        .getConstructor(String.class, String.class).newInstance("10", "HLL_6");
+    Tuple input = tupleFactory.newTuple();
+    Tuple result = func.exec(input);
+    Assert.assertEquals(result, input);
+  }
+
+  @Test // Intermediate stage: a null input tuple yields a tuple holding an empty sketch
+  public void algebraicIntermediateNullInputTuple() throws Exception {
+    EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new DataToSketch().getIntermed()).newInstance();
+    Tuple result = func.exec(null);
+    HllSketch sketch = getSketch((DataByteArray) result.get(0));
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test // Intermediate stage: an empty input tuple yields an empty sketch with the given lgK
+  public void algebraicIntermediateEmptyInputTuple() throws Exception {
+    EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new DataToSketch().getIntermed())
+        .getConstructor(String.class).newInstance("10");
+    Tuple result = func.exec(tupleFactory.newTuple());
+    HllSketch sketch = getSketch((DataByteArray) result.get(0));
+    Assert.assertTrue(sketch.isEmpty());
+    Assert.assertEquals(sketch.getLgConfigK(), 10);
+  }
+
+  @Test // Intermediate stage: an empty bag yields an empty sketch with the given lgK and type
+  public void algebraicIntermediateEmptyBag() throws Exception {
+    EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new DataToSketch().getIntermed())
+        .getConstructor(String.class, String.class).newInstance("10", "HLL_6");
+    Tuple result = func.exec(tupleFactory.newTuple(bagFactory.newDefaultBag()));
+    HllSketch sketch = getSketch((DataByteArray) result.get(0));
+    Assert.assertTrue(sketch.isEmpty());
+    Assert.assertEquals(sketch.getLgConfigK(), 10);
+    Assert.assertEquals(sketch.getTgtHllType(), TgtHllType.HLL_6);
+  }
+
+  @Test // Intermediate stage fed with a bag of raw values wrapped in an outer bag
+  public void algebraicIntermediateFromInitial() throws Exception {
+    EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new DataToSketch().getIntermed()).newInstance();
+    DataBag outerBag = bagFactory.newDefaultBag();
+    DataBag innerBag = bagFactory.newDefaultBag();
+    innerBag.add(tupleFactory.newTuple("a"));
+    innerBag.add(tupleFactory.newTuple("b"));
+    innerBag.add(tupleFactory.newTuple("c"));
+    outerBag.add(tupleFactory.newTuple(innerBag));
+    Tuple result = func.exec(tupleFactory.newTuple(outerBag));
+    HllSketch sketch = getSketch((DataByteArray) result.get(0));
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 3.0, 0.01);
+  }
+
+  @Test // Intermediate stage fed with a bag holding an already serialized sketch
+  public void algebraicIntermediateFromIntermediate() throws Exception {
+    EvalFunc<Tuple> func =
+        (EvalFunc<Tuple>) Class.forName(new DataToSketch().getIntermed()).newInstance();
+    HllSketch inputSketch = new HllSketch(12);
+    inputSketch.update("a");
+    inputSketch.update("b");
+    DataBag bag = bagFactory.newDefaultBag();
+    bag.add(tupleFactory.newTuple(new DataByteArray(inputSketch.toCompactByteArray())));
+    Tuple result = func.exec(tupleFactory.newTuple(bag));
+    HllSketch sketch = getSketch((DataByteArray) result.get(0));
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+  }
+
+  @Test // Final stage: a null input tuple yields an empty sketch
+  public void algebraicFinalNullInputTuple() throws Exception {
+    EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new DataToSketch().getFinal()).newInstance();
+    DataByteArray result = func.exec(null);
+    HllSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+  }
+
+  @Test // Final stage: an empty input tuple yields an empty sketch with the given lgK
+  public void algebraicFinalEmptyInputTuple() throws Exception {
+    EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new DataToSketch().getFinal())
+        .getConstructor(String.class).newInstance("10");
+    DataByteArray result = func.exec(tupleFactory.newTuple());
+    HllSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+    Assert.assertEquals(sketch.getLgConfigK(), 10);
+  }
+
+  @Test // Final stage: an empty bag yields an empty sketch with the given lgK and type
+  public void algebraicFinalEmptyBag() throws Exception {
+    EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new DataToSketch().getFinal())
+        .getConstructor(String.class, String.class).newInstance("10", "HLL_6");
+    DataByteArray result = func.exec(tupleFactory.newTuple(bagFactory.newDefaultBag()));
+    HllSketch sketch = getSketch(result);
+    Assert.assertTrue(sketch.isEmpty());
+    Assert.assertEquals(sketch.getLgConfigK(), 10);
+    Assert.assertEquals(sketch.getTgtHllType(), TgtHllType.HLL_6);
+  }
+
+  @Test // Final stage fed with a bag of raw values wrapped in an outer bag
+  public void algebraicFinalFromInitial() throws Exception {
+    EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new DataToSketch().getFinal()).newInstance();
+    DataBag outerBag = bagFactory.newDefaultBag();
+    DataBag innerBag = bagFactory.newDefaultBag();
+    innerBag.add(tupleFactory.newTuple("a"));
+    innerBag.add(tupleFactory.newTuple("b"));
+    innerBag.add(tupleFactory.newTuple("c"));
+    outerBag.add(tupleFactory.newTuple(innerBag));
+    DataByteArray result = func.exec(tupleFactory.newTuple(outerBag));
+    HllSketch sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 3.0, 0.01);
+  }
+
+  @Test // Final stage fed with a bag holding an already serialized sketch
+  public void algebraicFinalFromIntermediate() throws Exception {
+    EvalFunc<DataByteArray> func =
+        (EvalFunc<DataByteArray>) Class.forName(new DataToSketch().getFinal()).newInstance();
+    HllSketch inputSketch = new HllSketch(12);
+    inputSketch.update("a");
+    inputSketch.update("b");
+    DataBag bag = bagFactory.newDefaultBag();
+    bag.add(tupleFactory.newTuple(new DataByteArray(inputSketch.toCompactByteArray())));
+    DataByteArray result = func.exec(tupleFactory.newTuple(bag));
+    HllSketch sketch = getSketch(result);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getEstimate(), 2.0, 0.01);
+  }
+
+  static HllSketch getSketch(DataByteArray dba) throws Exception { // shared with UnionSketchTest
+    Assert.assertNotNull(dba);
+    Assert.assertTrue(dba.size() > 0);
+    return HllSketch.heapify(dba.get());
+  }
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/hll/SketchToEstimateAndErrorBoundsTest.java b/src/test/java/com/yahoo/sketches/pig/hll/SketchToEstimateAndErrorBoundsTest.java
new file mode 100644
index 0000000..a59f6bb
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/hll/SketchToEstimateAndErrorBoundsTest.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.hll.HllSketch;
+
+/** Unit tests for the exec results and the output schema of SketchToEstimateAndErrorBounds. */
+public class SketchToEstimateAndErrorBoundsTest {
+
+  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+
+  @Test // a null input tuple yields null
+  public void nullInputTuple() throws Exception {
+    final EvalFunc<Tuple> udf = new SketchToEstimateAndErrorBounds();
+    final Tuple bounds = udf.exec(null);
+    Assert.assertNull(bounds);
+  }
+
+  @Test // an empty input tuple yields null
+  public void emptyInputTuple() throws Exception {
+    final EvalFunc<Tuple> udf = new SketchToEstimateAndErrorBounds();
+    final Tuple bounds = udf.exec(tupleFactory.newTuple());
+    Assert.assertNull(bounds);
+  }
+
+  @Test // two distinct values: field 0 is about 2, field 1 is not above it, field 2 not below
+  public void normalCase() throws Exception {
+    final EvalFunc<Tuple> udf = new SketchToEstimateAndErrorBounds();
+    final HllSketch hll = new HllSketch(12);
+    hll.update(1);
+    hll.update(2);
+    final Tuple bounds = udf.exec(tupleFactory.newTuple(new DataByteArray(hll.toCompactByteArray())));
+    Assert.assertNotNull(bounds);
+    Assert.assertEquals((Double) bounds.get(0), 2.0, 0.01);
+    Assert.assertTrue((Double) bounds.get(1) <= 2.0);
+    Assert.assertTrue((Double) bounds.get(2) >= 2.0);
+  }
+
+  @Test // the output schema is a single tuple field made of three doubles
+  public void schema() throws Exception {
+    final EvalFunc<Tuple> udf = new SketchToEstimateAndErrorBounds();
+    final Schema input = new Schema(new Schema.FieldSchema("Sketch", DataType.BYTEARRAY));
+    final Schema output = udf.outputSchema(input);
+    Assert.assertNotNull(output);
+    Assert.assertEquals(output.size(), 1);
+    Assert.assertEquals(DataType.findTypeName(output.getField(0).type), "tuple");
+    final Schema inner = output.getField(0).schema;
+    Assert.assertEquals(inner.size(), 3);
+    for (int field = 0; field < 3; field++) {
+      Assert.assertEquals(DataType.findTypeName(inner.getField(field).type), "double");
+    }
+  }
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/hll/SketchToEstimateTest.java b/src/test/java/com/yahoo/sketches/pig/hll/SketchToEstimateTest.java
new file mode 100644
index 0000000..7d5a949
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/hll/SketchToEstimateTest.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.TupleFactory;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.hll.HllSketch;
+
+import junit.framework.Assert;
+
+/** Unit tests for SketchToEstimate; the assertions here come from junit.framework.Assert. */
+public class SketchToEstimateTest {
+
+  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+
+  @Test // a null input tuple yields null
+  public void nullInputTuple() throws Exception {
+    EvalFunc<Double> func = new SketchToEstimate();
+    Double result = func.exec(null);
+    Assert.assertNull(result);
+  }
+
+  @Test // an empty input tuple yields null
+  public void emptyInputTuple() throws Exception {
+    EvalFunc<Double> func = new SketchToEstimate();
+    Double result = func.exec(tupleFactory.newTuple());
+    Assert.assertNull(result);
+  }
+
+  @Test // a sketch updated with two distinct values estimates about 2
+  public void normalCase() throws Exception {
+    EvalFunc<Double> func = new SketchToEstimate();
+    HllSketch sketch = new HllSketch(12);
+    sketch.update(1);
+    sketch.update(2);
+    Double result = func.exec(tupleFactory.newTuple(new DataByteArray(sketch.toCompactByteArray())));
+    Assert.assertNotNull(result);
+    Assert.assertEquals(2.0, result, 0.01); // junit.framework.Assert order: expected, actual, delta
+  }
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/hll/SketchToStringTest.java b/src/test/java/com/yahoo/sketches/pig/hll/SketchToStringTest.java
new file mode 100644
index 0000000..f6185e8
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/hll/SketchToStringTest.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.TupleFactory;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.hll.HllSketch;
+
+import junit.framework.Assert;
+
+/** Unit tests for SketchToString: null and empty input tuples, and a serialized sketch. */
+public class SketchToStringTest {
+
+  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+
+  @Test // a null input tuple yields null
+  public void nullInputTuple() throws Exception {
+    final EvalFunc<String> udf = new SketchToString();
+    final String text = udf.exec(null);
+    Assert.assertNull(text);
+  }
+
+  @Test // an empty input tuple yields null
+  public void emptyInputTuple() throws Exception {
+    final EvalFunc<String> udf = new SketchToString();
+    final String text = udf.exec(tupleFactory.newTuple());
+    Assert.assertNull(text);
+  }
+
+  @Test // a serialized (empty) sketch yields a non-empty string
+  public void normalCase() throws Exception {
+    final EvalFunc<String> udf = new SketchToString();
+    final HllSketch hll = new HllSketch(12);
+    final String text = udf.exec(tupleFactory.newTuple(new DataByteArray(hll.toCompactByteArray())));
+    Assert.assertNotNull(text);
+    Assert.assertTrue(text.length() > 0);
+  }
+}
diff --git a/src/test/java/com/yahoo/sketches/pig/hll/UnionSketchTest.java b/src/test/java/com/yahoo/sketches/pig/hll/UnionSketchTest.java
new file mode 100644
index 0000000..74dd855
--- /dev/null
+++ b/src/test/java/com/yahoo/sketches/pig/hll/UnionSketchTest.java
@@ -0,0 +1,234 @@
+/*
+ * Copyright 2017, Yahoo! Inc.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+package com.yahoo.sketches.pig.hll;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.yahoo.sketches.hll.HllSketch;
+import com.yahoo.sketches.hll.TgtHllType;
+
+/** Unit tests covering the exec, Accumulator and Algebraic code paths of UnionSketch. */
+public class UnionSketchTest {
+
+  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+  private static final BagFactory bagFactory = BagFactory.getInstance();
+
+  @Test // a null input tuple yields an empty sketch
+  public void execNullInputTuple() throws Exception {
+    final EvalFunc<DataByteArray> udf = new UnionSketch();
+    final DataByteArray bytes = udf.exec(null);
+    final HllSketch merged = DataToSketchTest.getSketch(bytes);
+    Assert.assertTrue(merged.isEmpty());
+  }
+
+  @Test // an empty input tuple yields an empty sketch with the requested lgK
+  public void execEmptyInputTuple() throws Exception {
+    final EvalFunc<DataByteArray> udf = new UnionSketch("10");
+    final DataByteArray bytes = udf.exec(tupleFactory.newTuple());
+    final HllSketch merged = DataToSketchTest.getSketch(bytes);
+    Assert.assertTrue(merged.isEmpty());
+    Assert.assertEquals(merged.getLgConfigK(), 10);
+  }
+
+  @Test // an empty bag yields an empty sketch with the requested lgK and HLL type
+  public void execEmptyBag() throws Exception {
+    final EvalFunc<DataByteArray> udf = new UnionSketch("10", "HLL_6");
+    final DataByteArray bytes = udf.exec(tupleFactory.newTuple(bagFactory.newDefaultBag()));
+    final HllSketch merged = DataToSketchTest.getSketch(bytes);
+    Assert.assertTrue(merged.isEmpty());
+    Assert.assertEquals(merged.getLgConfigK(), 10);
+    Assert.assertEquals(merged.getTgtHllType(), TgtHllType.HLL_6);
+  }
+
+  @Test // a bag with one serialized sketch of two values yields an estimate of about 2
+  public void execNormalCase() throws Exception {
+    final EvalFunc<DataByteArray> udf = new UnionSketch();
+    final HllSketch source = new HllSketch(12);
+    source.update(1);
+    source.update(2);
+    final DataBag sketchBag = bagFactory.newDefaultBag();
+    sketchBag.add(tupleFactory.newTuple(new DataByteArray(source.toCompactByteArray())));
+    final DataByteArray bytes = udf.exec(tupleFactory.newTuple(sketchBag));
+    final HllSketch merged = DataToSketchTest.getSketch(bytes);
+    Assert.assertFalse(merged.isEmpty());
+    Assert.assertEquals(merged.getEstimate(), 2.0, 0.01);
+  }
+
+  @Test // Accumulator interface: getValue() after each kind of input and after cleanup()
+  public void accumulator() throws Exception {
+    final Accumulator<DataByteArray> udf = new UnionSketch();
+
+    // nothing accumulated yet
+    DataByteArray bytes = udf.getValue();
+    HllSketch merged = DataToSketchTest.getSketch(bytes);
+    Assert.assertTrue(merged.isEmpty());
+
+    // a null input tuple
+    udf.accumulate(null);
+    bytes = udf.getValue();
+    merged = DataToSketchTest.getSketch(bytes);
+    Assert.assertTrue(merged.isEmpty());
+
+    // an input tuple without fields
+    udf.accumulate(tupleFactory.newTuple());
+    bytes = udf.getValue();
+    merged = DataToSketchTest.getSketch(bytes);
+    Assert.assertTrue(merged.isEmpty());
+
+    // a bag without tuples
+    udf.accumulate(tupleFactory.newTuple(bagFactory.newDefaultBag()));
+    bytes = udf.getValue();
+    merged = DataToSketchTest.getSketch(bytes);
+    Assert.assertTrue(merged.isEmpty());
+
+    // a bag with one serialized sketch
+    final HllSketch source = new HllSketch(12);
+    source.update(1);
+    source.update(2);
+    final DataBag sketchBag = bagFactory.newDefaultBag();
+    sketchBag.add(tupleFactory.newTuple(new DataByteArray(source.toCompactByteArray())));
+    udf.accumulate(tupleFactory.newTuple(sketchBag));
+    bytes = udf.getValue();
+    merged = DataToSketchTest.getSketch(bytes);
+    Assert.assertFalse(merged.isEmpty());
+    Assert.assertEquals(merged.getEstimate(), 2.0, 0.01);
+
+    // after cleanup the value is an empty sketch again
+    udf.cleanup();
+    bytes = udf.getValue();
+    merged = DataToSketchTest.getSketch(bytes);
+    Assert.assertTrue(merged.isEmpty());
+  }
+
+  @Test // the Initial stage hands back a tuple equal to its input
+  public void algebraicInitial() throws Exception {
+    final EvalFunc<Tuple> udf =
+        (EvalFunc<Tuple>) Class.forName(new UnionSketch().getInitial()).newInstance();
+    final Tuple given = tupleFactory.newTuple();
+    final Tuple returned = udf.exec(given);
+    Assert.assertEquals(returned, given);
+  }
+
+  @Test // Intermediate stage: a null input tuple yields a tuple holding an empty sketch
+  public void algebraicIntermediateNullInputTuple() throws Exception {
+    final EvalFunc<Tuple> udf =
+        (EvalFunc<Tuple>) Class.forName(new UnionSketch().getIntermed()).newInstance();
+    final Tuple returned = udf.exec(null);
+    final HllSketch merged = DataToSketchTest.getSketch((DataByteArray) returned.get(0));
+    Assert.assertTrue(merged.isEmpty());
+  }
+
+  @Test // Intermediate stage: an empty input tuple yields an empty sketch with the given lgK
+  public void algebraicIntermediateEmptyInputTuple() throws Exception {
+    final EvalFunc<Tuple> udf =
+        (EvalFunc<Tuple>) Class.forName(new UnionSketch().getIntermed())
+        .getConstructor(String.class).newInstance("10");
+    final Tuple returned = udf.exec(tupleFactory.newTuple());
+    final HllSketch merged = DataToSketchTest.getSketch((DataByteArray) returned.get(0));
+    Assert.assertTrue(merged.isEmpty());
+    Assert.assertEquals(merged.getLgConfigK(), 10);
+  }
+
+  @Test // Intermediate stage fed with a serialized sketch wrapped in an outer bag
+  public void algebraicIntermediateFromInitial() throws Exception {
+    final EvalFunc<Tuple> udf =
+        (EvalFunc<Tuple>) Class.forName(new UnionSketch().getIntermed())
+        .getConstructor(String.class, String.class).newInstance("10", "HLL_6");
+    final HllSketch source = new HllSketch(12);
+    source.update(1);
+    source.update(2);
+    source.update(3);
+    final DataBag outer = bagFactory.newDefaultBag();
+    final DataBag inner = bagFactory.newDefaultBag();
+    inner.add(tupleFactory.newTuple(new DataByteArray(source.toCompactByteArray())));
+    outer.add(tupleFactory.newTuple(inner));
+    final Tuple returned = udf.exec(tupleFactory.newTuple(outer));
+    final HllSketch merged = DataToSketchTest.getSketch((DataByteArray) returned.get(0));
+    Assert.assertFalse(merged.isEmpty());
+    Assert.assertEquals(merged.getEstimate(), 3.0, 0.01);
+    Assert.assertEquals(merged.getLgConfigK(), 10);
+    Assert.assertEquals(merged.getTgtHllType(), TgtHllType.HLL_6);
+  }
+
+  @Test // Intermediate stage fed with a bag holding an already serialized sketch
+  public void algebraicIntermediateFromIntermediate() throws Exception {
+    final EvalFunc<Tuple> udf =
+        (EvalFunc<Tuple>) Class.forName(new UnionSketch().getIntermed()).newInstance();
+    final HllSketch source = new HllSketch(12);
+    source.update("a");
+    source.update("b");
+    final DataBag sketchBag = bagFactory.newDefaultBag();
+    sketchBag.add(tupleFactory.newTuple(new DataByteArray(source.toCompactByteArray())));
+    final Tuple returned = udf.exec(tupleFactory.newTuple(sketchBag));
+    final HllSketch merged = DataToSketchTest.getSketch((DataByteArray) returned.get(0));
+    Assert.assertFalse(merged.isEmpty());
+    Assert.assertEquals(merged.getEstimate(), 2.0, 0.01);
+  }
+
+  @Test // Final stage: a null input tuple yields an empty sketch
+  public void algebraicFinalNullInputTuple() throws Exception {
+    final EvalFunc<DataByteArray> udf =
+        (EvalFunc<DataByteArray>) Class.forName(new UnionSketch().getFinal()).newInstance();
+    final DataByteArray bytes = udf.exec(null);
+    final HllSketch merged = DataToSketchTest.getSketch(bytes);
+    Assert.assertTrue(merged.isEmpty());
+  }
+
+  @Test // Final stage: an empty input tuple yields an empty sketch with the given lgK
+  public void algebraicFinalEmptyInputTuple() throws Exception {
+    final EvalFunc<DataByteArray> udf =
+        (EvalFunc<DataByteArray>) Class.forName(new UnionSketch().getFinal())
+        .getConstructor(String.class).newInstance("10");
+    final DataByteArray bytes = udf.exec(tupleFactory.newTuple());
+    final HllSketch merged = DataToSketchTest.getSketch(bytes);
+    Assert.assertTrue(merged.isEmpty());
+    Assert.assertEquals(merged.getLgConfigK(), 10);
+  }
+
+  @Test // Final stage fed with a serialized sketch wrapped in an outer bag
+  public void algebraicFinalFromInitial() throws Exception {
+    final EvalFunc<DataByteArray> udf =
+        (EvalFunc<DataByteArray>) Class.forName(new UnionSketch().getFinal())
+        .getConstructor(String.class, String.class).newInstance("10", "HLL_6");
+    final HllSketch source = new HllSketch(12);
+    source.update(1);
+    source.update(2);
+    source.update(3);
+    final DataBag outer = bagFactory.newDefaultBag();
+    final DataBag inner = bagFactory.newDefaultBag();
+    inner.add(tupleFactory.newTuple(new DataByteArray(source.toCompactByteArray())));
+    outer.add(tupleFactory.newTuple(inner));
+    final DataByteArray bytes = udf.exec(tupleFactory.newTuple(outer));
+    final HllSketch merged = DataToSketchTest.getSketch(bytes);
+    Assert.assertFalse(merged.isEmpty());
+    Assert.assertEquals(merged.getEstimate(), 3.0, 0.01);
+    Assert.assertEquals(merged.getLgConfigK(), 10);
+    Assert.assertEquals(merged.getTgtHllType(), TgtHllType.HLL_6);
+  }
+
+  @Test // Final stage fed with a bag holding an already serialized sketch
+  public void algebraicFinalFromIntermediate() throws Exception {
+    final EvalFunc<DataByteArray> udf =
+        (EvalFunc<DataByteArray>) Class.forName(new UnionSketch().getFinal()).newInstance();
+    final HllSketch source = new HllSketch(12);
+    source.update("a");
+    source.update("b");
+    final DataBag sketchBag = bagFactory.newDefaultBag();
+    sketchBag.add(tupleFactory.newTuple(new DataByteArray(source.toCompactByteArray())));
+    final DataByteArray bytes = udf.exec(tupleFactory.newTuple(sketchBag));
+    final HllSketch merged = DataToSketchTest.getSketch(bytes);
+    Assert.assertFalse(merged.isEmpty());
+    Assert.assertEquals(merged.getEstimate(), 2.0, 0.01);
+  }
+}