APEXCORE-720 Update cloned LogicalPlan in Context before discovery of plugins
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
index 5e468f5..a4aca46 100644
--- a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
@@ -170,12 +170,10 @@
@Override
public void dispatch(Event event)
{
- if (!plugins.isEmpty()) {
- if (event.getType() == ApexPluginDispatcher.DAG_CHANGE) {
- clonedDAG = SerializationUtils.clone(((DAGChangeEvent)event).dag);
- } else if (event instanceof DAGExecutionEvent) {
- dispatchExecutionEvent((DAGExecutionEvent)event);
- }
+ if (event.getType() == ApexPluginDispatcher.DAG_CHANGE) {
+ clonedDAG = SerializationUtils.clone(((DAGChangeEvent)event).dag);
+ } else if (!plugins.isEmpty() && (event instanceof DAGExecutionEvent)) {
+ dispatchExecutionEvent((DAGExecutionEvent)event);
}
}
}
diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
index 833d69f..654a4ce 100644
--- a/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
+++ b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
@@ -27,6 +27,8 @@
import org.apache.apex.engine.api.plugin.DAGExecutionEvent;
import org.apache.apex.engine.api.plugin.DAGExecutionPlugin;
+import com.datatorrent.api.DAG;
+
import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.COMMIT_EVENT;
import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.HEARTBEAT_EVENT;
import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.STRAM_EVENT;
@@ -39,10 +41,13 @@
private int heartbeatCount = 0;
private int commitCount = 0;
CountDownLatch latch = new CountDownLatch(3);
+ private Context context;
@Override
public void setup(DAGExecutionPlugin.Context context)
{
+ this.context = context;
+
context.register(STRAM_EVENT, new EventHandler<DAGExecutionEvent.StramExecutionEvent>()
{
@Override
@@ -102,4 +107,9 @@
{
latch.await(timeout, TimeUnit.SECONDS);
}
+
+ public DAG getLogicalPlan()
+ {
+ return context.getDAG();
+ }
}
diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
index 140dc65..34589b0 100644
--- a/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
+++ b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
@@ -33,6 +33,7 @@
import com.datatorrent.api.Attribute;
import com.datatorrent.stram.api.StramEvent;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.support.StramTestSupport;
public class PluginTests
@@ -93,12 +94,14 @@
}));
pluginManager.dispatch(new DAGExecutionEvent.CommitExecutionEvent(1234));
pluginManager.dispatch(new DAGExecutionEvent.HeartbeatExecutionEvent(new StreamingContainerUmbilicalProtocol.ContainerHeartbeat()));
+ LogicalPlan plan = new LogicalPlan();
+ pluginManager.dispatch(new ApexPluginDispatcher.DAGChangeEvent(plan));
debugPlugin.waitForEventDelivery(10);
pluginManager.stop();
Assert.assertEquals(1, debugPlugin.getEventCount());
Assert.assertEquals(1, debugPlugin.getHeartbeatCount());
Assert.assertEquals(1, debugPlugin.getCommitCount());
+ Assert.assertEquals(plan, debugPlugin.getLogicalPlan());
}
-
}