/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.datatorrent.stram;


import com.datatorrent.api.*;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.stram.api.OperatorDeployInfo;
import com.datatorrent.stram.codec.DefaultStatefulStreamCodec;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.datatorrent.stram.support.StramTestSupport;
import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.*;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/**
 */
public class StreamCodecTest
{
  private LogicalPlan dag;

  @Rule
  public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();

  @Before
  public void setup()
  {
    dag = StramTestSupport.createDAG(testMeta);
  }

  @Test
  public void testStreamCodec()
  {
    GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
    GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
    GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
    dag.setInputPortAttribute(node3.inport1, Context.PortContext.STREAM_CODEC, new TestStreamCodec());

    dag.addStream("n1n2", node1.outport1, node2.inport1);
    dag.addStream("n2n3", node2.outport1, node3.inport1);

    dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
    StramTestSupport.MemoryStorageAgent msa = new StramTestSupport.MemoryStorageAgent();
    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, msa);

    StreamingContainerManager dnm = new StreamingContainerManager(dag);
    PhysicalPlan plan = dnm.getPhysicalPlan();

    List<PTContainer> containers = plan.getContainers();
    Assert.assertEquals("number containers", 3, containers.size());

    for (int i = 0; i < containers.size(); ++i) {
      StreamingContainerManagerTest.assignContainer(dnm, "container" + (i + 1));
    }

