blob: 3a7e6d0aeac679944538f9dfbf2e655f7413b690 [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.llap.tezplugins;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.records.TezTaskID;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance;
import org.apache.hadoop.hive.llap.registry.impl.LlapFixedRegistryImpl;
import org.apache.hadoop.hive.llap.testhelpers.ControlledClock;
import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService.TaskInfo;
import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService.TaskInfo.State;
import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableSet;
public class TestLlapTaskSchedulerService {
// Logger for diagnostic output from this test class.
private static final Logger LOG = LoggerFactory.getLogger(TestLlapTaskSchedulerService.class);
// Host names used as locality hints in allocation requests. NOTE(review): presumably these match
// the hosts registered by TestTaskSchedulerServiceWrapper (defined outside this view) — confirm.
private static final String HOST1 = "host1";
private static final String HOST2 = "host2";
private static final String HOST3 = "host3";
private static final String HOST4 = "host4";
/**
 * A single request with a HOST1 locality hint must be granted, be counted as a local
 * allocation, and show up once in the per-host allocation counter for HOST1.
 */
@Test(timeout = 10000)
public void testSimpleLocalAllocation() throws IOException, InterruptedException {
  TestTaskSchedulerServiceWrapper wrapper = new TestTaskSchedulerServiceWrapper();
  try {
    Priority requestPriority = Priority.newInstance(1);
    String[] preferredHosts = {HOST1};
    TezTaskAttemptID attempt = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object cookie = new Object();

    wrapper.controlScheduler(true);
    wrapper.allocateTask(attempt, preferredHosts, requestPriority, cookie);
    wrapper.awaitLocalTaskAllocations(1);

    // The callback must see exactly this attempt/cookie pair.
    verify(wrapper.mockAppCallback).taskAllocated(eq(attempt), eq(cookie), any(Container.class));
    assertEquals(1, wrapper.ts.dagStats.getNumLocalAllocations());
    assertEquals(1, wrapper.ts.dagStats.getNumAllocationsPerHost().get(HOST1).get());
  } finally {
    wrapper.shutdown();
  }
}
/**
 * With a single guaranteed slot ("duck"), the first task at a priority takes it and a second task
 * at the same priority does not. When the holder is deallocated, the duck passes to the second
 * task; once that one is deallocated too, the duck is reported as unused.
 */
@Test(timeout = 10000)
public void testGuaranteedScheduling() throws IOException, InterruptedException {
  TestTaskSchedulerServiceWrapper wrapper = new TestTaskSchedulerServiceWrapper();
  try {
    Priority samePriority = Priority.newInstance(1);
    TezTaskAttemptID firstAttempt = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    TezTaskAttemptID secondAttempt = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object firstCookie = new Object();
    Object secondCookie = new Object();
    wrapper.ts.updateGuaranteedCount(1);
    wrapper.controlScheduler(true);

    // The first task receives the only duck.
    wrapper.allocateTask(firstAttempt, null, samePriority, firstCookie);
    wrapper.awaitTotalTaskAllocations(1);
    TaskInfo firstInfo = wrapper.ts.getTaskInfo(firstAttempt);
    assertTrue(firstInfo.isGuaranteed());
    assertEquals(State.ASSIGNED, firstInfo.getState());
    assertEquals(0, wrapper.ts.getUnusedGuaranteedCount());

    // A second task at the same priority runs without one.
    wrapper.allocateTask(secondAttempt, null, samePriority, secondCookie);
    wrapper.awaitTotalTaskAllocations(2);
    TaskInfo secondInfo = wrapper.ts.getTaskInfo(secondAttempt);
    assertFalse(secondInfo.isGuaranteed());
    assertEquals(0, wrapper.ts.getUnusedGuaranteedCount());

    // Releasing the holder hands the duck over to the waiting task.
    wrapper.deallocateTask(firstAttempt, true, TaskAttemptEndReason.CONTAINER_EXITED);
    assertTrue(secondInfo.isGuaranteed());
    assertEquals(0, wrapper.ts.getUnusedGuaranteedCount());

    // Nobody left to hold it: the duck becomes unused.
    wrapper.deallocateTask(secondAttempt, true, TaskAttemptEndReason.CONTAINER_EXITED);
    assertEquals(1, wrapper.ts.getUnusedGuaranteedCount());
  } finally {
    wrapper.shutdown();
  }
}
/**
 * A low priority task that holds the only duck loses it to a later high priority task, and gets
 * it back when the high priority task finishes. After both finish, the duck is unused.
 */
@Test(timeout = 10000)
public void testGuaranteedTransfer() throws IOException, InterruptedException {
  TestTaskSchedulerServiceWrapper wrapper = new TestTaskSchedulerServiceWrapper();
  try {
    Priority highPri = Priority.newInstance(1);
    Priority lowPri = Priority.newInstance(2);
    TezTaskAttemptID lowAttempt = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    TezTaskAttemptID highAttempt = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    wrapper.ts.updateGuaranteedCount(1);
    wrapper.controlScheduler(true);

    // The low priority task arrives first and takes the duck.
    wrapper.allocateTask(lowAttempt, null, lowPri, new Object());
    wrapper.awaitTotalTaskAllocations(1);
    TaskInfo lowInfo = wrapper.ts.getTaskInfo(lowAttempt);
    assertTrue(lowInfo.isGuaranteed());
    assertEquals(State.ASSIGNED, lowInfo.getState());
    assertEquals(0, wrapper.ts.getUnusedGuaranteedCount());

    // The high priority task takes the duck away from it.
    wrapper.allocateTask(highAttempt, null, highPri, new Object());
    wrapper.awaitTotalTaskAllocations(2);
    TaskInfo highInfo = wrapper.ts.getTaskInfo(highAttempt);
    assertTrue(highInfo.isGuaranteed());
    assertFalse(lowInfo.isGuaranteed());
    assertEquals(0, wrapper.ts.getUnusedGuaranteedCount());

    // When the high priority task ends, the duck goes back to the low priority one.
    wrapper.deallocateTask(highAttempt, true, TaskAttemptEndReason.CONTAINER_EXITED);
    assertTrue(lowInfo.isGuaranteed());
    assertEquals(0, wrapper.ts.getUnusedGuaranteedCount());

    wrapper.deallocateTask(lowAttempt, true, TaskAttemptEndReason.CONTAINER_EXITED);
    assertEquals(1, wrapper.ts.getUnusedGuaranteedCount());
  } finally {
    wrapper.shutdown();
  }
}
/**
 * Verifies how changes to the total guaranteed count are spread over three running tasks (one
 * high priority, two low priority). The total is raised 0 -> 2 -> 4 -> 6 and then lowered
 * 6 -> 5 -> 1 -> 0. After each step the test checks which tasks hold a duck and how many ducks
 * are unused.
 */
@Test(timeout = 10000)
public void testChangeGuaranteedTotal() throws IOException, InterruptedException {
TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper();
// Schedule 3 tasks. Give out two ducks - the high pri task and one low pri task get them. Give
// out 2 more - the remaining task gets one and one duck is unused. Give out 2 more - both go to
// unused. Then revoke similarly in steps (1, 4, 1), with the opposite effect.
try {
Priority highPri = Priority.newInstance(1), lowPri = Priority.newInstance(2);
TezTaskAttemptID task1 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId(), task2 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId(), task3 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
tsWrapper.ts.updateGuaranteedCount(0);
tsWrapper.controlScheduler(true);
tsWrapper.allocateTask(task1, null, highPri, new Object());
tsWrapper.allocateTask(task2, null, lowPri, new Object());
tsWrapper.allocateTask(task3, null, lowPri, new Object());
tsWrapper.awaitTotalTaskAllocations(3);
TaskInfo ti1 = tsWrapper.ts.getTaskInfo(task1), ti2 = tsWrapper.ts.getTaskInfo(task2),
ti3 = tsWrapper.ts.getTaskInfo(task3);
// Total 0: all three tasks run without a duck.
assertFalse(ti1.isGuaranteed() || ti2.isGuaranteed() || ti3.isGuaranteed());
assertEquals(0, tsWrapper.ts.getUnusedGuaranteedCount());
// Total 2: the high pri task is guaranteed, plus one of the low pri tasks.
tsWrapper.ts.updateGuaranteedCount(2);
assertTrue(ti1.isGuaranteed());
// This particular test doesn't care which of the lower pri tasks gets the duck.
TaskInfo ti23High = ti2.isGuaranteed() ? ti2 : ti3, ti23Low = (ti2 == ti23High) ? ti3 : ti2;
assertTrue(ti23High.isGuaranteed());
assertFalse(ti23Low.isGuaranteed());
assertEquals(0, tsWrapper.ts.getUnusedGuaranteedCount());
// Total 4: every task holds a duck and one is left unused.
tsWrapper.ts.updateGuaranteedCount(4);
assertTrue(ti1.isGuaranteed());
assertTrue(ti23High.isGuaranteed());
assertTrue(ti23Low.isGuaranteed());
assertEquals(1, tsWrapper.ts.getUnusedGuaranteedCount());
// Total 6: the two extra ducks go to the unused pool.
tsWrapper.ts.updateGuaranteedCount(6);
assertTrue(ti1.isGuaranteed());
assertTrue(ti23High.isGuaranteed());
assertTrue(ti23Low.isGuaranteed());
assertEquals(3, tsWrapper.ts.getUnusedGuaranteedCount());
// Total 5 (revoke 1): only the unused count drops; every task keeps its duck.
tsWrapper.ts.updateGuaranteedCount(5);
assertTrue(ti1.isGuaranteed());
assertTrue(ti23High.isGuaranteed());
assertTrue(ti23Low.isGuaranteed());
assertEquals(2, tsWrapper.ts.getUnusedGuaranteedCount());
// Total 1 (revoke 4): both low pri tasks lose their ducks; only the high pri task keeps one.
tsWrapper.ts.updateGuaranteedCount(1);
assertTrue(ti1.isGuaranteed());
assertFalse(ti23High.isGuaranteed());
assertFalse(ti23Low.isGuaranteed());
assertEquals(0, tsWrapper.ts.getUnusedGuaranteedCount());
// Total 0 (revoke 1): the high pri task loses the last duck.
tsWrapper.ts.updateGuaranteedCount(0);
assertFalse(ti1.isGuaranteed());
assertFalse(ti23High.isGuaranteed());
assertFalse(ti23Low.isGuaranteed());
assertEquals(0, tsWrapper.ts.getUnusedGuaranteedCount());
// No task holds a duck, so finishing them leaves nothing in the unused pool.
tsWrapper.deallocateTask(task1, true, TaskAttemptEndReason.CONTAINER_EXITED);
tsWrapper.deallocateTask(task2, true, TaskAttemptEndReason.CONTAINER_EXITED);
tsWrapper.deallocateTask(task3, true, TaskAttemptEndReason.CONTAINER_EXITED);
assertEquals(0, tsWrapper.ts.getUnusedGuaranteedCount());
} finally {
tsWrapper.shutdown();
}
}
@Test(timeout = 20000)
public void testConcurrentUpdates() throws IOException, InterruptedException {
final TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper();
// Test 4 variations of callbacks. 2 increases/2 revokes - do not update the same task again;
// Then, increase + decrease and decrease + increase, the 2nd call coming after the message is sent;
// the message callback should undo the change.
try {
Priority highPri = Priority.newInstance(1), lowPri = Priority.newInstance(2);
TezTaskAttemptID task1 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId(), task2 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
tsWrapper.ts.updateGuaranteedCount(0);
tsWrapper.controlScheduler(true);
tsWrapper.allocateTask(task1, null, highPri, new Object());
tsWrapper.allocateTask(task2, null, lowPri, new Object());
tsWrapper.awaitTotalTaskAllocations(2);
TaskInfo ti1 = tsWrapper.ts.getTaskInfo(task1), ti2 = tsWrapper.ts.getTaskInfo(task2);
assertFalse(ti1.isGuaranteed() || ti2.isGuaranteed());
// Boring scenario #1 - two concurrent increases.
tsWrapper.ts.updateGuaranteedCount(1);
tsWrapper.ts.waitForMessagesSent(1);
assertTrue(ti1.isGuaranteed());
assertFalse(ti1.getLastSetGuaranteed()); // Not updated yet.
assertFalse(ti2.isGuaranteed());
// We are now "sending" a message... update again, "return" both callbacks.
tsWrapper.ts.updateGuaranteedCount(2);
tsWrapper.ts.waitForMessagesSent(1);
assertTrue(ti1.isGuaranteed());
assertTrue(ti2.isGuaranteed());
tsWrapper.ts.handleUpdateResult(ti1, true);
tsWrapper.ts.handleUpdateResult(ti2, true);
assertTrue(ti1.isGuaranteed());
assertTrue(ti2.isGuaranteed());
assertTrue(ti1.getLastSetGuaranteed());
assertTrue(ti2.getLastSetGuaranteed());
// Boring scenario #2 - two concurrent revokes. Same as above.
tsWrapper.ts.updateGuaranteedCount(1);
tsWrapper.ts.waitForMessagesSent(1);
assertTrue(ti1.isGuaranteed());
assertFalse(ti2.isGuaranteed());
assertTrue(ti2.getLastSetGuaranteed()); // Not updated yet.
tsWrapper.ts.updateGuaranteedCount(0);
tsWrapper.ts.waitForMessagesSent(1);
assertFalse(ti1.isGuaranteed());
assertFalse(ti2.isGuaranteed());
tsWrapper.ts.handleUpdateResult(ti1, true);
tsWrapper.ts.handleUpdateResult(ti2, true);
assertFalse(ti1.isGuaranteed());
assertFalse(ti2.isGuaranteed());
assertFalse(ti1.getLastSetGuaranteed());
assertFalse(ti2.getLastSetGuaranteed());
// Concurrent increase and revocation, then another increase - after the message is sent.
tsWrapper.ts.updateGuaranteedCount(1);
tsWrapper.ts.waitForMessagesSent(1);
assertTrue(ti1.isGuaranteed());
assertFalse(ti1.getLastSetGuaranteed()); // Not updated yet.
assertTrue(ti1.isUpdateInProgress());
tsWrapper.ts.updateGuaranteedCount(0);
tsWrapper.ts.assertNoMessagesSent(); // We are revoking from an updating task.
assertFalse(ti1.isGuaranteed());
tsWrapper.ts.handleUpdateResult(ti1, true);
tsWrapper.ts.waitForMessagesSent(1); // We should send a message to undo what we just did.
assertFalse(ti1.isGuaranteed());
assertTrue(ti1.getLastSetGuaranteed());
assertTrue(ti1.isUpdateInProgress());
tsWrapper.ts.updateGuaranteedCount(1);
tsWrapper.ts.assertNoMessagesSent();
assertTrue(ti1.isGuaranteed());
assertTrue(ti1.getLastSetGuaranteed());
tsWrapper.ts.handleUpdateResult(ti1, true);
tsWrapper.ts.waitForMessagesSent(1);
assertTrue(ti1.isGuaranteed());
assertFalse(ti1.getLastSetGuaranteed());
assertTrue(ti1.isUpdateInProgress());
tsWrapper.ts.handleUpdateResult(ti1, true);
tsWrapper.ts.assertNoMessagesSent();
assertTrue(ti1.isGuaranteed());
assertTrue(ti1.getLastSetGuaranteed());
assertFalse(ti1.isUpdateInProgress());
tsWrapper.deallocateTask(task1, true, TaskAttemptEndReason.CONTAINER_EXITED);
tsWrapper.deallocateTask(task2, true, TaskAttemptEndReason.CONTAINER_EXITED);
assertEquals(1, tsWrapper.ts.getUnusedGuaranteedCount());
} finally {
tsWrapper.shutdown();
}
}
/**
 * A duck granted to a task that terminates before the update is acknowledged must not be lost.
 * After the failed update it moves to the remaining task, and once that task also finishes it
 * ends up in the unused pool.
 */
@Test(timeout = 10000)
public void testUpdateOnFinishingTask() throws IOException, InterruptedException {
final TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper();
// The update fails because the task has terminated on the node.
try {
Priority highPri = Priority.newInstance(1), lowPri = Priority.newInstance(2);
TezTaskAttemptID task1 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId(), task2 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
tsWrapper.ts.updateGuaranteedCount(0);
tsWrapper.controlScheduler(true);
tsWrapper.allocateTask(task1, null, highPri, new Object());
tsWrapper.allocateTask(task2, null, lowPri, new Object());
tsWrapper.awaitTotalTaskAllocations(2);
TaskInfo ti1 = tsWrapper.ts.getTaskInfo(task1), ti2 = tsWrapper.ts.getTaskInfo(task2);
// Concurrent increase and termination, increase fails.
tsWrapper.ts.updateGuaranteedCount(1);
tsWrapper.ts.waitForMessagesSent(1);
assertTrue(ti1.isGuaranteed());
assertFalse(ti1.getLastSetGuaranteed()); // Not updated yet.
assertTrue(ti1.isUpdateInProgress());
// task1 terminates while its update is in flight; the update then reports failure.
tsWrapper.deallocateTask(task1, true, TaskAttemptEndReason.CONTAINER_EXITED);
tsWrapper.ts.handleUpdateResult(ti1, false);
// We must have the duck still; it should just go to the other task.
assertTrue(ti2.isGuaranteed());
assertTrue(ti2.isUpdateInProgress());
// Fail ti2's update too, then terminate task2.
tsWrapper.ts.handleUpdateResult(ti2, false);
tsWrapper.deallocateTask(task2, true, TaskAttemptEndReason.CONTAINER_EXITED);
// Same; with the termination after the failed update, we should maintain the correct count.
assertEquals(1, tsWrapper.ts.getUnusedGuaranteedCount());
} finally {
tsWrapper.shutdown();
}
}
/**
 * If an increase fails after the duck was already revoked from the task, the failure simply
 * matches the new intent. No retry or corrective message is sent.
 */
@Test(timeout = 10000)
public void testConcurrentUpdateWithError() throws IOException, InterruptedException {
final TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper();
// The update has failed but the state has changed since then - no retry needed.
try {
Priority highPri = Priority.newInstance(1);
TezTaskAttemptID task1 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
tsWrapper.ts.updateGuaranteedCount(0);
tsWrapper.controlScheduler(true);
tsWrapper.allocateTask(task1, null, highPri, new Object());
tsWrapper.awaitTotalTaskAllocations(1);
TaskInfo ti1 = tsWrapper.ts.getTaskInfo(task1);
assertFalse(ti1.isGuaranteed());
// Concurrent increase and revocation, increase fails - no revocation is needed.
tsWrapper.ts.updateGuaranteedCount(1);
tsWrapper.ts.waitForMessagesSent(1);
assertTrue(ti1.isGuaranteed());
assertFalse(ti1.getLastSetGuaranteed()); // Not updated yet.
assertTrue(ti1.isUpdateInProgress());
tsWrapper.ts.updateGuaranteedCount(0);
tsWrapper.ts.assertNoMessagesSent(); // We are revoking from an updating task.
assertFalse(ti1.isGuaranteed());
// The failed increase leaves last-set at false, which already matches the intent.
tsWrapper.ts.handleUpdateResult(ti1, false);
assertFalse(ti1.isGuaranteed());
assertFalse(ti1.getLastSetGuaranteed());
assertFalse(ti1.isUpdateInProgress());
tsWrapper.ts.assertNoMessagesSent();
tsWrapper.deallocateTask(task1, true, TaskAttemptEndReason.CONTAINER_EXITED);
assertEquals(0, tsWrapper.ts.getUnusedGuaranteedCount());
} finally {
tsWrapper.shutdown();
}
}
/**
 * When granting a duck fails, the scheduler tries another candidate at the same priority. If no
 * such candidate exists, it keeps the duck on the original task (update still in progress)
 * rather than handing it to a lower priority task.
 */
@Test(timeout = 10000)
public void testUpdateWithError() throws IOException, InterruptedException {
final TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper();
// The update has failed; we'd try with another candidate first, but only at the same priority.
try {
Priority highPri = Priority.newInstance(1), lowPri = Priority.newInstance(2);
TezTaskAttemptID task1 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId(), task2 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId(), task3 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
tsWrapper.ts.updateGuaranteedCount(0);
tsWrapper.controlScheduler(true);
tsWrapper.allocateTask(task1, null, highPri, new Object());
tsWrapper.allocateTask(task2, null, highPri, new Object());
tsWrapper.awaitTotalTaskAllocations(2);
TaskInfo ti1 = tsWrapper.ts.getTaskInfo(task1), ti2 = tsWrapper.ts.getTaskInfo(task2);
assertFalse(ti1.isGuaranteed() || ti2.isGuaranteed());
tsWrapper.ts.updateGuaranteedCount(1);
tsWrapper.ts.waitForMessagesSent(1);
// Both tasks share a priority, so either may be picked first.
TaskInfo tiHigher = ti1.isGuaranteed() ? ti1 : ti2, tiLower = (tiHigher == ti1) ? ti2 : ti1;
assertTrue(tiHigher.isGuaranteed());
assertFalse(tiHigher.getLastSetGuaranteed()); // Not updated yet.
assertTrue(tiHigher.isUpdateInProgress());
tsWrapper.ts.handleUpdateResult(tiHigher, false); // Update has failed. We should try task2.
tsWrapper.ts.waitForMessagesSent(1);
assertFalse(tiHigher.isGuaranteed());
assertFalse(tiHigher.getLastSetGuaranteed());
assertFalse(tiHigher.isUpdateInProgress());
assertTrue(tiLower.isGuaranteed());
assertFalse(tiLower.getLastSetGuaranteed());
assertTrue(tiLower.isUpdateInProgress());
// Fail the 2nd update too to get rid of the duck for the next scenario.
tsWrapper.ts.updateGuaranteedCount(0);
tsWrapper.ts.handleUpdateResult(tiLower, false);
tsWrapper.ts.assertNoMessagesSent();
// Now run a lower priority task.
tsWrapper.deallocateTask(task2, true, TaskAttemptEndReason.CONTAINER_EXITED);
tsWrapper.allocateTask(task3, null, lowPri, new Object());
tsWrapper.awaitTotalTaskAllocations(3);
TaskInfo ti3 = tsWrapper.ts.getTaskInfo(task3);
// The only high pri task left (task1) gets the duck.
tsWrapper.ts.updateGuaranteedCount(1);
tsWrapper.ts.waitForMessagesSent(1);
assertTrue(ti1.isGuaranteed());
assertTrue(ti1.isUpdateInProgress());
tsWrapper.ts.handleUpdateResult(ti1, false); // Update has failed. We won't try a low pri task.
// The duck stays with ti1 and its update remains in progress; the low pri task is untouched.
assertTrue(ti1.isGuaranteed());
assertFalse(ti1.getLastSetGuaranteed());
assertTrue(ti1.isUpdateInProgress());
assertFalse(ti3.isGuaranteed());
assertFalse(ti3.isUpdateInProgress());
assertEquals(0, tsWrapper.ts.getUnusedGuaranteedCount());
} finally {
tsWrapper.shutdown();
}
}
/**
 * Overlapping opposite updates that both happen before the first update's message is sent must
 * cancel out, so no message is sent at all. A background thread is parked inside checkAndSend
 * using the setDelayCheckAndSend test hook (defined outside this view) while the main thread
 * issues the opposite update.
 */
@Test(timeout = 10000)
public void testConcurrentUpdatesBeforeMessage() throws IOException, InterruptedException {
final TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper();
// 2 more variations of callbacks; increase + decrease and decrease + increase, the 2nd call coming
// before the message is sent; no message should ever be sent.
try {
Priority highPri = Priority.newInstance(1), lowPri = Priority.newInstance(2);
TezTaskAttemptID task1 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId(), task2 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
tsWrapper.ts.updateGuaranteedCount(0);
tsWrapper.controlScheduler(true);
tsWrapper.allocateTask(task1, null, highPri, new Object());
tsWrapper.allocateTask(task2, null, lowPri, new Object());
tsWrapper.awaitTotalTaskAllocations(2);
TaskInfo ti1 = tsWrapper.ts.getTaskInfo(task1), ti2 = tsWrapper.ts.getTaskInfo(task2);
assertFalse(ti1.isGuaranteed() || ti2.isGuaranteed());
// Concurrent increase and revocation - before the message is sent.
tsWrapper.ts.clearTestCounts();
tsWrapper.ts.setDelayCheckAndSend(true);
Thread updateThread = new Thread(new Runnable() {
@Override
public void run() {
tsWrapper.ts.updateGuaranteedCount(1);
}
}, "test-update-thread");
updateThread.start(); // This should eventually hang in the delay code.
tsWrapper.ts.waitForCheckAndSendCall(1); // From the background thread.
assertTrue(ti1.isGuaranteed());
assertTrue(ti1.isUpdateInProgress());
assertFalse(ti1.getLastSetGuaranteed());
tsWrapper.ts.updateGuaranteedCount(0); // This won't go into checkAndSend.
tsWrapper.ts.assertNoMessagesSent();
// Release the background thread.
tsWrapper.ts.setDelayCheckAndSend(false);
updateThread.join();
tsWrapper.ts.assertNoMessagesSent(); // No message is needed.
assertFalse(ti1.isGuaranteed());
assertFalse(ti1.getLastSetGuaranteed());
assertFalse(ti1.isUpdateInProgress());
// Concurrent revocation and increase - before the message is sent.
// First, actually give it a duck.
tsWrapper.ts.updateGuaranteedCount(1);
tsWrapper.ts.handleUpdateResult(ti1, true);
tsWrapper.ts.clearTestCounts();
assertTrue(ti1.isGuaranteed() && ti1.getLastSetGuaranteed());
tsWrapper.ts.setDelayCheckAndSend(true);
updateThread = new Thread(new Runnable() {
@Override
public void run() {
tsWrapper.ts.updateGuaranteedCount(0);
}
}, "test-update-thread");
updateThread.start(); // This should eventually hang in the delay code.
tsWrapper.ts.waitForCheckAndSendCall(1);
assertFalse(ti1.isGuaranteed());
assertTrue(ti1.isUpdateInProgress());
assertTrue(ti1.getLastSetGuaranteed());
tsWrapper.ts.updateGuaranteedCount(1); // This won't go into checkAndSend.
tsWrapper.ts.assertNoMessagesSent();
// Release the background thread.
tsWrapper.ts.setDelayCheckAndSend(false);
updateThread.join();
tsWrapper.ts.assertNoMessagesSent(); // No message is needed.
assertTrue(ti1.isGuaranteed());
assertTrue(ti1.getLastSetGuaranteed());
assertFalse(ti1.isUpdateInProgress());
// task1 still holds the duck, so deallocating both tasks leaves it unused.
tsWrapper.deallocateTask(task1, true, TaskAttemptEndReason.CONTAINER_EXITED);
tsWrapper.deallocateTask(task2, true, TaskAttemptEndReason.CONTAINER_EXITED);
assertEquals(1, tsWrapper.ts.getUnusedGuaranteedCount());
} finally {
tsWrapper.shutdown();
}
}
/**
 * A heartbeat that reports a task as guaranteed when the scheduler did not grant it a duck must
 * trigger a corrective update that revokes the duck.
 */
@Test(timeout = 10000)
public void testHeartbeatInconsistency() throws IOException, InterruptedException {
final TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper();
// Guaranteed flag is inconsistent based on heartbeat - another message should be sent.
try {
Priority highPri = Priority.newInstance(1);
TezTaskAttemptID task1 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
tsWrapper.ts.updateGuaranteedCount(0);
tsWrapper.controlScheduler(true);
tsWrapper.allocateTask(task1, null, highPri, new Object());
tsWrapper.awaitTotalTaskAllocations(1);
TaskInfo ti1 = tsWrapper.ts.getTaskInfo(task1);
assertFalse(ti1.isGuaranteed());
// Heartbeat indicates task has a duck - this must be reverted.
tsWrapper.ts.taskInfoUpdated(task1, true);
tsWrapper.ts.waitForMessagesSent(1);
// Last-set now records what the heartbeat reported; the intent is still "not guaranteed".
assertTrue(ti1.getLastSetGuaranteed());
assertTrue(ti1.isUpdateInProgress());
assertFalse(ti1.isGuaranteed());
tsWrapper.ts.handleUpdateResult(ti1, true);
assertFalse(ti1.getLastSetGuaranteed());
tsWrapper.deallocateTask(task1, true, TaskAttemptEndReason.CONTAINER_EXITED);
assertEquals(0, tsWrapper.ts.getUnusedGuaranteedCount());
} finally {
tsWrapper.shutdown();
}
}
/**
 * A request with no locality hint (null hosts) is still allocated, and it is counted as an
 * allocation without a locality request.
 */
@Test(timeout = 10000)
public void testSimpleNoLocalityAllocation() throws IOException, InterruptedException {
  TestTaskSchedulerServiceWrapper wrapper = new TestTaskSchedulerServiceWrapper();
  try {
    Priority requestPriority = Priority.newInstance(1);
    TezTaskAttemptID attempt = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object cookie = new Object();

    wrapper.controlScheduler(true);
    // No preferred hosts for this request.
    wrapper.allocateTask(attempt, null, requestPriority, cookie);
    wrapper.awaitTotalTaskAllocations(1);

    verify(wrapper.mockAppCallback).taskAllocated(eq(attempt), eq(cookie), any(Container.class));
    assertEquals(1, wrapper.ts.dagStats.getNumAllocationsNoLocalityRequest());
  } finally {
    wrapper.shutdown();
  }
}
/**
 * Three priority-2 requests on a single host are expected to yield only two allocations. A later
 * priority-1 request (lower value = higher priority in this file) must cause a preemption; once
 * the preempted attempt is deallocated, the priority-1 task gets allocated.
 * Currently disabled, see HIVE-25713.
 */
@Test(timeout = 10000)
@org.junit.Ignore("HIVE-25713")
public void testPreemption() throws InterruptedException, IOException {
Priority priority1 = Priority.newInstance(1);
Priority priority2 = Priority.newInstance(2);
String [] hosts = new String[] {HOST1};
// NOTE(review): the meaning of the (2000, hosts, 1, 1) constructor arguments is defined in
// TestTaskSchedulerServiceWrapper (outside this view); the test relies on only 2 of the 3
// priority-2 tasks fitting on HOST1.
TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1);
try {
TezTaskAttemptID task1 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
Object clientCookie1 = "cookie1";
TezTaskAttemptID task2 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
Object clientCookie2 = "cookie2";
TezTaskAttemptID task3 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
Object clientCookie3 = "cookie3";
TezTaskAttemptID task4 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
Object clientCookie4 = "cookie4";
tsWrapper.controlScheduler(true);
tsWrapper.allocateTask(task1, hosts, priority2, clientCookie1);
tsWrapper.allocateTask(task2, hosts, priority2, clientCookie2);
tsWrapper.allocateTask(task3, hosts, priority2, clientCookie3);
// Drive the controlled scheduler until two local allocations have been made.
while (true) {
tsWrapper.signalSchedulerRun();
tsWrapper.awaitSchedulerRun();
if (tsWrapper.ts.dagStats.getNumLocalAllocations() == 2) {
break;
}
}
verify(tsWrapper.mockAppCallback, times(2)).taskAllocated(any(Object.class),
any(Object.class), any(Container.class));
assertEquals(2, tsWrapper.ts.dagStats.getNumLocalAllocations());
assertEquals(0, tsWrapper.ts.dagStats.getNumAllocationsNoLocalityRequest());
reset(tsWrapper.mockAppCallback);
// The higher priority request should trigger a preemption.
tsWrapper.allocateTask(task4, hosts, priority1, clientCookie4);
while (true) {
tsWrapper.signalSchedulerRun();
tsWrapper.awaitSchedulerRun();
if (tsWrapper.ts.dagStats.getNumPreemptedTasks() == 1) {
break;
}
}
verify(tsWrapper.mockAppCallback).preemptContainer(any(ContainerId.class));
// Report the preempted attempt as finished. NOTE(review): this assumes task2 was the attempt
// chosen for preemption; the verify above only checks that some container was preempted.
tsWrapper.deallocateTask(task2, false, TaskAttemptEndReason.INTERNAL_PREEMPTION);
while (true) {
tsWrapper.signalSchedulerRun();
tsWrapper.awaitSchedulerRun();
if (tsWrapper.ts.dagStats.getNumTotalAllocations() == 3) {
break;
}
}
verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task4),
eq(clientCookie4), any(Container.class));
} finally {
tsWrapper.shutdown();
}
}
/**
 * Rejecting a task on HOST1 disables that node. The node stays registered but heads the
 * disabled-nodes queue with the expected delay and expiry, and a later request for HOST1 is
 * served non-locally.
 */
