/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.datatorrent.stram.plan;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.DAG.OperatorMeta;
import com.datatorrent.api.DAG.StreamMeta;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.Partitioner.PartitionKeys;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.PartitioningTest;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.StreamingContainerManagerTest;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.TestGeneratorInputOperator;
import com.datatorrent.stram.plan.logical.DefaultKryoStreamCodec;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
import com.datatorrent.stram.plan.logical.StreamCodecWrapperForPersistance;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.datatorrent.stram.support.StramTestSupport;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class StreamPersistanceTests
{
  static final Logger logger = LoggerFactory.getLogger(StreamPersistanceTests.class);

  public static class TestReceiverOperator extends BaseOperator
  {
    public static volatile List<Object> results = new ArrayList<>();
    public volatile AtomicInteger size = new AtomicInteger(0);

    @InputPortFieldAnnotation(optional = true)
    public final transient InputPort<Object> inport = new DefaultInputPort<Object>()
    {
      @Override
      public final void process(Object t)
      {
        results.add(t);
        size.incrementAndGet();
      }
    };
  }

  public static class TestPersistanceOperator implements Operator
  {
    public static volatile List<Object> results = new ArrayList<>();

    @InputPortFieldAnnotation(optional = true)
    public final transient InputPort<Object> inport = new DefaultInputPort<Object>()
    {
      @Override
      public final void process(Object t)
      {
        results.add(t);
      }
    };

    @Override
    public void setup(OperatorContext context)
    {
    }

    @Override
    public void teardown()
    {
    }

    @Override
    public void beginWindow(long windowId)
    {
    }

    @Override
    public void endWindow()
    {
    }

  }

  public static class PartitionedTestPersistanceOperator extends TestPersistanceOperator implements Partitioner<PartitionedTestPersistanceOperator>
  {
    @Override
    public Collection definePartitions(Collection partitions, PartitioningContext context)
    {
      Collection<Partition> newPartitions = new ArrayList<>();

      int partitionMask = 0x03;

      // No partitioning done so far..
      // Single partition with mask 0x03 and set {0}
      // First partition
      PartitionedTestPersistanceOperator newInstance = new PartitionedTestPersistanceOperator();
      Partition partition = new DefaultPartition<>(newInstance);
      PartitionKeys value = new PartitionKeys(partitionMask, Sets.newHashSet(0));
      partition.getPartitionKeys().put(inport, value);
      newPartitions.add(partition);

      return newPartitions;
    }

    @Override
    public void partitioned(Map partitions)
    {
      // TODO Auto-generated method stub
    }
  }

  public class TestOperatorWithOutputPorts extends BaseOperator
  {

    @InputPortFieldAnnotation(optional = true)
    public final transient DefaultInputPort<Object> inputPort = new DefaultInputPort<Object>()
    {
      @Override
      public final void process(Object t)
      {
        // Do nothing: Dummy operator for test
      }
    };

    @InputPortFieldAnnotation(optional = false)
    public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<>();
  }

  public class TestOperatorWithMultipleNonOptionalInputPorts extends BaseOperator
  {

    @InputPortFieldAnnotation(optional = false)
    public final transient DefaultInputPort<Object> inputPort1 = new DefaultInputPort<Object>()
    {
      @Override
      public final void process(Object t)
      {
        // Do nothing: Dummy operator for test
      }
    };

    @InputPortFieldAnnotation(optional = false)
    public final transient DefaultInputPort<Object> inputPort2 = new DefaultInputPort<Object>()
    {
      @Override
      public final void process(Object t)
      {
        // Do nothing: Dummy operator for test
      }
    };

    public final transient DefaultInputPort<Object> inputPort3 = new DefaultInputPort<Object>()
    {
      @Override
      public final void process(Object t)
      {
        // Do nothing: Dummy operator for test
      }
    };
  }

  public class TestOperatorWithoutInputPorts extends BaseOperator
  {
  }

  private LogicalPlan dag;

  @Before
  public void setup()
  {
    dag = StramTestSupport.createDAG(testMeta);
  }

  @Test
  public void testPersistStreamOperatorIsAdded()
  {
    TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
    GenericTestOperator x = dag.addOperator("x", new GenericTestOperator());
    TestReceiverOperator persister = new TestReceiverOperator();
    StreamMeta stream = dag.addStream("Stream1", input1.outport, x.inport1);
    stream.persistUsing("Stream1_persister",persister, persister.inport);

    // Check operator is added to dag
    OperatorMeta persistOperatorMeta = dag.getOperatorMeta("Stream1_persister");
    assertEquals("Persist operator not added to dag ", persister, persistOperatorMeta.getOperator());
    dag.validate();
  }

  @Test
  public void testPersistStreamOperatorIsAddedPerSink()
  {
    TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
    GenericTestOperator x1 = dag.addOperator("x1", new GenericTestOperator());
    GenericTestOperator x2 = dag.addOperator("x2", new GenericTestOperator());
    GenericTestOperator x3 = dag.addOperator("x3", new GenericTestOperator());

    TestReceiverOperator persister = new TestReceiverOperator();
    TestReceiverOperator persister1 = new TestReceiverOperator();
    TestReceiverOperator persister2 = new TestReceiverOperator();

    StreamMeta stream = dag.addStream("Stream1", input1.outport, x1.inport1, x2.inport1, x3.inport1);

    stream.persistUsing("Stream1_persister", persister, persister.inport);
    stream.persistUsing("Stream1_x1_persister", persister1, persister1.inport, x1.inport1);
    stream.persistUsing("Stream1_x2_persister", persister2, persister2.inport, x2.inport1);

    // Check 3 persist operators are added to dag
    OperatorMeta persistOperatorMeta = dag.getOperatorMeta("Stream1_persister");
    assertEquals("Persist operator not added to dag ", persister, persistOperatorMeta.getOperator());

    persistOperatorMeta = dag.getOperatorMeta("Stream1_x1_persister");
    assertEquals("Persist operator not added to dag ", persister1, persistOperatorMeta.getOperator());

    persistOperatorMeta = dag.getOperatorMeta("Stream1_x2_persister");
    assertEquals("Persist operator not added to dag ", persister2, persistOperatorMeta.getOperator());

    dag.validate();
  }

  @Test
  public void testaddStreamThrowsExceptionOnInvalidLoggerType()
  {
    // Test Logger with non-optional output ports
    TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
    GenericTestOperator x = dag.addOperator("x", new GenericTestOperator());
    StreamMeta stream = dag.addStream("Stream1", input1.outport, x.inport1);

    TestOperatorWithOutputPorts persister = new TestOperatorWithOutputPorts();
    try {
      stream.persistUsing("persister", persister, persister.inputPort);
      Assert.fail("should throw Illegal argument exception: Persist operator has non optional output ports");
    } catch (IllegalArgumentException e) {
      logger.debug(e.getMessage());
    }

    // Test already added operator passed
    TestOperatorWithOutputPorts persister1 = new TestOperatorWithOutputPorts();
    try {
      stream.persistUsing("Stream1_persister", persister1, persister1.inputPort);
      Assert.fail("should throw exception that Stream1_persister object was already added");
    } catch (IllegalArgumentException e) {
      logger.debug(e.getMessage());
    }

    // Test persist operator without any input ports
    dag.removeOperator(dag.getOperatorMeta("Stream1_persister").getOperator());
    TestOperatorWithoutInputPorts logger2 = new TestOperatorWithoutInputPorts();
    try {
      stream.persistUsing("Stream1_persister", logger2);
      Assert.fail("should throw Illegal argument exception: persist operator should have input ports");
    } catch (IllegalArgumentException e) {
      logger.debug(e.getMessage());
    }

    // Test persist operator with more than one input port as non-optional
    dag.removeOperator(dag.getOperatorMeta("Stream1_persister").getOperator());
    TestOperatorWithMultipleNonOptionalInputPorts persister3 = new TestOperatorWithMultipleNonOptionalInputPorts();
    try {
      stream.persistUsing("Stream1_persister", persister3);
      Assert.fail("should throw Illegal argument exception: persist operator should have at most 1 non-optional input port");
    } catch (IllegalArgumentException e) {
      logger.debug(e.getMessage());
    }
  }

  @Test
  public void testaddStreamThrowsExceptionOnInvalidInputPortForLoggerType()
  {
    // Test for input port belonging to different object
    TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
    GenericTestOperator x = dag.addOperator("x", new GenericTestOperator());
    TestReceiverOperator persister = new TestReceiverOperator();
    TestReceiverOperator persister1 = new TestReceiverOperator();
    StreamMeta stream = dag.addStream("Stream1", input1.outport, x.inport1);
    try {
      stream.persistUsing("Stream1_persister", persister, persister1.inport);
      Assert.fail("should throw Illegal argument exception: Port passed does not belong to operator class");
    } catch (IllegalArgumentException e) {
      // all good
    }

    // Remove persist operator from dag
    dag.removeOperator(dag.getOperatorMeta("Stream1_persister").getOperator());
  }

  @Test
  public void testPersistStreamOperatorIsRemovedWhenStreamIsRemoved()
  {
    // Remove Stream and check if persist operator is removed
    TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
    GenericTestOperator x = dag.addOperator("x", new GenericTestOperator());
    TestReceiverOperator persister = new TestReceiverOperator();
    StreamMeta stream = dag.addStream("Stream1", input1.outport, x.inport1);
    stream.persistUsing("Stream1_persister", persister, persister.inport);

    ((LogicalPlan.StreamMeta)stream).remove();

    // Check operator is added to dag
    OperatorMeta persistOperatorMeta = dag.getOperatorMeta("Stream1_persister");
    assertEquals("Persist operator should be removed from dag after stream.remove", null, persistOperatorMeta);
  }

  @Test
  public void testPersistStreamOperatorIsRemovedWhenSinkIsRemoved()
  {
    // Remove sink and check if corresponding persist operator is removed
    TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
    GenericTestOperator x1 = dag.addOperator("x1", new GenericTestOperator());
    GenericTestOperator x2 = dag.addOperator("x2", new GenericTestOperator());
    GenericTestOperator x3 = dag.addOperator("x3", new GenericTestOperator());

    TestReceiverOperator persister = new TestReceiverOperator();
    TestReceiverOperator persister1 = new TestReceiverOperator();
    TestReceiverOperator persister2 = new TestReceiverOperator();

    StreamMeta stream = dag.addStream("Stream1", input1.outport, x1.inport1, x2.inport1, x3.inport1);

    stream.persistUsing("Stream1_persister", persister, persister.inport);
    stream.persistUsing("Stream1_x1_persister", persister1, persister1.inport, x1.inport1);
    stream.persistUsing("Stream1_x2_persister", persister2, persister2.inport, x2.inport1);

    // Check 3 persist operators are added to dag
    OperatorMeta persistOperatorMeta = dag.getOperatorMeta("Stream1_persister");
    assertEquals("Persist operator not added to dag ", persister, persistOperatorMeta.getOperator());

    persistOperatorMeta = dag.getOperatorMeta("Stream1_x1_persister");
    assertEquals("Persist operator not added to dag ", persister1, persistOperatorMeta.getOperator());

    persistOperatorMeta = dag.getOperatorMeta("Stream1_x2_persister");
    assertEquals("Persist operator not added to dag ", persister2, persistOperatorMeta.getOperator());

    dag.removeOperator(x1);
    // Check persister for x1 is removed
    persistOperatorMeta = dag.getOperatorMeta("Stream1_x1_persister");
    assertEquals("Persist operator should be removed from dag after sink is removed", null, persistOperatorMeta);

    // Check other persisters are unchanged

    persistOperatorMeta = dag.getOperatorMeta("Stream1_persister");
    assertEquals("Persist operator not added to dag ", persister, persistOperatorMeta.getOperator());

    persistOperatorMeta = dag.getOperatorMeta("Stream1_x2_persister");
    assertEquals("Persist operator not added to dag ", persister2, persistOperatorMeta.getOperator());
  }

  @Test
  public void testPersistStreamOperatorIsRemovedWhenAllSinksAreRemoved()
  {
    TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
    GenericTestOperator x1 = dag.addOperator("x1", new GenericTestOperator());
    GenericTestOperator x2 = dag.addOperator("x2", new GenericTestOperator());
    GenericTestOperator x3 = dag.addOperator("x3", new GenericTestOperator());

    TestReceiverOperator persister = new TestReceiverOperator();

    StreamMeta stream = dag.addStream("Stream1", input1.outport, x1.inport1, x2.inport1, x3.inport1);

    stream.persistUsing("Stream1_persister", persister, persister.inport);

    // Check stream persister is added to the dag
    Assert.assertNotNull("Stream persister operator should be present", dag.getOperatorMeta("Stream1_persister"));

    // remove sink operators x1, x2, x3 from dag and check that persister
    // operator is removed
    dag.removeOperator(x1);
    dag.removeOperator(x2);
    dag.removeOperator(x3);
    Assert.assertNull("Persister operator should have been removed after all sinks are removed", dag.getOperatorMeta("Stream1_persister"));
  }

  @Test
  public void testPersistStreamOperatorGeneratesIdenticalOutputAsSink() throws ClassNotFoundException, IOException, InterruptedException
  {
    AscendingNumbersOperator input1 = dag.addOperator("input1", AscendingNumbersOperator.class);
    // Add PersistOperator directly to dag
    final TestReceiverOperator x = dag.addOperator("x", new TestReceiverOperator());
    StreamMeta stream = dag.addStream("Stream1", input1.outputPort, x.inport);

    // Use an instance of PersistOperator to persist stream
    TestPersistanceOperator persister = new TestPersistanceOperator();
    stream.persistUsing("Stream1_persister", persister, persister.inport);

    runLocalClusterAndValidate(dag, x, persister);
  }

  private void runLocalClusterAndValidate(LogicalPlan dag, final TestReceiverOperator x, final TestPersistanceOperator persister) throws IOException, ClassNotFoundException
  {
    try {
      x.results.clear();
      persister.results.clear();
      // Run local cluster and verify both results are identical
      final StramLocalCluster lc = new StramLocalCluster(dag);

      new Thread("LocalClusterController")
      {
        @Override
        public void run()
        {
          long startTms = System.currentTimeMillis();
          long timeout = 100000L;
          try {
            while (System.currentTimeMillis() - startTms < timeout) {
              if (x.results.size() < 1000) {
                Thread.sleep(10);
              } else {
                break;
              }
            }
          } catch (Exception ex) {
            throw Throwables.propagate(ex);
          } finally {
            lc.shutdown();
          }
        }

      }.start();

      lc.run();
      int maxTuples = x.results.size() > persister.results.size() ? persister.results.size() : x.results.size();
      // Output of both operators should be identical
      for (int i = 0; i < maxTuples; i++) {
        logger.debug("Tuple = " + x.results.get(i) + " - " + persister.results.get(i));
        assertEquals("Mismatch observed for tuple ", x.results.get(i), persister.results.get(i));
      }
    } finally {
      x.results.clear();
      persister.results.clear();
    }
  }

  public static class AscendingNumbersOperator implements InputOperator
  {

    private Integer count = 0;

    @Override
    public void emitTuples()
    {

      outputPort.emit(count++);
    }

    public final transient DefaultOutputPort<Integer> outputPort = new DefaultOutputPort<>();

    @Override
    public void beginWindow(long windowId)
    {
    }

    @Override
    public void endWindow()
    {
    }

    @Override
    public void setup(OperatorContext context)
    {
    }

    @Override
    public void teardown()
    {
    }

  }

  public static class DivisibleByStreamCodec extends DefaultKryoStreamCodec
  {

    protected int number = 1;

    public DivisibleByStreamCodec()
    {
      super();
    }

    public DivisibleByStreamCodec(int number)
    {
      super();
      this.number = number;
    }

    @Override
    public int getPartition(Object o)
    {
      if ((Integer)o % number == 0) {
        return 1;
      }
      return 2;
    }

  }

  public static class PassThruOperatorWithCodec extends BaseOperator implements Partitioner<PassThruOperatorWithCodec>
  {

    private int divisibleBy = 1;

    public PassThruOperatorWithCodec()
    {
    }

    public PassThruOperatorWithCodec(int divisibleBy)
    {
      this.divisibleBy = divisibleBy;
    }

    public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
    {
      @Override
      public void process(Object tuple)
      {
        output.emit(tuple);
      }

      @Override
      public StreamCodec<Object> getStreamCodec()
      {
        return new DivisibleByStreamCodec(divisibleBy);
      }
    };

    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();

    @Override
    public Collection definePartitions(Collection partitions, PartitioningContext context)
    {
      Collection<Partition> newPartitions = new ArrayList<Partition>();

      // Mostly for 1 partition we dont need to do this
      int partitionBits = (Integer.numberOfLeadingZeros(0) - Integer.numberOfLeadingZeros(1));
      int partitionMask = 0;
      if (partitionBits > 0) {
        partitionMask = -1 >>> (Integer.numberOfLeadingZeros(-1)) - partitionBits;
      }

      partitionMask = 1;

      if (partitions.size() == 1) {
        // No partitioning done so far..
        // Single partition again, but with only even numbers ok?
        PassThruOperatorWithCodec newInstance = new PassThruOperatorWithCodec();
        Partition partition = new DefaultPartition<PassThruOperatorWithCodec>(newInstance);

        // Consider partitions are 1 & 2 and we are sending only 1 partition
        // Partition 1 = even numbers
        // Partition 2 = odd numbers
        PartitionKeys value = new PartitionKeys(partitionMask, Sets.newHashSet(1));
        partition.getPartitionKeys().put(input, value);
        newPartitions.add(partition);
      }

      return newPartitions;
    }

    @Override
    public void partitioned(Map partitions)
    {
      // TODO Auto-generated method stub

    }
  }

  @Test
  public void testPersistStreamWithFiltering() throws ClassNotFoundException, IOException, InterruptedException
  {
    AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());
    PassThruOperatorWithCodec passThru = dag.addOperator("PassThrough", new PassThruOperatorWithCodec(2));
    TestReceiverOperator console = dag.addOperator("console", new TestReceiverOperator());
    TestPersistanceOperator console1 = new TestPersistanceOperator();
    StreamMeta s = dag.addStream("Stream1", ascend.outputPort, passThru.input);
    s.persistUsing("Stream1_persister", console1, console1.inport);
    dag.addStream("Stream2", passThru.output, console.inport);
    runLocalClusterAndValidate(dag, console, console1);
  }

  @Test
  public void testPersistStreamOnSingleSinkWithFiltering() throws ClassNotFoundException, IOException, InterruptedException
  {
    AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());
    PassThruOperatorWithCodec passThru = dag.addOperator("PassThrough", new PassThruOperatorWithCodec(2));
    final TestReceiverOperator console = dag.addOperator("console", new TestReceiverOperator());

    TestPersistanceOperator persister = new TestPersistanceOperator();
    StreamMeta s = dag.addStream("Stream1", ascend.outputPort, passThru.input);
    s.persistUsing("Stream1_persister", persister, persister.inport, passThru.input);
    dag.addStream("Stream2", passThru.output, console.inport);
    runLocalClusterAndValidate(dag, console, persister);
  }

  @Test
  public void testPersistStreamOnSingleSinkWithFilteringContainerLocal() throws ClassNotFoundException, IOException, InterruptedException
  {
    AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());
    PassThruOperatorWithCodec passThru = dag.addOperator("PassThrough", new PassThruOperatorWithCodec(2));
    PassThruOperatorWithCodec passThru2 = dag.addOperator("Multiples_of_3", new PassThruOperatorWithCodec(3));

    final TestReceiverOperator console = dag.addOperator("console", new TestReceiverOperator());
    final TestReceiverOperator console1 = dag.addOperator("console1", new TestReceiverOperator());

    TestPersistanceOperator persister = new TestPersistanceOperator();
    StreamMeta s = dag.addStream("Stream1", ascend.outputPort, passThru.input, passThru2.input).setLocality(Locality.CONTAINER_LOCAL);
    s.persistUsing("persister", persister, persister.inport);
    dag.addStream("Stream2", passThru.output, console.inport);
    dag.addStream("Stream3", passThru2.output, console1.inport);
    // runLocalClusterAndValidate(dag, console, persister);

    persister.results.clear();
    console.results.clear();
    console1.results.clear();

    // Validate union of results is received on persist operator
    final StramLocalCluster lc = new StramLocalCluster(dag);

    new Thread("LocalClusterController")
    {
      @Override
      public void run()
      {
        long startTms = System.currentTimeMillis();
        long timeout = 1000000L;
        try {
          while (System.currentTimeMillis() - startTms < timeout) {
            if ((console.results.size() < 6) || (console.results.size() < 6)) {
              Thread.sleep(10);
            } else {
              break;
            }
          }
        } catch (Exception ex) {
          throw Throwables.propagate(ex);
        } finally {
          lc.shutdown();
        }
      }

    }.start();

    lc.run();
    try {
      Integer[] expectedResult = {0, 2, 3, 4, 6, 8, 9, 10, 12};
      for (int i = 0; i < expectedResult.length; i++) {
        logger.debug(persister.results.get(i) + " " + expectedResult[i]);
        assertEquals("Mismatch observed for tuple ", expectedResult[i], persister.results.get(i));
      }
    } finally {

      persister.results.clear();
      console.results.clear();
      console1.results.clear();
    }
  }

  @Test
  public void testPersistStreamOperatorGeneratesUnionOfAllSinksOutput() throws ClassNotFoundException, IOException
  {
    AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());
    PassThruOperatorWithCodec passThru1 = dag.addOperator("PassThrough1", new PassThruOperatorWithCodec(2));
    PassThruOperatorWithCodec passThru2 = dag.addOperator("PassThrough2", new PassThruOperatorWithCodec(3));

    final TestReceiverOperator console = dag.addOperator("console", new TestReceiverOperator());
    final TestReceiverOperator console1 = dag.addOperator("console1", new TestReceiverOperator());

    TestPersistanceOperator persister = new TestPersistanceOperator();
    StreamMeta s = dag.addStream("Stream1", ascend.outputPort, passThru1.input, passThru2.input);
    s.persistUsing("persister", persister, persister.inport);

    dag.addStream("Stream2", passThru1.output, console.inport);
    dag.addStream("Stream3", passThru2.output, console1.inport);

    persister.results.clear();
    console.results.clear();
    console1.results.clear();

    // Validate union of results is received on persist operator
    final StramLocalCluster lc = new StramLocalCluster(dag);

    new Thread("LocalClusterController")
    {
      @Override
      public void run()
      {
        long startTms = System.currentTimeMillis();
        long timeout = 10000L;
        try {
          while (System.currentTimeMillis() - startTms < timeout) {
            if ((console.results.size() < 6) || (console.results.size() < 6)) {
              Thread.sleep(10);
            } else {
              break;
            }
          }
        } catch (Exception ex) {
          throw Throwables.propagate(ex);
        } finally {
          lc.shutdown();
        }
      }

    }.start();

    lc.run();
    try {
      Integer[] expectedResult = {0, 2, 3, 4, 6, 8, 9, 10, 12};
      for (int i = 0; i < expectedResult.length; i++) {
        logger.debug(persister.results.get(i) + " " + expectedResult[i]);
        assertEquals("Mismatch observed for tuple ", expectedResult[i], persister.results.get(i));
      }
    } finally {

      persister.results.clear();
      console.results.clear();
      console1.results.clear();
    }
  }

  public static class TestPartitionCodec extends DefaultKryoStreamCodec
  {

    public TestPartitionCodec()
    {
      super();
    }

    @Override
    public int getPartition(Object o)
    {
      return (int)o;// & 0x03;
    }

  }

  public static class PartitionedTestOperatorWithFiltering extends BaseOperator implements Partitioner<PassThruOperatorWithCodec>
  {

    public PartitionedTestOperatorWithFiltering()
    {
    }

    public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
    {
      @Override
      public void process(Object tuple)
      {
        output.emit(tuple);
      }
    };

    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();

    @Override
    public Collection definePartitions(Collection partitions, PartitioningContext context)
    {
      Collection<Partition> newPartitions = new ArrayList<>();

      int partitionMask = 0x03;

      // No partitioning done so far..
      // Single partition again, but with only even numbers ok?
      // First partition
      PassThruOperatorWithCodec newInstance = new PassThruOperatorWithCodec();
      Partition partition = new DefaultPartition<>(newInstance);
      PartitionKeys value = new PartitionKeys(partitionMask, Sets.newHashSet(0));
      partition.getPartitionKeys().put(input, value);
      newPartitions.add(partition);

      // Second partition
      newInstance = new PassThruOperatorWithCodec();
      partition = new DefaultPartition<>(newInstance);
      value = new PartitionKeys(partitionMask, Sets.newHashSet(1));
      partition.getPartitionKeys().put(input, value);

      newPartitions.add(partition);

      return newPartitions;
    }

    @Override
    public void partitioned(Map partitions)
    {
      logger.debug("Dynamic partitioning done....");
    }
  }

  @Test
  public void testPersistStreamOperatorMultiplePhysicalOperatorsForSink() throws ClassNotFoundException, IOException
  {
    AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());
    PartitionedTestOperatorWithFiltering passThru = dag.addOperator("partition", new PartitionedTestOperatorWithFiltering());
    final TestReceiverOperator console = dag.addOperator("console", new TestReceiverOperator());
    final TestPersistanceOperator console1 = new TestPersistanceOperator();
    StreamMeta s = dag.addStream("Stream1", ascend.outputPort, passThru.input);
    dag.setInputPortAttribute(passThru.input, PortContext.STREAM_CODEC, new TestPartitionCodec());
    s.persistUsing("persister", console1, console1.inport);
    dag.addStream("Stream2", passThru.output, console.inport);

    final StramLocalCluster lc = new StramLocalCluster(dag);

    new Thread("LocalClusterController")
    {
      @Override
      public void run()
      {
        long startTms = System.currentTimeMillis();
        long timeout = 100000L;
        try {
          while (System.currentTimeMillis() - startTms < timeout) {
            if ((console.results.size() < 6) || (console1.results.size() < 6)) {
              Thread.sleep(10);
            } else {
              break;
            }
          }
        } catch (Exception ex) {
          throw Throwables.propagate(ex);
        } finally {
          lc.shutdown();
        }
      }

    }.start();

    lc.run();

    try {
      Integer[] expectedResult = {0, 1, 4, 5, 8, 9, 12, 13, 16};

      for (int i = 0; i < expectedResult.length; i++) {
        logger.debug(console1.results.get(i) + " " + expectedResult[i]);
        assertEquals("Mismatch observed for tuple ", expectedResult[i], console1.results.get(i));
      }
    } finally {
      console1.results.clear();
      console.results.clear();
    }
  }

  @Test
  public void testPartitionedPersistOperator() throws ClassNotFoundException, IOException
  {
    AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());
    PartitionedTestOperatorWithFiltering passThru = dag.addOperator("partition", new PartitionedTestOperatorWithFiltering());
    final TestReceiverOperator console = dag.addOperator("console", new TestReceiverOperator());
    final PartitionedTestPersistanceOperator console1 = new PartitionedTestPersistanceOperator();
    StreamMeta s = dag.addStream("Stream1", ascend.outputPort, passThru.input);
    dag.setInputPortAttribute(passThru.input, PortContext.STREAM_CODEC, new TestPartitionCodec());
    s.persistUsing("persister", console1, console1.inport);
    dag.setInputPortAttribute(console1.inport, PortContext.STREAM_CODEC, new TestPartitionCodec());
    dag.addStream("Stream2", passThru.output, console.inport);

    final StramLocalCluster lc = new StramLocalCluster(dag);

    new Thread("LocalClusterController")
    {
      @Override
      public void run()
      {
        long startTms = System.currentTimeMillis();
        long timeout = 100000L;
        try {
          while (System.currentTimeMillis() - startTms < timeout) {
            if (console1.results.size() < 6) {
              Thread.sleep(10);
            } else {
              break;
            }
          }
        } catch (Exception ex) {
          throw Throwables.propagate(ex);
        } finally {
          lc.shutdown();
        }
      }

    }.start();

    lc.run();

    try {
      // Values as per persist operator's partition keys should be picked up
      Integer[] expectedResult = {0, 4, 8, 12, 16, 20 };

      for (int i = 0; i < expectedResult.length; i++) {
        logger.debug(console1.results.get(i) + " " + expectedResult[i]);
        assertEquals("Mismatch observed for tuple ", expectedResult[i], console1.results.get(i));
      }
    } finally {
      console1.results.clear();
      console.results.clear();
    }
  }

  @Rule
  public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();

  @Test
  public void testDynamicPartitioning() throws ClassNotFoundException, IOException
  {
    AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator());

    final TestReceiverOperator console = dag.addOperator("console", new TestReceiverOperator());
    dag.setOperatorAttribute(console, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<TestReceiverOperator>(2));
    dag.setOperatorAttribute(console, Context.OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch()));

    final PartitionedTestPersistanceOperator console1 = new PartitionedTestPersistanceOperator();

    StreamMeta s = dag.addStream("Stream1", ascend.outputPort, console.inport);
    dag.setInputPortAttribute(console.inport, PortContext.STREAM_CODEC, new TestPartitionCodec());
    s.persistUsing("persister", console1, console1.inport);

    dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
    StramTestSupport.MemoryStorageAgent msa = new StramTestSupport.MemoryStorageAgent();
    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, msa);

    StreamingContainerManager dnm = new StreamingContainerManager(dag);
    PhysicalPlan plan = dnm.getPhysicalPlan();

    List<PTContainer> containers = plan.getContainers();
    Assert.assertEquals("number containers", 4, containers.size());

    for (int i = 0; i < containers.size(); ++i) {
      StreamingContainerManagerTest.assignContainer(dnm, "container" + (i + 1));
    }

    LogicalPlan.OperatorMeta passThruMeta = dag.getMeta(console);

    List<PTOperator> ptos = plan.getOperators(passThruMeta);

    PTOperator persistOperatorContainer = null;

    for (PTContainer container : plan.getContainers()) {
      for (PTOperator operator : container.getOperators()) {
        operator.setState(PTOperator.State.ACTIVE);
        if (operator.getName().equals("persister")) {
          persistOperatorContainer = operator;
        }
      }
    }

    // Check that persist operator is part of dependents redeployed
    Set<PTOperator> operators = plan.getDependents(ptos);
    logger.debug("Operators to be re-deployed = {}", operators);
    // Validate that persist operator is part of dependents
    assertTrue("persist operator should be part of the operators to be redeployed", operators.contains(persistOperatorContainer));

    LogicalPlan.StreamMeta s1 = (LogicalPlan.StreamMeta)s;
    StreamCodec codec = s1.getPersistOperatorInputPort().getStreamCodec();

    assertEquals("Codec should be instance of StreamCodecWrapper", codec instanceof StreamCodecWrapperForPersistance, true);
    StreamCodecWrapperForPersistance wrapperCodec = (StreamCodecWrapperForPersistance)codec;

    Entry<InputPortMeta, Collection<PartitionKeys>> keys = (Entry<InputPortMeta, Collection<PartitionKeys>>)wrapperCodec.inputPortToPartitionMap.entrySet().iterator().next();
    logger.debug(keys.toString());
    assertEquals("Size of partitions should be 2", 2, keys.getValue().size());

    for (PTOperator ptOperator : ptos) {
      PartitioningTest.PartitionLoadWatch.put(ptOperator, -1);
      plan.onStatusUpdate(ptOperator);
    }

    dnm.processEvents();

    assertEquals("Input port map", wrapperCodec.inputPortToPartitionMap.size(), 1);

    keys = (Entry<InputPortMeta, Collection<PartitionKeys>>)wrapperCodec.inputPortToPartitionMap.entrySet().iterator().next();
    assertEquals("Size of partitions should be 1 after repartition", 1, keys.getValue().size());
    logger.debug(keys.toString());
  }
}
