blob: 589c6e1f62f39615e23556f31760b2489d162583 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IndexingServiceCondition;
import org.apache.druid.indexing.common.TestRealtimeTask;
import org.apache.druid.indexing.common.TestTasks;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.testing.DeadlockDetectingTimeout;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class RemoteTaskRunnerTest
{
// Joiner shared with RemoteTaskRunnerTestUtils; used below to build the paths handed to the Curator client.
private static final Joiner JOINER = RemoteTaskRunnerTestUtils.JOINER;
// Host name of the worker created by makeWorker() and targeted by the single-worker helper methods.
private static final String WORKER_HOST = "worker";
// Announcement node of WORKER_HOST.
// NOTE(review): the identifier is misspelled ("ANNOUCEMENTS"); it is left as is because tests reference it.
private static final String ANNOUCEMENTS_PATH = JOINER.join(RemoteTaskRunnerTestUtils.ANNOUNCEMENTS_PATH, WORKER_HOST);
// Status node of WORKER_HOST; tests address a task's status node as STATUS_PATH joined with the task id.
private static final String STATUS_PATH = JOINER.join(RemoteTaskRunnerTestUtils.STATUS_PATH, WORKER_HOST);
// Runner under test. Stays null until makeRemoteTaskRunner() is called; tearDown() stops it when non-null.
private RemoteTaskRunner remoteTaskRunner;
// Helper that supplies the object mapper, Curator client, workers and runner used by these tests.
private RemoteTaskRunnerTestUtils rtrTestUtils = new RemoteTaskRunnerTestUtils();
// Both assigned from rtrTestUtils in setUp().
private ObjectMapper jsonMapper;
private CuratorFramework cf;
// Default task submitted by most tests; recreated in setUp().
private Task task;
// Worker created by makeWorker(); remains null in tests that never call it.
private Worker worker;
// Per-test rule configured with 60 seconds. Presumably it fails tests that run longer; see DeadlockDetectingTimeout.
@Rule
public final TestRule timeout = new DeadlockDetectingTimeout(60, TimeUnit.SECONDS);
/**
 * Builds the per-test fixture: initialises the shared test utilities, caches their object mapper and Curator
 * client, creates the default task and registers a no-op emitter with {@link EmittingLogger}.
 */
@Before
public void setUp() throws Exception
{
  this.rtrTestUtils.setUp();
  this.jsonMapper = this.rtrTestUtils.getObjectMapper();
  this.cf = this.rtrTestUtils.getCuratorFramework();

  // Note that the default task id contains spaces.
  final String defaultTaskId = "task id with spaces";
  this.task = TestTasks.unending(defaultTaskId);

  final NoopServiceEmitter noopEmitter = new NoopServiceEmitter();
  EmittingLogger.registerEmitter(noopEmitter);
}
/**
 * Stops the runner under test (when one was created) and then tears down the shared test utilities.
 *
 * The utilities are torn down in a {@code finally} block so that a failure while stopping the runner cannot
 * leave the fixture behind for the next test.
 */
@After
public void tearDown() throws Exception
{
  try {
    // Tests that never call makeRemoteTaskRunner() leave the field null.
    if (remoteTaskRunner != null) {
      remoteTaskRunner.stop();
    }
  }
  finally {
    rtrTestUtils.tearDown();
  }
}
/**
 * Drives the default task through announced, running and successfully completed states on the single worker.
 * The slot counters must read 3 total / 3 idle / 0 used before submission and again after the task's status
 * node has been deleted.
 */
@Test
public void testRun() throws Exception
{
  doSetup();

  Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount());
  Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount());
  Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount());

  final ListenableFuture<TaskStatus> statusFuture = remoteTaskRunner.run(task);
  final String taskId = task.getId();

  Assert.assertTrue(taskAnnounced(taskId));
  mockWorkerRunningTask(task);
  Assert.assertTrue(workerRunningTask(taskId));
  mockWorkerCompleteSuccessfulTask(task);
  Assert.assertTrue(workerCompletedTask(statusFuture));

  final TaskStatus finalStatus = statusFuture.get();
  Assert.assertEquals(taskId, finalStatus.getId());
  Assert.assertEquals(TaskState.SUCCESS, finalStatus.getStatusCode());

  // Remove the task's status node before re-checking the counters.
  final String statusNode = JOINER.join(STATUS_PATH, taskId);
  cf.delete().guaranteed().forPath(statusNode);

  Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount());
  Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount());
  Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount());
}
/**
 * Adds the default task as a pending task, runs the pending tasks, and then submits the same task through
 * {@code run}; the future returned by {@code run} must complete with SUCCESS once the worker reports it.
 */
@Test
public void testRunTaskThatAlreadyPending() throws Exception
{
  doSetup();

  remoteTaskRunner.addPendingTask(task);
  remoteTaskRunner.runPendingTasks();

  final String taskId = task.getId();
  // The mocked worker has not reported the task as running at this point.
  Assert.assertFalse(workerRunningTask(taskId));

  final ListenableFuture<TaskStatus> statusFuture = remoteTaskRunner.run(task);
  Assert.assertTrue(taskAnnounced(taskId));

  mockWorkerRunningTask(task);
  Assert.assertTrue(workerRunningTask(taskId));

  mockWorkerCompleteSuccessfulTask(task);
  Assert.assertTrue(workerCompletedTask(statusFuture));

  final TaskStatus finalStatus = statusFuture.get();
  Assert.assertEquals(taskId, finalStatus.getId());
  Assert.assertEquals(TaskState.SUCCESS, finalStatus.getStatusCode());
}
/**
 * Creates the runner without creating any worker first; the test passes when that does not throw.
 * Also called by another test in this class as a way to create the runner.
 */
