GIRAPH-1162
closes #51
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 63f6aca..9340472 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -1181,6 +1181,15 @@
JobProgressTrackerService.class,
"Class to use to track job progress on client");
+ /**
+ * Minimum number of vertices to compute before adding to worker progress.
+ */
+ LongConfOption VERTICES_TO_UPDATE_PROGRESS =
+ new LongConfOption("giraph.VerticesToUpdateProgress", 100000,
+ "Minimum number of vertices to compute before " +
+ "updating worker progress");
+
+
/** Number of retries for creating the HDFS files */
IntConfOption HDFS_FILE_CREATION_RETRIES =
new IntConfOption("giraph.hdfs.file.creation.retries", 10,
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 91b740b..cdd9877 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -26,6 +26,7 @@
import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
+import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.function.primitive.PrimitiveRefs.LongRef;
import org.apache.giraph.io.SimpleVertexWriter;
@@ -78,7 +79,7 @@
/** Class time object */
private static final Time TIME = SystemTime.get();
/** How often to update WorkerProgress */
- private static final long VERTICES_TO_UPDATE_PROGRESS = 100000;
+ private final long verticesToUpdateProgress;
/** Context */
private final Mapper<?, ?, ?, ?>.Context context;
/** Graph state */
@@ -140,6 +141,8 @@
metrics.getUniformHistogram("wait-per-thread-ms");
histogramProcessingTimePerThread =
metrics.getUniformHistogram("processing-per-thread-ms");
+ verticesToUpdateProgress =
+ GiraphConstants.VERTICES_TO_UPDATE_PROGRESS.get(configuration);
}
@Override
@@ -278,11 +281,12 @@
PartitionStats partitionStats =
new PartitionStats(partition.getId(), 0, 0, 0, 0, 0);
final LongRef verticesComputedProgress = new LongRef(0);
+
Progressable verticesProgressable = new Progressable() {
@Override
public void progress() {
verticesComputedProgress.value++;
- if (verticesComputedProgress.value == VERTICES_TO_UPDATE_PROGRESS) {
+ if (verticesComputedProgress.value == verticesToUpdateProgress) {
WorkerProgress.get().addVerticesComputed(
verticesComputedProgress.value);
verticesComputedProgress.value = 0;