Merge branch 'APEXCORE-107' of https://github.com/tushargosavi/incubator-apex-core
diff --git a/api/src/main/java/com/datatorrent/api/DAG.java b/api/src/main/java/com/datatorrent/api/DAG.java
index 74448fd..1518fcf 100644
--- a/api/src/main/java/com/datatorrent/api/DAG.java
+++ b/api/src/main/java/com/datatorrent/api/DAG.java
@@ -159,14 +159,6 @@
public OutputPortMeta getMeta(Operator.OutputPort<?> port);
}
- @InterfaceStability.Evolving
- interface ModuleMeta extends Serializable, Context
- {
- String getName();
-
- Module getModule();
- }
-
/**
* Add new instance of operator under given name to the DAG.
* The operator class must have a default constructor.
@@ -272,15 +264,17 @@
*/
public abstract OperatorMeta getOperatorMeta(String operatorId);
- @InterfaceStability.Evolving
- ModuleMeta getModuleMeta(String moduleId);
-
/**
* <p>getMeta.</p>
*/
public abstract OperatorMeta getMeta(Operator operator);
- @InterfaceStability.Evolving
- ModuleMeta getMeta(Module module);
+ /**
+ * Marker interface for the Node in the DAG. Any object which can be added as a Node in the DAG
+ * needs to implement this interface.
+ */
+ interface GenericOperator
+ {
+ }
}
diff --git a/api/src/main/java/com/datatorrent/api/Module.java b/api/src/main/java/com/datatorrent/api/Module.java
index 8a85d8b..d93d16f 100644
--- a/api/src/main/java/com/datatorrent/api/Module.java
+++ b/api/src/main/java/com/datatorrent/api/Module.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG.GenericOperator;
import com.datatorrent.api.Operator.InputPort;
import com.datatorrent.api.Operator.OutputPort;
import com.datatorrent.api.Operator.Unifier;
@@ -36,7 +37,7 @@
* @since 3.3.0
*/
@InterfaceStability.Evolving
-public interface Module
+public interface Module extends GenericOperator
{
void populateDAG(DAG dag, Configuration conf);
diff --git a/api/src/main/java/com/datatorrent/api/Operator.java b/api/src/main/java/com/datatorrent/api/Operator.java
index d4a6a90..c016799 100644
--- a/api/src/main/java/com/datatorrent/api/Operator.java
+++ b/api/src/main/java/com/datatorrent/api/Operator.java
@@ -20,6 +20,7 @@
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG.GenericOperator;
/**
* <p>
@@ -27,7 +28,7 @@
*
* @since 0.3.2
*/
-public interface Operator extends Component<OperatorContext>
+public interface Operator extends Component<OperatorContext>, GenericOperator
{
/**
* One can set attribute on an Operator to indicate the mode in which it processes Tuples.
diff --git a/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java b/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
index 9139566..6607321 100644
--- a/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
+++ b/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
@@ -362,7 +362,7 @@
Map<String, Object> moduleDetailMap = new HashMap<>();
ArrayList<String> operatorArray = new ArrayList<>();
moduleDetailMap.put("name", moduleMeta.getName());
- moduleDetailMap.put("className", moduleMeta.getModule().getClass().getName());
+ moduleDetailMap.put("className", moduleMeta.getGenericOperator().getClass().getName());
moduleDetailMap.put("operators", operatorArray);
for (OperatorMeta operatorMeta : moduleMeta.getDag().getAllOperators()) {
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 15969b7..af6b1bc 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -63,13 +63,13 @@
import org.slf4j.LoggerFactory;
import org.apache.commons.io.input.ClassLoaderObjectInputStream;
+import org.apache.commons.lang.ClassUtils;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Sets;
import com.datatorrent.api.AffinityRule;
@@ -195,7 +195,6 @@
private final List<OperatorMeta> rootOperators = new ArrayList<>();
private final List<OperatorMeta> leafOperators = new ArrayList<>();
private final Attribute.AttributeMap attributes = new DefaultAttributeMap();
- private transient Map<String, ArrayListMultimap<OutputPort<?>, InputPort<?>>> streamLinks = new HashMap<>();
@Override
public Attribute.AttributeMap getAttributes()
@@ -489,6 +488,10 @@
@Override
public StreamMeta setSource(Operator.OutputPort<?> port)
{
+ if (port instanceof ProxyOutputPort) {
+ proxySource = port;
+ return this;
+ }
OutputPortMeta portMeta = assertGetPortMeta(port);
OperatorMeta om = portMeta.getOperatorMeta();
if (om.outputStreams.containsKey(portMeta)) {
@@ -508,6 +511,10 @@
@Override
public StreamMeta addSink(Operator.InputPort<?> port)
{
+ if (port instanceof ProxyInputPort) {
+ proxySinks.add(port);
+ return this;
+ }
InputPortMeta portMeta = assertGetPortMeta(port);
OperatorMeta om = portMeta.getOperatorWrapper();
String portName = portMeta.getPortName();
@@ -771,12 +778,40 @@
removeOperator(persistOpMeta.getOperator());
}
}
+
+ private OutputPort<?> proxySource = null;
+ private List<InputPort<?>> proxySinks = new ArrayList<>();
+
+ /**
+ * Go over each Proxy port and find out the actual port connected to the ProxyPort
+ * and update StreamMeta.
+ */
+ private void resolvePorts()
+ {
+ if (proxySource != null && proxySource instanceof ProxyOutputPort) {
+ OutputPort<?> outputPort = proxySource;
+ while (outputPort instanceof ProxyOutputPort) {
+ outputPort = ((ProxyOutputPort<?>)outputPort).get();
+ }
+ setSource(outputPort);
+ }
+
+ for (InputPort<?> inputPort : proxySinks) {
+ while (inputPort instanceof ProxyInputPort) {
+ inputPort = ((ProxyInputPort<?>)inputPort).get();
+ }
+ addSink(inputPort);
+ }
+
+ proxySource = null;
+ proxySinks.clear();
+ }
}
/**
* Operator meta object.
*/
- public final class OperatorMeta implements DAG.OperatorMeta, Serializable
+ public class OperatorMeta implements DAG.OperatorMeta, Serializable
{
private final LinkedHashMap<InputPortMeta, StreamMeta> inputStreams = new LinkedHashMap<>();
private final LinkedHashMap<OutputPortMeta, StreamMeta> outputStreams = new LinkedHashMap<>();
@@ -789,7 +824,7 @@
private final LogicalOperatorStatus status;
private transient Integer nindex; // for cycle detection
private transient Integer lowlink; // for cycle detection
- private transient Operator operator;
+ private transient GenericOperator operator;
private MetricAggregatorMeta metricAggregatorMeta;
private String moduleName; // Name of the module which has this operator. null if this is a top level operator.
@@ -799,13 +834,14 @@
* other value => represents the root oio node for this node
*/
private transient Integer oioRoot = null;
+ private ClassUtils genricOperator;
- private OperatorMeta(String name, Operator operator)
+ private OperatorMeta(String name, GenericOperator operator)
{
this(name, operator, new DefaultAttributeMap());
}
- private OperatorMeta(String name, Operator operator, Attribute.AttributeMap attributeMap)
+ private OperatorMeta(String name, GenericOperator operator, Attribute.AttributeMap attributeMap)
{
LOG.debug("Initializing {} as {}", name, operator.getClass().getName());
this.operatorAnnotation = operator.getClass().getAnnotation(OperatorAnnotation.class);
@@ -858,7 +894,7 @@
input.defaultReadObject();
// TODO: not working because - we don't have the storage agent in parent attribuet map
//operator = (Operator)getValue2(OperatorContext.STORAGE_AGENT).load(id, Checkpoint.STATELESS_CHECKPOINT_WINDOW_ID);
- operator = (Operator)FSStorageAgent.retrieve(input);
+ operator = (GenericOperator)FSStorageAgent.retrieve(input);
}
@Override
@@ -1000,6 +1036,7 @@
}
}
+
private class PortMapping implements Operators.OperatorDescriptor
{
private final Map<Operator.InputPort<?>, InputPortMeta> inPortMap = new HashMap<>();
@@ -1113,6 +1150,11 @@
@Override
public Operator getOperator()
{
+ return (Operator)operator;
+ }
+
+ public GenericOperator getGenericOperator()
+ {
return operator;
}
@@ -1198,90 +1240,28 @@
/**
* Module meta object.
*/
- public final class ModuleMeta implements DAG.ModuleMeta, Serializable
+ public final class ModuleMeta extends OperatorMeta implements Serializable
{
- private final LinkedHashMap<InputPortMeta, StreamMeta> inputStreams = new LinkedHashMap<>();
- private final LinkedHashMap<OutputPortMeta, StreamMeta> outputStreams = new LinkedHashMap<>();
- private final Attribute.AttributeMap attributes;
- @NotNull
- private String name;
- private transient Module module;
private ModuleMeta parent;
private LogicalPlan dag = null;
private transient String fullName;
+ //type-casted reference to the module.
+ private transient Module module;
+ private transient boolean flattened = false;
private ModuleMeta(String name, Module module)
{
- LOG.debug("Initializing {} as {}", name, module.getClass().getName());
- this.name = name;
+ super(name, module);
this.module = module;
- this.attributes = new DefaultAttributeMap();
+ LOG.debug("Initializing {} as {}", name, module.getClass().getName());
this.dag = new LogicalPlan();
}
- @Override
- public String getName()
- {
- return name;
- }
-
- @Override
- public Module getModule()
- {
- return module;
- }
-
- @Override
- public Attribute.AttributeMap getAttributes()
- {
- return attributes;
- }
-
- @Override
- public <T> T getValue(Attribute<T> key)
- {
- return attributes.get(key);
- }
-
- @Override
- public void setCounters(Object counters)
- {
-
- }
-
- @Override
- public void sendMetrics(Collection<String> metricNames)
- {
-
- }
-
- public LinkedHashMap<InputPortMeta, StreamMeta> getInputStreams()
- {
- return inputStreams;
- }
-
- public LinkedHashMap<OutputPortMeta, StreamMeta> getOutputStreams()
- {
- return outputStreams;
- }
-
public LogicalPlan getDag()
{
return dag;
}
- private void writeObject(ObjectOutputStream out) throws IOException
- {
- out.defaultWriteObject();
- FSStorageAgent.store(out, module);
- }
-
- private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException
- {
- input.defaultReadObject();
- module = (Module)FSStorageAgent.retrieve(input);
- }
-
/**
* Expand the module and add its operator to the parentDAG. After this method finishes the module is expanded fully
* with all its submodules also expanded. The parentDAG contains the operator added by all the modules.
@@ -1291,6 +1271,10 @@
*/
public void flattenModule(LogicalPlan parentDAG, Configuration conf)
{
+ if (flattened) {
+ return;
+ }
+
module.populateDAG(dag, conf);
for (ModuleMeta subModuleMeta : dag.getAllModules()) {
subModuleMeta.setParent(this);
@@ -1298,6 +1282,7 @@
}
dag.applyStreamLinks();
parentDAG.addDAGToCurrentDAG(this);
+ flattened = true;
}
/**
@@ -1317,9 +1302,9 @@
}
if (parent == null) {
- fullName = name;
+ fullName = getName();
} else {
- fullName = parent.getFullName() + MODULE_NAMESPACE_SEPARATOR + name;
+ fullName = parent.getFullName() + MODULE_NAMESPACE_SEPARATOR + getName();
}
return fullName;
}
@@ -1401,53 +1386,25 @@
public <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T>... sinks)
{
StreamMeta s = addStream(id);
- id = s.id;
- ArrayListMultimap<OutputPort<?>, InputPort<?>> streamMap = ArrayListMultimap.create();
- if (!(source instanceof ProxyOutputPort)) {
- s.setSource(source);
- }
+ s.setSource(source);
for (Operator.InputPort<?> sink : sinks) {
- if (source instanceof ProxyOutputPort || sink instanceof ProxyInputPort) {
- streamMap.put(source, sink);
- streamLinks.put(id, streamMap);
- } else {
- if (s.getSource() == null) {
- s.setSource(source);
- }
- s.addSink(sink);
- }
+ s.addSink(sink);
}
return s;
}
/**
- * This will be called once the Logical Dag is expanded, and the proxy input and proxy output ports are populated with
- * the actual ports that they refer to This method adds sources and sinks for the StreamMeta objects which were left
- * empty in the addStream call.
+ * This will be called once the Logical Dag is expanded, and proxy input and proxy output ports are populated
+ * with actual ports they refer to.
*/
public void applyStreamLinks()
{
- for (String id : streamLinks.keySet()) {
- StreamMeta s = getStream(id);
- for (Map.Entry<Operator.OutputPort<?>, Operator.InputPort<?>> pair : streamLinks.get(id).entries()) {
- if (s.getSource() == null) {
- Operator.OutputPort<?> outputPort = pair.getKey();
- while (outputPort instanceof ProxyOutputPort) {
- outputPort = ((ProxyOutputPort<?>)outputPort).get();
- }
- s.setSource(outputPort);
- }
-
- Operator.InputPort<?> inputPort = pair.getValue();
- while (inputPort instanceof ProxyInputPort) {
- inputPort = ((ProxyInputPort<?>)inputPort).get();
- }
- s.addSink(inputPort);
- }
+ for (StreamMeta smeta : streams.values()) {
+ smeta.resolvePorts();
}
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
+ @SuppressWarnings({"unchecked", "rawtypes"})
private void addDAGToCurrentDAG(ModuleMeta moduleMeta)
{
LogicalPlan subDag = moduleMeta.getDag();
@@ -1594,12 +1551,6 @@
}
@Override
- public ModuleMeta getModuleMeta(String moduleName)
- {
- return this.modules.get(moduleName);
- }
-
- @Override
public OperatorMeta getMeta(Operator operator)
{
// TODO: cache mapping
@@ -1611,17 +1562,6 @@
throw new IllegalArgumentException("Operator not associated with the DAG: " + operator);
}
- @Override
- public ModuleMeta getMeta(Module module)
- {
- for (ModuleMeta m : getAllModules()) {
- if (m.module == module) {
- return m;
- }
- }
- throw new IllegalArgumentException("Module not associated with the DAG: " + module);
- }
-
public int getMaxContainerCount()
{
return this.getValue(CONTAINERS_MAX_COUNT);
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
index 628fce2..bab414f 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
@@ -69,6 +69,8 @@
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.GenericOperator;
+import com.datatorrent.api.Module;
import com.datatorrent.api.Operator;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.StringCodec;
@@ -2111,28 +2113,28 @@
Map<String, OperatorConf> operators = appConf.getChildren(StramElement.OPERATOR);
- Map<OperatorConf, Operator> nodeMap = Maps.newHashMapWithExpectedSize(operators.size());
+ Map<OperatorConf, GenericOperator> nodeMap = Maps.newHashMapWithExpectedSize(operators.size());
// add all operators first
for (Map.Entry<String, OperatorConf> nodeConfEntry : operators.entrySet()) {
OperatorConf nodeConf = nodeConfEntry.getValue();
if (!WILDCARD.equals(nodeConf.id)) {
- Class<? extends Operator> nodeClass = StramUtils.classForName(nodeConf.getClassNameReqd(), Operator.class);
+ Class<? extends GenericOperator> nodeClass = StramUtils.classForName(nodeConf.getClassNameReqd(), GenericOperator.class);
String optJson = nodeConf.getProperties().get(nodeClass.getName());
- Operator nd = null;
+ GenericOperator operator = null;
try {
if (optJson != null) {
// if there is a special key which is the class name, it means the operator is serialized in json format
ObjectMapper mapper = ObjectMapperFactory.getOperatorValueDeserializer();
- nd = mapper.readValue("{\"" + nodeClass.getName() + "\":" + optJson + "}", nodeClass);
- dag.addOperator(nodeConfEntry.getKey(), nd);
+ operator = mapper.readValue("{\"" + nodeClass.getName() + "\":" + optJson + "}", nodeClass);
+ addOperator(dag, nodeConfEntry.getKey(), operator);
} else {
- nd = dag.addOperator(nodeConfEntry.getKey(), nodeClass);
+ operator = addOperator(dag, nodeConfEntry.getKey(), nodeClass);
}
- setOperatorProperties(nd, nodeConf.getProperties());
+ setOperatorProperties(operator, nodeConf.getProperties());
} catch (IOException e) {
throw new IllegalArgumentException("Error setting operator properties " + e.getMessage(), e);
}
- nodeMap.put(nodeConf, nd);
+ nodeMap.put(nodeConf, operator);
}
}
@@ -2157,7 +2159,7 @@
portName = e.getKey();
}
}
- Operator sourceDecl = nodeMap.get(streamConf.sourceNode);
+ GenericOperator sourceDecl = nodeMap.get(streamConf.sourceNode);
Operators.PortMappingDescriptor sourcePortMap = new Operators.PortMappingDescriptor();
Operators.describe(sourceDecl, sourcePortMap);
sd.setSource(sourcePortMap.outputPorts.get(portName).component);
@@ -2174,7 +2176,7 @@
portName = e.getKey();
}
}
- Operator targetDecl = nodeMap.get(targetNode);
+ GenericOperator targetDecl = nodeMap.get(targetNode);
Operators.PortMappingDescriptor targetPortMap = new Operators.PortMappingDescriptor();
Operators.describe(targetDecl, targetPortMap);
sd.addSink(targetPortMap.inputPorts.get(portName).component);
@@ -2187,6 +2189,27 @@
}
+ private GenericOperator addOperator(LogicalPlan dag, String name, GenericOperator operator)
+ {
+ if (operator instanceof Module) {
+ dag.addModule(name, (Module)operator);
+ } else if (operator instanceof Operator) {
+ dag.addOperator(name, (Operator)operator);
+ }
+ return operator;
+ }
+
+
+ private GenericOperator addOperator(LogicalPlan dag, String name, Class<?> clazz)
+ {
+ if (Module.class.isAssignableFrom(clazz)) {
+ return dag.addModule(name, (Class<Module>)clazz);
+ } else if (Operator.class.isAssignableFrom(clazz)) {
+ return dag.addOperator(name, (Class<Operator>)clazz);
+ }
+ return null;
+ }
+
/**
* Populate the logical plan from the streaming application definition and configuration.
* Configuration is resolved based on application alias, if any.
@@ -2323,12 +2346,7 @@
private PropertyArgs getPropertyArgs(OperatorMeta om)
{
- return new PropertyArgs(om.getName(), om.getOperator().getClass().getName());
- }
-
- private PropertyArgs getPropertyArgs(ModuleMeta mm)
- {
- return new PropertyArgs(mm.getName(), mm.getModule().getClass().getName());
+ return new PropertyArgs(om.getName(), om.getGenericOperator().getClass().getName());
}
/**
@@ -2361,7 +2379,7 @@
* @param properties
* @return Operator
*/
- public static Operator setOperatorProperties(Operator operator, Map<String, String> properties)
+ public static GenericOperator setOperatorProperties(GenericOperator operator, Map<String, String> properties)
{
try {
// populate custom opProps
@@ -2372,26 +2390,6 @@
}
}
- /**
- * Generic helper function to inject properties on the object.
- *
- * @param obj
- * @param properties
- * @param <T>
- * @return
- */
- public static <T> T setObjectProperties(T obj, Map<String, String> properties)
- {
- try {
- BeanUtils.populate(obj, properties);
- return obj;
- } catch (IllegalAccessException e) {
- throw new IllegalArgumentException("Error setting operator properties", e);
- } catch (InvocationTargetException e) {
- throw new IllegalArgumentException("Error setting operator properties", e);
- }
- }
-
public static StreamingApplication setApplicationProperties(StreamingApplication application, Map<String, String> properties)
{
try {
@@ -2421,7 +2419,7 @@
for (OperatorMeta ow : dag.getAllOperators()) {
List<OperatorConf> opConfs = getMatchingChildConf(appConfs, ow.getName(), StramElement.OPERATOR);
Map<String, String> opProps = getProperties(getPropertyArgs(ow), opConfs, applicationName);
- setOperatorProperties(ow.getOperator(), opProps);
+ setOperatorProperties(ow.getGenericOperator(), opProps);
}
}
@@ -2502,7 +2500,7 @@
for (final ModuleMeta mw : dag.getAllModules()) {
List<OperatorConf> opConfs = getMatchingChildConf(appConfs, mw.getName(), StramElement.OPERATOR);
Map<String, String> opProps = getProperties(getPropertyArgs(mw), opConfs, appName);
- setObjectProperties(mw.getModule(), opProps);
+ setOperatorProperties(mw.getGenericOperator(), opProps);
}
}
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/Operators.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/Operators.java
index 5da7383..7bb4c39 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/Operators.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/Operators.java
@@ -22,6 +22,7 @@
import java.util.LinkedHashMap;
import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG.GenericOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.InputPort;
import com.datatorrent.api.Operator.OutputPort;
@@ -80,7 +81,7 @@
}
}
- public static void describe(Operator operator, OperatorDescriptor descriptor)
+ public static void describe(GenericOperator operator, OperatorDescriptor descriptor)
{
for (Class<?> c = operator.getClass(); c != Object.class; c = c.getSuperclass()) {
Field[] fields = c.getDeclaredFields();
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
index 52be922..f09a53e 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
@@ -828,7 +828,7 @@
if (logicalModule == null) {
throw new NotFoundException();
}
- operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalModule.getModule());
+ operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalModule.getOperator());
} else {
operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalOperator.getOperator());
}
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
index 97c015e..1966678 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
@@ -118,7 +118,7 @@
static class TestModule implements Module
{
- public transient ProxyInputPort<Integer> moduleInput = new Module.ProxyInputPort<Integer>();
+ public transient ProxyInputPort<Integer> moduleInput = new ProxyInputPort<Integer>();
public transient ProxyOutputPort<Integer> moduleOutput = new Module.ProxyOutputPort<Integer>();
@Override
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
index d5af67b..97a375f 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
@@ -18,16 +18,22 @@
*/
package com.datatorrent.stram.plan.logical.module;
+import java.io.IOException;
+import java.io.InputStream;
import java.io.Serializable;
+import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Random;
+import org.codehaus.jettison.json.JSONObject;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.Attribute;
@@ -48,7 +54,7 @@
public class TestModuleExpansion
{
- static class DummyInputOperator extends BaseOperator implements InputOperator
+ public static class DummyInputOperator extends BaseOperator implements InputOperator
{
private int inputOperatorProp = 0;
@@ -72,7 +78,7 @@
}
}
- static class DummyOperator extends BaseOperator
+ public static class DummyOperator extends BaseOperator
{
private int operatorProp = 0;
@@ -104,7 +110,7 @@
}
}
- static class TestPartitioner implements Partitioner<DummyOperator>, Serializable
+ public static class TestPartitioner implements Partitioner<DummyOperator>, Serializable
{
@Override
public Collection<Partition<DummyOperator>> definePartitions(Collection<Partition<DummyOperator>> partitions, PartitioningContext context)
@@ -121,7 +127,7 @@
}
}
- static class Level1Module implements Module
+ public static class Level1Module implements Module
{
private int level1ModuleProp = 0;
@@ -184,7 +190,7 @@
}
}
- static class Level2ModuleA implements Module
+ public static class Level2ModuleA implements Module
{
private int level2ModuleAProp1 = 0;
private int level2ModuleAProp2 = 0;
@@ -253,7 +259,7 @@
}
}
- static class Level2ModuleB implements Module
+ public static class Level2ModuleB implements Module
{
private int level2ModuleBProp1 = 0;
private int level2ModuleBProp2 = 0;
@@ -321,7 +327,7 @@
}
}
- static class Level3Module implements Module
+ public static class Level3Module implements Module
{
public final transient ProxyInputPort<Integer> mIn = new ProxyInputPort<>();
@@ -344,7 +350,7 @@
}
}
- static class NestedModuleApp implements StreamingApplication
+ public static class NestedModuleApp implements StreamingApplication
{
@Override
public void populateDAG(DAG dag, Configuration conf)
@@ -564,10 +570,6 @@
Assert.assertTrue(moduleNames.contains("Me"));
Assert.assertEquals("Number of modules are 5", 5, dag.getAllModules().size());
- // correct module meta is returned by getMeta call.
- LogicalPlan.ModuleMeta m = dag.getModuleMeta("Ma");
- Assert.assertEquals("Name of module is Ma", m.getName(), "Ma");
-
}
private static String componentName(String... names)
@@ -671,4 +673,51 @@
Assert.assertEquals("Locality is " + locality, meta.getLocality(), locality);
}
+ @Test
+ public void testLoadFromPropertiesFile() throws IOException
+ {
+ Properties props = new Properties();
+ String resourcePath = "/testModuleTopology.properties";
+ InputStream is = this.getClass().getResourceAsStream(resourcePath);
+ if (is == null) {
+ throw new RuntimeException("Could not load " + resourcePath);
+ }
+ props.load(is);
+ LogicalPlanConfiguration pb = new LogicalPlanConfiguration(new Configuration(false))
+ .addFromProperties(props, null);
+
+ LogicalPlan dag = new LogicalPlan();
+ pb.populateDAG(dag);
+ pb.prepareDAG(dag, null, "testApplication");
+ dag.validate();
+ validateTopLevelOperators(dag);
+ validateTopLevelStreams(dag);
+ validatePublicMethods(dag);
+ }
+
+ @Test
+ public void testLoadFromJson() throws Exception
+ {
+ String resourcePath = "/testModuleTopology.json";
+ InputStream is = this.getClass().getResourceAsStream(resourcePath);
+ if (is == null) {
+ throw new RuntimeException("Could not load " + resourcePath);
+ }
+ StringWriter writer = new StringWriter();
+
+ IOUtils.copy(is, writer);
+ JSONObject json = new JSONObject(writer.toString());
+
+ Configuration conf = new Configuration(false);
+ conf.set(StreamingApplication.DT_PREFIX + "operator.operator3.prop.myStringProperty", "o3StringFromConf");
+
+ LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf);
+ LogicalPlan dag = planConf.createFromJson(json, "testLoadFromJson");
+ planConf.prepareDAG(dag, null, "testApplication");
+ dag.validate();
+ validateTopLevelOperators(dag);
+ validateTopLevelStreams(dag);
+ validatePublicMethods(dag);
+ }
+
}
diff --git a/engine/src/test/resources/testModuleTopology.json b/engine/src/test/resources/testModuleTopology.json
new file mode 100644
index 0000000..8b2087a
--- /dev/null
+++ b/engine/src/test/resources/testModuleTopology.json
@@ -0,0 +1,141 @@
+{
+ "operators": [
+ {
+ "name": "O1",
+ "class": "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$DummyInputOperator",
+ "properties": {
+ "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$DummyInputOperator": {
+ "inputOperatorProp": "1"
+ }
+ }
+ },
+ {
+ "name": "O2",
+ "class": "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$DummyOperator",
+ "properties": {
+ "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$DummyOperator": {
+ "operatorProp": "2"
+ }
+ }
+ },
+ {
+ "name": "Ma",
+ "class": "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleA",
+ "properties": {
+ "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleA": {
+ "level2ModuleAProp1": "11",
+ "level2ModuleAProp2": "12",
+ "level2ModuleAProp3": "13"
+ }
+ }
+ },
+ {
+ "name": "Mb",
+ "class": "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleB",
+ "properties": {
+ "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleB": {
+ "level2ModuleBProp1": "21",
+ "level2ModuleBProp2": "22",
+ "level2ModuleBProp3": "23"
+ }
+ }
+ },
+ {
+ "name": "Mc",
+ "class": "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleA",
+ "properties": {
+ "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleA": {
+ "level2ModuleAProp1": "31",
+ "level2ModuleAProp2": "32",
+ "level2ModuleAProp3": "33"
+ }
+ }
+ },
+ {
+ "name": "Md",
+ "class": "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleB",
+ "properties": {
+ "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleB": {
+ "level2ModuleBProp1": "41",
+ "level2ModuleBProp2": "42",
+ "level2ModuleBProp3": "43"
+ }
+ }
+ },
+ {
+ "name": "Me",
+ "class": "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level3Module"
+ }
+ ],
+ "streams": [
+ {
+ "name": "O1_O2",
+ "source": {
+ "operatorName": "O1",
+ "portName": "out"
+ },
+ "sinks": [
+ {
+ "operatorName": "O2",
+ "portName": "in"
+ },
+ {
+ "operatorName": "Me",
+ "portName": "mIn"
+ }
+ ]
+ },
+ {
+ "name": "O2_Ma",
+ "source": {
+ "operatorName": "O2",
+ "portName": "out1"
+ },
+ "sinks": [
+ {
+ "operatorName": "Ma",
+ "portName": "mIn"
+ }
+ ]
+ },
+ {
+ "name": "Ma_Mb",
+ "source": {
+ "operatorName": "Ma",
+ "portName": "mOut1"
+ },
+ "sinks": [
+ {
+ "operatorName": "Mb",
+ "portName": "mIn"
+ }
+ ]
+ },
+ {
+ "name": "Ma_Md",
+ "source": {
+ "operatorName": "Ma",
+ "portName": "mOut2"
+ },
+ "sinks": [
+ {
+ "operatorName": "Md",
+ "portName": "mIn"
+ }
+ ]
+ },
+ {
+ "name": "Mb_Mc",
+ "source": {
+ "operatorName": "Mb",
+ "portName": "mOut2"
+ },
+ "sinks": [
+ {
+ "operatorName": "Mc",
+ "portName": "mIn"
+ }
+ ]
+ }
+ ]
+}
diff --git a/engine/src/test/resources/testModuleTopology.properties b/engine/src/test/resources/testModuleTopology.properties
new file mode 100644
index 0000000..0679e26
--- /dev/null
+++ b/engine/src/test/resources/testModuleTopology.properties
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# test for defining topology as property file
+dt.operator.O1.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$DummyInputOperator
+dt.operator.O1.inputOperatorProp=1
+
+dt.operator.O2.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$DummyOperator
+dt.operator.O2.operatorProp=2
+
+dt.operator.Ma.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleA
+dt.operator.Ma.level2ModuleAProp1=11
+dt.operator.Ma.level2ModuleAProp2=12
+dt.operator.Ma.level2ModuleAProp3=13
+
+dt.operator.Mb.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleB
+dt.operator.Mb.level2ModuleBProp1=21
+dt.operator.Mb.level2ModuleBProp2=22
+dt.operator.Mb.level2ModuleBProp3=23
+
+dt.operator.Mc.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleA
+dt.operator.Mc.level2ModuleAProp1=31
+dt.operator.Mc.level2ModuleAProp2=32
+dt.operator.Mc.level2ModuleAProp3=33
+
+dt.operator.Md.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleB
+dt.operator.Md.level2ModuleBProp1=41
+dt.operator.Md.level2ModuleBProp2=42
+dt.operator.Md.level2ModuleBProp3=43
+
+dt.operator.Me.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level3Module
+
+dt.stream.O1_O2.source=O1.out
+dt.stream.O1_O2.sinks=O2.in,Me.mIn
+
+dt.stream.O2_Ma.source=O2.out1
+dt.stream.O2_Ma.sinks=Ma.mIn
+
+dt.stream.Ma_Mb.source=Ma.mOut1
+dt.stream.Ma_Mb.sinks=Mb.mIn
+
+dt.stream.Ma_Md.source=Ma.mOut2
+dt.stream.Ma_Md.sinks=Md.mIn
+
+dt.stream.Mb_Mc.source=Mb.mOut2
+dt.stream.Mb_Mc.sinks=Mc.mIn