@Test
public void testStartWithNoWorker()
{
  final Period oneSecond = new Period("PT1S");
  final RemoteTaskRunnerConfig config = new TestRemoteTaskRunnerConfig(oneSecond);
  makeRemoteTaskRunner(config);
}
/**
 * Submits the default task twice, the second time after it was announced but before the worker reported it
 * running. The second future must not be done yet and must later complete with SUCCESS.
 */
@Test
public void testRunExistingTaskThatHasntStartedRunning() throws Exception
{
  doSetup();

  remoteTaskRunner.run(task);
  final String taskId = task.getId();
  Assert.assertTrue(taskAnnounced(taskId));

  // Second submission of the same task.
  final ListenableFuture<TaskStatus> secondSubmission = remoteTaskRunner.run(task);
  Assert.assertFalse(secondSubmission.isDone());

  mockWorkerRunningTask(task);
  Assert.assertTrue(workerRunningTask(taskId));

  mockWorkerCompleteSuccessfulTask(task);
  Assert.assertTrue(workerCompletedTask(secondSubmission));

  final TaskStatus finalStatus = secondSubmission.get();
  Assert.assertEquals(taskId, finalStatus.getId());
  Assert.assertEquals(TaskState.SUCCESS, finalStatus.getStatusCode());
}
/**
 * Submits the default task twice, the second time after the worker already reported it running. The second
 * future must not be done yet and must later complete with SUCCESS.
 */
@Test
public void testRunExistingTaskThatHasStartedRunning() throws Exception
{
  doSetup();

  remoteTaskRunner.run(task);
  final String taskId = task.getId();
  Assert.assertTrue(taskAnnounced(taskId));

  mockWorkerRunningTask(task);
  Assert.assertTrue(workerRunningTask(taskId));

  // Second submission of the same, already running, task.
  final ListenableFuture<TaskStatus> secondSubmission = remoteTaskRunner.run(task);
  Assert.assertFalse(secondSubmission.isDone());

  mockWorkerCompleteSuccessfulTask(task);
  Assert.assertTrue(workerCompletedTask(secondSubmission));

  final TaskStatus finalStatus = secondSubmission.get();
  Assert.assertEquals(taskId, finalStatus.getId());
  Assert.assertEquals(TaskState.SUCCESS, finalStatus.getStatusCode());
}
/**
 * Submits a task whose id is 5000 characters long while a mock emitter with no recorded expectations is
 * registered, then verifies the mock.
 */
@Test
public void testRunTooMuchZKData() throws Exception
{
  // No expectations are recorded before replay().
  final ServiceEmitter mockEmitter = EasyMock.createMock(ServiceEmitter.class);
  EmittingLogger.registerEmitter(mockEmitter);
  EasyMock.replay(mockEmitter);

  doSetup();

  // An id made of 5000 default-initialised chars.
  final String oversizedId = new String(new char[5000]);
  remoteTaskRunner.run(TestTasks.unending(oversizedId));

  EasyMock.verify(mockEmitter);
}
/**
 * Submits three realtime tasks where "rt1" and "rt2" share the task resource id "rt1" and "rt3" uses "rt2".
 * Expects two tasks to end up running and exactly one pending, the pending one being "rt2".
 */
@Test
public void testRunSameAvailabilityGroup() throws Exception
{
  doSetup();

  TestRealtimeTask task1 = new TestRealtimeTask(
      "rt1",
      new TaskResource("rt1", 1),
      "foo",
      TaskStatus.running("rt1"),
      jsonMapper
  );
  remoteTaskRunner.run(task1);
  Assert.assertTrue(taskAnnounced(task1.getId()));
  mockWorkerRunningTask(task1);

  // Same task resource id ("rt1") as task1.
  TestRealtimeTask task2 = new TestRealtimeTask(
      "rt2",
      new TaskResource("rt1", 1),
      "foo",
      TaskStatus.running("rt2"),
      jsonMapper
  );
  remoteTaskRunner.run(task2);

  TestRealtimeTask task3 = new TestRealtimeTask(
      "rt3",
      new TaskResource("rt2", 1),
      "foo",
      TaskStatus.running("rt3"),
      jsonMapper
  );
  remoteTaskRunner.run(task3);

  final IndexingServiceCondition twoRunning = () -> remoteTaskRunner.getRunningTasks().size() == 2;
  Assert.assertTrue(TestUtils.conditionValid(twoRunning));

  final IndexingServiceCondition onePending = () -> remoteTaskRunner.getPendingTasks().size() == 1;
  Assert.assertTrue(TestUtils.conditionValid(onePending));

  // assertEquals (rather than assertTrue(equals)) reports the actual id when the check fails.
  Assert.assertEquals("rt2", remoteTaskRunner.getPendingTasks().iterator().next().getTaskId());
}
/**
 * Submits three realtime tasks with required capacities 1 ("rt1"), 3 ("rt2") and 2 ("rt3") against the
 * worker created by {@code doSetup()}. Expects "rt1" and "rt3" to be announced and running while "rt2" is the
 * single pending task.
 */
@Test
public void testRunWithCapacity() throws Exception
{
  doSetup();

  TestRealtimeTask task1 = new TestRealtimeTask(
      "rt1",
      new TaskResource("rt1", 1),
      "foo",
      TaskStatus.running("rt1"),
      jsonMapper
  );
  remoteTaskRunner.run(task1);
  Assert.assertTrue(taskAnnounced(task1.getId()));
  mockWorkerRunningTask(task1);

  TestRealtimeTask task2 = new TestRealtimeTask(
      "rt2",
      new TaskResource("rt2", 3),
      "foo",
      TaskStatus.running("rt2"),
      jsonMapper
  );
  remoteTaskRunner.run(task2);

  TestRealtimeTask task3 = new TestRealtimeTask(
      "rt3",
      new TaskResource("rt3", 2),
      "foo",
      TaskStatus.running("rt3"),
      jsonMapper
  );
  remoteTaskRunner.run(task3);
  Assert.assertTrue(taskAnnounced(task3.getId()));
  mockWorkerRunningTask(task3);

  final IndexingServiceCondition twoRunning = () -> remoteTaskRunner.getRunningTasks().size() == 2;
  Assert.assertTrue(TestUtils.conditionValid(twoRunning));

  final IndexingServiceCondition onePending = () -> remoteTaskRunner.getPendingTasks().size() == 1;
  Assert.assertTrue(TestUtils.conditionValid(onePending));

  // assertEquals (rather than assertTrue(equals)) reports the actual id when the check fails.
  Assert.assertEquals("rt2", remoteTaskRunner.getPendingTasks().iterator().next().getTaskId());
}
/**
 * Deletes the status node of a running task and expects the task's future to complete as FAILED with an
 * error message that mentions the disappeared worker.
 */
