blob: 3533c285300f8342322c4fbaaa2a856da50385b8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.junit.Test;
/**
* Tests the state machine with respect to Job/Task/TaskAttempt kill scenarios.
*
*/
public class TestKill {

  /**
   * Kills a running single-map job while its only task attempt is blocked,
   * then verifies that the job, its task and that task's single attempt all
   * end up in the KILLED state.
   */
  @Test
  public void testKillJob() throws Exception {
    final CountDownLatch latch = new CountDownLatch(1);

    MRApp app = new BlockingMRApp(1, 0, latch);
    //this will start the job but job won't complete as task is
    //blocked
    Job job = app.submit(new Configuration());

    //wait and validate for Job to become RUNNING
    app.waitForState(job, JobState.RUNNING);

    //send the kill signal to Job
    app.getContext().getEventHandler().handle(
        new JobEvent(job.getID(), JobEventType.JOB_KILL));

    //unblock Task
    latch.countDown();

    //wait and validate for Job to be KILLED
    app.waitForState(job, JobState.KILLED);

    Map<TaskId, Task> tasks = job.getTasks();
    Assert.assertEquals("No of tasks is not correct", 1,
        tasks.size());
    Task task = tasks.values().iterator().next();
    Assert.assertEquals("Task state not correct", TaskState.KILLED,
        task.getReport().getTaskState());
    assertAttemptStates(task, TaskAttemptState.KILLED);
  }

  /**
   * Kills the first of two map tasks while the job is running, then verifies
   * that the job still reaches SUCCEEDED, with the killed task (and its single
   * attempt) in KILLED and the other task (and its single attempt) in
   * SUCCEEDED.
   */
  @Test
  public void testKillTask() throws Exception {
    final CountDownLatch latch = new CountDownLatch(1);
    MRApp app = new BlockingMRApp(2, 0, latch);
    //this will start the job but job won't complete as Task is blocked
    Job job = app.submit(new Configuration());

    //wait and validate for Job to become RUNNING
    app.waitForState(job, JobState.RUNNING);
    Map<TaskId, Task> tasks = job.getTasks();
    Assert.assertEquals("No of tasks is not correct", 2,
        tasks.size());
    Iterator<Task> it = tasks.values().iterator();
    Task task1 = it.next();
    Task task2 = it.next();

    //send the kill signal to the first Task
    app.getContext().getEventHandler().handle(
        new TaskEvent(task1.getID(), TaskEventType.T_KILL));

    //unblock Task
    latch.countDown();

    //wait and validate for Job to become SUCCEEDED
    app.waitForState(job, JobState.SUCCEEDED);

    //first Task is killed and second is Succeeded
    //Job is succeeded
    Assert.assertEquals("Task state not correct", TaskState.KILLED,
        task1.getReport().getTaskState());
    Assert.assertEquals("Task state not correct", TaskState.SUCCEEDED,
        task2.getReport().getTaskState());
    assertAttemptStates(task1, TaskAttemptState.KILLED);
    assertAttemptStates(task2, TaskAttemptState.SUCCEEDED);
  }

  /**
   * Kills the first attempt of the first of two map tasks, then verifies that
   * the job and both tasks still reach SUCCEEDED: the first task ends with two
   * attempts (KILLED, then SUCCEEDED) and the second task with a single
   * SUCCEEDED attempt.
   */
  @Test
  public void testKillTaskAttempt() throws Exception {
    final CountDownLatch latch = new CountDownLatch(1);
    MRApp app = new BlockingMRApp(2, 0, latch);
    //this will start the job but job won't complete as Task is blocked
    Job job = app.submit(new Configuration());

    //wait and validate for Job to become RUNNING
    app.waitForState(job, JobState.RUNNING);
    Map<TaskId, Task> tasks = job.getTasks();
    Assert.assertEquals("No of tasks is not correct", 2,
        tasks.size());
    Iterator<Task> it = tasks.values().iterator();
    Task task1 = it.next();
    Task task2 = it.next();

    //wait for both tasks to reach the SCHEDULED state, so that the first
    //task has an attempt to kill
    app.waitForState(task1, TaskState.SCHEDULED);
    app.waitForState(task2, TaskState.SCHEDULED);

    //send the kill signal to the first Task's attempt
    TaskAttempt attempt = task1.getAttempts().values().iterator().next();
    app.getContext().getEventHandler().handle(
        new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_KILL));

    //unblock
    latch.countDown();

    //wait and validate for Job to become SUCCEEDED
    //job will still succeed
    app.waitForState(job, JobState.SUCCEEDED);

    //first Task will have two attempts 1st is killed, 2nd Succeeds
    //both Tasks and Job succeeds
    Assert.assertEquals("Task state not correct", TaskState.SUCCEEDED,
        task1.getReport().getTaskState());
    Assert.assertEquals("Task state not correct", TaskState.SUCCEEDED,
        task2.getReport().getTaskState());
    assertAttemptStates(task1,
        TaskAttemptState.KILLED, TaskAttemptState.SUCCEEDED);
    assertAttemptStates(task2, TaskAttemptState.SUCCEEDED);
  }

  /**
   * Asserts that the given task has exactly as many attempts as there are
   * expected states, and that the attempts, taken in the iteration order of
   * {@code task.getAttempts().values()}, report those states in order.
   *
   * @param task the task whose attempts are checked
   * @param expected the expected attempt states, in iteration order
   */
  private static void assertAttemptStates(Task task,
      TaskAttemptState... expected) {
    Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
    Assert.assertEquals("No of attempts is not correct", expected.length,
        attempts.size());
    Iterator<TaskAttempt> iter = attempts.values().iterator();
    for (TaskAttemptState state : expected) {
      Assert.assertEquals("Attempt state not correct", state,
          iter.next().getReport().getTaskAttemptState());
    }
  }

  /**
   * An {@link MRApp} whose attempt-launch hook blocks attempt 0 of task 0 on
   * a latch and reports every other attempt as done straight away.
   */
  static class BlockingMRApp extends MRApp {
    private final CountDownLatch latch;

    /**
     * @param maps number of maps, passed through to the MRApp constructor
     * @param reduces number of reduces, passed through to the MRApp
     *          constructor
     * @param latch latch that the blocked attempt waits on; the test counts
     *          it down to release that attempt
     */
    BlockingMRApp(int maps, int reduces, CountDownLatch latch) {
      // NOTE(review): the meaning of the two boolean flags is defined by
      // MRApp, which is not visible here - confirm against its constructor.
      super(maps, reduces, true, "testKill", true);
      this.latch = latch;
    }

    /**
     * Blocks on the latch for the first attempt (id 0) of the task with id 0;
     * for any other attempt, sends TA_DONE for it immediately.
     *
     * @param attemptID the attempt that has just been launched
     */
    @Override
    protected void attemptLaunched(TaskAttemptId attemptID) {
      if (attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0) {
        //this blocks the first task's first attempt
        //the subsequent ones are completed
        try {
          latch.await();
        } catch (InterruptedException e) {
          //restore the interrupt status so that the calling thread can
          //still observe that it was interrupted
          Thread.currentThread().interrupt();
        }
      } else {
        getContext().getEventHandler().handle(
            new TaskAttemptEvent(attemptID,
                TaskAttemptEventType.TA_DONE));
      }
    }
  }

  /**
   * Runs the three kill scenarios in sequence without a JUnit runner.
   */
  public static void main(String[] args) throws Exception {
    TestKill t = new TestKill();
    t.testKillJob();
    t.testKillTask();
    t.testKillTaskAttempt();
  }
}