adapt to the new reality that getResultAndReset() in Union can return
null
diff --git a/src/main/java/com/yahoo/sketches/pig/quantiles/DataToSketch.java b/src/main/java/com/yahoo/sketches/pig/quantiles/DataToSketch.java
index 626e3f7..f8e04b3 100644
--- a/src/main/java/com/yahoo/sketches/pig/quantiles/DataToSketch.java
+++ b/src/main/java/com/yahoo/sketches/pig/quantiles/DataToSketch.java
@@ -18,6 +18,7 @@
import org.apache.pig.impl.logicalLayer.schema.Schema;
import com.yahoo.sketches.memory.NativeMemory;
+import com.yahoo.sketches.quantiles.QuantilesSketch;
import com.yahoo.sketches.quantiles.Union;
import com.yahoo.sketches.quantiles.UnionBuilder;
@@ -125,12 +126,15 @@
@Override // TOP LEVEL EXEC
public Tuple exec(final Tuple inputTuple) throws IOException {
//The exec is a stateless function. It operates on the input and returns a result.
- final Union union = unionBuilder_.build();
if (inputTuple != null && inputTuple.size() > 0) {
+ final Union union = unionBuilder_.build();
final DataBag bag = (DataBag) inputTuple.get(0);
for (final Tuple innerTuple: bag) union.update((Double) innerTuple.get(0));
+ final QuantilesSketch resultSketch = union.getResultAndReset();
+ if (resultSketch != null) return tupleFactory_.newTuple(new DataByteArray(resultSketch.toByteArray()));
}
- return tupleFactory_.newTuple(new DataByteArray(union.getResultAndReset().toByteArray()));
+ // return empty sketch
+ return tupleFactory_.newTuple(new DataByteArray(unionBuilder_.build().getResult().toByteArray()));
}
@Override
@@ -176,8 +180,12 @@
*/
@Override
public Tuple getValue() {
- final Union union = accumUnion_ == null ? unionBuilder_.build() : accumUnion_;
- return tupleFactory_.newTuple(new DataByteArray(union.getResultAndReset().toByteArray()));
+ if (accumUnion_ != null) {
+ final QuantilesSketch resultSketch = accumUnion_.getResultAndReset();
+ if (resultSketch != null) return tupleFactory_.newTuple(new DataByteArray(resultSketch.toByteArray()));
+ }
+ // return empty sketch
+ return tupleFactory_.newTuple(new DataByteArray(unionBuilder_.build().getResult().toByteArray()));
}
/**
@@ -283,8 +291,8 @@
@Override // IntermediateFinal exec
public Tuple exec(final Tuple inputTuple) throws IOException { //throws is in API
- final Union union = unionBuilder_.build();
- if (inputTuple != null && inputTuple.size() != 0) {
+ if (inputTuple != null && inputTuple.size() > 0) {
+ final Union union = unionBuilder_.build();
final DataBag outerBag = (DataBag) inputTuple.get(0);
for (final Tuple dataTuple: outerBag) {
final Object f0 = dataTuple.get(0);
@@ -308,8 +316,11 @@
+ f0.getClass().getName());
}
}
+ final QuantilesSketch resultSketch = union.getResultAndReset();
+ if (resultSketch != null) return tupleFactory_.newTuple(new DataByteArray(resultSketch.toByteArray()));
}
- return tupleFactory_.newTuple(new DataByteArray(union.getResultAndReset().toByteArray()));
+ // return empty sketch
+ return tupleFactory_.newTuple(new DataByteArray(unionBuilder_.build().getResult().toByteArray()));
}
} // end IntermediateFinal
diff --git a/src/main/java/com/yahoo/sketches/pig/quantiles/Merge.java b/src/main/java/com/yahoo/sketches/pig/quantiles/Merge.java
index 31fba6a..fbd18ea 100644
--- a/src/main/java/com/yahoo/sketches/pig/quantiles/Merge.java
+++ b/src/main/java/com/yahoo/sketches/pig/quantiles/Merge.java
@@ -19,6 +19,7 @@
import org.apache.pig.impl.logicalLayer.schema.Schema;
import com.yahoo.sketches.memory.NativeMemory;
+import com.yahoo.sketches.quantiles.QuantilesSketch;
import com.yahoo.sketches.quantiles.Union;
import com.yahoo.sketches.quantiles.UnionBuilder;
@@ -116,12 +117,15 @@
@Override // TOP LEVEL EXEC
public Tuple exec(final Tuple inputTuple) throws IOException {
//The exec is a stateless function. It operates on the input and returns a result.
- final Union union = unionBuilder_.build();
if (inputTuple != null && inputTuple.size() > 0) {
+ final Union union = unionBuilder_.build();
final DataBag bag = (DataBag) inputTuple.get(0);
updateUnion(bag, union);
+ final QuantilesSketch resultSketch = union.getResultAndReset();
+ if (resultSketch != null) return tupleFactory_.newTuple(new DataByteArray(resultSketch.toByteArray()));
}
- return tupleFactory_.newTuple(new DataByteArray(union.getResultAndReset().toByteArray()));
+ // return empty sketch
+ return tupleFactory_.newTuple(new DataByteArray(unionBuilder_.build().getResult().toByteArray()));
}
@Override
@@ -167,8 +171,12 @@
*/
@Override
public Tuple getValue() {
- final Union union = accumUnion_ == null ? unionBuilder_.build() : accumUnion_;
- return tupleFactory_.newTuple(new DataByteArray(union.getResultAndReset().toByteArray()));
+ if (accumUnion_ != null) {
+ final QuantilesSketch resultSketch = accumUnion_.getResultAndReset();
+ if (resultSketch != null) return tupleFactory_.newTuple(new DataByteArray(resultSketch.toByteArray()));
+ }
+ // return empty sketch
+ return tupleFactory_.newTuple(new DataByteArray(unionBuilder_.build().getResult().toByteArray()));
}
/**
@@ -297,8 +305,8 @@
@Override // IntermediateFinal exec
public Tuple exec(final Tuple inputTuple) throws IOException {
- final Union union = unionBuilder_.build();
- if (inputTuple != null && inputTuple.size() != 0) {
+ if (inputTuple != null && inputTuple.size() > 0) {
+ final Union union = unionBuilder_.build();
final DataBag outerBag = (DataBag) inputTuple.get(0);
for (final Tuple dataTuple: outerBag) {
final Object f0 = dataTuple.get(0);
@@ -322,8 +330,11 @@
+ f0.getClass().getName());
}
}
+ final QuantilesSketch resultSketch = union.getResultAndReset();
+ if (resultSketch != null) return tupleFactory_.newTuple(new DataByteArray(resultSketch.toByteArray()));
}
- return tupleFactory_.newTuple(new DataByteArray(union.getResultAndReset().toByteArray()));
+ // return empty sketch
+ return tupleFactory_.newTuple(new DataByteArray(unionBuilder_.build().getResult().toByteArray()));
}
} // end IntermediateFinal