[NEMO-358] Recycling vertex ids while cloning a vertex (#201)
JIRA: [NEMO-358: Recycling vertex ids while cloning a vertex](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-358)
**Major changes:**
- Extends the IdManager so that it saves the vertex IDs that are no longer used, to be reused later on.
- Adjusts the LoopExtractionPass (where cloning occurs) to work with the new implementation.
**Minor changes to note:**
- None
**Tests for the changes:**
- Existing tests confirm the changes.
**Other comments:**
- None
Closes #201
diff --git a/common/src/main/java/org/apache/nemo/common/ir/IdManager.java b/common/src/main/java/org/apache/nemo/common/ir/IdManager.java
index 0efef4b..389af8b 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/IdManager.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/IdManager.java
@@ -18,6 +18,12 @@
*/
package org.apache.nemo.common.ir;
+import org.apache.nemo.common.dag.Vertex;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -34,6 +40,9 @@
private static AtomicInteger edgeId = new AtomicInteger(1);
private static volatile boolean isDriver = false;
+ // Vertex ID Map to be used upon cloning in loop vertices.
+ private static final Map<Vertex, Queue<String>> VERTEX_ID_MAP = new HashMap<>();
+
/**
* @return a new operator ID.
*/
@@ -42,6 +51,35 @@
}
/**
+ * Save a vertex ID so that it can be handed out again when the given vertex is cloned later on.
+ * WARN: the caller must guarantee that the saved ID is no longer used by any vertex,
+ * otherwise, it would result in duplicate IDs.
+ * @param v the original vertex that is to be cloned later on (RootLoopVertex's vertex).
+ * @param id the ID to recycle for future clones of {@code v}.
+ */
+ public static void saveVertexId(final Vertex v, final String id) {
+   // computeIfAbsent creates the queue only the first time v is seen, instead of
+   // allocating a throwaway queue on every call as putIfAbsent(v, new ...) would.
+   VERTEX_ID_MAP.computeIfAbsent(v, key -> new LinkedBlockingQueue<>()).add(id);
+ }
+
+ /**
+ * Used for cloning vertices. Hands out an ID that was previously stored for the given vertex
+ * through the #saveVertexId method; if no stored ID is left, it simply acts as the
+ * newVertexId method.
+ * WARN: an ID stored through the #saveVertexId method must no longer be in use by the time
+ * it is handed out here, in order for this method to work correctly.
+ * @param v the vertex to get the ID for.
+ * @return the ID for the vertex.
+ */
+ public static String getVertexId(final Vertex v) {
+   final Queue<String> recycledIds = VERTEX_ID_MAP.get(v);
+   // poll() yields null on an empty queue, so both "no queue" and "queue drained" fall through.
+   final String recycledId = recycledIds == null ? null : recycledIds.poll();
+   if (recycledId == null) {
+     return newVertexId();
+   }
+   return recycledId;
+ }
+
+ /**
* @return a new edge ID.
*/
public static String newEdgeId() {
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/IRVertex.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/IRVertex.java
index 953d09d..378fc40 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/IRVertex.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/IRVertex.java
@@ -50,9 +50,9 @@
* @param that the source object for copying
*/
public IRVertex(final IRVertex that) {
- super(IdManager.newVertexId());
+ super(IdManager.getVertexId(that));
this.executionProperties = ExecutionPropertyMap.of(this);
- that.getExecutionProperties().forEachProperties(this::setProperty);
+ that.copyExecutionPropertiesTo(this);
}
/**
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/LoopVertex.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/LoopVertex.java
index b1ce3ca..b7a29e6 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/LoopVertex.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/LoopVertex.java
@@ -72,7 +72,7 @@
*
* @param that the source object for copying
*/
- public LoopVertex(final LoopVertex that) {
+ private LoopVertex(final LoopVertex that) {
super(that);
this.compositeTransformFullName = new String(that.compositeTransformFullName);
// Copy all elements to the clone
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/OperatorVertex.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/OperatorVertex.java
index 7e2b953..4be72f0 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/OperatorVertex.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/OperatorVertex.java
@@ -41,8 +41,8 @@
* Copy Constructor of OperatorVertex.
* @param that the source object for copying
*/
- public OperatorVertex(final OperatorVertex that) {
- super();
+ private OperatorVertex(final OperatorVertex that) {
+ super(that);
this.transform = that.transform;
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java
index ce10847..1c37978 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java
@@ -21,6 +21,7 @@
import org.apache.nemo.common.dag.DAG;
import org.apache.nemo.common.dag.DAGBuilder;
import org.apache.nemo.common.ir.IRDAG;
+import org.apache.nemo.common.ir.IdManager;
import org.apache.nemo.common.ir.edge.IREdge;
import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
import org.apache.nemo.common.ir.vertex.IRVertex;
@@ -184,8 +185,11 @@
*/
private DAG<IRVertex, IREdge> loopRolling(final DAG<IRVertex, IREdge> dag) {
final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
+ // Map for LoopVertex --> RootLoopVertex
final HashMap<LoopVertex, LoopVertex> loopVerticesOfSameLoop = new HashMap<>();
+ // RootLoopVertex --> Map of (RolledVertex --> (Root)Vertex)
final HashMap<LoopVertex, HashMap<IRVertex, IRVertex>> equivalentVerticesOfLoops = new HashMap<>();
+ // The RootLoopVertex that we're processing now.
LoopVertex rootLoopVertex = null;
// observe the DAG in a topological order.
@@ -203,8 +207,10 @@
rootLoopVertex = loopVertex;
loopVerticesOfSameLoop.putIfAbsent(rootLoopVertex, rootLoopVertex);
equivalentVerticesOfLoops.putIfAbsent(rootLoopVertex, new HashMap<>());
+ // Add the initial vertices
for (IRVertex vertex : rootLoopVertex.getDAG().getTopologicalSort()) {
equivalentVerticesOfLoops.get(rootLoopVertex).putIfAbsent(vertex, vertex);
+ IdManager.saveVertexId(vertex, vertex.getId());
}
addVertexToBuilder(builder, dag, rootLoopVertex, loopVerticesOfSameLoop);
} else { // following loops
@@ -218,9 +224,13 @@
final Iterator<IRVertex> verticesOfRootLoopVertex =
finalRootLoopVertex.getDAG().getTopologicalSort().iterator();
final Iterator<IRVertex> verticesOfCurrentLoopVertex = loopVertex.getDAG().getTopologicalSort().iterator();
+ // Map of (RolledVertex --> (Root)Vertex)
final HashMap<IRVertex, IRVertex> equivalentVertices = equivalentVerticesOfLoops.get(finalRootLoopVertex);
while (verticesOfRootLoopVertex.hasNext() && verticesOfCurrentLoopVertex.hasNext()) {
- equivalentVertices.put(verticesOfCurrentLoopVertex.next(), verticesOfRootLoopVertex.next());
+ final IRVertex vertexOfCurrentLoopVertex = verticesOfCurrentLoopVertex.next();
+ final IRVertex vertexOfRootLoopVertex = verticesOfRootLoopVertex.next();
+ equivalentVertices.put(vertexOfCurrentLoopVertex, vertexOfRootLoopVertex);
+ IdManager.saveVertexId(vertexOfRootLoopVertex, vertexOfCurrentLoopVertex.getId());
}
// reset non iterative incoming edges.
@@ -230,7 +240,7 @@
// incoming edges to the DAG.
loopVertex.getDagIncomingEdges().forEach((dstVertex, edges) -> edges.forEach(edge -> {
final IRVertex srcVertex = edge.getSrc();
- final IRVertex equivalentDstVertex = equivalentVertices.get(dstVertex);
+ final IRVertex equivalentDstVertex = equivalentVertices.get(dstVertex); // find the (Root)Vertex
if (equivalentVertices.containsKey(srcVertex)) {
// src is from the previous loop. vertex in previous loop -> DAG.