/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.indexing.overlord;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IndexingServiceCondition;
import org.apache.druid.indexing.common.TestRealtimeTask;
import org.apache.druid.indexing.common.TestTasks;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.testing.DeadlockDetectingTimeout;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.mockito.Mockito;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class RemoteTaskRunnerTest
{
  // Path joiner shared with RemoteTaskRunnerTestUtils, used to build the ZooKeeper paths below.
  private static final Joiner JOINER = RemoteTaskRunnerTestUtils.JOINER;
  // Host name of the default worker created by makeWorker().
  private static final String WORKER_HOST = "worker";
  // Announcement path of the default worker. NOTE(review): the constant name is misspelled ("ANNOUCEMENTS");
  // it is left unchanged because several tests in this class reference it.
  private static final String ANNOUCEMENTS_PATH = JOINER.join(RemoteTaskRunnerTestUtils.ANNOUNCEMENTS_PATH, WORKER_HOST);
  // Status path of the default worker; per-task status nodes are addressed as STATUS_PATH joined with the task id.
  private static final String STATUS_PATH = JOINER.join(RemoteTaskRunnerTestUtils.STATUS_PATH, WORKER_HOST);

  // Runner under test; created by makeRemoteTaskRunner() and stopped in tearDown() when non-null.
  private RemoteTaskRunner remoteTaskRunner;
  // Shared test fixture: provides the Curator client, the object mapper, and worker/runner factories.
  private RemoteTaskRunnerTestUtils rtrTestUtils = new RemoteTaskRunnerTestUtils();
  // Both are obtained from rtrTestUtils in setUp().
  private ObjectMapper jsonMapper;
  private CuratorFramework cf;

  // Default task used by most tests; created in setUp() with an id that contains spaces.
  private Task task;
  // Default worker, assigned by makeWorker().
  private Worker worker;

  // Applies a 60-second DeadlockDetectingTimeout to every test in this class.
  @Rule
  public final TestRule timeout = new DeadlockDetectingTimeout(60, TimeUnit.SECONDS);

  /**
   * Initializes the shared test utilities, pulls the Curator client and JSON mapper from them,
   * creates the default unending task and registers a no-op emitter for {@link EmittingLogger}.
   */
  @Before
  public void setUp() throws Exception
  {
    rtrTestUtils.setUp();

    // Both handles come from the freshly initialized utilities.
    this.cf = rtrTestUtils.getCuratorFramework();
    this.jsonMapper = rtrTestUtils.getObjectMapper();

    // The id deliberately contains spaces.
    this.task = TestTasks.unending("task id with spaces");

    final NoopServiceEmitter noopEmitter = new NoopServiceEmitter();
    EmittingLogger.registerEmitter(noopEmitter);
  }

  /**
   * Stops the runner under test (if one was created) and tears down the shared test utilities.
   *
   * The utilities are torn down in a {@code finally} block so that they are released even when
   * stopping the runner throws.
   */
  @After
  public void tearDown() throws Exception
  {
    try {
      if (remoteTaskRunner != null) {
        remoteTaskRunner.stop();
      }
    }
    finally {
      rtrTestUtils.tearDown();
    }
  }

  /**
   * Runs the default task to successful completion on the default worker and checks the slot counters
   * before the task is submitted and after its status node has been deleted.
   */
  @Test
  public void testRun() throws Exception
  {
    doSetup();

    // Nothing has been submitted yet: 3 slots in total, all idle.
    Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount());
    Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount());
    Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount());

    final String taskId = task.getId();
    final ListenableFuture<TaskStatus> statusFuture = remoteTaskRunner.run(task);

    Assert.assertTrue(taskAnnounced(taskId));
    mockWorkerRunningTask(task);
    Assert.assertTrue(workerRunningTask(taskId));

    mockWorkerCompleteSuccessfulTask(task);
    Assert.assertTrue(workerCompletedTask(statusFuture));

    final TaskStatus finalStatus = statusFuture.get();
    Assert.assertEquals(taskId, finalStatus.getId());
    Assert.assertEquals(TaskState.SUCCESS, finalStatus.getStatusCode());

    // Remove the task's status node before re-checking the counters.
    final String taskStatusPath = JOINER.join(STATUS_PATH, taskId);
    cf.delete().guaranteed().forPath(taskStatusPath);

    Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount());
    Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount());
    Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount());
  }

  /**
   * Adds the default task as pending, triggers pending-task processing, then calls run() for the same task
   * and expects it to complete successfully.
   */
  @Test
  public void testRunTaskThatAlreadyPending() throws Exception
  {
    doSetup();

    final String taskId = task.getId();

    remoteTaskRunner.addPendingTask(task);
    remoteTaskRunner.runPendingTasks();
    // The worker has not been told (via the mock helpers) to run the task yet.
    Assert.assertFalse(workerRunningTask(taskId));

    final ListenableFuture<TaskStatus> statusFuture = remoteTaskRunner.run(task);

    Assert.assertTrue(taskAnnounced(taskId));
    mockWorkerRunningTask(task);
    Assert.assertTrue(workerRunningTask(taskId));
    mockWorkerCompleteSuccessfulTask(task);
    Assert.assertTrue(workerCompletedTask(statusFuture));

    final TaskStatus finalStatus = statusFuture.get();
    Assert.assertEquals(taskId, finalStatus.getId());
    Assert.assertEquals(TaskState.SUCCESS, finalStatus.getStatusCode());
  }

  /**
   * Creates a runner with a PT1S config without creating any worker first; the test passes if no
   * exception is thrown.
   */
  @Test
  public void testStartWithNoWorker()
  {
    final RemoteTaskRunnerConfig oneSecondConfig = new TestRemoteTaskRunnerConfig(new Period("PT1S"));
    makeRemoteTaskRunner(oneSecondConfig);
  }

  /**
   * Calls run() a second time for a task that has been announced but is not yet reported as running;
   * the second future must stay incomplete until the worker finishes the task, then report SUCCESS.
   */
  @Test
  public void testRunExistingTaskThatHasntStartedRunning() throws Exception
  {
    doSetup();

    final String taskId = task.getId();

    // First submission: only wait for the announcement.
    remoteTaskRunner.run(task);
    Assert.assertTrue(taskAnnounced(taskId));

    // Second submission of the very same task.
    final ListenableFuture<TaskStatus> secondFuture = remoteTaskRunner.run(task);
    Assert.assertFalse(secondFuture.isDone());

    mockWorkerRunningTask(task);
    Assert.assertTrue(workerRunningTask(taskId));
    mockWorkerCompleteSuccessfulTask(task);
    Assert.assertTrue(workerCompletedTask(secondFuture));

    final TaskStatus finalStatus = secondFuture.get();
    Assert.assertEquals(taskId, finalStatus.getId());
    Assert.assertEquals(TaskState.SUCCESS, finalStatus.getStatusCode());
  }

  /**
   * Calls run() a second time for a task that the worker already reports as running; the second
   * future must stay incomplete until the worker finishes the task, then report SUCCESS.
   */
  @Test
  public void testRunExistingTaskThatHasStartedRunning() throws Exception
  {
    doSetup();

    final String taskId = task.getId();

    // First submission: wait until the worker reports the task as running.
    remoteTaskRunner.run(task);
    Assert.assertTrue(taskAnnounced(taskId));
    mockWorkerRunningTask(task);
    Assert.assertTrue(workerRunningTask(taskId));

    // Second submission of the very same task.
    final ListenableFuture<TaskStatus> secondFuture = remoteTaskRunner.run(task);
    Assert.assertFalse(secondFuture.isDone());

    mockWorkerCompleteSuccessfulTask(task);
    Assert.assertTrue(workerCompletedTask(secondFuture));

    final TaskStatus finalStatus = secondFuture.get();
    Assert.assertEquals(taskId, finalStatus.getId());
    Assert.assertEquals(TaskState.SUCCESS, finalStatus.getStatusCode());
  }

  /**
   * Submits a task whose id is a 5000-character string while a mock emitter with no recorded
   * expectations is registered, then verifies the mock.
   */
  @Test
  public void testRunTooMuchZKData() throws Exception
  {
    // No expectations are recorded on the mock before it is switched to replay mode.
    final ServiceEmitter mockEmitter = EasyMock.createMock(ServiceEmitter.class);
    EmittingLogger.registerEmitter(mockEmitter);
    EasyMock.replay(mockEmitter);

    doSetup();

    // 5000 NUL characters make up the oversized task id.
    final String oversizedTaskId = new String(new char[5000]);
    final Task oversizedTask = TestTasks.unending(oversizedTaskId);
    remoteTaskRunner.run(oversizedTask);

    EasyMock.verify(mockEmitter);
  }

  /**
   * Submits three realtime tasks where "rt1" and "rt2" share the availability group "rt1" and "rt3" uses
   * group "rt2". Expects two tasks to end up running and exactly one, "rt2", to remain pending.
   */
  @Test
  public void testRunSameAvailabilityGroup() throws Exception
  {
    doSetup();

    TestRealtimeTask task1 = new TestRealtimeTask(
        "rt1",
        new TaskResource("rt1", 1),
        "foo",
        TaskStatus.running("rt1"),
        jsonMapper
    );
    remoteTaskRunner.run(task1);
    Assert.assertTrue(taskAnnounced(task1.getId()));
    mockWorkerRunningTask(task1);

    // Same availability group as task1.
    TestRealtimeTask task2 = new TestRealtimeTask(
        "rt2",
        new TaskResource("rt1", 1),
        "foo",
        TaskStatus.running("rt2"),
        jsonMapper
    );
    remoteTaskRunner.run(task2);

    // Different availability group.
    TestRealtimeTask task3 = new TestRealtimeTask(
        "rt3",
        new TaskResource("rt2", 1),
        "foo",
        TaskStatus.running("rt3"),
        jsonMapper
    );
    remoteTaskRunner.run(task3);

    Assert.assertTrue(
        TestUtils.conditionValid(() -> remoteTaskRunner.getRunningTasks().size() == 2)
    );

    Assert.assertTrue(
        TestUtils.conditionValid(() -> remoteTaskRunner.getPendingTasks().size() == 1)
    );

    // assertEquals (rather than assertTrue(a.equals(b))) reports the actual pending task id on failure.
    Assert.assertEquals("rt2", remoteTaskRunner.getPendingTasks().iterator().next().getTaskId());
  }

  /**
   * Submits three realtime tasks requiring 1, 3 and 2 capacity units against the default worker
   * (created by makeWorker() with the argument 3). Expects "rt1" and "rt3" to run and "rt2" to remain
   * the only pending task.
   */
  @Test
  public void testRunWithCapacity() throws Exception
  {
    doSetup();

    TestRealtimeTask task1 = new TestRealtimeTask(
        "rt1",
        new TaskResource("rt1", 1),
        "foo",
        TaskStatus.running("rt1"),
        jsonMapper
    );
    remoteTaskRunner.run(task1);
    Assert.assertTrue(taskAnnounced(task1.getId()));
    mockWorkerRunningTask(task1);

    // Requires 3 capacity units while task1 already occupies one.
    TestRealtimeTask task2 = new TestRealtimeTask(
        "rt2",
        new TaskResource("rt2", 3),
        "foo",
        TaskStatus.running("rt2"),
        jsonMapper
    );
    remoteTaskRunner.run(task2);

    TestRealtimeTask task3 = new TestRealtimeTask(
        "rt3",
        new TaskResource("rt3", 2),
        "foo",
        TaskStatus.running("rt3"),
        jsonMapper
    );
    remoteTaskRunner.run(task3);
    Assert.assertTrue(taskAnnounced(task3.getId()));
    mockWorkerRunningTask(task3);

    Assert.assertTrue(
        TestUtils.conditionValid(() -> remoteTaskRunner.getRunningTasks().size() == 2)
    );

    Assert.assertTrue(
        TestUtils.conditionValid(() -> remoteTaskRunner.getPendingTasks().size() == 1)
    );

    // assertEquals (rather than assertTrue(a.equals(b))) reports the actual pending task id on failure.
    Assert.assertEquals("rt2", remoteTaskRunner.getPendingTasks().iterator().next().getTaskId());
  }

  /**
   * Deletes the status node of a running task and expects the task's future to complete with a FAILED
   * status whose error message mentions that the assigned worker disappeared.
   */
  @Test
  public void testStatusRemoved() throws Exception
  {
    doSetup();

    ListenableFuture<TaskStatus> future = remoteTaskRunner.run(task);
    Assert.assertTrue(taskAnnounced(task.getId()));
    mockWorkerRunningTask(task);

    Assert.assertTrue(workerRunningTask(task.getId()));

    // assertEquals reports the actual running task id on failure, unlike assertTrue(a.equals(b)).
    Assert.assertEquals(task.getId(), remoteTaskRunner.getRunningTasks().iterator().next().getTaskId());

    cf.delete().forPath(JOINER.join(STATUS_PATH, task.getId()));

    TaskStatus status = future.get();

    // JUnit's argument order is (expected, actual).
    Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
    Assert.assertNotNull(status.getErrorMsg());
    Assert.assertTrue(status.getErrorMsg().contains("The worker that this task was assigned disappeared"));
  }

  /**
   * Submits a task "first" and two task objects that both carry the id "second", then checks that the
   * set of running task ids is exactly {"first", "second"}.
   */
  @Test
  public void testBootstrap() throws Exception
  {
    final Period timeoutPeriod = Period.millis(1000);
    makeWorker();

    final RemoteTaskRunnerConfig rtrConfig = new TestRemoteTaskRunnerConfig(timeoutPeriod);
    rtrConfig.setMaxPercentageBlacklistWorkers(100);
    makeRemoteTaskRunner(rtrConfig);

    final TestRealtimeTask firstTask = new TestRealtimeTask(
        "first",
        new TaskResource("first", 1),
        "foo",
        TaskStatus.running("first"),
        jsonMapper
    );
    remoteTaskRunner.run(firstTask);
    Assert.assertTrue(taskAnnounced(firstTask.getId()));
    mockWorkerRunningTask(firstTask);

    // Carries the id "second" but availability group "task"; submitted without waiting for an announcement.
    final TestRealtimeTask secondWithOtherGroup = new TestRealtimeTask(
        "second",
        new TaskResource("task", 2),
        "foo",
        TaskStatus.running("task"),
        jsonMapper
    );
    remoteTaskRunner.run(secondWithOtherGroup);

    // Same id "second", this time with availability group "second".
    final TestRealtimeTask secondTask = new TestRealtimeTask(
        "second",
        new TaskResource("second", 2),
        "foo",
        TaskStatus.running("second"),
        jsonMapper
    );
    remoteTaskRunner.run(secondTask);
    Assert.assertTrue(taskAnnounced(secondTask.getId()));
    mockWorkerRunningTask(secondTask);

    // Collect the ids of everything the runner currently reports as running.
    final Set<String> runningTasks = Sets.newHashSet();
    for (RemoteTaskRunnerWorkItem workItem : remoteTaskRunner.getRunningTasks()) {
      runningTasks.add(workItem.getTaskId());
    }
    Assert.assertEquals("runningTasks", ImmutableSet.of("first", "second"), runningTasks);
  }

  /**
   * Lets the worker complete a task successfully and then calls run() for it again; the future returned
   * by that second call must report SUCCESS.
   */
  @Test
  public void testRunWithTaskComplete() throws Exception
  {
    doSetup();

    final TestRealtimeTask completedTask = new TestRealtimeTask(
        "testTask",
        new TaskResource("testTask", 2),
        "foo",
        TaskStatus.success("testTask"),
        jsonMapper
    );

    remoteTaskRunner.run(completedTask);
    Assert.assertTrue(taskAnnounced(completedTask.getId()));
    mockWorkerRunningTask(completedTask);
    mockWorkerCompleteSuccessfulTask(completedTask);

    // Re-submit the same task and read its status.
    final TaskStatus rerunStatus = remoteTaskRunner.run(completedTask).get();
    Assert.assertEquals(TaskState.SUCCESS, rerunStatus.getStatusCode());
  }

  /**
   * Deletes the worker's announcement node while a task is running. Expects the task to fail with a
   * "Canceled for worker cleanup" message, the removed-worker cleanups to drain, the worker's status path
   * to disappear and the slot counters to drop to zero.
   */
  @Test
  public void testWorkerRemoved() throws Exception
  {
    doSetup();
    Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount());
    Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount());

    final String taskId = task.getId();
    final Future<TaskStatus> statusFuture = remoteTaskRunner.run(task);

    Assert.assertTrue(taskAnnounced(taskId));
    mockWorkerRunningTask(task);
    Assert.assertTrue(workerRunningTask(taskId));

    // Remove the worker's announcement node.
    cf.delete().forPath(ANNOUCEMENTS_PATH);

    final TaskStatus finalStatus = statusFuture.get();
    Assert.assertEquals(TaskState.FAILED, finalStatus.getStatusCode());
    Assert.assertNotNull(finalStatus.getErrorMsg());
    Assert.assertTrue(finalStatus.getErrorMsg().contains("Canceled for worker cleanup"));

    final RemoteTaskRunnerConfig config = remoteTaskRunner.getRemoteTaskRunnerConfig();
    // cleanup task is independently scheduled by event listener. we need to wait some more time.
    final long cleanupWaitMillis = config.getTaskCleanupTimeout().toStandardDuration().getMillis() * 2;
    Assert.assertTrue(
        TestUtils.conditionValid(
            () -> remoteTaskRunner.getRemovedWorkerCleanups().isEmpty(),
            cleanupWaitMillis
        )
    );
    Assert.assertNull(cf.checkExists().forPath(STATUS_PATH));

    Assert.assertEquals(0, remoteTaskRunner.getTotalTaskSlotCount());
    Assert.assertEquals(0, remoteTaskRunner.getIdleTaskSlotCount());
  }

  /**
   * Disables the worker while a task is running on it. The task must still complete successfully, and
   * afterwards the only worker known to the runner must report an empty version string.
   */
  @Test
  public void testWorkerDisabled() throws Exception
  {
    doSetup();

    final String taskId = task.getId();
    final ListenableFuture<TaskStatus> statusFuture = remoteTaskRunner.run(task);

    Assert.assertTrue(taskAnnounced(taskId));
    mockWorkerRunningTask(task);
    Assert.assertTrue(workerRunningTask(taskId));

    // Disable the worker in the middle of the task's run.
    disableWorker();

    // The running task is still expected to finish normally.
    mockWorkerCompleteSuccessfulTask(task);
    Assert.assertTrue(workerCompletedTask(statusFuture));

    final TaskStatus finalStatus = statusFuture.get();
    Assert.assertEquals(taskId, finalStatus.getId());
    Assert.assertEquals(TaskState.SUCCESS, finalStatus.getStatusCode());

    // Confirm RTR thinks the worker is disabled.
    final String workerVersion = Iterables.getOnlyElement(remoteTaskRunner.getWorkers()).getWorker().getVersion();
    Assert.assertEquals("", workerVersion);
  }

  /**
   * Stops the runner while a task is running, creates a new runner, and expects the new runner to know the
   * task and to complete its future with SUCCESS once the worker finishes it.
   */
  @Test
  public void testRestartRemoteTaskRunner() throws Exception
  {
    doSetup();
    remoteTaskRunner.run(task);

    Assert.assertTrue(taskAnnounced(task.getId()));
    mockWorkerRunningTask(task);
    Assert.assertTrue(workerRunningTask(task.getId()));

    remoteTaskRunner.stop();
    makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(new Period("PT5S")));

    // Fail with a descriptive assertion error (instead of a NullPointerException on the next line)
    // when the restarted runner does not know the task.
    final RemoteTaskRunnerWorkItem newWorkItem = remoteTaskRunner
        .getKnownTasks()
        .stream()
        .filter(workItem -> workItem.getTaskId().equals(task.getId()))
        .findFirst()
        .orElseThrow(
            () -> new AssertionError("Restarted runner does not know task [" + task.getId() + "]")
        );
    final ListenableFuture<TaskStatus> result = newWorkItem.getResult();

    mockWorkerCompleteSuccessfulTask(task);
    Assert.assertTrue(workerCompletedTask(result));

    Assert.assertEquals(task.getId(), result.get().getId());
    Assert.assertEquals(TaskState.SUCCESS, result.get().getStatusCode());
  }

  /**
   * Runs a mocked pending work item whose task id changes between calls and expects the original pending
   * item to fail immediately with the "Failed to assign this task" message.
   */
  @Test
  public void testRunPendingTaskFailToAssignTask() throws Exception
  {
    doSetup();
    final RemoteTaskRunnerWorkItem pendingItem = remoteTaskRunner.addPendingTask(task);

    // The mock reports the real task id on the first getTaskId() call and "wrongId" on later calls.
    final RemoteTaskRunnerWorkItem inconsistentItem = Mockito.mock(RemoteTaskRunnerWorkItem.class);
    Mockito.when(inconsistentItem.getTaskId())
           .thenReturn(pendingItem.getTaskId())
           .thenReturn("wrongId");

    remoteTaskRunner.runPendingTask(inconsistentItem);

    // Zero timeout: the result must already be available.
    final TaskStatus failedStatus = pendingItem.getResult().get(0, TimeUnit.MILLISECONDS);
    Assert.assertEquals(TaskState.FAILED, failedStatus.getStatusCode());
    Assert.assertEquals(
        "Failed to assign this task. See overlord logs for more details.",
        failedStatus.getErrorMsg()
    );
  }

  /**
   * Runs a pending task against a runner configured with PT1S and never simulates the worker starting it;
   * expects the task to be FAILED with a "did not start it in timeout" message once runPendingTask returns.
   */
  @Test
  public void testRunPendingTaskTimeoutToAssign() throws Exception
  {
    makeWorker();
    final RemoteTaskRunnerConfig oneSecondConfig = new TestRemoteTaskRunnerConfig(new Period("PT1S"));
    makeRemoteTaskRunner(oneSecondConfig);

    final RemoteTaskRunnerWorkItem pendingItem = remoteTaskRunner.addPendingTask(task);
    remoteTaskRunner.runPendingTask(pendingItem);

    // Zero timeout: the result must already be available.
    final TaskStatus failedStatus = pendingItem.getResult().get(0, TimeUnit.MILLISECONDS);
    Assert.assertEquals(TaskState.FAILED, failedStatus.getStatusCode());

    final String errorMsg = failedStatus.getErrorMsg();
    Assert.assertNotNull(errorMsg);
    Assert.assertTrue(
        failedStatus.getErrorMsg().startsWith("The worker that this task is assigned did not start it in timeout")
    );
  }

  /**
   * Default fixture: creates the default worker, then a runner configured with PT5S.
   */
  private void doSetup() throws Exception
  {
    makeWorker();

    final RemoteTaskRunnerConfig fiveSecondConfig = new TestRemoteTaskRunnerConfig(new Period("PT5S"));
    makeRemoteTaskRunner(fiveSecondConfig);
  }

  /**
   * Builds a runner from the given config via the test utilities and stores it as the runner under test.
   *
   * @param config configuration handed to the test utilities
   */
  private void makeRemoteTaskRunner(RemoteTaskRunnerConfig config)
  {
    this.remoteTaskRunner = rtrTestUtils.makeRemoteTaskRunner(config);
  }

  /**
   * Creates the default worker for host {@link #WORKER_HOST} via the test utilities (second argument 3)
   * and remembers it in the {@code worker} field.
   */
  private void makeWorker() throws Exception
  {
    this.worker = rtrTestUtils.makeWorker(WORKER_HOST, 3);
  }

  /**
   * Asks the test utilities to disable the default worker stored in the {@code worker} field.
   */
  private void disableWorker() throws Exception
  {
    rtrTestUtils.disableWorker(this.worker);
  }

  /**
   * Delegates to the test utilities' taskAnnounced check for the default worker host.
   *
   * @param taskId id of the task to look for
   * @return the result reported by the test utilities
   */
  private boolean taskAnnounced(final String taskId)
  {
    final boolean announced = rtrTestUtils.taskAnnounced(WORKER_HOST, taskId);
    return announced;
  }

  /**
   * Delegates to the test utilities' workerRunningTask check for the default worker host.
   *
   * @param taskId id of the task to look for
   * @return the result reported by the test utilities
   */
  private boolean workerRunningTask(final String taskId)
  {
    final boolean running = rtrTestUtils.workerRunningTask(WORKER_HOST, taskId);
    return running;
  }

  /**
   * Waits, via {@link TestUtils#conditionValid}, for the given future to be done.
   *
   * @param result future of the task being observed
   * @return whatever {@code conditionValid} reports for the "future is done" condition
   */
  private boolean workerCompletedTask(final ListenableFuture<TaskStatus> result)
  {
    final IndexingServiceCondition futureIsDone = result::isDone;
    return TestUtils.conditionValid(futureIsDone);
  }

  /**
   * Simulates, via the test utilities, the default worker reporting the given task as running.
   *
   * Uses {@link #WORKER_HOST} rather than a repeated string literal so that this helper stays in sync with
   * {@code taskAnnounced} and {@code workerRunningTask}.
   *
   * @param task task to report as running
   */
  private void mockWorkerRunningTask(final Task task) throws Exception
  {
    rtrTestUtils.mockWorkerRunningTask(WORKER_HOST, task);
  }

  /**
   * Simulates, via the test utilities, the default worker completing the given task successfully.
   *
   * Uses {@link #WORKER_HOST} rather than a repeated string literal so that this helper stays in sync with
   * {@code taskAnnounced} and {@code workerRunningTask}.
   *
   * @param task task to report as successfully completed
   */
  private void mockWorkerCompleteSuccessfulTask(final Task task) throws Exception
  {
    rtrTestUtils.mockWorkerCompleteSuccessfulTask(WORKER_HOST, task);
  }

  /**
   * Simulates, via the test utilities, the default worker completing the given task with a failure.
   *
   * Uses {@link #WORKER_HOST} rather than a repeated string literal so that this helper stays in sync with
   * {@code taskAnnounced} and {@code workerRunningTask}.
   *
   * @param task task to report as failed
   */
  private void mockWorkerCompleteFailedTask(final Task task) throws Exception
  {
    rtrTestUtils.mockWorkerCompleteFailedTask(WORKER_HOST, task);
  }

  /**
   * A worker that is running a task must not be marked lazy, even when the predicate accepts every worker.
   */
  @Test
  public void testFindLazyWorkerTaskRunning() throws Exception
  {
    doSetup();
    // NOTE(review): start() is called explicitly here, after the runner was already created by doSetup() —
    // confirm whether this extra call is intentional.
    remoteTaskRunner.start();
    remoteTaskRunner.run(task);
    Assert.assertTrue(taskAnnounced(task.getId()));
    mockWorkerRunningTask(task);

    // Accept every worker, allow at most one to be marked.
    final Collection<Worker> markedLazy = remoteTaskRunner.markWorkersLazy(workerInfo -> true, 1);

    Assert.assertTrue(markedLazy.isEmpty());
    Assert.assertTrue(remoteTaskRunner.getLazyWorkers().isEmpty());
    Assert.assertEquals(1, remoteTaskRunner.getWorkers().size());
  }

  /**
   * A worker that has just been assigned a task (announced, but not yet reported as running) must not be
   * marked lazy, even when the predicate accepts every worker.
   */
  @Test
  public void testFindLazyWorkerForWorkerJustAssignedTask() throws Exception
  {
    doSetup();
    remoteTaskRunner.run(task);
    Assert.assertTrue(taskAnnounced(task.getId()));

    // Accept every worker, allow at most one to be marked.
    final Collection<Worker> markedLazy = remoteTaskRunner.markWorkersLazy(workerInfo -> true, 1);

    Assert.assertTrue(markedLazy.isEmpty());
    Assert.assertTrue(remoteTaskRunner.getLazyWorkers().isEmpty());
    Assert.assertEquals(1, remoteTaskRunner.getWorkers().size());
  }

  /**
   * An idle worker is marked lazy when the predicate accepts it; its 3 slots then count as lazy
   * instead of idle while still being part of the total.
   */
  @Test
  public void testFindLazyWorkerNotRunningAnyTask() throws Exception
  {
    doSetup();

    // Accept every worker, allow at most one to be marked.
    final Collection<Worker> markedLazy = remoteTaskRunner.markWorkersLazy(workerInfo -> true, 1);

    Assert.assertEquals(1, markedLazy.size());
    Assert.assertEquals(1, remoteTaskRunner.getLazyWorkers().size());
    Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount());
    Assert.assertEquals(0, remoteTaskRunner.getIdleTaskSlotCount());
    Assert.assertEquals(3, remoteTaskRunner.getLazyTaskSlotCount());
  }

  /**
   * With a maximum of zero workers, no worker is marked lazy even though the predicate accepts every
   * worker and the worker is idle.
   */
  @Test
  public void testFindLazyWorkerNotRunningAnyTaskButWithZeroMaxWorkers() throws Exception
  {
    doSetup();

    // Accept every worker, but allow none to be marked.
    final Collection<Worker> markedLazy = remoteTaskRunner.markWorkersLazy(workerInfo -> true, 0);

    Assert.assertEquals(0, markedLazy.size());
    Assert.assertEquals(0, remoteTaskRunner.getLazyWorkers().size());
  }

  /**
   * Deletes and then re-creates the worker's announcement node while a task is running. Expects a cleanup
   * to be registered for the worker after the deletion, to be removed again after the re-creation, and the
   * task to finish with SUCCESS afterwards.
   */
  @Test
  public void testWorkerZKReconnect() throws Exception
  {
    makeWorker();
    makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(new Period("PT5M")));
    Future<TaskStatus> future = remoteTaskRunner.run(task);

    Assert.assertTrue(taskAnnounced(task.getId()));
    mockWorkerRunningTask(task);

    Assert.assertTrue(workerRunningTask(task.getId()));
    // Keep the announcement payload so that the node can be re-created with identical data.
    byte[] bytes = cf.getData().forPath(ANNOUCEMENTS_PATH);
    cf.delete().forPath(ANNOUCEMENTS_PATH);
    // worker task cleanup scheduled
    Assert.assertTrue(
        TestUtils.conditionValid(
            () -> remoteTaskRunner.getRemovedWorkerCleanups().containsKey(worker.getHost())
        )
    );

    // Worker got reconnected
    cf.create().forPath(ANNOUCEMENTS_PATH, bytes);

    // worker task cleanup should get cancelled and removed
    Assert.assertTrue(
        TestUtils.conditionValid(
            () -> !remoteTaskRunner.getRemovedWorkerCleanups().containsKey(worker.getHost())
        )
    );

    mockWorkerCompleteSuccessfulTask(task);
    TaskStatus status = future.get();
    // Single assertion in JUnit's (expected, actual) order.
    Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
  }

  /**
   * Three work items inserted into the list in the order "b", "a", "c" carry queue insertion times of
   * :03, :02 and :01 seconds; after sorting they must appear as "c", "a", "b".
   */
  @Test
  public void testSortByInsertionTime()
  {
    final RemoteTaskRunnerWorkItem latest = new RemoteTaskRunnerWorkItem("b", "t", null, null, "ds_test")
        .withQueueInsertionTime(DateTimes.of("2015-01-01T00:00:03Z"));
    final RemoteTaskRunnerWorkItem middle = new RemoteTaskRunnerWorkItem("a", "t", null, null, "ds_test")
        .withQueueInsertionTime(DateTimes.of("2015-01-01T00:00:02Z"));
    final RemoteTaskRunnerWorkItem earliest = new RemoteTaskRunnerWorkItem("c", "t", null, null, "ds_test")
        .withQueueInsertionTime(DateTimes.of("2015-01-01T00:00:01Z"));

    // Start from latest-first order.
    final ArrayList<RemoteTaskRunnerWorkItem> queue = Lists.newArrayList(latest, middle, earliest);
    RemoteTaskRunner.sortByInsertionTime(queue);

    Assert.assertEquals(earliest, queue.get(0));
    Assert.assertEquals(middle, queue.get(1));
    Assert.assertEquals(latest, queue.get(2));
  }

  /**
   * With a single worker and maxPercentageBlacklistWorkers(100): the first failed task does not blacklist
   * the worker, the second does; checkBlackListedNodes() keeps the worker blacklisted at the current time
   * and releases it once the clock is moved two timeout periods ahead; a following successful task leaves
   * the blacklist empty and the failure counter at zero.
   */
  @Test
  public void testBlacklistZKWorkers() throws Exception
  {
    final Period timeoutPeriod = Period.millis(1000);
    makeWorker();

    final RemoteTaskRunnerConfig rtrConfig = new TestRemoteTaskRunnerConfig(timeoutPeriod);
    rtrConfig.setMaxPercentageBlacklistWorkers(100);
    makeRemoteTaskRunner(rtrConfig);

    // First failure: counter goes to 1, nothing blacklisted yet.
    final TestRealtimeTask firstFailing = new TestRealtimeTask(
        "realtime1",
        new TaskResource("realtime1", 1),
        "foo",
        TaskStatus.success("realtime1"),
        jsonMapper
    );
    final Future<TaskStatus> firstFuture = remoteTaskRunner.run(firstFailing);
    Assert.assertTrue(taskAnnounced(firstFailing.getId()));
    mockWorkerRunningTask(firstFailing);
    mockWorkerCompleteFailedTask(firstFailing);
    Assert.assertTrue(firstFuture.get().isFailure());
    Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
    Assert.assertEquals(
        1,
        remoteTaskRunner.findWorkerRunningTask(firstFailing.getId()).getContinuouslyFailedTasksCount()
    );

    // Second failure: counter goes to 2 and the worker is blacklisted.
    final TestRealtimeTask secondFailing = new TestRealtimeTask(
        "realtime2",
        new TaskResource("realtime2", 1),
        "foo",
        TaskStatus.running("realtime2"),
        jsonMapper
    );
    final Future<TaskStatus> secondFuture = remoteTaskRunner.run(secondFailing);
    Assert.assertTrue(taskAnnounced(secondFailing.getId()));
    mockWorkerRunningTask(secondFailing);
    mockWorkerCompleteFailedTask(secondFailing);
    Assert.assertTrue(secondFuture.get().isFailure());
    Assert.assertEquals(1, remoteTaskRunner.getBlackListedWorkers().size());
    Assert.assertEquals(
        2,
        remoteTaskRunner.findWorkerRunningTask(secondFailing.getId()).getContinuouslyFailedTasksCount()
    );

    // At the current time the worker stays blacklisted.
    final RemoteTaskRunnerTestUtils.TestableRemoteTaskRunner testableRunner =
        (RemoteTaskRunnerTestUtils.TestableRemoteTaskRunner) remoteTaskRunner;
    testableRunner.setCurrentTimeMillis(System.currentTimeMillis());
    remoteTaskRunner.checkBlackListedNodes();

    Assert.assertEquals(1, remoteTaskRunner.getBlackListedWorkers().size());

    // Move the runner's clock two timeout periods into the future.
    final long twoTimeouts = 2 * timeoutPeriod.toStandardDuration().getMillis();
    testableRunner.setCurrentTimeMillis(System.currentTimeMillis() + twoTimeouts);
    remoteTaskRunner.checkBlackListedNodes();

    // After backOffTime the nodes are removed from blacklist
    Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
    Assert.assertEquals(
        0,
        remoteTaskRunner.findWorkerRunningTask(secondFailing.getId()).getContinuouslyFailedTasksCount()
    );

    // A successful task keeps the blacklist empty and the counter at zero.
    final TestRealtimeTask succeeding = new TestRealtimeTask(
        "realtime3",
        new TaskResource("realtime3", 1),
        "foo",
        TaskStatus.running("realtime3"),
        jsonMapper
    );
    final Future<TaskStatus> thirdFuture = remoteTaskRunner.run(succeeding);
    Assert.assertTrue(taskAnnounced(succeeding.getId()));
    mockWorkerRunningTask(succeeding);
    mockWorkerCompleteSuccessfulTask(succeeding);
    Assert.assertTrue(thirdFuture.get().isSuccess());
    Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
    Assert.assertEquals(
        0,
        remoteTaskRunner.findWorkerRunningTask(succeeding.getId()).getContinuouslyFailedTasksCount()
    );
  }

  /**
   * Two workers, maxPercentageBlacklistWorkers(25): twelve tasks are failed one after another, alternating
   * between the workers, and no worker may ever be blacklisted even after exceeding
   * maxRetriesBeforeBlacklist. Each worker's continuous-failure counter keeps growing instead.
   */
  @Test
  public void testBlacklistZKWorkers25Percent() throws Exception
  {
    final Period timeoutPeriod = Period.millis(1000);

    rtrTestUtils.makeWorker("worker", 10);
    rtrTestUtils.makeWorker("worker2", 10);

    final RemoteTaskRunnerConfig rtrConfig = new TestRemoteTaskRunnerConfig(timeoutPeriod);
    rtrConfig.setMaxPercentageBlacklistWorkers(25);
    makeRemoteTaskRunner(rtrConfig);

    String firstWorker = null;
    String secondWorker = null;

    for (int attempt = 1; attempt <= 12; attempt++) {
      final String taskId = StringUtils.format("rt-%d", attempt);
      final TestRealtimeTask failingTask = new TestRealtimeTask(
          taskId,
          new TaskResource(taskId, 1),
          "foo",
          TaskStatus.success(taskId),
          jsonMapper
      );

      final Future<TaskStatus> taskFuture = remoteTaskRunner.run(failingTask);

      // Whichever worker receives the very first task is the "first" worker for the rest of the test.
      if (attempt == 1) {
        final boolean startedOnWorker2 = rtrTestUtils.taskAnnounced("worker2", failingTask.getId());
        firstWorker = startedOnWorker2 ? "worker2" : "worker";
        secondWorker = startedOnWorker2 ? "worker" : "worker2";
      }

      // Odd attempts are expected on the first worker, even attempts on the second.
      final String expectedWorker;
      if (attempt % 2 == 0) {
        expectedWorker = secondWorker;
      } else {
        expectedWorker = firstWorker;
      }

      Assert.assertTrue(rtrTestUtils.taskAnnounced(expectedWorker, failingTask.getId()));
      rtrTestUtils.mockWorkerRunningTask(expectedWorker, failingTask);
      rtrTestUtils.mockWorkerCompleteFailedTask(expectedWorker, failingTask);

      Assert.assertTrue(taskFuture.get().isFailure());
      Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());

      final int expectedFailures = (attempt + 1) / 2;
      Assert.assertEquals(
          expectedFailures,
          remoteTaskRunner.findWorkerRunningTask(failingTask.getId()).getContinuouslyFailedTasksCount()
      );
    }
  }

  /**
   * Two workers, maxPercentageBlacklistWorkers(50): one worker should get blacklisted after its second
   * failure, and the other worker should never be blacklisted even after exceeding
   * maxRetriesBeforeBlacklist. From the fifth task on, every task is expected on the second worker.
   */
  @Test
  public void testBlacklistZKWorkers50Percent() throws Exception
  {
    final Period timeoutPeriod = Period.millis(1000);

    rtrTestUtils.makeWorker("worker", 10);
    rtrTestUtils.makeWorker("worker2", 10);

    final RemoteTaskRunnerConfig rtrConfig = new TestRemoteTaskRunnerConfig(timeoutPeriod);
    rtrConfig.setMaxPercentageBlacklistWorkers(50);
    makeRemoteTaskRunner(rtrConfig);

    String firstWorker = null;
    String secondWorker = null;

    for (int attempt = 1; attempt <= 12; attempt++) {
      final String taskId = StringUtils.format("rt-%d", attempt);
      final TestRealtimeTask failingTask = new TestRealtimeTask(
          taskId,
          new TaskResource(taskId, 1),
          "foo",
          TaskStatus.success(taskId),
          jsonMapper
      );

      final Future<TaskStatus> taskFuture = remoteTaskRunner.run(failingTask);

      // Whichever worker receives the very first task is the "first" worker for the rest of the test.
      if (attempt == 1) {
        final boolean startedOnWorker2 = rtrTestUtils.taskAnnounced("worker2", failingTask.getId());
        firstWorker = startedOnWorker2 ? "worker2" : "worker";
        secondWorker = startedOnWorker2 ? "worker" : "worker2";
      }

      // Even attempts, and every attempt after the fourth, are expected on the second worker.
      final String expectedWorker;
      if (attempt % 2 == 0 || attempt > 4) {
        expectedWorker = secondWorker;
      } else {
        expectedWorker = firstWorker;
      }

      Assert.assertTrue(rtrTestUtils.taskAnnounced(expectedWorker, failingTask.getId()));
      rtrTestUtils.mockWorkerRunningTask(expectedWorker, failingTask);
      rtrTestUtils.mockWorkerCompleteFailedTask(expectedWorker, failingTask);

      Assert.assertTrue(taskFuture.get().isFailure());

      // Exactly one blacklisted worker from the third attempt on.
      final int expectedBlacklisted = attempt > 2 ? 1 : 0;
      Assert.assertEquals(expectedBlacklisted, remoteTaskRunner.getBlackListedWorkers().size());

      final int expectedFailures;
      if (attempt > 4) {
        expectedFailures = attempt - 2;
      } else {
        expectedFailures = (attempt + 1) / 2;
      }
      Assert.assertEquals(
          expectedFailures,
          remoteTaskRunner.findWorkerRunningTask(failingTask.getId()).getContinuouslyFailedTasksCount()
      );
    }
  }

  /**
   * Blacklists the only worker through two failed tasks while a third task is still running on it, then
   * lets that task succeed and expects the worker (and its 3 slots) to leave the blacklist.
   */
  @Test
  public void testSuccessfulTaskOnBlacklistedWorker() throws Exception
  {
    final Period timeoutPeriod = Period.millis(1000);
    makeWorker();

    final RemoteTaskRunnerConfig rtrConfig = new TestRemoteTaskRunnerConfig(timeoutPeriod);
    rtrConfig.setMaxPercentageBlacklistWorkers(100);
    makeRemoteTaskRunner(rtrConfig);

    final TestRealtimeTask failsFirst = new TestRealtimeTask(
        "realtime1", new TaskResource("realtime1", 1), "foo", TaskStatus.success("realtime1"), jsonMapper
    );
    final TestRealtimeTask succeedsLast = new TestRealtimeTask(
        "realtime2", new TaskResource("realtime2", 1), "foo", TaskStatus.success("realtime2"), jsonMapper
    );
    final TestRealtimeTask failsSecond = new TestRealtimeTask(
        "realtime3", new TaskResource("realtime3", 1), "foo", TaskStatus.success("realtime3"), jsonMapper
    );

    // First failure: nothing is blacklisted yet.
    final Future<TaskStatus> firstFailure = remoteTaskRunner.run(failsFirst);
    Assert.assertTrue(taskAnnounced(failsFirst.getId()));
    mockWorkerRunningTask(failsFirst);
    mockWorkerCompleteFailedTask(failsFirst);
    Assert.assertTrue(firstFailure.get().isFailure());
    Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
    Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount());

    // This task is started but left running for now.
    final Future<TaskStatus> lateSuccess = remoteTaskRunner.run(succeedsLast);
    Assert.assertTrue(taskAnnounced(succeedsLast.getId()));
    mockWorkerRunningTask(succeedsLast);
    Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount());

    // Second failure: the worker and its 3 slots become blacklisted.
    final Future<TaskStatus> secondFailure = remoteTaskRunner.run(failsSecond);
    Assert.assertTrue(taskAnnounced(failsSecond.getId()));
    mockWorkerRunningTask(failsSecond);
    mockWorkerCompleteFailedTask(failsSecond);
    Assert.assertTrue(secondFailure.get().isFailure());
    Assert.assertEquals(1, remoteTaskRunner.getBlackListedWorkers().size());
    Assert.assertEquals(3, remoteTaskRunner.getBlacklistedTaskSlotCount());

    // The still-running task succeeds, which clears the blacklist.
    mockWorkerCompleteSuccessfulTask(succeedsLast);
    Assert.assertTrue(lateSuccess.get().isSuccess());
    Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
    Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount());
  }

  /**
   * Attaches the runner's status listener to a {@link PathChildrenCache} on "/test" and waits until the
   * cache holds exactly one child. The listener is expected to emit an alert whose data map contains the
   * key "znode" mapped to {@code null}, and to finish without throwing.
   *
   * The cache is opened in a try-with-resources block so that it is closed even when an assertion fails.
   */
  @Test
  public void testStatusListenerEventDataNullShouldNotThrowException() throws Exception
  {
    // Set up mock emitter to verify log alert when exception is thrown inside the status listener
    Worker worker = EasyMock.createMock(Worker.class);
    EasyMock.expect(worker.getHost()).andReturn("host").atLeastOnce();
    EasyMock.replay(worker);
    ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class);
    Capture<EmittingLogger.EmittingAlertBuilder> capturedArgument = Capture.newInstance();
    emitter.emit(EasyMock.capture(capturedArgument));
    EasyMock.expectLastCall().atLeastOnce();
    EmittingLogger.registerEmitter(emitter);
    EasyMock.replay(emitter);

    try (PathChildrenCache cache = new PathChildrenCache(cf, "/test", true)) {
      // Create a runner without any worker (same configuration as testStartWithNoWorker), without
      // invoking another @Test method as a setup helper.
      makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(new Period("PT1S")));
      cache.getListenable().addListener(
          remoteTaskRunner.getStatusListener(worker, new ZkWorker(worker, cache, jsonMapper), null)
      );
      cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

      // Status listener will receive event with null data
      Assert.assertTrue(
          TestUtils.conditionValid(() -> cache.getCurrentData().size() == 1)
      );

      // Verify that the log emitter was called
      EasyMock.verify(worker);
      EasyMock.verify(emitter);
      Map<String, Object> alertDataMap = capturedArgument.getValue().build(null).getDataMap();
      Assert.assertTrue(alertDataMap.containsKey("znode"));
      Assert.assertNull(alertDataMap.get("znode"));
      // Status listener should successfully complete without throwing an exception
    }
  }
}
