/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.datatorrent.stram.plan.logical.module;

import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;

import org.codehaus.jettison.json.JSONObject;
import org.junit.Assert;
import org.junit.Test;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Module;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.engine.OperatorContext;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;

public class TestModuleExpansion
{
  public static class DummyInputOperator extends BaseOperator implements InputOperator
  {
    private int inputOperatorProp = 0;

    Random r = new Random();
    public transient DefaultOutputPort<Integer> out = new DefaultOutputPort<>();

    @Override
    public void emitTuples()
    {
      out.emit(r.nextInt());
    }

    public int getInputOperatorProp()
    {
      return inputOperatorProp;
    }

    public void setInputOperatorProp(int inputOperatorProp)
    {
      this.inputOperatorProp = inputOperatorProp;
    }
  }

  public static class DummyOperator extends BaseOperator
  {
    private int operatorProp = 0;

    @OutputPortFieldAnnotation(optional = true)
    public final transient DefaultOutputPort<Integer> out1 = new DefaultOutputPort<>();

    @OutputPortFieldAnnotation(optional = true)
    public final transient DefaultOutputPort<Integer> out2 = new DefaultOutputPort<>();

    @InputPortFieldAnnotation(optional = true)
    public final transient DefaultInputPort<Integer> in = new DefaultInputPort<Integer>()
    {
      @Override
      public void process(Integer tuple)
      {
        out1.emit(tuple);
        out2.emit(tuple);
      }
    };

    public int getOperatorProp()
    {
      return operatorProp;
    }

    public void setOperatorProp(int operatorProp)
    {
      this.operatorProp = operatorProp;
    }
  }

  public static class TestPartitioner implements Partitioner<DummyOperator>, Serializable
  {
    @Override
    public Collection<Partition<DummyOperator>> definePartitions(Collection<Partition<DummyOperator>> partitions, PartitioningContext context)
    {
      ArrayList<Partition<DummyOperator>> lst = new ArrayList();
      lst.add(partitions.iterator().next());
      return lst;
    }

    @Override
    public void partitioned(Map<Integer, Partition<DummyOperator>> partitions)
    {

    }
  }

  public static class Level1Module implements Module
  {
    private int level1ModuleProp = 0;

    @InputPortFieldAnnotation(optional = true)
    public final transient ProxyInputPort<Integer> mIn = new ProxyInputPort<>();
    @OutputPortFieldAnnotation(optional = true)
    public final transient ProxyOutputPort<Integer> mOut = new ProxyOutputPort<>();
    private int memory = 512;
    private int portMemory = 2;

    @Override
    public void populateDAG(DAG dag, Configuration conf)
    {
      DummyOperator o1 = dag.addOperator("O1", new DummyOperator());
      o1.setOperatorProp(level1ModuleProp);

      /** set various attribute on the operator for testing */
      Attribute.AttributeMap attr = dag.getMeta(o1).getAttributes();
      attr.put(OperatorContext.MEMORY_MB, memory);
      attr.put(OperatorContext.APPLICATION_WINDOW_COUNT, 2);
      attr.put(OperatorContext.LOCALITY_HOST, "host1");
      attr.put(OperatorContext.PARTITIONER, new TestPartitioner());
      attr.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 120);
      attr.put(OperatorContext.STATELESS, true);
      attr.put(OperatorContext.SPIN_MILLIS, 20);

      dag.setInputPortAttribute(o1.in, Context.PortContext.BUFFER_MEMORY_MB, portMemory);
      mIn.set(o1.in);
      mOut.set(o1.out1);
    }

    public int getLevel1ModuleProp()
    {
      return level1ModuleProp;
    }

    public void setLevel1ModuleProp(int level1ModuleProp)
    {
      this.level1ModuleProp = level1ModuleProp;
    }

    public int getMemory()
    {
      return memory;
    }

    public void setMemory(int memory)
    {
      this.memory = memory;
    }

    public int getPortMemory()
    {
      return portMemory;
    }