@Test(timeout = 10000)
public void testNodeDisabled() throws IOException, InterruptedException {
  TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(10000L);
  try {
    Priority priority1 = Priority.newInstance(1);
    String[] hosts1 = new String[]{HOST1};
    TezTaskAttemptID task1 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object clientCookie1 = new Object();
    tsWrapper.controlScheduler(true);
    tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1);
    // Drive the controlled scheduler until the first task is allocated.
    while (true) {
      tsWrapper.signalSchedulerRun();
      tsWrapper.awaitSchedulerRun();
      if (tsWrapper.ts.dagStats.getNumTotalAllocations() == 1) {
        break;
      }
    }
    verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), eq(clientCookie1),
        any(Container.class));
    assertEquals(1, tsWrapper.ts.dagStats.getNumLocalAllocations());
    assertEquals(0, tsWrapper.ts.dagStats.getNumAllocationsNoLocalityRequest());
    assertEquals(0, tsWrapper.ts.dagStats.getNumNonLocalAllocations());
    assertEquals(1, tsWrapper.ts.dagStats.getNumTotalAllocations());

    tsWrapper.resetAppCallback();
    // The rejection happens at clock time 10000 ms.
    tsWrapper.clock.setTime(10000L);
    tsWrapper.rejectExecution(task1);

    // Verify that the node is blacklisted: it stays registered but heads the disabled queue,
    // expiring 10000 ms after the rejection time.
    assertEquals(1, tsWrapper.ts.dagStats.getNumRejectedTasks());
    assertEquals(3, tsWrapper.ts.instanceToNodeMap.size());
    LlapTaskSchedulerService.NodeInfo disabledNodeInfo = tsWrapper.ts.disabledNodesQueue.peek();
    assertNotNull(disabledNodeInfo);
    assertEquals(HOST1, disabledNodeInfo.getHost());
    assertEquals(10000L, disabledNodeInfo.getDelay(TimeUnit.MILLISECONDS));
    assertEquals(10000L + 10000L, disabledNodeInfo.expireTimeMillis);

    // A new request for the disabled host must be served elsewhere (non-locally).
    TezTaskAttemptID task2 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object clientCookie2 = new Object();
    tsWrapper.allocateTask(task2, hosts1, priority1, clientCookie2);
    while (true) {
      tsWrapper.signalSchedulerRun();
      tsWrapper.awaitSchedulerRun();
      if (tsWrapper.ts.dagStats.getNumTotalAllocations() == 2) {
        break;
      }
    }
    verify(tsWrapper.mockAppCallback).taskAllocated(eq(task2), eq(clientCookie2), any(Container.class));
    assertEquals(1, tsWrapper.ts.dagStats.getNumLocalAllocations());
    assertEquals(0, tsWrapper.ts.dagStats.getNumAllocationsNoLocalityRequest());
    assertEquals(1, tsWrapper.ts.dagStats.getNumNonLocalAllocations());
    assertEquals(2, tsWrapper.ts.dagStats.getNumTotalAllocations());
  } finally {
    tsWrapper.shutdown();
  }
}
/**
 * Rejecting one task on each of three hosts disables all three nodes. A second round of three
 * requests must still be fully allocated once the nodes come back. This relies on real time
 * elapsing; the wrapper is built with 1000L, presumably the node re-enable timeout (confirm in
 * TestTaskSchedulerServiceWrapper).
 */
