/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.indexing.kinesis.supervisor;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskInfoProvider;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.RealtimeIndexTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata;
import org.apache.druid.indexing.kinesis.KinesisIndexTask;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskClient;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskClientFactory;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskIOConfig;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskTuningConfig;
import org.apache.druid.indexing.kinesis.KinesisRecordSupplier;
import org.apache.druid.indexing.kinesis.KinesisSequenceNumber;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.easymock.IAnswer;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

public class KinesisSupervisorTest extends EasyMockSupport
{
  // JSON mapper shared by the tests in this class.
  private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
  // JSON input format handed to the supervisor IO config constructed in the tests.
  private static final InputFormat INPUT_FORMAT = new JsonInputFormat(
      new JSONPathSpec(true, ImmutableList.of()),
      ImmutableMap.of(),
      false
  );
  // Datasource name used for the shared data schema and in the task storage / metadata storage expectations.
  private static final String DATASOURCE = "testDS";
  // The next four values are passed positionally into the KinesisSupervisorTuningConfig built in setupTest().
  private static final int TEST_CHAT_THREADS = 3;
  private static final long TEST_CHAT_RETRIES = 9L;
  private static final Period TEST_HTTP_TIMEOUT = new Period("PT10S");
  private static final Period TEST_SHUTDOWN_TIMEOUT = new Period("PT80S");
  // Stream name and shard ids reported by the mocked record supplier.
  private static final String STREAM = "stream";
  private static final String SHARD_ID0 = "shardId-000000000000";
  private static final String SHARD_ID1 = "shardId-000000000001";
  private static final String SHARD_ID2 = "shardId-000000000002";
  // (stream, shard) pairs matching the shard ids above.
  private static final StreamPartition<String> SHARD0_PARTITION = StreamPartition.of(STREAM, SHARD_ID0);
  private static final StreamPartition<String> SHARD1_PARTITION = StreamPartition.of(STREAM, SHARD_ID1);
  private static final StreamPartition<String> SHARD2_PARTITION = StreamPartition.of(STREAM, SHARD_ID2);
  // Built once for the whole class in setupClass().
  private static DataSchema dataSchema;
  // Mock record supplier, recreated for every test in setupTest().
  private KinesisRecordSupplier supervisorRecordSupplier;

  // Fixed to 1 by the constructor and passed into the tuning config in setupTest().
  private final int numThreads;
  // Supervisor under test; assigned by each test and cleared in tearDownTest().
  private TestableKinesisSupervisor supervisor;
  // The remaining fields are (re)initialised for every test in setupTest().
  private KinesisSupervisorTuningConfig tuningConfig;
  private TaskStorage taskStorage;
  private TaskMaster taskMaster;
  private TaskRunner taskRunner;
  private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
  private KinesisIndexTaskClient taskClient;
  private TaskQueue taskQueue;
  private RowIngestionMetersFactory rowIngestionMetersFactory;
  private ExceptionCapturingServiceEmitter serviceEmitter;
  private SupervisorStateManagerConfig supervisorConfig;

  /**
   * Fixes {@code numThreads} at 1; the value is later passed into the tuning config built by
   * {@link #setupTest()}.
   */
  public KinesisSupervisorTest()
  {
    numThreads = 1;
  }

  /**
   * Builds the data schema for {@link #DATASOURCE} once for the whole test class.
   */
  @BeforeClass
  public static void setupClass()
  {
    KinesisSupervisorTest.dataSchema = getDataSchema(DATASOURCE);
  }

  /**
   * Recreates every mock, the supervisor tuning config and the emitter / state-manager plumbing before each test,
   * so that expectations recorded by one test never leak into another.
   */
  @Before
  public void setupTest()
  {
    // Exception-capturing emitter is registered with the logger for each test.
    this.serviceEmitter = new ExceptionCapturingServiceEmitter();
    EmittingLogger.registerEmitter(this.serviceEmitter);
    this.supervisorConfig = new SupervisorStateManagerConfig();
    this.rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();

    // Fresh mocks; they are switched to replay state by each test via replayAll().
    this.supervisorRecordSupplier = createMock(KinesisRecordSupplier.class);
    this.taskQueue = createMock(TaskQueue.class);
    this.taskClient = createMock(KinesisIndexTaskClient.class);
    this.indexerMetadataStorageCoordinator = createMock(IndexerMetadataStorageCoordinator.class);
    this.taskRunner = createMock(TaskRunner.class);
    this.taskMaster = createMock(TaskMaster.class);
    this.taskStorage = createMock(TaskStorage.class);

    // Arguments are positional; keep the order exactly in sync with the KinesisSupervisorTuningConfig constructor.
    this.tuningConfig = new KinesisSupervisorTuningConfig(
        1000,
        null,
        50000,
        null,
        new Period("P1Y"),
        new File("/test"),
        null,
        null,
        null,
        true,
        false,
        null,
        null,
        null,
        null,
        numThreads,
        TEST_CHAT_THREADS,
        TEST_CHAT_RETRIES,
        TEST_HTTP_TIMEOUT,
        TEST_SHUTDOWN_TIMEOUT,
        null,
        null,
        null,
        5000,
        null,
        null,
        null,
        null,
        null,
        null,
        null,
        null
    );
  }

  /**
   * Drops the reference to the supervisor created by the test that just ran.
   */
  @After
  public void tearDownTest()
  {
    this.supervisor = null;
  }

  /**
   * Runs the supervisor once with no active tasks and no stored sequence numbers and checks that exactly one task
   * is submitted. That task must carry the shared data schema and the converted tuning config, start both shards
   * at the mocked earliest sequence number "0", and leave both end sequence numbers open.
   */
  @Test
  public void testNoInitialState() throws Exception
  {
    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);

