GIRAPH-630: Convergence detection broken in
o.a.g.examples.PageRankVertex. (ssc via aching)
diff --git a/CHANGELOG b/CHANGELOG
index 38e6e83..98d5490 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,8 +1,11 @@
Giraph Change Log
-Release 1.1 - unreleased
+Release 1.0.1 - unreleased
-Release 1.0 - 2013-04-12
+Release 1.0.0 - 2013-04-12
+
+ GIRAPH-630: Convergence detection broken in
+ o.a.g.examples.PageRankVertex. (ssc via aching)
GIRAPH-627: YARN build profile is broken. (rvs via aching)
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java
index 733ee53..9678b31 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java
@@ -47,10 +47,8 @@
getDanglingProbability() / getTotalNumVertices();
// recompute rank
- double rank = (1d - teleportationProbability) *
+ return (1d - teleportationProbability) *
(rankFromNeighbors + danglingContribution) +
teleportationProbability / getTotalNumVertices();
-
- return rank;
}
}
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java
index 85c6e27..2d2c988 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java
@@ -18,14 +18,11 @@
package org.apache.giraph.examples;
-import org.apache.giraph.aggregators.DoubleSumAggregator;
-import org.apache.giraph.master.DefaultMasterCompute;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
-import org.apache.log4j.Logger;
import java.io.IOException;
@@ -42,17 +39,23 @@
/** Configuration parameter for the teleportation probability */
static final String TELEPORTATION_PROBABILITY = RandomWalkVertex.class
.getName() + ".teleportationProbability";
- /** Name of aggregator for collecting the probability of dangling vertices */
+ /** Name of aggregator for the probability of dangling vertices */
static final String CUMULATIVE_DANGLING_PROBABILITY = RandomWalkVertex.class
.getName() + ".cumulativeDanglingProbability";
+ /** Name of aggregator for the cumulative probability of all vertices */
+ static final String CUMULATIVE_PROBABILITY = RandomWalkVertex.class
+ .getName() + ".cumulativeProbability";
+ /** Name of aggregator for the number of dangling vertices */
+ static final String NUM_DANGLING_VERTICES = RandomWalkVertex.class
+ .getName() + ".numDanglingVertices";
/** Name of aggregator for the L1 norm of the probability difference, used
* for covergence detection */
static final String L1_NORM_OF_PROBABILITY_DIFFERENCE = RandomWalkVertex.class
.getName() + ".l1NormOfProbabilityDifference";
- /** Logger */
- private static final Logger LOG = Logger.getLogger(RandomWalkVertex.class);
/** Reusable {@link DoubleWritable} instance to avoid object instantiation */
private final DoubleWritable doubleWritable = new DoubleWritable();
+ /** Reusable {@link LongWritable} for counting dangling vertices */
+ private final LongWritable one = new LongWritable(1);
/**
* Compute an initial probability value for the vertex. Per default,
@@ -83,34 +86,50 @@
double teleportationProbability);
/**
- * Returns the cumulative probability from dangling nodes.
- * @return The cumulative probability from dangling nodes.
+ * Returns the cumulative probability from dangling vertices.
+ * @return The cumulative probability from dangling vertices.
*/
protected double getDanglingProbability() {
return this.<DoubleWritable>getAggregatedValue(
RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY).get();
}
+ /**
+ * Returns the cumulative probability of all vertices, as aggregated in the
+ * previous superstep (used to rescale the recomputed probability).
+ */
+ protected double getPreviousCumulativeProbability() {
+ return this.<DoubleWritable>getAggregatedValue(
+ RandomWalkVertex.CUMULATIVE_PROBABILITY).get();
+ }
+
@Override
public void compute(Iterable<DoubleWritable> messages) throws IOException {
double stateProbability;
if (getSuperstep() > 0) {
+
double previousStateProbability = getValue().get();
stateProbability = recompute(messages, teleportationProbability());
+ // Important: rescale for numerical stability
+ stateProbability /= getPreviousCumulativeProbability();
+
doubleWritable.set(Math.abs(stateProbability - previousStateProbability));
aggregate(L1_NORM_OF_PROBABILITY_DIFFERENCE, doubleWritable);
} else {
stateProbability = initialProbability();
}
- doubleWritable.set(stateProbability);
- setValue(doubleWritable);
+
+ getValue().set(stateProbability);
+
+ aggregate(CUMULATIVE_PROBABILITY, getValue());
// Compute dangling node contribution for next superstep
if (getNumEdges() == 0) {
- aggregate(CUMULATIVE_DANGLING_PROBABILITY, doubleWritable);
+ aggregate(NUM_DANGLING_VERTICES, one);
+ aggregate(CUMULATIVE_DANGLING_PROBABILITY, getValue());
}
if (getSuperstep() < maxSupersteps()) {
@@ -141,45 +160,4 @@
return ((RandomWalkWorkerContext) getWorkerContext())
.getTeleportationProbability();
}
-
- /**
- * Master compute associated with {@link RandomWalkVertex}. It handles
- * dangling nodes.
- */
- public static class RandomWalkVertexMasterCompute extends
- DefaultMasterCompute {
-
- /** threshold for the L1 norm of the state vector difference */
- static final double CONVERGENCE_THRESHOLD = 0.00001;
-
- @Override
- public void compute() {
- double danglingContribution =
- this.<DoubleWritable>getAggregatedValue(
- RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY).get();
- double l1NormOfStateDiff =
- this.<DoubleWritable>getAggregatedValue(
- RandomWalkVertex.L1_NORM_OF_PROBABILITY_DIFFERENCE).get();
-
- LOG.info("[Superstep " + getSuperstep() + "] Dangling contribution = " +
- danglingContribution + ", L1 Norm of state vector difference = " +
- l1NormOfStateDiff);
-
- // Convergence check: halt once the L1 norm of the difference between the
- // state vectors fall under the threshold
- if (getSuperstep() > 1 && l1NormOfStateDiff < CONVERGENCE_THRESHOLD) {
- haltComputation();
- }
-
- }
-
- @Override
- public void initialize() throws InstantiationException,
- IllegalAccessException {
- registerAggregator(RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY,
- DoubleSumAggregator.class);
- registerAggregator(RandomWalkVertex.L1_NORM_OF_PROBABILITY_DIFFERENCE,
- DoubleSumAggregator.class);
- }
- }
}
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertexMasterCompute.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertexMasterCompute.java
new file mode 100644
index 0000000..9e5dbbf
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertexMasterCompute.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.aggregators.DoubleSumAggregator;
+import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.master.DefaultMasterCompute;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+
+/**
+ * Master compute for {@link RandomWalkVertex}: registers its aggregators and,
+ * each superstep, logs their values and halts once the L1 norm converges.
+ */
+public class RandomWalkVertexMasterCompute extends DefaultMasterCompute {
+
+ /** Computation halts once the L1 norm of the state diff is below this */
+ static final double CONVERGENCE_THRESHOLD = 0.00001;
+
+ /** Logger */
+ private static final Logger LOG =
+ Logger.getLogger(RandomWalkVertexMasterCompute.class);
+
+ @Override
+ public void compute() {
+ double danglingContribution =
+ this.<DoubleWritable>getAggregatedValue(
+ RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY).get();
+ double cumulativeProbability =
+ this.<DoubleWritable>getAggregatedValue(
+ RandomWalkVertex.CUMULATIVE_PROBABILITY).get();
+ double l1NormOfStateDiff =
+ this.<DoubleWritable>getAggregatedValue(
+ RandomWalkVertex.L1_NORM_OF_PROBABILITY_DIFFERENCE).get();
+ long numDanglingVertices =
+ this.<LongWritable>getAggregatedValue(
+ RandomWalkVertex.NUM_DANGLING_VERTICES).get();
+
+ LOG.info("[Superstep " + getSuperstep() + "] Dangling contribution = " +
+ danglingContribution + ", number of dangling vertices = " +
+ numDanglingVertices + ", cumulative probability = " +
+ cumulativeProbability + ", L1 Norm of state vector difference = " +
+ l1NormOfStateDiff);
+ // Convergence check: halt once the L1 norm of the state vector difference
+ // falls below the threshold. Vertices only aggregate that norm from
+ // superstep 1 on (see RandomWalkVertex), so the check starts at step 2.
+ if (getSuperstep() > 1 && l1NormOfStateDiff < CONVERGENCE_THRESHOLD) {
+ haltComputation();
+ }
+ }
+
+ @Override
+ public void initialize() throws InstantiationException,
+ IllegalAccessException {
+ registerAggregator(RandomWalkVertex.NUM_DANGLING_VERTICES,
+ LongSumAggregator.class);
+ registerAggregator(RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY,
+ DoubleSumAggregator.class);
+ registerAggregator(RandomWalkVertex.CUMULATIVE_PROBABILITY,
+ DoubleSumAggregator.class);
+ registerAggregator(RandomWalkVertex.L1_NORM_OF_PROBABILITY_DIFFERENCE,
+ DoubleSumAggregator.class);
+ }
+}
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java
index 2e4bbc4..b41bcf6 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java
@@ -43,7 +43,6 @@
String[] graph = new String[] {
"1 4 2 3",
"2 1",
- "3",
"4 3 2",
"5 2 4"
};
@@ -57,8 +56,7 @@
conf.setVertexOutputFormatClass(
VertexWithDoubleValueNullEdgeTextOutputFormat.class);
conf.setWorkerContextClass(RandomWalkWorkerContext.class);
- conf.setMasterComputeClass(
- RandomWalkVertex.RandomWalkVertexMasterCompute.class);
+ conf.setMasterComputeClass(RandomWalkVertexMasterCompute.class);
// Run internally
Iterable<String> results = InternalVertexRunner.run(conf, graph);
@@ -76,4 +74,5 @@
assertEquals(0.06784692727193153, steadyStateProbabilities.get(5l),
RandomWalkTestUtils.EPSILON);
}
+
}
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java
index a3dbd45..6ecfefe 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java
@@ -20,7 +20,6 @@
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.edge.ByteArrayEdges;
-import org.apache.giraph.examples.RandomWalkVertex.RandomWalkVertexMasterCompute;
import org.apache.giraph.utils.InternalVertexRunner;
import org.junit.Test;