GIRAPH-1226

closes #112
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 401de77..6f9351b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -49,8 +49,10 @@
 import org.apache.giraph.graph.DefaultVertex;
 import org.apache.giraph.graph.DefaultVertexResolver;
 import org.apache.giraph.graph.DefaultVertexValueCombiner;
+import org.apache.giraph.graph.JobProgressTrackerClient;
 import org.apache.giraph.graph.Language;
 import org.apache.giraph.graph.MapperObserver;
+import org.apache.giraph.graph.RetryableJobProgressTrackerClient;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.graph.VertexValueCombiner;
@@ -1201,9 +1203,17 @@
       new BooleanConfOption("giraph.trackJobProgressOnClient", false,
           "Whether to track job progress on client or not");
 
+  /** Class to use as the job progress client */
+  ClassConfOption<JobProgressTrackerClient> JOB_PROGRESS_TRACKER_CLIENT_CLASS =
+      ClassConfOption.create("giraph.jobProgressTrackerClientClass",
+        RetryableJobProgressTrackerClient.class,
+          JobProgressTrackerClient.class,
+          "Class to use to make calls to the job progress tracker service");
+
   /** Class to use to track job progress on client */
-  ClassConfOption<JobProgressTrackerService> JOB_PROGRESS_TRACKER_CLASS =
-      ClassConfOption.create("giraph.jobProgressTrackerClass",
+  ClassConfOption<JobProgressTrackerService>
+    JOB_PROGRESS_TRACKER_SERVICE_CLASS =
+      ClassConfOption.create("giraph.jobProgressTrackerServiceClass",
           DefaultJobProgressTrackerService.class,
           JobProgressTrackerService.class,
           "Class to use to track job progress on client");
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 08b45a2..3066a9e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -28,7 +28,6 @@
 import java.util.Enumeration;
 import java.util.List;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import com.sun.management.GarbageCollectionNotificationInfo;
@@ -287,12 +286,15 @@
     if (!conf.trackJobProgressOnClient()) {
       jobProgressTracker = new JobProgressTrackerClientNoOp();
     } else {
+      jobProgressTracker =
+        GiraphConstants.JOB_PROGRESS_TRACKER_CLIENT_CLASS.newInstance(conf);
       try {
-        jobProgressTracker = new RetryableJobProgressTrackerClient(conf);
-      } catch (InterruptedException | ExecutionException e) {
-        LOG.warn("createJobProgressClient: Exception occurred while trying to" +
-            " connect to JobProgressTracker - not reporting progress", e);
-        jobProgressTracker = new JobProgressTrackerClientNoOp();
+        jobProgressTracker.init(conf);
+        // CHECKSTYLE: stop IllegalCatch
+      } catch (Exception e) {
+        // CHECKSTYLE: resume IllegalCatch
+        throw new RuntimeException(
+          "Failed to initialize JobProgressTrackerClient", e);
       }
     }
     jobProgressTracker.mapperStarted();
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClient.java b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClient.java
index c302d9a..05586ec 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.graph;
 
+import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.job.JobProgressTracker;
 
 import java.io.IOException;
@@ -30,4 +31,11 @@
 public interface JobProgressTrackerClient extends JobProgressTracker {
   /** Close the connections if any */
   void cleanup() throws IOException;
+
+  /**
+   * Initialize the client.
+   * @param conf Job configuration
+   * @throws Exception
+   */
+  void init(GiraphConfiguration conf) throws Exception;
 }
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
index 6f1258d..43195c0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.graph;
 
+import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.master.MasterProgress;
 import org.apache.giraph.worker.WorkerProgress;
 
@@ -31,6 +32,10 @@
   }
 
   @Override
+  public void init(GiraphConfiguration conf) {
+  }
+
+  @Override
   public void mapperStarted() {
   }
 
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
index 93e8a24..8fed779 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
@@ -63,7 +63,7 @@
   private static final Logger LOG =
       Logger.getLogger(RetryableJobProgressTrackerClient.class);
   /** Configuration */
-  private final GiraphConfiguration conf;
+  private GiraphConfiguration conf;
   /** Thrift client manager to use to connect to job progress tracker */
   private ThriftClientManager clientManager;
   /** Job progress tracker */
@@ -74,6 +74,13 @@
   private int retryWaitMsec;
 
   /**
+   * Default constructor. Typically once an instance is created it should be
+   * initialized by calling {@link #init(GiraphConfiguration)}.
+   */
+  public RetryableJobProgressTrackerClient() {
+  }
+
+  /**
    * Constructor
    *
    * @param conf Giraph configuration
@@ -86,6 +93,14 @@
     resetConnection();
   }
 
+  @Override
+  public void init(GiraphConfiguration conf) throws Exception {
+    this.conf = conf;
+    numRetries = RETRYABLE_JOB_PROGRESS_CLIENT_NUM_RETRIES.get(conf);
+    retryWaitMsec = RETRYABLE_JOB_PROGRESS_CLIENT_RETRY_WAIT_MS.get(conf);
+    resetConnection();
+  }
+
   /**
    * Try to establish new connection to JobProgressTracker
    */
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
index a1e7f12..12fde42 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
@@ -270,7 +270,7 @@
     }
 
     JobProgressTrackerService jobProgressTrackerService =
-        GiraphConstants.JOB_PROGRESS_TRACKER_CLASS.newInstance(conf);
+        GiraphConstants.JOB_PROGRESS_TRACKER_SERVICE_CLASS.newInstance(conf);
     jobProgressTrackerService.init(conf, jobObserver);
     return jobProgressTrackerService;
   }