@Test
public void testStatusRemoved() throws Exception
{
  doSetup();

  ListenableFuture<TaskStatus> future = remoteTaskRunner.run(task);
  Assert.assertTrue(taskAnnounced(task.getId()));
  mockWorkerRunningTask(task);
  Assert.assertTrue(workerRunningTask(task.getId()));

  // assertEquals reports both ids on failure, unlike assertTrue(equals).
  Assert.assertEquals(task.getId(), remoteTaskRunner.getRunningTasks().iterator().next().getTaskId());

  cf.delete().forPath(JOINER.join(STATUS_PATH, task.getId()));

  TaskStatus status = future.get();

  // Expected value goes first so that a failure message labels the values correctly.
  Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
  Assert.assertNotNull(status.getErrorMsg());
  Assert.assertTrue(status.getErrorMsg().contains("The worker that this task was assigned disappeared"));
}
/**
 * Runs task "first", then two tasks that both carry the id "second" (with different task resources), and
 * expects the runner to report exactly the running task ids {"first", "second"}.
 */
@Test
public void testBootstrap() throws Exception
{
  final Period period = Period.millis(1000);
  makeWorker();

  final RemoteTaskRunnerConfig config = new TestRemoteTaskRunnerConfig(period);
  config.setMaxPercentageBlacklistWorkers(100);
  makeRemoteTaskRunner(config);

  final TestRealtimeTask firstTask = new TestRealtimeTask(
      "first",
      new TaskResource("first", 1),
      "foo",
      TaskStatus.running("first"),
      jsonMapper
  );
  remoteTaskRunner.run(firstTask);
  Assert.assertTrue(taskAnnounced(firstTask.getId()));
  mockWorkerRunningTask(firstTask);

  // Carries the same id ("second") as the task submitted right after it, but a different task resource.
  // NOTE(review): presumably intentional - confirm.
  final TestRealtimeTask sameIdTask = new TestRealtimeTask(
      "second",
      new TaskResource("task", 2),
      "foo",
      TaskStatus.running("task"),
      jsonMapper
  );
  remoteTaskRunner.run(sameIdTask);

  final TestRealtimeTask secondTask = new TestRealtimeTask(
      "second",
      new TaskResource("second", 2),
      "foo",
      TaskStatus.running("second"),
      jsonMapper
  );
  remoteTaskRunner.run(secondTask);
  Assert.assertTrue(taskAnnounced(secondTask.getId()));
  mockWorkerRunningTask(secondTask);

  // Collect the ids of everything the runner reports as running.
  final Function<RemoteTaskRunnerWorkItem, String> toTaskId = RemoteTaskRunnerWorkItem::getTaskId;
  final Set<String> runningTasks = Sets.newHashSet(
      Iterables.transform(remoteTaskRunner.getRunningTasks(), toTaskId)
  );
  Assert.assertEquals("runningTasks", ImmutableSet.of("first", "second"), runningTasks);
}
/**
 * Completes a task successfully and then submits it again; the future returned by the second {@code run}
 * call must yield SUCCESS.
 */
@Test
public void testRunWithTaskComplete() throws Exception
{
  doSetup();

  final TestRealtimeTask completedTask = new TestRealtimeTask(
      "testTask",
      new TaskResource("testTask", 2),
      "foo",
      TaskStatus.success("testTask"),
      jsonMapper
  );
  remoteTaskRunner.run(completedTask);
  Assert.assertTrue(taskAnnounced(completedTask.getId()));

  mockWorkerRunningTask(completedTask);
  mockWorkerCompleteSuccessfulTask(completedTask);

  // Submit again after the worker reported success.
  final TaskStatus resubmittedStatus = remoteTaskRunner.run(completedTask).get();
  Assert.assertEquals(TaskState.SUCCESS, resubmittedStatus.getStatusCode());
}
/**
 * Deletes the worker's announcement node while a task is running. Expects the task to fail with
 * "Canceled for worker cleanup", the removed-worker cleanups to drain, the worker's status path to be gone
 * and the slot counters to drop to zero.
 */
@Test
public void testWorkerRemoved() throws Exception
{
  doSetup();
  Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount());
  Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount());

  final Future<TaskStatus> statusFuture = remoteTaskRunner.run(task);
  final String taskId = task.getId();

  Assert.assertTrue(taskAnnounced(taskId));
  mockWorkerRunningTask(task);
  Assert.assertTrue(workerRunningTask(taskId));

  cf.delete().forPath(ANNOUCEMENTS_PATH);

  final TaskStatus failedStatus = statusFuture.get();
  Assert.assertEquals(TaskState.FAILED, failedStatus.getStatusCode());
  Assert.assertNotNull(failedStatus.getErrorMsg());
  Assert.assertTrue(failedStatus.getErrorMsg().contains("Canceled for worker cleanup"));

  final RemoteTaskRunnerConfig config = remoteTaskRunner.getRemoteTaskRunnerConfig();
  // cleanup task is independently scheduled by event listener. we need to wait some more time.
  final long waitMillis = config.getTaskCleanupTimeout().toStandardDuration().getMillis() * 2;
  final IndexingServiceCondition cleanupsDrained = () -> remoteTaskRunner.getRemovedWorkerCleanups().isEmpty();
  Assert.assertTrue(TestUtils.conditionValid(cleanupsDrained, waitMillis));

  Assert.assertNull(cf.checkExists().forPath(STATUS_PATH));
  Assert.assertEquals(0, remoteTaskRunner.getTotalTaskSlotCount());
  Assert.assertEquals(0, remoteTaskRunner.getIdleTaskSlotCount());
}
/**
 * Disables the worker while a task is running on it. The task must still complete with SUCCESS, and the
 * runner's only worker must afterwards report an empty version string.
 */
