HAMA-959: Change to atomic counter from sync counter in MapVerticesInfo
git-svn-id: https://svn.apache.org/repos/asf/hama/trunk@1682802 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java b/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
index d8b9b4f..e1f758b 100644
--- a/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
+++ b/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
@@ -28,6 +28,7 @@
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -108,6 +109,10 @@
private int maxIteration = -1;
private long iteration = 0;
+ // global counter for thread exceptions
+ // TODO find more graceful way to handle thread exceptions.
+ private AtomicInteger errorCount = new AtomicInteger(0);
+
private AggregationRunner<V, E, M> aggregationRunner;
private VertexOutputWriter<Writable, Writable, V, E, M> vertexOutputWriter;
private Combiner<Writable> combiner;
@@ -116,6 +121,10 @@
private RejectedExecutionHandler retryHandler = new RetryRejectedExecutionHandler();
+ // Below maps are used for grouping messages into single GraphJobMessage, based on vertex ID.
+ private final ConcurrentHashMap<Integer, GraphJobMessage> partitionMessages = new ConcurrentHashMap<Integer, GraphJobMessage>();
+ private final ConcurrentHashMap<V, GraphJobMessage> vertexMessages = new ConcurrentHashMap<V, GraphJobMessage>();
+
@Override
public final void setup(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
@@ -242,7 +251,7 @@
private void doSuperstep(GraphJobMessage currentMessage,
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
- this.errorCount = 0;
+ this.errorCount.set(0);
long startTime = System.currentTimeMillis();
this.changedVertexCnt = 0;
@@ -269,7 +278,7 @@
throw new IOException(e);
}
- if (errorCount > 0) {
+ if (errorCount.get() > 0) {
throw new IOException("there were " + errorCount
+ " exceptions during compute vertices.");
}
@@ -305,7 +314,7 @@
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
this.changedVertexCnt = 0;
- this.errorCount = 0;
+ this.errorCount.set(0);
vertices.startSuperstep();
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
@@ -324,7 +333,7 @@
throw new IOException(e);
}
- if (errorCount > 0) {
+ if (errorCount.get() > 0) {
throw new IOException("there were " + errorCount
+ " exceptions during compute vertices.");
}
@@ -334,10 +343,8 @@
finishSuperstep();
}
- private int errorCount = 0;
-
- public synchronized void incrementErrorCount() {
- errorCount++;
+ public void incrementErrorCount() {
+ errorCount.incrementAndGet();
}
class ComputeRunnable implements Runnable {
@@ -430,8 +437,6 @@
EDGE_VALUE_CLASS = edgeValueClass;
}
- private final ConcurrentHashMap<Integer, GraphJobMessage> messages = new ConcurrentHashMap<Integer, GraphJobMessage>();
-
/**
* Loads vertices into memory of each peer.
*/
@@ -441,7 +446,7 @@
throws IOException, SyncException, InterruptedException {
for (int i = 0; i < peer.getNumPeers(); i++) {
- messages.put(i, new GraphJobMessage());
+ partitionMessages.put(i, new GraphJobMessage());
}
VertexInputReader<Writable, Writable, V, E, M> reader = (VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
@@ -479,7 +484,7 @@
executor.awaitTermination(60, TimeUnit.SECONDS);
Iterator<Entry<Integer, GraphJobMessage>> it;
- it = messages.entrySet().iterator();
+ it = partitionMessages.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, GraphJobMessage> e = it.next();
it.remove();
@@ -547,7 +552,7 @@
if (peer.getPeerIndex() == partition) {
addVertex(vertex);
} else {
- messages.get(partition).add(WritableUtils.serialize(vertex));
+ partitionMessages.get(partition).add(WritableUtils.serialize(vertex));
}
} catch (Exception e) {
throw new RuntimeException(e);
@@ -690,20 +695,18 @@
vertices.finishAdditions();
}
- private final ConcurrentHashMap<V, GraphJobMessage> storage = new ConcurrentHashMap<V, GraphJobMessage>();
-
public void sendMessage(V vertexID, byte[] msg) throws IOException {
- if (!storage.containsKey(vertexID)) {
+ if (!vertexMessages.containsKey(vertexID)) {
// To save bit memory we don't set vertexID twice
- storage.putIfAbsent(vertexID, new GraphJobMessage());
+ vertexMessages.putIfAbsent(vertexID, new GraphJobMessage());
}
- storage.get(vertexID).add(msg);
+ vertexMessages.get(vertexID).add(msg);
}
public void finishSuperstep() throws IOException {
vertices.finishSuperstep();
- Iterator<Entry<V, GraphJobMessage>> it = storage.entrySet().iterator();
+ Iterator<Entry<V, GraphJobMessage>> it = vertexMessages.entrySet().iterator();
while (it.hasNext()) {
Entry<V, GraphJobMessage> e = it.next();
it.remove();
diff --git a/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java b/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
index 4f324e8..b84cd96 100644
--- a/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
+++ b/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
@@ -21,6 +21,7 @@
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -44,7 +45,7 @@
private GraphJobRunner<V, E, M> runner;
- private int activeVertices = 0;
+ private AtomicInteger activeVertices = new AtomicInteger(0);
@Override
public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
@@ -132,8 +133,8 @@
vertices.put(vertex.getVertexID(), WritableUtils.serialize(vertex));
}
- public synchronized void incrementCount() {
- activeVertices++;
+ public void incrementCount() {
+ activeVertices.incrementAndGet();
}
@Override
@@ -150,10 +151,10 @@
@Override
public void finishSuperstep() throws IOException {
- activeVertices = 0;
+ activeVertices.set(0);
}
public int getActiveVerticesNum() {
- return activeVertices;
+ return activeVertices.get();
}
}