| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package com.datatorrent.stram.plan.logical; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.Serializable; |
| import java.io.StringWriter; |
| import java.lang.reflect.Field; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| |
| import javax.validation.ValidationException; |
| |
| import org.codehaus.jettison.json.JSONObject; |
| import org.junit.Assert; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.commons.io.IOUtils; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.commons.lang.mutable.MutableBoolean; |
| import org.apache.hadoop.conf.Configuration; |
| |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| |
| import com.datatorrent.api.Attribute; |
| import com.datatorrent.api.Attribute.AttributeMap; |
| import com.datatorrent.api.Attribute.AttributeMap.AttributeInitializer; |
| import com.datatorrent.api.Context; |
| import com.datatorrent.api.Context.DAGContext; |
| import com.datatorrent.api.Context.OperatorContext; |
| import com.datatorrent.api.Context.PortContext; |
| import com.datatorrent.api.DAG; |
| import com.datatorrent.api.DefaultInputPort; |
| import com.datatorrent.api.DefaultOutputPort; |
| import com.datatorrent.api.Module; |
| import com.datatorrent.api.Operator; |
| import com.datatorrent.api.StatsListener; |
| import com.datatorrent.api.StreamingApplication; |
| import com.datatorrent.api.StringCodec; |
| import com.datatorrent.api.StringCodec.Integer2String; |
| import com.datatorrent.api.annotation.ApplicationAnnotation; |
| import com.datatorrent.common.codec.JsonStreamCodec; |
| import com.datatorrent.common.partitioner.StatelessPartitioner; |
| import com.datatorrent.common.util.BaseOperator; |
| import com.datatorrent.common.util.BasicContainerOptConfigurator; |
| import com.datatorrent.stram.PartitioningTest.PartitionLoadWatch; |
| import com.datatorrent.stram.client.StramClientUtils; |
| import com.datatorrent.stram.engine.GenericTestOperator; |
| import com.datatorrent.stram.engine.TestGeneratorInputOperator; |
| import com.datatorrent.stram.plan.SchemaTestOperator; |
| import com.datatorrent.stram.plan.TestPlanContext; |
| import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta; |
| import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta; |
| import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta; |
| import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta; |
| import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.AttributeParseUtils; |
| import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.ConfElement; |
| import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.ContextUtils; |
| import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.StramElement; |
| import com.datatorrent.stram.plan.logical.LogicalPlanTest.ValidationTestOperator; |
| import com.datatorrent.stram.plan.physical.PTContainer; |
| import com.datatorrent.stram.plan.physical.PTOperator; |
| import com.datatorrent.stram.plan.physical.PhysicalPlan; |
| import com.datatorrent.stram.support.StramTestSupport.RegexMatcher; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| public class LogicalPlanConfigurationTest |
| { |
| |
| static { |
| @SuppressWarnings("MismatchedReadAndWriteOfArray") |
| Object[] serial = new Object[]{MockContext1.serialVersionUID, MockContext2.serialVersionUID}; |
| } |
| |
| private static OperatorMeta assertNode(LogicalPlan dag, String id) |
| { |
| OperatorMeta n = dag.getOperatorMeta(id); |
| assertNotNull("operator exists id=" + id, n); |
| return n; |
| } |
| |
/**
 * Stream codec that extends {@link JsonStreamCodec} and is additionally marked
 * {@link Serializable}.
 *
 * @param <T> tuple type handled by the codec
 */
| public static class TestStreamCodec<T> extends JsonStreamCodec<T> implements Serializable |
| { |
| private static final long serialVersionUID = 1L; |
| } |
| |
| /** |
| * Test read from dt-site.xml in Hadoop configuration format. |
| */ |
| @Test |
| public void testLoadFromConfigXml() |
| { |
| Configuration conf = new Configuration(false); |
| conf.addResource(StramClientUtils.DT_SITE_XML_FILE); |
| |
| LogicalPlanConfiguration builder = new LogicalPlanConfiguration(conf); |
| |
| LogicalPlan dag = new LogicalPlan(); |
| builder.populateDAG(dag); |
| dag.validate(); |
| |
| assertEquals("number of operator confs", 6, dag.getAllOperators().size()); |
| |
| OperatorMeta operator1 = assertNode(dag, "operator1"); |
| OperatorMeta operator2 = assertNode(dag, "operator2"); |
| OperatorMeta operator3 = assertNode(dag, "operator3"); |
| OperatorMeta operator4 = assertNode(dag, "operator4"); |
| |
| assertNotNull("operatorConf for root", operator1); |
| assertEquals("operatorId set", "operator1", operator1.getName()); |
| |
| // verify operator instantiation |
| assertEquals(operator1.getOperator().getClass(), TestGeneratorInputOperator.class); |
| TestGeneratorInputOperator GenericTestNode = (TestGeneratorInputOperator)operator1.getOperator(); |
| assertEquals("myStringPropertyValue", GenericTestNode.getMyStringProperty()); |
| |
| // check links |
| assertEquals("operator1 inputs", 0, operator1.getInputStreams().size()); |
| assertEquals("operator1 outputs", 1, operator1.getOutputStreams().size()); |
| StreamMeta n1n2 = operator2.getInputStreams().get(operator2.getMeta(((GenericTestOperator)operator2.getOperator()).inport1)); |
| assertNotNull("n1n2", n1n2); |
| |
| // output/input stream object same |
| assertEquals("rootNode out is operator2 in", n1n2, operator1.getOutputStreams().get(operator1.getMeta(((TestGeneratorInputOperator)operator1.getOperator()).outport))); |
| assertEquals("n1n2 source", operator1, n1n2.getSource().getOperatorMeta()); |
| Assert.assertEquals("n1n2 targets", 1, n1n2.getSinks().size()); |
| Assert.assertEquals("n1n2 target", operator2, n1n2.getSinks().iterator().next().getOperatorMeta()); |
| |
| assertEquals("stream name", "n1n2", n1n2.getName()); |
| Assert.assertEquals("n1n2 not inline (default)", null, n1n2.getLocality()); |
| |
| // operator 2 streams to operator 3 and operator 4 |
| assertEquals("operator 2 number of outputs", 1, operator2.getOutputStreams().size()); |
| StreamMeta fromNode2 = operator2.getOutputStreams().values().iterator().next(); |
| |
| Set<OperatorMeta> targetNodes = Sets.newHashSet(); |
| for (LogicalPlan.InputPortMeta ip : fromNode2.getSinks()) { |
| targetNodes.add(ip.getOperatorMeta()); |
| } |
| Assert.assertEquals("outputs " + fromNode2, Sets.newHashSet(operator3, operator4), targetNodes); |
| |
| OperatorMeta operator6 = assertNode(dag, "operator6"); |
| |
| List<OperatorMeta> rootNodes = dag.getRootOperators(); |
| assertEquals("number root operators", 2, rootNodes.size()); |
| assertTrue("root operator2", rootNodes.contains(operator1)); |
| assertTrue("root operator6", rootNodes.contains(operator6)); |
| |
| for (OperatorMeta n : rootNodes) { |
| printTopology(n, dag, 0); |
| } |
| |
| } |
| |
| private void printTopology(OperatorMeta operator, DAG tplg, int level) |
| { |
| String prefix = ""; |
| if (level > 0) { |
| prefix = StringUtils.repeat(" ", 20 * (level - 1)) + " |" + StringUtils.repeat("-", 17); |
| } |
| logger.debug(prefix + operator.getName()); |
| for (StreamMeta downStream : operator.getOutputStreams().values()) { |
| if (!downStream.getSinks().isEmpty()) { |
| for (LogicalPlan.InputPortMeta targetNode : downStream.getSinks()) { |
| printTopology(targetNode.getOperatorMeta(), tplg, level + 1); |
| } |
| } |
| } |
| } |
| |
| @Test |
| public void testLoadFromPropertiesFile() throws IOException |
| { |
| Properties props = new Properties(); |
| String resourcePath = "/testTopology.properties"; |
| InputStream is = this.getClass().getResourceAsStream(resourcePath); |
| if (is == null) { |
| fail("Could not load " + resourcePath); |
| } |
| props.load(is); |
| LogicalPlanConfiguration pb = new LogicalPlanConfiguration(new Configuration(false)).addFromProperties(props, null); |
| |
| LogicalPlan dag = new LogicalPlan(); |
| pb.populateDAG(dag); |
| dag.validate(); |
| |
| assertEquals("number of operator confs", 5, dag.getAllOperators().size()); |
| assertEquals("number of root operators", 1, dag.getRootOperators().size()); |
| |
| StreamMeta s1 = dag.getStream("n1n2"); |
| assertNotNull(s1); |
| assertTrue("n1n2 inline", DAG.Locality.CONTAINER_LOCAL == s1.getLocality()); |
| |
| OperatorMeta operator3 = dag.getOperatorMeta("operator3"); |
| assertEquals("operator3.classname", GenericTestOperator.class, operator3.getOperator().getClass()); |
| |
| GenericTestOperator doperator3 = (GenericTestOperator)operator3.getOperator(); |
| assertEquals("myStringProperty " + doperator3, "myStringPropertyValueFromTemplate", doperator3.getMyStringProperty()); |
| assertFalse("booleanProperty " + doperator3, doperator3.booleanProperty); |
| |
| OperatorMeta operator4 = dag.getOperatorMeta("operator4"); |
| GenericTestOperator doperator4 = (GenericTestOperator)operator4.getOperator(); |
| assertEquals("myStringProperty " + doperator4, "overrideOperator4", doperator4.getMyStringProperty()); |
| assertEquals("setterOnlyOperator4 " + doperator4, "setterOnlyOperator4", doperator4.propertySetterOnly); |
| assertTrue("booleanProperty " + doperator4, doperator4.booleanProperty); |
| |
| StreamMeta input1 = dag.getStream("inputStream"); |
| assertNotNull(input1); |
| Assert.assertEquals("input1 source", dag.getOperatorMeta("inputOperator"), input1.getSource().getOperatorMeta()); |
| Set<OperatorMeta> targetNodes = Sets.newHashSet(); |
| for (LogicalPlan.InputPortMeta targetPort : input1.getSinks()) { |
| targetNodes.add(targetPort.getOperatorMeta()); |
| } |
| |
| Assert.assertEquals("input1 target ", Sets.newHashSet(dag.getOperatorMeta("operator1"), operator3, operator4), targetNodes); |
| |
| } |
| |
| @Test |
| public void testLoadFromJson() throws Exception |
| { |
| String resourcePath = "/testTopology.json"; |
| InputStream is = this.getClass().getResourceAsStream(resourcePath); |
| if (is == null) { |
| fail("Could not load " + resourcePath); |
| } |
| StringWriter writer = new StringWriter(); |
| |
| IOUtils.copy(is, writer); |
| JSONObject json = new JSONObject(writer.toString()); |
| |
| Configuration conf = new Configuration(false); |
| conf.set(StreamingApplication.DT_PREFIX + "operator.operator3.prop.myStringProperty", "o3StringFromConf"); |
| |
| LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf); |
| LogicalPlan dag = planConf.createFromJson(json, "testLoadFromJson"); |
| dag.validate(); |
| |
| assertEquals("DAG attribute CONTAINER_JVM_OPTIONS ", dag.getAttributes().get(DAGContext.CONTAINER_JVM_OPTIONS), "-Xmx16m"); |
| Map<Class<?>, Class<? extends StringCodec<?>>> stringCodecsMap = Maps.newHashMap(); |
| stringCodecsMap.put(Integer.class, Integer2String.class); |
| assertEquals("DAG attribute STRING_CODECS ", stringCodecsMap, dag.getAttributes().get(DAGContext.STRING_CODECS)); |
| assertEquals("DAG attribute CONTAINER_OPTS_CONFIGURATOR ", BasicContainerOptConfigurator.class, dag.getAttributes().get(DAGContext.CONTAINER_OPTS_CONFIGURATOR).getClass()); |
| |
| assertEquals("number of operator confs", 5, dag.getAllOperators().size()); |
| assertEquals("number of root operators", 1, dag.getRootOperators().size()); |
| |
| StreamMeta s1 = dag.getStream("n1n2"); |
| assertNotNull(s1); |
| assertTrue("n1n2 inline", DAG.Locality.CONTAINER_LOCAL == s1.getLocality()); |
| |
| OperatorMeta input = dag.getOperatorMeta("inputOperator"); |
| TestStatsListener tsl = new TestStatsListener(); |
| tsl.setIntProp(222); |
| List<StatsListener> sll = Lists.<StatsListener>newArrayList(tsl); |
| assertEquals("inputOperator STATS_LISTENERS attribute ", sll, input.getAttributes().get(OperatorContext.STATS_LISTENERS)); |
| for (OutputPortMeta opm : input.getOutputStreams().keySet()) { |
| assertTrue("output port of input Operator attribute is JsonStreamCodec ", opm.getAttributes().get(PortContext.STREAM_CODEC) instanceof JsonStreamCodec<?>); |
| } |
| |
| OperatorMeta operator3 = dag.getOperatorMeta("operator3"); |
| assertEquals("operator3.classname", GenericTestOperator.class, operator3.getOperator().getClass()); |
| |
| GenericTestOperator doperator3 = (GenericTestOperator)operator3.getOperator(); |
| assertEquals("myStringProperty " + doperator3, "o3StringFromConf", doperator3.getMyStringProperty()); |
| assertFalse("booleanProperty " + doperator3, doperator3.booleanProperty); |
| |
| OperatorMeta operator4 = dag.getOperatorMeta("operator4"); |
| GenericTestOperator doperator4 = (GenericTestOperator)operator4.getOperator(); |
| assertEquals("myStringProperty " + doperator4, "overrideOperator4", doperator4.getMyStringProperty()); |
| assertEquals("setterOnlyOperator4 " + doperator4, "setterOnlyOperator4", doperator4.propertySetterOnly); |
| assertTrue("booleanProperty " + doperator4, doperator4.booleanProperty); |
| |
| StreamMeta input1 = dag.getStream("inputStream"); |
| assertNotNull(input1); |
| OperatorMeta inputOperator = dag.getOperatorMeta("inputOperator"); |
| Assert.assertEquals("input1 source", inputOperator, input1.getSource().getOperatorMeta()); |
| Set<OperatorMeta> targetNodes = Sets.newHashSet(); |
| for (LogicalPlan.InputPortMeta targetPort : input1.getSinks()) { |
| targetNodes.add(targetPort.getOperatorMeta()); |
| } |
| Assert.assertEquals("operator attribute " + inputOperator, 64, (int)inputOperator.getValue(OperatorContext.MEMORY_MB)); |
| Assert.assertEquals("port attribute " + inputOperator, 8, (int)input1.getSource().getValue(PortContext.UNIFIER_LIMIT)); |
| Assert.assertEquals("input1 target ", Sets.newHashSet(dag.getOperatorMeta("operator1"), operator3, operator4), targetNodes); |
| } |
| |
| @Test |
| @SuppressWarnings("UnnecessaryBoxing") |
| public void testAppLevelAttributes() |
| { |
| String appName = "app1"; |
| |
| Properties props = new Properties(); |
| props.put(StreamingApplication.DT_PREFIX + DAG.MASTER_MEMORY_MB.getName(), "123"); |
| props.put(StreamingApplication.DT_PREFIX + DAG.CONTAINER_JVM_OPTIONS.getName(), "-Dlog4j.properties=custom_log4j.properties"); |
| props.put(StreamingApplication.DT_PREFIX + DAG.APPLICATION_PATH.getName(), "/defaultdir"); |
| props.put(StreamingApplication.DT_PREFIX + "application." + appName + "." + DAG.APPLICATION_PATH.getName(), "/otherdir"); |
| props.put(StreamingApplication.DT_PREFIX + "application." + appName + "." + DAG.STREAMING_WINDOW_SIZE_MILLIS.getName(), "1000"); |
| |
| LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false)); |
| dagBuilder.addFromProperties(props, null); |
| |
| LogicalPlan dag = new LogicalPlan(); |
| |
| dagBuilder.populateDAG(dag); |
| |
| dagBuilder.setApplicationConfiguration(dag, appName, null); |
| |
| Assert.assertEquals("", "/otherdir", dag.getValue(DAG.APPLICATION_PATH)); |
| Assert.assertEquals("", Integer.valueOf(123), dag.getValue(DAG.MASTER_MEMORY_MB)); |
| Assert.assertEquals("", Integer.valueOf(1000), dag.getValue(DAG.STREAMING_WINDOW_SIZE_MILLIS)); |
| Assert.assertEquals("", "-Dlog4j.properties=custom_log4j.properties", dag.getValue(DAG.CONTAINER_JVM_OPTIONS)); |
| |
| } |
| |
| @Test |
| @SuppressWarnings("UnnecessaryBoxing") |
| public void testAppLevelProperties() |
| { |
| String appName = "app1"; |
| Properties props = new Properties(); |
| props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".testprop1", "10"); |
| props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".prop.testprop2", "100"); |
| props.put(StreamingApplication.DT_PREFIX + "application.*.prop.testprop3", "1000"); |
| props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".inncls.a", "10000"); |
| LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false)); |
| dagBuilder.addFromProperties(props, null); |
| |
| LogicalPlan dag = new LogicalPlan(); |
| TestApplication app1Test = new TestApplication(); |
| |
| dagBuilder.setApplicationConfiguration(dag, appName, app1Test); |
| Assert.assertEquals("", Integer.valueOf(10), app1Test.getTestprop1()); |
| Assert.assertEquals("", Integer.valueOf(100), app1Test.getTestprop2()); |
| Assert.assertEquals("", Integer.valueOf(1000), app1Test.getTestprop3()); |
| Assert.assertEquals("", Integer.valueOf(10000), app1Test.getInncls().getA()); |
| } |
| |
| @Test |
| public void testPrepareDAG() |
| { |
| final MutableBoolean appInitialized = new MutableBoolean(false); |
| StreamingApplication app = new StreamingApplication() |
| { |
| @Override |
| public void populateDAG(DAG dag, Configuration conf) |
| { |
| Assert.assertEquals("", "hostname:9090", dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS)); |
| dag.setAttribute(DAG.GATEWAY_CONNECT_ADDRESS, "hostname:9091"); |
| appInitialized.setValue(true); |
| } |
| }; |
| Configuration conf = new Configuration(false); |
| conf.addResource(StramClientUtils.DT_SITE_XML_FILE); |
| LogicalPlanConfiguration pb = new LogicalPlanConfiguration(conf); |
| |
| LogicalPlan dag = new LogicalPlan(); |
| pb.prepareDAG(dag, app, "testconfig"); |
| |
| Assert.assertTrue("populateDAG called", appInitialized.booleanValue()); |
| Assert.assertEquals("populateDAG overrides attribute", "hostname:9091", dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS)); |
| } |
| |
| @Test |
| public void testOperatorConfigurationLookup() |
| { |
| |
| Properties props = new Properties(); |
| |
| // match operator by name |
| props.put(StreamingApplication.DT_PREFIX + "template.matchId1.matchIdRegExp", ".*operator1.*"); |
| props.put(StreamingApplication.DT_PREFIX + "template.matchId1.stringProperty2", "stringProperty2Value-matchId1"); |
| props.put(StreamingApplication.DT_PREFIX + "template.matchId1.nested.property", "nested.propertyValue-matchId1"); |
| |
| // match class name, lower priority |
| props.put(StreamingApplication.DT_PREFIX + "template.matchClass1.matchClassNameRegExp", ".*" + ValidationTestOperator.class.getSimpleName()); |
| props.put(StreamingApplication.DT_PREFIX + "template.matchClass1.stringProperty2", "stringProperty2Value-matchClass1"); |
| |
| // match class name |
| props.put(StreamingApplication.DT_PREFIX + "template.t2.matchClassNameRegExp", ".*" + GenericTestOperator.class.getSimpleName()); |
| props.put(StreamingApplication.DT_PREFIX + "template.t2.myStringProperty", "myStringPropertyValue"); |
| |
| // direct setting |
| props.put(StreamingApplication.DT_PREFIX + "operator.operator3.emitFormat", "emitFormatValue"); |
| |
| LogicalPlan dag = new LogicalPlan(); |
| Operator operator1 = dag.addOperator("operator1", new ValidationTestOperator()); |
| Operator operator2 = dag.addOperator("operator2", new ValidationTestOperator()); |
| Operator operator3 = dag.addOperator("operator3", new GenericTestOperator()); |
| |
| LogicalPlanConfiguration pb = new LogicalPlanConfiguration(new Configuration(false)); |
| LOG.debug("calling addFromProperties"); |
| pb.addFromProperties(props, null); |
| |
| Map<String, String> configProps = pb.getProperties(dag.getMeta(operator1), "appName"); |
| Assert.assertEquals("" + configProps, 2, configProps.size()); |
| Assert.assertEquals("" + configProps, "stringProperty2Value-matchId1", configProps.get("stringProperty2")); |
| Assert.assertEquals("" + configProps, "nested.propertyValue-matchId1", configProps.get("nested.property")); |
| |
| configProps = pb.getProperties(dag.getMeta(operator2), "appName"); |
| Assert.assertEquals("" + configProps, 1, configProps.size()); |
| Assert.assertEquals("" + configProps, "stringProperty2Value-matchClass1", configProps.get("stringProperty2")); |
| |
| configProps = pb.getProperties(dag.getMeta(operator3), "appName"); |
| Assert.assertEquals("" + configProps, 2, configProps.size()); |
| Assert.assertEquals("" + configProps, "myStringPropertyValue", configProps.get("myStringProperty")); |
| Assert.assertEquals("" + configProps, "emitFormatValue", configProps.get("emitFormat")); |
| |
| } |
| |
| @Test |
| public void testSetOperatorProperties() |
| { |
| |
| Configuration conf = new Configuration(false); |
| conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.myStringProperty", "myStringPropertyValue"); |
| conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.stringArrayField", "a,b,c"); |
| conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.mapProperty.key1", "key1Val"); |
| conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.mapProperty(key1.dot)", "key1dotVal"); |
| conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.mapProperty(key2.dot)", "key2dotVal"); |
| |
| LogicalPlan dag = new LogicalPlan(); |
| GenericTestOperator o1 = dag.addOperator("o1", new GenericTestOperator()); |
| ValidationTestOperator o2 = dag.addOperator("o2", new ValidationTestOperator()); |
| |
| LogicalPlanConfiguration pb = new LogicalPlanConfiguration(conf); |
| |
| pb.setOperatorProperties(dag, "testSetOperatorProperties"); |
| Assert.assertEquals("o1.myStringProperty", "myStringPropertyValue", o1.getMyStringProperty()); |
| Assert.assertArrayEquals("o2.stringArrayField", new String[] {"a", "b", "c"}, o2.getStringArrayField()); |
| |
| Assert.assertEquals("o2.mapProperty.key1", "key1Val", o2.getMapProperty().get("key1")); |
| Assert.assertEquals("o2.mapProperty(key1.dot)", "key1dotVal", o2.getMapProperty().get("key1.dot")); |
| Assert.assertEquals("o2.mapProperty(key2.dot)", "key2dotVal", o2.getMapProperty().get("key2.dot")); |
| |
| } |
| |
| @ApplicationAnnotation(name = "AnnotatedAlias") |
| class AnnotatedApplication implements StreamingApplication |
| { |
| |
| @Override |
| public void populateDAG(DAG dag, Configuration conf) |
| { |
| } |
| |
| } |
| |
| @Test |
| public void testAppNameAttribute() |
| { |
| StreamingApplication app = new AnnotatedApplication(); |
| Configuration conf = new Configuration(false); |
| conf.addResource(StramClientUtils.DT_SITE_XML_FILE); |
| |
| LogicalPlanConfiguration builder = new LogicalPlanConfiguration(conf); |
| |
| Properties properties = new Properties(); |
| properties.put(StreamingApplication.DT_PREFIX + "application.TestAliasApp.class", app.getClass().getName()); |
| |
| builder.addFromProperties(properties, null); |
| |
| LogicalPlan dag = new LogicalPlan(); |
| String appPath = app.getClass().getName().replace(".", "/") + ".class"; |
| dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_NAME, "testApp"); |
| builder.prepareDAG(dag, app, appPath); |
| |
| Assert.assertEquals("Application name", "testApp", dag.getAttributes().get(com.datatorrent.api.Context.DAGContext.APPLICATION_NAME)); |
| } |
| |
| @Test |
| public void testAppAlias() |
| { |
| StreamingApplication app = new AnnotatedApplication(); |
| Configuration conf = new Configuration(false); |
| conf.addResource(StramClientUtils.DT_SITE_XML_FILE); |
| |
| LogicalPlanConfiguration builder = new LogicalPlanConfiguration(conf); |
| |
| Properties properties = new Properties(); |
| properties.put(StreamingApplication.DT_PREFIX + "application.TestAliasApp.class", app.getClass().getName()); |
| |
| builder.addFromProperties(properties, null); |
| |
| LogicalPlan dag = new LogicalPlan(); |
| String appPath = app.getClass().getName().replace(".", "/") + ".class"; |
| builder.prepareDAG(dag, app, appPath); |
| |
| Assert.assertEquals("Application name", "TestAliasApp", dag.getAttributes().get(com.datatorrent.api.Context.DAGContext.APPLICATION_NAME)); |
| } |
| |
| @Test |
| public void testAppAnnotationAlias() |
| { |
| StreamingApplication app = new AnnotatedApplication(); |
| Configuration conf = new Configuration(false); |
| conf.addResource(StramClientUtils.DT_SITE_XML_FILE); |
| |
| LogicalPlanConfiguration builder = new LogicalPlanConfiguration(conf); |
| |
| LogicalPlan dag = new LogicalPlan(); |
| String appPath = app.getClass().getName().replace(".", "/") + ".class"; |
| builder.prepareDAG(dag, app, appPath); |
| |
| Assert.assertEquals("Application name", "AnnotatedAlias", dag.getAttributes().get(com.datatorrent.api.Context.DAGContext.APPLICATION_NAME)); |
| } |
| |
| @Test |
| @SuppressWarnings({"UnnecessaryBoxing", "AssertEqualsBetweenInconvertibleTypes"}) |
| public void testOperatorLevelAttributes() |
| { |
| String appName = "app1"; |
| StreamingApplication app = new StreamingApplication() |
| { |
| @Override |
| public void populateDAG(DAG dag, Configuration conf) |
| { |
| dag.addOperator("operator1", GenericTestOperator.class); |
| dag.addOperator("operator2", GenericTestOperator.class); |
| } |
| }; |
| |
| Properties props = new Properties(); |
| props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName()); |
| props.put(StreamingApplication.DT_PREFIX + "operator.*." + OperatorContext.APPLICATION_WINDOW_COUNT.getName(), "2"); |
| props.put(StreamingApplication.DT_PREFIX + "operator.*." + OperatorContext.STATS_LISTENERS.getName(), PartitionLoadWatch.class.getName()); |
| props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1." + OperatorContext.APPLICATION_WINDOW_COUNT.getName(), "20"); |
| |
| LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false)); |
| dagBuilder.addFromProperties(props, null); |
| |
| String appPath = app.getClass().getName().replace(".", "/") + ".class"; |
| |
| LogicalPlan dag = new LogicalPlan(); |
| dagBuilder.prepareDAG(dag, app, appPath); |
| |
| Assert.assertEquals("", Integer.valueOf(20), dag.getOperatorMeta("operator1").getValue(OperatorContext.APPLICATION_WINDOW_COUNT)); |
| Assert.assertEquals("", Integer.valueOf(2), dag.getOperatorMeta("operator2").getValue(OperatorContext.APPLICATION_WINDOW_COUNT)); |
| Assert.assertEquals("", PartitionLoadWatch.class, dag.getOperatorMeta("operator2").getValue(OperatorContext.STATS_LISTENERS).toArray()[0].getClass()); |
| } |
| |
| @Test |
| @SuppressWarnings({"UnnecessaryBoxing", "AssertEqualsBetweenInconvertibleTypes"}) |
| public void testUnifierLevelAttributes() |
| { |
| String appName = "app1"; |
| final GenericTestOperator operator1 = new GenericTestOperator(); |
| final GenericTestOperator operator2 = new GenericTestOperator(); |
| StreamingApplication app = new StreamingApplication() |
| { |
| @Override |
| public void populateDAG(DAG dag, Configuration conf) |
| { |
| dag.addOperator("operator1", operator1); |
| dag.addOperator("operator2", operator2); |
| dag.addStream("s1", operator1.outport1, operator2.inport1); |
| } |
| }; |
| |
| Properties props = new Properties(); |
| props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName()); |
| props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1.outputport.outport1.unifier." + OperatorContext.APPLICATION_WINDOW_COUNT.getName(), "2"); |
| props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1.outputport.outport1.unifier." + OperatorContext.MEMORY_MB.getName(), "512"); |
| LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false)); |
| dagBuilder.addFromProperties(props, null); |
| |
| String appPath = app.getClass().getName().replace(".", "/") + ".class"; |
| |
| LogicalPlan dag = new LogicalPlan(); |
| dagBuilder.prepareDAG(dag, app, appPath); |
| |
| OperatorMeta om = null; |
| for (Map.Entry<OutputPortMeta, StreamMeta> entry : dag.getOperatorMeta("operator1").getOutputStreams().entrySet()) { |
| if (entry.getKey().getPortName().equals("outport1")) { |
| om = entry.getKey().getUnifierMeta(); |
| } |
| } |
| Assert.assertNotNull(om); |
| Assert.assertEquals("", Integer.valueOf(2), om.getValue(OperatorContext.APPLICATION_WINDOW_COUNT)); |
| Assert.assertEquals("", Integer.valueOf(512), om.getValue(OperatorContext.MEMORY_MB)); |
| } |
| |
| @Test |
| @SuppressWarnings({"UnnecessaryBoxing", "AssertEqualsBetweenInconvertibleTypes"}) |
| public void testModuleUnifierLevelAttributes() |
| { |
| class DummyOperator extends BaseOperator |
| { |
| int prop; |
| |
| public transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>() |
| { |
| @Override |
| public void process(Integer tuple) |
| { |
| LOG.debug(tuple.intValue() + " processed"); |
| output.emit(tuple); |
| } |
| }; |
| public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>(); |
| } |
| |
| class DummyOutputOperator extends BaseOperator |
| { |
| int prop; |
| |
| public transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>() |
| { |
| @Override |
| public void process(Integer tuple) |
| { |
| LOG.debug(tuple.intValue() + " processed"); |
| } |
| }; |
| } |
| |
| class TestUnifierAttributeModule implements Module |
| { |
| public transient ProxyInputPort<Integer> moduleInput = new ProxyInputPort<>(); |
| public transient ProxyOutputPort<Integer> moduleOutput = new Module.ProxyOutputPort<>(); |
| |
| @Override |
| public void populateDAG(DAG dag, Configuration conf) |
| { |
| DummyOperator dummyOperator = dag.addOperator("DummyOperator", new DummyOperator()); |
| dag.setOperatorAttribute(dummyOperator, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<DummyOperator>(3)); |
| dag.setUnifierAttribute(dummyOperator.output, OperatorContext.TIMEOUT_WINDOW_COUNT, 2); |
| moduleInput.set(dummyOperator.input); |
| moduleOutput.set(dummyOperator.output); |
| } |
| } |
| |
| StreamingApplication app = new StreamingApplication() |
| { |
| @Override |
| public void populateDAG(DAG dag, Configuration conf) |
| { |
| Module m1 = dag.addModule("TestModule", new TestUnifierAttributeModule()); |
| DummyOutputOperator dummyOutputOperator = dag.addOperator("DummyOutputOperator", new DummyOutputOperator()); |
| dag.addStream("Module To Operator", ((TestUnifierAttributeModule)m1).moduleOutput, dummyOutputOperator.input); |
| } |
| }; |
| |
| String appName = "UnifierApp"; |
| LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false)); |
| LogicalPlan dag = new LogicalPlan(); |
| dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new MockStorageAgent()); |
| dagBuilder.prepareDAG(dag, app, appName); |
| LogicalPlan.OperatorMeta ometa = dag.getOperatorMeta("TestModule$DummyOperator"); |
| LogicalPlan.OperatorMeta om = null; |
| for (Map.Entry<LogicalPlan.OutputPortMeta, LogicalPlan.StreamMeta> entry : ometa.getOutputStreams().entrySet()) { |
| if (entry.getKey().getPortName().equals("output")) { |
| om = entry.getKey().getUnifierMeta(); |
| } |
| } |
| |
| /* |
| * Verify the attribute value after preparing DAG. |
| */ |
| Assert.assertNotNull(om); |
| Assert.assertEquals("", Integer.valueOf(2), om.getValue(Context.OperatorContext.TIMEOUT_WINDOW_COUNT)); |
| |
| PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext()); |
| List<PTContainer> containers = plan.getContainers(); |
| LogicalPlan.OperatorMeta operatorMeta = null; |
| for (PTContainer container : containers) { |
| List<PTOperator> operators = container.getOperators(); |
| for (PTOperator operator : operators) { |
| if (operator.isUnifier()) { |
| operatorMeta = operator.getOperatorMeta(); |
| } |
| } |
| } |
| |
| /* |
| * Verify attribute after physical plan creation with partitioned operators. |
| */ |
| Assert.assertEquals("", Integer.valueOf(2), operatorMeta.getValue(OperatorContext.TIMEOUT_WINDOW_COUNT)); |
| } |
| |
| @Test |
| public void testOperatorLevelProperties() |
| { |
| String appName = "app1"; |
| final GenericTestOperator operator1 = new GenericTestOperator(); |
| final GenericTestOperator operator2 = new GenericTestOperator(); |
| StreamingApplication app = new StreamingApplication() |
| { |
| @Override |
| public void populateDAG(DAG dag, Configuration conf) |
| { |
| dag.addOperator("operator1", operator1); |
| dag.addOperator("operator2", operator2); |
| } |
| }; |
| |
| Properties props = new Properties(); |
| props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName()); |
| props.put(StreamingApplication.DT_PREFIX + "operator.*.myStringProperty", "pv1"); |
| props.put(StreamingApplication.DT_PREFIX + "operator.*.booleanProperty", Boolean.TRUE.toString()); |
| props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1.myStringProperty", "apv1"); |
| |
| LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false)); |
| dagBuilder.addFromProperties(props, null); |
| |
| String appPath = app.getClass().getName().replace(".", "/") + ".class"; |
| |
| LogicalPlan dag = new LogicalPlan(); |
| dagBuilder.prepareDAG(dag, app, appPath); |
| |
| Assert.assertEquals("apv1", operator1.getMyStringProperty()); |
| Assert.assertEquals("pv1", operator2.getMyStringProperty()); |
| Assert.assertEquals(true, operator2.isBooleanProperty()); |
| } |
| |
| @Test |
| public void testApplicationLevelParameter() |
| { |
| String appName = "app1"; |
| final GenericTestOperator operator1 = new GenericTestOperator(); |
| final GenericTestOperator operator2 = new GenericTestOperator(); |
| StreamingApplication app = new StreamingApplication() |
| { |
| @Override |
| public void populateDAG(DAG dag, Configuration conf) |
| { |
| dag.addOperator("operator1", operator1); |
| dag.addOperator("operator2", operator2); |
| } |
| }; |
| |
| Properties props = new Properties(); |
| props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName()); |
| props.put(StreamingApplication.DT_PREFIX + "operator.*.myStringProperty", "foo ${xyz} bar ${zzz} baz"); |
| props.put(StreamingApplication.DT_PREFIX + "operator.*.booleanProperty", Boolean.TRUE.toString()); |
| props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1.myStringProperty", "apv1"); |
| |
| LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false)); |
| |
| Configuration vars = new Configuration(false); |
| vars.set("xyz", "123"); |
| vars.set("zzz", "456"); |
| dagBuilder.addFromProperties(props, vars); |
| |
| String appPath = app.getClass().getName().replace(".", "/") + ".class"; |
| |
| LogicalPlan dag = new LogicalPlan(); |
| dagBuilder.prepareDAG(dag, app, appPath); |
| |
| Assert.assertEquals("apv1", operator1.getMyStringProperty()); |
| Assert.assertEquals("foo 123 bar 456 baz", operator2.getMyStringProperty()); |
| Assert.assertEquals(true, operator2.isBooleanProperty()); |
| } |
| |
| @Test |
| @SuppressWarnings("UnnecessaryBoxing") |
| public void testPortLevelAttributes() |
| { |
| String appName = "app1"; |
| SimpleTestApplication app = new SimpleTestApplication(); |
| |
| Properties props = new Properties(); |
| props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName()); |
| props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1.port.*." + PortContext.QUEUE_CAPACITY.getName(), "" + 16 * 1024); |
| props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator2.inputport.inport1." + PortContext.QUEUE_CAPACITY.getName(), "" + 32 * 1024); |
| props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator2.outputport.outport1." + PortContext.QUEUE_CAPACITY.getName(), "" + 32 * 1024); |
| props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator3.port.*." + PortContext.QUEUE_CAPACITY.getName(), "" + 16 * 1024); |
| props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator3.inputport.inport2." + PortContext.QUEUE_CAPACITY.getName(), "" + 32 * 1024); |
| |
| LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false)); |
| dagBuilder.addFromProperties(props, null); |
| |
| String appPath = app.getClass().getName().replace(".", "/") + ".class"; |
| |
| LogicalPlan dag = new LogicalPlan(); |
| dagBuilder.prepareDAG(dag, app, appPath); |
| |
| OperatorMeta om1 = dag.getOperatorMeta("operator1"); |
| Assert.assertEquals("", Integer.valueOf(16 * 1024), om1.getMeta(app.gt1.outport1).getValue(PortContext.QUEUE_CAPACITY)); |
| OperatorMeta om2 = dag.getOperatorMeta("operator2"); |
| Assert.assertEquals("", Integer.valueOf(32 * 1024), om2.getMeta(app.gt2.inport1).getValue(PortContext.QUEUE_CAPACITY)); |
| Assert.assertEquals("", Integer.valueOf(32 * 1024), om2.getMeta(app.gt2.outport1).getValue(PortContext.QUEUE_CAPACITY)); |
| OperatorMeta om3 = dag.getOperatorMeta("operator3"); |
| Assert.assertEquals("", Integer.valueOf(16 * 1024), om3.getMeta(app.gt3.inport1).getValue(PortContext.QUEUE_CAPACITY)); |
| Assert.assertEquals("", Integer.valueOf(32 * 1024), om3.getMeta(app.gt3.inport2).getValue(PortContext.QUEUE_CAPACITY)); |
| } |
| |
| @Test |
| public void testInvalidAttribute() throws Exception |
| { |
| Assert.assertNotSame(0, com.datatorrent.api.Context.DAGContext.serialVersionUID); |
| Attribute<String> attribute = new Attribute<>("", null); |
| |
| Field nameField = Attribute.class.getDeclaredField("name"); |
| nameField.setAccessible(true); |
| nameField.set(attribute, "NOT_CONFIGURABLE"); |
| nameField.setAccessible(false); |
| |
| ContextUtils.addAttribute(com.datatorrent.api.Context.DAGContext.class, attribute); |
| AttributeParseUtils.initialize(); |
| ConfElement.initialize(); |
| |
| // attribute that cannot be configured |
| |
| Properties props = new Properties(); |
| props.put(StreamingApplication.DT_PREFIX + "attr.NOT_CONFIGURABLE", "value"); |
| |
| LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false)); |
| dagBuilder.addFromProperties(props, null); |
| |
| try { |
| dagBuilder.prepareDAG(new LogicalPlan(), null, ""); |
| Assert.fail("Exception expected"); |
| } catch (Exception e) { |
| Assert.assertThat("Attribute not configurable", e.getMessage(), RegexMatcher.matches("Attribute does not support property configuration: NOT_CONFIGURABLE.*")); |
| } |
| |
| ContextUtils.removeAttribute(com.datatorrent.api.Context.DAGContext.class, attribute); |
| AttributeParseUtils.initialize(); |
| ConfElement.initialize(); |
| |
| // invalid attribute name |
| props = new Properties(); |
| String invalidAttribute = StreamingApplication.DT_PREFIX + "attr.INVALID_NAME"; |
| props.put(invalidAttribute, "value"); |
| |
| try { |
| new LogicalPlanConfiguration(new Configuration(false)).addFromProperties(props, null); |
| Assert.fail("Exception expected"); |
| } catch (Exception e) { |
| LOG.debug("Exception message: {}", e); |
| Assert.assertThat("Invalid attribute name", e.getMessage(), RegexMatcher.matches("Invalid attribute reference: " + invalidAttribute)); |
| } |
| } |
| |
| @Test |
| public void testAttributesCodec() |
| { |
| Assert.assertNotSame(null, new Long[] {com.datatorrent.api.Context.DAGContext.serialVersionUID, OperatorContext.serialVersionUID, PortContext.serialVersionUID}); |
| @SuppressWarnings("unchecked") |
| Set<Class<? extends Context>> contextClasses = Sets.newHashSet(com.datatorrent.api.Context.DAGContext.class, OperatorContext.class, PortContext.class); |
| for (Class<?> c : contextClasses) { |
| for (Attribute<Object> attr : AttributeInitializer.getAttributes(c)) { |
| Assert.assertNotNull(attr.name + " codec", attr.codec); |
| } |
| } |
| } |
| |
| @Test |
| public void testTupleClassAttr() throws Exception |
| { |
| String resourcePath = "/schemaTestTopology.json"; |
| InputStream is = this.getClass().getResourceAsStream(resourcePath); |
| if (is == null) { |
| fail("Could not load " + resourcePath); |
| } |
| StringWriter writer = new StringWriter(); |
| |
| IOUtils.copy(is, writer); |
| JSONObject json = new JSONObject(writer.toString()); |
| |
| Configuration conf = new Configuration(false); |
| |
| LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf); |
| LogicalPlan dag = planConf.createFromJson(json, "testLoadFromJson"); |
| dag.validate(); |
| |
| OperatorMeta operator1 = dag.getOperatorMeta("operator1"); |
| assertEquals("operator1.classname", SchemaTestOperator.class, operator1.getOperator().getClass()); |
| |
| StreamMeta input1 = dag.getStream("inputStream"); |
| assertNotNull(input1); |
| for (LogicalPlan.InputPortMeta targetPort : input1.getSinks()) { |
| Assert.assertEquals("tuple class name required", TestSchema.class, targetPort.getAttributes().get(PortContext.TUPLE_CLASS)); |
| } |
| } |
| |
| @Test(expected = ValidationException.class) |
| public void testTupleClassAttrValidation() throws Exception |
| { |
| String resourcePath = "/schemaTestTopology.json"; |
| InputStream is = this.getClass().getResourceAsStream(resourcePath); |
| if (is == null) { |
| fail("Could not load " + resourcePath); |
| } |
| StringWriter writer = new StringWriter(); |
| |
| IOUtils.copy(is, writer); |
| JSONObject json = new JSONObject(writer.toString()); |
| |
| //removing schema so that validation fails |
| json.getJSONArray("streams").getJSONObject(0).remove("schema"); |
| Configuration conf = new Configuration(false); |
| |
| LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf); |
| LogicalPlan dag = planConf.createFromJson(json, "testLoadFromJson"); |
| |
| dag.validate(); |
| } |
| |
| @Test |
| public void testTestTupleClassAttrSetFromConfig() |
| { |
| Configuration conf = new Configuration(false); |
| conf.set(StreamingApplication.DT_PREFIX + "operator.o2.port.schemaRequiredPort.attr.TUPLE_CLASS", |
| "com.datatorrent.stram.plan.logical.LogicalPlanConfigurationTest$TestSchema"); |
| |
| StreamingApplication streamingApplication = new StreamingApplication() |
| { |
| @Override |
| public void populateDAG(DAG dag, Configuration conf) |
| { |
| TestGeneratorInputOperator o1 = dag.addOperator("o1", new TestGeneratorInputOperator()); |
| SchemaTestOperator o2 = dag.addOperator("o2", new SchemaTestOperator()); |
| dag.addStream("stream", o1.outport, o2.schemaRequiredPort); |
| } |
| }; |
| LogicalPlan dag = new LogicalPlan(); |
| LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf); |
| lpc.prepareDAG(dag, streamingApplication, "app"); |
| dag.validate(); |
| } |
| |
| /* |
| * This test and all of the following ambiguous attribute tests verify that when an ambiguous attribute |
| * name is provided, all the corresponding attributes are set. |
| * <br/><br/> |
| * <b>Note:</b> An ambiguous attribute is one whose simple name is shared by attributes defined for |
| * multiple types of dag elements (like operators and ports). |
| * Examples of such attributes are com.datatorrent.api.Context.OperatorContext.AUTO_RECORD |
| * and com.datatorrent.api.Context.PortContext.AUTO_RECORD. |
| * <br/><br/> |
| * This test should set the attribute on the operators and ports. |
| */ |
| /** |
| * This test checks if an ambiguous DAG attribute does not get set on operators. |
| */ |
| @Test |
| public void testDagAttributeNotSetOnOperator() |
| { |
| dagOperatorAttributeHelper(true); |
| } |
| |
| @Test |
| public void testAmbiguousAttributeSetOnOperatorAndNotDAG() |
| { |
| dagOperatorAttributeHelper(false); |
| } |
| |
| private void dagOperatorAttributeHelper(boolean attrOnDag) |
| { |
| String attributeName = null; |
| |
| if (attrOnDag) { |
| attributeName = DAGContext.CHECKPOINT_WINDOW_COUNT.getSimpleName(); |
| } else { |
| attributeName = OperatorContext.class.getCanonicalName() + LogicalPlanConfiguration.KEY_SEPARATOR + DAGContext.CHECKPOINT_WINDOW_COUNT.getSimpleName(); |
| } |
| |
| Properties props = new Properties(); |
| String propName = StreamingApplication.DT_PREFIX + StramElement.ATTR.getValue() + LogicalPlanConfiguration.KEY_SEPARATOR + attributeName; |
| props.put(propName, "5"); |
| |
| SimpleTestApplicationWithName app = new SimpleTestApplicationWithName(); |
| |
| LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false)); |
| dagBuilder.addFromProperties(props, null); |
| |
| String appPath = app.getClass().getName().replace(".", "/") + ".class"; |
| |
| LogicalPlan dag = new LogicalPlan(); |
| dagBuilder.prepareDAG(dag, app, appPath); |
| |
| OperatorMeta om1 = dag.getOperatorMeta("operator1"); |
| |
| if (attrOnDag) { |
| Assert.assertNotEquals((Integer)5, om1.getValue(OperatorContext.CHECKPOINT_WINDOW_COUNT)); |
| } else { |
| Assert.assertEquals((Integer)5, om1.getValue(OperatorContext.CHECKPOINT_WINDOW_COUNT)); |
| } |
| } |
| |
| /** |
| * This test should set the attribute on the operators and ports. |
| */ |
| @Test |
| public void testRootLevelAmbiguousAttributeSimple() |
| { |
| testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD, |
| StreamingApplication.DT_PREFIX, null, Boolean.TRUE, true, true); |
| } |
| |
| /** |
| * This test should set the attribute on the operators and ports. |
| */ |
| @Test |
| public void testApplicationLevelAmbiguousAttributeSimple() |
| { |
| testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD, |
| StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, null, Boolean.TRUE, true, true); |
| } |
| |
| /** |
| * This should only set the attribute on the operator |
| */ |
| @Test |
| public void testOperatorLevelAmbiguousAttributeSimple() |
| { |
| testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD, |
| StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, null, Boolean.TRUE, true, false); |
| } |
| |
| /** |
| * This should only set the attribute on the port |
| */ |
| @Test |
| public void testPortLevelAmbiguousAttributeSimple() |
| { |
| testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD, |
| StreamingApplication.DT_PREFIX + "port" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, null, Boolean.TRUE, false, true); |
| } |
| |
| /** |
| * This test should set the attribute on the operators and ports. |
| */ |
| @Test |
| public void testRootLevelAmbiguousAttributeComplex() |
| { |
| testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD, StreamingApplication.DT_PREFIX, |
| PortContext.class.getCanonicalName(), Boolean.TRUE, false, true); |
| } |
| |
| /** |
| * This test should set the attribute on the operators and ports. |
| */ |
| @Test |
| public void testApplicationLevelAmbiguousAttributeComplex() |
| { |
| testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD, |
| StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, PortContext.class.getCanonicalName(), |
| Boolean.TRUE, false, true); |
| } |
| |
| /** |
| * This should only set the attribute on the operator |
| */ |
| @Test |
| public void testOperatorLevelAmbiguousAttributeComplex() |
| { |
| testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD, |
| StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, OperatorContext.class.getCanonicalName(), |
| Boolean.TRUE, true, false); |
| } |
| |
| /** |
| * This should only set the attribute on the port |
| */ |
| @Test |
| public void testOperatorLevelAmbiguousAttributeComplex2() |
| { |
| testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD, |
| StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, PortContext.class.getCanonicalName(), |
| Boolean.TRUE, false, true); |
| } |
| |
| /** |
| * This should only set the attribute on the port |
| */ |
| @Test |
| public void testPortLevelAmbiguousAttributeComplex() |
| { |
| testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, Context.PortContext.AUTO_RECORD, |
| StreamingApplication.DT_PREFIX + "port" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, PortContext.class.getCanonicalName(), |
| Boolean.TRUE, false, true); |
| } |
| |
| private void testAttributeAmbiguousSimpleHelper(Attribute<?> attributeObjOperator, Attribute<?> attributeObjPort, |
| String root, String contextClass, Object val, boolean operatorSet, |
| boolean portSet) |
| { |
| Properties props = propertiesBuilder(attributeObjOperator.getSimpleName(), root, contextClass, val); |
| |
| simpleAttributeOperatorHelperAssert(attributeObjOperator, props, val, operatorSet); |
| |
| simpleNamePortAssertHelperAssert(attributeObjPort, props, val, portSet); |
| } |
| |
| @Test |
| public void testRootLevelAttributeSimpleNameOperator() |
| { |
| simpleAttributeOperatorHelper(OperatorContext.MEMORY_MB, StreamingApplication.DT_PREFIX, true, 4096, true, true); |
| } |
| |
| @Test |
| public void testRootLevelStorageAgentSimpleNameOperator() |
| { |
| MockStorageAgent mockAgent = new MockStorageAgent(); |
| |
| simpleAttributeOperatorHelper(OperatorContext.STORAGE_AGENT, StreamingApplication.DT_PREFIX, true, mockAgent, true, false); |
| } |
| |
| @Test |
| public void testRootLevelAttributeSimpleNameOperatorNoScope() |
| { |
| simpleAttributeOperatorHelper(OperatorContext.MEMORY_MB, StreamingApplication.DT_PREFIX, true, 4096, true, false); |
| } |
| |
| @Test |
| public void testApplicationLevelAttributeSimpleNameOperator() |
| { |
| simpleAttributeOperatorHelper(OperatorContext.MEMORY_MB, StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR, |
| true, 4096, true, true); |
| } |
| |
| private void simpleAttributeOperatorHelper(Attribute<?> attributeObj, String root, boolean simpleName, |
| Object val, boolean set, boolean scope) |
| { |
| Properties props = propertiesBuilderOperator(attributeObj.getSimpleName(), root, simpleName, |
| val, scope); |
| |
| simpleAttributeOperatorHelperAssert(attributeObj, props, val, set); |
| } |
| |
| private void simpleAttributeOperatorHelperAssert(Attribute<?> attributeObj, Properties props, Object val, boolean set) |
| { |
| SimpleTestApplicationWithName app = new SimpleTestApplicationWithName(); |
| |
| LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false)); |
| dagBuilder.addFromProperties(props, null); |
| |
| String appPath = app.getClass().getName().replace(".", "/") + ".class"; |
| |
| LogicalPlan dag = new LogicalPlan(); |
| dagBuilder.prepareDAG(dag, app, appPath); |
| |
| OperatorMeta om1 = dag.getOperatorMeta("operator1"); |
| |
| if (set) { |
| Assert.assertEquals(val, om1.getValue(attributeObj)); |
| } else { |
| Assert.assertNotEquals(val, om1.getValue(attributeObj)); |
| } |
| |
| OperatorMeta om2 = dag.getOperatorMeta("operator2"); |
| |
| if (set) { |
| Assert.assertEquals(val, om2.getValue(attributeObj)); |
| } else { |
| Assert.assertNotEquals(val, om2.getValue(attributeObj)); |
| } |
| |
| OperatorMeta om3 = dag.getOperatorMeta("operator3"); |
| |
| if (set) { |
| Assert.assertEquals(val, om3.getValue(attributeObj)); |
| } else { |
| Assert.assertNotEquals(val, om3.getValue(attributeObj)); |
| } |
| } |
| |
| /* Port tests */ |
| @Test |
| public void testRootLevelAttributeSimpleNamePort() |
| { |
| simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX, |
| true, (Integer)4096, true, true); |
| } |
| |
| @Test |
| public void testRootLevelAttributeSimpleNamePortNoScope() |
| { |
| simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX, |
| true, (Integer)4096, true, false); |
| } |
| |
| @Test |
| public void testOperatorLevelAttributeSimpleNamePort() |
| { |
| simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, |
| true, (Integer)4096, true, true); |
| } |
| |
| @Test |
| public void testApplicationLevelAttributeSimpleNamePort() |
| { |
| simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR, |
| true, (Integer)4096, true, true); |
| } |
| |
| @Test |
| public void testRootLevelAttributeComplexNamePort() |
| { |
| simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX, false, |
| (Integer)4096, true, true); |
| } |
| |
| @Test |
| public void testRootLevelAttributeComplexNamePortNoScope() |
| { |
| simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX, false, |
| (Integer)4096, true, false); |
| } |
| |
| @Test |
| public void testOperatorLevelAttributeComplexNamePort() |
| { |
| simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, |
| false, (Integer)4096, true, true); |
| } |
| |
| @Test |
| public void testApplicationLevelAttributeComplexNamePort() |
| { |
| simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR, |
| false, (Integer)4096, true, true); |
| } |
| |
| /* Input port tests */ |
| @Test |
| public void testRootLevelAttributeSimpleNameInputPort() |
| { |
| simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX, true, |
| (Integer)4096, true); |
| } |
| |
| @Test |
| public void testOperatorLevelAttributeSimpleNameInputPort() |
| { |
| simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, true, (Integer)4096, true); |
| } |
| |
| @Test |
| public void testApplicationLevelAttributeSimpleNameInputPort() |
| { |
| simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR, |
| true, (Integer)4096, true); |
| } |
| |
| @Test |
| public void testRootLevelAttributeComplexNameInputPort() |
| { |
| simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX, false, (Integer)4096, true); |
| } |
| |
| @Test |
| public void testOperatorLevelAttributeComplexNameInputPort() |
| { |
| simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, false, |
| (Integer)4096, true); |
| } |
| |
| @Test |
| public void testApplicationLevelAttributeComplexNameInputPort() |
| { |
| simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR, |
| false, (Integer)4096, true); |
| } |
| |
| /* Output port tests */ |
| @Test |
| public void testRootLevelAttributeSimpleNameOutputPort() |
| { |
| simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX, true, (Integer)4096, true); |
| } |
| |
| @Test |
| public void testOperatorLevelAttributeSimpleNameOutputPort() |
| { |
| simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, true, (Integer)4096, true); |
| } |
| |
| @Test |
| public void testApplicationLevelAttributeSimpleNameOutputPort() |
| { |
| simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + |
| "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR, true, (Integer)4096, true); |
| } |
| |
| @Test |
| public void testRootLevelAttributeComplexNameOutputPort() |
| { |
| simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX, false, (Integer)4096, true); |
| } |
| |
| @Test |
| public void testOperatorLevelAttributeComplexNameOutputPort() |
| { |
| simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR, false, (Integer)4096, true); |
| } |
| |
| @Test |
| public void testApplicationLevelAttributeComplexNameOutputPort() |
| { |
| simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, StreamingApplication.DT_PREFIX + "application" + LogicalPlanConfiguration.KEY_SEPARATOR + |
| "SimpleTestApp" + LogicalPlanConfiguration.KEY_SEPARATOR, false, (Integer)4096, true); |
| } |
| |
| /* Helpers for building ports */ |
| private void simpleAttributePortHelper(Attribute<?> attributeObj, String root, boolean simpleName, Object val, boolean set, boolean scope) |
| { |
| Properties props = propertiesBuilderPort(attributeObj.getSimpleName(), root, simpleName, val, scope); |
| |
| simpleNamePortAssertHelperAssert(attributeObj, props, val, set); |
| } |
| |
| private void simpleAttributeInputPortHelper(Attribute<?> attributeObj, String root, boolean simpleName, Object val, boolean set) |
| { |
| Properties props = propertiesBuilderInputPort(attributeObj.getSimpleName(), root, simpleName, val); |
| |
| simpleNameInputPortAssertHelperAssert(attributeObj, props, val, set); |
| simpleNameOutputPortAssertHelperAssert(attributeObj, props, val, !set); |
| } |
| |
| private void simpleAttributeOutputPortHelper(Attribute<?> attributeObj, String root, boolean simpleName, Object val, boolean set) |
| { |
| Properties props = propertiesBuilderOutputPort(attributeObj.getSimpleName(), root, simpleName, val); |
| |
| simpleNameOutputPortAssertHelperAssert(attributeObj, props, val, set); |
| simpleNameInputPortAssertHelperAssert(attributeObj, props, val, !set); |
| } |
| |
| private void simpleNamePortAssertHelperAssert(Attribute<?> attributeObj, Properties props, Object val, boolean set) |
| { |
| SimpleTestApplicationWithName app = new SimpleTestApplicationWithName(); |
| |
| LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false)); |
| dagBuilder.addFromProperties(props, null); |
| |
| String appPath = app.getClass().getName().replace(".", "/") + ".class"; |
| |
| LogicalPlan dag = new LogicalPlan(); |
| dagBuilder.prepareDAG(dag, app, appPath); |
| |
| simpleNamePortAssertHelper(attributeObj, dag, "operator1", val, set); |
| simpleNamePortAssertHelper(attributeObj, dag, "operator2", val, set); |
| simpleNamePortAssertHelper(attributeObj, dag, "operator3", val, set); |
| } |
| |
| private void simpleNameInputPortAssertHelperAssert(Attribute<?> attributeObj, Properties props, Object val, boolean set) |
| { |
| SimpleTestApplicationWithName app = new SimpleTestApplicationWithName(); |
| |
| LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false)); |
| dagBuilder.addFromProperties(props, null); |
| |
| String appPath = app.getClass().getName().replace(".", "/") + ".class"; |
| |
| LogicalPlan dag = new LogicalPlan(); |
| dagBuilder.prepareDAG(dag, app, appPath); |
| |
| simpleNameInputPortAssertHelper(attributeObj, dag, "operator1", val, set); |
| simpleNameInputPortAssertHelper(attributeObj, dag, "operator2", val, set); |
| simpleNameInputPortAssertHelper(attributeObj, dag, "operator3", val, set); |
| } |
| |
| private void simpleNameOutputPortAssertHelperAssert(Attribute<?> attributeObj, Properties props, Object val, boolean set) |
| { |
| SimpleTestApplicationWithName app = new SimpleTestApplicationWithName(); |
| |
| LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false)); |
| dagBuilder.addFromProperties(props, null); |
| |
| String appPath = app.getClass().getName().replace(".", "/") + ".class"; |
| |
| LogicalPlan dag = new LogicalPlan(); |
| dagBuilder.prepareDAG(dag, app, appPath); |
| |
| simpleNameOutputPortAssertHelper(attributeObj, dag, "operator1", val, set); |
| simpleNameOutputPortAssertHelper(attributeObj, dag, "operator2", val, set); |
| simpleNameOutputPortAssertHelper(attributeObj, dag, "operator3", val, set); |
| } |
| |
| private void simpleNamePortAssertHelper(Attribute<?> attributeObj, LogicalPlan dag, String operatorName, Object queueCapacity, boolean set) |
| { |
| simpleNameInputPortAssertHelper(attributeObj, dag, operatorName, queueCapacity, set); |
| simpleNameOutputPortAssertHelper(attributeObj, dag, operatorName, queueCapacity, set); |
| } |
| |
| private void simpleNameInputPortAssertHelper(Attribute<?> attributeObj, LogicalPlan dag, String operatorName, Object queueCapacity, boolean set) |
| { |
| OperatorMeta operatorMeta = dag.getOperatorMeta(operatorName); |
| |
| for (InputPortMeta inputPortMeta: operatorMeta.getInputStreams().keySet()) { |
| if (set) { |
| Assert.assertEquals(queueCapacity, inputPortMeta.getValue(attributeObj)); |
| } else { |
| Assert.assertNotEquals(queueCapacity, inputPortMeta.getValue(attributeObj)); |
| } |
| } |
| } |
| |
| private void simpleNameOutputPortAssertHelper(Attribute<?> attributeObj, LogicalPlan dag, String operatorName, Object queueCapacity, boolean set) |
| { |
| OperatorMeta operatorMeta = dag.getOperatorMeta(operatorName); |
| |
| for (OutputPortMeta outputPortMeta: operatorMeta.getOutputStreams().keySet()) { |
| if (set) { |
| Assert.assertEquals(queueCapacity, outputPortMeta.getValue(attributeObj)); |
| } else { |
| Assert.assertNotEquals(queueCapacity, outputPortMeta.getValue(attributeObj)); |
| } |
| } |
| } |
| |
| /* Helpers for building properties */ |
| private Properties propertiesBuilder(String attributeName, String root, String contextClass, Object val) |
| { |
| boolean simpleName = contextClass == null; |
| |
| if (!simpleName) { |
| attributeName = contextClass + LogicalPlanConfiguration.KEY_SEPARATOR + attributeName; |
| } |
| |
| Properties props = new Properties(); |
| String propName = root + StramElement.ATTR.getValue() + LogicalPlanConfiguration.KEY_SEPARATOR + attributeName; |
| props.put(propName, val.toString()); |
| |
| return props; |
| } |
| |
| private Properties propertiesBuilderOperator(String attributeName, String root, boolean simpleName, Object val, boolean addOperator) |
| { |
| String contextClass = simpleName ? null : OperatorContext.class.getCanonicalName(); |
| |
| if (addOperator) { |
| root += "operator" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR; |
| } |
| |
| return propertiesBuilder(attributeName, root, contextClass, val); |
| } |
| |
| private Properties propertiesBuilderPort(String attributeName, String root, boolean simpleName, Object val, boolean addPort) |
| { |
| String contextClass = simpleName ? null : PortContext.class.getCanonicalName(); |
| |
| if (addPort) { |
| root += "port" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR; |
| } |
| |
| return propertiesBuilder(attributeName, root, contextClass, val); |
| } |
| |
| private Properties propertiesBuilderInputPort(String attributeName, String root, boolean simpleName, Object val) |
| { |
| String contextClass = simpleName ? null : PortContext.class.getCanonicalName(); |
| |
| root += "inputport" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR; |
| |
| return propertiesBuilder(attributeName, root, contextClass, val); |
| } |
| |
| private Properties propertiesBuilderOutputPort(String attributeName, String root, boolean simpleName, Object val) |
| { |
| String contextClass = simpleName ? null : PortContext.class.getCanonicalName(); |
| |
| root += "outputport" + LogicalPlanConfiguration.KEY_SEPARATOR + "*" + LogicalPlanConfiguration.KEY_SEPARATOR; |
| |
| return propertiesBuilder(attributeName, root, contextClass, val); |
| } |
| |
/**
 * Note: If the same name is given to an Attribute specified in multiple Context classes, then the type of that
 * Attribute is required to be the same across all Context classes. This is required because if a simple attribute
 * name is specified in a properties file at the top level context, then that attribute needs to be set in all child
 * configurations. If there were multiple Attributes specified in different Contexts with the same name but
 * different types, then it would not be possible to set the values of Attributes specified by a simple attribute
 * name in the root context of a properties file. If differing types were allowed, then adding another Attribute
 * with the same name as a pre-existing Attribute to a new Context class would be a backwards incompatible change.
 */
