/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.EnumSet;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.WrappedJvmID;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.BuilderUtils;
/**
* Mock MRAppMaster. Doesn't start RPC servers.
 * No threads are started except the event Dispatcher thread.
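 *
 * <p>Typical test usage (an illustrative sketch; the exact job shape and
 * assertions vary by test):
 * <pre>
 *   MRApp app = new MRApp(2, 1, true, "testName", true);
 *   Job job = app.submit(new Configuration());
 *   app.waitForState(job, JobState.SUCCEEDED);
 *   app.verifyCompleted();
 * </pre>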
*/
@SuppressWarnings("unchecked")
public class MRApp extends MRAppMaster {
private static final Log LOG = LogFactory.getLog(MRApp.class);
int maps;
int reduces;
private File testWorkDir;
private Path testAbsPath;
public static String NM_HOST = "localhost";
public static int NM_PORT = 1234;
public static int NM_HTTP_PORT = 9999;
private static final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
//if true, tasks complete automatically as soon as they are launched
protected boolean autoComplete = false;
static ApplicationId applicationId;
static {
applicationId = recordFactory.newRecordInstance(ApplicationId.class);
applicationId.setClusterTimestamp(0);
applicationId.setId(0);
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart) {
this(maps, reduces, autoComplete, testName, cleanOnStart, 1);
}
  @Override
  protected void downloadTokensAndSetupUGI(Configuration conf) {
    // No-op: tests run unsecured, so there are no tokens to download.
  }
private static ApplicationAttemptId getApplicationAttemptId(
ApplicationId applicationId, int startCount) {
ApplicationAttemptId applicationAttemptId =
recordFactory.newRecordInstance(ApplicationAttemptId.class);
applicationAttemptId.setApplicationId(applicationId);
applicationAttemptId.setAttemptId(startCount);
return applicationAttemptId;
}
private static ContainerId getContainerId(ApplicationId applicationId,
int startCount) {
ApplicationAttemptId appAttemptId =
getApplicationAttemptId(applicationId, startCount);
ContainerId containerId =
BuilderUtils.newContainerId(appAttemptId, startCount);
return containerId;
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount) {
this(getApplicationAttemptId(applicationId, startCount), getContainerId(
applicationId, startCount), maps, reduces, autoComplete, testName,
cleanOnStart, startCount);
}
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount) {
    super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT,
        System.currentTimeMillis());
this.testWorkDir = new File("target", testName);
testAbsPath = new Path(testWorkDir.getAbsolutePath());
LOG.info("PathUsed: " + testAbsPath);
    if (cleanOnStart) {
try {
FileContext.getLocalFSFileContext().delete(testAbsPath, true);
} catch (Exception e) {
LOG.warn("COULD NOT CLEANUP: " + testAbsPath, e);
throw new YarnException("could not cleanup test dir", e);
}
}
this.maps = maps;
this.reduces = reduces;
this.autoComplete = autoComplete;
}
public Job submit(Configuration conf) throws Exception {
String user = conf.get(MRJobConfig.USER_NAME, UserGroupInformation
.getCurrentUser().getShortUserName());
conf.set(MRJobConfig.USER_NAME, user);
conf.set(MRJobConfig.MR_AM_STAGING_DIR, testAbsPath.toString());
conf.setBoolean(MRJobConfig.MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR, true);
//TODO: fix the bug where the speculator gets events with
//not-fully-constructed objects. For now, disable speculative exec
LOG.info("****DISABLING SPECULATIVE EXECUTION*****");
conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
init(conf);
start();
DefaultMetricsSystem.shutdown();
Job job = getContext().getAllJobs().values().iterator().next();
// Write job.xml
String jobFile = MRApps.getJobFile(conf, user,
TypeConverter.fromYarn(job.getID()));
LOG.info("Writing job conf to " + jobFile);
new File(jobFile).getParentFile().mkdirs();
    // Close the stream so the written job.xml is flushed and not leaked.
    FileOutputStream jobFileOut = new FileOutputStream(jobFile);
    try {
      conf.writeXml(jobFileOut);
    } finally {
      jobFileOut.close();
    }
return job;
}
  public void waitForState(TaskAttempt attempt,
      TaskAttemptState finalState) throws Exception {
    int pollCount = 0; // one poll every 500ms, at most 20 polls (~10s total)
    TaskAttemptReport report = attempt.getReport();
    while (!finalState.equals(report.getTaskAttemptState()) &&
        pollCount++ < 20) {
      System.out.println("TaskAttempt State is : "
          + report.getTaskAttemptState()
          + " Waiting for state : " + finalState
          + " progress : " + report.getProgress());
      report = attempt.getReport();
      Thread.sleep(500);
    }
    System.out.println("TaskAttempt State is : "
        + report.getTaskAttemptState());
    Assert.assertEquals("TaskAttempt state is not correct (timed out)",
        finalState,
        report.getTaskAttemptState());
  }
  public void waitForState(Task task, TaskState finalState) throws Exception {
    int pollCount = 0; // one poll every 500ms, at most 20 polls (~10s total)
    TaskReport report = task.getReport();
    while (!finalState.equals(report.getTaskState()) &&
        pollCount++ < 20) {
      System.out.println("Task State for " + task.getID() + " is : "
          + report.getTaskState() + " Waiting for state : " + finalState
          + " progress : " + report.getProgress());
      report = task.getReport();
      Thread.sleep(500);
    }
    System.out.println("Task State is : " + report.getTaskState());
    Assert.assertEquals("Task state is not correct (timed out)", finalState,
        report.getTaskState());
  }
  public void waitForState(Job job, JobState finalState) throws Exception {
    int pollCount = 0; // one poll every 500ms, at most 20 polls (~10s total)
    JobReport report = job.getReport();
    while (!finalState.equals(report.getJobState()) &&
        pollCount++ < 20) {
      System.out.println("Job State is : " + report.getJobState()
          + " Waiting for state : " + finalState
          + " map progress : " + report.getMapProgress()
          + " reduce progress : " + report.getReduceProgress());
      report = job.getReport();
      Thread.sleep(500);
    }
    System.out.println("Job State is : " + report.getJobState());
    Assert.assertEquals("Job state is not correct (timed out)", finalState,
        job.getState());
  }
  public void waitForState(Service.STATE finalState) throws Exception {
    int pollCount = 0; // one poll every 500ms, at most 20 polls (~10s total)
    while (!finalState.equals(getServiceState()) && pollCount++ < 20) {
      System.out.println("MRApp State is : " + getServiceState()
          + " Waiting for state : " + finalState);
      Thread.sleep(500);
    }
    System.out.println("MRApp State is : " + getServiceState());
    Assert.assertEquals("MRApp state is not correct (timed out)", finalState,
        getServiceState());
  }
public void verifyCompleted() {
for (Job job : getContext().getAllJobs().values()) {
JobReport jobReport = job.getReport();
System.out.println("Job start time :" + jobReport.getStartTime());
System.out.println("Job finish time :" + jobReport.getFinishTime());
Assert.assertTrue("Job start time is not less than finish time",
jobReport.getStartTime() <= jobReport.getFinishTime());
Assert.assertTrue("Job finish time is in future",
jobReport.getFinishTime() <= System.currentTimeMillis());
for (Task task : job.getTasks().values()) {
TaskReport taskReport = task.getReport();
System.out.println("Task start time : " + taskReport.getStartTime());
System.out.println("Task finish time : " + taskReport.getFinishTime());
Assert.assertTrue("Task start time is not less than finish time",
taskReport.getStartTime() <= taskReport.getFinishTime());
for (TaskAttempt attempt : task.getAttempts().values()) {
TaskAttemptReport attemptReport = attempt.getReport();
Assert.assertTrue("Attempt start time is not less than finish time",
attemptReport.getStartTime() <= attemptReport.getFinishTime());
}
}
}
}
@Override
protected Job createJob(Configuration conf) {
UserGroupInformation currentUser = null;
try {
currentUser = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
throw new YarnException(e);
}
Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
getDispatcher().getEventHandler(),
getTaskAttemptListener(), getContext().getClock(),
getCommitter(), isNewApiCommitter(),
currentUser.getUserName());
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
getDispatcher().register(JobFinishEvent.Type.class,
new EventHandler<JobFinishEvent>() {
@Override
public void handle(JobFinishEvent event) {
stop();
}
});
return newJob;
}
@Override
protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
    // Stub listener: no real task-umbilical RPC server is started; it hands
    // back a fixed dummy address and ignores (un)registration.
    return new TaskAttemptListener(){
@Override
public InetSocketAddress getAddress() {
return NetUtils.createSocketAddr("localhost:54321");
}
@Override
public void registerLaunchedTask(TaskAttemptId attemptID,
WrappedJvmID jvmID) {
}
@Override
public void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID) {
}
@Override
public void registerPendingTask(org.apache.hadoop.mapred.Task task,
WrappedJvmID jvmID) {
}
};
}
@Override
  protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
      AppContext context) {
    // Disable history: swallow all job history events.
    return new EventHandler<JobHistoryEvent>() {
@Override
public void handle(JobHistoryEvent event) {
}
};
}
@Override
protected ContainerLauncher createContainerLauncher(AppContext context) {
return new MockContainerLauncher();
}
protected class MockContainerLauncher implements ContainerLauncher {
//We are running locally so set the shuffle port to -1
int shufflePort = -1;
public MockContainerLauncher() {
}
@Override
public void handle(ContainerLauncherEvent event) {
      switch (event.getType()) {
      case CONTAINER_REMOTE_LAUNCH:
        // Pretend the container launched successfully, then give
        // subclasses a chance to react via attemptLaunched().
        getContext().getEventHandler().handle(
            new TaskAttemptContainerLaunchedEvent(event.getTaskAttemptID(),
                shufflePort));
        attemptLaunched(event.getTaskAttemptID());
        break;
      case CONTAINER_REMOTE_CLEANUP:
        // Pretend the container was cleaned up.
        getContext().getEventHandler().handle(
            new TaskAttemptEvent(event.getTaskAttemptID(),
                TaskAttemptEventType.TA_CONTAINER_CLEANED));
        break;
      }
}
}
protected void attemptLaunched(TaskAttemptId attemptID) {
if (autoComplete) {
// send the done event
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID,
TaskAttemptEventType.TA_DONE));
}
}
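  // Illustrative sketch (hypothetical test subclass, not part of this class):
  // a test can override attemptLaunched() to steer attempts away from the
  // default TA_DONE path, e.g. to simulate a failing task attempt:
  //
  //   MRApp app = new MRApp(1, 0, false, "testFailingAttempt", true) {
  //     @Override
  //     protected void attemptLaunched(TaskAttemptId attemptID) {
  //       // Fail the attempt instead of completing it.
  //       getContext().getEventHandler().handle(new TaskAttemptEvent(
  //           attemptID, TaskAttemptEventType.TA_FAILMSG));
  //     }
  //   };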
@Override
protected ContainerAllocator createContainerAllocator(
ClientService clientService, AppContext context) {
    // Stub allocator: never contacts the RM; immediately "assigns" a
    // container on the fake local node for every allocation request.
    return new ContainerAllocator(){
private int containerCount;
@Override
public void handle(ContainerAllocatorEvent event) {
ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
cId.setApplicationAttemptId(getContext().getApplicationAttemptId());
cId.setId(containerCount++);
NodeId nodeId = BuilderUtils.newNodeId(NM_HOST, NM_PORT);
Container container = BuilderUtils.newContainer(cId, nodeId,
NM_HOST + ":" + NM_HTTP_PORT, null, null, null);
JobID id = TypeConverter.fromYarn(applicationId);
JobId jobId = TypeConverter.toYarn(id);
        // Record normalized resource events for both task types in history.
        getContext().getEventHandler().handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent(
org.apache.hadoop.mapreduce.TaskType.REDUCE,
100)));
getContext().getEventHandler().handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent(
org.apache.hadoop.mapreduce.TaskType.MAP,
100)));
getContext().getEventHandler().handle(
new TaskAttemptContainerAssignedEvent(event.getAttemptID(),
container, null));
}
};
}
@Override
protected TaskCleaner createTaskCleaner(AppContext context) {
return new TaskCleaner() {
@Override
public void handle(TaskCleanupEvent event) {
//send the cleanup done event
getContext().getEventHandler().handle(
new TaskAttemptEvent(event.getAttemptID(),
TaskAttemptEventType.TA_CLEANUP_DONE));
}
};
}
@Override
protected ClientService createClientService(AppContext context) {
    // Stub client service: no RPC server is started; returns a fixed
    // dummy bind address.
    return new ClientService(){
@Override
public InetSocketAddress getBindAddress() {
return NetUtils.createSocketAddr("localhost:9876");
}
@Override
public int getHttpPort() {
return -1;
}
};
}
class TestJob extends JobImpl {
    // Override the init transition so the job doesn't read real split files.
private final TestInitTransition initTransition = new TestInitTransition(
maps, reduces);
StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent> localFactory
= stateMachineFactory.addTransition(JobState.NEW,
EnumSet.of(JobState.INITED, JobState.FAILED),
JobEventType.JOB_INIT,
// This is abusive.
initTransition);
private final StateMachine<JobState, JobEventType, JobEvent>
localStateMachine;
@Override
protected StateMachine<JobState, JobEventType, JobEvent> getStateMachine() {
return localStateMachine;
}
@SuppressWarnings("rawtypes")
public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
Configuration conf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Clock clock,
OutputCommitter committer, boolean newApiCommitter, String user) {
super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
conf, eventHandler, taskAttemptListener,
new JobTokenSecretManager(), new Credentials(), clock,
getCompletedTaskFromPreviousRun(), metrics, committer,
newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos());
// This "this leak" is okay because the retained pointer is in an
// instance variable.
localStateMachine = localFactory.make(this);
}
}
  // Override InitTransition so it does not look for split files, etc.
static class TestInitTransition extends JobImpl.InitTransition {
private int maps;
private int reduces;
TestInitTransition(int maps, int reduces) {
this.maps = maps;
this.reduces = reduces;
}
@Override
protected void setup(JobImpl job) throws IOException {
super.setup(job);
job.conf.setInt(MRJobConfig.NUM_REDUCES, reduces);
job.remoteJobConfFile = new Path("test");
}
@Override
protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
TaskSplitMetaInfo[] splits = new TaskSplitMetaInfo[maps];
for (int i = 0; i < maps ; i++) {
splits[i] = new TaskSplitMetaInfo();
}
return splits;
}
}
}