@Test(timeout = 10000)
public void testNodeReEnabled() throws InterruptedException, IOException {
  // Based on actual timing.
  TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(1000L);
  try {
    Priority priority1 = Priority.newInstance(1);
    String[] hosts1 = new String[]{HOST1};
    String[] hosts2 = new String[]{HOST2};
    String[] hosts3 = new String[]{HOST3};
    TezTaskAttemptID task1 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object clientCookie1 = new Object();
    TezTaskAttemptID task2 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object clientCookie2 = new Object();
    TezTaskAttemptID task3 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object clientCookie3 = new Object();
    tsWrapper.controlScheduler(true);
    tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1);
    tsWrapper.allocateTask(task2, hosts2, priority1, clientCookie2);
    tsWrapper.allocateTask(task3, hosts3, priority1, clientCookie3);
    // Drive the controlled scheduler until the first three tasks are allocated.
    while (true) {
      tsWrapper.signalSchedulerRun();
      tsWrapper.awaitSchedulerRun();
      if (tsWrapper.ts.dagStats.getNumTotalAllocations() == 3) {
        break;
      }
    }
    verify(tsWrapper.mockAppCallback, times(3)).taskAllocated(any(Object.class), any(Object.class), any(Container.class));
    assertEquals(3, tsWrapper.ts.dagStats.getNumLocalAllocations());
    assertEquals(0, tsWrapper.ts.dagStats.getNumAllocationsNoLocalityRequest());
    assertEquals(3, tsWrapper.ts.dagStats.getNumTotalAllocations());

    tsWrapper.resetAppCallback();
    tsWrapper.rejectExecution(task1);
    tsWrapper.rejectExecution(task2);
    tsWrapper.rejectExecution(task3);

    // Verify that all three nodes are blacklisted (still registered, but queued as disabled).
    assertEquals(3, tsWrapper.ts.dagStats.getNumRejectedTasks());
    assertEquals(3, tsWrapper.ts.instanceToNodeMap.size());
    assertEquals(3, tsWrapper.ts.disabledNodesQueue.size());

    TezTaskAttemptID task4 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object clientCookie4 = new Object();
    TezTaskAttemptID task5 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object clientCookie5 = new Object();
    TezTaskAttemptID task6 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object clientCookie6 = new Object();
    tsWrapper.allocateTask(task4, hosts1, priority1, clientCookie4);
    tsWrapper.allocateTask(task5, hosts2, priority1, clientCookie5);
    tsWrapper.allocateTask(task6, hosts3, priority1, clientCookie6);
    // Keep running the scheduler until the nodes are re-enabled and all three new tasks land.
    while (true) {
      tsWrapper.signalSchedulerRun();
      tsWrapper.awaitSchedulerRun();
      if (tsWrapper.ts.dagStats.getNumTotalAllocations() == 6) {
        break;
      }
    }
    verify(tsWrapper.mockAppCallback, times(3)).taskAllocated(any(Object.class), any(Object.class), any(Container.class));

    // Which host each task of the second batch lands on depends on when each disabled node is
    // re-enabled (real timing), which affects the locality matching. So only the
    // locality-independent counters are asserted here.
    assertEquals(0, tsWrapper.ts.dagStats.getNumAllocationsNoLocalityRequest());
    assertEquals(6, tsWrapper.ts.dagStats.getNumTotalAllocations());
  } finally {
    tsWrapper.shutdown();
  }
}
@Test(timeout = 10000)
public void testForceLocalityTest1() throws IOException, InterruptedException {
  // Setup: 2 hosts with 2 slots each, and 5 requests that all share one priority.
  // Requests 1-3 prefer host1, request 4 prefers host2, and request 5 names no host.
  // Locality is forced, so the third host1 request must not be allocated right away.
  forceLocalityTest1(true);
}
@Test(timeout = 10000)
public void testNoForceLocalityCounterTest1() throws IOException, InterruptedException {
  // Setup: 2 hosts with 2 slots each, and 5 requests that all share one priority.
  // Requests 1-3 prefer host1, request 4 prefers host2, and request 5 names no host.
  // Locality is not forced: request 3 is expected on host2, then request 4 on host2,
  // and request 5 has to wait.
  forceLocalityTest1(false);
}
/**
 * Shared scenario behind the forced and non-forced locality tests.
 *
 * <p>Two hosts, each able to take two tasks, receive five requests at the same priority: three
 * prefer host1, one prefers host2 and one has no location preference. The expected allocation
 * order depends on whether locality is forced.
 *
 * @param forceLocality when true the wrapper is created with a locality value of -1, otherwise
 *     with 0
 */