| @Test |
| @SuppressWarnings("unchecked") |
| public void testErrorSameAttrMultipleTypes() |
| { |
| //Trigger initialization of attributes for existing Contexts. |
| LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration()); |
| |
| Exception ex = null; |
| try { |
| ContextUtils.buildAttributeMaps(Sets.newHashSet(MockContext1.class, MockContext2.class)); |
| } catch (ValidationException e) { |
| ex = e; |
| } |
| |
| Assert.assertNotNull(ex); |
| Assert.assertTrue(ex.getMessage().contains("is defined with two different types in two different context classes:")); |
| |
| //Clear test data from Context. |
| ContextUtils.initialize(); |
| } |
| |
// Logger for this test class.
// NOTE(review): a second logger for the same class, LOG, is declared near the end of this file; consider
// consolidating once all usages are known.
private static final Logger logger = LoggerFactory.getLogger(LogicalPlanConfigurationTest.class);
| |
| public static class TestApplication implements StreamingApplication |
| { |
| Integer testprop1; |
| Integer testprop2; |
| Integer testprop3; |
| TestInnerClass inncls; |
| |
| public TestApplication() |
| { |
| inncls = new TestInnerClass(); |
| } |
| |
| public Integer getTestprop1() |
| { |
| return testprop1; |
| } |
| |
| public void setTestprop1(Integer testprop1) |
| { |
| this.testprop1 = testprop1; |
| } |
| |
| public Integer getTestprop2() |
| { |
| return testprop2; |
| } |
| |
| public void setTestprop2(Integer testprop2) |
| { |
| this.testprop2 = testprop2; |
| } |
| |
| public Integer getTestprop3() |
| { |
| return testprop3; |
| } |
| |
| public void setTestprop3(Integer testprop3) |
| { |
| this.testprop3 = testprop3; |
| } |
| |
| public TestInnerClass getInncls() |
| { |
| return inncls; |
| } |
| |
| public void setInncls(TestInnerClass inncls) |
| { |
| this.inncls = inncls; |
| } |
| |
| @Override |
| public void populateDAG(DAG dag, Configuration conf) |
| { |
| |
| } |
| |
| public class TestInnerClass |
| { |
| Integer a; |
| |
| public Integer getA() |
| { |
| return a; |
| } |
| |
| public void setA(Integer a) |
| { |
| this.a = a; |
| } |
| } |
| } |
| |
| public static class TestStatsListener implements StatsListener |
| { |
| |
| private int intProp; |
| |
| public TestStatsListener() |
| { |
| } |
| |
| @Override |
| public Response processStats(BatchedOperatorStats stats) |
| { |
| return null; |
| } |
| |
| public int getIntProp() |
| { |
| return intProp; |
| } |
| |
| public void setIntProp(int intProp) |
| { |
| this.intProp = intProp; |
| } |
| |
| @Override |
| public int hashCode() |
| { |
| final int prime = 31; |
| int result = 1; |
| result = prime * result + intProp; |
| return result; |
| } |
| |
| @Override |
| public boolean equals(Object obj) |
| { |
| if (this == obj) { |
| return true; |
| } |
| if (obj == null) { |
| return false; |
| } |
| if (getClass() != obj.getClass()) { |
| return false; |
| } |
| TestStatsListener other = (TestStatsListener)obj; |
| if (intProp != other.intProp) { |
| return false; |
| } |
| return true; |
| } |
| } |
| |
| public static class TestSchema |
| { |
| } |
| |
| public static class SimpleTestApplication implements StreamingApplication |
| { |
| public final GenericTestOperator gt1 = new GenericTestOperator(); |
| public final GenericTestOperator gt2 = new GenericTestOperator(); |
| public final GenericTestOperator gt3 = new GenericTestOperator(); |
| |
| @Override |
| public void populateDAG(DAG dag, Configuration conf) |
| { |
| dag.addOperator("operator1", gt1); |
| dag.addOperator("operator2", gt2); |
| dag.addOperator("operator3", gt3); |
| dag.addStream("s1", gt1.outport1, gt2.inport1); |
| dag.addStream("s2", gt2.outport1, gt3.inport1, gt3.inport2); |
| } |
| } |
| |
/**
 * Mock context that declares TEST_ATTR with an Integer value type. It is passed together with MockContext2,
 * which declares the same attribute name with a Boolean value type, to ContextUtils.buildAttributeMaps in
 * testErrorSameAttrMultipleTypes.
 */