@Test
public void testWorkerDisabled() throws Exception
{
  doSetup();

  final ListenableFuture<TaskStatus> statusFuture = remoteTaskRunner.run(task);
  final String taskId = task.getId();

  Assert.assertTrue(taskAnnounced(taskId));
  mockWorkerRunningTask(task);
  Assert.assertTrue(workerRunningTask(taskId));

  // Disable while task running
  disableWorker();

  // Continue test
  mockWorkerCompleteSuccessfulTask(task);
  Assert.assertTrue(workerCompletedTask(statusFuture));

  final TaskStatus finalStatus = statusFuture.get();
  Assert.assertEquals(taskId, finalStatus.getId());
  Assert.assertEquals(TaskState.SUCCESS, finalStatus.getStatusCode());

  // Confirm RTR thinks the worker is disabled.
  Assert.assertEquals(
      "",
      Iterables.getOnlyElement(remoteTaskRunner.getWorkers()).getWorker().getVersion()
  );
}
/**
 * Stops the runner while a task is running, creates a new runner, and expects the new runner to know the
 * task and to complete its future with SUCCESS once the worker reports completion.
 */
@Test
public void testRestartRemoteTaskRunner() throws Exception
{
  doSetup();

  remoteTaskRunner.run(task);
  Assert.assertTrue(taskAnnounced(task.getId()));
  mockWorkerRunningTask(task);
  Assert.assertTrue(workerRunningTask(task.getId()));

  remoteTaskRunner.stop();
  makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(new Period("PT5S")));

  // Fail with a descriptive assertion error (instead of a NullPointerException on the next line) when the
  // new runner does not know the task.
  final RemoteTaskRunnerWorkItem newWorkItem = remoteTaskRunner
      .getKnownTasks()
      .stream()
      .filter(workItem -> workItem.getTaskId().equals(task.getId()))
      .findFirst()
      .orElseThrow(() -> new AssertionError("Restarted runner does not know task [" + task.getId() + "]"));
  final ListenableFuture<TaskStatus> result = newWorkItem.getResult();

  mockWorkerCompleteSuccessfulTask(task);
  Assert.assertTrue(workerCompletedTask(result));

  Assert.assertEquals(task.getId(), result.get().getId());
  Assert.assertEquals(TaskState.SUCCESS, result.get().getStatusCode());
}
/**
 * Runs a pending work item through a mock whose task id changes between calls, and expects the originally
 * added work item to fail immediately with the "Failed to assign this task" message.
 */
@Test
public void testRunPendingTaskFailToAssignTask() throws Exception
{
  doSetup();

  final RemoteTaskRunnerWorkItem pendingItem = remoteTaskRunner.addPendingTask(task);

  // modify taskId to make task assignment failed
  final RemoteTaskRunnerWorkItem tamperedItem = Mockito.mock(RemoteTaskRunnerWorkItem.class);
  Mockito.when(tamperedItem.getTaskId())
         .thenReturn(pendingItem.getTaskId())
         .thenReturn("wrongId");

  remoteTaskRunner.runPendingTask(tamperedItem);

  // A zero timeout requires the result to be available already.
  final TaskStatus failedStatus = pendingItem.getResult().get(0, TimeUnit.MILLISECONDS);
  Assert.assertEquals(TaskState.FAILED, failedStatus.getStatusCode());
  Assert.assertEquals(
      "Failed to assign this task. See overlord logs for more details.",
      failedStatus.getErrorMsg()
  );
}
/**
 * Runs a pending task against a runner configured with a "PT1S" period while the mocked worker never starts
 * the task. Expects the work item to be FAILED, with the "did not start it in timeout" message, by the time
 * {@code runPendingTask} returns.
 */
@Test
public void testRunPendingTaskTimeoutToAssign() throws Exception
{
  makeWorker();
  final RemoteTaskRunnerConfig config = new TestRemoteTaskRunnerConfig(new Period("PT1S"));
  makeRemoteTaskRunner(config);

  final RemoteTaskRunnerWorkItem pendingItem = remoteTaskRunner.addPendingTask(task);
  remoteTaskRunner.runPendingTask(pendingItem);

  // A zero timeout requires the result to be available already.
  final TaskStatus failedStatus = pendingItem.getResult().get(0, TimeUnit.MILLISECONDS);
  Assert.assertEquals(TaskState.FAILED, failedStatus.getStatusCode());

  final String errorMsg = failedStatus.getErrorMsg();
  Assert.assertNotNull(errorMsg);
  Assert.assertTrue(
      failedStatus.getErrorMsg().startsWith("The worker that this task is assigned did not start it in timeout")
  );
}
/**
 * Default fixture used by most tests: creates the worker and then a runner configured with a "PT5S" period.
 */
private void doSetup() throws Exception
{
  makeWorker();

  final Period fiveSeconds = new Period("PT5S");
  final RemoteTaskRunnerConfig config = new TestRemoteTaskRunnerConfig(fiveSeconds);
  makeRemoteTaskRunner(config);
}
/**
 * Creates a runner from {@code config} through the test utilities and stores it as the runner under test.
 *
 * @param config configuration handed to the test utilities
 */
