Merge pull request #50 from juen1jp/removeNulls

Fixing the Quantiles UDFs to ignore null values.
diff --git a/src/main/java/com/yahoo/sketches/pig/quantiles/DataToDoublesSketch.java b/src/main/java/com/yahoo/sketches/pig/quantiles/DataToDoublesSketch.java
index fd5d32b..84ca5d9 100644
--- a/src/main/java/com/yahoo/sketches/pig/quantiles/DataToDoublesSketch.java
+++ b/src/main/java/com/yahoo/sketches/pig/quantiles/DataToDoublesSketch.java
@@ -128,7 +128,10 @@
       final DoublesUnion union = unionBuilder_.build();
       final DataBag bag = (DataBag) inputTuple.get(0);
       for (final Tuple innerTuple: bag) {
-        union.update((Double) innerTuple.get(0));
+        Object value = innerTuple.get(0);
+        if(value != null) {
+          union.update((Double) value);
+        }
       }
       final DoublesSketch resultSketch = union.getResultAndReset();
       if (resultSketch != null) {
@@ -176,7 +179,10 @@
       accumUnion_ = unionBuilder_.build();
     }
     for (final Tuple innerTuple: bag) {
-      accumUnion_.update((Double) innerTuple.get(0));
+      Object value = innerTuple.get(0);
+        if(value != null) {
+          accumUnion_.update((Double) value);
+      }
     }
   }
 
@@ -317,7 +323,10 @@
             // It is due to system bagged outputs from multiple mapper Initial functions.
             // The Intermediate stage was bypassed.
             for (final Tuple innerTuple: innerBag) {
-              union.update((Double) innerTuple.get(0));
+              Object value = innerTuple.get(0);
+              if(value != null) {
+                union.update((Double) value);
+              }
             }
           } else if (f0 instanceof DataByteArray) { // inputTuple.bag0.dataTupleN.f0:DBA
             // If field 0 of a dataTuple is a DataByteArray we assume it is a sketch
diff --git a/src/main/java/com/yahoo/sketches/pig/quantiles/DataToItemsSketch.java b/src/main/java/com/yahoo/sketches/pig/quantiles/DataToItemsSketch.java
index c00fb5b..c1d28bb 100644
--- a/src/main/java/com/yahoo/sketches/pig/quantiles/DataToItemsSketch.java
+++ b/src/main/java/com/yahoo/sketches/pig/quantiles/DataToItemsSketch.java
@@ -122,7 +122,10 @@
           : ItemsUnion.getInstance(comparator_);
       final DataBag bag = (DataBag) inputTuple.get(0);
       for (final Tuple innerTuple: bag) {
-        union.update(extractValue(innerTuple.get(0)));
+        Object value = innerTuple.get(0);
+        if(value != null) {
+          union.update(extractValue(value));
+        }
       }
       final ItemsSketch<T> resultSketch = union.getResultAndReset();
       if (resultSketch != null) {
@@ -173,7 +176,10 @@
         : ItemsUnion.getInstance(comparator_);
     }
     for (final Tuple innerTuple: bag) {
-      accumUnion_.update(extractValue(innerTuple.get(0)));
+      Object value = innerTuple.get(0);
+      if(value != null) {
+        accumUnion_.update(extractValue(value));
+      }
     }
   }
 
@@ -307,7 +313,10 @@
             // It is due to system bagged outputs from multiple mapper Initial functions.
             // The Intermediate stage was bypassed.
             for (final Tuple innerTuple: innerBag) {
-              union.update(extractValue(innerTuple.get(0)));
+              Object value = innerTuple.get(0);
+              if(value != null) {
+                union.update(extractValue(value));
+              }
             }
           } else if (f0 instanceof DataByteArray) { // inputTuple.bag0.dataTupleN.f0:DBA
             // If field 0 of a dataTuple is a DataByteArray we assume it is a sketch
diff --git a/src/test/java/com/yahoo/sketches/pig/quantiles/DataToDoublesSketchTest.java b/src/test/java/com/yahoo/sketches/pig/quantiles/DataToDoublesSketchTest.java
index 9064dda..f08d4a2 100644
--- a/src/test/java/com/yahoo/sketches/pig/quantiles/DataToDoublesSketchTest.java
+++ b/src/test/java/com/yahoo/sketches/pig/quantiles/DataToDoublesSketchTest.java
@@ -66,6 +66,18 @@
     Assert.assertFalse(sketch.isEmpty());
     Assert.assertEquals(sketch.getN(), 1);
   }