    LogicalPlan.OperatorMeta n1meta = dag.getMeta(node1);
    LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);
    LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3);

    OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, dnm);

    OperatorDeployInfo.OutputDeployInfo n1odi = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1));
    String id = n1meta.getName() + " " + n1odi.portName;
    Assert.assertEquals("number stream codecs " + id, n1odi.streamCodecs.size(), 1);
    Assert.assertTrue("No user set stream codec", n1odi.streamCodecs.containsValue(null));

    OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, dnm);

    OperatorDeployInfo.InputDeployInfo n2idi = getInputDeployInfo(n2di, n2meta.getMeta(node2.inport1));
    id = n2meta.getName() + " " + n2idi.portName;
    Assert.assertEquals("number stream codecs " + id, n2idi.streamCodecs.size(), 1);
    Assert.assertTrue("No user set stream codec", n2idi.streamCodecs.containsValue(null));

    OperatorDeployInfo.OutputDeployInfo n2odi = getOutputDeployInfo(n2di, n2meta.getMeta(node2.outport1));
    id = n2meta.getName() + " " + n2odi.portName;
    Assert.assertEquals("number stream codecs " + id, n2odi.streamCodecs.size(), 1);
    checkPresentStreamCodec(n3meta, node3.inport1, n2odi.streamCodecs, id, plan);


    OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, dnm);

    OperatorDeployInfo.InputDeployInfo n3idi = getInputDeployInfo(n3di, n3meta.getMeta(node3.inport1));
    id = n3meta.getName() + " " + n3idi.portName;
    Assert.assertEquals("number stream codecs " + id, n3idi.streamCodecs.size(), 1);
    checkPresentStreamCodec(n3meta, node3.inport1, n3idi.streamCodecs, id, plan);
  }

  @Test
  public void testStreamCodecReuse()
  {
    GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
    GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
    GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
    GenericTestOperator node4 = dag.addOperator("node4", GenericTestOperator.class);
    TestStreamCodec serDe = new TestStreamCodec();
    dag.setInputPortAttribute(node4.inport1, Context.PortContext.STREAM_CODEC, serDe);
    GenericTestOperator node5 = dag.addOperator("node5", GenericTestOperator.class);
    dag.setInputPortAttribute(node5.inport1, Context.PortContext.STREAM_CODEC, serDe);
    GenericTestOperator node6 = dag.addOperator("node6", GenericTestOperator.class);
    serDe = new TestStreamCodec();
    dag.setInputPortAttribute(node6.inport1, Context.PortContext.STREAM_CODEC, serDe);

    dag.addStream("n1n2", node1.outport1, node2.inport1);
    dag.addStream("n2n3", node2.outport1, node3.inport1);
    dag.addStream("n3n4", node3.outport1, node4.inport1);
    dag.addStream("n4n5", node4.outport1, node5.inport1);
    dag.addStream("n5n6", node5.outport1, node6.inport1);

    dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
    StramTestSupport.MemoryStorageAgent msa = new StramTestSupport.MemoryStorageAgent();
    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, msa);

    StreamingContainerManager dnm = new StreamingContainerManager(dag);
    PhysicalPlan plan = dnm.getPhysicalPlan();

    List<PTContainer> containers = plan.getContainers();
    Assert.assertEquals("number containers", 6, containers.size());

    for (int i = 0; i < containers.size(); ++i) {
      StreamingContainerManagerTest.assignContainer(dnm, "container" + (i + 1));
    }

    getSingleOperatorDeployInfo(node1, dnm);
    getSingleOperatorDeployInfo(node2, dnm);
    getSingleOperatorDeployInfo(node3, dnm);
    getSingleOperatorDeployInfo(node4, dnm);
    getSingleOperatorDeployInfo(node5, dnm);
    getSingleOperatorDeployInfo(node6, dnm);
    Assert.assertEquals("number of stream codec identifiers", 3, plan.getStreamCodecIdentifiers().size());
  }

  @Test
  public void testDefaultStreamCodec()
  {
    GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
    DefaultCodecOperator node2 = dag.addOperator("node2", DefaultCodecOperator.class);
    DefaultCodecOperator node3 = dag.addOperator("node3", DefaultCodecOperator.class);
    dag.setInputPortAttribute(node3.inportWithCodec, Context.PortContext.STREAM_CODEC, new TestStreamCodec());

    dag.addStream("n1n2", node1.outport1, node2.inportWithCodec);
    dag.addStream("n2n3", node2.outport1, node3.inportWithCodec);

    dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
    StramTestSupport.MemoryStorageAgent msa = new StramTestSupport.MemoryStorageAgent();
    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, msa);

    StreamingContainerManager dnm = new StreamingContainerManager(dag);
    PhysicalPlan plan = dnm.getPhysicalPlan();

    List<PTContainer> containers = plan.getContainers();
    Assert.assertEquals("number containers", 3, containers.size());

    for (int i = 0; i < containers.size(); ++i) {
      StreamingContainerManagerTest.assignContainer(dnm, "container" + (i + 1));
    }

    LogicalPlan.OperatorMeta n1meta = dag.getMeta(node1);
    LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);
    LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3);

    OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, dnm);

    OperatorDeployInfo.OutputDeployInfo n1odi = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1));
    String id = n1meta.getName() + " " + n1odi.portName;
    Assert.assertEquals("number stream codecs " + id, n1odi.streamCodecs.size(), 1);
    checkPresentStreamCodec(n2meta, node2.inportWithCodec, n1odi.streamCodecs, id, plan);

    OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, dnm);

    OperatorDeployInfo.InputDeployInfo n2idi = getInputDeployInfo(n2di, n2meta.getMeta(node2.inportWithCodec));
    id = n2meta.getName() + " " + n2idi.portName;
    Assert.assertEquals("number stream codecs " + id, n2idi.streamCodecs.size(), 1);
    checkPresentStreamCodec(n2meta, node2.inportWithCodec, n2idi.streamCodecs, id, plan);

    OperatorDeployInfo.OutputDeployInfo n2odi = getOutputDeployInfo(n2di, n2meta.getMeta(node2.outport1));
    id = n2meta.getName() + " " + n2odi.portName;
    Assert.assertEquals("number stream codecs " + id, n2odi.streamCodecs.size(), 1);
    checkPresentStreamCodec(n3meta, node3.inportWithCodec, n2odi.streamCodecs, id, plan);

    OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, dnm);

    OperatorDeployInfo.InputDeployInfo n3idi = getInputDeployInfo(n3di, n3meta.getMeta(node3.inportWithCodec));
    id = n3meta.getName() + " " + n3idi.portName;
    Assert.assertEquals("number stream codecs " + id, n3idi.streamCodecs.size(), 1);
    checkPresentStreamCodec(n3meta, node3.inportWithCodec, n3idi.streamCodecs, id, plan);
  }

  @Test
  public void testPartitioningStreamCodec()
  {
    GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
    GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
    dag.setAttribute(node2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3));
    TestStreamCodec serDe = new TestStreamCodec();
    dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, serDe);

    dag.addStream("n1n2", node1.outport1, node2.inport1);

    dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
    StramTestSupport.MemoryStorageAgent msa = new StramTestSupport.MemoryStorageAgent();
    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, msa);

    StreamingContainerManager dnm = new StreamingContainerManager(dag);
    PhysicalPlan plan = dnm.getPhysicalPlan();

    List<PTContainer> containers = plan.getContainers();
    Assert.assertEquals("number containers", 4, containers.size());

    for (int i = 0; i < containers.size(); ++i) {
      StreamingContainerManagerTest.assignContainer(dnm, "container" + (i + 1));
    }

    LogicalPlan.OperatorMeta n1meta = dag.getMeta(node1);
    LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);

    OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, dnm);

    OperatorDeployInfo.OutputDeployInfo n1odi = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1));
    String id = n1meta.getName() + " " + n1odi.portName;
    Assert.assertEquals("number stream codecs " + id, n1odi.streamCodecs.size(), 1);
    checkPresentStreamCodec(n2meta, node2.inport1, n1odi.streamCodecs, id, plan);


    List<PTOperator> operators = plan.getOperators(n2meta);
    Assert.assertEquals("number operators " + n2meta.getName(), 3, operators.size());
    for (PTOperator operator : operators) {
      OperatorDeployInfo odi = getOperatorDeployInfo(operator, n2meta.getName(), dnm);

      OperatorDeployInfo.InputDeployInfo idi = getInputDeployInfo(odi, n2meta.getMeta(node2.inport1));
      id = n2meta.getName() + " " + idi.portName;
      Assert.assertEquals("number stream codecs " + id, idi.streamCodecs.size(), 1);
      checkPresentStreamCodec(n2meta, node2.inport1, idi.streamCodecs, id, plan);
    }
  }

  @Test
  public void testMxNPartitioningStreamCodec()
  {
    GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
    dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
    GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
    dag.setAttribute(node2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3));
    TestStreamCodec serDe = new TestStreamCodec();
    dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, serDe);

    dag.addStream("n1n2", node1.outport1, node2.inport1);

    dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
    StramTestSupport.MemoryStorageAgent msa = new StramTestSupport.MemoryStorageAgent();
    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, msa);

    StreamingContainerManager dnm = new StreamingContainerManager(dag);
    PhysicalPlan plan = dnm.getPhysicalPlan();

    List<PTContainer> containers = plan.getContainers();

    for (int i = 0; i < containers.size(); ++i) {
      StreamingContainerManagerTest.assignContainer(dnm, "container" + (i + 1));
    }

    LogicalPlan.OperatorMeta n1meta = dag.getMeta(node1);
    LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);

    // Sanity check that physical operators have been allocated for n1meta and n2meta
    Assert.assertEquals("number operators " + n1meta.getName(), 2, plan.getOperators(n1meta).size());
    Assert.assertEquals("number operators " + n2meta.getName(), 3, plan.getOperators(n2meta).size());

    for (PTContainer container : containers) {
      List<PTOperator> operators = container.getOperators();
      for (PTOperator operator :operators) {
        if (!operator.isUnifier()) {
          if (operator.getOperatorMeta() == n1meta) {
            OperatorDeployInfo odi = getOperatorDeployInfo(operator, n1meta.getName(), dnm);

            OperatorDeployInfo.OutputDeployInfo otdi = getOutputDeployInfo(odi, n1meta.getMeta(node1.outport1));
            String id = n1meta.getName() + " " + otdi.portName;
            Assert.assertEquals("number stream codecs " + id, otdi.streamCodecs.size(), 1);
            checkPresentStreamCodec(n2meta, node2.inport1, otdi.streamCodecs, id, plan);
          } else if (operator.getOperatorMeta() == n2meta) {
            OperatorDeployInfo odi = getOperatorDeployInfo(operator, n2meta.getName(), dnm);

            OperatorDeployInfo.InputDeployInfo idi = getInputDeployInfo(odi, n2meta.getMeta(node2.inport1));
            String id = n1meta.getName() + " " + idi.portName;
            Assert.assertEquals("number stream codecs " + id, idi.streamCodecs.size(), 1);
            checkPresentStreamCodec(n2meta, node2.inport1, idi.streamCodecs, id, plan);
          }
        } else {
          OperatorDeployInfo odi = getOperatorDeployInfo(operator, operator.getName(), dnm);
          List<OperatorDeployInfo.InputDeployInfo> idis = odi.inputs;
          for (OperatorDeployInfo.InputDeployInfo idi : idis) {
            String id = operator.getName() + " " + idi.portName;
            Assert.assertEquals("number stream codecs " + id, idi.streamCodecs.size(), 1);
            checkPresentStreamCodec(n2meta, node2.inport1, idi.streamCodecs, id, plan);
          }
        }
      }
    }
  }

  @Test
  public void testParallelPartitioningStreamCodec()
  {
    GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
    dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
    GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
    dag.setInputPortAttribute(node2.inport1, Context.PortContext.PARTITION_PARALLEL, true);
    TestStreamCodec serDe = new TestStreamCodec();
    dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, serDe);
    GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
    TestStreamCodec2 serDe2 = new TestStreamCodec2();
    dag.setInputPortAttribute(node3.inport1, Context.PortContext.STREAM_CODEC, serDe2);

    dag.addStream("n1n2", node1.outport1, node2.inport1);
    dag.addStream("n2n3", node2.outport1, node3.inport1);

    dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
    StramTestSupport.MemoryStorageAgent msa = new StramTestSupport.MemoryStorageAgent();
    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, msa);

    StreamingContainerManager dnm = new StreamingContainerManager(dag);
    PhysicalPlan plan = dnm.getPhysicalPlan();

    List<PTContainer> containers = plan.getContainers();

    for (int i = 0; i < containers.size(); ++i) {
      StreamingContainerManagerTest.assignContainer(dnm, "container" + (i+1));
    }

    LogicalPlan.OperatorMeta n1meta = dag.getMeta(node1);
    LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);
    LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3);

    // Sanity check that physical operators have been allocated for n1meta and n2meta
    Assert.assertEquals("number operators " + n1meta.getName(), 2, plan.getOperators(n1meta).size());
    Assert.assertEquals("number operators " + n2meta.getName(), 2, plan.getOperators(n2meta).size());
    Assert.assertEquals("number operators " + n3meta.getName(), 1, plan.getOperators(n3meta).size());

    for (PTContainer container : containers) {
      List<PTOperator> operators = container.getOperators();
      for (PTOperator operator :operators) {
        if (!operator.isUnifier()) {
          if (operator.getOperatorMeta() == n1meta) {
            OperatorDeployInfo odi = getOperatorDeployInfo(operator, n1meta.getName(), dnm);

            OperatorDeployInfo.OutputDeployInfo otdi = getOutputDeployInfo(odi, n1meta.getMeta(node1.outport1));
            String id = n1meta.getName() + " " + otdi.portName;
            Assert.assertEquals("number stream codecs " + id, otdi.streamCodecs.size(), 1);
            checkPresentStreamCodec(n2meta, node2.inport1, otdi.streamCodecs, id, plan);
          } else if (operator.getOperatorMeta() == n2meta) {
            OperatorDeployInfo odi = getOperatorDeployInfo(operator, n2meta.getName(), dnm);

            OperatorDeployInfo.InputDeployInfo idi = getInputDeployInfo(odi, n2meta.getMeta(node2.inport1));
            String id = n1meta.getName() + " " + idi.portName;
            Assert.assertEquals("number stream codecs " + id, idi.streamCodecs.size(), 1);
            checkPresentStreamCodec(n2meta, node2.inport1, idi.streamCodecs, id, plan);

            OperatorDeployInfo.OutputDeployInfo otdi = getOutputDeployInfo(odi, n2meta.getMeta(node2.outport1));
            id = n2meta.getName() + " " + otdi.portName;
            Assert.assertEquals("number stream codecs " + id, otdi.streamCodecs.size(), 1);
            checkPresentStreamCodec(n3meta, node3.inport1, otdi.streamCodecs, id, plan);
          } else if (operator.getOperatorMeta() == n3meta) {
            OperatorDeployInfo odi = getOperatorDeployInfo(operator, n3meta.getName(), dnm);

            OperatorDeployInfo.InputDeployInfo idi = getInputDeployInfo(odi, n3meta.getMeta(node3.inport1));
            String id = n3meta.getName() + " " + idi.portName;
            Assert.assertEquals("number stream codecs " + id, idi.streamCodecs.size(), 1);
            checkPresentStreamCodec(n3meta, node3.inport1, idi.streamCodecs, id, plan);
          }
        } else {
          OperatorDeployInfo odi = getOperatorDeployInfo(operator, operator.getName(), dnm);
          List<OperatorDeployInfo.InputDeployInfo> idis = odi.inputs;
          for (OperatorDeployInfo.InputDeployInfo idi : idis) {
            String id = operator.getName() + " " + idi.portName;
            Assert.assertEquals("number stream codecs " + id, idi.streamCodecs.size(), 1);
            checkPresentStreamCodec(n3meta, node3.inport1, idi.streamCodecs, id, plan);
          }
          List<OperatorDeployInfo.OutputDeployInfo> otdis = odi.outputs;
          for (OperatorDeployInfo.OutputDeployInfo otdi : otdis) {
            String id = operator.getName() + " " + otdi.portName;
            Assert.assertEquals("number stream codecs " + id, otdi.streamCodecs.size(), 1);
            checkPresentStreamCodec(n3meta, node3.inport1, otdi.streamCodecs, id, plan);
          }
        }
      }
    }
  }

  @Test
  public void testMultipleInputStreamCodec()
  {
    GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
    TestStreamCodec serDe = new TestStreamCodec();
    GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
    dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, serDe);
    GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
    dag.setInputPortAttribute(node3.inport1, Context.PortContext.STREAM_CODEC, serDe);

    dag.addStream("n1n2n3", node1.outport1, node2.inport1, node3.inport1);

    dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
    StramTestSupport.MemoryStorageAgent msa = new StramTestSupport.MemoryStorageAgent();
    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, msa);

    StreamingContainerManager dnm = new StreamingContainerManager(dag);
    PhysicalPlan plan = dnm.getPhysicalPlan();

    List<PTContainer> containers = plan.getContainers();
    Assert.assertEquals("number containers", 3, containers.size());

    for (int i = 0; i < containers.size(); ++i) {
      StreamingContainerManagerTest.assignContainer(dnm, "container" + (i + 1));
    }

    LogicalPlan.OperatorMeta n1meta = dag.getMeta(node1);
    LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);
    LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3);

    OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, dnm);

    OperatorDeployInfo.OutputDeployInfo n1odi = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1));
    String id = n1meta.getName() + " " + n1odi.portName;
    Assert.assertEquals("number stream codecs " + id, n1odi.streamCodecs.size(), 1);
    checkPresentStreamCodec(n2meta, node2.inport1, n1odi.streamCodecs, id, plan);

    OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, dnm);

    OperatorDeployInfo.InputDeployInfo n2idi = getInputDeployInfo(n2di, n2meta.getMeta(node2.inport1));
    id = n2meta.getName() + " " + n2idi.portName;
    Assert.assertEquals("number stream codecs " + id, n2idi.streamCodecs.size(), 1);
    checkPresentStreamCodec(n2meta, node2.inport1, n2idi.streamCodecs, id, plan);

    OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, dnm);

    OperatorDeployInfo.InputDeployInfo n3idi = getInputDeployInfo(n3di, n3meta.getMeta(node3.inport1));
    id = n3meta.getName() + " " + n3idi.portName;
    Assert.assertEquals("number stream codecs " + id, n3idi.streamCodecs.size(), 1);
    checkPresentStreamCodec(n3meta, node3.inport1, n3idi.streamCodecs, id, plan);
  }

  @Test
  public void testPartitioningMultipleInputStreamCodec()
  {
    GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
    GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
    dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
    TestStreamCodec serDe = new TestStreamCodec();
    dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, serDe);
    GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
    dag.setInputPortAttribute(node3.inport1, Context.PortContext.STREAM_CODEC, serDe);

    dag.addStream("n1n2n3", node1.outport1, node2.inport1, node3.inport1);

    dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
    StramTestSupport.MemoryStorageAgent msa = new StramTestSupport.MemoryStorageAgent();
    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, msa);

    StreamingContainerManager dnm = new StreamingContainerManager(dag);
    PhysicalPlan plan = dnm.getPhysicalPlan();

    List<PTContainer> containers = plan.getContainers();
    Assert.assertEquals("number containers", 5, containers.size());

    for (int i = 0; i < containers.size(); ++i) {
      StreamingContainerManagerTest.assignContainer(dnm, "container" + (i + 1));
    }

    LogicalPlan.OperatorMeta n1meta = dag.getMeta(node1);
    LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);
    LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3);

    for (PTContainer container : containers) {
      List<PTOperator> operators = container.getOperators();
      for (PTOperator operator :operators) {
        if (!operator.isUnifier()) {
          if (operator.getOperatorMeta() == n1meta) {
            OperatorDeployInfo odi = getOperatorDeployInfo(operator, n1meta.getName(), dnm);

            OperatorDeployInfo.OutputDeployInfo otdi = getOutputDeployInfo(odi, n1meta.getMeta(node1.outport1));
            String id = n1meta.getName() + " " + otdi.portName;
            Assert.assertEquals("number stream codecs " + id, otdi.streamCodecs.size(), 1);
            checkPresentStreamCodec(n2meta, node2.inport1, otdi.streamCodecs, id, plan);
          } else if (operator.getOperatorMeta() == n2meta) {
            OperatorDeployInfo odi = getOperatorDeployInfo(operator, n2meta.getName(), dnm);

            OperatorDeployInfo.InputDeployInfo idi = getInputDeployInfo(odi, n2meta.getMeta(node2.inport1));
            String id = n2meta.getName() + " " + idi.portName;
            Assert.assertEquals("number stream codecs " + id, idi.streamCodecs.size(), 1);
            checkPresentStreamCodec(n2meta, node2.inport1, idi.streamCodecs, id, plan);
          } else if (operator.getOperatorMeta() == n3meta) {
            OperatorDeployInfo odi = getOperatorDeployInfo(operator, n3meta.getName(), dnm);

            OperatorDeployInfo.InputDeployInfo idi = getInputDeployInfo(odi, n3meta.getMeta(node3.inport1));
            String id = n3meta.getName() + " " + idi.portName;
            Assert.assertEquals("number stream codecs " + id, idi.streamCodecs.size(), 1);
            checkPresentStreamCodec(n3meta, node3.inport1, idi.streamCodecs, id, plan);
          }
        } else {
          OperatorDeployInfo odi = getOperatorDeployInfo(operator, operator.getName(), dnm);
          List<OperatorDeployInfo.InputDeployInfo> idis = odi.inputs;
          for (OperatorDeployInfo.InputDeployInfo idi : idis) {
            String id = operator.getName() + " " + idi.portName;
            Assert.assertEquals("number stream codecs " + id, idi.streamCodecs.size(), 1);
            checkPresentStreamCodec(n2meta, node2.inport1, idi.streamCodecs, id, plan);
          }
          List<OperatorDeployInfo.OutputDeployInfo> otdis = odi.outputs;
          for (OperatorDeployInfo.OutputDeployInfo otdi : otdis) {
            String id = operator.getName() + " " + otdi.portName;
            Assert.assertEquals("number stream codecs " + id, otdi.streamCodecs.size(), 1);
            checkPresentStreamCodec(n2meta, node2.inport1, otdi.streamCodecs, id, plan);
          }
        }
      }
    }
  }

  @Test
  public void testMultipleStreamCodecs()
  {
    GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
    GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
    GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
    TestStreamCodec serDe = new TestStreamCodec();
    dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, serDe);
    TestStreamCodec2 serDe2 = new TestStreamCodec2();
    dag.setInputPortAttribute(node3.inport1, Context.PortContext.STREAM_CODEC, serDe2);

    dag.addStream("n1n2n3", node1.outport1, node2.inport1, node3.inport1);

    dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
    StramTestSupport.MemoryStorageAgent msa = new StramTestSupport.MemoryStorageAgent();
    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, msa);

    StreamingContainerManager dnm = new StreamingContainerManager(dag);
    PhysicalPlan plan = dnm.getPhysicalPlan();

    List<PTContainer> containers = plan.getContainers();
    Assert.assertEquals("number containers", 3, containers.size());

    for (int i = 0; i < containers.size(); ++i) {
      StreamingContainerManagerTest.assignContainer(dnm, "container" + (i + 1));
    }

    LogicalPlan.OperatorMeta n1meta = dag.getMeta(node1);
    LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);
    LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3);

    OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, dnm);

    OperatorDeployInfo.OutputDeployInfo n1odi = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1));
    String id = n1meta.getName() + " " + n1odi.portName;
    Assert.assertEquals("number stream codecs " + id, n1odi.streamCodecs.size(), 2);
    checkPresentStreamCodec(n2meta, node2.inport1, n1odi.streamCodecs, id, plan);
    checkPresentStreamCodec(n3meta, node3.inport1, n1odi.streamCodecs, id, plan);

    OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, dnm);

    OperatorDeployInfo.InputDeployInfo n2idi = getInputDeployInfo(n2di, n2meta.getMeta(node2.inport1));
    id = n2meta.getName() + " " + n2idi.portName;
    Assert.assertEquals("number stream codecs " + id, n2idi.streamCodecs.size(), 1);
    checkPresentStreamCodec(n2meta, node2.inport1, n2idi.streamCodecs, id, plan);

    OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, dnm);

    OperatorDeployInfo.InputDeployInfo n3idi = getInputDeployInfo(n3di, n3meta.getMeta(node3.inport1));
    id = n3meta.getName() + " " + n3idi.portName;
    Assert.assertEquals("number stream codecs " + id, n3idi.streamCodecs.size(), 1);
    checkPresentStreamCodec(n3meta, node3.inport1, n3idi.streamCodecs, id, plan);
  }

  @Test
  public void testPartitioningMultipleStreamCodecs()
  {
    GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
    GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
    GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
    dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
    TestStreamCodec serDe = new TestStreamCodec();
    dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, serDe);
    TestStreamCodec2 serDe2 = new TestStreamCodec2();
    dag.setInputPortAttribute(node3.inport1, Context.PortContext.STREAM_CODEC, serDe2);

    dag.addStream("n1n2n3", node1.outport1, node2.inport1, node3.inport1);

    dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
    StramTestSupport.MemoryStorageAgent msa = new StramTestSupport.MemoryStorageAgent();
    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, msa);

    StreamingContainerManager dnm = new StreamingContainerManager(dag);
    PhysicalPlan plan = dnm.getPhysicalPlan();

    List<PTContainer> containers = plan.getContainers();
    Assert.assertEquals("number containers", 4, containers.size());

    for (int i = 0; i < containers.size(); ++i) {
      StreamingContainerManagerTest.assignContainer(dnm, "container" + (i + 1));
    }

    LogicalPlan.OperatorMeta n1meta = dag.getMeta(node1);
    LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);
    LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3);

    for (PTContainer container : containers) {
      List<PTOperator> operators = container.getOperators();
      for (PTOperator operator :operators) {
        if (!operator.isUnifier()) {
          if (operator.getOperatorMeta() == n1meta) {
            OperatorDeployInfo odi = getOperatorDeployInfo(operator, n1meta.getName(), dnm);

            OperatorDeployInfo.OutputDeployInfo otdi = getOutputDeployInfo(odi, n1meta.getMeta(node1.outport1));
            String id = n1meta.getName() + " " + otdi.portName;
            Assert.assertEquals("number stream codecs " + id, otdi.streamCodecs.size(), 2);
            checkPresentStreamCodec(n2meta, node2.inport1, otdi.streamCodecs, id, plan);
            checkPresentStreamCodec(n3meta, node3.inport1, otdi.streamCodecs, id, plan);
          } else if (operator.getOperatorMeta() == n2meta) {
            OperatorDeployInfo odi = getOperatorDeployInfo(operator, n2meta.getName(), dnm);

            OperatorDeployInfo.InputDeployInfo idi = getInputDeployInfo(odi, n2meta.getMeta(node2.inport1));
            String id = n2meta.getName() + " " + idi.portName;
            Assert.assertEquals("number stream codecs " + id, idi.streamCodecs.size(), 1);
            checkPresentStreamCodec(n2meta, node2.inport1, idi.streamCodecs, id, plan);
          } else if (operator.getOperatorMeta() == n3meta) {
            OperatorDeployInfo odi = getOperatorDeployInfo(operator, n3meta.getName(), dnm);

            OperatorDeployInfo.InputDeployInfo idi = getInputDeployInfo(odi, n3meta.getMeta(node3.inport1));
            String id = n3meta.getName() + " " + idi.portName;
            Assert.assertEquals("number stream codecs " + id, idi.streamCodecs.size(), 1);
            checkPresentStreamCodec(n3meta, node3.inport1, idi.streamCodecs, id, plan);
          }
        } else {
          OperatorDeployInfo odi = getOperatorDeployInfo(operator, operator.getName(), dnm);
          Assert.assertEquals("unifier outputs " + operator.getName(), 1, operator.getOutputs().size());
          PTOperator.PTOutput out = operator.getOutputs().get(0);
          Assert.assertEquals("unifier sinks " + operator.getName(), 1, out.sinks.size());
          PTOperator.PTInput idInput = out.sinks.get(0);
          LogicalPlan.OperatorMeta idMeta = idInput.target.getOperatorMeta();
          Operator.InputPort<?> idInputPort = null;
          if (idMeta == n2meta) {
            idInputPort = node2.inport1;
          } else if (idMeta == n3meta) {
            idInputPort = node3.inport1;
          }
          List<OperatorDeployInfo.InputDeployInfo> idis = odi.inputs;
          for (OperatorDeployInfo.InputDeployInfo idi : idis) {
            String id = operator.getName() + " " + idi.portName;
            Assert.assertEquals("number stream codecs " + id, idi.streamCodecs.size(), 1);
            checkPresentStreamCodec(idMeta, idInputPort, idi.streamCodecs, id, plan);
          }
        }
      }
    }
  }

  @Test
  public void testMxNMultipleStreamCodecs()
  {
    GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
    dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
    GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
    dag.setAttribute(node2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3));
    TestStreamCodec serDe = new TestStreamCodec();
    dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, serDe);
    GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
    dag.setAttribute(node3, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3));
    TestStreamCodec serDe2 = new TestStreamCodec();
    dag.setInputPortAttribute(node3.inport1, Context.PortContext.STREAM_CODEC, serDe2);


    dag.addStream("n1n2n3", node1.outport1, node2.inport1, node3.inport1);

    dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
    StramTestSupport.MemoryStorageAgent msa = new StramTestSupport.MemoryStorageAgent();
    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, msa);

    StreamingContainerManager dnm = new StreamingContainerManager(dag);
    PhysicalPlan plan = dnm.getPhysicalPlan();

    List<PTContainer> containers = plan.getContainers();

    for (int i = 0; i < containers.size(); ++i) {
      StreamingContainerManagerTest.assignContainer(dnm, "container" + (i + 1));
    }

    LogicalPlan.OperatorMeta n1meta = dag.getMeta(node1);
    LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);
    LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3);

    // Sanity check that physical operators have been allocated for n1meta and n2meta
    Assert.assertEquals("number operators " + n1meta.getName(), 2, plan.getOperators(n1meta).size());
    Assert.assertEquals("number operators " + n2meta.getName(), 3, plan.getOperators(n2meta).size());
    Assert.assertEquals("number operators " + n3meta.getName(), 3, plan.getOperators(n3meta).size());

    checkMxNStreamCodecs(node1, node2, node3, dnm);
  }

  private void checkMxNStreamCodecs(GenericTestOperator node1, GenericTestOperator node2, GenericTestOperator node3, StreamingContainerManager dnm)
  {
    LogicalPlan dag = dnm.getLogicalPlan();
    PhysicalPlan plan = dnm.getPhysicalPlan();
    List<PTContainer> containers = plan.getContainers();
    LogicalPlan.OperatorMeta n1meta = dag.getMeta(node1);
    LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);
    LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3);
    for (PTContainer container : containers) {
      List<PTOperator> operators = container.getOperators();
      for (PTOperator operator :operators) {
        if (!operator.isUnifier()) {
          if (operator.getOperatorMeta() == n1meta) {
            OperatorDeployInfo odi = getOperatorDeployInfo(operator, n1meta.getName(), dnm);

            OperatorDeployInfo.OutputDeployInfo otdi = getOutputDeployInfo(odi, n1meta.getMeta(node1.outport1));
            String id = n1meta.getName() + " " + otdi.portName;
            Assert.assertEquals("number stream codecs " + id, otdi.streamCodecs.size(), 2);
            checkPresentStreamCodec(n2meta, node2.inport1, otdi.streamCodecs, id, plan);
            checkPresentStreamCodec(n3meta, node3.inport1, otdi.streamCodecs, id, plan);
          } else if (operator.getOperatorMeta() == n2meta) {
            OperatorDeployInfo odi = getOperatorDeployInfo(operator, n2meta.getName(), dnm);

            OperatorDeployInfo.InputDeployInfo idi = getInputDeployInfo(odi, n2meta.getMeta(node2.inport1));
            String id = n2meta.getName() + " " + idi.portName;
            Assert.assertEquals("number stream codecs " + id, idi.streamCodecs.size(), 1);
            checkPresentStreamCodec(n2meta, node2.inport1, idi.streamCodecs, id, plan);
          } else if (operator.getOperatorMeta() == n3meta) {
            OperatorDeployInfo odi = getOperatorDeployInfo(operator, n3meta.getName(), dnm);

            OperatorDeployInfo.InputDeployInfo idi = getInputDeployInfo(odi, n3meta.getMeta(node3.inport1));
            String id = n3meta.getName() + " " + idi.portName;
            Assert.assertEquals("number stream codecs " + id, idi.streamCodecs.size(), 1);
            checkPresentStreamCodec(n3meta, node3.inport1, idi.streamCodecs, id, plan);
          }
        } else {
          OperatorDeployInfo odi = getOperatorDeployInfo(operator, operator.getName(), dnm);
          Assert.assertEquals("unifier outputs " + operator.getName(), 1, operator.getOutputs().size());
          PTOperator.PTOutput out = operator.getOutputs().get(0);
          Assert.assertEquals("unifier sinks " + operator.getName(), 1, out.sinks.size());
          PTOperator.PTInput idInput = out.sinks.get(0);
          LogicalPlan.OperatorMeta idMeta = idInput.target.getOperatorMeta();
          Operator.InputPort<?> idInputPort = null;
          if (idMeta == n2meta) {
            idInputPort = node2.inport1;
          } else if (idMeta == n3meta) {
            idInputPort = node3.inport1;
          }
          List<OperatorDeployInfo.InputDeployInfo> idis = odi.inputs;
          for (OperatorDeployInfo.InputDeployInfo idi : idis) {
            String id = operator.getName() + " " + idi.portName;
            Assert.assertEquals("number stream codecs " + id, idi.streamCodecs.size(), 1);
            checkPresentStreamCodec(idMeta, idInputPort, idi.streamCodecs, id, plan);
          }
        }
      }
    }
  }

  @Test
  public void testInlineStreamCodec()
  {
    GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
    GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
    GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
    TestStreamCodec serDe = new TestStreamCodec();
    dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, serDe);
    dag.setInputPortAttribute(node3.inport1, Context.PortContext.STREAM_CODEC, serDe);

    dag.addStream("n1n2n3", node1.outport1, node2.inport1, node3.inport1);

    // Relying on container max count for the manager to layout node1 and node3 in the
    // same container in inline fashion and node2 in a separate container
    dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 2);
    StramTestSupport.MemoryStorageAgent msa = new StramTestSupport.MemoryStorageAgent();
    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, msa);

    StreamingContainerManager dnm = new StreamingContainerManager(dag);
    PhysicalPlan plan = dnm.getPhysicalPlan();

    List<PTContainer> containers = plan.getContainers();
    Assert.assertEquals("number containers", 2, containers.size());

    for (int i = 0; i < containers.size(); ++i) {
      StreamingContainerManagerTest.assignContainer(dnm, "container" + (i + 1));
    }

    LogicalPlan.OperatorMeta n1meta = dag.getMeta(node1);

    LogicalPlan.OperatorMeta nonInlineMeta = null;

    for (int i = 0; i < containers.size(); ++i) {
      PTContainer container = containers.get(i);
      List<PTOperator> operators = container.getOperators();
      if (operators.size() == 1) {
        nonInlineMeta = operators.get(0).getOperatorMeta();
        break;
      }
    }

    Assert.assertNotNull("non inline operator meta is null", nonInlineMeta);
    GenericTestOperator nonInlineOperator = null;
    Operator.InputPort<?> niInputPort = null;

    if (nonInlineMeta.getName().equals("node2")) {
      nonInlineOperator = node2;
      niInputPort = node2.inport1;
    } else if (nonInlineMeta.getName().equals("node3")) {
      nonInlineOperator = node3;
      niInputPort = node3.inport1;
    }

    Assert.assertNotNull("non inline operator is null", nonInlineOperator);

    OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, dnm);

    OperatorDeployInfo.OutputDeployInfo n1odi = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1));
    String id = n1meta.getName() + " " + n1odi.portName;
    Assert.assertEquals("number stream codecs " + id, n1odi.streamCodecs.size(), 1);
    checkPresentStreamCodec(nonInlineMeta, niInputPort, n1odi.streamCodecs, id, plan);

    OperatorDeployInfo odi = getSingleOperatorDeployInfo(nonInlineOperator, dnm);

    OperatorDeployInfo.InputDeployInfo idi = getInputDeployInfo(odi, nonInlineMeta.getMeta(niInputPort));
    id = nonInlineMeta.getName() + " " + idi.portName;
    Assert.assertEquals("number stream codecs " + id, idi.streamCodecs.size(), 1);
    checkPresentStreamCodec(nonInlineMeta, niInputPort, idi.streamCodecs, id, plan);

    /*
    OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, node3.getName(), dnm);

    OperatorDeployInfo.InputDeployInfo n3idi = getInputDeployInfo(n3di, n3meta.getMeta(node3.inport1));
    id = n3meta.getName() + " " + n3idi.portName;
    Assert.assertEquals("number stream codecs " + id, n3idi.streamCodecs.size(), 1);
    streamIdentifier.operName = n3meta.getName();
    streamIdentifier.portName = n3meta.getMeta(node3.inport1).getPortName();
    checkStreamCodecInfo(n3idi.streamCodecs, id, streamIdentifier, serDe2);
    */
  }

  @Test
  public void testCascadingStreamCodec()
  {
    GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
    GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
    GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
    dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3));
    dag.setOutputPortAttribute(node1.outport1, Context.PortContext.UNIFIER_LIMIT, 2);
    TestStreamCodec serDe = new TestStreamCodec();
    dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, serDe);
    TestStreamCodec2 serDe2 = new TestStreamCodec2();
    dag.setInputPortAttribute(node3.inport1, Context.PortContext.STREAM_CODEC, serDe2);

    dag.addStream("n1n2n3", node1.outport1, node2.inport1, node3.inport1);

    dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
    StramTestSupport.MemoryStorageAgent msa = new StramTestSupport.MemoryStorageAgent();
    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, msa);

    StreamingContainerManager dnm = new StreamingContainerManager(dag);
    PhysicalPlan plan = dnm.getPhysicalPlan();

    List<PTContainer> containers = plan.getContainers();
    Assert.assertEquals("number containers", 7, containers.size());

    for (int i = 0; i < containers.size(); ++i) {
      StreamingContainerManagerTest.assignContainer(dnm, "container" + (i + 1));
    }

    LogicalPlan.OperatorMeta n1meta = dag.getMeta(node1);
    LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);
    LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3);

    for (PTContainer container : containers) {
      List<PTOperator> operators = container.getOperators();
      for (PTOperator operator :operators) {
        if (!operator.isUnifier()) {
          if (operator.getOperatorMeta() == n1meta) {
            OperatorDeployInfo odi = getOperatorDeployInfo(operator, n1meta.getName(), dnm);

            OperatorDeployInfo.OutputDeployInfo otdi = getOutputDeployInfo(odi, n1meta.getMeta(node1.outport1));
            String id = n1meta.getName() + " " + otdi.portName;
            Assert.assertEquals("number stream codecs " + id, otdi.streamCodecs.size(), 2);
            checkPresentStreamCodec(n2meta, node2.inport1, otdi.streamCodecs, id, plan);
            checkPresentStreamCodec(n3meta, node3.inport1, otdi.streamCodecs, id, plan);
          } else if (operator.getOperatorMeta() == n2meta) {
            OperatorDeployInfo odi = getOperatorDeployInfo(operator, n2meta.getName(), dnm);

            OperatorDeployInfo.InputDeployInfo idi = getInputDeployInfo(odi, n2meta.getMeta(node2.inport1));
            String id = n2meta.getName() + " " + idi.portName;
            Assert.assertEquals("number stream codecs " + id, idi.streamCodecs.size(), 1);
            checkPresentStreamCodec(n2meta, node2.inport1, idi.streamCodecs, id, plan);
          } else if (operator.getOperatorMeta() == n3meta) {
            OperatorDeployInfo odi = getOperatorDeployInfo(operator, n3meta.getName(), dnm);

            OperatorDeployInfo.InputDeployInfo idi = getInputDeployInfo(odi, n3meta.getMeta(node3.inport1));
            String id = n3meta.getName() + " " + idi.portName;
            Assert.assertEquals("number stream codecs " + id, idi.streamCodecs.size(), 1);
            checkPresentStreamCodec(n3meta, node3.inport1, idi.streamCodecs, id, plan);
          }
        } else {
          OperatorDeployInfo odi = getOperatorDeployInfo(operator, operator.getName(), dnm);
          Assert.assertEquals("unifier outputs " + operator.getName(), 1, operator.getOutputs().size());
          PTOperator.PTOutput out = operator.getOutputs().get(0);
          Assert.assertEquals("unifier sinks " + operator.getName(), 1, out.sinks.size());
          PTOperator.PTInput idInput = out.sinks.get(0);
          LogicalPlan.OperatorMeta idMeta = StreamingContainerAgent.getIdentifyingInputPortMeta(idInput).getOperatorWrapper();
          Operator.InputPort<?> idInputPort = null;
          if (idMeta == n2meta) {
            idInputPort = node2.inport1;
          } else if (idMeta == n3meta) {
            idInputPort = node3.inport1;
          }
          List<OperatorDeployInfo.InputDeployInfo> idis = odi.inputs;
          for (OperatorDeployInfo.InputDeployInfo idi : idis) {
            String id = operator.getName() + " " + idi.portName;
            Assert.assertEquals("number stream codecs " + id, idi.streamCodecs.size(), 1);
            checkPresentStreamCodec(idMeta, idInputPort, idi.streamCodecs, id, plan);
          }
        }
      }
    }
  }

  @Test
  public void testDynamicPartitioningStreamCodec()
  {
    GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
    dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
    dag.setAttribute(node1, Context.OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener) new PartitioningTest.PartitionLoadWatch()));
    GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
    dag.setAttribute(node2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3));
    dag.setAttribute(node2, Context.OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
    TestStreamCodec serDe = new TestStreamCodec();
    dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, serDe);
    GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
    dag.setAttribute(node3, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3));
    dag.setAttribute(node3, Context.OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
    TestStreamCodec serDe2 = new TestStreamCodec();
    dag.setInputPortAttribute(node3.inport1, Context.PortContext.STREAM_CODEC, serDe2);


    dag.addStream("n1n2n3", node1.outport1, node2.inport1, node3.inport1);

    dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
    StramTestSupport.MemoryStorageAgent msa = new StramTestSupport.MemoryStorageAgent();
    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, msa);

    StreamingContainerManager dnm = new StreamingContainerManager(dag);
    PhysicalPlan plan = dnm.getPhysicalPlan();

    List<PTContainer> containers = plan.getContainers();
    int lastId = 0;

    for (int i = 0; i < containers.size(); ++i) {
      StreamingContainerManagerTest.assignContainer(dnm, "container" + (++lastId));
    }

    LogicalPlan.OperatorMeta n1meta = dag.getMeta(node1);
    LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);
    LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3);

    // Sanity check that physical operators have been allocated for n1meta and n2meta
    Assert.assertEquals("number operators " + n1meta.getName(), 2, plan.getOperators(n1meta).size());
    Assert.assertEquals("number operators " + n2meta.getName(), 3, plan.getOperators(n2meta).size());
    Assert.assertEquals("number operators " + n3meta.getName(), 3, plan.getOperators(n3meta).size());

    // Test Dynamic change
    // for M x N partition
    // scale down N (node2) from 3 to 2 and then from 2 to 1
    for (int i = 0; i < 2; i++) {
      markAllOperatorsActive(plan);
      List<PTOperator> ptos =  plan.getOperators(n2meta);
      for (PTOperator ptOperator : ptos) {
        PartitioningTest.PartitionLoadWatch.put(ptOperator, -1);
        plan.onStatusUpdate(ptOperator);
      }

      dnm.processEvents();
      lastId = assignNewContainers(dnm, lastId);

      List<PTOperator> operators = plan.getOperators(n2meta);
      List<PTOperator> upstreamOperators = new ArrayList<PTOperator>();
      for (PTOperator operator : operators) {
        upstreamOperators.addAll(operator.upstreamMerge.values());
        /*
        OperatorDeployInfo odi = getOperatorDeployInfo(operator, n2meta.getName(), dnm);

        OperatorDeployInfo.InputDeployInfo idi = getInputDeployInfo(odi, n2meta.getMeta(node2.inport1));
        String id = n2meta.getName() + " " + idi.portName;
        Assert.assertEquals("number stream codecs " + id, idi.streamCodecs.size(), 1);
        checkPresentStreamCodec(n2meta, node2.inport1, idi.streamCodecs, id, plan);
        */
      }

      Assert.assertEquals("Number of unifiers ", 2-i, upstreamOperators.size());
    }

    // scale down N (node3) from 3 to 2 and then from 2 to 1
    for (int i = 0; i < 2; i++) {
      markAllOperatorsActive(plan);
      List<PTOperator> ptos =  plan.getOperators(n3meta);
      for (PTOperator ptOperator : ptos) {
        PartitioningTest.PartitionLoadWatch.put(ptOperator, -1);
        plan.onStatusUpdate(ptOperator);
      }

      dnm.processEvents();
      lastId = assignNewContainers(dnm, lastId);

      List<PTOperator> operators = plan.getOperators(n3meta);
      List<PTOperator> upstreamOperators = new ArrayList<PTOperator>();
      for (PTOperator operator : operators) {
        upstreamOperators.addAll(operator.upstreamMerge.values());
      }

      Assert.assertEquals("Number of unifiers ", 2-i, upstreamOperators.size());
    }

    // Check that different unifiers were created for the two output operators with different codecs
    // even though there are only one partition of each one
    Set<PTOperator> unifiers = getUnifiers(plan);
    Assert.assertEquals("Number of unifiers ", 2, unifiers.size());

    // scale up N (node2) from 1 to 2 and then from 2 to 3
    for (int i = 0; i < 2; i++) {
      markAllOperatorsActive(plan);
      PTOperator o2p1 = plan.getOperators(n2meta).get(0);

      PartitioningTest.PartitionLoadWatch.put(o2p1, 1);

      plan.onStatusUpdate(o2p1);

      dnm.processEvents();

      lastId = assignNewContainers(dnm, lastId);

      List<PTOperator> operators = plan.getOperators(n2meta);
      List<PTOperator> upstreamOperators = new ArrayList<PTOperator>();
      for (PTOperator operator : operators) {
        upstreamOperators.addAll(operator.upstreamMerge.values());
        /*
        if (operator.getState() != PTOperator.State.ACTIVE) {
          OperatorDeployInfo odi = getOperatorDeployInfo(operator, n2meta.getName(), dnm);

          OperatorDeployInfo.InputDeployInfo idi = getInputDeployInfo(odi, n2meta.getMeta(node2.inport1));
          String id = n2meta.getName() + " " + idi.portName;
          Assert.assertEquals("number stream codecs " + id, idi.streamCodecs.size(), 1);
          checkPresentStreamCodec(n2meta, node2.inport1, idi.streamCodecs, id, plan);
        }
        */
      }

      Assert.assertEquals("Number of unifiers ", 2+i, upstreamOperators.size());
    }

    // scale down M to 1
    {
      markAllOperatorsActive(plan);
      for (PTOperator o1p : plan.getOperators(n1meta)) {
        PartitioningTest.PartitionLoadWatch.put(o1p, -1);
        plan.onStatusUpdate(o1p);
      }

      dnm.processEvents();

      lastId = assignNewContainers(dnm, lastId);

      unifiers = getUnifiers(plan);
      Assert.assertEquals("Number of unifiers", 0, unifiers.size());
    }

    // scale up M to 2
    {
      markAllOperatorsActive(plan);
      for (PTOperator o1p : plan.getOperators(n1meta)) {
        PartitioningTest.PartitionLoadWatch.put(o1p, 1);
        plan.onStatusUpdate(o1p);
      }

      dnm.processEvents();

      lastId = assignNewContainers(dnm, lastId);

      unifiers = getUnifiers(plan);
      Assert.assertEquals("Number of unifiers", 4, unifiers.size());
    }
  }

  private int assignNewContainers(StreamingContainerManager dnm, int lastId)
  {
    PhysicalPlan plan = dnm.getPhysicalPlan();
    List<PTContainer> containers = plan.getContainers();

    int numPending = 0;

    for (PTContainer container : containers) {
      if (container.getState() == PTContainer.State.NEW) {
        numPending++;
      }
    }

    for (int j = 0; j < numPending; ++j) {
      StreamingContainerManagerTest.assignContainer(dnm, "container" + (++lastId));
    }
    return lastId;
  }


  private void markAllOperatorsActive(PhysicalPlan plan)
  {
    for (PTContainer container : plan.getContainers()) {
      for (PTOperator operator : container.getOperators()) {
        operator.setState(PTOperator.State.ACTIVE);
      }
    }
  }

  private Set<PTOperator> getUnifiers(PhysicalPlan plan)
  {
    Set<PTOperator> unifiers = new HashSet<PTOperator>();
    for (PTContainer container : plan.getContainers()) {
      for (PTOperator operator : container.getOperators()) {
        if (operator.isUnifier()) {
          unifiers.add(operator);
        }
      }
    }
    return unifiers;
  }

  private void checkPresentStreamCodec(LogicalPlan.OperatorMeta operatorMeta, Operator.InputPort<?> inputPort,
                                       Map<Integer, StreamCodec<?>> streamCodecs,
                                       String id, PhysicalPlan plan )
  {
    StreamCodec<?> streamCodecInfo = operatorMeta.getMeta(inputPort).getStreamCodec();
    Assert.assertTrue("stream codec identifier not present" + id, isStrCodecPresent(streamCodecInfo, plan));
    Integer streamCodecIdentifier = plan.getStreamCodecIdentifier(streamCodecInfo);
    checkPresentStreamCodecInfo(streamCodecs, id, streamCodecIdentifier, streamCodecInfo);
  }

  private void checkPresentStreamCodecInfo(Map<Integer, StreamCodec<?>> streamCodecs, String id,
                                           Integer streamCodecIdentifier, StreamCodec<?> streamCodecInfo)
  {
    StreamCodec<?> opStreamCodecInfo = streamCodecs.get(streamCodecIdentifier);
    Assert.assertNotNull("stream codec info null " + id, opStreamCodecInfo);
    Assert.assertEquals("stream codec not same " + id, opStreamCodecInfo, streamCodecInfo);
  }

  private OperatorDeployInfo getSingleOperatorDeployInfo(Operator oper, StreamingContainerManager scm)
  {
    LogicalPlan dag = scm.getLogicalPlan();
    String id = dag.getMeta(oper).toString();
    PhysicalPlan plan = scm.getPhysicalPlan();
    List<PTOperator> operators = plan.getOperators(dag.getMeta(oper));
    Assert.assertEquals("number of operators " + id, 1, operators.size());

    PTOperator operator = operators.get(0);
    return getOperatorDeployInfo(operator, id, scm);
  }

  private OperatorDeployInfo getOperatorDeployInfo(PTOperator operator, String id, StreamingContainerManager scm)
  {
    String containerId = operator.getContainer().getExternalId();
    System.out.println("Container id " + containerId);

    List<OperatorDeployInfo> cdi = StreamingContainerManagerTest.getDeployInfo(scm.getContainerAgent(containerId));

    OperatorDeployInfo odi = null;
    for (OperatorDeployInfo iodi : cdi) {
      if (iodi.id == operator.getId()) {
        odi = iodi;
        break;
      }
    }

    Assert.assertNotNull(id + " assigned to " + containerId + " deploy info is null", odi );
    return odi;
  }

  private OperatorDeployInfo.InputDeployInfo getInputDeployInfo(OperatorDeployInfo odi, LogicalPlan.InputPortMeta portMeta)
  {
    OperatorDeployInfo.InputDeployInfo idi = null;
    List<OperatorDeployInfo.InputDeployInfo> inputs = odi.inputs;
    for (OperatorDeployInfo.InputDeployInfo input : inputs) {
      if (input.portName.equals(portMeta.getPortName())) {
        idi = input;
        break;
      }
    }
    Assert.assertNotNull("input deploy info " + portMeta.getPortName(), idi);
    return idi;
  }

  private OperatorDeployInfo.OutputDeployInfo getOutputDeployInfo(OperatorDeployInfo odi, LogicalPlan.OutputPortMeta portMeta)
  {
    OperatorDeployInfo.OutputDeployInfo otdi = null;
    List<OperatorDeployInfo.OutputDeployInfo> outputs = odi.outputs;
    for (OperatorDeployInfo.OutputDeployInfo output : outputs) {
      if (output.portName.equals(portMeta.getPortName())) {
        otdi = output;
        break;
      }
    }
    Assert.assertNotNull("output deploy info " + portMeta.getPortName(), otdi);
    return otdi;
  }

  // For tests so that it doesn't trigger assignment of a new id
  public boolean isStrCodecPresent(StreamCodec<?> streamCodecInfo, PhysicalPlan plan)
  {
    return plan.getStreamCodecIdentifiers().containsKey(streamCodecInfo);
  }

  public static class TestStreamCodec extends DefaultStatefulStreamCodec<Object>
  {

    @Override
    public int getPartition(Object o)
    {
      return o.hashCode()/2;
    }
  }

  public static class TestStreamCodec2 extends DefaultStatefulStreamCodec<Object>
  {

    @Override
    public int getPartition(Object o)
    {
      return o.hashCode()/3;
    }
  }

  public static class DefaultTestStreamCodec  extends DefaultStatefulStreamCodec<Object> implements Serializable
  {
    private static final long serialVersionUID = 1L;
  }

  public static class DefaultCodecOperator extends GenericTestOperator
  {
    private static final DefaultTestStreamCodec codec = new DefaultTestStreamCodec();

    @InputPortFieldAnnotation(optional = true)
    final public transient InputPort<Object> inportWithCodec = new DefaultInputPort<Object>() {
      @Override
      public StreamCodec<Object> getStreamCodec()
      {
        return codec;
      }

      @Override
      final public void process(Object payload)
      {
      }

    };
  }
}
