/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;

import junit.framework.TestCase;

/**
 * The base class which starts up a cluster with LinuxTaskController as the
 * task controller.
 *
 * To run test cases that use LinuxTaskController, follow these steps:
 * <ol>
 * <li>Build LinuxTaskController without passing any
 * <code>-Dhadoop.conf.dir</code></li>
 * <li>Make the built binary setuid executable</li>
 * <li>Execute the following targets:
 * <code>ant test -Dcompile.c++=true -Dtaskcontroller-path=<em>path to built binary</em>
 * -Dtaskcontroller-ugi=<em>user,group</em></code></li>
 * </ol>
 */
public class ClusterWithLinuxTaskController extends TestCase {
  private static final Log LOG =
      LogFactory.getLog(ClusterWithLinuxTaskController.class);

  /**
   * A wrapper around LinuxTaskController that allows overriding the path to
   * the task-controller binary used for task management.
   */
  public static class MyLinuxTaskController extends LinuxTaskController {
    String taskControllerExePath;

    @Override
    protected String getTaskControllerExecutablePath() {
      return taskControllerExePath;
    }

    void setTaskControllerExe(String execPath) {
      this.taskControllerExePath = execPath;
    }
  }

  // Cluster instances which subclasses can use.
  protected MiniMRCluster mrCluster = null;
  protected MiniDFSCluster dfsCluster = null;

  private JobConf clusterConf = null;
  protected Path homeDirectory;

  private static final int NUMBER_OF_NODES = 1;

  static final String TASKCONTROLLER_PATH = "taskcontroller-path";
  static final String TASKCONTROLLER_UGI = "taskcontroller-ugi";

  private File configurationFile = null;

  private UserGroupInformation taskControllerUser;

  /**
   * Utility method which subclasses use to start and configure the MR cluster
   * so that they can directly submit a job.
   */
  protected void startCluster()
      throws IOException {
    JobConf conf = new JobConf();
    dfsCluster = new MiniDFSCluster(conf, NUMBER_OF_NODES, true, null);
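    // Point the TT at our subclass so that the task-controller executable
    // path can be overridden once the cluster is up.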
    conf.set(TTConfig.TT_TASK_CONTROLLER,
        MyLinuxTaskController.class.getName());
    mrCluster =
        new MiniMRCluster(NUMBER_OF_NODES, dfsCluster.getFileSystem().getUri()
            .toString(), 4, null, null, conf);

    // Get the configured taskcontroller-path
    String path = System.getProperty(TASKCONTROLLER_PATH);
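    // Write taskcontroller.cfg into ../conf relative to the binary's
    // directory, where a binary built without -Dhadoop.conf.dir (see the
    // class javadoc) looks for it.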
    configurationFile =
        createTaskControllerConf(path, mrCluster.getTaskTrackerRunner(0)
            .getLocalDirs());
    String execPath = path + "/task-controller";
    TaskTracker tracker = mrCluster.getTaskTrackerRunner(0).tt;
    // Cast the task controller to our subclass, since we know that is the
    // instance configured in the TT above.
    ((MyLinuxTaskController) tracker.getTaskController())
        .setTaskControllerExe(execPath);
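    // taskcontroller-ugi is expected in "user,group" form; the job conf is
    // set up so that jobs are submitted as this user.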
    String ugi = System.getProperty(TASKCONTROLLER_UGI);
    clusterConf = mrCluster.createJobConf();
    String[] splits = ugi.split(",");
    taskControllerUser = new UnixUserGroupInformation(splits);
    clusterConf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
    createHomeDirectory(clusterConf);
  }

  private void createHomeDirectory(JobConf conf)
      throws IOException {
    FileSystem fs = dfsCluster.getFileSystem();
    String path = "/user/" + taskControllerUser.getUserName();
    homeDirectory = new Path(path);
    LOG.info("Creating home directory : " + homeDirectory);
    fs.mkdirs(homeDirectory);
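    // Hand ownership of the new home directory to the task controller user.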
    changePermission(conf, homeDirectory);
  }

  private void changePermission(JobConf conf, Path p)
      throws IOException {
    FileSystem fs = dfsCluster.getFileSystem();
    fs.setOwner(p, taskControllerUser.getUserName(),
        taskControllerUser.getGroupNames()[0]);
  }

  /**
   * Create taskcontroller.cfg.
   *
   * @param path path to the directory containing the task-controller binary
   * @param localDirs the task tracker's local directories
   * @return the created conf file
   * @throws IOException
   */
  static File createTaskControllerConf(String path, String[] localDirs)
      throws IOException {
    File confDirectory = new File(path, "../conf");
    if (!confDirectory.exists()) {
      confDirectory.mkdirs();
    }
    File configurationFile = new File(confDirectory, "taskcontroller.cfg");
    PrintWriter writer =
        new PrintWriter(new FileOutputStream(configurationFile));
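    // Join the TT's local dirs into a single comma-separated value for the
    // MRConfig.LOCAL_DIR entry.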
    StringBuffer sb = new StringBuffer();
    for (int i = 0; i < localDirs.length; i++) {
      sb.append(localDirs[i]);
      if ((i + 1) != localDirs.length) {
        sb.append(",");
      }
    }
    writer.println(String.format(MRConfig.LOCAL_DIR + "=%s", sb.toString()));
    writer
        .println(String.format("hadoop.log.dir=%s", TaskLog.getBaseLogDir()));
    writer.flush();
    writer.close();
    return configurationFile;
  }

  /**
   * Can we run the tests with LinuxTaskController?
   *
   * @return true if both taskcontroller-path and taskcontroller-ugi were
   *         passed, false otherwise
   */
  protected static boolean shouldRun() {
    if (!isTaskExecPathPassed() || !isUserPassed()) {
      LOG.info("Not running test.");
      return false;
    }
    return true;
  }

  private static boolean isTaskExecPathPassed() {
    String path = System.getProperty(TASKCONTROLLER_PATH);
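    // When the property is not set, ant passes the raw
    // "${taskcontroller-path}" placeholder through; treat that as unset.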
    if (path == null || path.isEmpty()
        || path.equals("${" + TASKCONTROLLER_PATH + "}")) {
      LOG.info("Invalid taskcontroller-path : " + path);
      return false;
    }
    return true;
  }

  private static boolean isUserPassed() {
    String ugi = System.getProperty(TASKCONTROLLER_UGI);
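    // The value must be of the form "user,group"; the comma check rejects
    // values that are missing either component.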
    if (ugi != null && !(ugi.equals("${" + TASKCONTROLLER_UGI + "}"))
        && !ugi.isEmpty()) {
      if (ugi.indexOf(",") > 1) {
        return true;
      }
    }
    LOG.info("Invalid taskcontroller-ugi : " + ugi);
    return false;
  }

  protected JobConf getClusterConf() {
    return new JobConf(clusterConf);
  }

  @Override
  protected void tearDown()
      throws Exception {
    if (mrCluster != null) {
      mrCluster.shutdown();
    }
    if (dfsCluster != null) {
      dfsCluster.shutdown();
    }
    if (configurationFile != null) {
      configurationFile.delete();
    }
    super.tearDown();
  }

  /**
   * Assert that the job was actually run by the specified user by verifying
   * the ownership of the output part-files.
   *
   * @param outDir job output directory to check
   * @throws IOException
   */
  protected void assertOwnerShip(Path outDir)
      throws IOException {
    FileSystem fs = outDir.getFileSystem(clusterConf);
    assertOwnerShip(outDir, fs);
  }

  /**
   * Assert that the job was actually run by the specified user by verifying
   * the ownership of the output part-files.
   *
   * @param outDir job output directory to check
   * @param fs filesystem holding the output directory
   * @throws IOException
   */
  protected void assertOwnerShip(Path outDir, FileSystem fs)
      throws IOException {
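    // OutputLogFilter filters the job's log files out of the listing, so
    // only the actual output part-files are checked.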
    for (FileStatus status : fs.listStatus(outDir, new OutputLogFilter())) {
      String owner = status.getOwner();
      String group = status.getGroup();
      LOG.info("Ownership of the file " + status.getPath() + " is " + owner
          + "," + group);
      assertTrue("Output part-file's owner is not correct. Expected : "
          + taskControllerUser.getUserName() + " Found : " + owner, owner
          .equals(taskControllerUser.getUserName()));
      assertTrue("Output part-file's group is not correct. Expected : "
          + taskControllerUser.getGroupNames()[0] + " Found : " + group, group
          .equals(taskControllerUser.getGroupNames()[0]));
    }
  }
}