private void makeRemoteTaskRunner(RemoteTaskRunnerConfig config)
{
  this.remoteTaskRunner = this.rtrTestUtils.makeRemoteTaskRunner(config);
}
/**
 * Creates the worker for {@code WORKER_HOST} with a capacity argument of 3 and remembers it in {@code worker}.
 */
private void makeWorker() throws Exception
{
  final int capacity = 3;
  this.worker = this.rtrTestUtils.makeWorker(WORKER_HOST, capacity);
}
/**
 * Disables the worker previously created by {@code makeWorker()} via the test utilities.
 */
private void disableWorker() throws Exception
{
  this.rtrTestUtils.disableWorker(this.worker);
}
/**
 * Asks the test utilities whether {@code taskId} has been announced to {@code WORKER_HOST}.
 *
 * @param taskId id of the task to look for
 * @return the result reported by the test utilities
 */
private boolean taskAnnounced(final String taskId)
{
  final boolean announced = rtrTestUtils.taskAnnounced(WORKER_HOST, taskId);
  return announced;
}
/**
 * Asks the test utilities whether {@code WORKER_HOST} is running {@code taskId}.
 *
 * @param taskId id of the task to look for
 * @return the result reported by the test utilities
 */
private boolean workerRunningTask(final String taskId)
{
  final boolean running = rtrTestUtils.workerRunningTask(WORKER_HOST, taskId);
  return running;
}
/**
 * Checks, through {@code TestUtils.conditionValid}, that {@code result} becomes done.
 *
 * @param result future of the task being observed
 * @return whatever {@code TestUtils.conditionValid} reports for the "future is done" condition
 */
private boolean workerCompletedTask(final ListenableFuture<TaskStatus> result)
{
  final IndexingServiceCondition futureDone = result::isDone;
  return TestUtils.conditionValid(futureDone);
}
/**
 * Makes the test utilities mock {@code WORKER_HOST} running {@code task}.
 *
 * Uses the {@code WORKER_HOST} constant rather than a repeated literal, consistent with
 * {@code taskAnnounced} and {@code workerRunningTask}.
 *
 * @param task task to mock as running
 */
private void mockWorkerRunningTask(final Task task) throws Exception
{
  rtrTestUtils.mockWorkerRunningTask(WORKER_HOST, task);
}
/**
 * Makes the test utilities mock {@code WORKER_HOST} completing {@code task} successfully.
 *
 * Uses the {@code WORKER_HOST} constant rather than a repeated literal, consistent with
 * {@code taskAnnounced} and {@code workerRunningTask}.
 *
 * @param task task to mock as successfully completed
 */
private void mockWorkerCompleteSuccessfulTask(final Task task) throws Exception
{
  rtrTestUtils.mockWorkerCompleteSuccessfulTask(WORKER_HOST, task);
}
/**
 * Makes the test utilities mock {@code WORKER_HOST} completing {@code task} with a failure.
 *
 * Uses the {@code WORKER_HOST} constant rather than a repeated literal, consistent with
 * {@code taskAnnounced} and {@code workerRunningTask}.
 *
 * @param task task to mock as failed
 */
private void mockWorkerCompleteFailedTask(final Task task) throws Exception
{
  rtrTestUtils.mockWorkerCompleteFailedTask(WORKER_HOST, task);
}
/**
 * With a task running on the only worker, marking workers lazy (predicate accepts everything, limit 1) must
 * return nothing, leave the lazy-worker collection empty and keep the worker registered.
 */
@Test
public void testFindLazyWorkerTaskRunning() throws Exception
{
  doSetup();
  remoteTaskRunner.start();

  remoteTaskRunner.run(task);
  Assert.assertTrue(taskAnnounced(task.getId()));
  mockWorkerRunningTask(task);

  // Accepts every worker offered to it.
  final Predicate<ImmutableWorkerInfo> acceptAll = input -> true;
  final Collection<Worker> markedLazy = remoteTaskRunner.markWorkersLazy(acceptAll, 1);

  Assert.assertTrue(markedLazy.isEmpty());
  Assert.assertTrue(remoteTaskRunner.getLazyWorkers().isEmpty());
  Assert.assertEquals(1, remoteTaskRunner.getWorkers().size());
}
/**
 * With a task announced to the only worker but not yet reported running, marking workers lazy (predicate
 * accepts everything, limit 1) must return nothing, leave the lazy-worker collection empty and keep the
 * worker registered.
 */
@Test
public void testFindLazyWorkerForWorkerJustAssignedTask() throws Exception
{
  doSetup();

  remoteTaskRunner.run(task);
  Assert.assertTrue(taskAnnounced(task.getId()));

  // Accepts every worker offered to it.
  final Predicate<ImmutableWorkerInfo> acceptAll = input -> true;
  final Collection<Worker> markedLazy = remoteTaskRunner.markWorkersLazy(acceptAll, 1);

  Assert.assertTrue(markedLazy.isEmpty());
  Assert.assertTrue(remoteTaskRunner.getLazyWorkers().isEmpty());
  Assert.assertEquals(1, remoteTaskRunner.getWorkers().size());
}
/**
 * With no task submitted, marking workers lazy (predicate accepts everything, limit 1) must mark the single
 * worker: one lazy worker, 3 total slots, 0 idle slots and 3 lazy slots.
 */
@Test
public void testFindLazyWorkerNotRunningAnyTask() throws Exception
{
  doSetup();

  // Accepts every worker offered to it.
  final Predicate<ImmutableWorkerInfo> acceptAll = input -> true;
  final Collection<Worker> markedLazy = remoteTaskRunner.markWorkersLazy(acceptAll, 1);

  Assert.assertEquals(1, markedLazy.size());
  Assert.assertEquals(1, remoteTaskRunner.getLazyWorkers().size());

  Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount());
  Assert.assertEquals(0, remoteTaskRunner.getIdleTaskSlotCount());
  Assert.assertEquals(3, remoteTaskRunner.getLazyTaskSlotCount());
}
/**
 * With no task submitted, marking workers lazy with a second argument of 0 must mark nothing, even though
 * the predicate accepts every worker.
 */
