GIRAPH-630: Convergence detection broken in
o.a.g.examples.PageRankVertex. (ssc via aching)
diff --git a/CHANGELOG b/CHANGELOG
index 38e6e83..98d5490 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,8 +1,11 @@
 Giraph Change Log
 
-Release 1.1 - unreleased
+Release 1.0.1 - unreleased
   
-Release 1.0 - 2013-04-12
+Release 1.0.0 - 2013-04-12
+
+  GIRAPH-630: Convergence detection broken in
+  o.a.g.examples.PageRankVertex. (ssc via aching)
 
   GIRAPH-627: YARN build profile is broken. (rvs via aching)
 
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java
index 733ee53..9678b31 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java
@@ -47,10 +47,8 @@
         getDanglingProbability() / getTotalNumVertices();
 
     // recompute rank
-    double rank = (1d - teleportationProbability) *
+    return (1d - teleportationProbability) *
         (rankFromNeighbors + danglingContribution) +
         teleportationProbability / getTotalNumVertices();
-
-    return rank;
   }
 }
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java
index 85c6e27..2d2c988 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java
@@ -18,14 +18,11 @@
 
 package org.apache.giraph.examples;
 
-import org.apache.giraph.aggregators.DoubleSumAggregator;
-import org.apache.giraph.master.DefaultMasterCompute;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.log4j.Logger;
 
 import java.io.IOException;
 
@@ -42,17 +39,23 @@
   /** Configuration parameter for the teleportation probability */
   static final String TELEPORTATION_PROBABILITY = RandomWalkVertex.class
       .getName() + ".teleportationProbability";
-  /** Name of aggregator for collecting the probability of dangling vertices */
+  /** Name of aggregator for the probability of dangling vertices */
   static final String CUMULATIVE_DANGLING_PROBABILITY = RandomWalkVertex.class
       .getName() + ".cumulativeDanglingProbability";
+  /** Name of aggregator for the probability of all vertices */
+  static final String CUMULATIVE_PROBABILITY = RandomWalkVertex.class
+      .getName() + ".cumulativeProbability";
+  /** Name of aggregator for the number of dangling vertices */
+  static final String NUM_DANGLING_VERTICES = RandomWalkVertex.class
+      .getName() + ".numDanglingVertices";
   /** Name of aggregator for the L1 norm of the probability difference, used
    * for covergence detection */
   static final String L1_NORM_OF_PROBABILITY_DIFFERENCE = RandomWalkVertex.class
       .getName() + ".l1NormOfProbabilityDifference";
-  /** Logger */
-  private static final Logger LOG = Logger.getLogger(RandomWalkVertex.class);
   /** Reusable {@link DoubleWritable} instance to avoid object instantiation */
   private final DoubleWritable doubleWritable = new DoubleWritable();
+  /** Reusable {@link LongWritable} for counting dangling vertices */
+  private final LongWritable one = new LongWritable(1);
 
   /**
    * Compute an initial probability value for the vertex. Per default,
@@ -83,34 +86,50 @@
       double teleportationProbability);
 
   /**
-   * Returns the cumulative probability from dangling nodes.
-   * @return The cumulative probability from dangling nodes.
+   * Returns the cumulative probability from dangling vertices.
+   * @return The cumulative probability from dangling vertices.
    */
   protected double getDanglingProbability() {
     return this.<DoubleWritable>getAggregatedValue(
         RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY).get();
   }
 