+  
+  @Test
+  public void execMixedNullCase() throws Exception {
+    EvalFunc<Tuple> func = new DataToDoublesSketch();
+    DataBag bag = bagFactory.newDefaultBag();
+    bag.add(tupleFactory.newTuple(1.0));
+    bag.add(null);
+    Tuple resultTuple = func.exec(tupleFactory.newTuple(bag));
+    DoublesSketch sketch = getSketch(resultTuple);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 1);
+  }
 
   @Test
   public void accumulator() throws Exception {
@@ -103,6 +115,17 @@
     sketch = getSketch(resultTuple);
     Assert.assertFalse(sketch.isEmpty());
     Assert.assertEquals(sketch.getN(), 2);
+    
+    // mixed null case
+    bag = bagFactory.newDefaultBag();
+    bag.add(tupleFactory.newTuple(1.0));
+    bag.add(null);
+    func.accumulate(tupleFactory.newTuple(bag));
+    func.accumulate(tupleFactory.newTuple(bag));
+    resultTuple = func.getValue();
+    sketch = getSketch(resultTuple);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 2);
 
     // cleanup
     func.cleanup();
@@ -161,6 +184,30 @@
     Assert.assertFalse(sketch.isEmpty());
     Assert.assertEquals(sketch.getN(), 2);
   }
+  
+  @Test
+  public void algebraicIntermediateFinalMixedNullCase() throws Exception {
+    EvalFunc<Tuple> func = new DataToDoublesSketch.IntermediateFinal();
+    DataBag bag = bagFactory.newDefaultBag();
+
+    { // this is to simulate an output from Initial
+      DataBag innerBag = bagFactory.newDefaultBag();
+      innerBag.add(tupleFactory.newTuple(1.0));
+      innerBag.add(null);
+      bag.add(tupleFactory.newTuple(innerBag));
+    }
+
+    { // this is to simulate an output from a prior call of IntermediateFinal
+      UpdateDoublesSketch qs = DoublesSketch.builder().build();
+      qs.update(2.0);
+      bag.add(tupleFactory.newTuple(new DataByteArray(qs.toByteArray())));
+    }
+
+    Tuple resultTuple = func.exec(tupleFactory.newTuple(bag));
+    DoublesSketch sketch = getSketch(resultTuple);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 2);
+  }
 
   @Test(expectedExceptions = IllegalArgumentException.class)
   public void algebraicIntermediateFinalWrongType() throws Exception {
diff --git a/src/test/java/com/yahoo/sketches/pig/quantiles/DataToStringsSketchTest.java b/src/test/java/com/yahoo/sketches/pig/quantiles/DataToStringsSketchTest.java
index a38d799..1f986c5 100644
--- a/src/test/java/com/yahoo/sketches/pig/quantiles/DataToStringsSketchTest.java
+++ b/src/test/java/com/yahoo/sketches/pig/quantiles/DataToStringsSketchTest.java
@@ -73,6 +73,18 @@
     Assert.assertFalse(sketch.isEmpty());
     Assert.assertEquals(sketch.getN(), 1);
   }
+  
+  @Test
+  public void execMixedNormalCase() throws Exception {
+    EvalFunc<Tuple> func = new DataToStringsSketch();
+    DataBag bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple("a"));
+    bag.add(null);
+    Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(bag));
+    ItemsSketch<String> sketch = getSketch(resultTuple);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 1);
+  }
 
   @Test
   public void accumulator() throws Exception {
@@ -110,6 +122,17 @@
     sketch = getSketch(resultTuple);
     Assert.assertFalse(sketch.isEmpty());
     Assert.assertEquals(sketch.getN(), 2);
+    
+    // mixed null case
+    bag = BAG_FACTORY.newDefaultBag();
+    bag.add(TUPLE_FACTORY.newTuple("a"));
+    bag.add(null);
+    func.accumulate(TUPLE_FACTORY.newTuple(bag));
+    func.accumulate(TUPLE_FACTORY.newTuple(bag));
+    resultTuple = func.getValue();
+    sketch = getSketch(resultTuple);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 2);
 
     // cleanup
     func.cleanup();
@@ -177,6 +200,31 @@
     Assert.assertFalse(sketch.isEmpty());
     Assert.assertEquals(sketch.getN(), 2);
   }
+  
+  @Test
+  public void algebraicIntermediateFinalMixedNullCase() throws Exception {
+    @SuppressWarnings("unchecked")
+    EvalFunc<Tuple> func = (EvalFunc<Tuple>) Class.forName(new DataToStringsSketch().getIntermed()).newInstance();
+    DataBag bag = BAG_FACTORY.newDefaultBag();
+
+    { // this is to simulate an output from Initial
+      DataBag innerBag = BAG_FACTORY.newDefaultBag();
+      innerBag.add(TUPLE_FACTORY.newTuple("a"));
+      innerBag.add(null);
+      bag.add(TUPLE_FACTORY.newTuple(innerBag));
+    }
+
+    { // this is to simulate an output from a prior call of IntermediateFinal
+      ItemsSketch<String> qs = ItemsSketch.getInstance(COMPARATOR);
+      qs.update("b");
+      bag.add(TUPLE_FACTORY.newTuple(new DataByteArray(qs.toByteArray(SER_DE))));
+    }
+
+    Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(bag));
+    ItemsSketch<String> sketch = getSketch(resultTuple);
+    Assert.assertFalse(sketch.isEmpty());
+    Assert.assertEquals(sketch.getN(), 2);
+  }
 
   @Test(expectedExceptions = IllegalArgumentException.class)
   public void algebraicIntermediateFinalWrongType() throws Exception {