HAMA-959: Change to atomic counter from sync counter in MapVerticesInfo

git-svn-id: https://svn.apache.org/repos/asf/hama/trunk@1682802 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java b/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
index d8b9b4f..e1f758b 100644
--- a/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
+++ b/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
@@ -28,6 +28,7 @@
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -108,6 +109,10 @@
   private int maxIteration = -1;
   private long iteration = 0;
 
+  // global counter for thread exceptions
+  // TODO find more graceful way to handle thread exceptions.
+  private AtomicInteger errorCount = new AtomicInteger(0);
+  
   private AggregationRunner<V, E, M> aggregationRunner;
   private VertexOutputWriter<Writable, Writable, V, E, M> vertexOutputWriter;
   private Combiner<Writable> combiner;
@@ -116,6 +121,10 @@
 
   private RejectedExecutionHandler retryHandler = new RetryRejectedExecutionHandler();
 
+  // Below maps are used for grouping messages into single GraphJobMessage, based on vertex ID.
+  private final ConcurrentHashMap<Integer, GraphJobMessage> partitionMessages = new ConcurrentHashMap<Integer, GraphJobMessage>();
+  private final ConcurrentHashMap<V, GraphJobMessage> vertexMessages = new ConcurrentHashMap<V, GraphJobMessage>();
+
   @Override
   public final void setup(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
@@ -242,7 +251,7 @@
   private void doSuperstep(GraphJobMessage currentMessage,
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
-    this.errorCount = 0;
+    this.errorCount.set(0);
     long startTime = System.currentTimeMillis();
 
     this.changedVertexCnt = 0;
@@ -269,7 +278,7 @@
       throw new IOException(e);
     }
 
-    if (errorCount > 0) {
+    if (errorCount.get() > 0) {
       throw new IOException("there were " + errorCount
           + " exceptions during compute vertices.");
     }
@@ -305,7 +314,7 @@
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
     this.changedVertexCnt = 0;
-    this.errorCount = 0;
+    this.errorCount.set(0);
     vertices.startSuperstep();
 
     ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
@@ -324,7 +333,7 @@
       throw new IOException(e);
     }
 
-    if (errorCount > 0) {
+    if (errorCount.get() > 0) {
       throw new IOException("there were " + errorCount
           + " exceptions during compute vertices.");
     }
@@ -334,10 +343,8 @@
     finishSuperstep();
   }
 
-  private int errorCount = 0;
-
-  public synchronized void incrementErrorCount() {
-    errorCount++;
+  public void incrementErrorCount() {
+    errorCount.incrementAndGet();
   }
 
   class ComputeRunnable implements Runnable {
@@ -430,8 +437,6 @@
     EDGE_VALUE_CLASS = edgeValueClass;
   }
 
-  private final ConcurrentHashMap<Integer, GraphJobMessage> messages = new ConcurrentHashMap<Integer, GraphJobMessage>();
-
   /**
    * Loads vertices into memory of each peer.
    */
@@ -441,7 +446,7 @@
       throws IOException, SyncException, InterruptedException {
 
     for (int i = 0; i < peer.getNumPeers(); i++) {
-      messages.put(i, new GraphJobMessage());
+      partitionMessages.put(i, new GraphJobMessage());
     }
 
     VertexInputReader<Writable, Writable, V, E, M> reader = (VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
@@ -479,7 +484,7 @@
     executor.awaitTermination(60, TimeUnit.SECONDS);
 
     Iterator<Entry<Integer, GraphJobMessage>> it;
-    it = messages.entrySet().iterator();
+    it = partitionMessages.entrySet().iterator();
     while (it.hasNext()) {
       Entry<Integer, GraphJobMessage> e = it.next();
       it.remove();
@@ -547,7 +552,7 @@
         if (peer.getPeerIndex() == partition) {
           addVertex(vertex);
         } else {
-          messages.get(partition).add(WritableUtils.serialize(vertex));
+          partitionMessages.get(partition).add(WritableUtils.serialize(vertex));
         }
       } catch (Exception e) {
         throw new RuntimeException(e);
@@ -690,20 +695,18 @@
     vertices.finishAdditions();
   }
 
-  private final ConcurrentHashMap<V, GraphJobMessage> storage = new ConcurrentHashMap<V, GraphJobMessage>();
-
   public void sendMessage(V vertexID, byte[] msg) throws IOException {
-    if (!storage.containsKey(vertexID)) {
+    if (!vertexMessages.containsKey(vertexID)) {
       // To save bit memory we don't set vertexID twice
-      storage.putIfAbsent(vertexID, new GraphJobMessage());
+      vertexMessages.putIfAbsent(vertexID, new GraphJobMessage());
     }
-    storage.get(vertexID).add(msg);
+    vertexMessages.get(vertexID).add(msg);
   }
 
   public void finishSuperstep() throws IOException {
     vertices.finishSuperstep();
 
-    Iterator<Entry<V, GraphJobMessage>> it = storage.entrySet().iterator();
+    Iterator<Entry<V, GraphJobMessage>> it = vertexMessages.entrySet().iterator();
     while (it.hasNext()) {
       Entry<V, GraphJobMessage> e = it.next();
       it.remove();
diff --git a/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java b/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
index 4f324e8..b84cd96 100644
--- a/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
+++ b/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
@@ -21,6 +21,7 @@
 import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -44,7 +45,7 @@
 
   private GraphJobRunner<V, E, M> runner;
 
-  private int activeVertices = 0;
+  private AtomicInteger activeVertices = new AtomicInteger(0);
 
   @Override
   public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
@@ -132,8 +133,8 @@
     vertices.put(vertex.getVertexID(), WritableUtils.serialize(vertex));
   }
 
-  public synchronized void incrementCount() {
-    activeVertices++;
+  public void incrementCount() {
+    activeVertices.incrementAndGet();
   }
 
   @Override
@@ -150,10 +151,10 @@
 
   @Override
   public void finishSuperstep() throws IOException {
-    activeVertices = 0;
+    activeVertices.set(0);
   }
 
   public int getActiveVerticesNum() {
-    return activeVertices;
+    return activeVertices.get();
   }
 }