@Test
public void testFindLazyWorkerNotRunningAnyTaskButWithZeroMaxWorkers() throws Exception
{
  doSetup();

  // Accepts every worker offered to it.
  final Predicate<ImmutableWorkerInfo> acceptAll = input -> true;
  final Collection<Worker> markedLazy = remoteTaskRunner.markWorkersLazy(acceptAll, 0);

  Assert.assertEquals(0, markedLazy.size());
  Assert.assertEquals(0, remoteTaskRunner.getLazyWorkers().size());
}
/**
 * Deletes the worker's announcement node while a task is running, waits for a cleanup to be registered for
 * the worker, then recreates the node with its original data. The cleanup entry must disappear again and
 * the task must still complete with SUCCESS.
 */
@Test
public void testWorkerZKReconnect() throws Exception
{
  makeWorker();
  makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(new Period("PT5M")));

  Future<TaskStatus> future = remoteTaskRunner.run(task);
  Assert.assertTrue(taskAnnounced(task.getId()));
  mockWorkerRunningTask(task);
  Assert.assertTrue(workerRunningTask(task.getId()));

  // Keep the announcement payload so the node can be recreated unchanged.
  byte[] bytes = cf.getData().forPath(ANNOUCEMENTS_PATH);
  cf.delete().forPath(ANNOUCEMENTS_PATH);

  // worker task cleanup scheduled
  final IndexingServiceCondition cleanupScheduled =
      () -> remoteTaskRunner.getRemovedWorkerCleanups().containsKey(worker.getHost());
  Assert.assertTrue(TestUtils.conditionValid(cleanupScheduled));

  // Worker got reconnected
  cf.create().forPath(ANNOUCEMENTS_PATH, bytes);

  // worker task cleanup should get cancelled and removed
  final IndexingServiceCondition cleanupRemoved =
      () -> !remoteTaskRunner.getRemovedWorkerCleanups().containsKey(worker.getHost());
  Assert.assertTrue(TestUtils.conditionValid(cleanupRemoved));

  mockWorkerCompleteSuccessfulTask(task);
  TaskStatus status = future.get();

  // Single assertion with the expected value first.
  Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
}
/**
 * Sorts three work items whose queue insertion times are one second apart and expects them ordered from the
 * earliest to the latest insertion time.
 */
@Test
public void testSortByInsertionTime()
{
  final RemoteTaskRunnerWorkItem latest = new RemoteTaskRunnerWorkItem("b", "t", null, null, "ds_test")
      .withQueueInsertionTime(DateTimes.of("2015-01-01T00:00:03Z"));
  final RemoteTaskRunnerWorkItem middle = new RemoteTaskRunnerWorkItem("a", "t", null, null, "ds_test")
      .withQueueInsertionTime(DateTimes.of("2015-01-01T00:00:02Z"));
  final RemoteTaskRunnerWorkItem earliest = new RemoteTaskRunnerWorkItem("c", "t", null, null, "ds_test")
      .withQueueInsertionTime(DateTimes.of("2015-01-01T00:00:01Z"));

  // Start from latest-first order so the sort has to reverse the list.
  final ArrayList<RemoteTaskRunnerWorkItem> workItems = Lists.newArrayList(latest, middle, earliest);
  RemoteTaskRunner.sortByInsertionTime(workItems);

  final RemoteTaskRunnerWorkItem[] expectedOrder = {earliest, middle, latest};
  for (int position = 0; position < expectedOrder.length; position++) {
    Assert.assertEquals(expectedOrder[position], workItems.get(position));
  }
}
/**
 * Single worker with maxPercentageBlacklistWorkers(100). The first failed task leaves the blacklist empty
 * with a failure count of 1. The second failed task blacklists the worker with a count of 2. After the
 * runner's clock is advanced past twice the configured period, the blacklist is cleared and the count reset.
 * A subsequent successful task keeps both at zero.
 */
