APEXCORE-700 Uniform interface between setup and runtime plugins
diff --git a/api/src/main/java/org/apache/apex/api/plugin/DAGSetupEvent.java b/api/src/main/java/org/apache/apex/api/plugin/DAGSetupEvent.java
new file mode 100644
index 0000000..95d17e2
--- /dev/null
+++ b/api/src/main/java/org/apache/apex/api/plugin/DAGSetupEvent.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.api.plugin;
+
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+
+@Evolving
+public class DAGSetupEvent extends Event.BaseEvent<DAGSetupEvent.Type>
+{
+  @Evolving
+  public enum Type implements Event.Type
+  {
+    /**
+     * This event is sent before platform adds operators and streams in the DAG. i.e this method
+     * will get called just before {@link com.datatorrent.api.StreamingApplication#populateDAG(DAG, Configuration)}
+     *
+     * For Application specified using property and json file format, this will be sent
+     * before platform adds operators and streams in the DAG as per specification in the file.
+     */
+    PRE_POPULATE_DAG,
+
+    /**
+     * This event is sent after platform adds operators and streams in the DAG. i.e this method
+     * will get called just after {@link com.datatorrent.api.StreamingApplication#populateDAG(DAG, Configuration)}
+     * in case application is specified in java.
+     *
+     * For Application specified using property and json file format, this will be sent
+     * after platform has added operators and streams in the DAG as per specification in the file.
+     */
+    POST_POPULATE_DAG,
+
+    /**
+     * This event is sent before DAG is configured, i.e operator and application
+     * properties/attributes are injected from configuration files.
+     */
+    PRE_CONFIGURE_DAG,
+
+    /**
+     * This event is sent after DAG is configured, i.e operator and application
+     * properties/attributes are injected from configuration files.
+     */
+    POST_CONFIGURE_DAG,
+
+    /**
+     * This event is sent just before dag is validated before final job submission.
+     */
+    PRE_VALIDATE_DAG,
+
+    /**
+     * This event is sent after dag is validated. If plugin makes in incompatible changes
+     * to the DAG at this stage, then application may get launched incorrectly or application
+     * launch may fail.
+     */
+    POST_VALIDATE_DAG;
+
+    public final DAGSetupEvent event;
+
+    Type()
+    {
+      event = new DAGSetupEvent(this);
+    }
+  }
+
+  private DAGSetupEvent(DAGSetupEvent.Type type)
+  {
+    super(type);
+  }
+}
diff --git a/api/src/main/java/org/apache/apex/api/plugin/DAGSetupPlugin.java b/api/src/main/java/org/apache/apex/api/plugin/DAGSetupPlugin.java
index faa6798..31ea1f3 100644
--- a/api/src/main/java/org/apache/apex/api/plugin/DAGSetupPlugin.java
+++ b/api/src/main/java/org/apache/apex/api/plugin/DAGSetupPlugin.java
@@ -18,12 +18,9 @@
  */
 package org.apache.apex.api.plugin;
 
-import java.util.Collection;
-
-import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
 
-import com.datatorrent.api.Attribute;
 import com.datatorrent.api.DAG;
 
 /**
@@ -39,96 +36,18 @@
  *   <li>After dag is validated</li>
  * </ul>
  */