public interface MockContext1 extends Context
{
  /**
   * Test-only attribute of type Integer, constructed with the value 1024.
   */
  Attribute<Integer> TEST_ATTR = new Attribute<>(1024);

  // NOTE(review): presumably this must stay after the attribute declaration so the attribute exists when
  // initialize() runs - confirm against AttributeInitializer.
  @SuppressWarnings("FieldNameHidesFieldInSuperclass")
  long serialVersionUID = AttributeMap.AttributeInitializer.initialize(MockContext1.class);
}
| |
/**
 * Mock context that declares TEST_ATTR with a Boolean value type. It is passed together with MockContext1,
 * which declares the same attribute name with an Integer value type, to ContextUtils.buildAttributeMaps in
 * testErrorSameAttrMultipleTypes.
 */
public interface MockContext2 extends Context
{
  /**
   * Test-only attribute of type Boolean, constructed with the value false.
   */
  Attribute<Boolean> TEST_ATTR = new Attribute<>(false);

  // NOTE(review): presumably this must stay after the attribute declaration so the attribute exists when
  // initialize() runs - confirm against AttributeInitializer.
  @SuppressWarnings("FieldNameHidesFieldInSuperclass")
  long serialVersionUID = AttributeMap.AttributeInitializer.initialize(MockContext2.class);
}
| |
| @ApplicationAnnotation(name = "SimpleTestApp") |
| public static class SimpleTestApplicationWithName extends SimpleTestApplication |
| { |
| } |
| |
// Logger for this test class.
// NOTE(review): another logger for the same class, named logger, is declared earlier in this file; consider
// consolidating once all usages are known.
private static final Logger LOG = LoggerFactory.getLogger(LogicalPlanConfigurationTest.class);
| } |
| |