@Test
public void testBlacklistZKWorkers() throws Exception
{
  final Period period = Period.millis(1000);
  makeWorker();

  final RemoteTaskRunnerConfig config = new TestRemoteTaskRunnerConfig(period);
  config.setMaxPercentageBlacklistWorkers(100);
  makeRemoteTaskRunner(config);

  // First failure: not blacklisted yet.
  final TestRealtimeTask firstFailing = new TestRealtimeTask(
      "realtime1",
      new TaskResource("realtime1", 1),
      "foo",
      TaskStatus.success("realtime1"),
      jsonMapper
  );
  final Future<TaskStatus> firstResult = remoteTaskRunner.run(firstFailing);
  Assert.assertTrue(taskAnnounced(firstFailing.getId()));
  mockWorkerRunningTask(firstFailing);
  mockWorkerCompleteFailedTask(firstFailing);
  Assert.assertTrue(firstResult.get().isFailure());
  Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
  Assert.assertEquals(
      1,
      remoteTaskRunner.findWorkerRunningTask(firstFailing.getId()).getContinuouslyFailedTasksCount()
  );

  // Second failure: the worker gets blacklisted.
  final TestRealtimeTask secondFailing = new TestRealtimeTask(
      "realtime2",
      new TaskResource("realtime2", 1),
      "foo",
      TaskStatus.running("realtime2"),
      jsonMapper
  );
  final Future<TaskStatus> secondResult = remoteTaskRunner.run(secondFailing);
  Assert.assertTrue(taskAnnounced(secondFailing.getId()));
  mockWorkerRunningTask(secondFailing);
  mockWorkerCompleteFailedTask(secondFailing);
  Assert.assertTrue(secondResult.get().isFailure());
  Assert.assertEquals(1, remoteTaskRunner.getBlackListedWorkers().size());
  Assert.assertEquals(
      2,
      remoteTaskRunner.findWorkerRunningTask(secondFailing.getId()).getContinuouslyFailedTasksCount()
  );

  // The testable runner lets the test set the time it treats as "now".
  final RemoteTaskRunnerTestUtils.TestableRemoteTaskRunner testableRunner =
      (RemoteTaskRunnerTestUtils.TestableRemoteTaskRunner) remoteTaskRunner;

  testableRunner.setCurrentTimeMillis(System.currentTimeMillis());
  remoteTaskRunner.checkBlackListedNodes();
  Assert.assertEquals(1, remoteTaskRunner.getBlackListedWorkers().size());

  testableRunner.setCurrentTimeMillis(
      System.currentTimeMillis() + 2 * period.toStandardDuration().getMillis()
  );
  remoteTaskRunner.checkBlackListedNodes();

  // After backOffTime the nodes are removed from blacklist
  Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
  Assert.assertEquals(
      0,
      remoteTaskRunner.findWorkerRunningTask(secondFailing.getId()).getContinuouslyFailedTasksCount()
  );

  // A successful task afterwards keeps blacklist and failure count at zero.
  final TestRealtimeTask succeeding = new TestRealtimeTask(
      "realtime3",
      new TaskResource("realtime3", 1),
      "foo",
      TaskStatus.running("realtime3"),
      jsonMapper
  );
  final Future<TaskStatus> thirdResult = remoteTaskRunner.run(succeeding);
  Assert.assertTrue(taskAnnounced(succeeding.getId()));
  mockWorkerRunningTask(succeeding);
  mockWorkerCompleteSuccessfulTask(succeeding);
  Assert.assertTrue(thirdResult.get().isSuccess());
  Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
  Assert.assertEquals(
      0,
      remoteTaskRunner.findWorkerRunningTask(succeeding.getId()).getContinuouslyFailedTasksCount()
  );
}
/**
 * Two workers, maxPercentageBlacklistWorkers(25): twelve tasks are failed alternately on the two workers and
 * no worker may ever be blacklisted, even after exceeding maxRetriesBeforeBlacklist. Each worker's
 * continuous failure count is expected to grow by one per task it fails.
 */
@Test
public void testBlacklistZKWorkers25Percent() throws Exception
{
  final Period period = Period.millis(1000);

  rtrTestUtils.makeWorker("worker", 10);
  rtrTestUtils.makeWorker("worker2", 10);

  final RemoteTaskRunnerConfig config = new TestRemoteTaskRunnerConfig(period);
  config.setMaxPercentageBlacklistWorkers(25);
  makeRemoteTaskRunner(config);

  String firstWorker = null;
  String secondWorker = null;

  for (int attempt = 1; attempt <= 12; attempt++) {
    final String taskId = StringUtils.format("rt-%d", attempt);
    final TestRealtimeTask failingTask = new TestRealtimeTask(
        taskId,
        new TaskResource(taskId, 1),
        "foo",
        TaskStatus.success(taskId),
        jsonMapper
    );
    final Future<TaskStatus> statusFuture = remoteTaskRunner.run(failingTask);

    if (attempt == 1) {
      // Whichever worker received the very first task is "first"; the other one is "second".
      final boolean landedOnWorker2 = rtrTestUtils.taskAnnounced("worker2", failingTask.getId());
      firstWorker = landedOnWorker2 ? "worker2" : "worker";
      secondWorker = landedOnWorker2 ? "worker" : "worker2";
    }

    // Odd attempts are expected on the first worker, even attempts on the second.
    final String expectedWorker;
    if (attempt % 2 == 0) {
      expectedWorker = secondWorker;
    } else {
      expectedWorker = firstWorker;
    }

    Assert.assertTrue(rtrTestUtils.taskAnnounced(expectedWorker, failingTask.getId()));
    rtrTestUtils.mockWorkerRunningTask(expectedWorker, failingTask);
    rtrTestUtils.mockWorkerCompleteFailedTask(expectedWorker, failingTask);

    Assert.assertTrue(statusFuture.get().isFailure());
    Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());

    final int expectedFailures = (attempt + 1) / 2;
    Assert.assertEquals(
        expectedFailures,
        remoteTaskRunner.findWorkerRunningTask(failingTask.getId()).getContinuouslyFailedTasksCount()
    );
  }
}
/**
 * Two workers, maxPercentageBlacklistWorkers(50): one worker should get blacklisted after its second failure
 * while the other must never be blacklisted, even after exceeding maxRetriesBeforeBlacklist. The test
 * expects tasks 1-4 to alternate between the workers and tasks 5-12 to all land on the second worker.
 */