+  /**
+   * Returns the aggregated cumulative probability of all vertices.
+   * @return The aggregated cumulative probability of all vertices.
+   */
+  protected double getPreviousCumulativeProbability() {
+    return this.<DoubleWritable>getAggregatedValue(
+        RandomWalkVertex.CUMULATIVE_PROBABILITY).get();
+  }
+
   @Override
   public void compute(Iterable<DoubleWritable> messages) throws IOException {
     double stateProbability;
 
     if (getSuperstep() > 0) {
+
       double previousStateProbability = getValue().get();
       stateProbability = recompute(messages, teleportationProbability());
 
+      // Important: rescale for numerical stability
+      stateProbability /= getPreviousCumulativeProbability();
+
       doubleWritable.set(Math.abs(stateProbability - previousStateProbability));
       aggregate(L1_NORM_OF_PROBABILITY_DIFFERENCE, doubleWritable);
 
     } else {
       stateProbability = initialProbability();
     }
-    doubleWritable.set(stateProbability);
-    setValue(doubleWritable);
+
+    getValue().set(stateProbability);
+
+    aggregate(CUMULATIVE_PROBABILITY, getValue());
 
     // Compute dangling node contribution for next superstep
     if (getNumEdges() == 0) {
-      aggregate(CUMULATIVE_DANGLING_PROBABILITY, doubleWritable);
+      aggregate(NUM_DANGLING_VERTICES, one);
+      aggregate(CUMULATIVE_DANGLING_PROBABILITY, getValue());
     }
 
     if (getSuperstep() < maxSupersteps()) {
@@ -141,45 +160,4 @@
     return ((RandomWalkWorkerContext) getWorkerContext())
         .getTeleportationProbability();
   }
-
-  /**
-   * Master compute associated with {@link RandomWalkVertex}. It handles
-   * dangling nodes.
-   */
-  public static class RandomWalkVertexMasterCompute extends
-      DefaultMasterCompute {
-
-    /** threshold for the L1 norm of the state vector difference  */
-    static final double CONVERGENCE_THRESHOLD = 0.00001;
-
-    @Override
-    public void compute() {
-      double danglingContribution =
-          this.<DoubleWritable>getAggregatedValue(
-          RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY).get();
-      double l1NormOfStateDiff =
-          this.<DoubleWritable>getAggregatedValue(
-          RandomWalkVertex.L1_NORM_OF_PROBABILITY_DIFFERENCE).get();
-
-      LOG.info("[Superstep " + getSuperstep() + "] Dangling contribution = " +
-          danglingContribution + ", L1 Norm of state vector difference = " +
-          l1NormOfStateDiff);
-
-      // Convergence check: halt once the L1 norm of the difference between the
-      // state vectors fall under the threshold
-      if (getSuperstep() > 1 && l1NormOfStateDiff < CONVERGENCE_THRESHOLD) {
-        haltComputation();
-      }
-
-    }
-
-    @Override
-    public void initialize() throws InstantiationException,
-        IllegalAccessException {
-      registerAggregator(RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY,
-          DoubleSumAggregator.class);
-      registerAggregator(RandomWalkVertex.L1_NORM_OF_PROBABILITY_DIFFERENCE,
-          DoubleSumAggregator.class);
-    }
-  }
 }
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertexMasterCompute.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertexMasterCompute.java
new file mode 100644
index 0000000..9e5dbbf
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertexMasterCompute.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.aggregators.DoubleSumAggregator;
+import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.master.DefaultMasterCompute;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+
+/**
+ * Master compute associated with {@link RandomWalkVertex}. Registers its
+ * aggregators, logs their values and halts the computation on convergence.
+ */
+public class RandomWalkVertexMasterCompute extends DefaultMasterCompute {
+
+  /** Halt once the L1 norm of the state vector difference is below this */
+  static final double CONVERGENCE_THRESHOLD = 0.00001;
+
+  /** Logger */
+  private static final Logger LOG =
+      Logger.getLogger(RandomWalkVertexMasterCompute.class);
+
+  @Override
+  public void compute() {
+    double danglingContribution =
+        this.<DoubleWritable>getAggregatedValue(
+            RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY).get();
+    double cumulativeProbability =
+        this.<DoubleWritable>getAggregatedValue(
+            RandomWalkVertex.CUMULATIVE_PROBABILITY).get();
+    double l1NormOfStateDiff =
+        this.<DoubleWritable>getAggregatedValue(
+            RandomWalkVertex.L1_NORM_OF_PROBABILITY_DIFFERENCE).get();
+    long numDanglingVertices =
+        this.<LongWritable>getAggregatedValue(
+            RandomWalkVertex.NUM_DANGLING_VERTICES).get();
+
+    LOG.info("[Superstep " + getSuperstep() + "] Dangling contribution = " +
+        danglingContribution + ", number of dangling vertices = " +
+        numDanglingVertices + ", cumulative probability = " +
+        cumulativeProbability + ", L1 Norm of state vector difference = " +
+        l1NormOfStateDiff);
+
+    // Convergence check: after superstep 1, halt once the L1 norm of the
+    // difference between the state vectors falls below the threshold
+    if (getSuperstep() > 1 && l1NormOfStateDiff < CONVERGENCE_THRESHOLD) {
+      haltComputation();
+    }
+  }
+
+  @Override
+  public void initialize() throws InstantiationException,
+      IllegalAccessException {
+    registerAggregator(RandomWalkVertex.NUM_DANGLING_VERTICES,
+        LongSumAggregator.class);
+    registerAggregator(RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY,
+        DoubleSumAggregator.class);
+    registerAggregator(RandomWalkVertex.CUMULATIVE_PROBABILITY,
+        DoubleSumAggregator.class);
+    registerAggregator(RandomWalkVertex.L1_NORM_OF_PROBABILITY_DIFFERENCE,
+        DoubleSumAggregator.class);
+  }
+}
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java
index 2e4bbc4..b41bcf6 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java
@@ -43,7 +43,6 @@
     String[] graph = new String[] {
       "1 4 2 3",
       "2 1",
-      "3",
       "4 3 2",
       "5 2 4"
     };
@@ -57,8 +56,7 @@
     conf.setVertexOutputFormatClass(
         VertexWithDoubleValueNullEdgeTextOutputFormat.class);
     conf.setWorkerContextClass(RandomWalkWorkerContext.class);
-    conf.setMasterComputeClass(
-        RandomWalkVertex.RandomWalkVertexMasterCompute.class);
+    conf.setMasterComputeClass(RandomWalkVertexMasterCompute.class);
     // Run internally
     Iterable<String> results = InternalVertexRunner.run(conf, graph);
 
@@ -76,4 +74,5 @@
     assertEquals(0.06784692727193153, steadyStateProbabilities.get(5l),
         RandomWalkTestUtils.EPSILON);
   }
+
 }
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java
index a3dbd45..6ecfefe 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java
@@ -20,7 +20,6 @@
 
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.edge.ByteArrayEdges;
-import org.apache.giraph.examples.RandomWalkVertex.RandomWalkVertexMasterCompute;
 import org.apache.giraph.utils.InternalVertexRunner;
 import org.junit.Test;