In the current code, when both partitions being merged have zero length, the divisor `n + other.n` is zero, so the returned mean is NaN. Because NaN propagates through every subsequent merge, the mean after reducing over all partitions is also NaN, even when other partitions are non-empty. This patch fixes the issue by handling empty counters explicitly in `merge`.
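
A minimal sketch of the failure mode, using plain arithmetic that mirrors the old merge formula (not the `StatCounter` API itself):

```scala
object NaNMergeDemo extends App {
  // Two empty partitions: n == 0 on both sides.
  val n = 0L; val otherN = 0L
  val mu = 0.0; val otherMu = 0.0

  // The old merge formula divides by n + other.n, which is 0 here,
  // so 0.0 / 0 evaluates to Double.NaN.
  val mergedMu = (mu * n + otherMu * otherN) / (n + otherN)
  println(mergedMu)  // NaN

  // NaN is absorbing: merging a later non-empty partition cannot recover.
  println((mergedMu * 0 + 3.0 * 5) / (0 + 5))  // NaN, not 3.0
}
```
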
diff --git a/core/src/main/scala/spark/util/StatCounter.scala b/core/src/main/scala/spark/util/StatCounter.scala
index 5f80180..2b98034 100644
--- a/core/src/main/scala/spark/util/StatCounter.scala
+++ b/core/src/main/scala/spark/util/StatCounter.scala
@@ -37,17 +37,23 @@
     if (other == this) {
       merge(other.copy())  // Avoid overwriting fields in a weird order
     } else {
-      val delta = other.mu - mu
-      if (other.n * 10 < n) {
-        mu = mu + (delta * other.n) / (n + other.n)
-      } else if (n * 10 < other.n) {
-        mu = other.mu - (delta * n) / (n + other.n)
-      } else {
-        mu = (mu * n + other.mu * other.n) / (n + other.n)
+      if (n == 0) {
+        mu = other.mu
+        m2 = other.m2
+        n = other.n
+      } else if (other.n != 0) {
+        val delta = other.mu - mu
+        if (other.n * 10 < n) {
+          mu = mu + (delta * other.n) / (n + other.n)
+        } else if (n * 10 < other.n) {
+          mu = other.mu - (delta * n) / (n + other.n)
+        } else {
+          mu = (mu * n + other.mu * other.n) / (n + other.n)
+        }
+        m2 += other.m2 + (delta * delta * n * other.n) / (n + other.n)
+        n += other.n
       }
-      m2 += other.m2 + (delta * delta * n * other.n) / (n + other.n)
-      n += other.n
-      this
+      this
     }
   }
 
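
With the patch applied, merging an empty counter into a non-empty one (or vice versa) no longer poisons the result. A hypothetical check of the expected behavior; the no-arg and `TraversableOnce`-taking constructors are assumed from the rest of `StatCounter`:

```scala
import spark.util.StatCounter

object MergeFixDemo extends App {
  // Assumed constructors: no-arg for an empty counter,
  // and one taking a collection of values.
  val empty = new StatCounter()                    // zero-length partition
  val stats = new StatCounter(Seq(1.0, 2.0, 3.0)) // non-empty partition

  // Previously this printed NaN; with the patch, the empty counter
  // adopts the other side's mu, m2 and n, so the mean survives the reduce.
  println(empty.merge(stats).mean)  // 2.0
}
```
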