MAPREDUCE-1845. FairScheduler.tasksToPreempt() can return negative number. Contributed by Scott Chen.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@959806 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index cbf3afe..902e7a4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -82,6 +82,9 @@
BUG FIXES
+ MAPREDUCE-1845. FairScheduler.tasksToPreempt() can return negative number.
+ (Scott Chen via matei)
+
MAPREDUCE-1707. TaskRunner can get NPE in getting ugi from TaskTracker.
(Vinod Kumar Vavilapalli)
diff --git a/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java b/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
index 68bf910..afe8ba8 100644
--- a/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
+++ b/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
@@ -850,11 +850,11 @@
int tasksDueToFairShare = 0;
if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
int target = Math.min(sched.getMinShare(), sched.getDemand());
- tasksDueToMinShare = target - sched.getRunningTasks();
+ tasksDueToMinShare = Math.max(0, target - sched.getRunningTasks());
}
if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
int target = (int) Math.min(sched.getFairShare(), sched.getDemand());
- tasksDueToFairShare = target - sched.getRunningTasks();
+ tasksDueToFairShare = Math.max(0, target - sched.getRunningTasks());
}
int tasksToPreempt = Math.max(tasksDueToMinShare, tasksDueToFairShare);
if (tasksToPreempt > 0) {
diff --git a/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java b/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
index 3539ddd..e2be37a 100644
--- a/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
+++ b/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
@@ -2621,4 +2621,81 @@
for (int i = 0; i < tasks.size(); i++)
assertEquals("assignment " + i, expectedTasks[i], tasks.get(i).toString());
}
+
+ /**
+ * This test submits a job that takes all 2 slots in a pool has both a min
+ * share of 2 slots with minshare timeout of 5s, and then a second job in
+ * default pool with a fair share timeout of 5s. After 60 seconds, this pool
+ * will be starved of fair share (2 slots of each type), and we test that it
+ * does not kill more than 2 tasks of each type.
+ */
+ public void testFairSharePreemptionWithShortTimeout() throws Exception {
+ // Enable preemption in scheduler
+ scheduler.preemptionEnabled = true;
+ // Set up pools file
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println("<fairSharePreemptionTimeout>5</fairSharePreemptionTimeout>");
+ out.println("<pool name=\"pool1\">");
+ out.println("<minMaps>2</minMaps>");
+ out.println("<minReduces>2</minReduces>");
+ out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
+ out.println("</pool>");
+ out.println("</allocations>");
+ out.close();
+ scheduler.getPoolManager().reloadAllocs();
+ Pool pool1 = scheduler.getPoolManager().getPool("pool1");
+ Pool defaultPool = scheduler.getPoolManager().getPool("default");
+
+ // Submit job 1 and assign all slots to it. Sleep a bit before assigning
+ // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first.
+ JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10, "pool1");
+ JobInfo info1 = scheduler.infos.get(job1);
+ checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+ advanceTime(100);
+ checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+ checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+ checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+ checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
+
+ advanceTime(10000);
+ assertEquals(4, info1.mapSchedulable.getRunningTasks());
+ assertEquals(4, info1.reduceSchedulable.getRunningTasks());
+ assertEquals(4.0, info1.mapSchedulable.getFairShare());
+ assertEquals(4.0, info1.reduceSchedulable.getFairShare());
+ // Ten seconds later, submit job 2.
+ JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "default");
+
+ // Advance time by 6 seconds without update the scheduler.
+ // This simulates the time gap between update and task preemption.
+ clock.advance(6000);
+ assertEquals(4, info1.mapSchedulable.getRunningTasks());
+ assertEquals(4, info1.reduceSchedulable.getRunningTasks());
+ assertEquals(2.0, info1.mapSchedulable.getFairShare());
+ assertEquals(2.0, info1.reduceSchedulable.getFairShare());
+ assertEquals(0, scheduler.tasksToPreempt(pool1.getMapSchedulable(),
+ clock.getTime()));
+ assertEquals(0, scheduler.tasksToPreempt(pool1.getReduceSchedulable(),
+ clock.getTime()));
+ assertEquals(2, scheduler.tasksToPreempt(defaultPool.getMapSchedulable(),
+ clock.getTime()));
+ assertEquals(2, scheduler.tasksToPreempt(defaultPool.getReduceSchedulable(),
+ clock.getTime()));
+
+ // Test that the tasks actually get preempted and we can assign new ones
+ scheduler.preemptTasksIfNecessary();
+ scheduler.update();
+ assertEquals(2, job1.runningMaps());
+ assertEquals(2, job1.runningReduces());
+ checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
+ checkAssignment("tt2", "attempt_test_0002_r_000000_0 on tt2");
+ checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+ checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
+ assertNull(scheduler.assignTasks(tracker("tt1")));
+ assertNull(scheduler.assignTasks(tracker("tt2")));
+ }
}