Provide more exception details

git-svn-id: https://svn.apache.org/repos/asf/hama/trunk@1681684 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java b/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
index f8766ce..d8b9b4f 100644
--- a/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
+++ b/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
@@ -227,7 +227,7 @@
         combiner = (Combiner<Writable>) ReflectionUtils
             .newInstance(combinerName);
       } catch (ClassNotFoundException e) {
-        e.printStackTrace();
+        throw new IOException(e);
       }
     }
   }
@@ -242,6 +242,7 @@
   private void doSuperstep(GraphJobMessage currentMessage,
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
+    this.errorCount = 0;
     long startTime = System.currentTimeMillis();
 
     this.changedVertexCnt = 0;
@@ -254,8 +255,7 @@
 
     long loopStartTime = System.currentTimeMillis();
     while (currentMessage != null) {
-      Runnable worker = new ComputeRunnable(currentMessage);
-      executor.execute(worker);
+      executor.execute(new ComputeRunnable(currentMessage));
 
       currentMessage = peer.getCurrentMessage();
     }
@@ -266,7 +266,12 @@
     try {
       executor.awaitTermination(60, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
-      LOG.error(e);
+      throw new IOException(e);
+    }
+
+    if (errorCount > 0) {
+      throw new IOException("there were " + errorCount
+          + " exceptions during compute vertices.");
     }
 
     Iterator it = vertices.iterator();
@@ -300,6 +305,7 @@
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
     this.changedVertexCnt = 0;
+    this.errorCount = 0;
     vertices.startSuperstep();
 
     ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
@@ -308,45 +314,45 @@
     executor.setRejectedExecutionHandler(retryHandler);
 
     for (V v : vertices.keySet()) {
-      Runnable worker = new ComputeRunnable(v);
-      executor.execute(worker);
+      executor.execute(new ComputeRunnable(v));
     }
 
     executor.shutdown();
     try {
       executor.awaitTermination(60, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
-      LOG.error(e);
+      throw new IOException(e);
     }
 
+    if (errorCount > 0) {
+      throw new IOException("there were " + errorCount
+          + " exceptions during compute vertices.");
+    }
+    
     getAggregationRunner().sendAggregatorValues(peer, 1, this.changedVertexCnt);
     iteration++;
     finishSuperstep();
   }
 
+  private int errorCount = 0;
+
+  public synchronized void incrementErrorCount() {
+    errorCount++;
+  }
+
   class ComputeRunnable implements Runnable {
     Vertex<V, E, M> vertex;
     Iterable<M> msgs;
 
     @SuppressWarnings("unchecked")
-    public ComputeRunnable(GraphJobMessage msg) {
-      try {
-        this.vertex = vertices.get((V) msg.getVertexId());
-      } catch (IOException e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-      }
+    public ComputeRunnable(GraphJobMessage msg) throws IOException {
+      this.vertex = vertices.get((V) msg.getVertexId());
       this.msgs = (Iterable<M>) getIterableMessages(msg.getValuesBytes(),
           msg.getNumOfValues());
     }
 
-    public ComputeRunnable(V v) {
-      try {
-        this.vertex = vertices.get(v);
-      } catch (IOException e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-      }
+    public ComputeRunnable(V v) throws IOException {
+      this.vertex = vertices.get(v);
     }
 
     @Override
@@ -361,7 +367,8 @@
         vertex.compute(msgs);
         vertices.finishVertexComputation(vertex);
       } catch (IOException e) {
-        e.printStackTrace();
+        incrementErrorCount();
+        throw new RuntimeException(e);
       }
     }
   }
@@ -456,7 +463,7 @@
         vertexFinished = reader.parseVertex(next.getKey(), next.getValue(),
             vertex);
       } catch (Exception e) {
-        e.printStackTrace();
+        throw new IOException("Parse exception occured: " + e);
       }
 
       if (!vertexFinished) {
@@ -518,7 +525,7 @@
 
           addVertex(vertex);
         } catch (IOException e) {
-          e.printStackTrace();
+          throw new RuntimeException(e);
         }
       }
     }
@@ -543,7 +550,7 @@
           messages.get(partition).add(WritableUtils.serialize(vertex));
         }
       } catch (Exception e) {
-        e.printStackTrace();
+        throw new RuntimeException(e);
       }
     }
   }
@@ -742,7 +749,7 @@
             try {
               v.readFields(dis);
             } catch (IOException e) {
-              e.printStackTrace();
+              throw new RuntimeException(e);
             }
             index++;
             return v;