private void forceLocalityTest1(boolean forceLocality) throws IOException, InterruptedException {
  Priority priority1 = Priority.newInstance(1);
  String[] hosts = new String[] {HOST1, HOST2};
  String[] hostsH1 = new String[] {HOST1};
  String[] hostsH2 = new String[] {HOST2};
  long localityDelay = forceLocality ? -1L : 0L;
  TestTaskSchedulerServiceWrapper tsWrapper =
      new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, localityDelay);
  try {
    TezTaskAttemptID task1 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object clientCookie1 = "cookie1";
    TezTaskAttemptID task2 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object clientCookie2 = "cookie2";
    TezTaskAttemptID task3 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object clientCookie3 = "cookie3";
    TezTaskAttemptID task4 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object clientCookie4 = "cookie4";
    TezTaskAttemptID task5 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object clientCookie5 = "cookie5";
    tsWrapper.controlScheduler(true);
    // host1 - should allocate
    tsWrapper.allocateTask(task1, hostsH1, priority1, clientCookie1);
    // host1 - should allocate
    tsWrapper.allocateTask(task2, hostsH1, priority1, clientCookie2);
    // host1 - no capacity when locality is forced, should allocate otherwise
    tsWrapper.allocateTask(task3, hostsH1, priority1, clientCookie3);
    // host2 - should allocate
    tsWrapper.allocateTask(task4, hostsH2, priority1, clientCookie4);
    // no location - should allocate when locality is forced, no capacity otherwise
    tsWrapper.allocateTask(task5, null, priority1, clientCookie5);
    // Drive the controlled scheduler until four tasks have been allocated.
    do {
      tsWrapper.signalSchedulerRun();
      tsWrapper.awaitSchedulerRun();
    } while (tsWrapper.ts.dagStats.getNumTotalAllocations() != 4);
    // No preemption requests are expected, since every task has the same priority.
    verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
    ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
    verify(tsWrapper.mockAppCallback, times(4))
        .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
    List<Object> allocated = argumentCaptor.getAllValues();
    assertEquals(4, allocated.size());
    assertEquals(task1, allocated.get(0));
    assertEquals(task2, allocated.get(1));
    if (forceLocality) {
      // task3 is not allocated; it keeps waiting for host1.
      assertEquals(task4, allocated.get(2));
      assertEquals(task5, allocated.get(3));
    } else {
      assertEquals(task3, allocated.get(2));
      assertEquals(task4, allocated.get(3));
    }
    // Complete one task on host1.
    tsWrapper.deallocateTask(task1, true, null);
    reset(tsWrapper.mockAppCallback);
    // Schedule again until the remaining pending task is allocated.
    do {
      tsWrapper.signalSchedulerRun();
      tsWrapper.awaitSchedulerRun();
    } while (tsWrapper.ts.dagStats.getNumTotalAllocations() != 5);
    verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
    argumentCaptor = ArgumentCaptor.forClass(Object.class);
    verify(tsWrapper.mockAppCallback, times(1))
        .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
    allocated = argumentCaptor.getAllValues();
    assertEquals(1, allocated.size());
    // The task left pending in the first round is the one that gets the freed capacity.
    assertEquals(forceLocality ? task3 : task5, allocated.get(0));
  } finally {
    tsWrapper.shutdown();
  }
}
@Test(timeout = 10000)
public void testAdjustLocalityDelay() throws IOException, InterruptedException {
  Priority priority = Priority.newInstance(1);
  String[] singleHost = new String[] {HOST1};
  TestTaskSchedulerServiceWrapper tsWrapper =
      new TestTaskSchedulerServiceWrapper(2000, singleHost, 1, 0, 1000L);
  try {
    TezTaskAttemptID firstAttempt = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object firstCookie = "cookie1";
    TezTaskAttemptID secondAttempt = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object secondCookie = "cookie2";
    tsWrapper.controlScheduler(true);
    tsWrapper.allocateTask(firstAttempt, singleHost, priority, firstCookie);
    tsWrapper.allocateTask(secondAttempt, singleHost, priority, secondCookie);
    // Only one task fits. Before any scheduling pass, the second task's locality delay has not
    // been adjusted yet.
    assertFalse(tsWrapper.ts.getTaskInfo(secondAttempt).adjustedLocalityDelay);
    do {
      tsWrapper.signalSchedulerRun();
      tsWrapper.awaitSchedulerRun();
    } while (tsWrapper.ts.dagStats.getNumTotalAllocations() != 1);
    // Active node instances exist, so the pending task's delay is expected to be adjusted now.
    assertTrue(tsWrapper.ts.getTaskInfo(secondAttempt).adjustedLocalityDelay);
  } finally {
    tsWrapper.shutdown();
  }
}
@Test(timeout = 10000)
public void testForcedLocalityUnknownHost() throws IOException, InterruptedException {
  Priority priority = Priority.newInstance(1);
  String[] knownHosts = new String[] {HOST1};
  String[] unknownHosts = new String[] {HOST2};
  TestTaskSchedulerServiceWrapper tsWrapper =
      new TestTaskSchedulerServiceWrapper(2000, knownHosts, 1, 1, -1L);
  try {
    TezTaskAttemptID task1 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object cookie1 = "cookie1";
    TezTaskAttemptID task2 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object cookie2 = "cookie2";
    tsWrapper.controlScheduler(true);
    // Locality is forced, but host2 is not a known host, so task1 is still expected to be
    // allocated.
    tsWrapper.allocateTask(task1, unknownHosts, priority, cookie1);
    tsWrapper.allocateTask(task2, knownHosts, priority, cookie2);
    do {
      tsWrapper.signalSchedulerRun();
      tsWrapper.awaitSchedulerRun();
    } while (tsWrapper.ts.dagStats.getNumTotalAllocations() != 2);
    ArgumentCaptor<Object> taskCaptor = ArgumentCaptor.forClass(Object.class);
    verify(tsWrapper.mockAppCallback, times(2))
        .taskAllocated(taskCaptor.capture(), any(Object.class), any(Container.class));
    List<Object> allocated = taskCaptor.getAllValues();
    assertEquals(2, allocated.size());
    assertEquals(task1, allocated.get(0));
    assertEquals(task2, allocated.get(1));
  } finally {
    tsWrapper.shutdown();
  }
}
@Test(timeout = 10000)
public void testHostPreferenceUnknownAndNotSpecified() throws IOException, InterruptedException {
  Priority priority1 = Priority.newInstance(1);
  String[] hostsKnown = new String[] {HOST1, HOST2};
  String[] hostsUnknown = new String[] {HOST3};
  String[] noHosts = new String[] {};
  TestTaskSchedulerServiceWrapper tsWrapper =
      new TestTaskSchedulerServiceWrapper(2000, hostsKnown, 1, 1, -1L);
  try {
    TezTaskAttemptID task1 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object clientCookie1 = "cookie1";
    TezTaskAttemptID task2 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object clientCookie2 = "cookie2";
    TezTaskAttemptID task3 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object clientCookie3 = "cookie3";
    TezTaskAttemptID task4 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object clientCookie4 = "cookie4";
    tsWrapper.controlScheduler(true);
    // task1 and task2 prefer the known hosts (host1 or host2).
    tsWrapper.allocateTask(task1, hostsKnown, priority1, clientCookie1);
    tsWrapper.allocateTask(task2, hostsKnown, priority1, clientCookie2);
    // task3 prefers a host the scheduler does not know about.
    tsWrapper.allocateTask(task3, hostsUnknown, priority1, clientCookie3);
    // task4 gives no location preference (empty host array).
    tsWrapper.allocateTask(task4, noHosts, priority1, clientCookie4);
    do {
      tsWrapper.signalSchedulerRun();
      tsWrapper.awaitSchedulerRun();
    } while (tsWrapper.ts.dagStats.getNumTotalAllocations() != 4);
    ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
    ArgumentCaptor<Container> argumentCaptor2 = ArgumentCaptor.forClass(Container.class);
    verify(tsWrapper.mockAppCallback, times(4))
        .taskAllocated(argumentCaptor.capture(), any(Object.class), argumentCaptor2.capture());
    List<Object> tasks = argumentCaptor.getAllValues();
    List<Container> containers = argumentCaptor2.getAllValues();
    assertEquals(4, tasks.size());
    assertEquals(task1, tasks.get(0));
    assertEquals(task2, tasks.get(1));
    assertEquals(task3, tasks.get(2));
    assertEquals(task4, tasks.get(3));
    // 1st task requested host1 or host2, got host1
    assertEquals(HOST1, containers.get(0).getNodeId().getHost());
    // 2nd task requested host1 or host2, got host1
    assertEquals(HOST1, containers.get(1).getNodeId().getHost());
    // 3rd task requested an unknown host, got host2 since host1 is full and only host2 is left
    // in the random pool
    assertEquals(HOST2, containers.get(2).getNodeId().getHost());
    // 4th task provided no location preference, got host2 since host1 is full and only host2 is
    // left in the random pool
    assertEquals(HOST2, containers.get(3).getNodeId().getHost());
    // Only task4 made no locality request. task1 and task2 were placed locally, which leaves
    // task3 (unknown host) as the single non-local allocation.
    assertEquals(1, tsWrapper.ts.dagStats.getNumAllocationsNoLocalityRequest());
    assertEquals(2, tsWrapper.ts.dagStats.getNumLocalAllocations());
    assertEquals(1, tsWrapper.ts.dagStats.getNumNonLocalAllocations());
  } finally {
    tsWrapper.shutdown();
  }
}
@Test(timeout = 10000)
public void testHostPreferenceMissesConsistentRollover() throws IOException, InterruptedException {
  Priority priority = Priority.newInstance(1);
  String[] knownHosts = new String[] {HOST1, HOST2, HOST3};
  String[] liveHosts = new String[] {HOST1, HOST2, HOST3};
  String[] preferHost2 = new String[] {HOST2};
  TestTaskSchedulerServiceWrapper tsWrapper =
      new TestTaskSchedulerServiceWrapper(2000, knownHosts, 1, 0, 0L, false, liveHosts, true);
  try {
    TezTaskAttemptID task1 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object cookie1 = "cookie1";
    TezTaskAttemptID task2 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object cookie2 = "cookie2";
    TezTaskAttemptID task3 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object cookie3 = "cookie3";
    tsWrapper.controlScheduler(true);
    // All three requests prefer host2.
    tsWrapper.allocateTask(task1, preferHost2, priority, cookie1);
    tsWrapper.allocateTask(task2, preferHost2, priority, cookie2);
    tsWrapper.allocateTask(task3, preferHost2, priority, cookie3);
    do {
      tsWrapper.signalSchedulerRun();
      tsWrapper.awaitSchedulerRun();
    } while (tsWrapper.ts.dagStats.getNumTotalAllocations() != 3);
    ArgumentCaptor<Object> taskCaptor = ArgumentCaptor.forClass(Object.class);
    ArgumentCaptor<Container> containerCaptor = ArgumentCaptor.forClass(Container.class);
    verify(tsWrapper.mockAppCallback, times(3))
        .taskAllocated(taskCaptor.capture(), any(Object.class), containerCaptor.capture());
    List<Object> tasks = taskCaptor.getAllValues();
    List<Container> containers = containerCaptor.getAllValues();
    assertEquals(3, tasks.size());
    assertEquals(task1, tasks.get(0));
    assertEquals(task2, tasks.get(1));
    assertEquals(task3, tasks.get(2));
    // task1 gets its preferred host2.
    assertEquals(HOST2, containers.get(0).getNodeId().getHost());
    // host2 is full, so task2 rolls over to host3.
    assertEquals(HOST3, containers.get(1).getNodeId().getHost());
    // host2 and host3 are full, so task3 wraps around to host1.
    assertEquals(HOST1, containers.get(2).getNodeId().getHost());
    verify(tsWrapper.mockServiceInstanceSet, atLeast(2)).getAllInstancesOrdered(true);
    assertEquals(0, tsWrapper.ts.dagStats.getNumAllocationsNoLocalityRequest());
    assertEquals(1, tsWrapper.ts.dagStats.getNumLocalAllocations());
    assertEquals(2, tsWrapper.ts.dagStats.getNumNonLocalAllocations());
  } finally {
    tsWrapper.shutdown();
  }
}
@Test(timeout = 10000)
public void testHostPreferenceMissesConsistentPartialAlive() throws IOException, InterruptedException {
  Priority priority = Priority.newInstance(1);
  String[] knownHosts = new String[] {HOST1, HOST2, HOST3, HOST4};
  // host3 is already dead before scheduling starts (null slot in the live list).
  String[] liveHosts = new String[] {HOST1, HOST2, null, HOST4};
  String[] preferHost2 = new String[] {HOST2};
  String[] preferHost3 = new String[] {HOST3};
  TestTaskSchedulerServiceWrapper tsWrapper =
      new TestTaskSchedulerServiceWrapper(2000, knownHosts, 1, 0, 0L, false, liveHosts, true);
  try {
    TezTaskAttemptID task1 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object cookie1 = "cookie1";
    TezTaskAttemptID task2 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object cookie2 = "cookie2";
    TezTaskAttemptID task3 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object cookie3 = "cookie3";
    tsWrapper.controlScheduler(true);
    tsWrapper.allocateTask(task1, preferHost2, priority, cookie1);
    tsWrapper.allocateTask(task2, preferHost2, priority, cookie2);
    tsWrapper.allocateTask(task3, preferHost3, priority, cookie3);
    do {
      tsWrapper.signalSchedulerRun();
      tsWrapper.awaitSchedulerRun();
    } while (tsWrapper.ts.dagStats.getNumTotalAllocations() != 3);
    ArgumentCaptor<Object> taskCaptor = ArgumentCaptor.forClass(Object.class);
    ArgumentCaptor<Container> containerCaptor = ArgumentCaptor.forClass(Container.class);
    verify(tsWrapper.mockAppCallback, times(3))
        .taskAllocated(taskCaptor.capture(), any(Object.class), containerCaptor.capture());
    List<Object> tasks = taskCaptor.getAllValues();
    List<Container> containers = containerCaptor.getAllValues();
    assertEquals(3, tasks.size());
    assertEquals(task1, tasks.get(0));
    assertEquals(task2, tasks.get(1));
    assertEquals(task3, tasks.get(2));
    // task1 asked for host2 and got it.
    assertEquals(HOST2, containers.get(0).getNodeId().getHost());
    // task2 asked for host2: host2 is full and host3 is dead, so it lands on host4.
    assertEquals(HOST4, containers.get(1).getNodeId().getHost());
    // task3 asked for host3: host3 is dead and host4 is full, so it lands on host1.
    assertEquals(HOST1, containers.get(2).getNodeId().getHost());
    verify(tsWrapper.mockServiceInstanceSet, atLeast(2)).getAllInstancesOrdered(true);
    assertEquals(0, tsWrapper.ts.dagStats.getNumAllocationsNoLocalityRequest());
    assertEquals(1, tsWrapper.ts.dagStats.getNumLocalAllocations());
    assertEquals(2, tsWrapper.ts.dagStats.getNumNonLocalAllocations());
  } finally {
    tsWrapper.shutdown();
  }
}
@Test(timeout = 10000)
public void testForcedLocalityPreemption() throws IOException, InterruptedException {
  Priority priority1 = Priority.newInstance(1);
  Priority priority2 = Priority.newInstance(2);
  String[] hosts = new String[] {HOST1, HOST2};
  String[] hostsH1 = new String[] {HOST1};
  TestTaskSchedulerServiceWrapper tsWrapper =
      new TestTaskSchedulerServiceWrapper(2000, hosts, 2, 0, -1L);
  // Scenario:
  //  - fill up host1 with p2 tasks and leave host2 empty;
  //  - an extra p2 request for host1 must just wait;
  //  - a p1 request for host1 must preempt on host1.
  try {
    TezTaskAttemptID task1 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object clientCookie1 = "cookie1";
    TezTaskAttemptID task2 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object clientCookie2 = "cookie2";
    TezTaskAttemptID task3 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object clientCookie3 = "cookie3";
    TezTaskAttemptID task4 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object clientCookie4 = "cookie4";
    tsWrapper.controlScheduler(true);
    tsWrapper.allocateTask(task1, hostsH1, priority2, clientCookie1);
    tsWrapper.allocateTask(task2, hostsH1, priority2, clientCookie2);
    // A third request at the same priority (p2) does not fit on host1. It must not be allocated
    // and must not cause any preemption.
    tsWrapper.allocateTask(task3, hostsH1, priority2, clientCookie3);
    tsWrapper.awaitLocalTaskAllocations(2);
    verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
    ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
    verify(tsWrapper.mockAppCallback, times(2))
        .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
    List<Object> allocated = argumentCaptor.getAllValues();
    assertEquals(2, allocated.size());
    assertEquals(task1, allocated.get(0));
    assertEquals(task2, allocated.get(1));
    reset(tsWrapper.mockAppCallback);
    // Allocate task4 at the higher priority (p1). task3 should still not be allocated, and a
    // preemption should be attempted on host1 even though host2 has available capacity.
    tsWrapper.allocateTask(task4, hostsH1, priority1, clientCookie4);
    do {
      tsWrapper.signalSchedulerRun();
      tsWrapper.awaitSchedulerRun();
    } while (tsWrapper.ts.dagStats.getNumPreemptedTasks() != 1);
    verify(tsWrapper.mockAppCallback).preemptContainer(any(ContainerId.class));
    // Report task1 as preempted, freeing a slot on host1 for task4.
    tsWrapper.deallocateTask(task1, false, TaskAttemptEndReason.INTERNAL_PREEMPTION);
    tsWrapper.awaitLocalTaskAllocations(3);
    verify(tsWrapper.mockAppCallback, times(1))
        .taskAllocated(eq(task4), eq(clientCookie4), any(Container.class));
  } finally {
    tsWrapper.shutdown();
  }
}
@Test(timeout = 10000)
public void testPreemptionChoiceTimeOrdering() throws IOException, InterruptedException {
  Priority priority1 = Priority.newInstance(1);
  Priority priority2 = Priority.newInstance(2);
  String[] hosts = new String[] {HOST1, HOST2};
  String[] hostsH1 = new String[] {HOST1};
  TestTaskSchedulerServiceWrapper tsWrapper =
      new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, -1L);
  try {
    TezTaskAttemptID task1 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object clientCookie1 = "cookie1";
    TezTaskAttemptID task2 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object clientCookie2 = "cookie2";
    TezTaskAttemptID task3 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
    Object clientCookie3 = "cookie3";
    tsWrapper.controlScheduler(true);
    ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
    ArgumentCaptor<Container> cArgCaptor = ArgumentCaptor.forClass(Container.class);

    // Request task1 with the clock at 10000.
    tsWrapper.getClock().setTime(10000L);
    tsWrapper.allocateTask(task1, hostsH1, priority2, clientCookie1);
    tsWrapper.awaitLocalTaskAllocations(1);
    verify(tsWrapper.mockAppCallback, times(1))
        .taskAllocated(argumentCaptor.capture(), any(Object.class), cArgCaptor.capture());
    ContainerId t1Cid = cArgCaptor.getValue().getId();
    reset(tsWrapper.mockAppCallback);

    // Move the clock backwards, so that task1's allocation time is after task2's.
    // task1 has already started at the previously set time.
    tsWrapper.getClock().setTime(tsWrapper.getClock().getTime() - 1000);
    tsWrapper.allocateTask(task2, hostsH1, priority2, clientCookie2);
    tsWrapper.awaitLocalTaskAllocations(2);
    verify(tsWrapper.mockAppCallback, times(1))
        .taskAllocated(argumentCaptor.capture(), any(Object.class), cArgCaptor.capture());
    reset(tsWrapper.mockAppCallback);

    // Move the clock forward and request a task at p=1, which forces a preemption.
    tsWrapper.getClock().setTime(tsWrapper.getClock().getTime() + 2000);
    tsWrapper.allocateTask(task3, hostsH1, priority1, clientCookie3);
    do {
      tsWrapper.signalSchedulerRun();
      tsWrapper.awaitSchedulerRun();
    } while (tsWrapper.ts.dagStats.getNumPreemptedTasks() != 1);

    // Ensure task1 is preempted based on time: match the preempted container against its
    // allocated containerId.
    ArgumentCaptor<ContainerId> cIdArgCaptor = ArgumentCaptor.forClass(ContainerId.class);
    verify(tsWrapper.mockAppCallback).preemptContainer(cIdArgCaptor.capture());
    assertEquals(t1Cid, cIdArgCaptor.getValue());
  } finally {
    tsWrapper.shutdown();
  }
}
/**
 * With forced locality (-1) and host1 filled by two p2 tasks (host2 left empty), two
 * successive p1 requests for host1 must each lead to a preemption request, followed by an
 * allocation once the preempted task is deallocated.
 * Currently ignored; see HIVE-25248.
 */