    public void setPortMemory(int portMemory)
    {
      this.portMemory = portMemory;
    }
  }

  public static class Level2ModuleA implements Module
  {
    private int level2ModuleAProp1 = 0;
    private int level2ModuleAProp2 = 0;
    private int level2ModuleAProp3 = 0;

    @InputPortFieldAnnotation(optional = true)
    public final transient ProxyInputPort<Integer> mIn = new ProxyInputPort<>();

    @OutputPortFieldAnnotation(optional = true)
    public final transient ProxyOutputPort<Integer> mOut1 = new ProxyOutputPort<>();

    @OutputPortFieldAnnotation(optional = true)
    public final transient ProxyOutputPort<Integer> mOut2 = new ProxyOutputPort<>();

    @Override
    public void populateDAG(DAG dag, Configuration conf)
    {
      Level1Module m1 = dag.addModule("M1", new Level1Module());
      m1.setMemory(1024);
      m1.setPortMemory(1);
      m1.setLevel1ModuleProp(level2ModuleAProp1);

      Level1Module m2 = dag.addModule("M2", new Level1Module());
      m2.setMemory(2048);
      m2.setPortMemory(2);
      m2.setLevel1ModuleProp(level2ModuleAProp2);

      DummyOperator o1 = dag.addOperator("O1", new DummyOperator());
      o1.setOperatorProp(level2ModuleAProp3);

      dag.addStream("M1_M2&O1", m1.mOut, m2.mIn, o1.in).setLocality(DAG.Locality.CONTAINER_LOCAL);

      mIn.set(m1.mIn);
      mOut1.set(m2.mOut);
      mOut2.set(o1.out1);
    }

    public int getLevel2ModuleAProp1()
    {
      return level2ModuleAProp1;
    }

    public void setLevel2ModuleAProp1(int level2ModuleAProp1)
    {
      this.level2ModuleAProp1 = level2ModuleAProp1;
    }

    public int getLevel2ModuleAProp2()
    {
      return level2ModuleAProp2;
    }

    public void setLevel2ModuleAProp2(int level2ModuleAProp2)
    {
      this.level2ModuleAProp2 = level2ModuleAProp2;
    }

    public int getLevel2ModuleAProp3()
    {
      return level2ModuleAProp3;
    }

    public void setLevel2ModuleAProp3(int level2ModuleAProp3)
    {
      this.level2ModuleAProp3 = level2ModuleAProp3;
    }
  }

  public static class Level2ModuleB implements Module
  {
    private int level2ModuleBProp1 = 0;
    private int level2ModuleBProp2 = 0;
    private int level2ModuleBProp3 = 0;

    @InputPortFieldAnnotation(optional = true)
    public final transient ProxyInputPort<Integer> mIn = new ProxyInputPort<>();

    @OutputPortFieldAnnotation(optional = true)
    public final transient ProxyOutputPort<Integer> mOut1 = new ProxyOutputPort<>();

    @OutputPortFieldAnnotation(optional = true)
    public final transient ProxyOutputPort<Integer> mOut2 = new ProxyOutputPort<>();

    @Override
    public void populateDAG(DAG dag, Configuration conf)
    {
      DummyOperator o1 = dag.addOperator("O1", new DummyOperator());
      o1.setOperatorProp(level2ModuleBProp1);

      Level1Module m1 = dag.addModule("M1", new Level1Module());
      m1.setMemory(4096);
      m1.setPortMemory(3);
      m1.setLevel1ModuleProp(level2ModuleBProp2);

      DummyOperator o2 = dag.addOperator("O2", new DummyOperator());
      o2.setOperatorProp(level2ModuleBProp3);

      dag.addStream("O1_M1", o1.out1, m1.mIn).setLocality(DAG.Locality.THREAD_LOCAL);
      dag.addStream("O1_O2", o1.out2, o2.in).setLocality(DAG.Locality.RACK_LOCAL);

      mIn.set(o1.in);
      mOut1.set(m1.mOut);
      mOut2.set(o2.out1);
    }

    public int getLevel2ModuleBProp1()
    {
      return level2ModuleBProp1;
    }

    public void setLevel2ModuleBProp1(int level2ModuleBProp1)
    {
      this.level2ModuleBProp1 = level2ModuleBProp1;
    }

    public int getLevel2ModuleBProp2()
    {
      return level2ModuleBProp2;
    }

    public void setLevel2ModuleBProp2(int level2ModuleBProp2)
    {
      this.level2ModuleBProp2 = level2ModuleBProp2;
    }

    public int getLevel2ModuleBProp3()
    {
      return level2ModuleBProp3;
    }

    public void setLevel2ModuleBProp3(int level2ModuleBProp3)
    {
      this.level2ModuleBProp3 = level2ModuleBProp3;
    }
  }

  public static class Level3Module implements Module
  {

    public final transient ProxyInputPort<Integer> mIn = new ProxyInputPort<>();
    public final transient ProxyOutputPort<Integer> mOut1 = new ProxyOutputPort<>();
    public final transient ProxyOutputPort<Integer> mOut2 = new ProxyOutputPort<>();

    @Override
    public void populateDAG(DAG dag, Configuration conf)
    {
      DummyOperator op = dag.addOperator("O1", new DummyOperator());
      Level2ModuleB m1 = dag.addModule("M1", new Level2ModuleB());
      Level1Module m2 = dag.addModule("M2", new Level1Module());

      dag.addStream("s1", op.out1, m1.mIn);
      dag.addStream("s2", op.out2, m2.mIn);

      mIn.set(op.in);
      mOut1.set(m1.mOut1);
      mOut2.set(m2.mOut);
    }
  }

  public static class NestedModuleApp implements StreamingApplication
  {
    @Override
    public void populateDAG(DAG dag, Configuration conf)
    {
      DummyInputOperator o1 = dag.addOperator("O1", new DummyInputOperator());
      o1.setInputOperatorProp(1);

      DummyOperator o2 = dag.addOperator("O2", new DummyOperator());
      o2.setOperatorProp(2);

      Level2ModuleA ma = dag.addModule("Ma", new Level2ModuleA());
      ma.setLevel2ModuleAProp1(11);
      ma.setLevel2ModuleAProp2(12);
      ma.setLevel2ModuleAProp3(13);

      Level2ModuleB mb = dag.addModule("Mb", new Level2ModuleB());
      mb.setLevel2ModuleBProp1(21);
      mb.setLevel2ModuleBProp2(22);
      mb.setLevel2ModuleBProp3(23);

      Level2ModuleA mc = dag.addModule("Mc", new Level2ModuleA());
      mc.setLevel2ModuleAProp1(31);
      mc.setLevel2ModuleAProp2(32);
      mc.setLevel2ModuleAProp3(33);

      Level2ModuleB md = dag.addModule("Md", new Level2ModuleB());
      md.setLevel2ModuleBProp1(41);
      md.setLevel2ModuleBProp2(42);
      md.setLevel2ModuleBProp3(43);

      Level3Module me = dag.addModule("Me", new Level3Module());

      dag.addStream("O1_O2", o1.out, o2.in, me.mIn);
      dag.addStream("O2_Ma", o2.out1, ma.mIn);
      dag.addStream("Ma_Mb", ma.mOut1, mb.mIn);
      dag.addStream("Ma_Md", ma.mOut2, md.mIn);
      dag.addStream("Mb_Mc", mb.mOut2, mc.mIn);
    }
  }

  @Test
  public void testModuleExtreme()
  {
    StreamingApplication app = new NestedModuleApp();
    Configuration conf = new Configuration(false);
    LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf);
    LogicalPlan dag = new LogicalPlan();
    lpc.prepareDAG(dag, app, "ModuleApp");

    dag.validate();
    validateTopLevelOperators(dag);
    validateTopLevelStreams(dag);
    validatePublicMethods(dag);
  }

  private void validateTopLevelStreams(LogicalPlan dag)
  {
    List<String> streamNames = new ArrayList<>();
    for (LogicalPlan.StreamMeta streamMeta : dag.getAllStreams()) {
      streamNames.add(streamMeta.getName());
    }

    Assert.assertTrue(streamNames.contains(componentName("Mb", "O1_M1")));
    Assert.assertTrue(streamNames.contains("O2_Ma"));
    Assert.assertTrue(streamNames.contains("Mb_Mc"));
    Assert.assertTrue(streamNames.contains(componentName("Mb", "O1_O2")));
    Assert.assertTrue(streamNames.contains(componentName("Ma", "M1_M2&O1")));
    Assert.assertTrue(streamNames.contains(componentName("Md", "O1_M1")));
    Assert.assertTrue(streamNames.contains(componentName("Ma_Md")));
    Assert.assertTrue(streamNames.contains(componentName("Mc", "M1_M2&O1")));
    Assert.assertTrue(streamNames.contains(componentName("Md", "O1_O2")));
    Assert.assertTrue(streamNames.contains("Ma_Mb"));
    Assert.assertTrue(streamNames.contains("O1_O2"));

    validateSeperateStream(dag, componentName("Mb", "O1_M1"), componentName("Mb", "O1"),
        componentName("Mb", "M1", "O1"));
    validateSeperateStream(dag, "O2_Ma", "O2", componentName("Ma", "M1", "O1"));
    validateSeperateStream(dag, "Mb_Mc", componentName("Mb", "O2"), componentName("Mc", "M1", "O1"));
    validateSeperateStream(dag, componentName("Mb", "O1_O2"), componentName("Mb", "O1"), componentName("Mb", "O2"));
    validateSeperateStream(dag, componentName("Ma", "M1_M2&O1"), componentName("Ma", "M1", "O1"),
        componentName("Ma", "O1"), componentName("Ma", "M2", "O1"));
    validateSeperateStream(dag, componentName("Md", "O1_M1"), componentName("Md", "O1"),
        componentName("Md", "M1", "O1"));
    validateSeperateStream(dag, "Ma_Md", componentName("Ma", "O1"), componentName("Md", "O1"));
    validateSeperateStream(dag, componentName("Mc", "M1_M2&O1"), componentName("Mc", "M1", "O1"),
        componentName("Mc", "O1"), componentName("Mc", "M2", "O1"));
    validateSeperateStream(dag, componentName("Md", "O1_O2"), componentName("Md", "O1"), componentName("Md", "O2"));
    validateSeperateStream(dag, "Ma_Mb", componentName("Ma", "M2", "O1"), componentName("Mb", "O1"));
    validateSeperateStream(dag, "O1_O2", "O1", "O2", componentName("Me", "O1"));

    /* Verify that stream locality is set correctly in top level dag */
    validateStreamLocality(dag, componentName("Mc", "M1_M2&O1"), DAG.Locality.CONTAINER_LOCAL);
    validateStreamLocality(dag, componentName("Mb", "O1_M1"), DAG.Locality.THREAD_LOCAL);
    validateStreamLocality(dag, componentName("Mb", "O1_O2"), DAG.Locality.RACK_LOCAL);
    validateStreamLocality(dag, componentName("Mc", "M1_M2&O1"), DAG.Locality.CONTAINER_LOCAL);
    validateStreamLocality(dag, componentName("Md", "O1_M1"), DAG.Locality.THREAD_LOCAL);
    validateStreamLocality(dag, componentName("Me", "s1"), null);

  }

  private void validateSeperateStream(LogicalPlan dag, String streamName, String inputOperatorName,
      String... outputOperatorNames)
  {
    LogicalPlan.StreamMeta streamMeta = dag.getStream(streamName);
    String sourceName = streamMeta.getSource().getOperatorMeta().getName();

    List<String> sinksName = new ArrayList<>();
    for (LogicalPlan.InputPortMeta inputPortMeta : streamMeta.getSinks()) {
      sinksName.add(inputPortMeta.getOperatorMeta().getName());
    }

    Assert.assertTrue(inputOperatorName.equals(sourceName));
    Assert.assertEquals(outputOperatorNames.length, sinksName.size());

    for (String outputOperatorName : outputOperatorNames) {
      Assert.assertTrue(sinksName.contains(outputOperatorName));
    }
  }

  private void validateTopLevelOperators(LogicalPlan dag)
  {
    List<String> operatorNames = new ArrayList<>();
    for (LogicalPlan.OperatorMeta operatorMeta : dag.getAllOperators()) {
      operatorNames.add(operatorMeta.getName());
    }
    Assert.assertTrue(operatorNames.contains("O1"));
    Assert.assertTrue(operatorNames.contains("O2"));
    Assert.assertTrue(operatorNames.contains(componentName("Ma", "M1", "O1")));
    Assert.assertTrue(operatorNames.contains(componentName("Ma", "M2", "O1")));
    Assert.assertTrue(operatorNames.contains(componentName("Ma", "O1")));
    Assert.assertTrue(operatorNames.contains(componentName("Mb", "O1")));
    Assert.assertTrue(operatorNames.contains(componentName("Mb", "M1", "O1")));
    Assert.assertTrue(operatorNames.contains(componentName("Mb", "O2")));
    Assert.assertTrue(operatorNames.contains(componentName("Mc", "M1", "O1")));
    Assert.assertTrue(operatorNames.contains(componentName("Mc", "M2", "O1")));
    Assert.assertTrue(operatorNames.contains(componentName("Mc", "O1")));
    Assert.assertTrue(operatorNames.contains(componentName("Md", "O1")));
    Assert.assertTrue(operatorNames.contains(componentName("Md", "M1", "O1")));
    Assert.assertTrue(operatorNames.contains(componentName("Md", "O2")));

    validateOperatorPropertyValue(dag, "O1", 1);
    validateOperatorPropertyValue(dag, "O2", 2);
    validateOperatorPropertyValue(dag, componentName("Ma", "M1", "O1"), 11);
    validateOperatorPropertyValue(dag, componentName("Ma", "M2", "O1"), 12);
    validateOperatorPropertyValue(dag, componentName("Ma", "O1"), 13);
    validateOperatorPropertyValue(dag, componentName("Mb", "O1"), 21);
    validateOperatorPropertyValue(dag, componentName("Mb", "M1", "O1"), 22);
    validateOperatorPropertyValue(dag, componentName("Mb", "O2"), 23);
    validateOperatorPropertyValue(dag, componentName("Mc", "M1", "O1"), 31);
    validateOperatorPropertyValue(dag, componentName("Mc", "M2", "O1"), 32);
    validateOperatorPropertyValue(dag, componentName("Mc", "O1"), 33);
    validateOperatorPropertyValue(dag, componentName("Md", "O1"), 41);
    validateOperatorPropertyValue(dag, componentName("Md", "M1", "O1"), 42);
    validateOperatorPropertyValue(dag, componentName("Md", "O2"), 43);

    validateOperatorParent(dag, "O1", null);
    validateOperatorParent(dag, "O2", null);
    validateOperatorParent(dag, componentName("Ma", "M1", "O1"), componentName("Ma", "M1"));
    validateOperatorParent(dag, componentName("Ma", "M2", "O1"), componentName("Ma", "M2"));
    validateOperatorParent(dag, componentName("Ma", "O1"), "Ma");
    validateOperatorParent(dag, componentName("Mb", "O1"), "Mb");
    validateOperatorParent(dag, componentName("Mb", "M1", "O1"), componentName("Mb", "M1"));
    validateOperatorParent(dag, componentName("Mb", "O2"), "Mb");
    validateOperatorParent(dag, componentName("Mc", "M1", "O1"), componentName("Mc", "M1"));
    validateOperatorParent(dag, componentName("Mc", "M2", "O1"), componentName("Mc", "M2"));
    validateOperatorParent(dag, componentName("Mc", "O1"), "Mc");
    validateOperatorParent(dag, componentName("Md", "O1"), "Md");
    validateOperatorParent(dag, componentName("Md", "M1", "O1"), componentName("Md", "M1"));
    validateOperatorParent(dag, componentName("Md", "O2"), "Md");

    validateOperatorAttribute(dag, componentName("Ma", "M1", "O1"), 1024);
    validateOperatorAttribute(dag, componentName("Ma", "M2", "O1"), 2048);
    validateOperatorAttribute(dag, componentName("Mb", "M1", "O1"), 4096);
    validateOperatorAttribute(dag, componentName("Mc", "M1", "O1"), 1024);
    validateOperatorAttribute(dag, componentName("Mc", "M2", "O1"), 2048);

    validatePortAttribute(dag, componentName("Ma", "M1", "O1"), 1);
    validatePortAttribute(dag, componentName("Ma", "M2", "O1"), 2);
    validatePortAttribute(dag, componentName("Mb", "M1", "O1"), 3);
    validatePortAttribute(dag, componentName("Mc", "M1", "O1"), 1);
    validatePortAttribute(dag, componentName("Mc", "M2", "O1"), 2);
  }

  private void validateOperatorParent(LogicalPlan dag, String operatorName, String parentModuleName)
  {
    LogicalPlan.OperatorMeta operatorMeta = dag.getOperatorMeta(operatorName);
    if (parentModuleName == null) {
      Assert.assertNull(operatorMeta.getModuleName());
    } else {
      Assert.assertTrue(parentModuleName.equals(operatorMeta.getModuleName()));
    }
  }

  private void validateOperatorPropertyValue(LogicalPlan dag, String operatorName, int expectedValue)
  {
    LogicalPlan.OperatorMeta oMeta = dag.getOperatorMeta(operatorName);
    if (operatorName.equals("O1")) {
      DummyInputOperator operator = (DummyInputOperator)oMeta.getOperator();
      Assert.assertEquals(expectedValue, operator.getInputOperatorProp());
    } else {
      DummyOperator operator = (DummyOperator)oMeta.getOperator();
      Assert.assertEquals(expectedValue, operator.getOperatorProp());
    }
  }

  private void validatePublicMethods(LogicalPlan dag)
  {
    // Logical dag contains 4 modules added on top level.
    List<String> moduleNames = new ArrayList<>();
    for (LogicalPlan.ModuleMeta moduleMeta : dag.getAllModules()) {
      moduleNames.add(moduleMeta.getName());
    }
    Assert.assertTrue(moduleNames.contains("Ma"));
    Assert.assertTrue(moduleNames.contains("Mb"));
    Assert.assertTrue(moduleNames.contains("Mc"));
    Assert.assertTrue(moduleNames.contains("Md"));
    Assert.assertTrue(moduleNames.contains("Me"));
    Assert.assertEquals("Number of modules are 5", 5, dag.getAllModules().size());

  }

  private static String componentName(String... names)
  {
    if (names.length == 0) {
      return "";
    }
    StringBuilder sb = new StringBuilder(names[0]);
    for (int i = 1; i < names.length; i++) {
      sb.append(LogicalPlan.MODULE_NAMESPACE_SEPARATOR);
      sb.append(names[i]);
    }
    return sb.toString();
  }

  /**
   * Generate a conflict, Add a top level operator with name "m1_O1",
   * and add a module "m1" which will populate operator "O1", causing name conflict with
   * top level operator.
   */
  @Test(expected = java.lang.IllegalArgumentException.class)
  public void conflictingNamesWithExpandedModule()
  {
    Configuration conf = new Configuration(false);
    LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf);
    LogicalPlan dag = new LogicalPlan();
    DummyInputOperator in = dag.addOperator(componentName("m1", "O1"), new DummyInputOperator());
    Level2ModuleA module = dag.addModule("m1", new Level2ModuleA());
    dag.addStream("s1", in.out, module.mIn);
    lpc.prepareDAG(dag, null, "ModuleApp");
    dag.validate();
  }

  /**
   * Module and Operator with same name is not allowed in a DAG, to prevent properties
   * conflict.
   */
  @Test(expected = java.lang.IllegalArgumentException.class)
  public void conflictingNamesWithOperator1()
  {
    Configuration conf = new Configuration(false);
    LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf);
    LogicalPlan dag = new LogicalPlan();
    DummyInputOperator in = dag.addOperator("M1", new DummyInputOperator());
    Level2ModuleA module = dag.addModule("M1", new Level2ModuleA());
    dag.addStream("s1", in.out, module.mIn);
    lpc.prepareDAG(dag, null, "ModuleApp");
    dag.validate();
  }

  /**
   * Module and Operator with same name is not allowed in a DAG, to prevent properties
   * conflict.
   */
  @Test(expected = java.lang.IllegalArgumentException.class)
  public void conflictingNamesWithOperator2()
  {
    Configuration conf = new Configuration(false);
    LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf);
    LogicalPlan dag = new LogicalPlan();
    Level2ModuleA module = dag.addModule("M1", new Level2ModuleA());
    DummyInputOperator in = dag.addOperator("M1", new DummyInputOperator());
    dag.addStream("s1", in.out, module.mIn);
    lpc.prepareDAG(dag, null, "ModuleApp");
    dag.validate();
  }

  /**
   * Verify attributes populated on DummyOperator from Level1 module
   */
  private void validateOperatorAttribute(LogicalPlan dag, String name, int memory)
  {
    LogicalPlan.OperatorMeta oMeta = dag.getOperatorMeta(name);
    Attribute.AttributeMap attrs = oMeta.getAttributes();
    Assert.assertEquals((int)attrs.get(OperatorContext.MEMORY_MB), memory);
    Assert.assertEquals("Application window id is 2 ", (int)attrs.get(OperatorContext.APPLICATION_WINDOW_COUNT), 2);
    Assert.assertEquals("Locality host is host1", attrs.get(OperatorContext.LOCALITY_HOST), "host1");
    Assert.assertEquals(attrs.get(OperatorContext.PARTITIONER).getClass(), TestPartitioner.class);
    Assert.assertEquals("Checkpoint window count ", (int)attrs.get(OperatorContext.CHECKPOINT_WINDOW_COUNT), 120);
    Assert.assertEquals("Operator is stateless ", attrs.get(OperatorContext.STATELESS), true);
    Assert.assertEquals("SPIN MILLIS is set to 20 ", (int)attrs.get(OperatorContext.SPIN_MILLIS), 20);

  }

  /**
   * Validate attribute set on the port of DummyOperator in Level1Module
   */
  private void validatePortAttribute(LogicalPlan dag, String name, int memory)
  {
    LogicalPlan.InputPortMeta imeta = dag.getOperatorMeta(name).getInputStreams().keySet().iterator().next();
    Assert.assertEquals(memory, (int)imeta.getAttributes().get(Context.PortContext.BUFFER_MEMORY_MB));
  }

  /**
   * validate if stream attributes are copied or not
   */
  private void validateStreamLocality(LogicalPlan dag, String name, DAG.Locality locality)
  {
    LogicalPlan.StreamMeta meta = dag.getStream(name);
    Assert.assertTrue("Metadata for stream is available ", meta != null);
    Assert.assertEquals("Locality is " + locality, meta.getLocality(), locality);
  }

  @Test
  public void testLoadFromPropertiesFile() throws IOException
  {
    Properties props = new Properties();
    String resourcePath = "/testModuleTopology.properties";
    InputStream is = this.getClass().getResourceAsStream(resourcePath);
    if (is == null) {
      throw new RuntimeException("Could not load " + resourcePath);
    }
    props.load(is);
    LogicalPlanConfiguration pb = new LogicalPlanConfiguration(new Configuration(false))
        .addFromProperties(props, null);

    LogicalPlan dag = new LogicalPlan();
    pb.populateDAG(dag);
    pb.prepareDAG(dag, null, "testApplication");
    dag.validate();
    validateTopLevelOperators(dag);
    validateTopLevelStreams(dag);
    validatePublicMethods(dag);
  }

  @Test
  public void testLoadFromJson() throws Exception
  {
    String resourcePath = "/testModuleTopology.json";
    InputStream is = this.getClass().getResourceAsStream(resourcePath);
    if (is == null) {
      throw new RuntimeException("Could not load " + resourcePath);
    }
    StringWriter writer = new StringWriter();

    IOUtils.copy(is, writer);
    JSONObject json = new JSONObject(writer.toString());

    Configuration conf = new Configuration(false);
    conf.set(StreamingApplication.DT_PREFIX + "operator.operator3.prop.myStringProperty", "o3StringFromConf");

    LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf);
    LogicalPlan dag = planConf.createFromJson(json, "testLoadFromJson");
    planConf.prepareDAG(dag, null, "testApplication");
    dag.validate();
    validateTopLevelOperators(dag);
    validateTopLevelStreams(dag);
    validatePublicMethods(dag);
  }

}
