/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.File;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;

import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
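
/**
 * Tests for {@link LocalContainerLauncher}, which runs tasks locally inside
 * the MapReduce application master: covers killing a job while its mocked
 * map task is still running, and renaming map output files for the reduce
 * phase.
 */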
public class TestLocalContainerLauncher {
  private static final Log LOG =
      LogFactory.getLog(TestLocalContainerLauncher.class);
  private static File testWorkDir;
  private static final String[] localDirs = new String[2];
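
  /** Recursively delete {@code dir} through the local {@link FileSystem}. */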
  private static void delete(File dir) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    Path p = fs.makeQualified(new Path(dir.getAbsolutePath()));
    fs.delete(p, true);
  }
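
  // Create a fresh work dir under target/ with two local dirs that the
  // tests can plug into MRConfig.LOCAL_DIR.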
  @BeforeClass
  public static void setupTestDirs() throws IOException {
    testWorkDir = new File("target",
        TestLocalContainerLauncher.class.getCanonicalName());
    testWorkDir.delete();
    testWorkDir.mkdirs();
    testWorkDir = testWorkDir.getAbsoluteFile();
    for (int i = 0; i < localDirs.length; i++) {
      final File dir = new File(testWorkDir, "local-" + i);
      dir.mkdirs();
      localDirs[i] = dir.toString();
    }
  }

  @AfterClass
  public static void cleanupTestDirs() throws IOException {
    if (testWorkDir != null) {
      delete(testWorkDir);
    }
  }
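
  /**
   * Launch a mocked map task that sleeps far beyond the test timeout, then
   * deliver a CONTAINER_REMOTE_CLEANUP event and verify that the launcher
   * promptly responds with TA_CONTAINER_CLEANED instead of waiting for the
   * task to finish.
   */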
  @SuppressWarnings("rawtypes")
  @Test(timeout=10000)
  public void testKillJob() throws Exception {
    JobConf conf = new JobConf();
    AppContext context = mock(AppContext.class);

    // a simple event handler solely to detect the container cleaned event
    final CountDownLatch isDone = new CountDownLatch(1);
    EventHandler handler = new EventHandler() {
      @Override
      public void handle(Event event) {
        LOG.info("handling event " + event.getClass()
            + " with type " + event.getType());
        if (event instanceof TaskAttemptEvent) {
          if (event.getType() == TaskAttemptEventType.TA_CONTAINER_CLEANED) {
            isDone.countDown();
          }
        }
      }
    };
    when(context.getEventHandler()).thenReturn(handler);

    // create and start the launcher
    LocalContainerLauncher launcher =
        new LocalContainerLauncher(context, mock(TaskUmbilicalProtocol.class));
    launcher.init(conf);
    launcher.start();

    // create mocked job, task, and task attempt
    // a single-mapper job
    JobId jobId = MRBuilderUtils.newJobId(System.currentTimeMillis(), 1, 1);
    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
    TaskAttemptId taId = MRBuilderUtils.newTaskAttemptId(taskId, 0);

    Job job = mock(Job.class);
    when(job.getTotalMaps()).thenReturn(1);
    when(job.getTotalReduces()).thenReturn(0);
    Map<JobId,Job> jobs = new HashMap<JobId,Job>();
    jobs.put(jobId, job);
    // app context returns the one and only job
    when(context.getAllJobs()).thenReturn(jobs);

    org.apache.hadoop.mapreduce.v2.app.job.Task ytask =
        mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
    when(ytask.getType()).thenReturn(TaskType.MAP);
    when(job.getTask(taskId)).thenReturn(ytask);

    // create a sleeping mapper that runs beyond the test timeout
    MapTask mapTask = mock(MapTask.class);
    when(mapTask.isMapOrReduce()).thenReturn(true);
    when(mapTask.isMapTask()).thenReturn(true);
    TaskAttemptID taskID = TypeConverter.fromYarn(taId);
    when(mapTask.getTaskID()).thenReturn(taskID);
    when(mapTask.getJobID()).thenReturn(taskID.getJobID());
    doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        // sleep for a long time
        LOG.info("sleeping for 5 minutes...");
        Thread.sleep(5*60*1000);
        return null;
      }
    }).when(mapTask).run(isA(JobConf.class), isA(TaskUmbilicalProtocol.class));

    // pump in a task attempt launch event
    ContainerLauncherEvent launchEvent = new ContainerRemoteLaunchEvent(
        taId, null, createMockContainer(), mapTask);
    launcher.handle(launchEvent);
    Thread.sleep(200);
    // now pump in a container clean-up event
    ContainerLauncherEvent cleanupEvent =
        new ContainerLauncherEvent(taId, null, null, null,
            ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP);
    launcher.handle(cleanupEvent);
    // wait for the event to fire: this should be received promptly
    isDone.await();

    launcher.close();
  }
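
  /** A mock container that only needs to report a node id. */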
  private static Container createMockContainer() {
    Container container = mock(Container.class);
    NodeId nodeId = NodeId.newInstance("foo.bar.org", 1234);
    when(container.getNodeId()).thenReturn(nodeId);
    return container;
  }
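
  /**
   * Exercise renameMapOutputForReduce with the map output file and its index
   * file created under two different local dirs.
   */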
  @Test
  public void testRenameMapOutputForReduce() throws Exception {
    final JobConf conf = new JobConf();
    final MROutputFiles mrOutputFiles = new MROutputFiles();
    mrOutputFiles.setConf(conf);

    // make sure both dirs are distinct
    conf.set(MRConfig.LOCAL_DIR, localDirs[0]);
    final Path mapOut = mrOutputFiles.getOutputFileForWrite(1);
    conf.set(MRConfig.LOCAL_DIR, localDirs[1]);
    final Path mapOutIdx = mrOutputFiles.getOutputIndexFileForWrite(1);
    Assert.assertNotEquals("Paths must be different!",
        mapOut.getParent(), mapOutIdx.getParent());

    // make both dirs part of LOCAL_DIR
    conf.setStrings(MRConfig.LOCAL_DIR, localDirs);

    final FileContext lfc = FileContext.getLocalFSFileContext(conf);
    lfc.create(mapOut, EnumSet.of(CREATE)).close();
    lfc.create(mapOutIdx, EnumSet.of(CREATE)).close();

    final JobId jobId = MRBuilderUtils.newJobId(12345L, 1, 2);
    final TaskId tid = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
    final TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 0);

    LocalContainerLauncher.renameMapOutputForReduce(conf, taid, mrOutputFiles);
  }
}