    // Stream layout and earliest sequence number reported by the mocked record supplier.
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject()))
            .andReturn("0")
            .anyTimes();

    // Void calls on the record supplier are allowed any number of times.
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    final Capture<KinesisIndexTask> submitted = Capture.newInstance();
    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
            .andReturn(new KinesisDataSourceMetadata(null))
            .anyTimes();
    // Exactly one task submission and one listener registration are expected.
    EasyMock.expect(taskQueue.add(EasyMock.capture(submitted))).andReturn(true);
    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
    replayAll();

    supervisor.start();
    supervisor.runInternal();
    verifyAll();

    final KinesisIndexTask submittedTask = submitted.getValue();
    Assert.assertEquals(dataSchema, submittedTask.getDataSchema());
    Assert.assertEquals(tuningConfig.convertToTaskTuningConfig(), submittedTask.getTuningConfig());

    final KinesisIndexTaskIOConfig ioConfig = submittedTask.getIOConfig();
    Assert.assertEquals("sequenceName-0", ioConfig.getBaseSequenceName());
    Assert.assertTrue("isUseTransaction", ioConfig.isUseTransaction());
    Assert.assertFalse("minimumMessageTime", ioConfig.getMinimumMessageTime().isPresent());
    Assert.assertFalse("maximumMessageTime", ioConfig.getMaximumMessageTime().isPresent());

    Assert.assertEquals(STREAM, ioConfig.getStartSequenceNumbers().getStream());
    final Map<String, String> startSequences = ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap();
    Assert.assertEquals("0", startSequences.get(SHARD_ID1));
    Assert.assertEquals("0", startSequences.get(SHARD_ID0));

    Assert.assertEquals(STREAM, ioConfig.getEndSequenceNumbers().getStream());
    final Map<String, String> endSequences = ioConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap();
    Assert.assertEquals(KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, endSequences.get(SHARD_ID1));
    Assert.assertEquals(KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, endSequences.get(SHARD_ID0));
  }

  /**
   * Builds a real (non-testable) {@link KinesisSupervisor} and checks the record supplier it sets up: it must be
   * non-null, start with an empty buffer and no assignment, and must not run a background fetch after
   * {@code start()}.
   */
  @Test
  public void testRecordSupplier()
  {
    final KinesisSupervisorIOConfig ioConfig = new KinesisSupervisorIOConfig(
        STREAM,
        INPUT_FORMAT,
        "awsEndpoint",
        null,
        1,
        1,
        new Period("PT30M"),
        new Period("P1D"),
        new Period("PT30S"),
        false,
        new Period("PT30M"),
        null,
        null,
        null,
        100,
        1000,
        null,
        null,
        false
    );
    final KinesisIndexTaskClientFactory taskClientFactory = new KinesisIndexTaskClientFactory(null, OBJECT_MAPPER);
    final KinesisSupervisorSpec spec = new KinesisSupervisorSpec(
        null,
        dataSchema,
        tuningConfig,
        ioConfig,
        null,
        false,
        taskStorage,
        taskMaster,
        indexerMetadataStorageCoordinator,
        taskClientFactory,
        OBJECT_MAPPER,
        new NoopServiceEmitter(),
        new DruidMonitorSchedulerConfig(),
        rowIngestionMetersFactory,
        null,
        new SupervisorStateManagerConfig()
    );
    final KinesisSupervisor kinesisSupervisor = new KinesisSupervisor(
        taskStorage,
        taskMaster,
        indexerMetadataStorageCoordinator,
        taskClientFactory,
        OBJECT_MAPPER,
        spec,
        rowIngestionMetersFactory,
        null
    );

    final KinesisRecordSupplier recordSupplier = (KinesisRecordSupplier) kinesisSupervisor.setupRecordSupplier();
    Assert.assertNotNull(recordSupplier);
    Assert.assertEquals(0, recordSupplier.bufferSize());
    Assert.assertEquals(Collections.emptySet(), recordSupplier.getAssignment());

    // The supervisor's own supplier must not enable the background fetch, even once started.
    recordSupplier.start();
    Assert.assertFalse(recordSupplier.isBackgroundFetchRunning());
  }

  /**
   * Runs the supervisor once against two shards and expects two task submissions, each covering exactly one shard:
   * the first captured task reads {@link #SHARD_ID1}, the second {@link #SHARD_ID0}. Both start at "0" and have no
   * end sequence number.
   */
  @Test
  public void testMultiTask() throws Exception
  {
    supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null);

    // Stream layout and earliest sequence number reported by the mocked record supplier.
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject()))
            .andReturn("0")
            .anyTimes();

    // Void calls on the record supplier are allowed any number of times.
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    final Capture<KinesisIndexTask> submitted = Capture.newInstance(CaptureType.ALL);
    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
            .andReturn(new KinesisDataSourceMetadata(null))
            .anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.capture(submitted))).andReturn(true).times(2);
    replayAll();

    supervisor.start();
    supervisor.runInternal();
    verifyAll();

    // First submitted task: only SHARD_ID1.
    final KinesisIndexTask firstTask = submitted.getValues().get(0);
    final Map<String, String> firstStart =
        firstTask.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap();
    final Map<String, String> firstEnd =
        firstTask.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap();
    Assert.assertEquals(1, firstStart.size());
    Assert.assertEquals(1, firstEnd.size());
    Assert.assertEquals("0", firstStart.get(SHARD_ID1));
    Assert.assertEquals(KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, firstEnd.get(SHARD_ID1));

    // Second submitted task: only SHARD_ID0.
    final KinesisIndexTask secondTask = submitted.getValues().get(1);
    final Map<String, String> secondStart =
        secondTask.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap();
    final Map<String, String> secondEnd =
        secondTask.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap();
    Assert.assertEquals(1, secondStart.size());
    Assert.assertEquals(1, secondEnd.size());
    Assert.assertEquals("0", secondStart.get(SHARD_ID0));
    Assert.assertEquals(KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, secondEnd.get(SHARD_ID0));
  }

  /**
   * Runs the supervisor once against two shards and expects two task submissions that are identical in coverage:
   * each captured task reads both shards from "0" with no end sequence number.
   */
  @Test
  public void testReplicas() throws Exception
  {
    supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);

    // Stream layout and earliest sequence number reported by the mocked record supplier.
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject()))
            .andReturn("0")
            .anyTimes();

    // Void calls on the record supplier are allowed any number of times.
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    final Capture<KinesisIndexTask> submitted = Capture.newInstance(CaptureType.ALL);
    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
            .andReturn(new KinesisDataSourceMetadata(null))
            .anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.capture(submitted))).andReturn(true).times(2);
    replayAll();

    supervisor.start();
    supervisor.runInternal();
    verifyAll();

    // Both captured tasks must pass the same set of checks.
    for (int replica = 0; replica < 2; replica++) {
      final KinesisIndexTask replicaTask = submitted.getValues().get(replica);
      final Map<String, String> startSequences =
          replicaTask.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap();
      final Map<String, String> endSequences =
          replicaTask.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap();

      Assert.assertEquals(2, startSequences.size());
      Assert.assertEquals(2, endSequences.size());
      Assert.assertEquals("0", startSequences.get(SHARD_ID0));
      Assert.assertEquals(KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, endSequences.get(SHARD_ID0));
      Assert.assertEquals("0", startSequences.get(SHARD_ID1));
      Assert.assertEquals(KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, endSequences.get(SHARD_ID1));
    }
  }

  /**
   * Creates the supervisor with a one-hour period in the fifth {@code getTestableSupervisor} argument and checks
   * that the first submitted task's minimum message time lies between 59 and 61 minutes in the past, and that the
   * second task carries the same minimum message time.
   */
  @Test
  public void testLateMessageRejectionPeriod() throws Exception
  {
    supervisor = getTestableSupervisor(2, 1, true, "PT1H", new Period("PT1H"), null);

    // Stream layout and earliest sequence number reported by the mocked record supplier.
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject()))
            .andReturn("0")
            .anyTimes();

    // Void calls on the record supplier are allowed any number of times.
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    final Capture<KinesisIndexTask> submitted = Capture.newInstance(CaptureType.ALL);
    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
            .andReturn(new KinesisDataSourceMetadata(null))
            .anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.capture(submitted))).andReturn(true).times(2);
    replayAll();

    supervisor.start();
    supervisor.runInternal();
    verifyAll();

    final KinesisIndexTask firstTask = submitted.getValues().get(0);
    final KinesisIndexTask secondTask = submitted.getValues().get(1);

    // A two-minute window around "one hour ago" tolerates the time taken by the test itself.
    final DateTime firstMinimum = firstTask.getIOConfig().getMinimumMessageTime().get();
    Assert.assertTrue("minimumMessageTime", firstMinimum.plusMinutes(59).isBeforeNow());
    Assert.assertTrue("minimumMessageTime", firstMinimum.plusMinutes(61).isAfterNow());
    Assert.assertEquals(firstMinimum, secondTask.getIOConfig().getMinimumMessageTime().get());
  }

  /**
   * Creates the supervisor with a one-hour period in the sixth {@code getTestableSupervisor} argument and checks
   * that the first submitted task's maximum message time lies between 119 and 121 minutes in the future, and that
   * the second task carries the same maximum message time.
   */
  @Test
  public void testEarlyMessageRejectionPeriod() throws Exception
  {
    supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, new Period("PT1H"));

    // Stream layout and earliest sequence number reported by the mocked record supplier.
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject()))
            .andReturn("0")
            .anyTimes();

    // Void calls on the record supplier are allowed any number of times.
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    final Capture<KinesisIndexTask> submitted = Capture.newInstance(CaptureType.ALL);
    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
            .andReturn(new KinesisDataSourceMetadata(null))
            .anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.capture(submitted))).andReturn(true).times(2);
    replayAll();

    supervisor.start();
    supervisor.runInternal();
    verifyAll();

    final KinesisIndexTask firstTask = submitted.getValues().get(0);
    final KinesisIndexTask secondTask = submitted.getValues().get(1);

    // A two-minute window around "two hours from now" tolerates the time taken by the test itself.
    final DateTime firstMaximum = firstTask.getIOConfig().getMaximumMessageTime().get();
    Assert.assertTrue("maximumMessageTime", firstMaximum.minusMinutes(59 + 60).isAfterNow());
    Assert.assertTrue("maximumMessageTime", firstMaximum.minusMinutes(61 + 60).isBeforeNow());
    Assert.assertEquals(firstMaximum, secondTask.getIOConfig().getMaximumMessageTime().get());
  }


  /**
   * Verifies that starting sequences are taken from the sequence numbers stored in the datasource metadata (the
   * sequences of the last built segments): with "2" stored for {@link #SHARD_ID1} and "1" for {@link #SHARD_ID0},
   * the single submitted task must start from exactly those values.
   */
  @Test
  public void testDatasourceMetadata() throws Exception
  {
    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);

    // Stream layout reported by the mocked record supplier.
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
            .anyTimes();
    // Per-shard earliest sequence numbers, and a common latest sequence number.
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(SHARD1_PARTITION)).andReturn("2").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject()))
            .andReturn("100")
            .anyTimes();

    // Void calls on the record supplier are allowed any number of times.
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    final Capture<KinesisIndexTask> submitted = Capture.newInstance();
    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    // Stored metadata carries the sequence numbers the new task is expected to resume from.
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
        new KinesisDataSourceMetadata(
            new SeekableStreamStartSequenceNumbers<>(
                STREAM,
                ImmutableMap.of(SHARD_ID1, "2", SHARD_ID0, "1"),
                ImmutableSet.of()
            )
        )
    ).anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.capture(submitted))).andReturn(true);
    replayAll();

    supervisor.start();
    supervisor.runInternal();
    verifyAll();

    final KinesisIndexTaskIOConfig ioConfig = submitted.getValue().getIOConfig();
    Assert.assertEquals("sequenceName-0", ioConfig.getBaseSequenceName());

    final Map<String, String> startSequences = ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap();
    Assert.assertEquals("2", startSequences.get(SHARD_ID1));
    Assert.assertEquals("1", startSequences.get(SHARD_ID0));
  }

  /**
   * Stores sequence numbers "101" and "-1" in the datasource metadata while the mocked stream reports earliest "0"
   * and latest "100", then checks that the first exception event recorded by the supervisor's state manager is an
   * {@code ISE}.
   */
  @Test
  public void testBadMetadataOffsets() throws Exception
  {
    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);

    // Stream layout and sequence number bounds reported by the mocked record supplier.
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject()))
            .andReturn("0")
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject()))
            .andReturn("100")
            .anyTimes();

    // Void calls on the record supplier are allowed any number of times.
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    // Stored sequence numbers differ from everything the mocked stream reports.
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
        new KinesisDataSourceMetadata(
            new SeekableStreamStartSequenceNumbers<>(
                STREAM,
                ImmutableMap.of(SHARD_ID1, "101", SHARD_ID0, "-1"),
                ImmutableSet.of()
            )
        )
    ).anyTimes();
    replayAll();

    supervisor.start();
    supervisor.runInternal();

    final String firstExceptionClass =
        supervisor.getStateManager().getExceptionEvents().get(0).getExceptionClass();
    Assert.assertEquals("org.apache.druid.java.util.common.ISE", firstExceptionClass);
  }

  /**
   * Reports a {@link RealtimeIndexTask} as the only active task for the datasource and runs the supervisor once.
   * The recorded expectations allow two task submissions and one listener registration; no other interaction with
   * the task queue is recorded before {@code verifyAll()}.
   */
  @Test
  public void testDontKillTasksWithMismatchedType() throws Exception
  {
    supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);

    // Stream layout and sequence number bounds reported by the mocked record supplier.
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject()))
            .andReturn("0")
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject()))
            .andReturn("100")
            .anyTimes();

    // Void calls on the record supplier are allowed any number of times.
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    // A task of a different type than KinesisIndexTask; the supervisor must leave it alone.
    final Task realtimeTask = new RealtimeIndexTask(
        "id2",
        null,
        new FireDepartment(
            dataSchema,
            new RealtimeIOConfig(null, null),
            null
        ),
        null
    );
    final List<Task> activeTasks = ImmutableList.of(realtimeTask);

    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(activeTasks).anyTimes();
    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED))
            .anyTimes();
    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
            .anyTimes();
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
            .andReturn(new KinesisDataSourceMetadata(null))
            .anyTimes();
    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));

    EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true).times(2);

    replayAll();
    supervisor.start();
    supervisor.runInternal();
    verifyAll();
  }

  /**
   * Starts the supervisor against four pre-existing Kinesis index tasks ("id1" through "id4") that were created
   * with differing shard-to-sequence maps, then runs a single iteration. The mocks are primed to accept one
   * {@code stopAsync("id3", false)} call, one checkpoint fetch each for "id1", "id2" and "id4", and one
   * {@code taskQueue.shutdown} call for "id4"; {@code verifyAll()} is invoked afterwards.
   */
  @Test
  public void testKillBadPartitionAssignment() throws Exception
  {
    supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null);

    // Stream view handed to the supervisor: two shards, earliest sequence "0", latest sequence "100".
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
            .anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject()))
            .andReturn("0")
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject()))
            .andReturn("100")
            .anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    // NOTE(review): the third argument is presumably the task group id -- confirm against createKinesisIndexTask.
    // "id1": SHARD_ID1 only, third argument 0.
    final Task task1 = createKinesisIndexTask(
        "id1",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID1, "0"), ImmutableSet.of()),
        new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID1, "12")),
        null,
        null
    );
    // "id2": SHARD_ID0 only, third argument 1.
    final Task task2 = createKinesisIndexTask(
        "id2",
        DATASOURCE,
        1,
        new SeekableStreamStartSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID0, "0"), ImmutableSet.of(SHARD_ID0)),
        new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID0, "1")),
        null,
        null
    );
    // "id3": both shards, third argument 0. This is the task the test expects to be stopped.
    final Task task3 = createKinesisIndexTask(
        "id3",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>(
            STREAM,
            ImmutableMap.of(SHARD_ID0, "0", SHARD_ID1, "0"),
            ImmutableSet.of(SHARD_ID0, SHARD_ID1)
        ),
        new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID0, "1", SHARD_ID1, "12")),
        null,
        null
    );
    // "id4": SHARD_ID0 only, third argument 0. This is the task the test expects to be shut down via the queue.
    final Task task4 = createKinesisIndexTask(
        "id4",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID0, "0"), ImmutableSet.of(SHARD_ID0)),
        new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID0, "1")),
        null,
        null
    );

    final List<Task> storedTasks = ImmutableList.of(task1, task2, task3, task4);

    // Canned checkpoint maps; no checkpoint lookup is recorded for "id3".
    final TreeMap<Integer, Map<String, String>> task1Checkpoints = new TreeMap<>();
    task1Checkpoints.put(0, ImmutableMap.of(SHARD_ID1, "0"));
    final TreeMap<Integer, Map<String, String>> task2Checkpoints = new TreeMap<>();
    task2Checkpoints.put(0, ImmutableMap.of(SHARD_ID0, "0"));
    final TreeMap<Integer, Map<String, String>> task4Checkpoints = new TreeMap<>();
    task4Checkpoints.put(0, ImmutableMap.of(SHARD_ID0, "0"));

    // Task master / runner / storage all report the four tasks above as active and running.
    EasyMock.expect(taskMaster.getTaskQueue())
            .andReturn(Optional.of(taskQueue))
            .anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner())
            .andReturn(Optional.of(taskRunner))
            .anyTimes();
    EasyMock.expect(taskRunner.getRunningTasks())
            .andReturn(Collections.emptyList())
            .anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
            .andReturn(storedTasks)
            .anyTimes();
    EasyMock.expect(taskStorage.getStatus("id1"))
            .andReturn(Optional.of(TaskStatus.running("id1")))
            .anyTimes();
    EasyMock.expect(taskStorage.getStatus("id2"))
            .andReturn(Optional.of(TaskStatus.running("id2")))
            .anyTimes();
    EasyMock.expect(taskStorage.getStatus("id3"))
            .andReturn(Optional.of(TaskStatus.running("id3")))
            .anyTimes();
    EasyMock.expect(taskStorage.getStatus("id4"))
            .andReturn(Optional.of(TaskStatus.running("id4")))
            .anyTimes();
    EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task1)).anyTimes();
    EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(task2)).anyTimes();
    EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(task3)).anyTimes();
    EasyMock.expect(taskStorage.getTask("id4")).andReturn(Optional.of(task4)).anyTimes();
    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED))
            .anyTimes();
    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
            .anyTimes();
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
            .andReturn(new KinesisDataSourceMetadata(null))
            .anyTimes();

    // Exactly one stop request is expected, and it must target "id3".
    EasyMock.expect(taskClient.stopAsync("id3", false)).andReturn(Futures.immediateFuture(true));

    // One checkpoint fetch per remaining task.
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(task1Checkpoints))
            .times(1);
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(task2Checkpoints))
            .times(1);
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id4"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(task4Checkpoints))
            .times(1);

    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
    // Recorded void call: the queue must be asked to shut down "id4" with this exact message and argument.
    taskQueue.shutdown("id4", "Task [%s] failed to return status, killing task", "id4");
    replayAll();

    supervisor.start();
    supervisor.runInternal();
    verifyAll();
  }

  /**
   * Drives the supervisor through three iterations:
   * <ol>
   * <li>with no active tasks, four tasks are expected to be added to the task queue and are captured;</li>
   * <li>with all four captured tasks reported as running, no new queue expectations are recorded;</li>
   * <li>with the fourth captured task reported as failed, exactly one more task is expected to be queued, and it
   * must carry a different id but the same base sequence name as the failed one.</li>
   * </ol>
   */
  @Test
  public void testRequeueTaskWhenFailed() throws Exception
  {
    supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null);

    // Stream view handed to the supervisor: two shards, earliest sequence "0", latest sequence "100".
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
            .anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject()))
            .andReturn("0")
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject()))
            .andReturn("100")
            .anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    // ---- iteration 1: nothing is active yet, so four tasks should be queued ----
    final Capture<Task> queuedTasksCapture = Capture.newInstance(CaptureType.ALL);
    EasyMock.expect(taskMaster.getTaskQueue())
            .andReturn(Optional.of(taskQueue))
            .anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner())
            .andReturn(Optional.of(taskRunner))
            .anyTimes();
    EasyMock.expect(taskRunner.getRunningTasks())
            .andReturn(Collections.emptyList())
            .anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
            .andReturn(ImmutableList.of())
            .anyTimes();
    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED))
            .anyTimes();
    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
            .anyTimes();
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
            .andReturn(new KinesisDataSourceMetadata(null))
            .anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.capture(queuedTasksCapture)))
            .andReturn(true)
            .times(4);

    // Checkpoints served to tasks whose id contains "sequenceName-0" / "sequenceName-1" respectively.
    final TreeMap<Integer, Map<String, String>> sequence0Checkpoints = new TreeMap<>();
    sequence0Checkpoints.put(0, ImmutableMap.of(SHARD_ID1, "0"));
    final TreeMap<Integer, Map<String, String>> sequence1Checkpoints = new TreeMap<>();
    sequence1Checkpoints.put(0, ImmutableMap.of(SHARD_ID0, "0"));
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(sequence0Checkpoints))
            .anyTimes();
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(sequence1Checkpoints))
            .anyTimes();

    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
    replayAll();

    supervisor.start();
    supervisor.runInternal();
    verifyAll();

    final List<Task> createdTasks = queuedTasksCapture.getValues();

    // ---- iteration 2: every created task is still running, so the main loop should only check statuses ----
    EasyMock.reset(taskStorage);
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
            .andReturn(createdTasks)
            .anyTimes();
    for (Task created : createdTasks) {
      final String createdId = created.getId();
      EasyMock.expect(taskStorage.getStatus(createdId))
              .andReturn(Optional.of(TaskStatus.running(createdId)))
              .anyTimes();
      EasyMock.expect(taskStorage.getTask(createdId))
              .andReturn(Optional.of(created))
              .anyTimes();
    }
    EasyMock.replay(taskStorage);

    supervisor.runInternal();
    verifyAll();

    // ---- iteration 3: the last created task fails and must be replaced by a task with the same parameters ----
    final Capture<Task> replacementCapture = Capture.newInstance();
    final List<Task> survivingTasks = createdTasks.subList(0, 3);
    final KinesisIndexTask failedTask = (KinesisIndexTask) createdTasks.get(3);
    EasyMock.reset(taskStorage, taskQueue);
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
            .andReturn(survivingTasks)
            .anyTimes();
    for (Task survivor : survivingTasks) {
      final String survivorId = survivor.getId();
      EasyMock.expect(taskStorage.getStatus(survivorId))
              .andReturn(Optional.of(TaskStatus.running(survivorId)))
              .anyTimes();
      EasyMock.expect(taskStorage.getTask(survivorId))
              .andReturn(Optional.of(survivor))
              .anyTimes();
    }
    // The failure status is served once only.
    EasyMock.expect(taskStorage.getStatus(failedTask.getId()))
            .andReturn(Optional.of(TaskStatus.failure(failedTask.getId())));
    EasyMock.expect(taskStorage.getTask(failedTask.getId()))
            .andReturn(Optional.of(failedTask))
            .anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.capture(replacementCapture))).andReturn(true);
    EasyMock.replay(taskStorage, taskQueue);

    supervisor.runInternal();
    verifyAll();

    final Task replacement = replacementCapture.getValue();
    Assert.assertNotEquals(failedTask.getId(), replacement.getId());
    Assert.assertEquals(
        failedTask.getIOConfig().getBaseSequenceName(),
        ((KinesisIndexTask) replacement).getIOConfig().getBaseSequenceName()
    );
  }

  /**
   * Starts the supervisor with one pre-existing task ("id1") that carries explicit minimum/maximum message times,
   * and runs two iterations:
   * <ol>
   * <li>one additional task is expected to be queued, and its minimum message time must equal that of "id1";</li>
   * <li>"id1" is then reported as failed while the task queued in step 1 keeps running; exactly one replacement
   * is expected, with a new id, the same base sequence name, and the same minimum/maximum message times.</li>
   * </ol>
   */
  @Test
  public void testRequeueAdoptedTaskWhenFailed() throws Exception
  {
    supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);

    // Stream view handed to the supervisor: two shards, earliest sequence "0", latest sequence "100".
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
            .anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject()))
            .andReturn("0")
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject()))
            .andReturn("100")
            .anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    // baseTime doubles as the adopted task's minimum message time and as the start time reported by taskClient.
    final DateTime baseTime = DateTimes.nowUtc();
    final DateTime baseTimePlusHour = baseTime.plusMinutes(60);
    final Task adoptedTask = createKinesisIndexTask(
        "id1",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>(
            "stream",
            ImmutableMap.of(SHARD_ID1, "0", SHARD_ID0, "0"),
            ImmutableSet.of()
        ),
        new SeekableStreamEndSequenceNumbers<>(
            "stream",
            ImmutableMap.of(
                SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER,
                SHARD_ID0, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
            )
        ),
        baseTime,
        baseTimePlusHour
    );

    final List<Task> activeTasks = ImmutableList.of(adoptedTask);

    // ---- iteration 1: "id1" is running and READING; one extra task is expected to be queued ----
    final Capture<Task> replicaCapture = Capture.newInstance();
    EasyMock.expect(taskMaster.getTaskQueue())
            .andReturn(Optional.of(taskQueue))
            .anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner())
            .andReturn(Optional.of(taskRunner))
            .anyTimes();
    EasyMock.expect(taskRunner.getRunningTasks())
            .andReturn(Collections.emptyList())
            .anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
            .andReturn(activeTasks)
            .anyTimes();
    EasyMock.expect(taskStorage.getStatus("id1"))
            .andReturn(Optional.of(TaskStatus.running("id1")))
            .anyTimes();
    EasyMock.expect(taskStorage.getTask("id1"))
            .andReturn(Optional.of(adoptedTask))
            .anyTimes();
    EasyMock.expect(taskClient.getStatusAsync("id1"))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING));
    EasyMock.expect(taskClient.getStartTimeAsync("id1"))
            .andReturn(Futures.immediateFuture(baseTime))
            .anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.capture(replicaCapture))).andReturn(true);
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
            .andReturn(new KinesisDataSourceMetadata(null))
            .anyTimes();

    final TreeMap<Integer, Map<String, String>> bothShardCheckpoints = new TreeMap<>();
    bothShardCheckpoints.put(0, ImmutableMap.of(SHARD_ID1, "0", SHARD_ID0, "0"));
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(bothShardCheckpoints))
            .times(2);

    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
    replayAll();

    supervisor.start();
    supervisor.runInternal();
    verifyAll();

    // check that replica tasks are created with the same minimumMessageTime as tasks inherited from another
    // supervisor
    final Task replicaTask = replicaCapture.getValue();
    Assert.assertEquals(baseTime, ((KinesisIndexTask) replicaTask).getIOConfig().getMinimumMessageTime().get());

    // ---- iteration 2: "id1" fails; a replacement with the same parameters must be re-queued ----
    final String replicaTaskId = replicaTask.getId();
    final Capture<Task> replacementCapture = Capture.newInstance();
    final KinesisIndexTask failedTask = (KinesisIndexTask) adoptedTask;
    EasyMock.reset(taskStorage, taskQueue, taskClient);

    // Checkpoint fetches for the replica created in iteration 1 ...
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(bothShardCheckpoints))
            .times(2);
    // ... and a single one for the adopted task.
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(bothShardCheckpoints))
            .times(1);

    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
            .andReturn(ImmutableList.of(replicaTask))
            .anyTimes();
    // The failure status is served once only.
    EasyMock.expect(taskStorage.getStatus(failedTask.getId()))
            .andReturn(Optional.of(TaskStatus.failure(failedTask.getId())));
    EasyMock.expect(taskStorage.getStatus(replicaTaskId))
            .andReturn(Optional.of(TaskStatus.running(replicaTaskId)))
            .anyTimes();
    EasyMock.expect(taskStorage.getTask(failedTask.getId()))
            .andReturn(Optional.of(failedTask))
            .anyTimes();
    EasyMock.expect(taskStorage.getTask(replicaTaskId))
            .andReturn(Optional.of(replicaTask))
            .anyTimes();
    EasyMock.expect(taskClient.getStatusAsync(replicaTaskId))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING));
    EasyMock.expect(taskClient.getStartTimeAsync(replicaTaskId))
            .andReturn(Futures.immediateFuture(baseTime))
            .anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.capture(replacementCapture))).andReturn(true);
    EasyMock.replay(taskStorage, taskQueue, taskClient);

    supervisor.runInternal();
    verifyAll();

    final Task replacement = replacementCapture.getValue();
    Assert.assertNotEquals(failedTask.getId(), replacement.getId());

    final KinesisIndexTask replacementTask = (KinesisIndexTask) replacement;
    Assert.assertEquals(
        failedTask.getIOConfig().getBaseSequenceName(),
        replacementTask.getIOConfig().getBaseSequenceName()
    );

    // check that failed tasks are recreated with the same minimumMessageTime as the task it replaced, even if that
    // task came from another supervisor
    Assert.assertEquals(baseTime, replacementTask.getIOConfig().getMinimumMessageTime().get());
    Assert.assertEquals(baseTimePlusHour, replacementTask.getIOConfig().getMaximumMessageTime().get());
  }

  /**
   * Drives the supervisor through three iterations:
   * <ol>
   * <li>with no active tasks, four tasks are expected to be queued and are captured;</li>
   * <li>with all four reported as running, two checkpoint fetches per sequence name are expected;</li>
   * <li>with the first captured task reported as successful, two new tasks are expected to be queued and one
   * {@code stopAsync(..., false)} call is expected whose task id contains the successful task's base sequence
   * name.</li>
   * </ol>
   */
  @Test
  public void testQueueNextTasksOnSuccess() throws Exception
  {
    supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null);

    // Stream view handed to the supervisor: two shards, earliest sequence "0", latest sequence "100".
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
            .anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject()))
            .andReturn("0")
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject()))
            .andReturn("100")
            .anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    // ---- iteration 1: nothing is active yet, so four tasks should be queued ----
    final Capture<Task> initialTasksCapture = Capture.newInstance(CaptureType.ALL);
    EasyMock.expect(taskMaster.getTaskQueue())
            .andReturn(Optional.of(taskQueue))
            .anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner())
            .andReturn(Optional.of(taskRunner))
            .anyTimes();
    EasyMock.expect(taskRunner.getRunningTasks())
            .andReturn(Collections.emptyList())
            .anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
            .andReturn(ImmutableList.of())
            .anyTimes();
    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED))
            .anyTimes();
    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
            .anyTimes();
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
            .andReturn(new KinesisDataSourceMetadata(null))
            .anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.capture(initialTasksCapture)))
            .andReturn(true)
            .times(4);
    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));

    replayAll();

    supervisor.start();
    supervisor.runInternal();
    verifyAll();

    final List<Task> initialTasks = initialTasksCapture.getValues();

    // ---- iteration 2: all four tasks are reported as running ----
    final TreeMap<Integer, Map<String, String>> sequence0Checkpoints = new TreeMap<>();
    sequence0Checkpoints.put(0, ImmutableMap.of(SHARD_ID1, "0", SHARD_ID0, "0"));
    final TreeMap<Integer, Map<String, String>> sequence1Checkpoints = new TreeMap<>();
    sequence1Checkpoints.put(0, ImmutableMap.of(SHARD_ID1, "0"));

    EasyMock.reset(taskStorage, taskClient);

    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED))
            .anyTimes();
    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
            .anyTimes();
    // there would be 4 tasks, 2 for each task group
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(sequence0Checkpoints))
            .times(2);
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(sequence1Checkpoints))
            .times(2);

    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
            .andReturn(initialTasks)
            .anyTimes();
    for (Task running : initialTasks) {
      final String runningId = running.getId();
      EasyMock.expect(taskStorage.getStatus(runningId))
              .andReturn(Optional.of(TaskStatus.running(runningId)))
              .anyTimes();
      EasyMock.expect(taskStorage.getTask(runningId))
              .andReturn(Optional.of(running))
              .anyTimes();
    }
    EasyMock.replay(taskStorage, taskClient);

    supervisor.runInternal();
    verifyAll();

    // ---- iteration 3: a task succeeding causes a new task to be re-queued with the next stream range and causes
    // any replica tasks to be shutdown ----
    final Capture<Task> followUpTasksCapture = Capture.newInstance(CaptureType.ALL);
    final Capture<String> stoppedTaskIdCapture = Capture.newInstance();
    final List<Task> stillRunningTasks = initialTasks.subList(1, 4);
    final KinesisIndexTask succeededTask = (KinesisIndexTask) initialTasks.get(0);
    EasyMock.reset(taskStorage, taskQueue, taskClient);
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
            .andReturn(stillRunningTasks)
            .anyTimes();
    for (Task running : stillRunningTasks) {
      final String runningId = running.getId();
      EasyMock.expect(taskStorage.getStatus(runningId))
              .andReturn(Optional.of(TaskStatus.running(runningId)))
              .anyTimes();
      EasyMock.expect(taskStorage.getTask(runningId))
              .andReturn(Optional.of(running))
              .anyTimes();
    }
    // The success status is served once only.
    EasyMock.expect(taskStorage.getStatus(succeededTask.getId()))
            .andReturn(Optional.of(TaskStatus.success(succeededTask.getId())));
    EasyMock.expect(taskStorage.getTask(succeededTask.getId()))
            .andReturn(Optional.of(succeededTask))
            .anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.capture(followUpTasksCapture)))
            .andReturn(true)
            .times(2);
    EasyMock.expect(taskClient.stopAsync(EasyMock.capture(stoppedTaskIdCapture), EasyMock.eq(false)))
            .andReturn(Futures.immediateFuture(true));
    EasyMock.replay(taskStorage, taskQueue, taskClient);

    supervisor.runInternal();
    verifyAll();

    // make sure we killed the right task (sequenceName for replicas are the same)
    final String expectedSequenceName = succeededTask.getIOConfig().getBaseSequenceName();
    Assert.assertTrue(stoppedTaskIdCapture.getValue().contains(expectedSequenceName));
  }

  /**
   * Runs two supervisor iterations with a "PT1M" duration argument:
   * <ol>
   * <li>with no active tasks, four tasks are expected to be queued and are captured;</li>
   * <li>the captured tasks are reported as running and READING. For ids containing "sequenceName-0" the first
   * reported start time lies two minutes in the past; those tasks are expected to be paused, to receive end
   * offsets {@code {SHARD_ID1: "3"}}, and two follow-up tasks are expected to be queued.</li>
   * </ol>
   * Each follow-up task is then checked for the shared data schema and tuning config, base sequence name
   * "sequenceName-0", and an exclusive start at sequence "3" on {@code SHARD_ID1}.
   */
  @Test
  public void testBeginPublishAndQueueNextTasks() throws Exception
  {
    final TaskLocation location = new TaskLocation("testHost", 1234, -1);

    supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null);

    // Stream view handed to the supervisor: latest sequence is "12" on shard 1 and "1" on shard 0.
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
            .anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject()))
            .andReturn("0")
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD1_PARTITION))
            .andReturn("12")
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION))
            .andReturn("1")
            .anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    // ---- iteration 1: nothing is active yet, so four tasks should be queued ----
    final Capture<Task> initialTasksCapture = Capture.newInstance(CaptureType.ALL);
    EasyMock.expect(taskMaster.getTaskQueue())
            .andReturn(Optional.of(taskQueue))
            .anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner())
            .andReturn(Optional.of(taskRunner))
            .anyTimes();
    EasyMock.expect(taskRunner.getRunningTasks())
            .andReturn(Collections.emptyList())
            .anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
            .andReturn(ImmutableList.of())
            .anyTimes();
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
            .andReturn(new KinesisDataSourceMetadata(null))
            .anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.capture(initialTasksCapture)))
            .andReturn(true)
            .times(4);
    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
    replayAll();

    supervisor.start();
    supervisor.runInternal();
    verifyAll();

    final List<Task> initialTasks = initialTasksCapture.getValues();
    // Wrap every queued task in a work item at the fixed test location so the runner can report it as running.
    // Left as a raw Collection on purpose: it is passed straight to andReturn(...) for getRunningTasks() below.
    Collection runningWorkItems = new ArrayList<>();
    for (Task queued : initialTasks) {
      runningWorkItems.add(new TestTaskRunnerWorkItem(queued, null, location));
    }

    // ---- iteration 2: tasks are READING; the "sequenceName-0" ones are paused, finalized and replaced ----
    EasyMock.reset(taskStorage, taskRunner, taskClient, taskQueue);
    final Capture<Task> nextTasksCapture = Capture.newInstance(CaptureType.ALL);
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
            .andReturn(initialTasks)
            .anyTimes();
    for (Task queued : initialTasks) {
      final String queuedId = queued.getId();
      EasyMock.expect(taskStorage.getStatus(queuedId))
              .andReturn(Optional.of(TaskStatus.running(queuedId)))
              .anyTimes();
      EasyMock.expect(taskStorage.getTask(queuedId))
              .andReturn(Optional.of(queued))
              .anyTimes();
    }
    EasyMock.expect(taskRunner.getRunningTasks())
            .andReturn(runningWorkItems)
            .anyTimes();
    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
            .anyTimes();
    // First "sequenceName-0" start time is two minutes old, the second one is current.
    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.contains("sequenceName-0")))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc().minusMinutes(2)))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()));
    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.contains("sequenceName-1")))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
            .times(2);
    // The two pause calls report SHARD_ID1 at "1" and "3" respectively ...
    EasyMock.expect(taskClient.pauseAsync(EasyMock.contains("sequenceName-0")))
            .andReturn(Futures.immediateFuture(ImmutableMap.of(SHARD_ID1, "1")))
            .andReturn(Futures.immediateFuture(ImmutableMap.of(SHARD_ID1, "3")));
    // ... and both tasks are expected to be given "3" as their end offset.
    EasyMock.expect(
        taskClient.setEndOffsetsAsync(
            EasyMock.contains("sequenceName-0"),
            EasyMock.eq(ImmutableMap.of(SHARD_ID1, "3")),
            EasyMock.eq(true)
        )
    ).andReturn(Futures.immediateFuture(true)).times(2);
    EasyMock.expect(taskQueue.add(EasyMock.capture(nextTasksCapture)))
            .andReturn(true)
            .times(2);

    final TreeMap<Integer, Map<String, String>> sequence0Checkpoints = new TreeMap<>();
    sequence0Checkpoints.put(0, ImmutableMap.of(SHARD_ID1, "0"));
    final TreeMap<Integer, Map<String, String>> sequence1Checkpoints = new TreeMap<>();
    sequence1Checkpoints.put(0, ImmutableMap.of(SHARD_ID0, "0"));
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(sequence0Checkpoints))
            .times(2);
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(sequence1Checkpoints))
            .times(2);

    EasyMock.replay(taskStorage, taskRunner, taskClient, taskQueue);

    supervisor.runInternal();
    verifyAll();

    for (Task followUp : nextTasksCapture.getValues()) {
      final KinesisIndexTask followUpTask = (KinesisIndexTask) followUp;
      Assert.assertEquals(dataSchema, followUpTask.getDataSchema());
      Assert.assertEquals(tuningConfig.convertToTaskTuningConfig(), followUpTask.getTuningConfig());

      final KinesisIndexTaskIOConfig ioConfig = followUpTask.getIOConfig();
      Assert.assertEquals("sequenceName-0", ioConfig.getBaseSequenceName());
      Assert.assertTrue("isUseTransaction", ioConfig.isUseTransaction());

      Assert.assertEquals(STREAM, ioConfig.getStartSequenceNumbers().getStream());
      Assert.assertEquals(
          "3",
          ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID1)
      );
      // start sequenceNumbers should be exclusive for the second batch of tasks
      Assert.assertEquals(
          ImmutableSet.of(SHARD_ID1),
          ioConfig.getStartSequenceNumbers().getExclusivePartitions()
      );
    }
  }

  @Test
  public void testDiscoverExistingPublishingTask() throws Exception
  {
    final TaskLocation location = new TaskLocation("testHost", 1234, -1);
    final Map<String, Long> timeLag = ImmutableMap.of(SHARD_ID1, 0L, SHARD_ID0, 20000000L);

    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);

    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
            .anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD1_PARTITION)).andReturn("12").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getPartitionsTimeLag(EasyMock.anyString(), EasyMock.anyObject()))
            .andReturn(timeLag)
            .atLeastOnce();

    Task task = createKinesisIndexTask(
        "id1",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>(
            "stream",
            ImmutableMap.of(SHARD_ID1, "0", SHARD_ID0, "0"),
            ImmutableSet.of()
        ),
        new SeekableStreamEndSequenceNumbers<>(
            "stream",
            ImmutableMap.of(
                SHARD_ID1,
                KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER,
                SHARD_ID0,
                KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
            )
        ),
        null,
        null
    );

    Collection workItems = new ArrayList<>();
    workItems.add(new TestTaskRunnerWorkItem(task, null, location));

    Capture<KinesisIndexTask> captured = Capture.newInstance();
    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(task)).anyTimes();
    EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
    EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes();
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
        new KinesisDataSourceMetadata(
            null
        )
    ).anyTimes();
    EasyMock.expect(taskClient.getStatusAsync("id1"))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.PUBLISHING));
    EasyMock.expect(taskClient.getCurrentOffsetsAsync("id1", false))
            .andReturn(Futures.immediateFuture(ImmutableMap.of(
                SHARD_ID1,
                "2",
                SHARD_ID0,
                "1"
            )));
    EasyMock.expect(taskClient.getEndOffsets("id1")).andReturn(ImmutableMap.of(
        SHARD_ID1,
        "2",
        SHARD_ID0,
        "1"
    ));
    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);

    TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
    checkpoints.put(0, ImmutableMap.of(
        SHARD_ID1,
        "0",
        SHARD_ID0,
        "0"
    ));
    EasyMock.expect(taskClient.getCheckpoints(EasyMock.anyString(), EasyMock.anyBoolean()))
            .andReturn(checkpoints)
            .anyTimes();

    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
    replayAll();

    supervisor.start();
    supervisor.runInternal();
    supervisor.updateCurrentAndLatestOffsets();
    SupervisorReport<KinesisSupervisorReportPayload> report = supervisor.getStatus();
    verifyAll();

    Assert.assertEquals(DATASOURCE, report.getId());

    KinesisSupervisorReportPayload payload = report.getPayload();

    Assert.assertEquals(DATASOURCE, payload.getDataSource());
    Assert.assertEquals(3600L, payload.getDurationSeconds());
    Assert.assertEquals(2, payload.getPartitions());
    Assert.assertEquals(1, payload.getReplicas());
    Assert.assertEquals(STREAM, payload.getStream());
    Assert.assertEquals(0, payload.getActiveTasks().size());
    Assert.assertEquals(1, payload.getPublishingTasks().size());
    Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, payload.getDetailedState());
    Assert.assertEquals(0, payload.getRecentErrors().size());

    TaskReportData publishingReport = payload.getPublishingTasks().get(0);

    Assert.assertEquals("id1", publishingReport.getId());
    Assert.assertEquals(ImmutableMap.of(
        SHARD_ID1,
        "0",
        SHARD_ID0,
        "0"
    ), publishingReport.getStartingOffsets());
    Assert.assertEquals(ImmutableMap.of(
        SHARD_ID1,
        "2",
        SHARD_ID0,
        "1"
    ), publishingReport.getCurrentOffsets());

    KinesisIndexTask capturedTask = captured.getValue();
    Assert.assertEquals(dataSchema, capturedTask.getDataSchema());
    Assert.assertEquals(tuningConfig.convertToTaskTuningConfig(), capturedTask.getTuningConfig());

    KinesisIndexTaskIOConfig capturedTaskConfig = capturedTask.getIOConfig();
    Assert.assertEquals("awsEndpoint", capturedTaskConfig.getEndpoint());
    Assert.assertEquals("sequenceName-0", capturedTaskConfig.getBaseSequenceName());
    Assert.assertTrue("isUseTransaction", capturedTaskConfig.isUseTransaction());

    // check that the new task was created with starting sequences matching where the publishing task finished
    Assert.assertEquals(STREAM, capturedTaskConfig.getStartSequenceNumbers().getStream());
    Assert.assertEquals(
        "2",
        capturedTaskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID1)
    );
    Assert.assertEquals(
        "1",
        capturedTaskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID0)
    );

    Assert.assertEquals(STREAM, capturedTaskConfig.getEndSequenceNumbers().getStream());
    Assert.assertEquals(
        KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER,
        capturedTaskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID1)
    );
    Assert.assertEquals(
        KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER,
        capturedTaskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID0)
    );
  }

  /**
   * Starts the supervisor while a single task ("id1") spanning both shards is already running and reports
   * itself as PUBLISHING, then checks the supervisor report and the follow-up task handed to the task queue.
   */
  @Test
  public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() throws Exception
  {
    final TaskLocation taskLocation = new TaskLocation("testHost", 1234, -1);
    final Map<String, Long> partitionTimeLag = ImmutableMap.of(SHARD_ID1, 9000L, SHARD_ID0, 1234L);

    // Sequence numbers shared by the mock set-up and by the assertions further down.
    final Map<String, String> initialSequences = ImmutableMap.of(SHARD_ID1, "0", SHARD_ID0, "0");
    final Map<String, String> publishedSequences = ImmutableMap.of(SHARD_ID1, "2", SHARD_ID0, "1");
    final Map<String, String> unboundedSequences = ImmutableMap.of(
        SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER,
        SHARD_ID0, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
    );

    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);

    // Record supplier: two shards, earliest sequence "0", latest "12" (shard 1) and "1" (shard 0).
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
            .anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject()))
            .andReturn("0")
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD1_PARTITION))
            .andReturn("12")
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION))
            .andReturn("1")
            .anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getPartitionsTimeLag(EasyMock.anyString(), EasyMock.anyObject()))
            .andReturn(partitionTimeLag)
            .atLeastOnce();

    // The pre-existing task reads both shards from "0" with no end sequence number.
    final Task publishingTask = createKinesisIndexTask(
        "id1",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>("stream", initialSequences, ImmutableSet.of()),
        new SeekableStreamEndSequenceNumbers<>("stream", unboundedSequences),
        null,
        null
    );

    Collection runningWorkItems = new ArrayList<>();
    runningWorkItems.add(new TestTaskRunnerWorkItem(publishingTask, null, taskLocation));

    // Task master / storage / client: "id1" is active, running, and in the PUBLISHING state.
    Capture<KinesisIndexTask> submittedTask = Capture.newInstance();
    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(runningWorkItems).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
            .andReturn(ImmutableList.of(publishingTask))
            .anyTimes();
    EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
    EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(publishingTask)).anyTimes();
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
            .andReturn(new KinesisDataSourceMetadata(null))
            .anyTimes();
    EasyMock.expect(taskClient.getStatusAsync("id1"))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.PUBLISHING));
    EasyMock.expect(taskClient.getCurrentOffsetsAsync("id1", false))
            .andReturn(Futures.immediateFuture(publishedSequences));
    EasyMock.expect(taskClient.getEndOffsets("id1")).andReturn(publishedSequences);
    EasyMock.expect(taskQueue.add(EasyMock.capture(submittedTask))).andReturn(true);

    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
    replayAll();

    supervisor.start();
    supervisor.runInternal();
    supervisor.updateCurrentAndLatestOffsets();
    final SupervisorReport<KinesisSupervisorReportPayload> statusReport = supervisor.getStatus();
    verifyAll();

    // Supervisor-level report.
    Assert.assertEquals(DATASOURCE, statusReport.getId());

    final KinesisSupervisorReportPayload reportPayload = statusReport.getPayload();

    Assert.assertEquals(DATASOURCE, reportPayload.getDataSource());
    Assert.assertEquals(3600L, reportPayload.getDurationSeconds());
    Assert.assertEquals(2, reportPayload.getPartitions());
    Assert.assertEquals(1, reportPayload.getReplicas());
    Assert.assertEquals(STREAM, reportPayload.getStream());
    Assert.assertEquals(0, reportPayload.getActiveTasks().size());
    Assert.assertEquals(1, reportPayload.getPublishingTasks().size());
    Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, reportPayload.getDetailedState());
    Assert.assertEquals(0, reportPayload.getRecentErrors().size());

    // Report entry for the publishing task.
    final TaskReportData publishingTaskReport = reportPayload.getPublishingTasks().get(0);

    Assert.assertEquals("id1", publishingTaskReport.getId());
    Assert.assertEquals(initialSequences, publishingTaskReport.getStartingOffsets());
    Assert.assertEquals(publishedSequences, publishingTaskReport.getCurrentOffsets());

    // The task that was submitted to the queue.
    final KinesisIndexTask followUpTask = submittedTask.getValue();
    Assert.assertEquals(dataSchema, followUpTask.getDataSchema());
    Assert.assertEquals(tuningConfig.convertToTaskTuningConfig(), followUpTask.getTuningConfig());

    final KinesisIndexTaskIOConfig followUpIOConfig = followUpTask.getIOConfig();
    Assert.assertEquals("awsEndpoint", followUpIOConfig.getEndpoint());
    Assert.assertEquals("sequenceName-0", followUpIOConfig.getBaseSequenceName());
    Assert.assertTrue("isUseTransaction", followUpIOConfig.isUseTransaction());

    // check that the new task was created with starting sequences matching where the publishing task finished
    Assert.assertEquals(STREAM, followUpIOConfig.getStartSequenceNumbers().getStream());
    final Map<String, String> followUpStarts =
        followUpIOConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap();
    Assert.assertEquals("2", followUpStarts.get(SHARD_ID1));
    Assert.assertEquals("1", followUpStarts.get(SHARD_ID0));

    Assert.assertEquals(STREAM, followUpIOConfig.getEndSequenceNumbers().getStream());
    final Map<String, String> followUpEnds =
        followUpIOConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap();
    Assert.assertEquals(KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, followUpEnds.get(SHARD_ID1));
    Assert.assertEquals(KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, followUpEnds.get(SHARD_ID0));
  }

  /**
   * Starts the supervisor while two tasks already exist: "id1" in the PUBLISHING state and "id2" in the
   * READING state. Checks that the report lists "id2" as active and "id1" as publishing, each with the
   * expected starting and current offsets.
   */
  @Test
  public void testDiscoverExistingPublishingAndReadingTask() throws Exception
  {
    final TaskLocation publishingLocation = new TaskLocation("testHost", 1234, -1);
    final TaskLocation readingLocation = new TaskLocation("testHost2", 145, -1);
    final DateTime startTime = DateTimes.nowUtc();
    final Map<String, Long> timeLag = ImmutableMap.of(SHARD_ID0, 100L, SHARD_ID1, 200L);

    // Sequence numbers shared by the mock set-up and by the assertions further down.
    final Map<String, String> publishingStart = ImmutableMap.of(SHARD_ID1, "0", SHARD_ID0, "0");
    final Map<String, String> handOffSequences = ImmutableMap.of(SHARD_ID1, "2", SHARD_ID0, "1");
    final Map<String, String> readingCurrent = ImmutableMap.of(SHARD_ID1, "12", SHARD_ID0, "1");
    final Map<String, String> unboundedSequences = ImmutableMap.of(
        SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER,
        SHARD_ID0, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
    );

    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);

    // Record supplier: two shards, earliest sequence "0", latest "12" (shard 1) and "1" (shard 0).
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
            .anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject()))
            .andReturn("0")
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD1_PARTITION))
            .andReturn("12")
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION))
            .andReturn("1")
            .anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getPartitionsTimeLag(EasyMock.anyString(), EasyMock.anyObject()))
            .andReturn(timeLag)
            .atLeastOnce();

    // "id1" started at "0" on both shards; "id2" starts where "id1" stopped, with both shards marked exclusive.
    final Task publishingTask = createKinesisIndexTask(
        "id1",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>("stream", publishingStart, ImmutableSet.of()),
        new SeekableStreamEndSequenceNumbers<>("stream", unboundedSequences),
        null,
        null
    );

    final Task readingTask = createKinesisIndexTask(
        "id2",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>(
            "stream",
            handOffSequences,
            ImmutableSet.of(SHARD_ID0, SHARD_ID1)
        ),
        new SeekableStreamEndSequenceNumbers<>("stream", unboundedSequences),
        null,
        null
    );

    Collection runningWorkItems = new ArrayList<>();
    runningWorkItems.add(new TestTaskRunnerWorkItem(publishingTask, null, publishingLocation));
    runningWorkItems.add(new TestTaskRunnerWorkItem(readingTask, null, readingLocation));

    // Task master / storage: both tasks are active and running.
    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(runningWorkItems).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
            .andReturn(ImmutableList.of(publishingTask, readingTask))
            .anyTimes();
    EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
    EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
    EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(publishingTask)).anyTimes();
    EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(readingTask)).anyTimes();
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
            .andReturn(new KinesisDataSourceMetadata(null))
            .anyTimes();

    // Task client: per-task state, start time and offsets.
    EasyMock.expect(taskClient.getStatusAsync("id1"))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.PUBLISHING));
    EasyMock.expect(taskClient.getStatusAsync("id2"))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING));
    EasyMock.expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime));
    EasyMock.expect(taskClient.getCurrentOffsetsAsync("id1", false))
            .andReturn(Futures.immediateFuture(handOffSequences));
    EasyMock.expect(taskClient.getEndOffsets("id1")).andReturn(handOffSequences);
    EasyMock.expect(taskClient.getCurrentOffsetsAsync("id2", false))
            .andReturn(Futures.immediateFuture(readingCurrent));

    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));

    // id1 is publishing, so checkpoints are only expected to be requested for id2
    TreeMap<Integer, Map<String, String>> readingCheckpoints = new TreeMap<>();
    readingCheckpoints.put(0, handOffSequences);
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(readingCheckpoints))
            .times(1);

    replayAll();

    supervisor.start();
    supervisor.runInternal();
    supervisor.updateCurrentAndLatestOffsets();
    final SupervisorReport<KinesisSupervisorReportPayload> statusReport = supervisor.getStatus();
    verifyAll();

    // Supervisor-level report.
    Assert.assertEquals(DATASOURCE, statusReport.getId());

    final KinesisSupervisorReportPayload reportPayload = statusReport.getPayload();

    Assert.assertEquals(DATASOURCE, reportPayload.getDataSource());
    Assert.assertEquals(3600L, reportPayload.getDurationSeconds());
    Assert.assertEquals(2, reportPayload.getPartitions());
    Assert.assertEquals(1, reportPayload.getReplicas());
    Assert.assertEquals(STREAM, reportPayload.getStream());
    Assert.assertEquals(1, reportPayload.getActiveTasks().size());
    Assert.assertEquals(1, reportPayload.getPublishingTasks().size());
    Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, reportPayload.getDetailedState());
    Assert.assertEquals(0, reportPayload.getRecentErrors().size());

    final TaskReportData readingTaskReport = reportPayload.getActiveTasks().get(0);
    final TaskReportData publishingTaskReport = reportPayload.getPublishingTasks().get(0);

    // Report entry for the reading task.
    Assert.assertEquals("id2", readingTaskReport.getId());
    Assert.assertEquals(startTime, readingTaskReport.getStartTime());
    Assert.assertEquals(handOffSequences, readingTaskReport.getStartingOffsets());
    Assert.assertEquals(readingCurrent, readingTaskReport.getCurrentOffsets());
    Assert.assertEquals(timeLag, readingTaskReport.getLagMillis());

    // Report entry for the publishing task.
    Assert.assertEquals("id1", publishingTaskReport.getId());
    Assert.assertEquals(publishingStart, publishingTaskReport.getStartingOffsets());
    Assert.assertEquals(handOffSequences, publishingTaskReport.getCurrentOffsets());
  }

  /**
   * Checks that tasks whose start-time request fails are shut down.
   *
   * Phase one runs the supervisor with no existing tasks and captures the four tasks it submits to the
   * task queue. Phase two re-records the task storage, task client and task queue mocks so that every
   * captured task reports NOT_STARTED and its start-time future fails, and expects a
   * {@code taskQueue.shutdown} call for each of them.
   */
  @Test
  public void testKillUnresponsiveTasksWhileGettingStartTime() throws Exception
  {
    supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null);
    // Record supplier: two shards, earliest sequence "0", latest "12" (shard 1) and "1" (shard 0).
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
            .anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD1_PARTITION)).andReturn("12").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    // Phase one: no running or active tasks; every task added to the queue is captured (four expected).
    Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
        new KinesisDataSourceMetadata(
            null
        )
    ).anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(4);
    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
    replayAll();

    supervisor.start();
    supervisor.runInternal();
    verifyAll();

    List<Task> tasks = captured.getValues();

    // Phase two: clear the recorded behaviour of these three mocks and record new expectations.
    EasyMock.reset(taskStorage, taskClient, taskQueue);

    TreeMap<Integer, Map<String, String>> checkpoints1 = new TreeMap<>();
    checkpoints1.put(0, ImmutableMap.of(
        SHARD_ID1,

        "0",

        SHARD_ID0,

        "0"
    ));
    TreeMap<Integer, Map<String, String>> checkpoints2 = new TreeMap<>();
    checkpoints2.put(0, ImmutableMap.of(SHARD_ID0, "0"));
    // Checkpoint requests are matched by a substring of the task id and expected twice per substring.
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpoints1))
            .times(2);
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpoints2))
            .times(2);


    // The tasks captured in phase one are now reported as the active tasks.
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(tasks).anyTimes();
    for (Task task : tasks) {
      EasyMock.expect(taskStorage.getStatus(task.getId()))
              .andReturn(Optional.of(TaskStatus.running(task.getId())))
              .anyTimes();
      EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
      EasyMock.expect(taskClient.getStatusAsync(task.getId()))
              .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED));
      // The start-time lookup fails for every task ...
      EasyMock.expect(taskClient.getStartTimeAsync(task.getId()))
              .andReturn(Futures.immediateFailedFuture(new RuntimeException()));
      // ... so a shutdown with exactly this message and argument is expected for each of them.
      taskQueue.shutdown(task.getId(), "Task [%s] failed to return start time, killing task", task.getId());
    }
    EasyMock.replay(taskStorage, taskClient, taskQueue);

    supervisor.runInternal();
    verifyAll();
  }

  /**
   * Checks that tasks which fail to pause are shut down and that replacement tasks are submitted.
   *
   * Phase one runs the supervisor with no existing tasks and captures the four tasks it submits. Phase two
   * re-records the mocks so that all captured tasks are READING, one "sequenceName-0" task reports a start
   * time two minutes in the past, and pausing the "sequenceName-0" tasks fails. The test expects two
   * shutdown calls for "sequenceName-0" tasks and two newly submitted tasks that start at "0" on
   * {@code SHARD_ID1} and carry no start sequence for {@code SHARD_ID0}.
   */
  @Test
  public void testKillUnresponsiveTasksWhilePausing() throws Exception
  {
    final TaskLocation location = new TaskLocation("testHost", 1234, -1);

    // NOTE(review): "PT1M" is presumably the task duration, which the two-minute-old start time below exceeds — confirm against getTestableSupervisor.
    supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null);
    // Record supplier: two shards, earliest sequence "0", latest "12" (shard 1) and "1" (shard 0).
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
            .anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD1_PARTITION)).andReturn("12").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    // Phase one: no running or active tasks; every task added to the queue is captured (four expected).
    Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
        new KinesisDataSourceMetadata(
            null
        )
    ).anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(4);
    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
    replayAll();

    supervisor.start();
    supervisor.runInternal();
    verifyAll();

    // Wrap every captured task in a work item so the task runner can report it as running in phase two.
    List<Task> tasks = captured.getValues();
    Collection workItems = new ArrayList<>();
    for (Task task : tasks) {
      workItems.add(new TestTaskRunnerWorkItem(task, null, location));
    }

    // Phase two: clear the recorded behaviour of these four mocks and record new expectations.
    EasyMock.reset(taskStorage, taskRunner, taskClient, taskQueue);

    TreeMap<Integer, Map<String, String>> checkpoints1 = new TreeMap<>();
    checkpoints1.put(0, ImmutableMap.of(
        SHARD_ID1,
        "0"
    ));
    TreeMap<Integer, Map<String, String>> checkpoints2 = new TreeMap<>();
    checkpoints2.put(0, ImmutableMap.of(SHARD_ID0, "0"));
    // Checkpoint requests are matched by a substring of the task id and expected twice per substring.
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpoints1))
            .times(2);
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpoints2))
            .times(2);

    // Fresh capture so that only the tasks submitted during phase two are inspected afterwards.
    captured = Capture.newInstance(CaptureType.ALL);
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(tasks).anyTimes();
    for (Task task : tasks) {
      EasyMock.expect(taskStorage.getStatus(task.getId()))
              .andReturn(Optional.of(TaskStatus.running(task.getId())))
              .anyTimes();
      EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
    }
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
            .anyTimes();
    // First "sequenceName-0" start-time request answers two minutes ago, the second answers now.
    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.contains("sequenceName-0")))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc().minusMinutes(2)))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()));
    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.contains("sequenceName-1")))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
            .times(2);
    // Pausing fails for both "sequenceName-0" tasks ...
    EasyMock.expect(taskClient.pauseAsync(EasyMock.contains("sequenceName-0")))
            .andReturn(Futures.immediateFailedFuture(new RuntimeException())).times(2);
    // ... so each is expected to be shut down with this exact message template.
    taskQueue.shutdown(
        EasyMock.contains("sequenceName-0"),
        EasyMock.eq("An exception occured while waiting for task [%s] to pause: [%s]"),
        EasyMock.contains("sequenceName-0"),
        EasyMock.anyString()
    );
    EasyMock.expectLastCall().times(2);
    // Two further tasks are expected to be submitted during this run.
    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(2);

    EasyMock.replay(taskStorage, taskRunner, taskClient, taskQueue);

    supervisor.runInternal();
    verifyAll();

    // Every task submitted in phase two starts at "0" on SHARD_ID1 and has no entry for SHARD_ID0.
    for (Task task : captured.getValues()) {
      KinesisIndexTaskIOConfig taskConfig = ((KinesisIndexTask) task).getIOConfig();
      Assert.assertEquals(
          "0",
          taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID1)
      );
      Assert.assertNull(
          taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID0)
      );
    }
  }

  /**
   * Checks that tasks which fail to accept new end offsets are shut down and that replacement tasks are
   * submitted.
   *
   * Phase one runs the supervisor with no existing tasks and captures the four tasks it submits. Phase two
   * re-records the mocks so that all captured tasks are READING, the "sequenceName-0" tasks pause
   * successfully at "1" and "3" on {@code SHARD_ID1}, and the subsequent set-end-offsets request fails.
   * The test expects two shutdown calls for "sequenceName-0" tasks and two newly submitted tasks that start
   * at "0" on {@code SHARD_ID1}.
   */
  @Test
  public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception
  {
    final TaskLocation location = new TaskLocation("testHost", 1234, -1);

    // NOTE(review): "PT1M" is presumably the task duration, which the two-minute-old start time below exceeds — confirm against getTestableSupervisor.
    supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null);
    // Record supplier: two shards, earliest sequence "0", latest "12" (shard 1) and "1" (shard 0).
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
            .anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD1_PARTITION)).andReturn("12").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    // Phase one: no running or active tasks; every task added to the queue is captured (four expected).
    Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
        new KinesisDataSourceMetadata(
            null
        )
    ).anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(4);
    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
    replayAll();

    supervisor.start();
    supervisor.runInternal();
    verifyAll();

    // Wrap every captured task in a work item so the task runner can report it as running in phase two.
    List<Task> tasks = captured.getValues();
    Collection workItems = new ArrayList<>();
    for (Task task : tasks) {
      workItems.add(new TestTaskRunnerWorkItem(task, null, location));
    }

    // Phase two: clear the recorded behaviour of these four mocks and record new expectations.
    EasyMock.reset(taskStorage, taskRunner, taskClient, taskQueue);

    TreeMap<Integer, Map<String, String>> checkpoints1 = new TreeMap<>();
    checkpoints1.put(0, ImmutableMap.of(
        SHARD_ID1,
        "0"
    ));
    TreeMap<Integer, Map<String, String>> checkpoints2 = new TreeMap<>();
    checkpoints2.put(0, ImmutableMap.of(SHARD_ID0, "0"));
    // Checkpoint requests are matched by a substring of the task id and expected twice per substring.
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpoints1))
            .times(2);
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpoints2))
            .times(2);

    // Fresh capture so that only the tasks submitted during phase two are inspected afterwards.
    captured = Capture.newInstance(CaptureType.ALL);
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(tasks).anyTimes();
    for (Task task : tasks) {
      EasyMock.expect(taskStorage.getStatus(task.getId()))
              .andReturn(Optional.of(TaskStatus.running(task.getId())))
              .anyTimes();
      EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
    }
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
            .anyTimes();
    // First "sequenceName-0" start-time request answers two minutes ago, the second answers now.
    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.contains("sequenceName-0")))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc().minusMinutes(2)))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()));
    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.contains("sequenceName-1")))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
            .times(2);
    // Pausing succeeds: the first "sequenceName-0" pause reports "1" on SHARD_ID1, the second reports "3".
    EasyMock.expect(taskClient.pauseAsync(EasyMock.contains("sequenceName-0")))
            .andReturn(Futures.immediateFuture(ImmutableMap.of(
                SHARD_ID1,
                "1"
            )))
            .andReturn(Futures.immediateFuture(ImmutableMap.of(
                SHARD_ID1,
                "3"
            )));
    // Only an end-offset map with "3" is matched (presumably the highest paused offset is chosen — confirm
    // against the supervisor); the request fails for both "sequenceName-0" tasks.
    EasyMock.expect(
        taskClient.setEndOffsetsAsync(
            EasyMock.contains("sequenceName-0"),
            EasyMock.eq(ImmutableMap.of(
                SHARD_ID1,
                "3"
            )),
            EasyMock.eq(true)
        )
    ).andReturn(Futures.immediateFailedFuture(new RuntimeException())).times(2);
    // Each of those tasks is expected to be shut down with this exact message template.
    taskQueue.shutdown(
        EasyMock.contains("sequenceName-0"),
        EasyMock.eq("Task [%s] failed to respond to [set end offsets] in a timely manner, killing task"),
        EasyMock.contains("sequenceName-0")
    );
    EasyMock.expectLastCall().times(2);
    // Two further tasks are expected to be submitted during this run.
    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(2);

    EasyMock.replay(taskStorage, taskRunner, taskClient, taskQueue);

    supervisor.runInternal();
    verifyAll();

    // Every task submitted in phase two starts at "0" on SHARD_ID1.
    for (Task task : captured.getValues()) {
      KinesisIndexTaskIOConfig taskConfig = ((KinesisIndexTask) task).getIOConfig();
      Assert.assertEquals(
          "0",
          taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID1)
      );
    }
  }

  /**
   * Expects an {@link IllegalStateException} when {@code stop(false)} is invoked on a supervisor
   * that has been built but on which {@code start()} was never called.
   */
  @Test(expected = IllegalStateException.class)
  public void testStopNotStarted()
  {
    final boolean stopArgument = false;

    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
    supervisor.stop(stopArgument);
  }

  /**
   * Starts a freshly built supervisor, stops it right away with {@code stop(false)}, and verifies the
   * recorded expectations: the task client is closed and the listener named
   * {@code "KinesisSupervisor-<DATASOURCE>"} is unregistered from the task runner.
   */
  @Test
  public void testStop()
  {
    final String listenerId = StringUtils.format("KinesisSupervisor-%s", DATASOURCE);

    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();

    // Closing the record supplier is tolerated any number of times (including zero).
    supervisorRecordSupplier.close();
    EasyMock.expectLastCall().anyTimes();

    // Void calls recorded without an explicit count.
    taskRunner.unregisterListener(listenerId);
    taskClient.close();

    replayAll();

    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
    supervisor.start();
    supervisor.stop(false);

    verifyAll();
  }

  /**
   * Exercises {@code gracefulShutdownInternal()} against three active tasks:
   * <ul>
   *   <li>"id1" reports PUBLISHING and has a work item on the task runner,</li>
   *   <li>"id2" reports READING and has a work item on the task runner,</li>
   *   <li>"id3" reports READING but has no work item on the task runner.</li>
   * </ul>
   * After one supervisor run, the shutdown is expected to pause "id2", set its end offsets to the offsets
   * returned by the pause, and shut "id3" down through the task queue.
   */
  @Test
  public void testStopGracefully() throws Exception
  {
    final TaskLocation publishingLocation = new TaskLocation("testHost", 1234, -1);
    final TaskLocation readingLocation = new TaskLocation("testHost2", 145, -1);
    final DateTime startTime = DateTimes.nowUtc();

    // Offset maps shared by several stubs below.
    final ImmutableMap<String, String> noEndOffsets = ImmutableMap.of(
        SHARD_ID1,
        KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER,
        SHARD_ID0,
        KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
    );
    final ImmutableMap<String, String> checkpointedOffsets = ImmutableMap.of(SHARD_ID1, "3", SHARD_ID0, "1");
    final ImmutableMap<String, String> pausedOffsets = ImmutableMap.of(SHARD_ID1, "12", SHARD_ID0, "1");

    supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);

    // ---- record supplier stubs ----
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD1_PARTITION)).andReturn("12").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();

    // ---- tasks ----
    final Task publishingTask = createKinesisIndexTask(
        "id1",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>(
            "stream",
            ImmutableMap.of(SHARD_ID1, "0", SHARD_ID0, "0"),
            ImmutableSet.of()
        ),
        new SeekableStreamEndSequenceNumbers<>("stream", noEndOffsets),
        null,
        null
    );

    final Task readingTask = createKinesisIndexTask(
        "id2",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>(
            "stream",
            checkpointedOffsets,
            ImmutableSet.of(SHARD_ID0, SHARD_ID1)
        ),
        new SeekableStreamEndSequenceNumbers<>("stream", noEndOffsets),
        null,
        null
    );

    final Task unassignedTask = createKinesisIndexTask(
        "id3",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>(
            "stream",
            checkpointedOffsets,
            ImmutableSet.of(SHARD_ID0, SHARD_ID1)
        ),
        new SeekableStreamEndSequenceNumbers<>("stream", noEndOffsets),
        null,
        null
    );

    // Only id1 and id2 get a work item on the task runner; id3 is deliberately left out.
    Collection runningWorkItems = new ArrayList<>();
    runningWorkItems.add(new TestTaskRunnerWorkItem(publishingTask, null, publishingLocation));
    runningWorkItems.add(new TestTaskRunnerWorkItem(readingTask, null, readingLocation));

    // ---- task master / runner / storage stubs ----
    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(runningWorkItems).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
            .andReturn(ImmutableList.of(publishingTask, readingTask, unassignedTask))
            .anyTimes();

    final ImmutableMap<String, Task> tasksById = ImmutableMap.of(
        "id1",
        publishingTask,
        "id2",
        readingTask,
        "id3",
        unassignedTask
    );
    for (Map.Entry<String, Task> entry : tasksById.entrySet()) {
      final String taskId = entry.getKey();
      EasyMock.expect(taskStorage.getStatus(taskId)).andReturn(Optional.of(TaskStatus.running(taskId))).anyTimes();
      EasyMock.expect(taskStorage.getTask(taskId)).andReturn(Optional.of(entry.getValue())).anyTimes();
    }

    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
            .andReturn(new KinesisDataSourceMetadata(null))
            .anyTimes();

    // ---- task client stubs ----
    EasyMock.expect(taskClient.getStatusAsync("id1"))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.PUBLISHING));
    EasyMock.expect(taskClient.getStatusAsync("id2"))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING));
    EasyMock.expect(taskClient.getStatusAsync("id3"))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING));
    EasyMock.expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime));
    EasyMock.expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime));
    EasyMock.expect(taskClient.getEndOffsets("id1")).andReturn(checkpointedOffsets);

    // No checkpoint request is expected for id1, since it is in the publishing state.
    final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
    checkpoints.put(0, checkpointedOffsets);
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpoints))
            .once();
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpoints))
            .once();

    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
    replayAll();

    supervisor.start();
    supervisor.runInternal();
    verifyAll();

    // ---- second phase: expectations for the graceful shutdown itself ----
    EasyMock.reset(taskRunner, taskClient, taskQueue);
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(runningWorkItems).anyTimes();
    EasyMock.expect(taskClient.pauseAsync("id2")).andReturn(Futures.immediateFuture(pausedOffsets));
    EasyMock.expect(taskClient.setEndOffsetsAsync("id2", pausedOffsets, true))
            .andReturn(Futures.immediateFuture(true));
    taskQueue.shutdown("id3", "Killing task for graceful shutdown");
    EasyMock.expectLastCall().once();
    taskQueue.shutdown("id3", "Killing task [%s] which hasn't been assigned to a worker", "id3");
    EasyMock.expectLastCall().once();

    EasyMock.replay(taskRunner, taskClient, taskQueue);

    supervisor.gracefulShutdownInternal();
    verifyAll();
  }

  /**
   * Runs the supervisor once with no partitions and no tasks, then calls {@code resetInternal(null)} and
   * verifies that the datasource metadata is deleted from the metadata storage coordinator.
   */
  @Test
  public void testResetNoTasks()
  {
    // No partitions, no running tasks, no active tasks.
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(EasyMock.anyObject()))
            .andReturn(Collections.emptySet())
            .anyTimes();
    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
    replayAll();

    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
    supervisor.start();
    supervisor.runInternal();
    verifyAll();

    // Re-arm only the metadata coordinator: the reset is expected to delete the stored metadata.
    EasyMock.reset(indexerMetadataStorageCoordinator);
    EasyMock.expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true);
    EasyMock.replay(indexerMetadataStorageCoordinator);

    supervisor.resetInternal(null);
    verifyAll();
  }

  /**
   * Verifies a partial reset of the stored datasource metadata.
   * <p>
   * The metadata store is mocked to return metadata with entries for both SHARD_ID1 and SHARD_ID0. Calling
   * {@code resetInternal} with metadata that names only SHARD_ID0 is expected to result in a
   * {@code resetDataSourceMetadata} call for DATASOURCE whose metadata argument equals metadata containing
   * only the SHARD_ID1 entry.
   */
  @Test
  public void testResetDataSourceMetadata() throws Exception
  {
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(EasyMock.anyObject()))
            .andReturn(Collections.emptySet())
            .anyTimes();
    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
    replayAll();

    supervisor.start();
    supervisor.runInternal();
    verifyAll();

    Capture<String> captureDataSource = EasyMock.newCapture();
    Capture<DataSourceMetadata> captureDataSourceMetadata = EasyMock.newCapture();

    // Metadata currently held by the (mocked) metadata store: both shards present.
    KinesisDataSourceMetadata kinesisDataSourceMetadata = new KinesisDataSourceMetadata(
        new SeekableStreamStartSequenceNumbers<>(
            STREAM,
            ImmutableMap.of(
                SHARD_ID1,
                KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER,
                SHARD_ID0,
                KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
            ),
            ImmutableSet.of()
        )
    );

    // Metadata passed to resetInternal(): names only SHARD_ID0.
    KinesisDataSourceMetadata resetMetadata = new KinesisDataSourceMetadata(
        new SeekableStreamStartSequenceNumbers<>(
            STREAM,
            ImmutableMap.of(
                SHARD_ID0,
                KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
            ),
            ImmutableSet.of()
        )
    );

    // Metadata expected to be written back: only SHARD_ID1 remains.
    KinesisDataSourceMetadata expectedMetadata = new KinesisDataSourceMetadata(
        new SeekableStreamStartSequenceNumbers<>(
            STREAM,
            ImmutableMap.of(
                SHARD_ID1,
                KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
            ),
            ImmutableSet.of()
        )
    );

    EasyMock.reset(indexerMetadataStorageCoordinator);
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
            .andReturn(kinesisDataSourceMetadata);
    EasyMock.expect(indexerMetadataStorageCoordinator.resetDataSourceMetadata(
        EasyMock.capture(captureDataSource),
        EasyMock.capture(captureDataSourceMetadata)
    )).andReturn(true);
    EasyMock.replay(indexerMetadataStorageCoordinator);

    try {
      supervisor.resetInternal(resetMetadata);
    }
    catch (NullPointerException npe) {
      // Tolerated: after the metadata reset there will be an attempt to reset the partitionGroups sequences
      // to NOT_SET; however, there would be no entries in the map as no data has been put into the stream.
      // NOTE(review): this explanation is carried over from the original comment - confirm against the supervisor.
      Assert.assertNull(npe.getCause());
    }
    verifyAll();

    // JUnit's signature is assertEquals(expected, actual); keeping that order makes failure messages accurate.
    Assert.assertEquals(DATASOURCE, captureDataSource.getValue());
    Assert.assertEquals(expectedMetadata, captureDataSourceMetadata.getValue());
  }

  /**
   * Calls {@code resetInternal} with metadata for SHARD_ID0 while the mocked metadata store returns
   * {@code null} for the datasource. The only coordinator interaction recorded for the reset is the
   * single {@code retrieveDataSourceMetadata} lookup.
   */
  @Test
  public void testResetNoDataSourceMetadata()
  {
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(EasyMock.anyObject()))
            .andReturn(Collections.emptySet())
            .anyTimes();
    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);

    // No running or active tasks.
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
    replayAll();

    supervisor.start();
    supervisor.runInternal();
    verifyAll();

    // The metadata store holds nothing for this datasource.
    EasyMock.reset(indexerMetadataStorageCoordinator);
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(null);
    EasyMock.replay(indexerMetadataStorageCoordinator);

    final KinesisDataSourceMetadata resetMetadata = new KinesisDataSourceMetadata(
        new SeekableStreamStartSequenceNumbers<>(
            STREAM,
            ImmutableMap.of(SHARD_ID0, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER),
            ImmutableSet.of()
        )
    );
    supervisor.resetInternal(resetMetadata);
    verifyAll();
  }

  /**
   * Runs the supervisor once while the mocked metadata store holds sequence numbers for SHARD_ID1 ("100")
   * and SHARD_ID2 ("200"), and the record supplier reports only SHARD_ID1 and SHARD_ID0 as partitions of
   * STREAM, with "300" as the earliest and "400" as the latest sequence number of every partition.
   * <p>
   * Two {@code resetDataSourceMetadata} calls are expected: one that marks SHARD_ID2 with
   * {@code KinesisSequenceNumber.EXPIRED_MARKER}, and one that writes metadata with an empty map.
   * <p>
   * NOTE(review): the supervisor is built with a different {@code getTestableSupervisor} overload than the
   * neighbouring tests; judging by the test name, one of the extra flags presumably enables automatic offset
   * reset - confirm against the helper's signature.
   */
  @Test
  public void testGetOffsetFromStorageForPartitionWithResetOffsetAutomatically() throws Exception
  {
    supervisor = getTestableSupervisor(1, 1, true, true, "PT1H", null, null, false);

    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    // The stream currently exposes SHARD_ID1 and SHARD_ID0 only; SHARD_ID2 is not among them.
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
            .anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    // Every partition reports "300" as earliest and "400" as latest sequence number.
    // NOTE(review): presumably the stored "100" for SHARD_ID1 being below "300" is what triggers the
    // automatic reset - confirm against the supervisor implementation.
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("300").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("400").anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));

    EasyMock.reset(indexerMetadataStorageCoordinator);
    // unknown DataSourceMetadata in metadata store
    // This is the first of two expectations recorded for the same retrieveDataSourceMetadata(DATASOURCE) call;
    // the second one is recorded further below, so the recording order of the two must be preserved.
    EasyMock
        .expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
        .andReturn(
            new KinesisDataSourceMetadata(
                new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID1, "100", SHARD_ID2, "200"))
            )
        )
        .times(2);

    // Since shard 2 was in metadata before but is not in the list of shards returned by the record supplier,
    // it gets deleted from metadata (it is an expired shard)
    EasyMock
        .expect(
            indexerMetadataStorageCoordinator.resetDataSourceMetadata(
                DATASOURCE,
                new KinesisDataSourceMetadata(
                    new SeekableStreamEndSequenceNumbers<>(
                        STREAM,
                        ImmutableMap.of(SHARD_ID1, "100", SHARD_ID2, KinesisSequenceNumber.EXPIRED_MARKER)
                    )
                )
            )
        )
        .andReturn(true)
        .times(1);

    // getOffsetFromStorageForPartition() throws an exception when the offsets are automatically reset.
    // Since getOffsetFromStorageForPartition() is called per partition, all partitions can't be reset at the same time.
    // Instead, subsequent partitions will be reset in the following supervisor runs.
    EasyMock
        .expect(
            indexerMetadataStorageCoordinator.resetDataSourceMetadata(
                DATASOURCE,
                new KinesisDataSourceMetadata(
                    // Only one partition is reset in a single supervisor run.
                    new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of())
                )
            )
        )
        .andReturn(true)
        .times(1);

    // Second expectation for the same call: the next two lookups return metadata with only SHARD_ID1.
    EasyMock
        .expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
        .andReturn(
            new KinesisDataSourceMetadata(
                new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID1, "100"))
            )
        )
        .times(2);

    replayAll();

    supervisor.start();
    supervisor.runInternal();
    verifyAll();
  }

  /**
   * Runs the supervisor once with three active tasks ("id1" reporting PUBLISHING, "id2" and "id3" reporting
   * READING; only "id1" and "id2" have work items on the task runner), then calls {@code resetInternal(null)}.
   * The reset is expected to delete the datasource metadata and to shut down "id2" and "id3" through the
   * task queue; no shutdown is recorded for "id1".
   */
  @Test
  public void testResetRunningTasks() throws Exception
  {
    final TaskLocation publishingLocation = new TaskLocation("testHost", 1234, -1);
    final TaskLocation readingLocation = new TaskLocation("testHost2", 145, -1);
    final DateTime startTime = DateTimes.nowUtc();

    // Offset maps shared by several stubs below.
    final ImmutableMap<String, String> noEndOffsets = ImmutableMap.of(
        SHARD_ID1,
        KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER,
        SHARD_ID0,
        KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
    );
    final ImmutableMap<String, String> checkpointedOffsets = ImmutableMap.of(SHARD_ID1, "3", SHARD_ID0, "1");

    supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);

    // ---- record supplier stubs ----
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD1_PARTITION)).andReturn("12").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();

    // ---- tasks ----
    final Task publishingTask = createKinesisIndexTask(
        "id1",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>(
            "stream",
            ImmutableMap.of(SHARD_ID1, "0", SHARD_ID0, "0"),
            ImmutableSet.of()
        ),
        new SeekableStreamEndSequenceNumbers<>("stream", noEndOffsets),
        null,
        null
    );

    final Task readingTask = createKinesisIndexTask(
        "id2",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>(
            "stream",
            checkpointedOffsets,
            ImmutableSet.of(SHARD_ID0, SHARD_ID1)
        ),
        new SeekableStreamEndSequenceNumbers<>("stream", noEndOffsets),
        null,
        null
    );

    final Task unassignedTask = createKinesisIndexTask(
        "id3",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>(
            "stream",
            checkpointedOffsets,
            ImmutableSet.of(SHARD_ID0, SHARD_ID1)
        ),
        new SeekableStreamEndSequenceNumbers<>("stream", noEndOffsets),
        null,
        null
    );

    // Only id1 and id2 get a work item on the task runner.
    Collection runningWorkItems = new ArrayList<>();
    runningWorkItems.add(new TestTaskRunnerWorkItem(publishingTask, null, publishingLocation));
    runningWorkItems.add(new TestTaskRunnerWorkItem(readingTask, null, readingLocation));

    // ---- task master / runner / storage stubs ----
    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(runningWorkItems).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
            .andReturn(ImmutableList.of(publishingTask, readingTask, unassignedTask))
            .anyTimes();

    final ImmutableMap<String, Task> tasksById = ImmutableMap.of(
        "id1",
        publishingTask,
        "id2",
        readingTask,
        "id3",
        unassignedTask
    );
    for (Map.Entry<String, Task> entry : tasksById.entrySet()) {
      final String taskId = entry.getKey();
      EasyMock.expect(taskStorage.getStatus(taskId)).andReturn(Optional.of(TaskStatus.running(taskId))).anyTimes();
      EasyMock.expect(taskStorage.getTask(taskId)).andReturn(Optional.of(entry.getValue())).anyTimes();
    }

    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
            .andReturn(new KinesisDataSourceMetadata(null))
            .anyTimes();

    // ---- task client stubs ----
    EasyMock.expect(taskClient.getStatusAsync("id1"))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.PUBLISHING));
    EasyMock.expect(taskClient.getStatusAsync("id2"))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING));
    EasyMock.expect(taskClient.getStatusAsync("id3"))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING));
    EasyMock.expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime));
    EasyMock.expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime));
    EasyMock.expect(taskClient.getEndOffsets("id1")).andReturn(checkpointedOffsets);

    // Checkpoints are stubbed for the two READING tasks only.
    final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
    checkpoints.put(0, checkpointedOffsets);
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpoints))
            .once();
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpoints))
            .once();

    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
    replayAll();

    supervisor.start();
    supervisor.runInternal();
    verifyAll();

    // ---- second phase: expectations for the reset itself ----
    final String shutdownReason = "DataSourceMetadata is not found while reset";
    EasyMock.reset(taskQueue, indexerMetadataStorageCoordinator);
    EasyMock.expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true);
    for (String taskId : ImmutableList.of("id2", "id3")) {
      taskQueue.shutdown(taskId, shutdownReason);
    }
    EasyMock.replay(taskQueue, indexerMetadataStorageCoordinator);

    supervisor.resetInternal(null);
    verifyAll();
  }

  /**
   * Runs the supervisor once with three active tasks ("id1", "id2", "id3") that all report READING, then
   * calls {@code resetInternal(null)}. The reset is expected to delete the datasource metadata and to shut
   * down all three tasks through the task queue.
   */
  @Test
  public void testNoDataIngestionTasks() throws Exception
  {
    final DateTime startTime = DateTimes.nowUtc();
    supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);

    // Offset maps shared by several stubs below.
    final ImmutableMap<String, String> noEndOffsets = ImmutableMap.of(
        SHARD_ID1,
        KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER,
        SHARD_ID0,
        KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
    );
    final ImmutableMap<String, String> checkpointedOffsets = ImmutableMap.of(SHARD_ID1, "10", SHARD_ID0, "20");

    // ---- tasks (no events are added for them) ----
    final Task firstTask = createKinesisIndexTask(
        "id1",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>(
            "stream",
            ImmutableMap.of(SHARD_ID1, "0", SHARD_ID0, "0"),
            ImmutableSet.of()
        ),
        new SeekableStreamEndSequenceNumbers<>("stream", noEndOffsets),
        null,
        null
    );

    final Task secondTask = createKinesisIndexTask(
        "id2",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>(
            "stream",
            checkpointedOffsets,
            ImmutableSet.of(SHARD_ID0, SHARD_ID1)
        ),
        new SeekableStreamEndSequenceNumbers<>("stream", noEndOffsets),
        null,
        null
    );

    final Task thirdTask = createKinesisIndexTask(
        "id3",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>(
            "stream",
            checkpointedOffsets,
            ImmutableSet.of(SHARD_ID0, SHARD_ID1)
        ),
        new SeekableStreamEndSequenceNumbers<>("stream", noEndOffsets),
        null,
        null
    );

    final ImmutableMap<String, Task> tasksById = ImmutableMap.of(
        "id1",
        firstTask,
        "id2",
        secondTask,
        "id3",
        thirdTask
    );

    // ---- record supplier stubs ----
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD1_PARTITION)).andReturn("12").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();

    // ---- task master / storage stubs ----
    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
            .andReturn(ImmutableList.of(firstTask, secondTask, thirdTask))
            .anyTimes();
    for (Map.Entry<String, Task> entry : tasksById.entrySet()) {
      final String taskId = entry.getKey();
      EasyMock.expect(taskStorage.getStatus(taskId)).andReturn(Optional.of(TaskStatus.running(taskId))).anyTimes();
      EasyMock.expect(taskStorage.getTask(taskId)).andReturn(Optional.of(entry.getValue())).anyTimes();
    }
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
            .andReturn(new KinesisDataSourceMetadata(null))
            .anyTimes();

    // ---- task client stubs: every task reports READING, the same start time and the same checkpoints ----
    final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
    checkpoints.put(0, checkpointedOffsets);
    for (String taskId : tasksById.keySet()) {
      EasyMock.expect(taskClient.getStatusAsync(taskId))
              .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING));
      EasyMock.expect(taskClient.getStartTimeAsync(taskId)).andReturn(Futures.immediateFuture(startTime));
      EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains(taskId), EasyMock.anyBoolean()))
              .andReturn(Futures.immediateFuture(checkpoints))
              .once();
    }

    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
    replayAll();

    supervisor.start();
    supervisor.runInternal();
    verifyAll();

    // ---- second phase: expectations for the reset itself ----
    final String shutdownReason = "DataSourceMetadata is not found while reset";
    EasyMock.reset(taskQueue, indexerMetadataStorageCoordinator);
    EasyMock.expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true);
    for (String taskId : tasksById.keySet()) {
      taskQueue.shutdown(taskId, shutdownReason);
    }
    EasyMock.replay(taskQueue, indexerMetadataStorageCoordinator);

    supervisor.resetInternal(null);
    verifyAll();
  }


  /**
   * Verifies that a checkpoint request for task group 0, issued after that group has been moved to pending
   * completion, is processed without {@code serviceEmitter} capturing any exception.
   *
   * @throws InterruptedException if interrupted while waiting for the supervisor's notices queue to drain
   */
  @Test(timeout = 60_000L)
  public void testCheckpointForInactiveTaskGroup() throws InterruptedException
  {
    supervisor = getTestableSupervisor(2, 1, true, "PT1S", null, null, false);
    //not adding any events
    final KinesisIndexTask id1 = createKinesisIndexTask(
        "id1",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>(
            "stream",
            ImmutableMap.of(SHARD_ID1, "0", SHARD_ID0, "0"),
            ImmutableSet.of()
        ),
        new SeekableStreamEndSequenceNumbers<>(
            "stream",
            ImmutableMap.of(
                SHARD_ID1,
                KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER,
                SHARD_ID0,
                KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
            )
        ),
        null,
        null
    );

    final Task id2 = createKinesisIndexTask(
        "id2",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>(
            "stream",
            ImmutableMap.of(SHARD_ID1, "10", SHARD_ID0, "20"),
            ImmutableSet.of(SHARD_ID0, SHARD_ID1)
        ),
        new SeekableStreamEndSequenceNumbers<>(
            "stream",
            ImmutableMap.of(
                SHARD_ID1,
                KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER,
                SHARD_ID0,
                KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
            )
        ),
        null,
        null
    );

    final Task id3 = createKinesisIndexTask(
        "id3",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>(
            "stream",
            ImmutableMap.of(SHARD_ID1, "10", SHARD_ID0, "20"),
            ImmutableSet.of(SHARD_ID0, SHARD_ID1)
        ),
        new SeekableStreamEndSequenceNumbers<>(
            "stream",
            ImmutableMap.of(
                SHARD_ID1,
                KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER,
                SHARD_ID0,
                KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
            )
        ),
        null,
        null
    );

    final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
    final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
    // One work item per active task; id2 and id3 are both reported at location2.
    Collection workItems = new ArrayList<>();
    workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
    workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));
    workItems.add(new TestTaskRunnerWorkItem(id3, null, location2));

    // Record supplier: two shards, with assign/seek calls allowed any number of times.
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
            .anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD1_PARTITION)).andReturn("12").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    // Task runner / storage: all three tasks are active and running; no datasource metadata is stored yet.
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
    EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
    EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
    EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
    EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
    EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
    EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
    EasyMock.expect(
        indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(new KinesisDataSourceMetadata(
        null)
    ).anyTimes();

    // Task client: every task reports READING, the same start time and the same single checkpoint.
    EasyMock.expect(taskClient.getStatusAsync("id1"))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING));
    EasyMock.expect(taskClient.getStatusAsync("id2"))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING));
    EasyMock.expect(taskClient.getStatusAsync("id3"))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING));

    final DateTime startTime = DateTimes.nowUtc();
    EasyMock.expect(taskClient.getStartTimeAsync("id1")).andReturn(Futures.immediateFuture(startTime));
    EasyMock.expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime));
    EasyMock.expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime));

    final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
    checkpoints.put(0, ImmutableMap.of(SHARD_ID1, "10", SHARD_ID0, "20"));
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpoints))
            .times(1);
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpoints))
            .times(1);
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpoints))
            .times(1);

    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
    replayAll();

    supervisor.start();
    supervisor.runInternal();

    // Move group 0 to pending completion first, then request a checkpoint for that same group id.
    supervisor.moveTaskGroupToPendingCompletion(0);
    supervisor.checkpoint(
        0,
        new KinesisDataSourceMetadata(
            new SeekableStreamStartSequenceNumbers<>(STREAM, checkpoints.get(0), checkpoints.get(0).keySet())
        )
    );

    // Poll until the notices queue is empty; the @Test timeout bounds this loop.
    while (supervisor.getNoticesQueueSize() > 0) {
      Thread.sleep(100);
    }

    verifyAll();

    // The captured stack trace / message double as the assertion failure message when they are non-null.
    Assert.assertNull(serviceEmitter.getStackTrace(), serviceEmitter.getStackTrace());
    Assert.assertNull(serviceEmitter.getExceptionMessage(), serviceEmitter.getExceptionMessage());
    Assert.assertNull(serviceEmitter.getExceptionClass());
  }

  /**
   * Issues a checkpoint for task group 0 without ever running the supervisor's main loop, then expects
   * {@code serviceEmitter} to have captured an {@link ISE} stating that the task group cannot be found among
   * the actively reading task groups.
   *
   * @throws InterruptedException if interrupted while polling the notices queue or the emitter
   */
  @Test(timeout = 60_000L)
  public void testCheckpointForUnknownTaskGroup()
      throws InterruptedException
  {
    supervisor = getTestableSupervisor(2, 1, true, "PT1S", null, null, false);

    // Record supplier: two shards, with assign/seek calls allowed any number of times.
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)).anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)).anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject()))
            .andReturn("0").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD1_PARTITION))
            .andReturn("12").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION))
            .andReturn("1").anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    // Immutable sequence-number maps shared by the three task definitions below.
    final ImmutableMap<String, String> zeroStart = ImmutableMap.of(SHARD_ID1, "0", SHARD_ID0, "0");
    final ImmutableMap<String, String> laterStart = ImmutableMap.of(SHARD_ID1, "10", SHARD_ID0, "20");
    final ImmutableMap<String, String> unboundedEnd = ImmutableMap.of(
        SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER,
        SHARD_ID0, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
    );

    //not adding any events
    final KinesisIndexTask firstTask = createKinesisIndexTask(
        "id1",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>("stream", zeroStart, ImmutableSet.of()),
        new SeekableStreamEndSequenceNumbers<>("stream", unboundedEnd),
        null,
        null
    );

    final Task secondTask = createKinesisIndexTask(
        "id2",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>("stream", laterStart, ImmutableSet.of(SHARD_ID0, SHARD_ID1)),
        new SeekableStreamEndSequenceNumbers<>("stream", unboundedEnd),
        null,
        null
    );

    final Task thirdTask = createKinesisIndexTask(
        "id3",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>("stream", laterStart, ImmutableSet.of(SHARD_ID0, SHARD_ID1)),
        new SeekableStreamEndSequenceNumbers<>("stream", unboundedEnd),
        null,
        null
    );

    // Task master / storage: all three tasks are active and running; stored metadata wraps null.
    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
            .andReturn(ImmutableList.of(firstTask, secondTask, thirdTask))
            .anyTimes();
    EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
    EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
    EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
    EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(firstTask)).anyTimes();
    EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(secondTask)).anyTimes();
    EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(thirdTask)).anyTimes();
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
            .andReturn(new KinesisDataSourceMetadata(null))
            .anyTimes();

    replayAll();

    supervisor.start();

    // runInternal() is deliberately not invoked before the checkpoint request.
    supervisor.checkpoint(
        0,
        new KinesisDataSourceMetadata(
            new SeekableStreamStartSequenceNumbers<>(STREAM, Collections.emptyMap(), ImmutableSet.of())
        )
    );

    // Poll until the notices queue is empty; the @Test timeout bounds both polling loops.
    while (supervisor.getNoticesQueueSize() > 0) {
      Thread.sleep(100);
    }

    verifyAll();

    // Then poll until the emitter has captured a stack trace.
    while (serviceEmitter.getStackTrace() == null) {
      Thread.sleep(100);
    }

    final String capturedStackTrace = serviceEmitter.getStackTrace();
    Assert.assertTrue(capturedStackTrace.startsWith("org.apache.druid.java.util.common.ISE: Cannot find"));
    Assert.assertEquals(
        "Cannot find taskGroup [0] among all activelyReadingTaskGroups [{}]",
        serviceEmitter.getExceptionMessage()
    );
    Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass());
  }

  /**
   * Runs the supervisor once with no partitions and no active tasks, and fails if anything is submitted
   * to the task queue, because the supervisor is expected to be suspended.
   */
  @Test
  public void testSuspendedNoRunningTasks() throws Exception
  {
    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, true);

    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(EasyMock.anyObject()))
            .andReturn(Collections.emptySet()).anyTimes();
    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
            .andReturn(new KinesisDataSourceMetadata(null))
            .anyTimes();

    // this asserts that taskQueue.add does not in fact get called because supervisor should be suspended
    final IAnswer failIfTaskAdded = () -> {
      Assert.fail();
      return null;
    };
    EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andAnswer(failIfTaskAdded).anyTimes();

    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
    replayAll();

    supervisor.start();
    supervisor.runInternal();
    verifyAll();
  }

  /**
   * Runs the supervisor once with three existing tasks ("id1" publishing, "id2" and "id3" reading, only "id1"
   * and "id2" present among the running work items) and expects "id2" to be paused and given end offsets while
   * "id3" is shut down through the task queue.
   */
  @Test
  public void testSuspendedRunningTasks() throws Exception
  {
    // graceful shutdown is expected to be called on running tasks since state is suspended

    final TaskLocation firstLocation = new TaskLocation("testHost", 1234, -1);
    final TaskLocation secondLocation = new TaskLocation("testHost2", 145, -1);
    final DateTime startTime = DateTimes.nowUtc();

    supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null, true);

    // Record supplier: two shards, with assign/seek calls allowed any number of times.
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)).anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)).anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject()))
            .andReturn("0").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD1_PARTITION))
            .andReturn("12").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION))
            .andReturn("1").anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    // Immutable offset maps reused across task definitions and task-client expectations.
    final ImmutableMap<String, String> zeroOffsets = ImmutableMap.of(SHARD_ID1, "0", SHARD_ID0, "0");
    final ImmutableMap<String, String> handoffOffsets = ImmutableMap.of(SHARD_ID1, "3", SHARD_ID0, "1");
    final ImmutableMap<String, String> pausedOffsets = ImmutableMap.of(SHARD_ID1, "12", SHARD_ID0, "1");
    final ImmutableMap<String, String> unboundedEnd = ImmutableMap.of(
        SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER,
        SHARD_ID0, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
    );

    Task publishingTask = createKinesisIndexTask(
        "id1",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>("stream", zeroOffsets, ImmutableSet.of()),
        new SeekableStreamEndSequenceNumbers<>("stream", unboundedEnd),
        null,
        null
    );

    Task readingTask = createKinesisIndexTask(
        "id2",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>("stream", handoffOffsets, ImmutableSet.of(SHARD_ID0, SHARD_ID1)),
        new SeekableStreamEndSequenceNumbers<>("stream", unboundedEnd),
        null,
        null
    );

    Task unassignedTask = createKinesisIndexTask(
        "id3",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>("stream", handoffOffsets, ImmutableSet.of(SHARD_ID0, SHARD_ID1)),
        new SeekableStreamEndSequenceNumbers<>("stream", unboundedEnd),
        null,
        null
    );

    // Only id1 and id2 appear among the running work items; id3 has none.
    Collection workItems = new ArrayList<>();
    workItems.add(new TestTaskRunnerWorkItem(publishingTask, null, firstLocation));
    workItems.add(new TestTaskRunnerWorkItem(readingTask, null, secondLocation));

    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
            .andReturn(ImmutableList.of(publishingTask, readingTask, unassignedTask))
            .anyTimes();
    EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
    EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
    EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
    EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(publishingTask)).anyTimes();
    EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(readingTask)).anyTimes();
    EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(unassignedTask)).anyTimes();
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
            .andReturn(new KinesisDataSourceMetadata(null))
            .anyTimes();

    // Task client: id1 reports PUBLISHING, the other two report READING.
    EasyMock.expect(taskClient.getStatusAsync("id1"))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.PUBLISHING));
    EasyMock.expect(taskClient.getStatusAsync("id2"))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING));
    EasyMock.expect(taskClient.getStatusAsync("id3"))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING));
    EasyMock.expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime));
    EasyMock.expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime));
    EasyMock.expect(taskClient.getEndOffsets("id1")).andReturn(handoffOffsets);

    // getCheckpoints will not be called for id1 as it is in publishing state
    TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
    checkpoints.put(0, handoffOffsets);
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpoints)).times(1);
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpoints)).times(1);

    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));

    // Expected shutdown sequence: id2 is paused and handed the paused offsets as its end offsets, id3 is killed.
    EasyMock.expect(taskClient.pauseAsync("id2")).andReturn(Futures.immediateFuture(pausedOffsets));
    EasyMock.expect(taskClient.setEndOffsetsAsync("id2", pausedOffsets, true))
            .andReturn(Futures.immediateFuture(true));
    taskQueue.shutdown("id3", "Killing task for graceful shutdown");
    EasyMock.expectLastCall().times(1);
    taskQueue.shutdown("id3", "Killing task [%s] which hasn't been assigned to a worker", "id3");
    EasyMock.expectLastCall().times(1);

    replayAll();
    supervisor.start();
    supervisor.runInternal();
    verifyAll();
  }

  /**
   * Starts and runs a supervisor that has no partitions and no tasks, then calls {@code resetInternal(null)}
   * and expects the datasource metadata to be deleted through the metadata storage coordinator.
   */
  @Test
  public void testResetSuspended()
  {
    // Phase 1: expectations for an idle start + run (mocks are replayed before the supervisor is created).
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(EasyMock.anyObject()))
            .andReturn(Collections.emptySet()).anyTimes();
    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
    replayAll();

    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, true);
    supervisor.start();
    supervisor.runInternal();
    verifyAll();

    // Phase 2: re-arm only the coordinator mock so the reset must delete the stored metadata exactly once.
    EasyMock.reset(indexerMetadataStorageCoordinator);
    EasyMock.expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE))
            .andReturn(true);
    EasyMock.replay(indexerMetadataStorageCoordinator);

    supervisor.resetInternal(null);
    verifyAll();
  }


  /**
   * Registers one actively reading task group ("task1") and one pending-completion task group ("task2"),
   * then checks that {@code getStats()} returns each task's moving averages keyed by group id and task id.
   */
  @Test
  public void testGetCurrentTotalStats()
  {
    supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null, false);
    supervisor.setPartitionIdsForTests(ImmutableList.of(SHARD_ID0, SHARD_ID1));

    // Group for SHARD_ID0 is actively reading and owns "task1".
    final int readingGroupId = supervisor.getTaskGroupIdForPartition(SHARD_ID0);
    supervisor.addTaskGroupToActivelyReadingTaskGroup(
        readingGroupId,
        ImmutableMap.of("0", "0"),
        Optional.absent(),
        Optional.absent(),
        ImmutableSet.of("task1"),
        ImmutableSet.of()
    );

    // Group for SHARD_ID1 is pending completion and owns "task2".
    final int pendingGroupId = supervisor.getTaskGroupIdForPartition(SHARD_ID1);
    supervisor.addTaskGroupToPendingCompletionTaskGroup(
        pendingGroupId,
        ImmutableMap.of("0", "0"),
        Optional.absent(),
        Optional.absent(),
        ImmutableSet.of("task2"),
        ImmutableSet.of()
    );

    // Each task must be asked for its moving averages exactly once.
    EasyMock.expect(taskClient.getMovingAveragesAsync("task1"))
            .andReturn(Futures.immediateFuture(ImmutableMap.of("prop1", "val1")))
            .times(1);
    EasyMock.expect(taskClient.getMovingAveragesAsync("task2"))
            .andReturn(Futures.immediateFuture(ImmutableMap.of("prop2", "val2")))
            .times(1);

    replayAll();

    final Map<String, Map<String, Object>> statsByGroup = supervisor.getStats();

    verifyAll();

    Assert.assertEquals(2, statsByGroup.size());
    Assert.assertEquals(ImmutableSet.of("0", "1"), statsByGroup.keySet());
    Assert.assertEquals(
        ImmutableMap.of("task1", ImmutableMap.of("prop1", "val1")),
        statsByGroup.get("0")
    );
    Assert.assertEquals(
        ImmutableMap.of("task2", ImmutableMap.of("prop2", "val2")),
        statsByGroup.get("1")
    );
  }

  /**
   * Runs a supervisor whose {@code isTaskCurrent} is forced to {@code true} against one existing task ("id2")
   * and expects that no stop call is made for it: only one new task is added to the queue and checkpoints
   * for "id2" are fetched once per replica.
   */
  @Test
  public void testDoNotKillCompatibleTasks()
      throws InterruptedException, EntryExistsException
  {
    // This supervisor always returns true for isTaskCurrent -> it should not kill its tasks
    final int replicas = 2;
    supervisor = getTestableSupervisorCustomIsTaskCurrent(
        replicas,
        1,
        true,
        "PT1H",
        new Period("P1D"),
        new Period("P1D"),
        false,
        42,
        1000,
        true
    );

    // Record supplier: two shards, with assign/seek calls allowed any number of times.
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)).anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject()))
            .andReturn("0").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject()))
            .andReturn("100").anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    // The existing task's start offsets; the checkpoint returned for it below uses the same offsets.
    final ImmutableMap<String, String> startOffsets = ImmutableMap.of(SHARD_ID0, "0", SHARD_ID1, "0");
    final ImmutableMap<String, String> endOffsets = ImmutableMap.of(SHARD_ID0, "1", SHARD_ID1, "12");

    Task existingTask = createKinesisIndexTask(
        "id2",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>(STREAM, startOffsets, ImmutableSet.of()),
        new SeekableStreamEndSequenceNumbers<>(STREAM, endOffsets),
        null,
        null
    );

    List<Task> activeTasks = ImmutableList.of(existingTask);

    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(activeTasks).anyTimes();
    EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
    EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(existingTask)).anyTimes();
    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED)).anyTimes();
    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc())).anyTimes();
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
            .andReturn(new KinesisDataSourceMetadata(null))
            .anyTimes();

    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
    // Exactly one task submission is expected.
    EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true);

    TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
    checkpoints.put(0, startOffsets);

    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpoints)).times(replicas);

    replayAll();
    supervisor.start();
    supervisor.runInternal();
    verifyAll();
  }

  /**
   * Runs a supervisor whose {@code isTaskCurrent} is forced to {@code false} against one existing task ("id1")
   * and expects that task to be stopped through the task client while two new tasks are added to the queue.
   */
  @Test
  public void testKillIncompatibleTasks()
      throws InterruptedException, EntryExistsException
  {
    // This supervisor always returns false for isTaskCurrent -> it should kill its tasks
    final int replicas = 2;
    supervisor = getTestableSupervisorCustomIsTaskCurrent(
        replicas,
        1,
        true,
        "PT1H",
        new Period("P1D"),
        new Period("P1D"),
        false,
        42,
        1000,
        false
    );

    // Record supplier: two shards, with assign/seek calls allowed any number of times.
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)).anyTimes();
    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject()))
            .andReturn("0").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject()))
            .andReturn("100").anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    final ImmutableMap<String, String> startOffsets = ImmutableMap.of(SHARD_ID0, "0", SHARD_ID1, "0");
    final ImmutableMap<String, String> endOffsets = ImmutableMap.of(SHARD_ID0, "1", SHARD_ID1, "12");

    Task existingTask = createKinesisIndexTask(
        "id1",
        DATASOURCE,
        0,
        new SeekableStreamStartSequenceNumbers<>(STREAM, startOffsets, ImmutableSet.of()),
        new SeekableStreamEndSequenceNumbers<>(STREAM, endOffsets),
        null,
        null
    );

    List<Task> activeTasks = ImmutableList.of(existingTask);

    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(activeTasks).anyTimes();
    EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
    EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(existingTask)).anyTimes();
    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED)).anyTimes();
    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc())).anyTimes();
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
            .andReturn(new KinesisDataSourceMetadata(null))
            .anyTimes();

    // The existing task must be stopped once, and two task submissions are expected.
    EasyMock.expect(taskClient.stopAsync("id1", false)).andReturn(Futures.immediateFuture(true));
    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
    EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true).times(2);

    replayAll();
    supervisor.start();
    supervisor.runInternal();
    verifyAll();
  }

  @Test
  public void testIsTaskCurrent()
  {
    DateTime minMessageTime = DateTimes.nowUtc();
    DateTime maxMessageTime = DateTimes.nowUtc().plus(10000);

    KinesisSupervisor supervisor = getSupervisor(
        1,
        1,
        true,
        "PT1H",
        new Period("P1D"),
        new Period("P1D"),
        false,
        42,
        42,
        dataSchema,
        tuningConfig
    );

    supervisor.addTaskGroupToActivelyReadingTaskGroup(
        42,
        ImmutableMap.of(SHARD_ID1, "3"),
        Optional.of(minMessageTime),
        Optional.of(maxMessageTime),
        ImmutableSet.of("id1", "id2", "id3", "id4"),
        ImmutableSet.of()
    );

    DataSchema modifiedDataSchema = getDataSchema("some other datasource");

    KinesisSupervisorTuningConfig modifiedTuningConfig = new KinesisSupervisorTuningConfig(
        1000,
        null,
        50000,
        null,
        new Period("P1Y"),
        new File("/test"),
        null,
        null,
        null,
        true,
        false,
        null,
        null,
        null,
        null,
        numThreads,
        TEST_CHAT_THREADS,
        TEST_CHAT_RETRIES,
        TEST_HTTP_TIMEOUT,
        TEST_SHUTDOWN_TIMEOUT,
        null,
        null,
        null,
        5000,
        null,
        null,
        null,
        null,
        42, // This property is different from tuningConfig
        null,
        null,
        null
    );

    KinesisIndexTask taskFromStorage = createKinesisIndexTask(
        "id1",
        0,
        new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(
            SHARD_ID1,
            "3"
        ), ImmutableSet.of()),
        new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of(
            SHARD_ID1,
            KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
        )),
        minMessageTime,
        maxMessageTime,
        dataSchema
    );

    KinesisIndexTask taskFromStorageMismatchedDataSchema = createKinesisIndexTask(
        "id2",
        0,
        new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(
            SHARD_ID1,
            "3"
        ), ImmutableSet.of()),
        new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of(
            SHARD_ID1,
            KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
        )),
        minMessageTime,
        maxMessageTime,
        modifiedDataSchema
    );

    KinesisIndexTask taskFromStorageMismatchedTuningConfig = createKinesisIndexTask(
        "id3",
        0,
        new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(
            SHARD_ID1,
            "3"
        ), ImmutableSet.of()),
        new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of(
            SHARD_ID1,
            KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
        )),
        minMessageTime,
        maxMessageTime,
        dataSchema,
        modifiedTuningConfig
    );

    KinesisIndexTask taskFromStorageMismatchedPartitionsWithTaskGroup = createKinesisIndexTask(
        "id4",
        0,
        new SeekableStreamStartSequenceNumbers<>(
            "stream",
            ImmutableMap.of(
                SHARD_ID1,
                "4" // this is the mismatch
            ),
            ImmutableSet.of()
        ),
        new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of(
            SHARD_ID1,
            KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
        )),
        minMessageTime,
        maxMessageTime,
        dataSchema
    );

    EasyMock.expect(taskStorage.getTask("id1"))
            .andReturn(Optional.of(taskFromStorage))
            .once();
    EasyMock.expect(taskStorage.getTask("id2"))
            .andReturn(Optional.of(taskFromStorageMismatchedDataSchema))
            .once();
    EasyMock.expect(taskStorage.getTask("id3"))
            .andReturn(Optional.of(taskFromStorageMismatchedTuningConfig))
            .once();
    EasyMock.expect(taskStorage.getTask("id4"))
            .andReturn(Optional.of(taskFromStorageMismatchedPartitionsWithTaskGroup))
            .once();

    replayAll();

    Assert.assertTrue(supervisor.isTaskCurrent(42, "id1"));
    Assert.assertFalse(supervisor.isTaskCurrent(42, "id2"));
    Assert.assertFalse(supervisor.isTaskCurrent(42, "id3"));
    Assert.assertFalse(supervisor.isTaskCurrent(42, "id4"));
    verifyAll();
  }

  /**
   * Drives the three-phase shard split scenario against a freshly built testable supervisor. Each phase is
   * handed the tasks returned by the previous one.
   */
  @Test
  public void testShardSplit() throws Exception
  {
    supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null);
    // Recorded as an expectation on the taskRunner mock; the phase helpers switch the mocks into replay state.
    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));

    final List<Task> tasksBeforeSplit = testShardSplitPhaseOne();
    final List<Task> tasksAfterSplit = testShardSplitPhaseTwo(tasksBeforeSplit);
    testShardSplitPhaseThree(tasksAfterSplit);
  }

  /**
   * Phase one of the shard split test: the record supplier reports a single shard, {@code SHARD_ID0}.
   *
   * Starts the supervisor and runs it once, expecting exactly one task to be added to the task queue. The task
   * storage and task client mocks are then reset and re-recorded so that the captured task is returned as an
   * active, running task, and the supervisor is run a second time.
   *
   * @return the tasks captured from {@code taskQueue.add()} during the first run
   */
  private List<Task> testShardSplitPhaseOne() throws Exception
  {
    // Record supplier expectations: a single shard with earliest sequence "0" and latest sequence "100".
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();

    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID0))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD0_PARTITION))
            .anyTimes();

    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    // CaptureType.ALL keeps every task passed to taskQueue.add(), not just the last one.
    Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    // No tasks are running or active before the first run.
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED))
            .anyTimes();
    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
            .anyTimes();
    // Metadata is built from null sequence numbers (presumably meaning nothing has been committed yet).
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
        new KinesisDataSourceMetadata(
            null
        )
    ).anyTimes();

    // Exactly one task is expected to be submitted in this phase.
    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(1);

    replayAll();

    supervisor.start();
    supervisor.runInternal();
    verifyAll();

    List<Task> tasks = captured.getValues();

    // Re-record only taskStorage and taskClient so the second run sees the captured task as active and running.
    EasyMock.reset(taskStorage);
    EasyMock.reset(taskClient);

    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED))
            .anyTimes();
    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
            .anyTimes();
    TreeMap<Integer, Map<String, String>> checkpoints1 = new TreeMap<>();
    checkpoints1.put(0, ImmutableMap.of(
        SHARD_ID0,
        "0"
    ));
    // there would be 1 task, since there is only 1 shard
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpoints1))
            .times(1);

    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(tasks).anyTimes();

    for (Task task : tasks) {
      EasyMock.expect(taskStorage.getStatus(task.getId()))
              .andReturn(Optional.of(TaskStatus.running(task.getId())))
              .anyTimes();
      EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
    }
    EasyMock.replay(taskStorage);
    EasyMock.replay(taskClient);

    supervisor.runInternal();
    verifyAll();

    return tasks;
  }

  /**
   * Test task creation after a shard split with a closed shard.
   *
   * The stored metadata marks {@code SHARD_ID0} with the end-of-shard marker while the record supplier reports
   * {@code SHARD_ID0}, {@code SHARD_ID1} and {@code SHARD_ID2}. Two new tasks are expected, and their IO configs
   * are asserted to contain only {@code SHARD_ID1} (group 0) and {@code SHARD_ID2} (group 1).
   *
   * @param phaseOneTasks List of tasks from the initial phase where only one shard was present
   * @return the two tasks captured from {@code taskQueue.add()} in this phase
   */
  private List<Task> testShardSplitPhaseTwo(List<Task> phaseOneTasks) throws Exception
  {
    // Drop every expectation recorded in the previous phase before recording the post-split behavior.
    EasyMock.reset(indexerMetadataStorageCoordinator);
    EasyMock.reset(taskStorage);
    EasyMock.reset(taskQueue);
    EasyMock.reset(taskClient);
    EasyMock.reset(taskMaster);
    EasyMock.reset(taskRunner);
    EasyMock.reset(supervisorRecordSupplier);

    // first task ran, its shard 0 has reached EOS
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
        new KinesisDataSourceMetadata(
            new SeekableStreamEndSequenceNumbers<String, String>(
                STREAM,
                ImmutableMap.of(SHARD_ID0, KinesisSequenceNumber.END_OF_SHARD_MARKER)
            )
        )
    ).anyTimes();

    // The record supplier now reports the closed shard 0 together with shards 1 and 2.
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID0, SHARD_ID1, SHARD_ID2))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD0_PARTITION, SHARD1_PARTITION, SHARD2_PARTITION))
            .anyTimes();

    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
    // Latest sequence per shard: end-of-shard marker for shard 0, "100" for shards 1 and 2.
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID0)))
            .andReturn(KinesisSequenceNumber.END_OF_SHARD_MARKER).anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID1)))
            .andReturn("100").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID2)))
            .andReturn("100").anyTimes();

    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    // Collects every task submitted to the task queue during this phase.
    Capture<Task> postSplitCaptured = Capture.newInstance(CaptureType.ALL);

    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED))
            .anyTimes();
    // The phase-one task is reported as SUCCESS. No call count is given, so EasyMock expects exactly one
    // getStatus() call for it.
    Task successfulTask = phaseOneTasks.get(0);
    EasyMock.expect(taskStorage.getStatus(successfulTask.getId()))
            .andReturn(Optional.of(TaskStatus.success(successfulTask.getId())));
    EasyMock.expect(taskStorage.getTask(successfulTask.getId())).andReturn(Optional.of(successfulTask)).anyTimes();
    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
            .anyTimes();

    // Two tasks are expected to be submitted in this phase.
    EasyMock.expect(taskQueue.add(EasyMock.capture(postSplitCaptured))).andReturn(true).times(2);

    replayAll();

    supervisor.runInternal();
    verifyAll();

    // Re-record only taskStorage and taskClient so the second run sees the new tasks as active and running.
    EasyMock.reset(taskStorage);
    EasyMock.reset(taskClient);

    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED))
            .anyTimes();
    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
            .anyTimes();
    TreeMap<Integer, Map<String, String>> checkpointsGroup0 = new TreeMap<>();
    checkpointsGroup0.put(0, ImmutableMap.of(
        SHARD_ID1, "0"
    ));
    TreeMap<Integer, Map<String, String>> checkpointsGroup1 = new TreeMap<>();
    checkpointsGroup1.put(1, ImmutableMap.of(
        SHARD_ID2, "0"
    ));
    // there would be 2 tasks, 1 for each task group
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpointsGroup0))
            .times(1);
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpointsGroup1))
            .times(1);

    List<Task> postSplitTasks = postSplitCaptured.getValues();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(postSplitTasks).anyTimes();
    for (Task task : postSplitTasks) {
      EasyMock.expect(taskStorage.getStatus(task.getId()))
              .andReturn(Optional.of(TaskStatus.running(task.getId())))
              .anyTimes();
      EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
    }
    EasyMock.replay(taskStorage);
    EasyMock.replay(taskClient);

    supervisor.runInternal();
    verifyAll();

    // Check that shardId-000000000000 which has hit EOS is not included in the sequences sent to the task for group 0
    SeekableStreamStartSequenceNumbers<String, String> group0ExpectedStartSequenceNumbers =
        new SeekableStreamStartSequenceNumbers<>(
            STREAM,
            ImmutableMap.of(
                SHARD_ID1, "0"
            ),
            ImmutableSet.of()
        );

    SeekableStreamEndSequenceNumbers<String, String> group0ExpectedEndSequenceNumbers =
        new SeekableStreamEndSequenceNumbers<>(
            STREAM,
            ImmutableMap.of(
                SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
            )
        );

    SeekableStreamStartSequenceNumbers<String, String> group1ExpectedStartSequenceNumbers =
        new SeekableStreamStartSequenceNumbers<>(
            STREAM,
            ImmutableMap.of(
                SHARD_ID2, "0"
            ),
            ImmutableSet.of()
        );

    SeekableStreamEndSequenceNumbers<String, String> group1ExpectedEndSequenceNumbers =
        new SeekableStreamEndSequenceNumbers<>(
            STREAM,
            ImmutableMap.of(
                SHARD_ID2, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
            )
        );

    // The assertions below rely on the capture order: index 0 must be the group 0 task, index 1 the group 1 task.
    Assert.assertEquals(2, postSplitTasks.size());
    KinesisIndexTaskIOConfig group0Config = ((KinesisIndexTask) postSplitTasks.get(0)).getIOConfig();
    KinesisIndexTaskIOConfig group1Config = ((KinesisIndexTask) postSplitTasks.get(1)).getIOConfig();
    Assert.assertEquals((Integer) 0, group0Config.getTaskGroupId());
    Assert.assertEquals((Integer) 1, group1Config.getTaskGroupId());
    Assert.assertEquals(group0ExpectedStartSequenceNumbers, group0Config.getStartSequenceNumbers());
    Assert.assertEquals(group0ExpectedEndSequenceNumbers, group0Config.getEndSequenceNumbers());
    Assert.assertEquals(group1ExpectedStartSequenceNumbers, group1Config.getStartSequenceNumbers());
    Assert.assertEquals(group1ExpectedEndSequenceNumbers, group1Config.getEndSequenceNumbers());

    return postSplitTasks;
  }

  /**
   * Test task creation after a shard split with a closed shard, with the closed shards expiring and no longer
   * being returned from record supplier.
   *
   * The record supplier reports only {@code SHARD_ID1} and {@code SHARD_ID2}, and the metadata coordinator is
   * set up to accept a reset whose entry for {@code SHARD_ID0} is the expired marker. Besides the IO configs of
   * the two new tasks, the supervisor's partition groups and partition offsets are asserted.
   *
   * @param phaseTwoTasks List of tasks from the second phase where closed but not expired shards were present.
   */
  private void testShardSplitPhaseThree(List<Task> phaseTwoTasks) throws Exception
  {
    // Drop every expectation recorded in the previous phase before recording the post-expiry behavior.
    EasyMock.reset(indexerMetadataStorageCoordinator);
    EasyMock.reset(taskStorage);
    EasyMock.reset(taskQueue);
    EasyMock.reset(taskClient);
    EasyMock.reset(taskMaster);
    EasyMock.reset(taskRunner);
    EasyMock.reset(supervisorRecordSupplier);

    // second set of tasks ran, shard 0 has expired, but shard 1 and 2 have data
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
        new KinesisDataSourceMetadata(
            new SeekableStreamEndSequenceNumbers<String, String>(
                STREAM,
                ImmutableMap.of(
                    SHARD_ID0, KinesisSequenceNumber.END_OF_SHARD_MARKER,
                    SHARD_ID1, "100",
                    SHARD_ID2, "100"
                )
            )
        )
    ).anyTimes();

    // A metadata reset is accepted only with shard 0 rewritten to EXPIRED_MARKER and shards 1 and 2 unchanged.
    EasyMock.expect(
        indexerMetadataStorageCoordinator.resetDataSourceMetadata(
            DATASOURCE,
            new KinesisDataSourceMetadata(
                new SeekableStreamEndSequenceNumbers<String, String>(
                    STREAM,
                    ImmutableMap.of(
                        SHARD_ID0, KinesisSequenceNumber.EXPIRED_MARKER,
                        SHARD_ID1, "100",
                        SHARD_ID2, "100"
                    )
                )
            )
        )
    ).andReturn(true).anyTimes();

    // Shard 0 is no longer reported by the record supplier.
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID2))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD2_PARTITION))
            .anyTimes();

    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID1)))
            .andReturn("200").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID2)))
            .andReturn("200").anyTimes();

    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    // Collects every task submitted to the task queue during this phase.
    Capture<Task> postSplitCaptured = Capture.newInstance(CaptureType.ALL);

    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED))
            .anyTimes();

    // Both phase-two tasks are reported as SUCCESS. No call count is given on getStatus(), so EasyMock expects
    // exactly one call per task.
    Task successfulTask0 = phaseTwoTasks.get(0);
    EasyMock.expect(taskStorage.getStatus(successfulTask0.getId()))
            .andReturn(Optional.of(TaskStatus.success(successfulTask0.getId())));
    EasyMock.expect(taskStorage.getTask(successfulTask0.getId())).andReturn(Optional.of(successfulTask0)).anyTimes();

    Task successfulTask1 = phaseTwoTasks.get(1);
    EasyMock.expect(taskStorage.getStatus(successfulTask1.getId()))
            .andReturn(Optional.of(TaskStatus.success(successfulTask1.getId())));
    EasyMock.expect(taskStorage.getTask(successfulTask1.getId())).andReturn(Optional.of(successfulTask1)).anyTimes();

    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
            .anyTimes();

    // Two tasks are expected to be submitted in this phase.
    EasyMock.expect(taskQueue.add(EasyMock.capture(postSplitCaptured))).andReturn(true).times(2);

    replayAll();

    supervisor.runInternal();
    verifyAll();

    // Re-record only taskStorage and taskClient so the second run sees the new tasks as active and running.
    EasyMock.reset(taskStorage);
    EasyMock.reset(taskClient);

    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED))
            .anyTimes();
    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
            .anyTimes();
    // NOTE(review): the group 0 checkpoint carries SHARD_ID2 and the group 1 checkpoint SHARD_ID1, the reverse of
    // the shard-to-group mapping asserted at the end of this method. Confirm this is intentional.
    TreeMap<Integer, Map<String, String>> checkpointsGroup0 = new TreeMap<>();
    checkpointsGroup0.put(0, ImmutableMap.of(
        SHARD_ID2, "100"
    ));
    TreeMap<Integer, Map<String, String>> checkpointsGroup1 = new TreeMap<>();
    checkpointsGroup1.put(1, ImmutableMap.of(
        SHARD_ID1, "100"
    ));
    // there would be 2 tasks, 1 for each task group
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpointsGroup0))
            .times(1);
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpointsGroup1))
            .times(1);

    List<Task> postSplitTasks = postSplitCaptured.getValues();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(postSplitTasks).anyTimes();
    for (Task task : postSplitTasks) {
      EasyMock.expect(taskStorage.getStatus(task.getId()))
              .andReturn(Optional.of(TaskStatus.running(task.getId())))
              .anyTimes();
      EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
    }
    EasyMock.replay(taskStorage);
    EasyMock.replay(taskClient);

    supervisor.runInternal();
    verifyAll();


    // Check that the expired shard (SHARD_ID0) is not included in the sequences sent to either task group: group 0
    // is expected to start from SHARD_ID1 at "100" and group 1 from SHARD_ID2 at "100". Unlike phase two, the
    // third constructor argument is non-empty here (presumably the exclusive-start partitions; confirm against
    // SeekableStreamStartSequenceNumbers).
    SeekableStreamStartSequenceNumbers<String, String> group0ExpectedStartSequenceNumbers =
        new SeekableStreamStartSequenceNumbers<>(
            STREAM,
            ImmutableMap.of(
                SHARD_ID1, "100"
            ),
            ImmutableSet.of(SHARD_ID1)
        );

    SeekableStreamEndSequenceNumbers<String, String> group0ExpectedEndSequenceNumbers =
        new SeekableStreamEndSequenceNumbers<>(
            STREAM,
            ImmutableMap.of(
                SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
            )
        );

    SeekableStreamStartSequenceNumbers<String, String> group1ExpectedStartSequenceNumbers =
        new SeekableStreamStartSequenceNumbers<>(
            STREAM,
            ImmutableMap.of(
                SHARD_ID2, "100"
            ),
            ImmutableSet.of(SHARD_ID2)
        );

    SeekableStreamEndSequenceNumbers<String, String> group1ExpectedEndSequenceNumbers =
        new SeekableStreamEndSequenceNumbers<>(
            STREAM,
            ImmutableMap.of(
                SHARD_ID2, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
            )
        );

    // The assertions below rely on the capture order: index 0 must be the group 0 task, index 1 the group 1 task.
    Assert.assertEquals(2, postSplitTasks.size());
    KinesisIndexTaskIOConfig group0Config = ((KinesisIndexTask) postSplitTasks.get(0)).getIOConfig();
    KinesisIndexTaskIOConfig group1Config = ((KinesisIndexTask) postSplitTasks.get(1)).getIOConfig();
    Assert.assertEquals((Integer) 0, group0Config.getTaskGroupId());
    Assert.assertEquals((Integer) 1, group1Config.getTaskGroupId());
    Assert.assertEquals(group0ExpectedStartSequenceNumbers, group0Config.getStartSequenceNumbers());
    Assert.assertEquals(group0ExpectedEndSequenceNumbers, group0Config.getEndSequenceNumbers());
    Assert.assertEquals(group1ExpectedStartSequenceNumbers, group1Config.getStartSequenceNumbers());
    Assert.assertEquals(group1ExpectedEndSequenceNumbers, group1Config.getEndSequenceNumbers());

    // Only the two live shards are expected to remain in the supervisor's partition groups.
    Map<Integer, Set<String>> expectedPartitionGroups = ImmutableMap.of(
        0, ImmutableSet.of(SHARD_ID1),
        1, ImmutableSet.of(SHARD_ID2)
    );
    Assert.assertEquals(expectedPartitionGroups, supervisor.getPartitionGroups());

    // The partition offsets are expected to still hold an entry for SHARD_ID0, with every entry at "-1".
    ConcurrentHashMap<String, String> expectedPartitionOffsets = new ConcurrentHashMap<>(
        ImmutableMap.of(
            SHARD_ID2, "-1",
            SHARD_ID1, "-1",
            SHARD_ID0, "-1"
        )
    );
    Assert.assertEquals(expectedPartitionOffsets, supervisor.getPartitionOffsets());

  }

  /**
   * Drives the three-phase shard merge scenario against a freshly built testable supervisor. Each phase is
   * handed the tasks returned by the previous one.
   */
  @Test
  public void testShardMerge() throws Exception
  {
    supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null);
    // Recorded as an expectation on the taskRunner mock; the phase helpers switch the mocks into replay state.
    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));

    final List<Task> tasksBeforeMerge = testShardMergePhaseOne();
    final List<Task> tasksAfterMerge = testShardMergePhaseTwo(tasksBeforeMerge);
    testShardMergePhaseThree(tasksAfterMerge);
  }

  /**
   * Phase one of the shard merge test: the record supplier reports two shards, {@code SHARD_ID0} and
   * {@code SHARD_ID1}.
   *
   * Starts the supervisor and runs it once, expecting exactly two tasks to be added to the task queue. The task
   * storage and task client mocks are then reset and re-recorded so that the captured tasks are returned as
   * active, running tasks, and the supervisor is run a second time.
   *
   * @return the tasks captured from {@code taskQueue.add()} during the first run
   */
  private List<Task> testShardMergePhaseOne() throws Exception
  {
    // Record supplier expectations: two shards, each with earliest sequence "0" and latest sequence "100".
    supervisorRecordSupplier.assign(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();

    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID0, SHARD_ID1))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD0_PARTITION, SHARD1_PARTITION))
            .anyTimes();

    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    // CaptureType.ALL keeps every task passed to taskQueue.add(), not just the last one.
    Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    // No tasks are running or active before the first run.
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED))
            .anyTimes();
    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
            .anyTimes();
    // Metadata is built from null sequence numbers (presumably meaning nothing has been committed yet).
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
        new KinesisDataSourceMetadata(
            null
        )
    ).anyTimes();

    // Exactly two tasks are expected to be submitted in this phase.
    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(2);

    replayAll();

    supervisor.start();
    supervisor.runInternal();
    verifyAll();

    List<Task> tasks = captured.getValues();

    // Re-record only taskStorage and taskClient so the second run sees the captured tasks as active and running.
    EasyMock.reset(taskStorage);
    EasyMock.reset(taskClient);

    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED))
            .anyTimes();
    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
            .anyTimes();

    // Checkpoints returned for "sequenceName-0" carry SHARD_ID1; those for "sequenceName-1" carry SHARD_ID0.
    TreeMap<Integer, Map<String, String>> checkpoints0 = new TreeMap<>();
    checkpoints0.put(0, ImmutableMap.of(
        SHARD_ID1,
        "0"
    ));
    TreeMap<Integer, Map<String, String>> checkpoints1 = new TreeMap<>();
    checkpoints1.put(0, ImmutableMap.of(
        SHARD_ID0,
        "0"
    ));
    // there would be 2 tasks, 1 for each task group
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpoints0))
            .times(1);
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpoints1))
            .times(1);

    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(tasks).anyTimes();
    for (Task task : tasks) {
      EasyMock.expect(taskStorage.getStatus(task.getId()))
              .andReturn(Optional.of(TaskStatus.running(task.getId())))
              .anyTimes();
      EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
    }
    EasyMock.replay(taskStorage);
    EasyMock.replay(taskClient);

    supervisor.runInternal();
    verifyAll();

    return tasks;
  }

  /**
   * Test task creation after a shard merge with two closed shards.
   *
   * The stored metadata marks both {@code SHARD_ID0} and {@code SHARD_ID1} with the end-of-shard marker while
   * the record supplier reports {@code SHARD_ID0}, {@code SHARD_ID1} and {@code SHARD_ID2}. A single new task is
   * expected, and its IO config is asserted to contain only {@code SHARD_ID2} in task group 0.
   *
   * @param phaseOneTasks List of tasks from the initial phase where two shards were present
   * @return the single task captured from {@code taskQueue.add()} in this phase
   */
  private List<Task> testShardMergePhaseTwo(List<Task> phaseOneTasks) throws Exception
  {
    // Drop every expectation recorded in the previous phase before recording the post-merge behavior.
    EasyMock.reset(indexerMetadataStorageCoordinator);
    EasyMock.reset(taskStorage);
    EasyMock.reset(taskQueue);
    EasyMock.reset(taskClient);
    EasyMock.reset(taskMaster);
    EasyMock.reset(taskRunner);
    EasyMock.reset(supervisorRecordSupplier);

    // first tasks ran, both shard 0 and shard 1 have reached EOS, merged into shard 2
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
        new KinesisDataSourceMetadata(
            new SeekableStreamEndSequenceNumbers<String, String>(
                STREAM,
                ImmutableMap.of(
                    SHARD_ID0, KinesisSequenceNumber.END_OF_SHARD_MARKER,
                    SHARD_ID1, KinesisSequenceNumber.END_OF_SHARD_MARKER
                )
            )
        )
    ).anyTimes();

    // The record supplier still reports the two closed shards alongside shard 2.
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID0, SHARD_ID1, SHARD_ID2))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD0_PARTITION, SHARD1_PARTITION, SHARD2_PARTITION))
            .anyTimes();

    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
    // Latest sequence per shard: end-of-shard marker for shards 0 and 1, "100" for shard 2.
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID0)))
            .andReturn(KinesisSequenceNumber.END_OF_SHARD_MARKER).anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID1)))
            .andReturn(KinesisSequenceNumber.END_OF_SHARD_MARKER).anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID2)))
            .andReturn("100").anyTimes();

    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    // Collects every task submitted to the task queue during this phase.
    Capture<Task> postMergeCaptured = Capture.newInstance(CaptureType.ALL);

    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED))
            .anyTimes();

    // Both phase-one tasks are reported as SUCCESS. No call count is given on getStatus(), so EasyMock expects
    // exactly one call per task.
    Task successfulTask0 = phaseOneTasks.get(0);
    Task successfulTask1 = phaseOneTasks.get(1);
    EasyMock.expect(taskStorage.getStatus(successfulTask0.getId()))
            .andReturn(Optional.of(TaskStatus.success(successfulTask0.getId())));
    EasyMock.expect(taskStorage.getTask(successfulTask0.getId())).andReturn(Optional.of(successfulTask0)).anyTimes();

    EasyMock.expect(taskStorage.getStatus(successfulTask1.getId()))
            .andReturn(Optional.of(TaskStatus.success(successfulTask1.getId())));
    EasyMock.expect(taskStorage.getTask(successfulTask1.getId())).andReturn(Optional.of(successfulTask1)).anyTimes();

    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
            .anyTimes();

    // Only one task is expected to be submitted in this phase.
    EasyMock.expect(taskQueue.add(EasyMock.capture(postMergeCaptured))).andReturn(true).times(1);

    replayAll();

    supervisor.runInternal();
    verifyAll();

    // Re-record only taskStorage and taskClient so the second run sees the new task as active and running.
    EasyMock.reset(taskStorage);
    EasyMock.reset(taskClient);

    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED))
            .anyTimes();
    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
            .anyTimes();
    TreeMap<Integer, Map<String, String>> checkpointsGroup0 = new TreeMap<>();
    checkpointsGroup0.put(0, ImmutableMap.of(
        SHARD_ID2, "0",
        SHARD_ID1, KinesisSequenceNumber.END_OF_SHARD_MARKER
    ));

    // there would be only 1 task: task group 1 only has closed shards, so no task is created for it
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpointsGroup0))
            .times(1);

    List<Task> postMergeTasks = postMergeCaptured.getValues();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(postMergeTasks).anyTimes();
    for (Task task : postMergeTasks) {
      EasyMock.expect(taskStorage.getStatus(task.getId()))
              .andReturn(Optional.of(TaskStatus.running(task.getId())))
              .anyTimes();
      EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
    }
    EasyMock.replay(taskStorage);
    EasyMock.replay(taskClient);

    supervisor.runInternal();
    verifyAll();

    // Check that the closed shards (SHARD_ID0 and SHARD_ID1), which have hit EOS, are not included in the sequences
    // sent to the task for group 0
    SeekableStreamStartSequenceNumbers<String, String> group0ExpectedStartSequenceNumbers =
        new SeekableStreamStartSequenceNumbers<>(
            STREAM,
            ImmutableMap.of(
                SHARD_ID2, "0"
            ),
            ImmutableSet.of()
        );

    SeekableStreamEndSequenceNumbers<String, String> group0ExpectedEndSequenceNumbers =
        new SeekableStreamEndSequenceNumbers<>(
            STREAM,
            ImmutableMap.of(
                SHARD_ID2, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
            )
        );

    Assert.assertEquals(1, postMergeTasks.size());
    KinesisIndexTaskIOConfig group0Config = ((KinesisIndexTask) postMergeTasks.get(0)).getIOConfig();
    Assert.assertEquals((Integer) 0, group0Config.getTaskGroupId());
    Assert.assertEquals(group0ExpectedStartSequenceNumbers, group0Config.getStartSequenceNumbers());
    Assert.assertEquals(group0ExpectedEndSequenceNumbers, group0Config.getEndSequenceNumbers());

    return postMergeTasks;
  }

  /**
   * Test task creation after a shard merge with two closed shards and one open shard, with the closed shards
   * expiring and no longer being returned from record supplier.
   * <p>
   * The method resets the mocks, runs {@code supervisor.runInternal()} twice (once to capture the task that gets
   * queued, once with that task reported as running) and then asserts on the captured task's IO config as well as
   * on the supervisor's partition groups and partition offsets.
   *
   * @param phaseTwoTasks List of tasks from the second phase where closed but not expired shards were present.
   * @throws Exception propagated from the supervisor run
   */
  private void testShardMergePhaseThree(List<Task> phaseTwoTasks) throws Exception
  {
    // Start from a clean slate: drop every expectation recorded by the earlier phases.
    EasyMock.reset(indexerMetadataStorageCoordinator);
    EasyMock.reset(taskStorage);
    EasyMock.reset(taskQueue);
    EasyMock.reset(taskClient);
    EasyMock.reset(taskMaster);
    EasyMock.reset(taskRunner);
    EasyMock.reset(supervisorRecordSupplier);

    // Stored metadata after the second set of tasks ran: shards 0 and 1 are both at the end-of-shard marker,
    // while shard 2 is at sequence number "100".
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
        new KinesisDataSourceMetadata(
            new SeekableStreamEndSequenceNumbers<String, String>(
                STREAM,
                ImmutableMap.of(
                    SHARD_ID0, KinesisSequenceNumber.END_OF_SHARD_MARKER,
                    SHARD_ID1, KinesisSequenceNumber.END_OF_SHARD_MARKER,
                    SHARD_ID2, "100"
                )
            )
        )
    ).anyTimes();

    // Allow the metadata to be reset to a state where both closed shards carry the expired marker and
    // shard 2 keeps its sequence number.
    EasyMock.expect(
        indexerMetadataStorageCoordinator.resetDataSourceMetadata(
            DATASOURCE,
            new KinesisDataSourceMetadata(
                new SeekableStreamEndSequenceNumbers<String, String>(
                    STREAM,
                    ImmutableMap.of(
                        SHARD_ID0, KinesisSequenceNumber.EXPIRED_MARKER,
                        SHARD_ID1, KinesisSequenceNumber.EXPIRED_MARKER,
                        SHARD_ID2, "100"
                    )
                )
            )
        )
    ).andReturn(true).anyTimes();

    // The record supplier now only knows about shard 2; shards 0 and 1 are no longer returned.
    EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
            .andReturn(ImmutableSet.of(SHARD_ID2))
            .anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getAssignment())
            .andReturn(ImmutableSet.of(SHARD2_PARTITION))
            .anyTimes();

    supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
    EasyMock.expectLastCall().anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
    EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID2)))
            .andReturn("200").anyTimes();

    supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
    EasyMock.expectLastCall().anyTimes();

    // Collects every task handed to taskQueue.add() during the first run below.
    // NOTE(review): the "postSplit" naming is presumably carried over from the shard-split tests; this method
    // exercises the merge scenario.
    Capture<Task> postSplitCaptured = Capture.newInstance(CaptureType.ALL);

    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED))
            .anyTimes();

    // The first phase-two task is reported as successfully finished. The status expectation has no
    // times modifier, so EasyMock expects it to be queried exactly once.
    Task successfulTask0 = phaseTwoTasks.get(0);
    EasyMock.expect(taskStorage.getStatus(successfulTask0.getId()))
            .andReturn(Optional.of(TaskStatus.success(successfulTask0.getId())));
    EasyMock.expect(taskStorage.getTask(successfulTask0.getId())).andReturn(Optional.of(successfulTask0)).anyTimes();

    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
            .anyTimes();

    // Exactly one new task is expected to be queued in the first run.
    EasyMock.expect(taskQueue.add(EasyMock.capture(postSplitCaptured))).andReturn(true).times(1);

    replayAll();

    // First run: the supervisor should queue the single task captured above.
    supervisor.runInternal();
    verifyAll();

    // Re-record only the task storage and task client expectations for the second run.
    EasyMock.reset(taskStorage);
    EasyMock.reset(taskClient);

    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.NOT_STARTED))
            .anyTimes();
    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
            .anyTimes();
    // Checkpoints reported by the group 0 task: a single checkpoint (key 0) with shard 2 at "100".
    TreeMap<Integer, Map<String, String>> checkpointsGroup0 = new TreeMap<>();
    checkpointsGroup0.put(0, ImmutableMap.of(
        SHARD_ID2, "100"
    ));

    // there would be 1 task, only task group 0 has a shard
    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpointsGroup0))
            .times(1);

    // The task captured in the first run is now reported by task storage as active and running.
    List<Task> postSplitTasks = postSplitCaptured.getValues();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(postSplitTasks).anyTimes();
    for (Task task : postSplitTasks) {
      EasyMock.expect(taskStorage.getStatus(task.getId()))
              .andReturn(Optional.of(TaskStatus.running(task.getId())))
              .anyTimes();
      EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
    }
    EasyMock.replay(taskStorage);
    EasyMock.replay(taskClient);

    // Second run: the supervisor sees the running task created by the first run.
    supervisor.runInternal();
    verifyAll();


    // Check that the closed shards (SHARD_ID0 and SHARD_ID1) are not included in the sequences sent to the task
    // for group 0: only SHARD_ID2 is expected, starting at "100", with SHARD_ID2 as the only entry in the set
    // passed as the third constructor argument.
    SeekableStreamStartSequenceNumbers<String, String> group0ExpectedStartSequenceNumbers =
        new SeekableStreamStartSequenceNumbers<>(
            STREAM,
            ImmutableMap.of(
                SHARD_ID2, "100"
            ),
            ImmutableSet.of(SHARD_ID2)
        );

    // The open shard has no end sequence number yet.
    SeekableStreamEndSequenceNumbers<String, String> group0ExpectedEndSequenceNumbers =
        new SeekableStreamEndSequenceNumbers<>(
            STREAM,
            ImmutableMap.of(
                SHARD_ID2, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
            )
        );

    Assert.assertEquals(1, postSplitTasks.size());
    KinesisIndexTaskIOConfig group0Config = ((KinesisIndexTask) postSplitTasks.get(0)).getIOConfig();
    Assert.assertEquals((Integer) 0, group0Config.getTaskGroupId());
    Assert.assertEquals(group0ExpectedStartSequenceNumbers, group0Config.getStartSequenceNumbers());
    Assert.assertEquals(group0ExpectedEndSequenceNumbers, group0Config.getEndSequenceNumbers());

    // Group 0 should hold only the open shard and group 1 should be empty.
    Map<Integer, Set<String>> expectedPartitionGroups = ImmutableMap.of(
        0, ImmutableSet.of(SHARD_ID2),
        1, ImmutableSet.of()
    );
    // All three shards are still expected in the offsets map, each with the value "-1"
    // (presumably the supervisor's "not set" offset marker -- TODO confirm).
    ConcurrentHashMap<String, String> expectedPartitionOffsets = new ConcurrentHashMap<>(
        ImmutableMap.of(
            SHARD_ID2, "-1",
            SHARD_ID1, "-1",
            SHARD_ID0, "-1"
        )
    );
    Assert.assertEquals(expectedPartitionGroups, supervisor.getPartitionGroups());
    Assert.assertEquals(expectedPartitionOffsets, supervisor.getPartitionOffsets());
  }

  /**
   * Convenience overload that forwards to the eight-argument {@code getTestableSupervisor} with
   * {@code resetOffsetAutomatically} fixed to {@code false}; all other arguments are passed through unchanged.
   */
  private TestableKinesisSupervisor getTestableSupervisor(
      int replicas,
      int taskCount,
      boolean useEarliestOffset,
      String duration,
      Period lateMessageRejectionPeriod,
      Period earlyMessageRejectionPeriod,
      boolean suspended
  )
  {
    // This overload never enables automatic offset reset.
    final boolean resetOffsetAutomatically = false;

    return getTestableSupervisor(
        replicas,
        taskCount,
        useEarliestOffset,
        resetOffsetAutomatically,
        duration,
        lateMessageRejectionPeriod,
        earlyMessageRejectionPeriod,
        suspended
    );
  }

  /**
   * Builds a {@link TestableKinesisSupervisor} with a tuning config created locally, so the caller can choose
   * the {@code resetOffsetAutomatically} flag.
   * <p>
   * The supervisor spec built here is given a fresh {@code SupervisorStateManagerConfig}, and the task client
   * factory always hands back the shared {@code taskClient} after asserting the chat-handler settings it was
   * asked to use.
   */
  private TestableKinesisSupervisor getTestableSupervisor(
      int replicas,
      int taskCount,
      boolean useEarliestOffset,
      boolean resetOffsetAutomatically,
      String duration,
      Period lateMessageRejectionPeriod,
      Period earlyMessageRejectionPeriod,
      boolean suspended
  )
  {
    // Factory stub: checks the settings passed by the supervisor and returns the shared task client.
    final KinesisIndexTaskClientFactory clientFactory = new KinesisIndexTaskClientFactory(null, null)
    {
      @Override
      public KinesisIndexTaskClient build(
          TaskInfoProvider provider,
          String dataSourceName,
          int chatThreads,
          Duration timeout,
          long retries
      )
      {
        Assert.assertEquals(TEST_CHAT_THREADS, chatThreads);
        Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), timeout);
        Assert.assertEquals(TEST_CHAT_RETRIES, retries);
        return taskClient;
      }
    };

    final KinesisSupervisorIOConfig ioConfig = new KinesisSupervisorIOConfig(
        STREAM,
        INPUT_FORMAT,
        "awsEndpoint",
        null,
        replicas,
        taskCount,
        new Period(duration),
        new Period("P1D"),
        new Period("PT30S"),
        useEarliestOffset,
        new Period("PT30M"),
        lateMessageRejectionPeriod,
        earlyMessageRejectionPeriod,
        null,
        null,
        null,
        null,
        null,
        false
    );

    // Positional arguments are kept exactly as in the shared fixture, except for resetOffsetAutomatically.
    final KinesisSupervisorTuningConfig supervisorTuningConfig = new KinesisSupervisorTuningConfig(
        1000,
        null,
        50000,
        null,
        new Period("P1Y"),
        new File("/test"),
        null,
        null,
        null,
        true,
        false,
        null,
        resetOffsetAutomatically,
        null,
        null,
        numThreads,
        TEST_CHAT_THREADS,
        TEST_CHAT_RETRIES,
        TEST_HTTP_TIMEOUT,
        TEST_SHUTDOWN_TIMEOUT,
        null,
        null,
        null,
        5000,
        null,
        null,
        null,
        null,
        null,
        null,
        null,
        null
    );

    final KinesisSupervisorSpec supervisorSpec = new KinesisSupervisorSpec(
        null,
        dataSchema,
        supervisorTuningConfig,
        ioConfig,
        null,
        suspended,
        taskStorage,
        taskMaster,
        indexerMetadataStorageCoordinator,
        clientFactory,
        OBJECT_MAPPER,
        new NoopServiceEmitter(),
        new DruidMonitorSchedulerConfig(),
        rowIngestionMetersFactory,
        null,
        new SupervisorStateManagerConfig()
    );

    return new TestableKinesisSupervisor(
        taskStorage,
        taskMaster,
        indexerMetadataStorageCoordinator,
        clientFactory,
        OBJECT_MAPPER,
        supervisorSpec,
        rowIngestionMetersFactory
    );
  }

  /**
   * Convenience overload that forwards to the nine-argument {@code getTestableSupervisor} with the supervisor
   * not suspended and with no explicit {@code recordsPerFetch} or {@code fetchDelayMillis}.
   */
  private TestableKinesisSupervisor getTestableSupervisor(
      int replicas,
      int taskCount,
      boolean useEarliestOffset,
      String duration,
      Period lateMessageRejectionPeriod,
      Period earlyMessageRejectionPeriod
  )
  {
    // Defaults supplied on behalf of the caller.
    final boolean suspended = false;
    final Integer recordsPerFetch = null;
    final Integer fetchDelayMillis = null;

    return getTestableSupervisor(
        replicas,
        taskCount,
        useEarliestOffset,
        duration,
        lateMessageRejectionPeriod,
        earlyMessageRejectionPeriod,
        suspended,
        recordsPerFetch,
        fetchDelayMillis
    );
  }

  /**
   * Builds a {@link TestableKinesisSupervisor} from the given IO settings, using the test class's shared
   * {@code tuningConfig}, {@code dataSchema} and {@code supervisorConfig}.
   * <p>
   * The task client factory always hands back the shared {@code taskClient} after asserting the chat-handler
   * settings it was asked to use.
   */
  private TestableKinesisSupervisor getTestableSupervisor(
      int replicas,
      int taskCount,
      boolean useEarliestOffset,
      String duration,
      Period lateMessageRejectionPeriod,
      Period earlyMessageRejectionPeriod,
      boolean suspended,
      Integer recordsPerFetch,
      Integer fetchDelayMillis
  )
  {
    // Factory stub: checks the settings passed by the supervisor and returns the shared task client.
    final KinesisIndexTaskClientFactory clientFactory = new KinesisIndexTaskClientFactory(null, null)
    {
      @Override
      public KinesisIndexTaskClient build(
          TaskInfoProvider provider,
          String dataSourceName,
          int chatThreads,
          Duration timeout,
          long retries
      )
      {
        Assert.assertEquals(TEST_CHAT_THREADS, chatThreads);
        Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), timeout);
        Assert.assertEquals(TEST_CHAT_RETRIES, retries);
        return taskClient;
      }
    };

    final KinesisSupervisorIOConfig ioConfig = new KinesisSupervisorIOConfig(
        STREAM,
        INPUT_FORMAT,
        "awsEndpoint",
        null,
        replicas,
        taskCount,
        new Period(duration),
        new Period("P1D"),
        new Period("PT30S"),
        useEarliestOffset,
        new Period("PT30M"),
        lateMessageRejectionPeriod,
        earlyMessageRejectionPeriod,
        null,
        recordsPerFetch,
        fetchDelayMillis,
        null,
        null,
        false
    );

    final KinesisSupervisorSpec supervisorSpec = new KinesisSupervisorSpec(
        null,
        dataSchema,
        tuningConfig,
        ioConfig,
        null,
        suspended,
        taskStorage,
        taskMaster,
        indexerMetadataStorageCoordinator,
        clientFactory,
        OBJECT_MAPPER,
        new NoopServiceEmitter(),
        new DruidMonitorSchedulerConfig(),
        rowIngestionMetersFactory,
        null,
        supervisorConfig
    );

    return new TestableKinesisSupervisor(
        taskStorage,
        taskMaster,
        indexerMetadataStorageCoordinator,
        clientFactory,
        OBJECT_MAPPER,
        supervisorSpec,
        rowIngestionMetersFactory
    );
  }

  /**
   * Use when you want to mock the return value of SeekableStreamSupervisor#isTaskCurrent().
   * <p>
   * Builds a {@link TestableKinesisSupervisorWithCustomIsTaskCurrent} from the given IO settings and the test
   * class's shared {@code tuningConfig}, {@code dataSchema} and {@code supervisorConfig}; the resulting
   * supervisor answers every {@code isTaskCurrent} call with {@code isTaskCurrentReturn}.
   */
  private TestableKinesisSupervisor getTestableSupervisorCustomIsTaskCurrent(
      int replicas,
      int taskCount,
      boolean useEarliestOffset,
      String duration,
      Period lateMessageRejectionPeriod,
      Period earlyMessageRejectionPeriod,
      boolean suspended,
      Integer recordsPerFetch,
      Integer fetchDelayMillis,
      boolean isTaskCurrentReturn
  )
  {
    // Factory stub: checks the settings passed by the supervisor and returns the shared task client.
    final KinesisIndexTaskClientFactory clientFactory = new KinesisIndexTaskClientFactory(null, null)
    {
      @Override
      public KinesisIndexTaskClient build(
          TaskInfoProvider provider,
          String dataSourceName,
          int chatThreads,
          Duration timeout,
          long retries
      )
      {
        Assert.assertEquals(TEST_CHAT_THREADS, chatThreads);
        Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), timeout);
        Assert.assertEquals(TEST_CHAT_RETRIES, retries);
        return taskClient;
      }
    };

    final KinesisSupervisorIOConfig ioConfig = new KinesisSupervisorIOConfig(
        STREAM,
        INPUT_FORMAT,
        "awsEndpoint",
        null,
        replicas,
        taskCount,
        new Period(duration),
        new Period("P1D"),
        new Period("PT30S"),
        useEarliestOffset,
        new Period("PT30M"),
        lateMessageRejectionPeriod,
        earlyMessageRejectionPeriod,
        null,
        recordsPerFetch,
        fetchDelayMillis,
        null,
        null,
        false
    );

    final KinesisSupervisorSpec supervisorSpec = new KinesisSupervisorSpec(
        null,
        dataSchema,
        tuningConfig,
        ioConfig,
        null,
        suspended,
        taskStorage,
        taskMaster,
        indexerMetadataStorageCoordinator,
        clientFactory,
        OBJECT_MAPPER,
        new NoopServiceEmitter(),
        new DruidMonitorSchedulerConfig(),
        rowIngestionMetersFactory,
        null,
        supervisorConfig
    );

    return new TestableKinesisSupervisorWithCustomIsTaskCurrent(
        taskStorage,
        taskMaster,
        indexerMetadataStorageCoordinator,
        clientFactory,
        OBJECT_MAPPER,
        supervisorSpec,
        rowIngestionMetersFactory,
        isTaskCurrentReturn
    );
  }

  /**
   * Use for tests where you don't want generateSequenceName to be overridden out.
   * <p>
   * Returns a plain {@link KinesisSupervisor} (not the testable subclass) built from the supplied data schema
   * and tuning config. The task client factory always hands back the shared {@code taskClient} after asserting
   * the chat-handler settings it was asked to use.
   */
  private KinesisSupervisor getSupervisor(
      int replicas,
      int taskCount,
      boolean useEarliestOffset,
      String duration,
      Period lateMessageRejectionPeriod,
      Period earlyMessageRejectionPeriod,
      boolean suspended,
      Integer recordsPerFetch,
      Integer fetchDelayMillis,
      DataSchema dataSchema,
      KinesisSupervisorTuningConfig tuningConfig
  )
  {
    // Factory stub: checks the settings passed by the supervisor and returns the shared task client.
    final KinesisIndexTaskClientFactory clientFactory = new KinesisIndexTaskClientFactory(null, null)
    {
      @Override
      public KinesisIndexTaskClient build(
          TaskInfoProvider provider,
          String dataSourceName,
          int chatThreads,
          Duration timeout,
          long retries
      )
      {
        Assert.assertEquals(TEST_CHAT_THREADS, chatThreads);
        Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), timeout);
        Assert.assertEquals(TEST_CHAT_RETRIES, retries);
        return taskClient;
      }
    };

    final KinesisSupervisorIOConfig ioConfig = new KinesisSupervisorIOConfig(
        STREAM,
        INPUT_FORMAT,
        "awsEndpoint",
        null,
        replicas,
        taskCount,
        new Period(duration),
        new Period("P1D"),
        new Period("PT30S"),
        useEarliestOffset,
        new Period("PT30M"),
        lateMessageRejectionPeriod,
        earlyMessageRejectionPeriod,
        null,
        recordsPerFetch,
        fetchDelayMillis,
        null,
        null,
        false
    );

    final KinesisSupervisorSpec supervisorSpec = new KinesisSupervisorSpec(
        null,
        dataSchema,
        tuningConfig,
        ioConfig,
        null,
        suspended,
        taskStorage,
        taskMaster,
        indexerMetadataStorageCoordinator,
        clientFactory,
        OBJECT_MAPPER,
        new NoopServiceEmitter(),
        new DruidMonitorSchedulerConfig(),
        rowIngestionMetersFactory,
        null,
        supervisorConfig
    );

    return new KinesisSupervisor(
        taskStorage,
        taskMaster,
        indexerMetadataStorageCoordinator,
        clientFactory,
        OBJECT_MAPPER,
        supervisorSpec,
        rowIngestionMetersFactory,
        null
    );
  }

  /**
   * Creates the data schema used by these tests for the given datasource: an ISO {@code timestamp} column,
   * the string dimensions {@code dim1} and {@code dim2}, a single count aggregator named {@code rows}, and a
   * uniform granularity spec built from {@code Granularities.HOUR}, {@code Granularities.NONE} and an empty
   * interval list.
   *
   * @param dataSource name of the datasource the schema is built for
   * @return a newly constructed schema
   */
  private static DataSchema getDataSchema(String dataSource)
  {
    final List<DimensionSchema> dimensionSchemas = new ArrayList<>();
    for (String dimensionName : new String[]{"dim1", "dim2"}) {
      dimensionSchemas.add(StringDimensionSchema.create(dimensionName));
    }

    final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "iso", null);
    final DimensionsSpec dimensionsSpec = new DimensionsSpec(dimensionSchemas, null, null);
    final AggregatorFactory[] aggregators = new AggregatorFactory[]{new CountAggregatorFactory("rows")};
    final UniformGranularitySpec granularitySpec = new UniformGranularitySpec(
        Granularities.HOUR,
        Granularities.NONE,
        ImmutableList.of()
    );

    return new DataSchema(dataSource, timestampSpec, dimensionsSpec, aggregators, granularitySpec, null);
  }


  /**
   * Creates a {@link KinesisIndexTask} for the given datasource name by building the schema with
   * {@code getDataSchema(dataSource)} and delegating to the overload that accepts a {@link DataSchema}.
   */
  private KinesisIndexTask createKinesisIndexTask(
      String id,
      String dataSource,
      int taskGroupId,
      SeekableStreamStartSequenceNumbers<String, String> startPartitions,
      SeekableStreamEndSequenceNumbers<String, String> endPartitions,
      DateTime minimumMessageTime,
      DateTime maximumMessageTime
  )
  {
    final DataSchema schemaForDataSource = getDataSchema(dataSource);

    return createKinesisIndexTask(
        id,
        taskGroupId,
        startPartitions,
        endPartitions,
        minimumMessageTime,
        maximumMessageTime,
        schemaForDataSource
    );
  }

  /**
   * Creates a {@link KinesisIndexTask} with the given schema, using the test class's shared supervisor tuning
   * config converted to a task tuning config, and delegating to the overload that takes the tuning config
   * explicitly.
   */
  private KinesisIndexTask createKinesisIndexTask(
      String id,
      int taskGroupId,
      SeekableStreamStartSequenceNumbers<String, String> startPartitions,
      SeekableStreamEndSequenceNumbers<String, String> endPartitions,
      DateTime minimumMessageTime,
      DateTime maximumMessageTime,
      DataSchema dataSchema
  )
  {
    final KinesisIndexTaskTuningConfig taskTuningConfig =
        (KinesisIndexTaskTuningConfig) tuningConfig.convertToTaskTuningConfig();

    return createKinesisIndexTask(
        id,
        taskGroupId,
        startPartitions,
        endPartitions,
        minimumMessageTime,
        maximumMessageTime,
        dataSchema,
        taskTuningConfig
    );
  }

  /**
   * Creates a {@link KinesisIndexTask} with the given id, schema and tuning config. The task's IO config uses
   * the sequence name {@code "sequenceName-" + taskGroupId}, the supplied start/end sequence numbers and
   * message-time bounds, {@code INPUT_FORMAT} and the endpoint {@code "awsEndpoint"}.
   */
  private KinesisIndexTask createKinesisIndexTask(
      String id,
      int taskGroupId,
      SeekableStreamStartSequenceNumbers<String, String> startPartitions,
      SeekableStreamEndSequenceNumbers<String, String> endPartitions,
      DateTime minimumMessageTime,
      DateTime maximumMessageTime,
      DataSchema dataSchema,
      KinesisIndexTaskTuningConfig tuningConfig
  )
  {
    final String sequenceName = "sequenceName-" + taskGroupId;

    // NOTE(review): the first IO config argument is a literal 0 regardless of taskGroupId -- confirm this is
    // intended before relying on the task group id stored in the config.
    final KinesisIndexTaskIOConfig taskIOConfig = new KinesisIndexTaskIOConfig(
        0,
        sequenceName,
        startPartitions,
        endPartitions,
        true,
        minimumMessageTime,
        maximumMessageTime,
        INPUT_FORMAT,
        "awsEndpoint",
        null,
        null,
        null,
        null,
        false
    );

    return new KinesisIndexTask(
        id,
        null,
        dataSchema,
        tuningConfig,
        taskIOConfig,
        Collections.emptyMap(),
        null
    );
  }

  /**
   * {@link TaskRunnerWorkItem} for tests: it reports a caller-supplied location together with the task type and
   * datasource read from the task at construction time.
   */
  private static class TestTaskRunnerWorkItem extends TaskRunnerWorkItem
  {
    private final TaskLocation taskLocation;
    private final String typeOfTask;
    private final String sourceName;

    /**
     * @param task     task whose id, type and datasource are recorded
     * @param result   future passed to the superclass together with the task id
     * @param location location returned by {@link #getLocation()}
     */
    public TestTaskRunnerWorkItem(Task task, ListenableFuture<TaskStatus> result, TaskLocation location)
    {
      super(task.getId(), result);
      this.typeOfTask = task.getType();
      this.sourceName = task.getDataSource();
      this.taskLocation = location;
    }

    @Override
    public String getTaskType()
    {
      return typeOfTask;
    }

    @Override
    public String getDataSource()
    {
      return sourceName;
    }

    @Override
    public TaskLocation getLocation()
    {
      return taskLocation;
    }

  }

  /**
   * {@link KinesisSupervisor} variant for tests: sequence names are derived from the task group id, the record
   * supplier is the test's {@code supervisorRecordSupplier}, and the state manager is exposed to the test.
   */
  private class TestableKinesisSupervisor extends KinesisSupervisor
  {
    TestableKinesisSupervisor(
        TaskStorage taskStorage,
        TaskMaster taskMaster,
        IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
        KinesisIndexTaskClientFactory taskClientFactory,
        ObjectMapper mapper,
        KinesisSupervisorSpec spec,
        RowIngestionMetersFactory rowIngestionMetersFactory
    )
    {
      // The last superclass argument is always null in this test subclass.
      super(
          taskStorage,
          taskMaster,
          indexerMetadataStorageCoordinator,
          taskClientFactory,
          mapper,
          spec,
          rowIngestionMetersFactory,
          null
      );
    }

    /**
     * Returns {@code "sequenceName-<groupId>"}, where the group id is looked up for the first key of
     * {@code startPartitions}, or {@code "sequenceName-NOT-USED"} when the map is empty.
     */
    @Override
    protected String generateSequenceName(
        Map<String, String> startPartitions,
        Optional<DateTime> minimumMessageTime,
        Optional<DateTime> maximumMessageTime,
        DataSchema dataSchema,
        SeekableStreamIndexTaskTuningConfig tuningConfig
    )
    {
      if (!startPartitions.isEmpty()) {
        final String firstPartitionId = startPartitions.keySet().iterator().next();
        final int taskGroupId = getTaskGroupIdForPartition(firstPartitionId);
        return StringUtils.format("sequenceName-%d", taskGroupId);
      }
      return StringUtils.format("sequenceName-NOT-USED");
    }

    /**
     * Hands back the test's shared record supplier instead of creating a new one.
     */
    @Override
    protected RecordSupplier<String, String> setupRecordSupplier()
    {
      return supervisorRecordSupplier;
    }

    /**
     * Exposes the inherited {@code stateManager} to the enclosing test.
     */
    private SeekableStreamSupervisorStateManager getStateManager()
    {
      return stateManager;
    }
  }

  /**
   * {@link TestableKinesisSupervisor} whose {@link #isTaskCurrent(int, String)} always returns the value fixed
   * at construction time, whatever task group id and task id are passed in.
   */
  private class TestableKinesisSupervisorWithCustomIsTaskCurrent extends TestableKinesisSupervisor
  {
    private final boolean fixedIsTaskCurrent;

    TestableKinesisSupervisorWithCustomIsTaskCurrent(
        TaskStorage taskStorage,
        TaskMaster taskMaster,
        IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
        KinesisIndexTaskClientFactory taskClientFactory,
        ObjectMapper mapper,
        KinesisSupervisorSpec spec,
        RowIngestionMetersFactory rowIngestionMetersFactory,
        boolean isTaskCurrentReturn
    )
    {
      super(
          taskStorage,
          taskMaster,
          indexerMetadataStorageCoordinator,
          taskClientFactory,
          mapper,
          spec,
          rowIngestionMetersFactory
      );
      fixedIsTaskCurrent = isTaskCurrentReturn;
    }

    /**
     * Ignores both arguments and returns the canned answer supplied to the constructor.
     */
    @Override
    public boolean isTaskCurrent(int taskGroupId, String taskId)
    {
      return fixedIsTaskCurrent;
    }
  }
}