@Test(timeout = 10000)
@org.junit.Ignore("HIVE-25248")
public void testForcedLocalityMultiplePreemptionsSameHost1() throws IOException,
InterruptedException {
Priority priority1 = Priority.newInstance(1);
Priority priority2 = Priority.newInstance(2);
String [] hosts = new String[] {HOST1, HOST2};
String [] hostsH1 = new String[] {HOST1};
TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, -1l);
// Fill up host1 with p2 tasks.
// Leave host2 empty.
// Try running a p1 task on host1 - should preempt.
// Await the preemption request.
// Try running another p1 task on host1 - should preempt.
// Await the preemption request.
try {
TezTaskAttemptID task1 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
Object clientCookie1 = "cookie1";
TezTaskAttemptID task2 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
Object clientCookie2 = "cookie2";
TezTaskAttemptID task3 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
Object clientCookie3 = "cookie3";
TezTaskAttemptID task4 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
Object clientCookie4 = "cookie4";
tsWrapper.controlScheduler(true);
tsWrapper.allocateTask(task1, hostsH1, priority2, clientCookie1);
tsWrapper.allocateTask(task2, hostsH1, priority2, clientCookie2);
tsWrapper.awaitLocalTaskAllocations(2);
verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
ArgumentCaptor<Container> cArgCaptor = ArgumentCaptor.forClass(Container.class);
verify(tsWrapper.mockAppCallback, times(2))
.taskAllocated(argumentCaptor.capture(), any(Object.class), cArgCaptor.capture());
assertEquals(2, argumentCaptor.getAllValues().size());
assertEquals(task1, argumentCaptor.getAllValues().get(0));
assertEquals(task2, argumentCaptor.getAllValues().get(1));
assertEquals(2, cArgCaptor.getAllValues().size());
// Remember task1's container so the preempted task can be identified below.
ContainerId t1CId = cArgCaptor.getAllValues().get(0).getId();
reset(tsWrapper.mockAppCallback);
// At this point, 2 tasks are running - both at priority 2.
// Try running a priority 1 task.
tsWrapper.allocateTask(task3, hostsH1, priority1, clientCookie3);
while (true) {
tsWrapper.signalSchedulerRun();
tsWrapper.awaitSchedulerRun();
if (tsWrapper.ts.dagStats.getNumPreemptedTasks() == 1) {
break;
}
}
ArgumentCaptor<ContainerId> cIdArgCaptor = ArgumentCaptor.forClass(ContainerId.class);
verify(tsWrapper.mockAppCallback).preemptContainer(cIdArgCaptor.capture());
// Determine which task has been preempted. Normally task2 would be preempted because it
// started later. However, both may have the same start time, so either could be picked.
Object deallocatedTask1; // De-allocated now.
Object deallocatedTask2; // Will be de-allocated later.
if (cIdArgCaptor.getValue().equals(t1CId)) {
deallocatedTask1 = task1;
deallocatedTask2 = task2;
} else {
deallocatedTask1 = task2;
deallocatedTask2 = task1;
}
// Report the preempted task as gone; its slot on host1 should go to task3.
tsWrapper.deallocateTask(deallocatedTask1, false, TaskAttemptEndReason.INTERNAL_PREEMPTION);
tsWrapper.awaitLocalTaskAllocations(3);
verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task3),
eq(clientCookie3), any(Container.class));
reset(tsWrapper.mockAppCallback);
// At this point, one p=2 task and task3 (p=1) are running. Ask for another p1 task.
tsWrapper.allocateTask(task4, hostsH1, priority1, clientCookie4);
while (true) {
tsWrapper.signalSchedulerRun();
tsWrapper.awaitSchedulerRun();
if (tsWrapper.ts.dagStats.getNumPreemptedTasks() == 2) {
break;
}
}
// The mock was reset after task3's allocation, so exactly one new preemption is expected.
verify(tsWrapper.mockAppCallback).preemptContainer(any(ContainerId.class));
tsWrapper.deallocateTask(deallocatedTask2, false, TaskAttemptEndReason.INTERNAL_PREEMPTION);
tsWrapper.awaitLocalTaskAllocations(4);
verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task4),
eq(clientCookie4), any(Container.class));
} finally {
tsWrapper.shutdown();
}
}
/**
 * With forced locality (-1) and host1 filled by two p2 tasks (host2 left empty), two p1
 * requests for host1 are submitted together. Expected sequence: one preemption, the first p1
 * allocation, then a second preemption and the second p1 allocation.
 */
@Test(timeout = 10000)
public void testForcedLocalityMultiplePreemptionsSameHost2() throws IOException,
InterruptedException {
Priority priority1 = Priority.newInstance(1);
Priority priority2 = Priority.newInstance(2);
String [] hosts = new String[] {HOST1, HOST2};
String [] hostsH1 = new String[] {HOST1};
TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, -1l);
// Fill up host1 with p2 tasks.
// Leave host2 empty.
// Try running both p1 tasks on host1.
// Result: a single preemption is triggered, followed by an allocation, followed by another
// preemption.
try {
TezTaskAttemptID task1 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
Object clientCookie1 = "cookie1";
TezTaskAttemptID task2 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
Object clientCookie2 = "cookie2";
TezTaskAttemptID task3 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
Object clientCookie3 = "cookie3";
TezTaskAttemptID task4 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId();
Object clientCookie4 = "cookie4";
tsWrapper.controlScheduler(true);
tsWrapper.allocateTask(task1, hostsH1, priority2, clientCookie1);
tsWrapper.allocateTask(task2, hostsH1, priority2, clientCookie2);
tsWrapper.awaitLocalTaskAllocations(2);
verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
ArgumentCaptor<Container> cArgCaptor = ArgumentCaptor.forClass(Container.class);
verify(tsWrapper.mockAppCallback, times(2))
.taskAllocated(argumentCaptor.capture(), any(Object.class), cArgCaptor.capture());
assertEquals(2, argumentCaptor.getAllValues().size());
assertEquals(task1, argumentCaptor.getAllValues().get(0));
assertEquals(task2, argumentCaptor.getAllValues().get(1));
assertEquals(2, cArgCaptor.getAllValues().size());
// Remember task1's container so the preempted task can be identified below.
ContainerId t1CId = cArgCaptor.getAllValues().get(0).getId();
reset(tsWrapper.mockAppCallback);
// At this point, 2 tasks are running - both at priority 2.
// Request two priority 1 tasks (task3 and task4) on host1 at once.
tsWrapper.allocateTask(task3, hostsH1, priority1, clientCookie3);
tsWrapper.allocateTask(task4, hostsH1, priority1, clientCookie4);
while (true) {
tsWrapper.signalSchedulerRun();
tsWrapper.awaitSchedulerRun();
if (tsWrapper.ts.dagStats.getNumPreemptedTasks() == 1) {
break;
}
}
ArgumentCaptor<ContainerId> cIdArgCaptor = ArgumentCaptor.forClass(ContainerId.class);
verify(tsWrapper.mockAppCallback).preemptContainer(cIdArgCaptor.capture());
// Determine which task has been preempted. Normally task2 would be preempted because it
// started later. However, both may have the same start time, so either could be picked.
Object deallocatedTask1; // De-allocated now.
Object deallocatedTask2; // Will be de-allocated later.
if (cIdArgCaptor.getValue().equals(t1CId)) {
deallocatedTask1 = task1;
deallocatedTask2 = task2;
} else {
deallocatedTask1 = task2;
deallocatedTask2 = task1;
}
// Report the preempted task as gone; its slot on host1 should go to task3.
tsWrapper.deallocateTask(deallocatedTask1, false, TaskAttemptEndReason.INTERNAL_PREEMPTION);
tsWrapper.awaitLocalTaskAllocations(3);
verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task3),
eq(clientCookie3), any(Container.class));
// At this point, one p=2 task and task3 (p=1) are running, and task4 (p=1) is still pending.
// Keep scheduling until a second preemption is requested for task4.
while (true) {
tsWrapper.signalSchedulerRun();
tsWrapper.awaitSchedulerRun(1000l);
if (tsWrapper.ts.dagStats.getNumPreemptedTasks() == 2) {
break;
}
}
// The mock has not been reset since the first preemption, so both requests are counted here.
verify(tsWrapper.mockAppCallback, times(2)).preemptContainer(any(ContainerId.class));
tsWrapper.deallocateTask(deallocatedTask2, false, TaskAttemptEndReason.INTERNAL_PREEMPTION);
tsWrapper.awaitLocalTaskAllocations(4);
verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task4),
eq(clientCookie4), any(Container.class));
} finally {
tsWrapper.shutdown();
}
}
@Test(timeout = 10000)
public void testForcedLocalityNotInDelayedQueue() throws IOException, InterruptedException {
  // Two hosts with a locality value of -1 (forced locality); every request targets host1.
  String[] allHosts = new String[] {HOST1, HOST2};
  String[] targetHosts = new String[] {HOST1};
  testNotInQueue(new TestTaskSchedulerServiceWrapper(2000, allHosts, 1, 1, -1L), targetHosts);
}
@Test(timeout = 10000)
public void testNoLocalityNotInDelayedQueue() throws IOException, InterruptedException {
  // A single host with a locality value of 0; every request targets that host.
  String[] allHosts = new String[] {HOST1};
  String[] targetHosts = new String[] {HOST1};
  testNotInQueue(new TestTaskSchedulerServiceWrapper(2000, allHosts, 1, 1, 0L), targetHosts);
}
/**
 * Submits three same-priority requests for {@code hosts}, waits for two local allocations, and
 * checks that the request left over was not placed in the delayed-task queue. Always shuts the
 * wrapper down.
 */