@Test
public void testBlacklistZKWorkers50Percent() throws Exception
{
  final Period period = Period.millis(1000);

  rtrTestUtils.makeWorker("worker", 10);
  rtrTestUtils.makeWorker("worker2", 10);

  final RemoteTaskRunnerConfig config = new TestRemoteTaskRunnerConfig(period);
  config.setMaxPercentageBlacklistWorkers(50);
  makeRemoteTaskRunner(config);

  String firstWorker = null;
  String secondWorker = null;

  for (int attempt = 1; attempt <= 12; attempt++) {
    final String taskId = StringUtils.format("rt-%d", attempt);
    final TestRealtimeTask failingTask = new TestRealtimeTask(
        taskId,
        new TaskResource(taskId, 1),
        "foo",
        TaskStatus.success(taskId),
        jsonMapper
    );
    final Future<TaskStatus> statusFuture = remoteTaskRunner.run(failingTask);

    if (attempt == 1) {
      // Whichever worker received the very first task is "first"; the other one is "second".
      final boolean landedOnWorker2 = rtrTestUtils.taskAnnounced("worker2", failingTask.getId());
      firstWorker = landedOnWorker2 ? "worker2" : "worker";
      secondWorker = landedOnWorker2 ? "worker" : "worker2";
    }

    // Even attempts, and every attempt after the fourth, are expected on the second worker.
    final String expectedWorker;
    if (attempt % 2 == 0 || attempt > 4) {
      expectedWorker = secondWorker;
    } else {
      expectedWorker = firstWorker;
    }

    Assert.assertTrue(rtrTestUtils.taskAnnounced(expectedWorker, failingTask.getId()));
    rtrTestUtils.mockWorkerRunningTask(expectedWorker, failingTask);
    rtrTestUtils.mockWorkerCompleteFailedTask(expectedWorker, failingTask);
    Assert.assertTrue(statusFuture.get().isFailure());

    // From the third task on, exactly one worker is blacklisted.
    final int expectedBlacklisted = attempt > 2 ? 1 : 0;
    Assert.assertEquals(expectedBlacklisted, remoteTaskRunner.getBlackListedWorkers().size());

    final int expectedFailures;
    if (attempt > 4) {
      expectedFailures = attempt - 2;
    } else {
      expectedFailures = (attempt + 1) / 2;
    }
    Assert.assertEquals(
        expectedFailures,
        remoteTaskRunner.findWorkerRunningTask(failingTask.getId()).getContinuouslyFailedTasksCount()
    );
  }
}
/**
 * Single worker with maxPercentageBlacklistWorkers(100). Task 1 fails, task 2 is left running and task 3
 * fails, which blacklists the worker (3 blacklisted slots). When task 2 then succeeds, the worker must be
 * removed from the blacklist again.
 */
@Test
public void testSuccessfulTaskOnBlacklistedWorker() throws Exception
{
  final Period period = Period.millis(1000);
  makeWorker();

  final RemoteTaskRunnerConfig config = new TestRemoteTaskRunnerConfig(period);
  config.setMaxPercentageBlacklistWorkers(100);
  makeRemoteTaskRunner(config);

  final TestRealtimeTask failsFirst = new TestRealtimeTask(
      "realtime1", new TaskResource("realtime1", 1), "foo", TaskStatus.success("realtime1"), jsonMapper
  );
  final TestRealtimeTask succeedsLate = new TestRealtimeTask(
      "realtime2", new TaskResource("realtime2", 1), "foo", TaskStatus.success("realtime2"), jsonMapper
  );
  final TestRealtimeTask failsSecond = new TestRealtimeTask(
      "realtime3", new TaskResource("realtime3", 1), "foo", TaskStatus.success("realtime3"), jsonMapper
  );

  // First failure: nothing is blacklisted yet.
  final Future<TaskStatus> firstFailure = remoteTaskRunner.run(failsFirst);
  Assert.assertTrue(taskAnnounced(failsFirst.getId()));
  mockWorkerRunningTask(failsFirst);
  mockWorkerCompleteFailedTask(failsFirst);
  Assert.assertTrue(firstFailure.get().isFailure());
  Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
  Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount());

  // This task stays running while the next one fails.
  final Future<TaskStatus> lateSuccess = remoteTaskRunner.run(succeedsLate);
  Assert.assertTrue(taskAnnounced(succeedsLate.getId()));
  mockWorkerRunningTask(succeedsLate);
  Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount());

  // Second failure: the worker and its 3 slots are blacklisted.
  final Future<TaskStatus> secondFailure = remoteTaskRunner.run(failsSecond);
  Assert.assertTrue(taskAnnounced(failsSecond.getId()));
  mockWorkerRunningTask(failsSecond);
  mockWorkerCompleteFailedTask(failsSecond);
  Assert.assertTrue(secondFailure.get().isFailure());
  Assert.assertEquals(1, remoteTaskRunner.getBlackListedWorkers().size());
  Assert.assertEquals(3, remoteTaskRunner.getBlacklistedTaskSlotCount());

  // The still-running task now succeeds, which clears the blacklist.
  mockWorkerCompleteSuccessfulTask(succeedsLate);
  Assert.assertTrue(lateSuccess.get().isSuccess());
  Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
  Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount());
}
/**
 * Attaches the runner's status listener to a {@link PathChildrenCache} on "/test" and expects the listener to
 * emit an alert whose data map contains a null "znode" entry rather than throwing.
 *
 * The cache is opened in try-with-resources so it is closed when the test ends, whether it passes or fails.
 */
@Test
public void testStatusListenerEventDataNullShouldNotThrowException() throws Exception
{
  // Set up mock emitter to verify log alert when exception is thrown inside the status listener
  final Worker mockedWorker = EasyMock.createMock(Worker.class);
  EasyMock.expect(mockedWorker.getHost()).andReturn("host").atLeastOnce();
  EasyMock.replay(mockedWorker);

  ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class);
  Capture<EmittingLogger.EmittingAlertBuilder> capturedArgument = Capture.newInstance();
  emitter.emit(EasyMock.capture(capturedArgument));
  EasyMock.expectLastCall().atLeastOnce();
  EmittingLogger.registerEmitter(emitter);
  EasyMock.replay(emitter);

  try (PathChildrenCache cache = new PathChildrenCache(cf, "/test", true)) {
    // Reused here only to create the runner under test.
    testStartWithNoWorker();
    cache.getListenable().addListener(
        remoteTaskRunner.getStatusListener(mockedWorker, new ZkWorker(mockedWorker, cache, jsonMapper), null)
    );
    cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

    // Status listener will receive event with null data
    Assert.assertTrue(
        TestUtils.conditionValid(() -> cache.getCurrentData().size() == 1)
    );

    // Verify that the log emitter was called
    EasyMock.verify(mockedWorker);
    EasyMock.verify(emitter);

    Map<String, Object> alertDataMap = capturedArgument.getValue().build(null).getDataMap();
    Assert.assertTrue(alertDataMap.containsKey("znode"));
    Assert.assertNull(alertDataMap.get("znode"));
    // Status listener should successfully complete without throwing exception
  }
}
}