APEXCORE-700 Uniform interface between setup and runtime plugins
diff --git a/api/src/main/java/org/apache/apex/api/plugin/DAGSetupEvent.java b/api/src/main/java/org/apache/apex/api/plugin/DAGSetupEvent.java
new file mode 100644
index 0000000..95d17e2
--- /dev/null
+++ b/api/src/main/java/org/apache/apex/api/plugin/DAGSetupEvent.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.api.plugin;
+
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+
+@Evolving
+public class DAGSetupEvent extends Event.BaseEvent<DAGSetupEvent.Type>
+{
+ @Evolving
+ public enum Type implements Event.Type
+ {
+ /**
+ * This event is sent before platform adds operators and streams in the DAG. i.e this method
+ * will get called just before {@link com.datatorrent.api.StreamingApplication#populateDAG(DAG, Configuration)}
+ *
+ * For Application specified using property and json file format, this will be sent
+ * before platform adds operators and streams in the DAG as per specification in the file.
+ */
+ PRE_POPULATE_DAG,
+
+ /**
+ * This event is sent after platform adds operators and streams in the DAG. i.e this method
+ * will get called just after {@link com.datatorrent.api.StreamingApplication#populateDAG(DAG, Configuration)}
+ * in case application is specified in java.
+ *
+ * For Application specified using property and json file format, this will be sent
+ * after platform has added operators and streams in the DAG as per specification in the file.
+ */
+ POST_POPULATE_DAG,
+
+ /**
+ * This event is sent before DAG is configured, i.e operator and application
+ * properties/attributes are injected from configuration files.
+ */
+ PRE_CONFIGURE_DAG,
+
+ /**
+ * This event is sent after DAG is configured, i.e operator and application
+ * properties/attributes are injected from configuration files.
+ */
+ POST_CONFIGURE_DAG,
+
+ /**
+ * This event is sent just before dag is validated before final job submission.
+ */
+ PRE_VALIDATE_DAG,
+
+ /**
+ * This event is sent after dag is validated. If plugin makes in incompatible changes
+ * to the DAG at this stage, then application may get launched incorrectly or application
+ * launch may fail.
+ */
+ POST_VALIDATE_DAG;
+
+ public final DAGSetupEvent event;
+
+ Type()
+ {
+ event = new DAGSetupEvent(this);
+ }
+ }
+
+ private DAGSetupEvent(DAGSetupEvent.Type type)
+ {
+ super(type);
+ }
+}
diff --git a/api/src/main/java/org/apache/apex/api/plugin/DAGSetupPlugin.java b/api/src/main/java/org/apache/apex/api/plugin/DAGSetupPlugin.java
index faa6798..31ea1f3 100644
--- a/api/src/main/java/org/apache/apex/api/plugin/DAGSetupPlugin.java
+++ b/api/src/main/java/org/apache/apex/api/plugin/DAGSetupPlugin.java
@@ -18,12 +18,9 @@
*/
package org.apache.apex.api.plugin;
-import java.util.Collection;
-
-import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.api.Attribute;
import com.datatorrent.api.DAG;
/**
@@ -39,96 +36,18 @@
* <li>After dag is validated</li>
* </ul>
*/
-@InterfaceStability.Evolving
-public interface DAGSetupPlugin extends Plugin<DAGSetupPlugin.DAGSetupPluginContext>
+@Evolving
+public interface DAGSetupPlugin<T extends DAGSetupPlugin.Context> extends Plugin<T>
{
-
/**
- * This method is called before platform adds operators and streams in the DAG. i.e this method
- * will get called just before {@link com.datatorrent.api.StreamingApplication#populateDAG(DAG, Configuration)}
- *
- * For Application specified using property and json file format, this method will get called
- * before platform adds operators and streams in the DAG as per specification in the file.
+ * The context for the setup plugins
*/
- void prePopulateDAG();
-
- /**
- * This method is called after platform adds operators and streams in the DAG. i.e this method
- * will get called just after {@link com.datatorrent.api.StreamingApplication#populateDAG(DAG, Configuration)}
- * in case application is specified in java.
- *
- * For Application specified using property and json file format, this method will get called
- * after platform has added operators and streams in the DAG as per specification in the file.
- */
- void postPopulateDAG();
-
- /**
- * This is method is called before DAG is configured, i.e operator and application
- * properties/attributes are injected from configuration files.
- */
- void preConfigureDAG();
-
- /**
- * This is method is called after DAG is configured, i.e operator and application
- * properties/attributes are injected from configuration files.
- */
- void postConfigureDAG();
-
- /**
- * This method is called just before dag is validated before final job submission.
- */
- void preValidateDAG();
-
- /**
- * This method is called after dag is validated. If plugin makes in incompatible changes
- * to the DAG at this stage, then application may get launched incorrectly or application
- * launch may fail.
- */
- void postValidateDAG();
-
- class DAGSetupPluginContext implements PluginContext
+ @Evolving
+ interface Context<E extends DAGSetupEvent> extends PluginContext<DAGSetupEvent.Type, E>
{
- private final DAG dag;
- private final Configuration conf;
- public DAGSetupPluginContext(DAG dag, Configuration conf)
- {
- this.dag = dag;
- this.conf = conf;
- }
+ DAG getDAG();
- public DAG getDAG()
- {
- return dag;
- }
-
- public Configuration getConfiguration()
- {
- return conf;
- }
-
- @Override
- public Attribute.AttributeMap getAttributes()
- {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- @Override
- public <T> T getValue(Attribute<T> key)
- {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- @Override
- public void setCounters(Object counters)
- {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- @Override
- public void sendMetrics(Collection<String> metricNames)
- {
- throw new UnsupportedOperationException("Not supported yet.");
- }
+ Configuration getConfiguration();
}
}
diff --git a/api/src/main/java/org/apache/apex/api/plugin/Event.java b/api/src/main/java/org/apache/apex/api/plugin/Event.java
new file mode 100644
index 0000000..9b95187
--- /dev/null
+++ b/api/src/main/java/org/apache/apex/api/plugin/Event.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.api.plugin;
+
+import org.apache.apex.api.plugin.Event.Type;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * The class represents a plugin event that is delivered to plugins to notify them of important system events.
+ *
+ * Plugins express interest in receiving events by registering handlers for the event type and their handlers receive
+ * the events.
+ * @param <T> event type
+ */
+@Evolving
+public interface Event<T extends Type>
+{
+ /**
+ * Marker interface for plugin event type.
+ */
+ @Evolving
+ interface Type
+ {
+ }
+
+ T getType();
+
+ @Evolving
+ class BaseEvent<T extends Type> implements Event<T>
+ {
+ private T type;
+
+ protected BaseEvent(T type)
+ {
+ this.type = type;
+ }
+
+ public T getType()
+ {
+ return type;
+ }
+ }
+}
diff --git a/api/src/main/java/org/apache/apex/api/plugin/Plugin.java b/api/src/main/java/org/apache/apex/api/plugin/Plugin.java
index ffe52ea..e0a3872 100644
--- a/api/src/main/java/org/apache/apex/api/plugin/Plugin.java
+++ b/api/src/main/java/org/apache/apex/api/plugin/Plugin.java
@@ -18,15 +18,67 @@
*/
package org.apache.apex.api.plugin;
-import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
/**
- * Marker interface for ApexPlugins.
- * @param <T>
+ * An Apex plugin is user code which runs inside the Apex engine. Plugin implementations implement this interface.
+ *
+ * Plugins can identify extension points by registering interest in events in the {@link Component#setup(Context)}
+ * initialization method. They should also cleanup any additional resources created during shutdown such as helper
+ * threads and open files in the {@link Component#teardown()} method.
+ * @param <T> plugin context type
*/
-@InterfaceStability.Evolving
-public interface Plugin<T extends PluginContext> extends Component<T>
+@Evolving
+public interface Plugin<T extends Plugin.PluginContext> extends Component<T>
{
+
+ /**
+ * An Apex plugin is user code which runs inside the Apex engine. The interaction between plugin and engine is managed
+ * by PluginContext. Plugins can register interest in different events in the engine using the
+ * ${@link PluginContext#register(Event.Type, EventHandler)} method.
+ *
+ * @param <T> the type of the Event.Type
+ * @param <E> the event type
+ */
+ @Evolving
+ interface PluginContext<T extends Event.Type, E extends Event<T>> extends Context
+ {
+
+ /**
+ * Register interest in an event.
+ *
+ * Plugins register interest in events using this method. They would need to specify the event type and a handler to
+ * handle the event, that would get called when the event occurs. A plugin can register interest in several events but
+ * should register only a single handler for any specific event. In case register is called multiple times with the
+ * same event type, then the last registered handler will be used.
+ *
+ * When an event occurs the
+ * {@link EventHandler#handle(Event event)} method gets called with the event data.
+ *
+ * @param type The event type
+ * @param handler The event handler
+ */
+ void register(T type, EventHandler<E> handler);
+ }
+
+ /**
+ * A handler that handles an event in the Apex engine. Plugins register interest in events by registering handlers
+ * using the PluginContext.
+ * @param <E> The event type
+ */
+ @Evolving
+ interface EventHandler<E extends Event>
+ {
+ /**
+ * Handle a event.
+ *
+ * This method is called when the event occurs.
+ *
+ * @param event
+ */
+ void handle(E event);
+ }
}
diff --git a/api/src/main/java/org/apache/apex/api/plugin/PluginContext.java b/api/src/main/java/org/apache/apex/api/plugin/PluginContext.java
deleted file mode 100644
index 2bdaf00..0000000
--- a/api/src/main/java/org/apache/apex/api/plugin/PluginContext.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.api.plugin;
-
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.datatorrent.api.Context;
-
-/**
- * Marker interface for Context used by ApexPlugins. Plugin interfaces with
- * the Apex through the context.
- */
-@InterfaceStability.Evolving
-public interface PluginContext extends Context
-{
-}
diff --git a/common/src/main/java/org/apache/apex/common/util/BaseDAGSetupPlugin.java b/common/src/main/java/org/apache/apex/common/util/BaseDAGSetupPlugin.java
deleted file mode 100644
index 9bc5d8e..0000000
--- a/common/src/main/java/org/apache/apex/common/util/BaseDAGSetupPlugin.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.common.util;
-
-import org.apache.apex.api.plugin.DAGSetupPlugin;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * Base class for DAGSetupPlugin implementations that provides empty implementations
- * for all interface methods.
- */
-@InterfaceStability.Evolving
-public class BaseDAGSetupPlugin implements DAGSetupPlugin
-{
- @Override
- public void setup(DAGSetupPluginContext context)
- {
-
- }
-
- @Override
- public void prePopulateDAG()
- {
-
- }
-
- @Override
- public void teardown()
- {
-
- }
-
- @Override
- public void postPopulateDAG()
- {
-
- }
-
- @Override
- public void preConfigureDAG()
- {
-
- }
-
- @Override
- public void postConfigureDAG()
- {
-
- }
-
- @Override
- public void preValidateDAG()
- {
-
- }
-
- @Override
- public void postValidateDAG()
- {
-
- }
-}
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 2e88114..0ca8cd1 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -115,7 +115,6 @@
import com.datatorrent.stram.webapp.StramWebApp;
import static java.lang.Thread.sleep;
-import static org.apache.apex.engine.plugin.ApexPluginDispatcher.DAG_CHANGE_EVENT;
/**
* Streaming Application Master
@@ -600,7 +599,7 @@
apexPluginDispatcher = new DefaultApexPluginDispatcher(locator, appContext, dnmgr, stats);
dnmgr.apexPluginDispatcher = apexPluginDispatcher;
addService(apexPluginDispatcher);
- apexPluginDispatcher.dispatch(DAG_CHANGE_EVENT, dnmgr.getLogicalPlan());
+ apexPluginDispatcher.dispatch(new ApexPluginDispatcher.DAGChangeEvent(dnmgr.getLogicalPlan()));
}
@Override
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 8d99dc1..c4e76a5 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -65,6 +65,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.engine.api.plugin.DAGExecutionEvent;
import org.apache.apex.engine.plugin.ApexPluginDispatcher;
import org.apache.apex.engine.plugin.NoOpApexPluginDispatcher;
import org.apache.apex.engine.util.CascadeStorageAgent;
@@ -178,11 +179,6 @@
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.config.BusConfiguration;
-import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.COMMIT_EVENT;
-import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.HEARTBEAT;
-import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.STRAM_EVENT;
-import static org.apache.apex.engine.plugin.ApexPluginDispatcher.DAG_CHANGE_EVENT;
-
/**
* Tracks topology provisioning/allocation to containers<p>
* <br>
@@ -818,7 +814,7 @@
committedWindowId = updateCheckpoints(waitForRecovery);
if (lastCommittedWindowId != committedWindowId) {
- apexPluginDispatcher.dispatch(COMMIT_EVENT, committedWindowId);
+ apexPluginDispatcher.dispatch(new DAGExecutionEvent.CommitExecutionEvent(committedWindowId));
lastCommittedWindowId = committedWindowId;
}
calculateEndWindowStats();
@@ -1817,7 +1813,7 @@
rsp.stackTraceRequired = sca.stackTraceRequested;
sca.stackTraceRequested = false;
- apexPluginDispatcher.dispatch(HEARTBEAT, heartbeat);
+ apexPluginDispatcher.dispatch(new DAGExecutionEvent.HeartbeatExecutionEvent(heartbeat));
return rsp;
}
@@ -2449,7 +2445,7 @@
@Override
public void recordEventAsync(StramEvent ev)
{
- apexPluginDispatcher.dispatch(STRAM_EVENT, ev);
+ apexPluginDispatcher.dispatch(new DAGExecutionEvent.StramExecutionEvent(ev));
if (eventBus != null) {
eventBus.publishAsync(ev);
}
@@ -3083,7 +3079,7 @@
recordEventAsync(new StramEvent.ChangeLogicalPlanEvent(request));
}
pm.applyChanges(StreamingContainerManager.this);
- apexPluginDispatcher.dispatch(DAG_CHANGE_EVENT, plan.getLogicalPlan());
+ apexPluginDispatcher.dispatch(new ApexPluginDispatcher.DAGChangeEvent(plan.getLogicalPlan()));
LOG.info("Plan changes applied: {}", requests);
return null;
}
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java
index c7c2767..370aaaa 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java
@@ -19,14 +19,23 @@
package com.datatorrent.stram.plan.logical;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import org.slf4j.Logger;
+import org.apache.apex.api.plugin.DAGSetupEvent;
import org.apache.apex.api.plugin.DAGSetupPlugin;
+import org.apache.apex.api.plugin.Plugin.EventHandler;
import org.apache.apex.engine.plugin.loaders.PropertyBasedPluginLocator;
import org.apache.hadoop.conf.Configuration;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.DAG;
+
import static org.slf4j.LoggerFactory.getLogger;
public class DAGSetupPluginManager
@@ -37,7 +46,8 @@
private Configuration conf;
public static final String DAGSETUP_PLUGINS_CONF_KEY = "apex.plugin.dag.setup";
- private DAGSetupPlugin.DAGSetupPluginContext contex;
+
+ private final Table<DAGSetupEvent.Type, DAGSetupPlugin, EventHandler<DAGSetupEvent>> table = HashBasedTable.create();
private void loadVisitors(Configuration conf)
{
@@ -50,56 +60,82 @@
this.plugins.addAll(locator.discoverPlugins(conf));
}
- public void setup(DAGSetupPlugin.DAGSetupPluginContext context)
+ private class DefaultDAGSetupPluginContext implements DAGSetupPlugin.Context<DAGSetupEvent>
{
- this.contex = context;
+ private final DAG dag;
+ private final Configuration conf;
+ private DAGSetupPlugin plugin;
+
+ public DefaultDAGSetupPluginContext(DAG dag, Configuration conf, DAGSetupPlugin plugin)
+ {
+ this.dag = dag;
+ this.conf = conf;
+ this.plugin = plugin;
+ }
+
+ @Override
+ public void register(DAGSetupEvent.Type type, EventHandler<DAGSetupEvent> handler)
+ {
+ table.put(type, plugin, handler);
+ }
+
+ public DAG getDAG()
+ {
+ return dag;
+ }
+
+ public Configuration getConfiguration()
+ {
+ return conf;
+ }
+
+ @Override
+ public Attribute.AttributeMap getAttributes()
+ {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public <T> T getValue(Attribute<T> key)
+ {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public void setCounters(Object counters)
+ {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public void sendMetrics(Collection<String> metricNames)
+ {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+ }
+
+ public void setup(DAG dag)
+ {
for (DAGSetupPlugin plugin : plugins) {
+ DAGSetupPlugin.Context context = new DefaultDAGSetupPluginContext(dag, conf, plugin);
plugin.setup(context);
}
}
- public enum DispatchType
- {
- SETUP,
- PRE_POPULATE,
- POST_POPULATE,
- PRE_CONFIGURE,
- POST_CONFIGURE,
- PRE_VALIDATE,
- POST_VALIDATE,
- TEARDOWN
- }
-
- public void dispatch(DispatchType type, DAGSetupPlugin.DAGSetupPluginContext context)
+ public void teardown()
{
for (DAGSetupPlugin plugin : plugins) {
- switch (type) {
- case SETUP:
- plugin.setup(context);
- break;
- case PRE_POPULATE:
- plugin.prePopulateDAG();
- break;
- case POST_POPULATE:
- plugin.postPopulateDAG();
- break;
- case PRE_CONFIGURE:
- plugin.preConfigureDAG();
- break;
- case POST_CONFIGURE:
- plugin.postValidateDAG();
- break;
- case PRE_VALIDATE:
- plugin.preValidateDAG();
- break;
- case POST_VALIDATE:
- plugin.postValidateDAG();
- break;
- case TEARDOWN:
- plugin.teardown();
- break;
- default:
- throw new UnsupportedOperationException("Not implemented ");
+ plugin.teardown();
+ }
+ }
+
+ public void dispatch(DAGSetupEvent event)
+ {
+ for (EventHandler<DAGSetupEvent> handler : table.row(event.getType()).values()) {
+ try {
+ handler.handle(event);
+ } catch (RuntimeException e) {
+ LOG.warn("Event {} caused an exception in {} handler", event, handler, e);
}
}
}
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
index a7fad2a..01a4c7b 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
@@ -48,7 +48,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.apex.api.plugin.DAGSetupPlugin.DAGSetupPluginContext;
import org.apache.commons.beanutils.BeanMap;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.collections.CollectionUtils;
@@ -86,14 +85,12 @@
import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
import com.datatorrent.stram.util.ObjectMapperFactory;
-import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.POST_CONFIGURE;
-import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.POST_POPULATE;
-import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.POST_VALIDATE;
-import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.PRE_CONFIGURE;
-import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.PRE_POPULATE;
-import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.PRE_VALIDATE;
-import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.SETUP;
-import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.TEARDOWN;
+import static org.apache.apex.api.plugin.DAGSetupEvent.Type.POST_CONFIGURE_DAG;
+import static org.apache.apex.api.plugin.DAGSetupEvent.Type.POST_POPULATE_DAG;
+import static org.apache.apex.api.plugin.DAGSetupEvent.Type.POST_VALIDATE_DAG;
+import static org.apache.apex.api.plugin.DAGSetupEvent.Type.PRE_CONFIGURE_DAG;
+import static org.apache.apex.api.plugin.DAGSetupEvent.Type.PRE_POPULATE_DAG;
+import static org.apache.apex.api.plugin.DAGSetupEvent.Type.PRE_VALIDATE_DAG;
/**
*
@@ -2076,19 +2073,18 @@
private LogicalPlan populateDAGAndValidate(LogicalPlanConfiguration tb, String appName)
{
LogicalPlan dag = new LogicalPlan();
- DAGSetupPluginContext context = new DAGSetupPluginContext(dag, this.conf);
- pluginManager.dispatch(SETUP, context);
- pluginManager.dispatch(PRE_POPULATE, context);
+ pluginManager.setup(dag);
+ pluginManager.dispatch(PRE_POPULATE_DAG.event);
tb.populateDAG(dag);
// configure with embedded settings
tb.prepareDAG(dag, null, appName);
- pluginManager.dispatch(POST_POPULATE, context);
+ pluginManager.dispatch(POST_POPULATE_DAG.event);
// configure with external settings
prepareDAG(dag, null, appName);
- pluginManager.dispatch(PRE_VALIDATE, context);
+ pluginManager.dispatch(PRE_VALIDATE_DAG.event);
dag.validate();
- pluginManager.dispatch(POST_VALIDATE, context);
- pluginManager.dispatch(TEARDOWN, context);
+ pluginManager.dispatch(POST_VALIDATE_DAG.event);
+ pluginManager.teardown();
return dag;
}
@@ -2118,13 +2114,12 @@
public LogicalPlan createFromStreamingApplication(StreamingApplication app, String appName)
{
LogicalPlan dag = new LogicalPlan();
- DAGSetupPluginContext context = new DAGSetupPluginContext(dag, this.conf);
- pluginManager.dispatch(SETUP, context);
+ pluginManager.setup(dag);
prepareDAG(dag, app, appName);
- pluginManager.dispatch(PRE_VALIDATE, context);
+ pluginManager.dispatch(PRE_VALIDATE_DAG.event);
dag.validate();
- pluginManager.dispatch(POST_VALIDATE, context);
- pluginManager.dispatch(TEARDOWN, context);
+ pluginManager.dispatch(POST_VALIDATE_DAG.event);
+ pluginManager.teardown();
return dag;
}
@@ -2256,14 +2251,13 @@
// EVENTUALLY to be replaced by variable enabled configuration in the demo where the attribute below is used
String connectAddress = conf.get(KEY_GATEWAY_CONNECT_ADDRESS);
dag.setAttribute(Context.DAGContext.GATEWAY_CONNECT_ADDRESS, connectAddress == null ? conf.get(GATEWAY_LISTEN_ADDRESS) : connectAddress);
- DAGSetupPluginContext context = new DAGSetupPluginContext(dag, this.conf);
+ pluginManager.setup(dag);
if (app != null) {
- pluginManager.dispatch(SETUP, context);
- pluginManager.dispatch(PRE_POPULATE, context);
+ pluginManager.dispatch(PRE_POPULATE_DAG.event);
app.populateDAG(dag, conf);
- pluginManager.dispatch(POST_POPULATE, context);
+ pluginManager.dispatch(POST_POPULATE_DAG.event);
}
- pluginManager.dispatch(PRE_CONFIGURE, context);
+ pluginManager.dispatch(PRE_CONFIGURE_DAG.event);
String appAlias = getAppAlias(name);
String appName = appAlias == null ? name : appAlias;
List<AppConf> appConfs = stramConf.getMatchingChildConf(appName, StramElement.APPLICATION);
@@ -2279,7 +2273,8 @@
// inject external operator configuration
setOperatorConfiguration(dag, appConfs, appName);
setStreamConfiguration(dag, appConfs, appName);
- pluginManager.dispatch(POST_CONFIGURE, context);
+ pluginManager.dispatch(POST_CONFIGURE_DAG.event);
+ pluginManager.teardown();
}
private void flattenDAG(LogicalPlan dag, Configuration conf)
diff --git a/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionEvent.java b/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionEvent.java
new file mode 100644
index 0000000..dfffbca
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionEvent.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.api.plugin;
+
+import org.apache.apex.api.plugin.Event;
+
+import com.datatorrent.stram.api.StramEvent;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
+
+import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.COMMIT_EVENT;
+import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.HEARTBEAT_EVENT;
+import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.STRAM_EVENT;
+
+public class DAGExecutionEvent extends Event.BaseEvent<DAGExecutionEvent.Type>
+{
+ public enum Type implements Event.Type
+ {
+ HEARTBEAT_EVENT, STRAM_EVENT, COMMIT_EVENT
+ }
+
+ public static class HeartbeatExecutionEvent extends DAGExecutionEvent
+ {
+ private final StreamingContainerUmbilicalProtocol.ContainerHeartbeat heartbeat;
+
+ public HeartbeatExecutionEvent(StreamingContainerUmbilicalProtocol.ContainerHeartbeat heartbeat)
+ {
+ super(HEARTBEAT_EVENT);
+ this.heartbeat = heartbeat;
+ }
+
+ public StreamingContainerUmbilicalProtocol.ContainerHeartbeat getHeartbeat()
+ {
+ return heartbeat;
+ }
+ }
+
+ public static class StramExecutionEvent extends DAGExecutionEvent
+ {
+ private final StramEvent stramEvent;
+
+ public StramExecutionEvent(StramEvent stramEvent)
+ {
+ super(STRAM_EVENT);
+ this.stramEvent = stramEvent;
+ }
+
+ public StramEvent getStramEvent()
+ {
+ return stramEvent;
+ }
+ }
+
+ public static class CommitExecutionEvent extends DAGExecutionEvent
+ {
+ private final long commitWindow;
+
+ public CommitExecutionEvent(long commitWindow)
+ {
+ super(COMMIT_EVENT);
+ this.commitWindow = commitWindow;
+ }
+
+ public long getCommitWindow()
+ {
+ return commitWindow;
+ }
+ }
+
+ protected DAGExecutionEvent(DAGExecutionEvent.Type eventType)
+ {
+ super(eventType);
+ }
+}
diff --git a/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionPlugin.java b/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionPlugin.java
index 060b240..7cfb1fa 100644
--- a/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionPlugin.java
+++ b/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionPlugin.java
@@ -18,26 +18,66 @@
*/
package org.apache.apex.engine.api.plugin;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
import org.apache.apex.api.plugin.Plugin;
-import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StatsListener.BatchedOperatorStats;
+import com.datatorrent.common.util.Pair;
+import com.datatorrent.stram.StramAppContext;
+import com.datatorrent.stram.util.VersionInfo;
+import com.datatorrent.stram.webapp.AppInfo;
+import com.datatorrent.stram.webapp.LogicalOperatorInfo;
/**
- * An Apex plugin is a user code which runs inside Stram. The interaction
- * between plugin and Stram is managed by DAGExecutionPluginContext. Plugin can register to handle event in interest
- * with callback handler using ${@link DAGExecutionPluginContext#register(DAGExecutionPluginContext.RegistrationType, DAGExecutionPluginContext.Handler)}
+ * DAGExecutionPlugin allows user provided code to respond to various events during the application runtime.
*
* Following events are supported
* <ul>
- * <li>{@see DAGExecutionPluginContext.HEARTBEAT} The heartbeat from a container is delivered to the plugin after it has been handled by stram</li>
- * <li>{@see DAGExecutionPluginContext.STRAM_EVENT} All the Stram event generated in Stram will be delivered to the plugin</li>
- * <li>{@see DAGExecutionPluginContext.COMMIT_EVENT} When committedWindowId changes in the platform an event will be delivered to the plugin</li>
+ * <li>{@see Context.HEARTBEAT} The heartbeat from a container is delivered to the plugin after it has been handled by stram</li>
+ * <li>{@see Context.STRAM_EVENT} All the Stram event generated in Stram will be delivered to the plugin</li>
+ * <li>{@see Context.COMMIT_EVENT} When committedWindowId changes in the platform an event will be delivered to the plugin</li>
* </ul>
*
- * A plugin should register a single handler for an event, In case multiple handlers are registered for an event,
- * then the last registered handler will be used. Plugin should cleanup additional resources created by it during shutdown
- * such as helper threads and open files.
*/
-@InterfaceStability.Evolving
-public interface DAGExecutionPlugin extends Plugin<DAGExecutionPluginContext>
+public interface DAGExecutionPlugin<T extends DAGExecutionPlugin.Context> extends Plugin<T>
{
+
+ /**
+ * The context for the execution plugins.
+ *
+ * Following events are supported
+ * <ul>
+ * <li>{@see Context.HEARTBEAT} The heartbeat from a container is delivered to the plugin after it has been handled by stram</li>
+ * <li>{@see Context.STRAM_EVENT} All the Stram event generated in Stram will be delivered to the plugin</li>
+ * <li>{@see Context.COMMIT_EVENT} When committedWindowId changes in the platform an event will be delivered to the plugin</li>
+ * </ul>
+ *
+ */
+ interface Context<E extends DAGExecutionEvent> extends PluginContext<DAGExecutionEvent.Type, E>
+ {
+ VersionInfo getEngineVersion();
+
+ StramAppContext getApplicationContext();
+
+ AppInfo.AppStats getApplicationStats();
+
+ Configuration getLaunchConfig();
+
+ DAG getDAG();
+
+ String getOperatorName(int id);
+
+ BatchedOperatorStats getPhysicalOperatorStats(int id);
+
+ List<LogicalOperatorInfo> getLogicalOperatorInfoList();
+
+ Queue<Pair<Long, Map<String, Object>>> getWindowMetrics(String operatorName);
+
+ long windowIdToMillis(long windowId);
+ }
}
diff --git a/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionPluginContext.java b/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionPluginContext.java
deleted file mode 100644
index 73da7e6..0000000
--- a/engine/src/main/java/org/apache/apex/engine/api/plugin/DAGExecutionPluginContext.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.engine.api.plugin;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-
-import org.apache.apex.api.plugin.PluginContext;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StatsListener.BatchedOperatorStats;
-import com.datatorrent.common.util.Pair;
-import com.datatorrent.stram.StramAppContext;
-import com.datatorrent.stram.api.StramEvent;
-import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
-import com.datatorrent.stram.util.VersionInfo;
-import com.datatorrent.stram.webapp.AppInfo;
-import com.datatorrent.stram.webapp.LogicalOperatorInfo;
-
-/**
- * An Apex plugin is a user code which runs inside Stram. The interaction
- * between plugin and Stram is managed by DAGExecutionPluginContext. Plugin can register to handle event in interest
- * with callback handler using ${@link DAGExecutionPluginContext#register(DAGExecutionPluginContext.RegistrationType, DAGExecutionPluginContext.Handler)}
- *
- * Following events are supported
- * <ul>
- * <li>{@see DAGExecutionPluginContext.HEARTBEAT} The heartbeat from a container is delivered to the plugin after it has been handled by stram</li>
- * <li>{@see DAGExecutionPluginContext.STRAM_EVENT} All the Stram event generated in Stram will be delivered to the plugin</li>
- * <li>{@see DAGExecutionPluginContext.COMMIT_EVENT} When committedWindowId changes in the platform an event will be delivered to the plugin</li>
- * </ul>
- *
- * A plugin should register a single handler for an event, In case multiple handlers are registered for an event,
- * then the last registered handler will be used. Plugin should cleanup additional resources created by it during shutdown
- * such as helper threads and open files.
- */
-@InterfaceStability.Evolving
-public interface DAGExecutionPluginContext extends PluginContext
-{
- class RegistrationType<T>
- {
- }
-
- RegistrationType<StreamingContainerUmbilicalProtocol.ContainerHeartbeat> HEARTBEAT = new RegistrationType<>();
- RegistrationType<StramEvent> STRAM_EVENT = new RegistrationType<>();
- RegistrationType<Long> COMMIT_EVENT = new RegistrationType<>();
-
- <T> void register(RegistrationType<T> type, Handler<T> handler);
-
- interface Handler<T>
- {
- void handle(T data);
- }
-
- VersionInfo getEngineVersion();
-
- StramAppContext getApplicationContext();
-
- AppInfo.AppStats getApplicationStats();
-
- Configuration getLaunchConfig();
-
- DAG getDAG();
-
- String getOperatorName(int id);
-
- BatchedOperatorStats getPhysicalOperatorStats(int id);
-
- List<LogicalOperatorInfo> getLogicalOperatorInfoList();
-
- Queue<Pair<Long, Map<String, Object>>> getWindowMetrics(String operatorName);
-
- long windowIdToMillis(long windowId);
-}
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
index 2b96632..74ee0c8 100644
--- a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
@@ -22,15 +22,15 @@
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.api.plugin.Event;
+import org.apache.apex.api.plugin.Plugin;
+import org.apache.apex.api.plugin.Plugin.EventHandler;
+import org.apache.apex.engine.api.plugin.DAGExecutionEvent;
import org.apache.apex.engine.api.plugin.DAGExecutionPlugin;
-import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.Handler;
-import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.RegistrationType;
import org.apache.apex.engine.api.plugin.PluginLocator;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.hadoop.conf.Configuration;
@@ -40,7 +40,9 @@
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
+import com.google.common.collect.Table;
import com.datatorrent.api.DAG;
import com.datatorrent.stram.StramAppContext;
@@ -62,7 +64,7 @@
private final AppInfo.AppStats stats;
protected Configuration launchConfig;
protected FileContext fileContext;
- protected final Map<DAGExecutionPlugin, PluginInfo> pluginInfoMap = new HashMap<>();
+ protected final Table<DAGExecutionEvent.Type, DAGExecutionPlugin, EventHandler<DAGExecutionEvent>> table = HashBasedTable.create();
private volatile DAG clonedDAG = null;
public AbstractApexPluginDispatcher(PluginLocator locator, StramAppContext context, StreamingContainerManager dmgr, AppInfo.AppStats stats)
@@ -123,57 +125,18 @@
super.serviceStop();
}
- /**
- * Keeps information about plugin and its registrations. Dispatcher use this
- * information while delivering events to plugin.
- */
- protected class PluginInfo
+ public void register(DAGExecutionEvent.Type eventType, Plugin.EventHandler<DAGExecutionEvent> handler, DAGExecutionPlugin owner)
{
- private final DAGExecutionPlugin plugin;
- private final Map<RegistrationType<?>, Handler<?>> registrationMap = new HashMap<>();
-
- <T> void put(RegistrationType<T> registrationType, Handler<T> handler)
- {
- registrationMap.put(registrationType, handler);
+ synchronized (table) {
+ table.put(eventType, owner, handler);
}
-
- <T> Handler<T> get(RegistrationType<T> registrationType)
- {
- return (Handler<T>)registrationMap.get(registrationType);
- }
-
- public PluginInfo(DAGExecutionPlugin plugin)
- {
- this.plugin = plugin;
- }
-
- public DAGExecutionPlugin getPlugin()
- {
- return plugin;
- }
- }
-
- PluginInfo getPluginInfo(DAGExecutionPlugin plugin)
- {
- PluginInfo pInfo = pluginInfoMap.get(plugin);
- if (pInfo == null) {
- pInfo = new PluginInfo(plugin);
- pluginInfoMap.put(plugin, pInfo);
- }
- return pInfo;
- }
-
- private <T> void register(RegistrationType<T> type, Handler<T> handler, DAGExecutionPlugin owner)
- {
- PluginInfo pInfo = getPluginInfo(owner);
- pInfo.put(type, handler);
}
/**
* A wrapper PluginManager to track registration from a plugin. with this plugin
* don't need to pass explicit owner argument during registration.
*/
- private class PluginManagerImpl extends AbstractDAGExecutionPluginContext
+ private class PluginManagerImpl extends AbstractDAGExecutionPluginContext<DAGExecutionEvent>
{
private final DAGExecutionPlugin owner;
@@ -184,7 +147,7 @@
}
@Override
- public <T> void register(RegistrationType<T> type, Handler<T> handler)
+ public void register(DAGExecutionEvent.Type type, EventHandler<DAGExecutionEvent> handler)
{
AbstractApexPluginDispatcher.this.register(type, handler, owner);
}
@@ -198,19 +161,19 @@
/**
* Dispatch events to plugins.
- * @param registrationType
- * @param data
- * @param <T>
+ * @param event The dag execution event
*/
- protected abstract <T> void dispatchEvent(RegistrationType<T> registrationType, T data);
+ protected abstract void dispatchExecutionEvent(DAGExecutionEvent event);
@Override
- public <T> void dispatch(RegistrationType<T> registrationType, T data)
+ public void dispatch(Event event)
{
- if (registrationType == ApexPluginDispatcher.DAG_CHANGE_EVENT) {
- clonedDAG = SerializationUtils.clone((DAG)data);
- } else {
- dispatchEvent(registrationType, data);
+ if (!plugins.isEmpty()) {
+ if (event.getType() == ApexPluginDispatcher.DAG_CHANGE) {
+ clonedDAG = SerializationUtils.clone(((DAGChangeEvent)event).dag);
+ } else if (event instanceof DAGExecutionEvent) {
+ dispatchExecutionEvent((DAGExecutionEvent)event);
+ }
}
}
}
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java
index b17d5f8..21f29f8 100644
--- a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractDAGExecutionPluginContext.java
@@ -23,7 +23,8 @@
import java.util.Map;
import java.util.Queue;
-import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext;
+import org.apache.apex.engine.api.plugin.DAGExecutionEvent;
+import org.apache.apex.engine.api.plugin.DAGExecutionPlugin;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.Attribute;
@@ -37,7 +38,7 @@
import com.datatorrent.stram.webapp.AppInfo;
import com.datatorrent.stram.webapp.LogicalOperatorInfo;
-public abstract class AbstractDAGExecutionPluginContext implements DAGExecutionPluginContext
+public abstract class AbstractDAGExecutionPluginContext<E extends DAGExecutionEvent> implements DAGExecutionPlugin.Context<E>
{
private final StreamingContainerManager dnmgr;
private final Configuration launchConf;
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java
index 234195f..9ef2a5d 100644
--- a/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/ApexPluginDispatcher.java
@@ -18,7 +18,7 @@
*/
package org.apache.apex.engine.plugin;
-import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.RegistrationType;
+import org.apache.apex.api.plugin.Event;
import org.apache.hadoop.service.Service;
import com.datatorrent.api.DAG;
@@ -29,7 +29,18 @@
/**
* This is internal event, which is not delivered to the plugins.
*/
- RegistrationType<DAG> DAG_CHANGE_EVENT = new RegistrationType<>();
+ Event.Type DAG_CHANGE = new Event.Type(){};
- <T> void dispatch(RegistrationType<T> registrationType, T data);
+ class DAGChangeEvent extends Event.BaseEvent<Event.Type>
+ {
+ final DAG dag;
+
+ public DAGChangeEvent(DAG dag)
+ {
+ super(DAG_CHANGE);
+ this.dag = dag;
+ }
+ }
+
+ void dispatch(Event e);
}
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java
index bea011c..1252061 100644
--- a/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.java
@@ -28,8 +28,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.Handler;
-import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.RegistrationType;
+import org.apache.apex.api.plugin.Event;
+import org.apache.apex.api.plugin.Plugin.EventHandler;
+import org.apache.apex.engine.api.plugin.DAGExecutionEvent;
import org.apache.apex.engine.api.plugin.PluginLocator;
import org.apache.hadoop.conf.Configuration;
@@ -56,10 +57,10 @@
}
@Override
- protected <T> void dispatchEvent(RegistrationType<T> registrationType, T data)
+ protected void dispatchExecutionEvent(DAGExecutionEvent event)
{
if (executorService != null) {
- executorService.submit(new ProcessEventTask<>(registrationType, data));
+ executorService.submit(new ProcessEventTask<>(event));
}
}
@@ -98,24 +99,25 @@
executorService = null;
}
- private class ProcessEventTask<T> implements Runnable
+ private class ProcessEventTask<T extends DAGExecutionEvent.Type> implements Runnable
{
- private final RegistrationType<T> registrationType;
- private final T data;
+ private final Event<T> event;
- public ProcessEventTask(RegistrationType<T> type, T data)
+ public ProcessEventTask(Event<T> event)
{
- this.registrationType = type;
- this.data = data;
+ this.event = event;
}
@Override
public void run()
{
- for (final PluginInfo pInfo : pluginInfoMap.values()) {
- final Handler<T> handler = pInfo.get(registrationType);
- if (handler != null) {
- handler.handle(data);
+ synchronized (table) {
+ for (EventHandler handler : table.row(event.getType()).values()) {
+ try {
+ handler.handle(event);
+ } catch (RuntimeException e) {
+ LOG.warn("Event {} caused exception in handler {}", event, handler, e);
+ }
}
}
}
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/NoOpApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/NoOpApexPluginDispatcher.java
index f3f3382..a629a3f 100644
--- a/engine/src/main/java/org/apache/apex/engine/plugin/NoOpApexPluginDispatcher.java
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/NoOpApexPluginDispatcher.java
@@ -18,7 +18,7 @@
*/
package org.apache.apex.engine.plugin;
-import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.RegistrationType;
+import org.apache.apex.api.plugin.Event;
import org.apache.hadoop.service.AbstractService;
public class NoOpApexPluginDispatcher extends AbstractService implements ApexPluginDispatcher
@@ -29,7 +29,7 @@
}
@Override
- public <T> void dispatch(RegistrationType<T> registrationType, T data)
+ public void dispatch(Event event)
{
}
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/PropertyInjectorVisitor.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/PropertyInjectorVisitor.java
index 4c8b4e5..7e38ee8 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/PropertyInjectorVisitor.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/PropertyInjectorVisitor.java
@@ -26,24 +26,27 @@
import javax.validation.ValidationException;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.apex.api.plugin.DAGSetupEvent;
import org.apache.apex.api.plugin.DAGSetupPlugin;
+import org.apache.apex.api.plugin.Plugin.EventHandler;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator;
-import static org.slf4j.LoggerFactory.getLogger;
+import static org.apache.apex.api.plugin.DAGSetupEvent.Type.PRE_VALIDATE_DAG;
-public class PropertyInjectorVisitor implements DAGSetupPlugin
+public class PropertyInjectorVisitor implements DAGSetupPlugin<DAGSetupPlugin.Context>, EventHandler<DAGSetupEvent>
{
- private static final Logger LOG = getLogger(PropertyInjectorVisitor.class);
+ private static final Logger LOG = LoggerFactory.getLogger(PropertyInjectorVisitor.class);
private String path;
private Map<String, String> propertyMap = new HashMap<>();
private DAG dag;
@Override
- public void setup(DAGSetupPluginContext context)
+ public void setup(DAGSetupPlugin.Context context)
{
this.dag = context.getDAG();
try {
@@ -56,34 +59,11 @@
} catch (IOException ex) {
throw new ValidationException("Not able to load input file " + path);
}
+ context.register(PRE_VALIDATE_DAG, this);
}
@Override
- public void prePopulateDAG()
- {
-
- }
-
- @Override
- public void postPopulateDAG()
- {
-
- }
-
- @Override
- public void preConfigureDAG()
- {
-
- }
-
- @Override
- public void postConfigureDAG()
- {
-
- }
-
- @Override
- public void preValidateDAG()
+ public void handle(DAGSetupEvent event)
{
for (DAG.OperatorMeta ometa : dag.getAllOperatorsMeta()) {
Operator o = ometa.getOperator();
@@ -91,12 +71,6 @@
}
}
- @Override
- public void postValidateDAG()
- {
-
- }
-
public PropertyInjectorVisitor()
{
}
diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
index 5b8ca11..833d69f 100644
--- a/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
+++ b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
@@ -21,52 +21,56 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.engine.api.plugin.DAGExecutionEvent;
import org.apache.apex.engine.api.plugin.DAGExecutionPlugin;
-import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext;
-import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.Handler;
-import com.datatorrent.stram.api.StramEvent;
-import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
+import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.COMMIT_EVENT;
+import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.HEARTBEAT_EVENT;
+import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.STRAM_EVENT;
-import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.COMMIT_EVENT;
-import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.HEARTBEAT;
-import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.STRAM_EVENT;
-
-public class DebugPlugin implements DAGExecutionPlugin
+public class DebugPlugin implements DAGExecutionPlugin<DAGExecutionPlugin.Context>
{
+ private static final Logger logger = LoggerFactory.getLogger(DebugPlugin.class);
+
private int eventCount = 0;
private int heartbeatCount = 0;
private int commitCount = 0;
CountDownLatch latch = new CountDownLatch(3);
@Override
- public void setup(DAGExecutionPluginContext context)
+ public void setup(DAGExecutionPlugin.Context context)
{
- context.register(STRAM_EVENT, new Handler<StramEvent>()
+ context.register(STRAM_EVENT, new EventHandler<DAGExecutionEvent.StramExecutionEvent>()
{
@Override
- public void handle(StramEvent stramEvent)
+ public void handle(DAGExecutionEvent.StramExecutionEvent event)
{
+ logger.debug("Stram Event {}", event.getStramEvent());
eventCount++;
latch.countDown();
}
});
- context.register(HEARTBEAT, new Handler<StreamingContainerUmbilicalProtocol.ContainerHeartbeat>()
+ context.register(HEARTBEAT_EVENT, new EventHandler<DAGExecutionEvent.HeartbeatExecutionEvent>()
{
@Override
- public void handle(StreamingContainerUmbilicalProtocol.ContainerHeartbeat heartbeat)
+ public void handle(DAGExecutionEvent.HeartbeatExecutionEvent event)
{
+ logger.debug("Heartbeat {}", event.getHeartbeat());
heartbeatCount++;
latch.countDown();
}
});
- context.register(COMMIT_EVENT, new Handler<Long>()
+ context.register(COMMIT_EVENT, new EventHandler<DAGExecutionEvent.CommitExecutionEvent>()
{
@Override
- public void handle(Long aLong)
+ public void handle(DAGExecutionEvent.CommitExecutionEvent event)
{
+ logger.debug("Commit window id {}", event.getCommitWindow());
commitCount++;
latch.countDown();
}
diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/NoOpPlugin.java b/engine/src/test/java/org/apache/apex/engine/plugin/NoOpPlugin.java
index 786e0d6..f2563c2 100644
--- a/engine/src/test/java/org/apache/apex/engine/plugin/NoOpPlugin.java
+++ b/engine/src/test/java/org/apache/apex/engine/plugin/NoOpPlugin.java
@@ -22,14 +22,13 @@
import org.slf4j.LoggerFactory;
import org.apache.apex.engine.api.plugin.DAGExecutionPlugin;
-import org.apache.apex.engine.api.plugin.DAGExecutionPluginContext;
-public class NoOpPlugin implements DAGExecutionPlugin
+public class NoOpPlugin implements DAGExecutionPlugin<DAGExecutionPlugin.Context>
{
private static final Logger LOG = LoggerFactory.getLogger(NoOpPlugin.class);
@Override
- public void setup(DAGExecutionPluginContext context)
+ public void setup(DAGExecutionPlugin.Context context)
{
LOG.info("NoOpPlugin plugin called init ");
}
diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
index 84dc4ba..140dc65 100644
--- a/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
+++ b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
@@ -23,6 +23,7 @@
import org.junit.Assert;
import org.junit.Test;
+import org.apache.apex.engine.api.plugin.DAGExecutionEvent;
import org.apache.apex.engine.api.plugin.DAGExecutionPlugin;
import org.apache.apex.engine.plugin.loaders.ChainedPluginLocator;
import org.apache.apex.engine.plugin.loaders.ServiceLoaderBasedPluginLocator;
@@ -34,10 +35,6 @@
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
import com.datatorrent.stram.support.StramTestSupport;
-import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.COMMIT_EVENT;
-import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.HEARTBEAT;
-import static org.apache.apex.engine.api.plugin.DAGExecutionPluginContext.STRAM_EVENT;
-
public class PluginTests
{
@@ -86,16 +83,16 @@
ApexPluginDispatcher pluginManager = new DefaultApexPluginDispatcher(locator,
new StramTestSupport.TestAppContext(new Attribute.AttributeMap.DefaultAttributeMap()), null, null);
pluginManager.init(new Configuration());
- pluginManager.dispatch(STRAM_EVENT, new StramEvent(StramEvent.LogLevel.DEBUG)
+ pluginManager.dispatch(new DAGExecutionEvent.StramExecutionEvent(new StramEvent(StramEvent.LogLevel.DEBUG)
{
@Override
public String getType()
{
return "TestEvent";
}
- });
- pluginManager.dispatch(COMMIT_EVENT, new Long(1234));
- pluginManager.dispatch(HEARTBEAT, new StreamingContainerUmbilicalProtocol.ContainerHeartbeat());
+ }));
+ pluginManager.dispatch(new DAGExecutionEvent.CommitExecutionEvent(1234));
+ pluginManager.dispatch(new DAGExecutionEvent.HeartbeatExecutionEvent(new StreamingContainerUmbilicalProtocol.ContainerHeartbeat()));
debugPlugin.waitForEventDelivery(10);
pluginManager.stop();