private void testNotInQueue(TestTaskSchedulerServiceWrapper tsWrapper, String[] hosts) throws
    InterruptedException {
  Priority priority = Priority.newInstance(1);
  try {
    tsWrapper.controlScheduler(true);
    // Three requests: one more than the capacity the callers configure.
    for (int request = 0; request < 3; request++) {
      tsWrapper.allocateTask(hosts, priority);
    }
    tsWrapper.awaitLocalTaskAllocations(2);
    assertEquals(0, tsWrapper.ts.delayedTaskQueue.size());
  } finally {
    tsWrapper.shutdown();
  }
}
@Test(timeout = 10000)
public void testDelayedLocalityFallbackToNonLocal() throws IOException, InterruptedException {
  Priority priority1 = Priority.newInstance(1);
  String[] hosts = new String[] {HOST1, HOST2};
  String[] hostsH1 = new String[] {HOST1};
  TestTaskSchedulerServiceWrapper tsWrapper =
      new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, 10000L, true);
  LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled
      delayedTaskSchedulerCallableControlled =
          (LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled)
              tsWrapper.ts.delayedTaskSchedulerCallable;
  ControlledClock clock = tsWrapper.getClock();
  clock.setTime(clock.getTime());
  try {
    // Fill up host1 with tasks. Leave host2 empty.
    tsWrapper.controlScheduler(true);
    TezTaskAttemptID task1 = tsWrapper.allocateTask(hostsH1, priority1);
    TezTaskAttemptID task2 = tsWrapper.allocateTask(hostsH1, priority1);
    // One more than host1's capacity.
    TezTaskAttemptID task3 = tsWrapper.allocateTask(hostsH1, priority1);
    tsWrapper.awaitLocalTaskAllocations(2);
    verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
    ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
    verify(tsWrapper.mockAppCallback, times(2))
        .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
    assertEquals(2, argumentCaptor.getAllValues().size());
    assertEquals(task1, argumentCaptor.getAllValues().get(0));
    assertEquals(task2, argumentCaptor.getAllValues().get(1));
    reset(tsWrapper.mockAppCallback);

    // No capacity left on host1. task3 should be allocated to host2 after it times out.
    clock.setTime(clock.getTime() + 10000L); // Past the timeout.
    assertEquals(
        LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_NOT_RUN,
        delayedTaskSchedulerCallableControlled.lastState);
    delayedTaskSchedulerCallableControlled.triggerGetNextTask();
    delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing();
    // Verify that the delayed task was returned, that a scheduling decision was made, and that
    // the decision was to schedule it (the timeout has expired).
    assertEquals(
        LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_RETURNED_TASK,
        delayedTaskSchedulerCallableControlled.lastState);
    // Separate assertions so that a failure shows which condition did not hold.
    assertTrue(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered);
    assertTrue(delayedTaskSchedulerCallableControlled.lastShouldScheduleTaskResult);

    tsWrapper.awaitChangeInTotalAllocations(2);
    verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
    argumentCaptor = ArgumentCaptor.forClass(Object.class);
    ArgumentCaptor<Container> containerCaptor = ArgumentCaptor.forClass(Container.class);
    verify(tsWrapper.mockAppCallback, times(1))
        .taskAllocated(argumentCaptor.capture(), any(Object.class), containerCaptor.capture());
    assertEquals(1, argumentCaptor.getAllValues().size());
    assertEquals(task3, argumentCaptor.getAllValues().get(0));
    Container assignedContainer = containerCaptor.getValue();
    assertEquals(HOST2, assignedContainer.getNodeId().getHost());
    assertEquals(2, tsWrapper.ts.dagStats.getNumLocalAllocations());
    assertEquals(1, tsWrapper.ts.dagStats.getNumNonLocalAllocations());
    assertEquals(1, tsWrapper.ts.dagStats.getNumDelayedAllocations());
    assertEquals(2, tsWrapper.ts.dagStats.getNumAllocationsPerHost().get(HOST1).get());
    assertEquals(1, tsWrapper.ts.dagStats.getNumAllocationsPerHost().get(HOST2).get());
  } finally {
    tsWrapper.shutdown();
  }
}
@Test(timeout = 10000)
public void testDelayedLocalityDelayedAllocation() throws InterruptedException, IOException {
  // Scenario: three tasks all ask for HOST1, which can only take two of them (the third is
  // "1 more than capacity"); HOST2 stays empty. The locality delay is 10000ms and the delayed
  // task queue is test-controlled. While the delay is still running the third task must not be
  // released from the delayed queue; once HOST1 frees a slot it is placed on HOST1.
  Priority priority1 = Priority.newInstance(1);
  String [] hosts = new String[] {HOST1, HOST2};
  String [] hostsH1 = new String[] {HOST1};
  // nodeDisableTimeout=2000ms, 1 executor, wait queue size 1, localityDelay=10000ms,
  // controlled delayed task queue.
  TestTaskSchedulerServiceWrapper tsWrapper =
      new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, 10000L, true);
  LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled
      delayedTaskSchedulerCallableControlled =
      (LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled)
          tsWrapper.ts.delayedTaskSchedulerCallable;
  ControlledClock clock = tsWrapper.getClock();
  // Pin the controlled clock so that it only moves when the test calls setTime().
  clock.setTime(clock.getTime());
  // Fill up host1 with tasks. Leave host2 empty.
  try {
    tsWrapper.controlScheduler(true);
    TezTaskAttemptID task1 = tsWrapper.allocateTask(hostsH1, priority1);
    TezTaskAttemptID task2 = tsWrapper.allocateTask(hostsH1, priority1);
    TezTaskAttemptID task3 = tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity.
    tsWrapper.awaitLocalTaskAllocations(2);
    verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
    ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
    verify(tsWrapper.mockAppCallback, times(2))
        .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
    assertEquals(2, argumentCaptor.getAllValues().size());
    assertEquals(task1, argumentCaptor.getAllValues().get(0));
    assertEquals(task2, argumentCaptor.getAllValues().get(1));
    reset(tsWrapper.mockAppCallback);
    // Advance the clock by 2000ms. This is still well inside the 10000ms locality delay, so the
    // head of the delayed queue must not be released yet.
    clock.setTime(clock.getTime() + 2000L);
    assertEquals(
        LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_NOT_RUN,
        delayedTaskSchedulerCallableControlled.lastState);
    delayedTaskSchedulerCallableControlled.triggerGetNextTask();
    delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing();
    // The delay has not expired: nothing was taken from the delayed queue, so no scheduling
    // decision (shouldScheduleTask) was made.
    assertEquals(
        LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_TIMEOUT_NOT_EXPIRED,
        delayedTaskSchedulerCallableControlled.lastState);
    assertFalse(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered);
    tsWrapper.deallocateTask(task1, true, null);
    // HOST1 now has free capacity. task3 should be allocated to it by the regular scheduling
    // path, without waiting for its locality delay to expire.
    tsWrapper.awaitChangeInTotalAllocations(2);
    verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
    argumentCaptor = ArgumentCaptor.forClass(Object.class);
    ArgumentCaptor<Container> containerCaptor = ArgumentCaptor.forClass(Container.class);
    verify(tsWrapper.mockAppCallback, times(1))
        .taskAllocated(argumentCaptor.capture(), any(Object.class), containerCaptor.capture());
    assertEquals(1, argumentCaptor.getAllValues().size());
    assertEquals(task3, argumentCaptor.getAllValues().get(0));
    Container assignedContainer = containerCaptor.getValue();
    assertEquals(HOST1, assignedContainer.getNodeId().getHost());
    assertEquals(3, tsWrapper.ts.dagStats.getNumLocalAllocations());
    assertEquals(0, tsWrapper.ts.dagStats.getNumNonLocalAllocations());
    assertEquals(1, tsWrapper.ts.dagStats.getNumDelayedAllocations());
    assertEquals(3, tsWrapper.ts.dagStats.getNumAllocationsPerHost().get(HOST1).get());
  } finally {
    tsWrapper.shutdown();
  }
}
/**
 * Scenario: three tasks ask for HOST1, which only has room for two. task3 is later allocated
 * on HOST1 through the regular scheduling path (after task1 finishes), before its 10000ms
 * locality delay expires. When the delay finally expires and the delayed queue hands an entry
 * back (presumably task3's), shouldScheduleTask must decline it and no second allocation may
 * happen.
 *
 * NOTE(review): "Quee" in the method name is a typo for "Queue"; it is left unchanged so the
 * test id stays stable.
 */
@Test(timeout = 10000)
public void testDelayedQueeTaskSelectionAfterScheduled() throws IOException,
InterruptedException {
Priority priority1 = Priority.newInstance(1);
String [] hosts = new String[] {HOST1, HOST2};
String [] hostsH1 = new String[] {HOST1};
// nodeDisableTimeout=2000ms, 1 executor, wait queue size 1, localityDelay=10000ms,
// controlled delayed task queue.
TestTaskSchedulerServiceWrapper tsWrapper =
new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, 10000l, true);
LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled
delayedTaskSchedulerCallableControlled =
(LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled) tsWrapper.ts.delayedTaskSchedulerCallable;
ControlledClock clock = tsWrapper.getClock();
// Pin the controlled clock so that it only moves when the test calls setTime().
clock.setTime(clock.getTime());
// Fill up host1 with tasks. Leave host2 empty.
try {
tsWrapper.controlScheduler(true);
TezTaskAttemptID task1 = tsWrapper.allocateTask(hostsH1, priority1);
TezTaskAttemptID task2 = tsWrapper.allocateTask(hostsH1, priority1);
TezTaskAttemptID task3 = tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity.
tsWrapper.awaitLocalTaskAllocations(2);
verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
verify(tsWrapper.mockAppCallback, times(2))
.taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
assertEquals(2, argumentCaptor.getAllValues().size());
assertEquals(task1, argumentCaptor.getAllValues().get(0));
assertEquals(task2, argumentCaptor.getAllValues().get(1));
// Simulate a 2s delay before finishing the task (still inside the 10000ms locality delay).
clock.setTime(clock.getTime() + 2000);
assertEquals(
LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_NOT_RUN,
delayedTaskSchedulerCallableControlled.lastState);
delayedTaskSchedulerCallableControlled.triggerGetNextTask();
delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing();
// The delay has not expired, so nothing left the delayed queue and shouldScheduleTask was not consulted.
assertEquals(
LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_TIMEOUT_NOT_EXPIRED,
delayedTaskSchedulerCallableControlled.lastState);
assertFalse(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered);
reset(tsWrapper.mockAppCallback);
// Now finish task1, which will make capacity for task3 to run. Nothing is coming out of the delayed queue yet.
tsWrapper.deallocateTask(task1, true, null);
tsWrapper.awaitLocalTaskAllocations(3);
verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
argumentCaptor = ArgumentCaptor.forClass(Object.class);
ArgumentCaptor<Container> containerCaptor = ArgumentCaptor.forClass(Container.class);
verify(tsWrapper.mockAppCallback, times(1))
.taskAllocated(argumentCaptor.capture(), any(Object.class), containerCaptor.capture());
assertEquals(1, argumentCaptor.getAllValues().size());
assertEquals(task3, argumentCaptor.getAllValues().get(0));
Container assignedContainer = containerCaptor.getValue();
assertEquals(HOST1, assignedContainer.getNodeId().getHost());
reset(tsWrapper.mockAppCallback);
// Move the clock forward and trigger a run.
clock.setTime(clock.getTime() + 8000); // Set to start + 10000 which is the timeout
delayedTaskSchedulerCallableControlled.triggerGetNextTask();
delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing();
// This time an entry was handed back from the delayed queue.
assertEquals(
LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_RETURNED_TASK,
delayedTaskSchedulerCallableControlled.lastState);
// Verify that an attempt was made to schedule the task, but the decision was to skip scheduling
assertTrue(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered &&
!delayedTaskSchedulerCallableControlled.lastShouldScheduleTaskResult);
// Ensure there's no more invocations.
verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
verify(tsWrapper.mockAppCallback, never()).taskAllocated(any(Object.class), any(Object.class), any(Container.class));
} finally {
tsWrapper.shutdown();
}
}
@Test(timeout = 10000)
public void testTaskInfoDelay() {
  ControlledClock clock = new ControlledClock(new MonotonicClock());
  // Pin the controlled clock; from here on it only moves via setTime().
  clock.setTime(clock.getTime());
  final long start = clock.getTime();

  // Case 1: a 3000ms locality delay counts down as the clock advances.
  LlapTaskSchedulerService.LocalityDelayConf threeSecondDelay =
      new LlapTaskSchedulerService.LocalityDelayConf(3000);
  LlapTaskSchedulerService.TaskInfo delayedTask =
      new LlapTaskSchedulerService.TaskInfo(threeSecondDelay, clock, new Object(), new Object(),
          mock(Priority.class), mock(Resource.class), null, null, start, null);
  assertFalse(delayedTask.shouldForceLocality());
  assertEquals(3000, delayedTask.getDelay(TimeUnit.MILLISECONDS));
  assertTrue(delayedTask.shouldDelayForLocality(start));

  final long halfSecondLater = start + 500;
  clock.setTime(halfSecondLater);
  assertEquals(2500, delayedTask.getDelay(TimeUnit.MILLISECONDS));
  assertTrue(delayedTask.shouldDelayForLocality(halfSecondLater));

  final long expiry = halfSecondLater + 2500;
  clock.setTime(expiry);
  assertEquals(0, delayedTask.getDelay(TimeUnit.MILLISECONDS));
  assertFalse(delayedTask.shouldDelayForLocality(expiry));

  // Case 2: a zero delay never waits for locality.
  LlapTaskSchedulerService.LocalityDelayConf noDelay =
      new LlapTaskSchedulerService.LocalityDelayConf(0);
  LlapTaskSchedulerService.TaskInfo undelayedTask =
      new LlapTaskSchedulerService.TaskInfo(noDelay, clock, new Object(), new Object(),
          mock(Priority.class), mock(Resource.class), null, null, expiry, null);
  assertFalse(undelayedTask.shouldDelayForLocality(expiry));
  assertFalse(undelayedTask.shouldForceLocality());
  assertTrue(undelayedTask.getDelay(TimeUnit.MILLISECONDS) < 0);

  // Case 3: a negative delay forces locality and always waits.
  LlapTaskSchedulerService.LocalityDelayConf forcedLocality =
      new LlapTaskSchedulerService.LocalityDelayConf(-1);
  LlapTaskSchedulerService.TaskInfo forcedTask =
      new LlapTaskSchedulerService.TaskInfo(forcedLocality, clock, new Object(), new Object(),
          mock(Priority.class), mock(Resource.class), null, null, expiry, null);
  assertTrue(forcedTask.shouldDelayForLocality(expiry));
  assertTrue(forcedTask.shouldForceLocality());
  assertTrue(forcedTask.getDelay(TimeUnit.MILLISECONDS) >= 0);
}
@Test(timeout = 10000)
public void testLocalityDelayTaskOrdering() throws InterruptedException, IOException {
  LlapTaskSchedulerService.LocalityDelayConf localityDelayConf =
      new LlapTaskSchedulerService.LocalityDelayConf(3000);
  ControlledClock clock = new ControlledClock(new MonotonicClock());
  // Pin the controlled clock; it only moves via setTime().
  clock.setTime(clock.getTime());
  DelayQueue<LlapTaskSchedulerService.TaskInfo> delayedQueue = new DelayQueue<>();
  // Both tasks use the same 3000ms delay, but taskInfo1 is created 1000ms earlier, so its
  // delay runs out first and it must be at the head of the queue.
  LlapTaskSchedulerService.TaskInfo taskInfo1 =
      new LlapTaskSchedulerService.TaskInfo(localityDelayConf, clock, new Object(), new Object(),
          mock(Priority.class), mock(Resource.class), null, null, clock.getTime(), null);
  clock.setTime(clock.getTime() + 1000);
  LlapTaskSchedulerService.TaskInfo taskInfo2 =
      new LlapTaskSchedulerService.TaskInfo(localityDelayConf, clock, new Object(), new Object(),
          mock(Priority.class), mock(Resource.class), null, null, clock.getTime(), null);
  // Insert in the reverse of the expected order. If ordering merely followed insertion order,
  // taskInfo2 would end up at the head and the assertion below would catch it.
  delayedQueue.add(taskInfo2);
  delayedQueue.add(taskInfo1);
  assertEquals(taskInfo1, delayedQueue.peek());
  // Once the earliest task is gone, the later one becomes the head.
  assertTrue(delayedQueue.remove(taskInfo1));
  assertEquals(taskInfo2, delayedQueue.peek());
}
@Test (timeout = 15000)
public void testDelayedLocalityNodeCommErrorImmediateAllocation() throws IOException, InterruptedException {
  // Locality delay for this test. The node re-enable timeout passed to the wrapper (20000ms)
  // is deliberately larger than it.
  final long localityDelayMs = 9000L;
  Priority priority1 = Priority.newInstance(1);
  String [] hosts = new String[] {HOST1, HOST2};
  String [] hostsH1 = new String[] {HOST1};
  // Node disable timeout higher than locality delay.
  TestTaskSchedulerServiceWrapper tsWrapper =
      new TestTaskSchedulerServiceWrapper(20000, hosts, 1, 1, localityDelayMs);
  // Fill up host1 with tasks. Leave host2 empty.
  try {
    // NOTE(review): this test never calls setTime() on the wrapper's ControlledClock, so this
    // presumably tracks the underlying MonotonicClock (real elapsed time).
    long startTime = tsWrapper.getClock().getTime();
    tsWrapper.controlScheduler(true);
    TezTaskAttemptID task1 = tsWrapper.allocateTask(hostsH1, priority1);
    TezTaskAttemptID task2 = tsWrapper.allocateTask(hostsH1, priority1);
    TezTaskAttemptID task3 = tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity.
    tsWrapper.awaitLocalTaskAllocations(2);
    verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
    ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
    verify(tsWrapper.mockAppCallback, times(2))
        .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
    assertEquals(2, argumentCaptor.getAllValues().size());
    assertEquals(task1, argumentCaptor.getAllValues().get(0));
    assertEquals(task2, argumentCaptor.getAllValues().get(1));
    reset(tsWrapper.mockAppCallback);
    // Flush any pending scheduler runs which may be blocked. Wait 2 seconds for the run to complete.
    tsWrapper.signalSchedulerRun();
    tsWrapper.awaitSchedulerRun(2000L);
    // Mark a task as failed due to a comm failure.
    tsWrapper.deallocateTask(task1, false, TaskAttemptEndReason.COMMUNICATION_ERROR);
    // Node1 is marked as failed and node2 has capacity. The node timeout is larger than the
    // locality delay, so task3 should be allocated immediately instead of waiting.
    tsWrapper.awaitChangeInTotalAllocations(2);
    long thirdAllocateTime = tsWrapper.getClock().getTime();
    long diff = thirdAllocateTime - startTime;
    // The allocation must complete before the locality delay could have expired on its own.
    assertTrue("Task not allocated in expected time window: duration=" + diff, diff < localityDelayMs);
    verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
    argumentCaptor = ArgumentCaptor.forClass(Object.class);
    ArgumentCaptor<Container> containerCaptor = ArgumentCaptor.forClass(Container.class);
    verify(tsWrapper.mockAppCallback, times(1))
        .taskAllocated(argumentCaptor.capture(), any(Object.class), containerCaptor.capture());
    assertEquals(1, argumentCaptor.getAllValues().size());
    assertEquals(task3, argumentCaptor.getAllValues().get(0));
    Container assignedContainer = containerCaptor.getValue();
    assertEquals(HOST2, assignedContainer.getNodeId().getHost());
    assertEquals(2, tsWrapper.ts.dagStats.getNumLocalAllocations());
    assertEquals(1, tsWrapper.ts.dagStats.getNumNonLocalAllocations());
    assertEquals(1, tsWrapper.ts.dagStats.getNumDelayedAllocations());
    assertEquals(2, tsWrapper.ts.dagStats.getNumAllocationsPerHost().get(HOST1).get());
    assertEquals(1, tsWrapper.ts.dagStats.getNumAllocationsPerHost().get(HOST2).get());
  } finally {
    tsWrapper.shutdown();
  }
}
@Test (timeout = 15000)
public void testDelayedLocalityNodeCommErrorDelayedAllocation() throws IOException, InterruptedException {
  Priority priority1 = Priority.newInstance(1);
  String [] hosts = new String[] {HOST1, HOST2};
  String [] hostsH1 = new String[] {HOST1};
  // Node re-enable timeout (5000ms) is smaller than the locality delay (10000ms). The delayed
  // task queue is test-controlled and never triggered here, so it cannot release a task itself.
  TestTaskSchedulerServiceWrapper tsWrapper =
      new TestTaskSchedulerServiceWrapper(5000, hosts, 1, 1, 10000L, true);
  ControlledClock clock = tsWrapper.getClock();
  // Pin the controlled clock. It is never advanced in this test.
  clock.setTime(clock.getTime());
  // Fill up host1 with tasks. Leave host2 empty.
  try {
    tsWrapper.controlScheduler(true);
    TezTaskAttemptID task1 = tsWrapper.allocateTask(hostsH1, priority1);
    TezTaskAttemptID task2 = tsWrapper.allocateTask(hostsH1, priority1);
    tsWrapper.allocateTask(hostsH1, priority1); // task3: 1 more than capacity.
    tsWrapper.awaitLocalTaskAllocations(2);
    verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
    ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
    verify(tsWrapper.mockAppCallback, times(2))
        .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
    assertEquals(2, argumentCaptor.getAllValues().size());
    assertEquals(task1, argumentCaptor.getAllValues().get(0));
    assertEquals(task2, argumentCaptor.getAllValues().get(1));
    reset(tsWrapper.mockAppCallback);
    // Mark a task as failed due to a comm failure.
    tsWrapper.deallocateTask(task1, false, TaskAttemptEndReason.COMMUNICATION_ERROR);
    // Node1 has free capacity but is disabled, and node2 has capacity. The locality delay is
    // larger than the re-enable timeout, so the third task must not be allocated anywhere
    // within the next 2000ms.
    tsWrapper.ensureNoChangeInTotalAllocations(2, 2000L);
  } finally {
    tsWrapper.shutdown();
  }
}
@Test
public void testInitialGuaranteedInfoIsEncodedInContainerId() {
  ApplicationAttemptId appAttemptId =
      ApplicationAttemptId.newInstance(ApplicationId.newInstance(1000, 1), 1);
  ContainerFactory containerFactory = new ContainerFactory(appAttemptId, 15);
  // The guaranteed flag requested for each container, in creation order.
  boolean[] expectedGuaranteed = {false, true, true, false, true, false, false, true};

  // Create every container first, then check each id.
  Container[] containers = new Container[expectedGuaranteed.length];
  for (int i = 0; i < containers.length; i++) {
    containers[i] = createDummyContainer(containerFactory, expectedGuaranteed[i]);
  }
  // Each container id must report the flag it was created with.
  for (int i = 0; i < containers.length; i++) {
    boolean encoded = ContainerFactory.isContainerInitializedAsGuaranteed(containers[i].getId());
    if (expectedGuaranteed[i]) {
      assertTrue(encoded);
    } else {
      assertFalse(encoded);
    }
  }
}
/**
 * Creates a container through {@code containerFactory} with a mocked resource and priority and
 * the fixed host name "hostname", flagged as guaranteed or not as requested.
 */
