blob: f8f2cdabeb450c869d032b6c59d1c4cc0329aeaa [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
import org.apache.hadoop.mapreduce.test.system.JTProtocol;
import org.apache.hadoop.mapreduce.test.system.JobInfo;
import org.apache.hadoop.mapreduce.test.system.MRCluster;
import org.apache.hadoop.mapreduce.test.system.TTClient;
import org.apache.hadoop.mapreduce.test.system.TTInfo;
import org.apache.hadoop.mapreduce.test.system.TTTaskInfo;
import org.apache.hadoop.mapreduce.test.system.TaskInfo;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestFileOwner {
public static MRCluster cluster;
private StringBuffer jobIdDir = new StringBuffer();
private JTProtocol wovenClient = null;
private static final Log LOG = LogFactory.getLog(TestFileOwner.class);
private String taskController = null;
private final FsPermission PERM_777 = new FsPermission("777");
private final FsPermission PERM_755 = new FsPermission("755");
private final FsPermission PERM_644 = new FsPermission("644");
@BeforeClass
public static void setUp() throws java.lang.Exception {
cluster = MRCluster.createCluster(new Configuration());
cluster.setUp();
}
/*
* The test is used to check the file permission of local files in
* mapred.local.dir. The job control is used which will make the tasks wait
* for completion until it is signaled
*
* @throws Exception in case of test errors
*/
@Test
public void testFilePermission() throws Exception {
wovenClient = cluster.getJTClient().getProxy();
Configuration conf = new Configuration(cluster.getConf());
FinishTaskControlAction.configureControlActionForJob(conf);
SleepJob job = new SleepJob();
job.setConf(conf);
Job slpJob = job.createJob(1, 0, 100, 100, 100, 100);
JobConf jconf = new JobConf(conf);
slpJob.submit();
RunningJob rJob =
cluster.getJTClient().getClient().getJob(
org.apache.hadoop.mapred.JobID.downgrade(slpJob.getJobID()));
taskController = conf.get(TTConfig.TT_TASK_CONTROLLER);
// get the job info so we can get the env variables from the daemon.
// Now wait for the task to be in the running state, only then the
// directories will be created
JobInfo info = wovenClient.getJobInfo(rJob.getID());
Assert.assertNotNull("JobInfo is null", info);
JobID id = rJob.getID();
while (info.runningMaps() != 1) {
Thread.sleep(1000);
info = wovenClient.getJobInfo(id);
}
TaskInfo[] myTaskInfos = wovenClient.getTaskInfo(id);
for (TaskInfo tInfo : myTaskInfos) {
if (!tInfo.isSetupOrCleanup()) {
String[] taskTrackers = tInfo.getTaskTrackers();
for (String taskTracker : taskTrackers) {
TTInfo ttInfo = wovenClient.getTTInfo(taskTracker);
TTClient ttCli = cluster.getTTClient(ttInfo.getStatus().getHost());
Assert.assertNotNull("TTClient instance is null", ttCli);
TTTaskInfo ttTaskInfo = ttCli.getProxy().getTask(tInfo.getTaskID());
Assert.assertNotNull("TTTaskInfo is null", ttTaskInfo);
while (ttTaskInfo.getTaskStatus().getRunState() != TaskStatus.State.RUNNING) {
Thread.sleep(100);
ttTaskInfo = ttCli.getProxy().getTask(tInfo.getTaskID());
}
testPermissionWithTaskController(ttCli, conf, info);
FinishTaskControlAction action =
new FinishTaskControlAction(TaskID.downgrade(tInfo.getTaskID()));
for (TTClient cli : cluster.getTTClients()) {
cli.getProxy().sendAction(action);
}
}
}
}
JobInfo jInfo = wovenClient.getJobInfo(id);
jInfo = cluster.getJTClient().getProxy().getJobInfo(id);
while (!jInfo.getStatus().isJobComplete()) {
Thread.sleep(100);
jInfo = cluster.getJTClient().getProxy().getJobInfo(id);
}
}
private void testPermissionWithTaskController(
TTClient tClient, Configuration conf, JobInfo info) {
Assert.assertNotNull("TTclient is null", tClient);
FsPermission fsPerm = null;
String[] pathInfo = conf.getStrings(MRConfig.LOCAL_DIR);
for (int i = 0; i < pathInfo.length; i++) {
// First verify the jobid directory exists
jobIdDir = new StringBuffer();
String userName = null;
try {
JobStatus[] jobStatus = cluster.getJTClient().getClient().getAllJobs();
userName = jobStatus[0].getUsername();
} catch (Exception ex) {
LOG.error("Failed to get user name");
boolean status = false;
Assert.assertTrue("Failed to get the userName", status);
}
jobIdDir.append(pathInfo[i]).append(Path.SEPARATOR);
jobIdDir.append(TaskTracker.getLocalJobDir(userName, info
.getID().toString()));
FileStatus[] fs = null;
try {
fs = tClient.listStatus(jobIdDir.toString(), true);
} catch (Exception ex) {
LOG.error("Failed to get the jobIdDir files " + ex);
}
Assert.assertEquals("Filestatus length is zero", fs.length != 0, true);
for (FileStatus file : fs) {
try {
String filename = file.getPath().getName();
if (filename.equals(TaskTracker.JOBFILE)) {
if (taskController == DefaultTaskController.class.getName()) {
fsPerm = file.getPermission();
Assert.assertTrue("FilePermission failed for " + filename, fsPerm
.equals(PERM_777));
}
}
if (filename.startsWith("attempt")) {
StringBuffer attemptDir = new StringBuffer(jobIdDir);
attemptDir.append(Path.SEPARATOR).append(filename);
if (tClient.getFileStatus(attemptDir.toString(), true) != null) {
FileStatus[] attemptFs =
tClient.listStatus(attemptDir.toString(), true, true);
for (FileStatus attemptfz : attemptFs) {
Assert.assertNotNull("FileStatus is null", attemptfz);
fsPerm = attemptfz.getPermission();
Assert.assertNotNull("FsPermission is null", fsPerm);
if (taskController == DefaultTaskController.class.getName()) {
if (!attemptfz.isDir()) {
Assert.assertTrue(
"FilePermission failed for " + filename, fsPerm
.equals(PERM_777));
} else {
Assert.assertTrue(
"FilePermission failed for " + filename, fsPerm
.equals(PERM_755));
}
}
}
}
}
if (filename.equals(TaskTracker.TASKJARDIR)) {
StringBuffer jarsDir = new StringBuffer(jobIdDir);
jarsDir.append(Path.SEPARATOR).append(filename);
FileStatus[] jarsFs =
tClient.listStatus(jarsDir.toString(), true, true);
for (FileStatus jarsfz : jarsFs) {
Assert.assertNotNull("FileStatus is null", jarsfz);
fsPerm = jarsfz.getPermission();
Assert.assertNotNull("File permission is null", fsPerm);
if (taskController == DefaultTaskController.class.getName()) {
if (!jarsfz.isDir()) {
if (jarsfz.getPath().getName().equals("job.jar")) {
Assert.assertTrue(
"FilePermission failed for " + filename, fsPerm
.equals(PERM_777));
} else {
Assert.assertTrue(
"FilePermission failed for " + filename, fsPerm
.equals(PERM_644));
}
} else {
Assert.assertTrue(
"FilePermission failed for " + filename, fsPerm
.equals(PERM_755));
}
}
}
}
} catch (Exception ex) {
LOG.error("The exception occurred while searching for nonexsistent"
+ "file, ignoring and continuing. " + ex);
}
}// for loop ends
}// for loop ends
}
@AfterClass
public static void tearDown() throws java.lang.Exception {
cluster.tearDown();
}
}