blob: 58ab9c8117e07d2caa829b5652ff744fb62348e0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs;
import java.util.Map;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
public class TestJobHistoryEvents {
private static final Log LOG = LogFactory.getLog(TestJobHistoryEvents.class);
@Test
public void testHistoryEvents() throws Exception {
Configuration conf = new Configuration();
MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(), true);
app.submit(conf);
Job job = app.getContext().getAllJobs().values().iterator().next();
JobId jobId = job.getID();
LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
app.waitForState(job, JobState.SUCCEEDED);
//make sure all events are flushed
app.waitForState(Service.STATE.STOPPED);
/*
* Use HistoryContext to read logged events and verify the number of
* completed maps
*/
HistoryContext context = new JobHistory();
((JobHistory)context).init(conf);
Job parsedJob = context.getJob(jobId);
Assert.assertEquals("CompletedMaps not correct", 2,
parsedJob.getCompletedMaps());
Assert.assertEquals(System.getProperty("user.name"), parsedJob.getUserName());
Map<TaskId, Task> tasks = parsedJob.getTasks();
Assert.assertEquals("No of tasks not correct", 3, tasks.size());
for (Task task : tasks.values()) {
verifyTask(task);
}
Map<TaskId, Task> maps = parsedJob.getTasks(TaskType.MAP);
Assert.assertEquals("No of maps not correct", 2, maps.size());
Map<TaskId, Task> reduces = parsedJob.getTasks(TaskType.REDUCE);
Assert.assertEquals("No of reduces not correct", 1, reduces.size());
Assert.assertEquals("CompletedReduce not correct", 1,
parsedJob.getCompletedReduces());
Assert.assertEquals("Job state not currect", JobState.SUCCEEDED,
parsedJob.getState());
}
/**
* Verify that all the events are flushed on stopping the HistoryHandler
* @throws Exception
*/
@Test
public void testEventsFlushOnStop() throws Exception {
Configuration conf = new Configuration();
MRApp app = new MRAppWithSpecialHistoryHandler(1, 0, true, this
.getClass().getName(), true);
app.submit(conf);
Job job = app.getContext().getAllJobs().values().iterator().next();
JobId jobId = job.getID();
LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
app.waitForState(job, JobState.SUCCEEDED);
// make sure all events are flushed
app.waitForState(Service.STATE.STOPPED);
/*
* Use HistoryContext to read logged events and verify the number of
* completed maps
*/
HistoryContext context = new JobHistory();
((JobHistory) context).init(conf);
Job parsedJob = context.getJob(jobId);
Assert.assertEquals("CompletedMaps not correct", 1, parsedJob
.getCompletedMaps());
Map<TaskId, Task> tasks = parsedJob.getTasks();
Assert.assertEquals("No of tasks not correct", 1, tasks.size());
verifyTask(tasks.values().iterator().next());
Map<TaskId, Task> maps = parsedJob.getTasks(TaskType.MAP);
Assert.assertEquals("No of maps not correct", 1, maps.size());
Assert.assertEquals("Job state not currect", JobState.SUCCEEDED,
parsedJob.getState());
}
@Test
public void testJobHistoryEventHandlerIsFirstServiceToStop() {
MRApp app = new MRAppWithSpecialHistoryHandler(1, 0, true, this
.getClass().getName(), true);
Configuration conf = new Configuration();
app.init(conf);
Service[] services = app.getServices().toArray(new Service[0]);
// Verifying that it is the last to be added is same as verifying that it is
// the first to be stopped. CompositeService related tests already validate
// this.
Assert.assertEquals("JobHistoryEventHandler",
services[services.length - 1].getName());
}
private void verifyTask(Task task) {
Assert.assertEquals("Task state not currect", TaskState.SUCCEEDED,
task.getState());
Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
Assert.assertEquals("No of attempts not correct", 1, attempts.size());
for (TaskAttempt attempt : attempts.values()) {
verifyAttempt(attempt);
}
}
private void verifyAttempt(TaskAttempt attempt) {
Assert.assertEquals("TaskAttempt state not currect",
TaskAttemptState.SUCCEEDED, attempt.getState());
Assert.assertNotNull(attempt.getAssignedContainerID());
//Verify the wrong ctor is not being used. Remove after mrv1 is removed.
ContainerId fakeCid = BuilderUtils.newContainerId(-1, -1, -1, -1);
Assert.assertFalse(attempt.getAssignedContainerID().equals(fakeCid));
//Verify complete contianerManagerAddress
Assert.assertEquals(MRApp.NM_HOST + ":" + MRApp.NM_PORT,
attempt.getAssignedContainerMgrAddress());
}
static class MRAppWithHistory extends MRApp {
public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart) {
super(maps, reduces, autoComplete, testName, cleanOnStart);
}
@Override
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) {
JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(
context, getStartCount());
return eventHandler;
}
}
/**
* MRapp with special HistoryEventHandler that writes events only during stop.
* This is to simulate events that don't get written by the eventHandling
* thread due to say a slow DFS and verify that they are flushed during stop.
*/
private static class MRAppWithSpecialHistoryHandler extends MRApp {
public MRAppWithSpecialHistoryHandler(int maps, int reduces,
boolean autoComplete, String testName, boolean cleanOnStart) {
super(maps, reduces, autoComplete, testName, cleanOnStart);
}
@Override
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) {
return new JobHistoryEventHandler(context, getStartCount()) {
@Override
public void start() {
// Don't start any event draining thread.
super.eventHandlingThread = new Thread();
super.eventHandlingThread.start();
}
};
}
}
public static void main(String[] args) throws Exception {
TestJobHistoryEvents t = new TestJobHistoryEvents();
t.testHistoryEvents();
t.testEventsFlushOnStop();
t.testJobHistoryEventHandlerIsFirstServiceToStop();
}
}