private Container createDummyContainer(ContainerFactory containerFactory, boolean isGuaranteed) {
  Resource dummyResource = mock(Resource.class);
  Priority dummyPriority = mock(Priority.class);
  return containerFactory.createContainer(
      dummyResource, dummyPriority, "hostname", 0, null, isGuaranteed);
}
/**
 * Test harness around LlapTaskSchedulerServiceForTest.
 *
 * It builds a Configuration from the constructor arguments and mocks the TaskSchedulerContext
 * (and optionally the service registry). It then starts the scheduler on a ControlledClock.
 * Helpers are provided to trigger and await scheduling passes and to allocate and deallocate
 * tasks.
 */
private static class TestTaskSchedulerServiceWrapper {
// Resource requested by every task allocated through this wrapper.
static final Resource resource = Resource.newInstance(1024, 1);
Configuration conf;
TaskSchedulerContext mockAppCallback = mock(TaskSchedulerContext.class);
LlapServiceInstanceSet mockServiceInstanceSet = mock(LlapServiceInstanceSet.class);
ControlledClock clock = new ControlledClock(new MonotonicClock());
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1000, 1), 1);
LlapTaskSchedulerServiceForTest ts;
// Node re-enable timeout of 2000ms; remaining defaults as in the one-argument constructor.
TestTaskSchedulerServiceWrapper() throws IOException, InterruptedException {
this(2000l);
}
// Hosts HOST1..HOST3 with 4 executors each and the default wait queue size.
TestTaskSchedulerServiceWrapper(long disableTimeoutMillis) throws IOException,
InterruptedException {
this(disableTimeoutMillis, new String[]{HOST1, HOST2, HOST3}, 4,
ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.defaultIntVal);
}
// Locality delay of 0ms.
TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize) throws
IOException, InterruptedException {
this(disableTimeoutMillis, hosts, numExecutors, waitQueueSize, 0l);
}
// Delayed task queue is not test-controlled.
TestTaskSchedulerServiceWrapper(long nodeDisableTimeoutMillis, String[] hosts, int numExecutors,
int waitQueueSize, long localityDelayMs) throws
IOException, InterruptedException {
this(nodeDisableTimeoutMillis, hosts, numExecutors, waitQueueSize, localityDelayMs, false);
}
// Live hosts equal the configured hosts; the mock registry is not installed.
TestTaskSchedulerServiceWrapper(long nodeDisableTimeoutMillis, String[] hosts, int numExecutors,
int waitQueueSize, long localityDelayMs, boolean controlledDelayedTaskQueue) throws
IOException, InterruptedException {
this(nodeDisableTimeoutMillis, hosts, numExecutors, waitQueueSize, localityDelayMs, controlledDelayedTaskQueue,
hosts, false);
}
/**
 * Builds the configuration and mocks, then starts the scheduler and waits for its first
 * scheduling pass.
 *
 * @param nodeDisableTimeoutMillis value for LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS
 * @param hosts hosts for LLAP_DAEMON_SERVICE_HOSTS (and, with the mock registry, returned by getAll())
 * @param numExecutors value for LLAP_DAEMON_NUM_EXECUTORS
 * @param waitQueueSize value for LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE
 * @param localityDelayMs value for LLAP_TASK_SCHEDULER_LOCALITY_DELAY
 * @param controlledDelayedTaskQueue if true, use LlapTaskSchedulerServiceForTestControlled so
 *        the test drives the delayed task queue manually
 * @param liveHosts hosts returned by the mock registry's getAllInstancesOrdered(true); a null
 *        entry becomes an InactiveServiceInstance mock. Only used with the mock registry.
 * @param useMockRegistry if true, install mockServiceInstanceSet on the scheduler after start()
 */
TestTaskSchedulerServiceWrapper(long nodeDisableTimeoutMillis, String[] hosts, int numExecutors,
int waitQueueSize, long localityDelayMs, boolean controlledDelayedTaskQueue, String[] liveHosts,
boolean useMockRegistry) throws
IOException, InterruptedException {
conf = new Configuration();
conf.setStrings(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, hosts);
conf.setInt(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, numExecutors);
conf.setInt(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname, waitQueueSize);
conf.set(ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS.varname,
nodeDisableTimeoutMillis + "ms");
// Presumably keeps the fixed registry from resolving the fake host names; confirm in LlapFixedRegistryImpl.
conf.setBoolean(LlapFixedRegistryImpl.FIXED_REGISTRY_RESOLVE_HOST_NAMES, false);
conf.setLong(ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY.varname, localityDelayMs);
conf.set(ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME.varname, "");
doReturn(appAttemptId).when(mockAppCallback).getApplicationAttemptId();
doReturn(11111l).when(mockAppCallback).getCustomClusterIdentifier();
// The scheduler receives the configuration through the context's initial user payload.
UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
doReturn(userPayload).when(mockAppCallback).getInitialUserPayload();
if (useMockRegistry) {
List<LlapServiceInstance> liveInstances = new ArrayList<>();
for (String host : liveHosts) {
if (host == null) {
// NOTE(review): with host == null the stubs use null as well, so the worker identity is
// "inactive-host-null" and getByHost(null) is what gets stubbed.
LlapServiceInstance mockInactive = mock(InactiveServiceInstance.class);
doReturn(host).when(mockInactive).getHost();
doReturn(Resource.newInstance(100, 1)).when(mockInactive).getResource();
doReturn("inactive-host-" + host).when(mockInactive).getWorkerIdentity();
doReturn(ImmutableSet.builder().add(mockInactive).build()).when(mockServiceInstanceSet).getByHost(host);
liveInstances.add(mockInactive);
} else {
LlapServiceInstance mockActive = mock(LlapServiceInstance.class);
doReturn(host).when(mockActive).getHost();
doReturn(Resource.newInstance(100, 1)).when(mockActive).getResource();
doReturn("host-" + host).when(mockActive).getWorkerIdentity();
doReturn(ImmutableSet.builder().add(mockActive).build()).when(mockServiceInstanceSet).getByHost(host);
liveInstances.add(mockActive);
}
}
doReturn(liveInstances).when(mockServiceInstanceSet).getAllInstancesOrdered(true);
// getAll() reports every configured host as active, regardless of liveHosts.
List<LlapServiceInstance> allInstances = new ArrayList<>();
for (String host : hosts) {
LlapServiceInstance mockActive = mock(LlapServiceInstance.class);
doReturn(host).when(mockActive).getHost();
doReturn(Resource.newInstance(100, 1)).when(mockActive).getResource();
doReturn("host-" + host).when(mockActive).getWorkerIdentity();
allInstances.add(mockActive);
}
doReturn(allInstances).when(mockServiceInstanceSet).getAll();
}
if (controlledDelayedTaskQueue) {
ts = new LlapTaskSchedulerServiceForTestControlled(mockAppCallback, clock);
} else {
ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, clock);
}
// Gate scheduling before start(): schedulePendingTasks() then blocks until signalSchedulerRun().
controlScheduler(true);
ts.initialize();
ts.start();
if (useMockRegistry) {
ts.setServiceInstanceSet(mockServiceInstanceSet);
}
// One scheduler pass from the nodes that are added at startup
signalSchedulerRun();
controlScheduler(false);
awaitSchedulerRun();
}
ControlledClock getClock() {
return clock;
}
// Enables (true) or disables (false) gating of scheduling passes on signalSchedulerRun().
void controlScheduler(boolean val) {
ts.forTestsetControlScheduling(val);
}
// Releases one gated scheduling pass.
void signalSchedulerRun() throws InterruptedException {
ts.forTestSignalSchedulingRun();
}
// Blocks, without a timeout, until a scheduling pass completes.
void awaitSchedulerRun() throws InterruptedException {
ts.forTestAwaitSchedulingRun(-1);
}
/**
 * Waits for a scheduling pass to complete, for at most {@code timeoutMs}.
 *
 * @param timeoutMs maximum time to wait, in milliseconds
 * @return false if the time elapsed before a completed scheduling pass was observed
 * @throws InterruptedException if the waiting thread is interrupted
 */
