Merge pull request #50 from juen1jp/removeNulls
Fixing the Quantiles UDFs to ignore null values.
diff --git a/src/main/java/com/yahoo/sketches/pig/quantiles/DataToDoublesSketch.java b/src/main/java/com/yahoo/sketches/pig/quantiles/DataToDoublesSketch.java
index fd5d32b..84ca5d9 100644
--- a/src/main/java/com/yahoo/sketches/pig/quantiles/DataToDoublesSketch.java
+++ b/src/main/java/com/yahoo/sketches/pig/quantiles/DataToDoublesSketch.java
@@ -128,7 +128,10 @@
final DoublesUnion union = unionBuilder_.build();
final DataBag bag = (DataBag) inputTuple.get(0);
for (final Tuple innerTuple: bag) {
- union.update((Double) innerTuple.get(0));
+ Object value = innerTuple.get(0);
+ if(value != null) {
+ union.update((Double) value);
+ }
}
final DoublesSketch resultSketch = union.getResultAndReset();
if (resultSketch != null) {
@@ -176,7 +179,10 @@
accumUnion_ = unionBuilder_.build();
}
for (final Tuple innerTuple: bag) {
- accumUnion_.update((Double) innerTuple.get(0));
+ Object value = innerTuple.get(0);
+ if(value != null) {
+ accumUnion_.update((Double) value);
+ }
}
}
@@ -317,7 +323,10 @@
// It is due to system bagged outputs from multiple mapper Initial functions.
// The Intermediate stage was bypassed.
for (final Tuple innerTuple: innerBag) {
- union.update((Double) innerTuple.get(0));
+ Object value = innerTuple.get(0);
+ if(value != null) {
+ union.update((Double) value);
+ }
}
} else if (f0 instanceof DataByteArray) { // inputTuple.bag0.dataTupleN.f0:DBA
// If field 0 of a dataTuple is a DataByteArray we assume it is a sketch
diff --git a/src/main/java/com/yahoo/sketches/pig/quantiles/DataToItemsSketch.java b/src/main/java/com/yahoo/sketches/pig/quantiles/DataToItemsSketch.java
index c00fb5b..c1d28bb 100644
--- a/src/main/java/com/yahoo/sketches/pig/quantiles/DataToItemsSketch.java
+++ b/src/main/java/com/yahoo/sketches/pig/quantiles/DataToItemsSketch.java
@@ -122,7 +122,10 @@
: ItemsUnion.getInstance(comparator_);
final DataBag bag = (DataBag) inputTuple.get(0);
for (final Tuple innerTuple: bag) {
- union.update(extractValue(innerTuple.get(0)));
+ Object value = innerTuple.get(0);
+ if(value != null) {
+ union.update(extractValue(value));
+ }
}
final ItemsSketch<T> resultSketch = union.getResultAndReset();
if (resultSketch != null) {
@@ -173,7 +176,10 @@
: ItemsUnion.getInstance(comparator_);
}
for (final Tuple innerTuple: bag) {
- accumUnion_.update(extractValue(innerTuple.get(0)));
+ Object value = innerTuple.get(0);
+ if(value != null) {
+ accumUnion_.update(extractValue(value));
+ }
}
}
@@ -307,7 +313,10 @@
// It is due to system bagged outputs from multiple mapper Initial functions.
// The Intermediate stage was bypassed.
for (final Tuple innerTuple: innerBag) {
- union.update(extractValue(innerTuple.get(0)));
+ Object value = innerTuple.get(0);
+ if(value != null) {
+ union.update(extractValue(value));
+ }
}
} else if (f0 instanceof DataByteArray) { // inputTuple.bag0.dataTupleN.f0:DBA
// If field 0 of a dataTuple is a DataByteArray we assume it is a sketch
diff --git a/src/test/java/com/yahoo/sketches/pig/quantiles/DataToDoublesSketchTest.java b/src/test/java/com/yahoo/sketches/pig/quantiles/DataToDoublesSketchTest.java
index 9064dda..f08d4a2 100644
--- a/src/test/java/com/yahoo/sketches/pig/quantiles/DataToDoublesSketchTest.java
+++ b/src/test/java/com/yahoo/sketches/pig/quantiles/DataToDoublesSketchTest.java
@@ -66,6 +66,18 @@
Assert.assertFalse(sketch.isEmpty());
Assert.assertEquals(sketch.getN(), 1);
}
+
+ @Test // exec() must ignore a null field value and count only the non-null one
+ public void execMixedNullCase() throws Exception {
+ EvalFunc<Tuple> func = new DataToDoublesSketch();
+ DataBag bag = bagFactory.newDefaultBag();
+ bag.add(tupleFactory.newTuple(1.0));
+ bag.add(tupleFactory.newTuple((Object) null)); // one-field tuple holding null; a null Tuple would never reach the UDF's field-level null check
+ Tuple resultTuple = func.exec(tupleFactory.newTuple(bag));
+ DoublesSketch sketch = getSketch(resultTuple);
+ Assert.assertFalse(sketch.isEmpty());
+ Assert.assertEquals(sketch.getN(), 1); // only the 1.0 is counted
+ }
@Test
public void accumulator() throws Exception {
@@ -103,6 +115,17 @@
sketch = getSketch(resultTuple);
Assert.assertFalse(sketch.isEmpty());
Assert.assertEquals(sketch.getN(), 2);
+
+ // mixed null case
+ bag = bagFactory.newDefaultBag();
+ bag.add(tupleFactory.newTuple(1.0));
+ bag.add(null);
+ func.accumulate(tupleFactory.newTuple(bag));
+ func.accumulate(tupleFactory.newTuple(bag));
+ resultTuple = func.getValue();
+ sketch = getSketch(resultTuple);
+ Assert.assertFalse(sketch.isEmpty());
+ Assert.assertEquals(sketch.getN(), 2);
// cleanup
func.cleanup();
@@ -161,6 +184,30 @@
Assert.assertFalse(sketch.isEmpty());
Assert.assertEquals(sketch.getN(), 2);
}
+
+ @Test // IntermediateFinal must skip null field values coming from an Initial-style inner bag
+ public void algebraicIntermediateFinalMixedNullCase() throws Exception {
+ EvalFunc<Tuple> func = new DataToDoublesSketch.IntermediateFinal();
+ DataBag bag = bagFactory.newDefaultBag();
+
+ { // this is to simulate an output from Initial
+ DataBag innerBag = bagFactory.newDefaultBag();
+ innerBag.add(tupleFactory.newTuple(1.0));
+ innerBag.add(tupleFactory.newTuple((Object) null)); // one-field tuple holding null; a null Tuple would never reach the field-level null check
+ bag.add(tupleFactory.newTuple(innerBag));
+ }
+
+ { // this is to simulate an output from a prior call of IntermediateFinal
+ UpdateDoublesSketch qs = DoublesSketch.builder().build();
+ qs.update(2.0);
+ bag.add(tupleFactory.newTuple(new DataByteArray(qs.toByteArray())));
+ }
+
+ Tuple resultTuple = func.exec(tupleFactory.newTuple(bag));
+ DoublesSketch sketch = getSketch(resultTuple);
+ Assert.assertFalse(sketch.isEmpty());
+ Assert.assertEquals(sketch.getN(), 2); // 1.0 from the inner bag plus 2.0 from the serialized sketch
+ }
@Test(expectedExceptions = IllegalArgumentException.class)
public void algebraicIntermediateFinalWrongType() throws Exception {
diff --git a/src/test/java/com/yahoo/sketches/pig/quantiles/DataToStringsSketchTest.java b/src/test/java/com/yahoo/sketches/pig/quantiles/DataToStringsSketchTest.java
index a38d799..1f986c5 100644
--- a/src/test/java/com/yahoo/sketches/pig/quantiles/DataToStringsSketchTest.java
+++ b/src/test/java/com/yahoo/sketches/pig/quantiles/DataToStringsSketchTest.java
@@ -73,6 +73,18 @@
Assert.assertFalse(sketch.isEmpty());
Assert.assertEquals(sketch.getN(), 1);
}
+
+ @Test // exec() must ignore a null field value and count only the non-null one
+ public void execMixedNormalCase() throws Exception { // NOTE(review): name says "Normal" but this is the mixed-null case; name kept unchanged
+ EvalFunc<Tuple> func = new DataToStringsSketch();
+ DataBag bag = BAG_FACTORY.newDefaultBag();
+ bag.add(TUPLE_FACTORY.newTuple("a"));
+ bag.add(TUPLE_FACTORY.newTuple((Object) null)); // one-field tuple holding null; a null Tuple would never reach the UDF's field-level null check
+ Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(bag));
+ ItemsSketch<String> sketch = getSketch(resultTuple);
+ Assert.assertFalse(sketch.isEmpty());
+ Assert.assertEquals(sketch.getN(), 1); // only "a" is counted
+ }
@Test
public void accumulator() throws Exception {
@@ -110,6 +122,17 @@
sketch = getSketch(resultTuple);
Assert.assertFalse(sketch.isEmpty());
Assert.assertEquals(sketch.getN(), 2);
+
+ // mixed null case
+ bag = BAG_FACTORY.newDefaultBag();
+ bag.add(TUPLE_FACTORY.newTuple("a"));
+ bag.add(null);
+ func.accumulate(TUPLE_FACTORY.newTuple(bag));
+ func.accumulate(TUPLE_FACTORY.newTuple(bag));
+ resultTuple = func.getValue();
+ sketch = getSketch(resultTuple);
+ Assert.assertFalse(sketch.isEmpty());
+ Assert.assertEquals(sketch.getN(), 2);
// cleanup
func.cleanup();
@@ -177,6 +200,31 @@
Assert.assertFalse(sketch.isEmpty());
Assert.assertEquals(sketch.getN(), 2);
}
+
+ @Test // the intermediate/final stage must skip null field values coming from an Initial-style inner bag
+ public void algebraicIntermediateFinalMixedNullCase() throws Exception {
+ @SuppressWarnings("unchecked")
+ EvalFunc<Tuple> func = (EvalFunc<Tuple>) Class.forName(new DataToStringsSketch().getIntermed()).newInstance();
+ DataBag bag = BAG_FACTORY.newDefaultBag();
+
+ { // this is to simulate an output from Initial
+ DataBag innerBag = BAG_FACTORY.newDefaultBag();
+ innerBag.add(TUPLE_FACTORY.newTuple("a"));
+ innerBag.add(TUPLE_FACTORY.newTuple((Object) null)); // one-field tuple holding null; a null Tuple would never reach the field-level null check
+ bag.add(TUPLE_FACTORY.newTuple(innerBag));
+ }
+
+ { // this is to simulate an output from a prior call of IntermediateFinal
+ ItemsSketch<String> qs = ItemsSketch.getInstance(COMPARATOR);
+ qs.update("b");
+ bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(qs.toByteArray(SER_DE))));
+ }
+
+ Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(bag));
+ ItemsSketch<String> sketch = getSketch(resultTuple);
+ Assert.assertFalse(sketch.isEmpty());
+ Assert.assertEquals(sketch.getN(), 2); // "a" from the inner bag plus "b" from the serialized sketch
+ }
@Test(expectedExceptions = IllegalArgumentException.class)
public void algebraicIntermediateFinalWrongType() throws Exception {