-@InterfaceStability.Evolving
-public interface DAGSetupPlugin extends Plugin<DAGSetupPlugin.DAGSetupPluginContext>
+@Evolving
+public interface DAGSetupPlugin<T extends DAGSetupPlugin.Context> extends Plugin<T>
 {
-
   /**
-   * This method is called before platform adds operators and streams in the DAG. i.e this method
-   * will get called just before {@link com.datatorrent.api.StreamingApplication#populateDAG(DAG, Configuration)}
-   *
-   * For Application specified using property and json file format, this method will get called
-   * before platform adds operators and streams in the DAG as per specification in the file.
+   * The context for the setup plugins
    */
-  void prePopulateDAG();
-
-  /**
-   * This method is called after platform adds operators and streams in the DAG. i.e this method
-   * will get called just after {@link com.datatorrent.api.StreamingApplication#populateDAG(DAG, Configuration)}
-   * in case application is specified in java.
-   *
-   * For Application specified using property and json file format, this method will get called
-   * after platform has added operators and streams in the DAG as per specification in the file.
-   */
-  void postPopulateDAG();
-
-  /**
-   * This is method is called before DAG is configured, i.e operator and application
-   * properties/attributes are injected from configuration files.
-   */
-  void preConfigureDAG();
-
-  /**
-   * This is method is called after DAG is configured, i.e operator and application
-   * properties/attributes are injected from configuration files.
-   */
-  void postConfigureDAG();
-
-  /**
-   * This method is called just before dag is validated before final job submission.
-   */
-  void preValidateDAG();
-
-  /**
-   * This method is called after dag is validated. If plugin makes in incompatible changes
-   * to the DAG at this stage, then application may get launched incorrectly or application
-   * launch may fail.
-   */
-  void postValidateDAG();
-
-  class DAGSetupPluginContext implements PluginContext
+  @Evolving
+  interface Context<E extends DAGSetupEvent> extends PluginContext<DAGSetupEvent.Type, E>
   {
-    private final DAG dag;
-    private final Configuration conf;
 
-    public DAGSetupPluginContext(DAG dag, Configuration conf)
-    {
-      this.dag = dag;
-      this.conf = conf;
-    }
+    DAG getDAG();
 
-    public DAG getDAG()
-    {
-      return dag;
-    }
-
-    public Configuration getConfiguration()
-    {
-      return conf;
-    }
-
-    @Override
-    public Attribute.AttributeMap getAttributes()
-    {
-      throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public <T> T getValue(Attribute<T> key)
-    {
-      throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public void setCounters(Object counters)
-    {
-      throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public void sendMetrics(Collection<String> metricNames)
-    {
-      throw new UnsupportedOperationException("Not supported yet.");
-    }
+    Configuration getConfiguration();
   }
 }
diff --git a/api/src/main/java/org/apache/apex/api/plugin/Event.java b/api/src/main/java/org/apache/apex/api/plugin/Event.java
new file mode 100644
index 0000000..9b95187
--- /dev/null
+++ b/api/src/main/java/org/apache/apex/api/plugin/Event.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.api.plugin;
+
+import org.apache.apex.api.plugin.Event.Type;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * The class represents a plugin event that is delivered to plugins to notify them of important system events.
+ *
+ * Plugins express interest in receiving events by registering handlers for the event type and their handlers receive
+ * the events.
+ * @param <T> event type
+ */
+@Evolving
+public interface Event<T extends Type>
+{
+  /**
+   * Marker interface for plugin event type.
+   */
+  @Evolving
+  interface Type
+  {
+  }
+
+  T getType();
+
+  @Evolving
+  class BaseEvent<T extends Type> implements Event<T>
+  {
+    private T type;
+
+    protected BaseEvent(T type)
+    {
+      this.type = type;
+    }
+
+    public T getType()
+    {
+      return type;
+    }
+  }
+}
diff --git a/api/src/main/java/org/apache/apex/api/plugin/Plugin.java b/api/src/main/java/org/apache/apex/api/plugin/Plugin.java
index ffe52ea..e0a3872 100644
--- a/api/src/main/java/org/apache/apex/api/plugin/Plugin.java
+++ b/api/src/main/java/org/apache/apex/api/plugin/Plugin.java
@@ -18,15 +18,67 @@
  */
 package org.apache.apex.api.plugin;
 
-import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
 
 import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
 
 /**
- * Marker interface for ApexPlugins.
- * @param <T>
+ * An Apex plugin is user code which runs inside the Apex engine. Plugin implementations implement this interface.
+ *
+ * Plugins can identify extension points by registering interest in events in the {@link Component#setup(Context)}
+ * initialization method. They should also cleanup any additional resources created during shutdown such as helper
+ * threads and open files in the {@link Component#teardown()} method.
+ * @param <T> plugin context type
  */
-@InterfaceStability.Evolving
-public interface Plugin<T extends PluginContext> extends Component<T>
+@Evolving
+public interface Plugin<T extends Plugin.PluginContext> extends Component<T>
 {
+
+  /**
+   * An Apex plugin is user code which runs inside the Apex engine. The interaction between plugin and engine is managed
+   * by PluginContext. Plugins can register interest in different events in the engine using the
+   * ${@link PluginContext#register(Event.Type, EventHandler)} method.
+   *
+   * @param <T> the type of the Event.Type
+   * @param <E> the event type
+   */
+  @Evolving
+  interface PluginContext<T extends Event.Type, E extends Event<T>> extends Context
+  {
+
+    /**
+     * Register interest in an event.
+     *
+     * Plugins register interest in events using this method. They would need to specify the event type and a handler to
+     * handle the event, that would get called when the event occurs. A plugin can register interest in several events but
+     * should register only a single handler for any specific event. In case register is called multiple times with the
+     * same event type, then the last registered handler will be used.
+     *
+     * When an event occurs the
+     * {@link EventHandler#handle(Event event)} method gets called with the event data.
+     *
+     * @param type The event type
+     * @param handler The event handler
+     */
+    void register(T type, EventHandler<E> handler);
+  }
+
+  /**
+   * A handler that handles an event in the Apex engine. Plugins register interest in events by registering handlers
+   * using the PluginContext.
+   * @param <E> The event type
+   */
+  @Evolving
+  interface EventHandler<E extends Event>
+  {
+    /**
+     * Handle a event.
+     *
+     * This method is called when the event occurs.
+     *
+     * @param event
+     */
+    void handle(E event);
+  }
 }
diff --git a/api/src/main/java/org/apache/apex/api/plugin/PluginContext.java b/api/src/main/java/org/apache/apex/api/plugin/PluginContext.java
deleted file mode 100644
index 2bdaf00..0000000
--- a/api/src/main/java/org/apache/apex/api/plugin/PluginContext.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.api.plugin;
-
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.datatorrent.api.Context;
-
-/**
- * Marker interface for Context used by ApexPlugins. Plugin interfaces with
- * the Apex through the context.
- */
-@InterfaceStability.Evolving
-public interface PluginContext extends Context
-{
-}
diff --git a/common/src/main/java/org/apache/apex/common/util/BaseDAGSetupPlugin.java b/common/src/main/java/org/apache/apex/common/util/BaseDAGSetupPlugin.java
deleted file mode 100644
index 9bc5d8e..0000000
--- a/common/src/main/java/org/apache/apex/common/util/BaseDAGSetupPlugin.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.common.util;
-
-import org.apache.apex.api.plugin.DAGSetupPlugin;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * Base class for DAGSetupPlugin implementations that provides empty implementations
- * for all interface methods.
- */
-@InterfaceStability.Evolving
-public class BaseDAGSetupPlugin implements DAGSetupPlugin
-{
-  @Override
-  public void setup(DAGSetupPluginContext context)
-  {
-
-  }
-
-  @Override
-  public void prePopulateDAG()
-  {
-
-  }
-
-  @Override
-  public void teardown()
-  {
-
-  }
-
-  @Override
-  public void postPopulateDAG()
-  {
-
-  }
-
-  @Override
-  public void preConfigureDAG()
-  {
-
-  }
-
-  @Override
-  public void postConfigureDAG()
-  {
-
-  }
-
-  @Override
-  public void preValidateDAG()
-  {
-
-  }
-
-  @Override
-  public void postValidateDAG()
-  {
-
-  }
-}
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 2e88114..0ca8cd1 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -115,7 +115,6 @@
 import com.datatorrent.stram.webapp.StramWebApp;
 
 import static java.lang.Thread.sleep;
-import static org.apache.apex.engine.plugin.ApexPluginDispatcher.DAG_CHANGE_EVENT;
 
 /**
  * Streaming Application Master
@@ -600,7 +599,7 @@
     apexPluginDispatcher = new DefaultApexPluginDispatcher(locator, appContext, dnmgr, stats);
     dnmgr.apexPluginDispatcher = apexPluginDispatcher;
     addService(apexPluginDispatcher);
-    apexPluginDispatcher.dispatch(DAG_CHANGE_EVENT, dnmgr.getLogicalPlan());
+    apexPluginDispatcher.dispatch(new ApexPluginDispatcher.DAGChangeEvent(dnmgr.getLogicalPlan()));
   }
 
   @Override
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 8d99dc1..c4e76a5 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -65,6 +65,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.engine.api.plugin.DAGExecutionEvent;
 import org.apache.apex.engine.plugin.ApexPluginDispatcher;
 import org.apache.apex.engine.plugin.NoOpApexPluginDispatcher;
 import org.apache.apex.engine.util.CascadeStorageAgent;
@@ -178,11 +179,6 @@
 import net.engio.mbassy.bus.MBassador;
 import net.engio.mbassy.bus.config.BusConfiguration;
 
-import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.COMMIT_EVENT;
-import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.HEARTBEAT;
-import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.STRAM_EVENT;
-import static org.apache.apex.engine.plugin.ApexPluginDispatcher.DAG_CHANGE_EVENT;
-
 /**
  * Tracks topology provisioning/allocation to containers<p>
  * <br>
@@ -818,7 +814,7 @@
 
     committedWindowId = updateCheckpoints(waitForRecovery);
     if (lastCommittedWindowId != committedWindowId) {
-      apexPluginDispatcher.dispatch(COMMIT_EVENT, committedWindowId);
+      apexPluginDispatcher.dispatch(new DAGExecutionEvent.CommitExecutionEvent(committedWindowId));
       lastCommittedWindowId = committedWindowId;
     }
     calculateEndWindowStats();
@@ -1817,7 +1813,7 @@
     rsp.stackTraceRequired = sca.stackTraceRequested;
     sca.stackTraceRequested = false;
 
-    apexPluginDispatcher.dispatch(HEARTBEAT, heartbeat);
+    apexPluginDispatcher.dispatch(new DAGExecutionEvent.HeartbeatExecutionEvent(heartbeat));
     return rsp;
   }
 
@@ -2449,7 +2445,7 @@
   @Override
   public void recordEventAsync(StramEvent ev)
   {
-    apexPluginDispatcher.dispatch(STRAM_EVENT, ev);
+    apexPluginDispatcher.dispatch(new DAGExecutionEvent.StramExecutionEvent(ev));
     if (eventBus != null) {
       eventBus.publishAsync(ev);
     }
@@ -3083,7 +3079,7 @@
         recordEventAsync(new StramEvent.ChangeLogicalPlanEvent(request));
       }
       pm.applyChanges(StreamingContainerManager.this);
-      apexPluginDispatcher.dispatch(DAG_CHANGE_EVENT, plan.getLogicalPlan());
+      apexPluginDispatcher.dispatch(new ApexPluginDispatcher.DAGChangeEvent(plan.getLogicalPlan()));
       LOG.info("Plan changes applied: {}", requests);
       return null;
     }
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java
index c7c2767..370aaaa 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java
@@ -19,14 +19,23 @@
 package com.datatorrent.stram.plan.logical;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
 import org.slf4j.Logger;
 
+import org.apache.apex.api.plugin.DAGSetupEvent;
 import org.apache.apex.api.plugin.DAGSetupPlugin;
+import org.apache.apex.api.plugin.Plugin.EventHandler;
 import org.apache.apex.engine.plugin.loaders.PropertyBasedPluginLocator;
 import org.apache.hadoop.conf.Configuration;
 
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.DAG;
+
 import static org.slf4j.LoggerFactory.getLogger;
 
 public class DAGSetupPluginManager
@@ -37,7 +46,8 @@
   private Configuration conf;
 
   public static final String DAGSETUP_PLUGINS_CONF_KEY = "apex.plugin.dag.setup";
-  private DAGSetupPlugin.DAGSetupPluginContext contex;
+
+  private final Table<DAGSetupEvent.Type, DAGSetupPlugin, EventHandler<DAGSetupEvent>> table = HashBasedTable.create();
 
   private void loadVisitors(Configuration conf)
   {
@@ -50,56 +60,82 @@
     this.plugins.addAll(locator.discoverPlugins(conf));
   }
 
-  public void setup(DAGSetupPlugin.DAGSetupPluginContext context)
+  private class DefaultDAGSetupPluginContext implements DAGSetupPlugin.Context<DAGSetupEvent>
   {
-    this.contex = context;
+    private final DAG dag;
+    private final Configuration conf;
+    private DAGSetupPlugin plugin;
+
+    public DefaultDAGSetupPluginContext(DAG dag, Configuration conf, DAGSetupPlugin plugin)
+    {
+      this.dag = dag;
+      this.conf = conf;
+      this.plugin = plugin;
+    }
+
+    @Override
+    public void register(DAGSetupEvent.Type type, EventHandler<DAGSetupEvent> handler)
+    {
+      table.put(type, plugin, handler);
+    }
+
+    public DAG getDAG()
+    {
+      return dag;
+    }
+
+    public Configuration getConfiguration()
+    {
+      return conf;
+    }
+
+    @Override
+    public Attribute.AttributeMap getAttributes()
+    {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public <T> T getValue(Attribute<T> key)
+    {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public void setCounters(Object counters)
+    {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public void sendMetrics(Collection<String> metricNames)
+    {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+  }
+
+  public void setup(DAG dag)
+  {
     for (DAGSetupPlugin plugin : plugins) {
+      DAGSetupPlugin.Context context = new DefaultDAGSetupPluginContext(dag, conf, plugin);
       plugin.setup(context);
     }
   }
 
-  public enum DispatchType
-  {
-    SETUP,
-    PRE_POPULATE,
-    POST_POPULATE,
-    PRE_CONFIGURE,
-    POST_CONFIGURE,
-    PRE_VALIDATE,
-    POST_VALIDATE,
-    TEARDOWN
-  }
-
-  public void dispatch(DispatchType type, DAGSetupPlugin.DAGSetupPluginContext context)
+  public void teardown()
   {
     for (DAGSetupPlugin plugin : plugins) {
-      switch (type) {
-        case SETUP:
-          plugin.setup(context);
-          break;
-        case PRE_POPULATE:
-          plugin.prePopulateDAG();
-          break;
-        case POST_POPULATE:
-          plugin.postPopulateDAG();
-          break;
-        case PRE_CONFIGURE:
-          plugin.preConfigureDAG();
-          break;
-        case POST_CONFIGURE:
-          plugin.postValidateDAG();
-          break;
-        case PRE_VALIDATE:
-          plugin.preValidateDAG();
-          break;
-        case POST_VALIDATE:
-          plugin.postValidateDAG();
-          break;
-        case TEARDOWN:
-          plugin.teardown();
-          break;
-        default:
-          throw new UnsupportedOperationException("Not implemented ");
+      plugin.teardown();
+    }
+  }
+
+  public void dispatch(DAGSetupEvent event)
+  {
+    for (EventHandler<DAGSetupEvent> handler : table.row(event.getType()).values()) {
+      try {
+        handler.handle(event);
+      } catch (RuntimeException e) {
+        LOG.warn("Event {} caused an exception in {} handler", event, handler, e);
       }
     }
   }
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
index a7fad2a..01a4c7b 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
@@ -48,7 +48,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.apex.api.plugin.DAGSetupPlugin.DAGSetupPluginContext;
 import org.apache.commons.beanutils.BeanMap;
 import org.apache.commons.beanutils.BeanUtils;
 import org.apache.commons.collections.CollectionUtils;
@@ -86,14 +85,12 @@
 import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
 import com.datatorrent.stram.util.ObjectMapperFactory;
 
-import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.POST_CONFIGURE;
-import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.POST_POPULATE;
-import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.POST_VALIDATE;
-import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.PRE_CONFIGURE;
-import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.PRE_POPULATE;
-import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.PRE_VALIDATE;
-import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.SETUP;
-import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.TEARDOWN;
+import static org.apache.apex.api.plugin.DAGSetupEvent.Type.POST_CONFIGURE_DAG;
+import static org.apache.apex.api.plugin.DAGSetupEvent.Type.POST_POPULATE_DAG;
+import static org.apache.apex.api.plugin.DAGSetupEvent.Type.POST_VALIDATE_DAG;
+import static org.apache.apex.api.plugin.DAGSetupEvent.Type.PRE_CONFIGURE_DAG;
+import static org.apache.apex.api.plugin.DAGSetupEvent.Type.PRE_POPULATE_DAG;
+import static org.apache.apex.api.plugin.DAGSetupEvent.Type.PRE_VALIDATE_DAG;
 
 /**
  *
@@ -2076,19 +2073,18 @@
   private LogicalPlan populateDAGAndValidate(LogicalPlanConfiguration tb, String appName)
   {
     LogicalPlan dag = new LogicalPlan();
-    DAGSetupPluginContext context = new DAGSetupPluginContext(dag, this.conf);
-    pluginManager.dispatch(SETUP, context);
-    pluginManager.dispatch(PRE_POPULATE, context);
+    pluginManager.setup(dag);
+    pluginManager.dispatch(PRE_POPULATE_DAG.event);
     tb.populateDAG(dag);
     // configure with embedded settings
     tb.prepareDAG(dag, null, appName);
-    pluginManager.dispatch(POST_POPULATE, context);
+    pluginManager.dispatch(POST_POPULATE_DAG.event);
     // configure with external settings
     prepareDAG(dag, null, appName);
-    pluginManager.dispatch(PRE_VALIDATE, context);
+    pluginManager.dispatch(PRE_VALIDATE_DAG.event);
     dag.validate();
-    pluginManager.dispatch(POST_VALIDATE, context);
-    pluginManager.dispatch(TEARDOWN, context);
+    pluginManager.dispatch(POST_VALIDATE_DAG.event);
+    pluginManager.teardown();
     return dag;
   }
 
@@ -2118,13 +2114,12 @@
   public LogicalPlan createFromStreamingApplication(StreamingApplication app, String appName)
   {
     LogicalPlan dag = new LogicalPlan();
-    DAGSetupPluginContext context = new DAGSetupPluginContext(dag, this.conf);
-    pluginManager.dispatch(SETUP, context);
+    pluginManager.setup(dag);
     prepareDAG(dag, app, appName);
-    pluginManager.dispatch(PRE_VALIDATE, context);
+    pluginManager.dispatch(PRE_VALIDATE_DAG.event);
     dag.validate();
-    pluginManager.dispatch(POST_VALIDATE, context);
-    pluginManager.dispatch(TEARDOWN, context);
+    pluginManager.dispatch(POST_VALIDATE_DAG.event);
+    pluginManager.teardown();
     return dag;
   }
 
@@ -2256,14 +2251,13 @@
     // EVENTUALLY to be replaced by variable enabled configuration in the demo where the attribute below is used
     String connectAddress = conf.get(KEY_GATEWAY_CONNECT_ADDRESS);
     dag.setAttribute(Context.DAGContext.GATEWAY_CONNECT_ADDRESS, connectAddress == null ? conf.get(GATEWAY_LISTEN_ADDRESS) : connectAddress);
-    DAGSetupPluginContext context = new DAGSetupPluginContext(dag, this.conf);
+    pluginManager.setup(dag);
     if (app != null) {
-      pluginManager.dispatch(SETUP, context);
-      pluginManager.dispatch(PRE_POPULATE, context);
+      pluginManager.dispatch(PRE_POPULATE_DAG.event);
       app.populateDAG(dag, conf);
-      pluginManager.dispatch(POST_POPULATE, context);
+      pluginManager.dispatch(POST_POPULATE_DAG.event);
     }
-    pluginManager.dispatch(PRE_CONFIGURE, context);
+    pluginManager.dispatch(PRE_CONFIGURE_DAG.event);
     String appAlias = getAppAlias(name);
     String appName = appAlias == null ? name : appAlias;
     List<AppConf> appConfs = stramConf.getMatchingChildConf(appName, StramElement.APPLICATION);
@@ -2279,7 +2273,8 @@
     // inject external operator configuration
     setOperatorConfiguration(dag, appConfs, appName);
     setStreamConfiguration(dag, appConfs, appName);
-    pluginManager.dispatch(POST_CONFIGURE, context);
+    pluginManager.dispatch(POST_CONFIGURE_DAG.event);
+    pluginManager.teardown();
   }
 
   private void flattenDAG(LogicalPlan dag, Configuration conf)
diff --git a/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionEvent.java b/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionEvent.java
new file mode 100644
index 0000000..dfffbca
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionEvent.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.api.plugin;
+
+import org.apache.apex.api.plugin.Event;
+
+import com.datatorrent.stram.api.StramEvent;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
+
+import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.COMMIT_EVENT;
+import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.HEARTBEAT_EVENT;
+import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.STRAM_EVENT;
+
+public class DAGExecutionEvent extends Event.BaseEvent<DAGExecutionEvent.Type>
+{
+  public enum Type implements Event.Type
+  {
+    HEARTBEAT_EVENT, STRAM_EVENT, COMMIT_EVENT
+  }
+
+  public static class HeartbeatExecutionEvent extends DAGExecutionEvent
+  {
+    private final StreamingContainerUmbilicalProtocol.ContainerHeartbeat heartbeat;
+
+    public HeartbeatExecutionEvent(StreamingContainerUmbilicalProtocol.ContainerHeartbeat heartbeat)
+    {
+      super(HEARTBEAT_EVENT);
+      this.heartbeat = heartbeat;
+    }
+
+    public StreamingContainerUmbilicalProtocol.ContainerHeartbeat getHeartbeat()
+    {
+      return heartbeat;
+    }
+  }
+
+  public static class StramExecutionEvent extends DAGExecutionEvent
+  {
+    private final StramEvent stramEvent;
+
+    public StramExecutionEvent(StramEvent stramEvent)
+    {
+      super(STRAM_EVENT);
+      this.stramEvent = stramEvent;
+    }
+
+    public StramEvent getStramEvent()
+    {
+      return stramEvent;
+    }
+  }
+
+  public static class CommitExecutionEvent extends DAGExecutionEvent
+  {
+    private final long commitWindow;
+
+    public CommitExecutionEvent(long commitWindow)
+    {
+      super(COMMIT_EVENT);
+      this.commitWindow = commitWindow;
+    }
+
+    public long getCommitWindow()
+    {
+      return commitWindow;
+    }
+  }
+
+  protected DAGExecutionEvent(DAGExecutionEvent.Type eventType)
+  {
+    super(eventType);
+  }
+}
diff --git a/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionPlugin.java b/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionPlugin.java
index 060b240..7cfb1fa 100644
--- a/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionPlugin.java
+++ b/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionPlugin.java
@@ -18,26 +18,66 @@
  */
 package org.apache.apex.engine.api.plugin;
 
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
 import org.apache.apex.api.plugin.Plugin;
-import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StatsListener.BatchedOperatorStats;
+import com.datatorrent.common.util.Pair;
+import com.datatorrent.stram.StramAppContext;
+import com.datatorrent.stram.util.VersionInfo;
+import com.datatorrent.stram.webapp.AppInfo;
+import com.datatorrent.stram.webapp.LogicalOperatorInfo;
 
 /**
- * An Apex plugin is a user code which runs inside Stram. The interaction
- * between plugin and Stram is managed by DAGExecutionPluginContext. Plugin can register to handle event in interest
- * with callback handler using ${@link DAGExecutionPluginContext#register(DAGExecutionPluginContext.RegistrationType, DAGExecutionPluginContext.Handler)}
+ * DAGExecutionPlugin allows user provided code to respond to various events during the application runtime.
  *
  * Following events are supported
  * <ul>
- *   <li>{@see DAGExecutionPluginContext.HEARTBEAT} The heartbeat from a container is delivered to the plugin after it has been handled by stram</li>
- *   <li>{@see DAGExecutionPluginContext.STRAM_EVENT} All the Stram event generated in Stram will be delivered to the plugin</li>
- *   <li>{@see DAGExecutionPluginContext.COMMIT_EVENT} When committedWindowId changes in the platform an event will be delivered to the plugin</li>
+ *   <li>{@see Context.HEARTBEAT} The heartbeat from a container is delivered to the plugin after it has been handled by stram</li>
+ *   <li>{@see Context.STRAM_EVENT} All the Stram event generated in Stram will be delivered to the plugin</li>
+ *   <li>{@see Context.COMMIT_EVENT} When committedWindowId changes in the platform an event will be delivered to the plugin</li>
  * </ul>
  *
- * A plugin should register a single handler for an event, In case multiple handlers are registered for an event,
- * then the last registered handler will be used. Plugin should cleanup additional resources created by it during shutdown
- * such as helper threads and open files.
  */
-@InterfaceStability.Evolving
-public interface DAGExecutionPlugin extends Plugin<DAGExecutionPluginContext>
+public interface DAGExecutionPlugin<T extends DAGExecutionPlugin.Context> extends Plugin<T>
 {
+
+  /**
+   * The context for the execution plugins.
+   *
+   * Following events are supported
+   * <ul>
+   *   <li>{@see Context.HEARTBEAT} The heartbeat from a container is delivered to the plugin after it has been handled by stram</li>
+   *   <li>{@see Context.STRAM_EVENT} All the Stram event generated in Stram will be delivered to the plugin</li>
+   *   <li>{@see Context.COMMIT_EVENT} When committedWindowId changes in the platform an event will be delivered to the plugin</li>
+   * </ul>
+   *
+   */
+  interface Context<E extends DAGExecutionEvent> extends PluginContext<DAGExecutionEvent.Type, E>
+  {
+    VersionInfo getEngineVersion();
+
+    StramAppContext getApplicationContext();
+
+    AppInfo.AppStats getApplicationStats();
+
+    Configuration getLaunchConfig();
+
+    DAG getDAG();
+
+    String getOperatorName(int id);
+
+    BatchedOperatorStats getPhysicalOperatorStats(int id);
+
+    List<LogicalOperatorInfo> getLogicalOperatorInfoList();
+
+    Queue<Pair<Long, Map<String, Object>>> getWindowMetrics(String operatorName);
+
+    long windowIdToMillis(long windowId);
+  }
 }
diff --git a/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionPluginContext.java b/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionPluginContext.java
deleted file mode 100644
index 73da7e6..0000000
--- a/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionPluginContext.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.engine.api.plugin;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-
-import org.apache.apex.api.plugin.PluginContext;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StatsListener.BatchedOperatorStats;
-import com.datatorrent.common.util.Pair;
-import com.datatorrent.stram.StramAppContext;
-import com.datatorrent.stram.api.StramEvent;
-import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
-import com.datatorrent.stram.util.VersionInfo;
-import com.datatorrent.stram.webapp.AppInfo;
-import com.datatorrent.stram.webapp.LogicalOperatorInfo;
-
-/**
- * An Apex plugin is a user code which runs inside Stram. The interaction
- * between plugin and Stram is managed by DAGExecutionPluginContext. Plugin can register to handle event in interest
- * with callback handler using ${@link DAGExecutionPluginContext#register(DAGExecutionPluginContext.RegistrationType, DAGExecutionPluginContext.Handler)}
- *
- * Following events are supported
- * <ul>
- *   <li>{@see DAGExecutionPluginContext.HEARTBEAT} The heartbeat from a container is delivered to the plugin after it has been handled by stram</li>
- *   <li>{@see DAGExecutionPluginContext.STRAM_EVENT} All the Stram event generated in Stram will be delivered to the plugin</li>
- *   <li>{@see DAGExecutionPluginContext.COMMIT_EVENT} When committedWindowId changes in the platform an event will be delivered to the plugin</li>
- * </ul>
- *
- * A plugin should register a single handler for an event, In case multiple handlers are registered for an event,
- * then the last registered handler will be used. Plugin should cleanup additional resources created by it during shutdown
- * such as helper threads and open files.
- */
-@InterfaceStability.Evolving
-public interface DAGExecutionPluginContext extends PluginContext
-{
-  class RegistrationType<T>
-  {
-  }
-
-  RegistrationType<StreamingContainerUmbilicalProtocol.ContainerHeartbeat> HEARTBEAT = new RegistrationType<>();
-  RegistrationType<StramEvent> STRAM_EVENT = new RegistrationType<>();
-  RegistrationType<Long> COMMIT_EVENT = new RegistrationType<>();
-
-  <T> void register(RegistrationType<T> type, Handler<T> handler);
-
-  interface Handler<T>
-  {
-    void handle(T data);
-  }
-
-  VersionInfo getEngineVersion();
-
-  StramAppContext getApplicationContext();
-
-  AppInfo.AppStats getApplicationStats();
-
-  Configuration getLaunchConfig();
-
-  DAG getDAG();
-
-  String getOperatorName(int id);
-
-  BatchedOperatorStats getPhysicalOperatorStats(int id);
-
-  List<LogicalOperatorInfo> getLogicalOperatorInfoList();
-
-  Queue<Pair<Long, Map<String, Object>>> getWindowMetrics(String operatorName);
-
-  long windowIdToMillis(long windowId);
-}
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
index 2b96632..74ee0c8 100644
--- a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
@@ -22,15 +22,15 @@
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.api.plugin.Event;
+import org.apache.apex.api.plugin.Plugin;
+import org.apache.apex.api.plugin.Plugin.EventHandler;
+import org.apache.apex.engine.api.plugin.DAGExecutionEvent;
 import org.apache.apex.engine.api.plugin.DAGExecutionPlugin;
-import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.Handler;
-import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.RegistrationType;
 import org.apache.apex.engine.api.plugin.PluginLocator;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -40,7 +40,9 @@
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
+import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Table;
 
 import com.datatorrent.api.DAG;
 import com.datatorrent.stram.StramAppContext;
@@ -62,7 +64,7 @@
   private final AppInfo.AppStats stats;
   protected Configuration launchConfig;
   protected FileContext fileContext;
-  protected final Map<DAGExecutionPlugin, PluginInfo> pluginInfoMap = new HashMap<>();
+  protected final Table<DAGExecutionEvent.Type, DAGExecutionPlugin, EventHandler<DAGExecutionEvent>> table = HashBasedTable.create();
   private volatile DAG clonedDAG = null;
 
   public AbstractApexPluginDispatcher(PluginLocator locator, StramAppContext context, StreamingContainerManager dmgr, AppInfo.AppStats stats)
@@ -123,57 +125,18 @@
     super.serviceStop();
   }
 
-  /**
-   * Keeps information about plugin and its registrations. Dispatcher use this
-   * information while delivering events to plugin.
-   */
-  protected class PluginInfo
+  public void register(DAGExecutionEvent.Type eventType, Plugin.EventHandler<DAGExecutionEvent> handler, DAGExecutionPlugin owner)
   {
-    private final DAGExecutionPlugin plugin;
-    private final Map<RegistrationType<?>, Handler<?>> registrationMap = new HashMap<>();
-
-    <T> void put(RegistrationType<T> registrationType, Handler<T> handler)
-    {
-      registrationMap.put(registrationType, handler);
+    synchronized (table) {
+      table.put(eventType, owner, handler);
     }
-
-    <T> Handler<T> get(RegistrationType<T> registrationType)
-    {
-      return (Handler<T>)registrationMap.get(registrationType);
-    }
-
-    public PluginInfo(DAGExecutionPlugin plugin)
-    {
-      this.plugin = plugin;
-    }
-
-    public DAGExecutionPlugin getPlugin()
-    {
-      return plugin;
-    }
-  }
-
-  PluginInfo getPluginInfo(DAGExecutionPlugin plugin)
-  {
-    PluginInfo pInfo = pluginInfoMap.get(plugin);
-    if (pInfo == null) {
-      pInfo = new PluginInfo(plugin);
-      pluginInfoMap.put(plugin, pInfo);
-    }
-    return pInfo;
-  }
-
-  private <T> void register(RegistrationType<T> type, Handler<T> handler, DAGExecutionPlugin owner)
-  {
-    PluginInfo pInfo = getPluginInfo(owner);
-    pInfo.put(type, handler);
   }
 
   /**
    * A wrapper PluginManager to track registration from a plugin. with this plugin
    * don't need to pass explicit owner argument during registration.
    */
-  private class PluginManagerImpl extends AbstractDAGExecutionPluginContext
+  private class PluginManagerImpl extends AbstractDAGExecutionPluginContext<DAGExecutionEvent>
   {
     private final DAGExecutionPlugin owner;
 
@@ -184,7 +147,7 @@
     }
 
     @Override
-    public <T> void register(RegistrationType<T> type, Handler<T> handler)
+    public void register(DAGExecutionEvent.Type type, EventHandler<DAGExecutionEvent> handler)
     {
       AbstractApexPluginDispatcher.this.register(type, handler, owner);
     }
@@ -198,19 +161,19 @@
 
   /**
    * Dispatch events to plugins.
-   * @param registrationType
-   * @param data
-   * @param <T>
+   * @param event The dag execution event
    */
-  protected abstract <T> void dispatchEvent(RegistrationType<T> registrationType, T data);
+  protected abstract void dispatchExecutionEvent(DAGExecutionEvent event);
 
   @Override
-  public <T> void dispatch(RegistrationType<T> registrationType, T data)
+  public void dispatch(Event event)
   {
-    if (registrationType == ApexPluginDispatcher.DAG_CHANGE_EVENT) {
-      clonedDAG = SerializationUtils.clone((DAG)data);
-    } else {
-      dispatchEvent(registrationType, data);
+    if (!plugins.isEmpty()) {
+      if (event.getType() == ApexPluginDispatcher.DAG_CHANGE) {
+        clonedDAG = SerializationUtils.clone(((DAGChangeEvent)event).dag);
+      } else if (event instanceof DAGExecutionEvent) {
+        dispatchExecutionEvent((DAGExecutionEvent)event);
+      }
     }
   }
 }
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java
index b17d5f8..21f29f8 100644
--- a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java
@@ -23,7 +23,8 @@
 import java.util.Map;
 import java.util.Queue;
 
-import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext;
+import org.apache.apex.engine.api.plugin.DAGExecutionEvent;
+import org.apache.apex.engine.api.plugin.DAGExecutionPlugin;
 import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.Attribute;
@@ -37,7 +38,7 @@
 import com.datatorrent.stram.webapp.AppInfo;
 import com.datatorrent.stram.webapp.LogicalOperatorInfo;
 
-public abstract class AbstractDAGExecutionPluginContext implements DAGExecutionPluginContext
+public abstract class AbstractDAGExecutionPluginContext<E extends DAGExecutionEvent> implements DAGExecutionPlugin.Context<E>
 {
   private final StreamingContainerManager dnmgr;
   private final Configuration launchConf;
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java
index 234195f..9ef2a5d 100644
--- a/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java
@@ -18,7 +18,7 @@
  */
 package org.apache.apex.engine.plugin;
 
-import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.RegistrationType;
+import org.apache.apex.api.plugin.Event;
 import org.apache.hadoop.service.Service;
 
 import com.datatorrent.api.DAG;
@@ -29,7 +29,18 @@
   /**
    * This is internal event, which is not delivered to the plugins.
    */
-  RegistrationType<DAG> DAG_CHANGE_EVENT = new RegistrationType<>();
+  Event.Type DAG_CHANGE = new Event.Type(){};
 
-  <T> void dispatch(RegistrationType<T> registrationType, T data);
+  class DAGChangeEvent extends Event.BaseEvent<Event.Type>
+  {
+    final DAG dag;
+
+    public DAGChangeEvent(DAG dag)
+    {
+      super(DAG_CHANGE);
+      this.dag = dag;
+    }
+  }
+
+  void dispatch(Event e);
 }
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java
index bea011c..1252061 100644
--- a/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java
@@ -28,8 +28,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.Handler;
-import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.RegistrationType;
+import org.apache.apex.api.plugin.Event;
+import org.apache.apex.api.plugin.Plugin.EventHandler;
+import org.apache.apex.engine.api.plugin.DAGExecutionEvent;
 import org.apache.apex.engine.api.plugin.PluginLocator;
 import org.apache.hadoop.conf.Configuration;
 
@@ -56,10 +57,10 @@
   }
 
   @Override
-  protected <T> void dispatchEvent(RegistrationType<T> registrationType, T data)
+  protected void dispatchExecutionEvent(DAGExecutionEvent event)
   {
     if (executorService != null) {
-      executorService.submit(new ProcessEventTask<>(registrationType, data));
+      executorService.submit(new ProcessEventTask<>(event));
     }
   }
 
@@ -98,24 +99,25 @@
     executorService = null;
   }
 
-  private class ProcessEventTask<T> implements Runnable
+  private class ProcessEventTask<T extends DAGExecutionEvent.Type> implements Runnable
   {
-    private final RegistrationType<T> registrationType;
-    private final T data;
+    private final Event<T> event;
 
-    public ProcessEventTask(RegistrationType<T> type, T data)
+    public ProcessEventTask(Event<T> event)
     {
-      this.registrationType = type;
-      this.data = data;
+      this.event = event;
     }
 
     @Override
     public void run()
     {
-      for (final PluginInfo pInfo : pluginInfoMap.values()) {
-        final Handler<T> handler = pInfo.get(registrationType);
-        if (handler != null) {
-          handler.handle(data);
+      synchronized (table) {
+        for (EventHandler handler : table.row(event.getType()).values()) {
+          try {
+            handler.handle(event);
+          } catch (RuntimeException e) {
+            LOG.warn("Event {} caused exception in handler {}", event, handler, e);
+          }
         }
       }
     }
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/NoOpApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/NoOpApexPluginDispatcher.java
index f3f3382..a629a3f 100644
--- a/engine/src/main/java/org/apache/apex/engine/plugin/NoOpApexPluginDispatcher.java
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/NoOpApexPluginDispatcher.java
@@ -18,7 +18,7 @@
  */
 package org.apache.apex.engine.plugin;
 
-import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.RegistrationType;
+import org.apache.apex.api.plugin.Event;
 import org.apache.hadoop.service.AbstractService;
 
 public class NoOpApexPluginDispatcher extends AbstractService implements ApexPluginDispatcher
@@ -29,7 +29,7 @@
   }
 
   @Override
-  public <T> void dispatch(RegistrationType<T> registrationType, T data)
+  public void dispatch(Event event)
   {
 
   }
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/PropertyInjectorVisitor.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/PropertyInjectorVisitor.java
index 4c8b4e5..7e38ee8 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/PropertyInjectorVisitor.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/PropertyInjectorVisitor.java
@@ -26,24 +26,27 @@
 import javax.validation.ValidationException;
 
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import org.apache.apex.api.plugin.DAGSetupEvent;
 import org.apache.apex.api.plugin.DAGSetupPlugin;
+import org.apache.apex.api.plugin.Plugin.EventHandler;
 
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.Operator;
 
-import static org.slf4j.LoggerFactory.getLogger;
+import static org.apache.apex.api.plugin.DAGSetupEvent.Type.PRE_VALIDATE_DAG;
 
-public class PropertyInjectorVisitor implements DAGSetupPlugin
+public class PropertyInjectorVisitor implements DAGSetupPlugin<DAGSetupPlugin.Context>, EventHandler<DAGSetupEvent>
 {
-  private static final Logger LOG = getLogger(PropertyInjectorVisitor.class);
+  private static final Logger LOG = LoggerFactory.getLogger(PropertyInjectorVisitor.class);
 
   private String path;
   private Map<String, String> propertyMap = new HashMap<>();
   private DAG dag;
 
   @Override
-  public void setup(DAGSetupPluginContext context)
+  public void setup(DAGSetupPlugin.Context context)
   {
     this.dag = context.getDAG();
     try {
@@ -56,34 +59,11 @@
     } catch (IOException ex) {
       throw new ValidationException("Not able to load input file " + path);
     }
+    context.register(PRE_VALIDATE_DAG, this);
   }
 
   @Override
-  public void prePopulateDAG()
-  {
-
-  }
-
-  @Override
-  public void postPopulateDAG()
-  {
-
-  }
-
-  @Override
-  public void preConfigureDAG()
-  {
-
-  }
-
-  @Override
-  public void postConfigureDAG()
-  {
-
-  }
-
-  @Override
-  public void preValidateDAG()
+  public void handle(DAGSetupEvent event)
   {
     for (DAG.OperatorMeta ometa : dag.getAllOperatorsMeta()) {
       Operator o = ometa.getOperator();
@@ -91,12 +71,6 @@
     }
   }
 
-  @Override
-  public void postValidateDAG()
-  {
-
-  }
-
   public PropertyInjectorVisitor()
   {
   }
diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
index 5b8ca11..833d69f 100644
--- a/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
+++ b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
@@ -21,52 +21,56 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.engine.api.plugin.DAGExecutionEvent;
 import org.apache.apex.engine.api.plugin.DAGExecutionPlugin;
-import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext;
-import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.Handler;
 
-import com.datatorrent.stram.api.StramEvent;
-import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
+import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.COMMIT_EVENT;
+import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.HEARTBEAT_EVENT;
+import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.STRAM_EVENT;
 
-import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.COMMIT_EVENT;
-import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.HEARTBEAT;
-import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.STRAM_EVENT;
-
-public class DebugPlugin implements DAGExecutionPlugin
+public class DebugPlugin implements DAGExecutionPlugin<DAGExecutionPlugin.Context>
 {
+  private static final Logger logger = LoggerFactory.getLogger(DebugPlugin.class);
+
   private int eventCount = 0;
   private int heartbeatCount = 0;
   private int commitCount = 0;
   CountDownLatch latch = new CountDownLatch(3);
 
   @Override
-  public void setup(DAGExecutionPluginContext context)
+  public void setup(DAGExecutionPlugin.Context context)
   {
-    context.register(STRAM_EVENT, new Handler<StramEvent>()
+    context.register(STRAM_EVENT, new EventHandler<DAGExecutionEvent.StramExecutionEvent>()
     {
       @Override
-      public void handle(StramEvent stramEvent)
+      public void handle(DAGExecutionEvent.StramExecutionEvent event)
       {
+        logger.debug("Stram Event {}", event.getStramEvent());
         eventCount++;
         latch.countDown();
       }
     });
 
-    context.register(HEARTBEAT, new Handler<StreamingContainerUmbilicalProtocol.ContainerHeartbeat>()
+    context.register(HEARTBEAT_EVENT, new EventHandler<DAGExecutionEvent.HeartbeatExecutionEvent>()
     {
       @Override
-      public void handle(StreamingContainerUmbilicalProtocol.ContainerHeartbeat heartbeat)
+      public void handle(DAGExecutionEvent.HeartbeatExecutionEvent event)
       {
+        logger.debug("Heartbeat {}", event.getHeartbeat());
         heartbeatCount++;
         latch.countDown();
       }
     });
 
-    context.register(COMMIT_EVENT, new Handler<Long>()
+    context.register(COMMIT_EVENT, new EventHandler<DAGExecutionEvent.CommitExecutionEvent>()
     {
       @Override
-      public void handle(Long aLong)
+      public void handle(DAGExecutionEvent.CommitExecutionEvent event)
       {
+        logger.debug("Commit window id {}", event.getCommitWindow());
         commitCount++;
         latch.countDown();
       }
diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/NoOpPlugin.java b/engine/src/test/java/org/apache/apex/engine/plugin/NoOpPlugin.java
index 786e0d6..f2563c2 100644
--- a/engine/src/test/java/org/apache/apex/engine/plugin/NoOpPlugin.java
+++ b/engine/src/test/java/org/apache/apex/engine/plugin/NoOpPlugin.java
@@ -22,14 +22,13 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.apex.engine.api.plugin.DAGExecutionPlugin;
-import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext;
 
-public class NoOpPlugin implements DAGExecutionPlugin
+public class NoOpPlugin implements DAGExecutionPlugin<DAGExecutionPlugin.Context>
 {
   private static final Logger LOG = LoggerFactory.getLogger(NoOpPlugin.class);
 
   @Override
-  public void setup(DAGExecutionPluginContext context)
+  public void setup(DAGExecutionPlugin.Context context)
   {
     LOG.info("NoOpPlugin plugin called init ");
   }
diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
index 84dc4ba..140dc65 100644
--- a/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
+++ b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
@@ -23,6 +23,7 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.apex.engine.api.plugin.DAGExecutionEvent;
 import org.apache.apex.engine.api.plugin.DAGExecutionPlugin;
 import org.apache.apex.engine.plugin.loaders.ChainedPluginLocator;
 import org.apache.apex.engine.plugin.loaders.ServiceLoaderBasedPluginLocator;
@@ -34,10 +35,6 @@
 import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
 import com.datatorrent.stram.support.StramTestSupport;
 
-import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.COMMIT_EVENT;
-import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.HEARTBEAT;
-import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.STRAM_EVENT;
-
 public class PluginTests
 {
 
@@ -86,16 +83,16 @@
     ApexPluginDispatcher pluginManager = new DefaultApexPluginDispatcher(locator,
         new StramTestSupport.TestAppContext(new Attribute.AttributeMap.DefaultAttributeMap()), null, null);
     pluginManager.init(new Configuration());
-    pluginManager.dispatch(STRAM_EVENT, new StramEvent(StramEvent.LogLevel.DEBUG)
+    pluginManager.dispatch(new DAGExecutionEvent.StramExecutionEvent(new StramEvent(StramEvent.LogLevel.DEBUG)
     {
       @Override
       public String getType()
       {
         return "TestEvent";
       }
-    });
-    pluginManager.dispatch(COMMIT_EVENT, new Long(1234));
-    pluginManager.dispatch(HEARTBEAT, new StreamingContainerUmbilicalProtocol.ContainerHeartbeat());
+    }));
+    pluginManager.dispatch(new DAGExecutionEvent.CommitExecutionEvent(1234));
+    pluginManager.dispatch(new DAGExecutionEvent.HeartbeatExecutionEvent(new StreamingContainerUmbilicalProtocol.ContainerHeartbeat()));
     debugPlugin.waitForEventDelivery(10);
     pluginManager.stop();