boolean awaitSchedulerRun(long timeoutMs) throws InterruptedException {
return ts.forTestAwaitSchedulingRun(timeoutMs);
}
void resetAppCallback() {
reset(mockAppCallback);
}
void shutdown() {
ts.shutdown();
}
// Requests an allocation for a specific task with the wrapper's fixed resource.
void allocateTask(TezTaskAttemptID task, String[] hosts, Priority priority, Object clientCookie) {
ts.allocateTask(task, resource, hosts, null, priority, null, clientCookie);
}
// Static, so generated attempt ids are unique across all wrapper instances in the JVM.
private static final AtomicInteger TASK_COUNTER = new AtomicInteger(0);
private static final TezVertexID VERTEX_ID = TezVertexID.getInstance(
TezDAGID.getInstance(ApplicationId.newInstance(1, 1), 0), 0);
// Returns attempt 0 of a fresh task id under the shared test vertex.
public static TezTaskAttemptID generateTaskAttemptId() {
int taskId = TASK_COUNTER.getAndIncrement();
return TezTaskAttemptID.getInstance(TezTaskID.getInstance(VERTEX_ID, taskId), 0);
}
void deallocateTask(Object task, boolean succeeded, TaskAttemptEndReason endReason) {
ts.deallocateTask(task, succeeded, endReason, null);
}
// Simulates the daemon rejecting the task as busy.
void rejectExecution(Object task) {
ts.deallocateTask(task, false, TaskAttemptEndReason.EXECUTOR_BUSY, null);
}
// More complex methods which may wrap multiple operations
// Allocates a newly generated task attempt with a fresh client cookie and returns its id.
TezTaskAttemptID allocateTask(String[] hosts, Priority priority) {
TezTaskAttemptID task = generateTaskAttemptId();
Object clientCookie = new Object();
allocateTask(task, hosts, priority, clientCookie);
return task;
}
// Drives scheduling passes until the total allocation count is exactly numTasks. If the
// count never equals numTasks (e.g. it overshoots), this loops until the test times out.
public void awaitTotalTaskAllocations(int numTasks) throws InterruptedException {
while (true) {
signalSchedulerRun();
awaitSchedulerRun();
if (ts.dagStats.getNumTotalAllocations() == numTasks) {
break;
}
}
}
// Same as awaitTotalTaskAllocations, but for the local allocation count.
public void awaitLocalTaskAllocations(int numTasks) throws InterruptedException {
while (true) {
signalSchedulerRun();
awaitSchedulerRun();
if (ts.dagStats.getNumLocalAllocations() == numTasks) {
break;
}
}
}
// Drives scheduling passes, sleeping 200ms between them, until the total allocation count
// exceeds previousAllocations.
public void awaitChangeInTotalAllocations(int previousAllocations) throws InterruptedException {
while (true) {
signalSchedulerRun();
awaitSchedulerRun();
if (ts.dagStats.getNumTotalAllocations() > previousAllocations) {
break;
}
Thread.sleep(200l);
}
}
// Drives scheduling passes for about timeout ms of real (monotonic) time, waiting at most
// 200ms per pass. Throws IllegalStateException as soon as the total allocation count differs
// from previousAllocations.
public void ensureNoChangeInTotalAllocations(int previousAllocations, long timeout) throws
InterruptedException {
long startTime = Time.monotonicNow();
long timeLeft = timeout;
while (timeLeft > 0) {
signalSchedulerRun();
awaitSchedulerRun(Math.min(200, timeLeft));
if (ts.dagStats.getNumTotalAllocations() != previousAllocations) {
throw new IllegalStateException("NumTotalAllocations expected to stay at " + previousAllocations + ". Actual=" + ts.dagStats.getNumTotalAllocations());
}
timeLeft = (startTime + timeout) - Time.monotonicNow();
}
}
}
/**
 * LlapTaskSchedulerService subclass with test hooks:
 * <ul>
 *   <li>optional gating of {@code schedulePendingTasks()}, so a test can trigger and await
 *       individual scheduling passes through the {@code forTest*} methods;</li>
 *   <li>an optional pause inside {@code checkAndSendGuaranteedStateUpdate};</li>
 *   <li>counting of guaranteed-state update messages instead of sending them.</li>
 * </ul>
 */
private static class LlapTaskSchedulerServiceForTest extends LlapTaskSchedulerService {
  // When true, each schedulePendingTasks() call waits for forTestSignalSchedulingRun().
  private final AtomicBoolean controlScheduling = new AtomicBoolean(false);
  // Guards schedulingComplete/schedulingTriggered and their conditions.
  private final Lock testLock = new ReentrantLock();
  private final Condition schedulingCompleteCondition = testLock.newCondition();
  private boolean schedulingComplete = false;
  private final Condition triggerSchedulingCondition = testLock.newCondition();
  private boolean schedulingTriggered = false;
  private final AtomicInteger numSchedulerRuns = new AtomicInteger(0);
  // Guards sentCount and checkAndSendCount.
  private final Object messageLock = new Object();
  private int sentCount = 0, checkAndSendCount = 0;
  // Guards doDelayCheckAndSend.
  private final Object checkDelay = new Object();
  private boolean doDelayCheckAndSend = false;

  public LlapTaskSchedulerServiceForTest(
      TaskSchedulerContext appClient, Clock clock) {
    super(appClient, clock, false);
  }

  @Override
  protected void registerRunningTask(TaskInfo taskInfo) {
    super.registerRunningTask(taskInfo);
    notifyStarted(taskInfo.getAttemptId()); // Do this here; normally communicator does this.
  }

  @Override
  protected void checkAndSendGuaranteedStateUpdate(TaskInfo ti) {
    // A test-specific delay just before the check happens.
    synchronized (checkDelay) {
      boolean isFirst = true;
      while (doDelayCheckAndSend) {
        if (isFirst) {
          // Let waitForCheckAndSendCall() observe that this call has reached the pause.
          synchronized (messageLock) {
            ++checkAndSendCount;
            messageLock.notifyAll();
          }
          isFirst = false;
        }
        try {
          checkDelay.wait(100);
        } catch (InterruptedException e) {
          // Restore the interrupt status before escaping through an unchecked exception.
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        }
      }
    }
    super.checkAndSendGuaranteedStateUpdate(ti);
  }

  /** Turns the pause in checkAndSendGuaranteedStateUpdate on or off; off wakes paused callers. */
  public void setDelayCheckAndSend(boolean value) {
    synchronized (checkDelay) {
      doDelayCheckAndSend = value;
      if (!value) {
        checkDelay.notifyAll();
      }
    }
  }

  @Override
  public void handleUpdateResult(TaskInfo ti, boolean isOk) {
    super.handleUpdateResult(ti, isOk);
  }

  /** Resets both the sent-message and check-and-send counters. */
  public void clearTestCounts() {
    synchronized (messageLock) {
      sentCount = checkAndSendCount = 0;
    }
  }

  /**
   * Blocks until exactly {@code count} update messages have been recorded, then resets the
   * counter. With assertions enabled, fails if more than {@code count} were recorded.
   */
  public void waitForMessagesSent(int count) throws InterruptedException {
    while (true) {
      synchronized (messageLock) {
        assert sentCount <= count;
        if (sentCount == count) {
          sentCount = 0;
          return;
        }
        messageLock.wait(200);
      }
    }
  }

  /** Note: this only works for testing the lack of invocations from the main thread. */
  public void assertNoMessagesSent() {
    synchronized (messageLock) {
      assert sentCount == 0;
    }
  }

  /**
   * Blocks until exactly {@code count} calls have reached the checkAndSend pause, then resets
   * the counter. With assertions enabled, fails if more than {@code count} were recorded.
   */
  public void waitForCheckAndSendCall(int count) throws InterruptedException {
    while (true) {
      synchronized (messageLock) {
        assert checkAndSendCount <= count;
        if (checkAndSendCount == count) {
          checkAndSendCount = 0;
          return;
        }
        messageLock.wait(200);
      }
    }
  }

  // Records the message instead of sending it.
  @Override
  protected void sendUpdateMessageAsync(TaskInfo ti, boolean newState) {
    synchronized (messageLock) {
      ++sentCount;
      messageLock.notifyAll();
    }
  }

  @Override
  protected TezTaskAttemptID getTaskAttemptId(Object task) {
    if (task instanceof TezTaskAttemptID) {
      return (TezTaskAttemptID)task;
    }
    return null;
  }

  @Override
  protected void schedulePendingTasks() throws InterruptedException {
    LOG.info("Attempted schedulPendingTasks");
    testLock.lock();
    try {
      if (controlScheduling.get()) {
        // Gated mode: wait for the test to release this pass.
        while (!schedulingTriggered) {
          try {
            triggerSchedulingCondition.await();
          } catch (InterruptedException e) {
            // Restore the interrupt status before escaping through an unchecked exception.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
          }
        }
      }
      numSchedulerRuns.incrementAndGet();
      super.schedulePendingTasks();
      schedulingTriggered = false;
      schedulingComplete = true;
      schedulingCompleteCondition.signal();
    } finally {
      testLock.unlock();
    }
  }

  // Enable or disable test scheduling control.
  void forTestsetControlScheduling(boolean control) {
    this.controlScheduling.set(control);
  }

  // Releases one gated scheduling pass.
  void forTestSignalSchedulingRun() throws InterruptedException {
    testLock.lock();
    try {
      schedulingTriggered = true;
      triggerSchedulingCondition.signal();
    } finally {
      testLock.unlock();
    }
  }

  /**
   * Waits for a scheduling pass to complete and consumes its completion flag.
   *
   * @param timeout maximum wait in milliseconds, or -1 to wait without a time limit
   * @return true if a completed pass was observed, false if the timeout elapsed first
   * @throws InterruptedException if the waiting thread is interrupted
   */
  boolean forTestAwaitSchedulingRun(long timeout) throws InterruptedException {
    testLock.lock();
    try {
      if (timeout == -1) {
        while (!schedulingComplete) {
          schedulingCompleteCondition.await();
        }
      } else {
        // Loop on the flag so that a spurious wakeup is not mistaken for a completed pass, and
        // a completion that races with the deadline is still reported as success.
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeout);
        while (!schedulingComplete) {
          if (remainingNanos <= 0) {
            return false;
          }
          remainingNanos = schedulingCompleteCondition.awaitNanos(remainingNanos);
        }
      }
      schedulingComplete = false;
      return true;
    } finally {
      testLock.unlock();
    }
  }
}
/**
 * Variant of LlapTaskSchedulerServiceForTest whose delayed-task (locality) queue is consumed
 * only when the test asks for it, through DelayedTaskSchedulerCallableControlled.
 */
private static class LlapTaskSchedulerServiceForTestControlled extends LlapTaskSchedulerServiceForTest {
  // Assigned in createDelayedTaskSchedulerCallable(). It deliberately has no initializer. If
  // that factory is invoked from the superclass constructor, an initializer would run
  // afterwards and overwrite the assigned value.
  private DelayedTaskSchedulerCallableControlled controlledTSCallable;

  public LlapTaskSchedulerServiceForTestControlled(
      TaskSchedulerContext appClient, Clock clock) {
    super(appClient, clock);
  }

  @Override
  LlapTaskSchedulerService.DelayedTaskSchedulerCallable createDelayedTaskSchedulerCallable() {
    controlledTSCallable = new DelayedTaskSchedulerCallableControlled();
    return controlledTSCallable;
  }

  /**
   * Delayed-task callable that processes one queue element per triggerGetNextTask() call.
   * After each call it records what happened in {@code lastState} and the shouldScheduleTask
   * fields.
   */
  class DelayedTaskSchedulerCallableControlled extends DelayedTaskSchedulerCallable {
    // Guards shouldRun/runComplete and their conditions.
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition triggerRunCondition = lock.newCondition();
    private boolean shouldRun = false;
    private final Condition runCompleteCondition = lock.newCondition();
    private boolean runComplete = false;
    // Outcome of the most recent getNextTask() pass.
    static final int STATE_NOT_RUN = 0;
    static final int STATE_NULL_FOUND = 1;
    static final int STATE_TIMEOUT_NOT_EXPIRED = 2;
    static final int STATE_RETURNED_TASK = 3;
    volatile int lastState = STATE_NOT_RUN;
    volatile boolean lastShouldScheduleTaskResult = false;
    volatile boolean shouldScheduleTaskTriggered = false;

    @Override
    public void processEvictedTask(TaskInfo taskInfo) {
      super.processEvictedTask(taskInfo);
      // A returned task's pass is complete once it has been processed.
      signalRunComplete();
    }

    @Override
    public TaskInfo getNextTask() throws InterruptedException {
      while (true) {
        // Wait for the test to request a pass.
        lock.lock();
        try {
          while (!shouldRun) {
            triggerRunCondition.await();
          }
          // Prevent subsequent runs until a new trigger is set.
          shouldRun = false;
        } finally {
          lock.unlock();
        }
        TaskInfo taskInfo = delayedTaskQueue.peek();
        if (taskInfo == null) {
          LOG.info("Triggered getTask but the queue is empty");
          lastState = STATE_NULL_FOUND;
          signalRunComplete();
          continue;
        }
        if (taskInfo.shouldDelayForLocality(
            LlapTaskSchedulerServiceForTestControlled.this.clock.getTime())) {
          LOG.info("Triggered getTask but the first element is not ready to execute");
          lastState = STATE_TIMEOUT_NOT_EXPIRED;
          signalRunComplete();
          continue;
        }
        // Remove exactly the element that was peeked. A plain poll() removes whatever the head
        // is at that moment; for a DelayQueue it also returns nothing unless the head's delay
        // has expired. Either way it is not guaranteed to remove this element.
        delayedTaskQueue.remove(taskInfo);
        lastState = STATE_RETURNED_TASK;
        // Completion is signalled later, presumably via processEvictedTask().
        return taskInfo;
      }
    }

    @Override
    public boolean shouldScheduleTask(TaskInfo taskInfo) {
      shouldScheduleTaskTriggered = true;
      lastShouldScheduleTaskResult = super.shouldScheduleTask(taskInfo);
      return lastShouldScheduleTaskResult;
    }

    // Clears the recorded shouldScheduleTask outcome.
    void resetShouldScheduleInformation() {
      shouldScheduleTaskTriggered = false;
      lastShouldScheduleTaskResult = false;
    }

    private void signalRunComplete() {
      lock.lock();
      try {
        runComplete = true;
        runCompleteCondition.signal();
      } finally {
        lock.unlock();
      }
    }

    // Releases one getNextTask() pass.
    void triggerGetNextTask() {
      lock.lock();
      try {
        shouldRun = true;
        triggerRunCondition.signal();
      } finally {
        lock.unlock();
      }
    }

    // Blocks until the current pass signals completion, then consumes that signal.
    void awaitGetNextTaskProcessing() throws InterruptedException {
      lock.lock();
      try {
        while (!runComplete) {
          runCompleteCondition.await();
        }
        runComplete = false;
      } finally {
        lock.unlock();
      }
    }
  }
}
}