GIRAPH-1226
closes #112
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 401de77..6f9351b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -49,8 +49,10 @@
import org.apache.giraph.graph.DefaultVertex;
import org.apache.giraph.graph.DefaultVertexResolver;
import org.apache.giraph.graph.DefaultVertexValueCombiner;
+import org.apache.giraph.graph.JobProgressTrackerClient;
import org.apache.giraph.graph.Language;
import org.apache.giraph.graph.MapperObserver;
+import org.apache.giraph.graph.RetryableJobProgressTrackerClient;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.graph.VertexValueCombiner;
@@ -1201,9 +1203,17 @@
new BooleanConfOption("giraph.trackJobProgressOnClient", false,
"Whether to track job progress on client or not");
+ /** Class to use as the job progress client */
+ ClassConfOption<JobProgressTrackerClient> JOB_PROGRESS_TRACKER_CLIENT_CLASS =
+ ClassConfOption.create("giraph.jobProgressTrackerClientClass",
+ RetryableJobProgressTrackerClient.class,
+ JobProgressTrackerClient.class,
+ "Class to use to make calls to the job progress tracker service");
+
/** Class to use to track job progress on client */
- ClassConfOption<JobProgressTrackerService> JOB_PROGRESS_TRACKER_CLASS =
- ClassConfOption.create("giraph.jobProgressTrackerClass",
+ ClassConfOption<JobProgressTrackerService>
+ JOB_PROGRESS_TRACKER_SERVICE_CLASS =
+ ClassConfOption.create("giraph.jobProgressTrackerServiceClass",
DefaultJobProgressTrackerService.class,
JobProgressTrackerService.class,
"Class to use to track job progress on client");
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 08b45a2..3066a9e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -28,7 +28,6 @@
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import com.sun.management.GarbageCollectionNotificationInfo;
@@ -287,12 +286,15 @@
if (!conf.trackJobProgressOnClient()) {
jobProgressTracker = new JobProgressTrackerClientNoOp();
} else {
+ jobProgressTracker =
+ GiraphConstants.JOB_PROGRESS_TRACKER_CLIENT_CLASS.newInstance(conf);
try {
- jobProgressTracker = new RetryableJobProgressTrackerClient(conf);
- } catch (InterruptedException | ExecutionException e) {
- LOG.warn("createJobProgressClient: Exception occurred while trying to" +
- " connect to JobProgressTracker - not reporting progress", e);
- jobProgressTracker = new JobProgressTrackerClientNoOp();
+ jobProgressTracker.init(conf);
+ // CHECKSTYLE: stop IllegalCatch
+ } catch (Exception e) {
+ // CHECKSTYLE: resume IllegalCatch
+ throw new RuntimeException(
+ "Failed to initialize JobProgressTrackerClient", e);
}
}
jobProgressTracker.mapperStarted();
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClient.java b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClient.java
index c302d9a..05586ec 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClient.java
@@ -18,6 +18,7 @@
package org.apache.giraph.graph;
+import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.job.JobProgressTracker;
import java.io.IOException;
@@ -30,4 +31,11 @@
public interface JobProgressTrackerClient extends JobProgressTracker {
/** Close the connections if any */
void cleanup() throws IOException;
+
+ /**
+ * Initialize the client.
+ * @param conf Job configuration
+ * @throws Exception
+ */
+ void init(GiraphConfiguration conf) throws Exception;
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
index 6f1258d..43195c0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
@@ -18,6 +18,7 @@
package org.apache.giraph.graph;
+import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.master.MasterProgress;
import org.apache.giraph.worker.WorkerProgress;
@@ -31,6 +32,10 @@
}
@Override
+ public void init(GiraphConfiguration conf) {
+ }
+
+ @Override
public void mapperStarted() {
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
index 93e8a24..8fed779 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
@@ -63,7 +63,7 @@
private static final Logger LOG =
Logger.getLogger(RetryableJobProgressTrackerClient.class);
/** Configuration */
- private final GiraphConfiguration conf;
+ private GiraphConfiguration conf;
/** Thrift client manager to use to connect to job progress tracker */
private ThriftClientManager clientManager;
/** Job progress tracker */
@@ -74,6 +74,13 @@
private int retryWaitMsec;
/**
+ * Default constructor. Typically once an instance is created it should be
+ * initialized by calling {@link #init(GiraphConfiguration)}.
+ */
+ public RetryableJobProgressTrackerClient() {
+ }
+
+ /**
* Constructor
*
* @param conf Giraph configuration
@@ -86,6 +93,14 @@
resetConnection();
}
+ @Override
+ public void init(GiraphConfiguration conf) throws Exception {
+ this.conf = conf;
+ numRetries = RETRYABLE_JOB_PROGRESS_CLIENT_NUM_RETRIES.get(conf);
+ retryWaitMsec = RETRYABLE_JOB_PROGRESS_CLIENT_RETRY_WAIT_MS.get(conf);
+ resetConnection();
+ }
+
/**
* Try to establish new connection to JobProgressTracker
*/
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
index a1e7f12..12fde42 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
@@ -270,7 +270,7 @@
}
JobProgressTrackerService jobProgressTrackerService =
- GiraphConstants.JOB_PROGRESS_TRACKER_CLASS.newInstance(conf);
+ GiraphConstants.JOB_PROGRESS_TRACKER_SERVICE_CLASS.newInstance(conf);
jobProgressTrackerService.init(conf, jobObserver);
return jobProgressTrackerService;
}