MAPREDUCE-2804. Fixed a race condition in setting up the log directories
for tasks that are starting at the same time. (omalley)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-204@1161328 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 71bca85..5f9ce6d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -13,6 +13,9 @@
BUG FIXES
+ MAPREDUCE-2804. Fixed a race condition in setting up the log directories
+ for tasks that are starting at the same time. (omalley)
+
MAPREDUCE-2846. Fixed a race condition in writing the log index file that
caused tasks to fail. (omalley)
diff --git a/src/core/org/apache/hadoop/fs/FileUtil.java b/src/core/org/apache/hadoop/fs/FileUtil.java
index bfb13d2..e9c3d70 100644
--- a/src/core/org/apache/hadoop/fs/FileUtil.java
+++ b/src/core/org/apache/hadoop/fs/FileUtil.java
@@ -25,6 +25,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell;
@@ -561,9 +563,25 @@
} catch(InterruptedException e){
//do nothing as of yet
}
+ if (returnVal != 0) {
+ LOG.warn("Command '" + cmd + "' failed " + returnVal +
+ " with: " + copyStderr(p));
+ }
return returnVal;
}
+  /**
+   * Drain the standard-error stream of a process into a string.
+   * The stream is closed before returning, whether or not reading succeeds.
+   * @param p the process whose stderr should be read
+   * @return everything read from the process's stderr, decoded with the
+   *         platform default charset; empty if nothing was written
+   * @throws IOException if reading from or closing the stream fails
+   */
+  private static String copyStderr(Process p) throws IOException {
+    InputStream err = p.getErrorStream();
+    try {
+      StringBuilder result = new StringBuilder();
+      byte[] buff = new byte[4096];
+      int len = err.read(buff);
+      while (len > 0) {
+        result.append(new String(buff, 0, len));
+        len = err.read(buff);
+      }
+      return result.toString();
+    } finally {
+      // Close explicitly rather than leaving the pipe open until the
+      // Process object is garbage collected.
+      err.close();
+    }
+  }
+
/**
* Change the permissions on a filename.
* @param filename the name of the file to change
@@ -608,6 +626,75 @@
}
return shExec.getExitCode();
}
+
+  /**
+   * Set permissions to the required value. Uses the java primitives instead
+   * of forking if group == other.
+   * <p>
+   * The java.io.File setters can only target either the owner alone or
+   * everybody, so each of read/write/execute is first applied to everybody
+   * using the group/other value and then, when the owner's value differs,
+   * applied again to the owner alone.
+   * @param f the file to change
+   * @param permission the new permissions
+   * @throws IOException if any individual permission change reports failure
+   *         or the forked command fails
+   */
+  public static void setPermission(File f, FsPermission permission
+                                   ) throws IOException {
+    FsAction user = permission.getUserAction();
+    FsAction group = permission.getGroupAction();
+    FsAction other = permission.getOtherAction();
+
+    // Fork chmod if group and other permissions are different...
+    if (group != other) {
+      execSetPermission(f, permission);
+      return;
+    }
+
+    boolean rv;
+
+    // read perms
+    rv = f.setReadable(group.implies(FsAction.READ), false);
+    checkReturnValue(rv, f, permission);
+    if (group.implies(FsAction.READ) != user.implies(FsAction.READ)) {
+      // Capture the owner-only result; otherwise the check below would only
+      // re-test the outcome of the call above.
+      rv = f.setReadable(user.implies(FsAction.READ), true);
+      checkReturnValue(rv, f, permission);
+    }
+
+    // write perms
+    rv = f.setWritable(group.implies(FsAction.WRITE), false);
+    checkReturnValue(rv, f, permission);
+    if (group.implies(FsAction.WRITE) != user.implies(FsAction.WRITE)) {
+      rv = f.setWritable(user.implies(FsAction.WRITE), true);
+      checkReturnValue(rv, f, permission);
+    }
+
+    // exec perms
+    rv = f.setExecutable(group.implies(FsAction.EXECUTE), false);
+    checkReturnValue(rv, f, permission);
+    if (group.implies(FsAction.EXECUTE) != user.implies(FsAction.EXECUTE)) {
+      rv = f.setExecutable(user.implies(FsAction.EXECUTE), true);
+      checkReturnValue(rv, f, permission);
+    }
+  }
+
+  /**
+   * Turn a failed permission change into an exception.
+   * @param rv result of the attempted change; true means it succeeded
+   * @param p the file whose permissions were being changed
+   * @param permission the permissions that were requested
+   * @throws IOException if rv is false
+   */
+  private static void checkReturnValue(boolean rv, File p,
+                                       FsPermission permission
+                                       ) throws IOException {
+    if (rv) {
+      return;
+    }
+    String mode = String.format("%04o", permission.toShort());
+    throw new IOException("Failed to set permissions of path: " + p + " to " +
+                          mode);
+  }
+
+  /**
+   * Apply the permissions by running the set-permission shell command on the
+   * file, passing the mode as a zero-padded octal string.
+   * @param f the file to change
+   * @param permission the new permissions
+   * @throws IOException if running the command fails
+   */
+  private static void execSetPermission(File f, FsPermission permission)
+      throws IOException {
+    String octalMode = String.format("%04o", permission.toShort());
+    execCommand(f, Shell.SET_PERMISSION_COMMAND, octalMode);
+  }
+
+  /**
+   * Run a command with the canonical path of a file appended as its final
+   * argument.
+   * @param f the file whose canonical path is appended
+   * @param cmd the command and any arguments that precede the path
+   * @return the string returned by Shell.execCommand for the full command
+   * @throws IOException if the canonical path cannot be resolved or the
+   *         command fails
+   */
+  static String execCommand(File f, String... cmd) throws IOException {
+    final int pathIndex = cmd.length;
+    String[] argv = new String[pathIndex + 1];
+    System.arraycopy(cmd, 0, argv, 0, pathIndex);
+    argv[pathIndex] = f.getCanonicalPath();
+    return Shell.execCommand(argv);
+  }
/**
* Create a tmp file for a base file.
diff --git a/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java b/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java
index 1a8f085..f3c38a95 100644
--- a/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java
+++ b/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java
@@ -21,7 +21,6 @@
import java.io.*;
import java.net.URI;
import java.nio.ByteBuffer;
-import java.nio.channels.FileLock;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
@@ -258,6 +257,7 @@
if (pathToFile(src).renameTo(pathToFile(dst))) {
return true;
}
+ LOG.debug("Falling through to a copy of " + src + " to " + dst);
return FileUtil.copy(this, src, this, dst, true, getConf());
}
@@ -415,8 +415,8 @@
IOException e = null;
try {
StringTokenizer t = new StringTokenizer(
- execCommand(new File(getPath().toUri()),
- Shell.getGET_PERMISSION_COMMAND()));
+ FileUtil.execCommand(new File(getPath().toUri()),
+ Shell.getGET_PERMISSION_COMMAND()));
//expected format
//-rw------- 1 username groupname ...
String permission = t.nextToken();
@@ -466,11 +466,11 @@
}
if (username == null) {
- execCommand(pathToFile(p), Shell.SET_GROUP_COMMAND, groupname);
+ FileUtil.execCommand(pathToFile(p), Shell.SET_GROUP_COMMAND, groupname);
} else {
//OWNER[:[GROUP]]
String s = username + (groupname == null? "": ":" + groupname);
- execCommand(pathToFile(p), Shell.SET_OWNER_COMMAND, s);
+ FileUtil.execCommand(pathToFile(p), Shell.SET_OWNER_COMMAND, s);
}
}
@@ -479,65 +479,8 @@
*/
@Override
public void setPermission(Path p, FsPermission permission
- ) throws IOException {
- FsAction user = permission.getUserAction();
- FsAction group = permission.getGroupAction();
- FsAction other = permission.getOtherAction();
-
- File f = pathToFile(p);
-
- // Fork chmod if group and other permissions are different...
- if (group != other) {
- execSetPermission(f, permission);
- return;
- }
-
- boolean rv = true;
-
- // read perms
- rv = f.setReadable(group.implies(FsAction.READ), false);
- checkReturnValue(rv, p, permission);
- if (group.implies(FsAction.READ) != user.implies(FsAction.READ)) {
- f.setReadable(user.implies(FsAction.READ), true);
- checkReturnValue(rv, p, permission);
- }
-
- // write perms
- rv = f.setWritable(group.implies(FsAction.WRITE), false);
- checkReturnValue(rv, p, permission);
- if (group.implies(FsAction.WRITE) != user.implies(FsAction.WRITE)) {
- f.setWritable(user.implies(FsAction.WRITE), true);
- checkReturnValue(rv, p, permission);
- }
-
- // exec perms
- rv = f.setExecutable(group.implies(FsAction.EXECUTE), false);
- checkReturnValue(rv, p, permission);
- if (group.implies(FsAction.EXECUTE) != user.implies(FsAction.EXECUTE)) {
- f.setExecutable(user.implies(FsAction.EXECUTE), true);
- checkReturnValue(rv, p, permission);
- }
+ ) throws IOException {
+ FileUtil.setPermission(pathToFile(p), permission);
}
- private void checkReturnValue(boolean rv, Path p, FsPermission permission)
- throws IOException {
- if (!rv) {
- throw new IOException("Failed to set permissions of path: " + p + " to " +
- String.format("%04o", permission.toShort()));
- }
- }
-
- private void execSetPermission(File f, FsPermission permission)
- throws IOException {
- execCommand(f, Shell.SET_PERMISSION_COMMAND,
- String.format("%04o", permission.toShort()));
- }
-
- private static String execCommand(File f, String... cmd) throws IOException {
- String[] args = new String[cmd.length + 1];
- System.arraycopy(cmd, 0, args, 0, cmd.length);
- args[cmd.length] = f.getCanonicalPath();
- String output = Shell.execCommand(args);
- return output;
- }
}
diff --git a/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java b/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
index d5c4ce4..9ab8095 100644
--- a/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
+++ b/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
@@ -68,12 +68,8 @@
@Override
public void createLogDir(TaskAttemptID taskID,
boolean isCleanup) throws IOException {
- boolean b = TaskLog.createTaskAttemptLogDir(taskID, isCleanup,
- localStorage.getGoodLocalDirs());
- if (!b) {
- LOG.warn("Creation of attempt log dir for " + taskID
- + " failed. Ignoring");
- }
+ TaskLog.createTaskAttemptLogDir(taskID, isCleanup,
+ localStorage.getGoodLocalDirs());
}
/**
diff --git a/src/mapred/org/apache/hadoop/mapred/TaskLog.java b/src/mapred/org/apache/hadoop/mapred/TaskLog.java
index 4b3a303..5d502c2 100644
--- a/src/mapred/org/apache/hadoop/mapred/TaskLog.java
+++ b/src/mapred/org/apache/hadoop/mapred/TaskLog.java
@@ -23,7 +23,6 @@
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@@ -50,7 +49,6 @@
import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
import org.apache.hadoop.util.ProcessTree;
import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Appender;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -92,51 +90,29 @@
* @param taskID attempt-id for which log dir is to be created
* @param isCleanup Is this attempt a cleanup attempt ?
* @param localDirs mapred local directories
- * @return true if attempt log directory creation is succeeded
* @throws IOException
*/
- public static boolean createTaskAttemptLogDir(TaskAttemptID taskID,
+ public static void createTaskAttemptLogDir(TaskAttemptID taskID,
boolean isCleanup, String[] localDirs) throws IOException{
String cleanupSuffix = isCleanup ? ".cleanup" : "";
String strAttemptLogDir = getTaskAttemptLogDir(taskID,
cleanupSuffix, localDirs);
File attemptLogDir = new File(strAttemptLogDir);
- boolean isSucceeded = attemptLogDir.mkdirs();
- if(isSucceeded) {
- String strLinkAttemptLogDir = getJobDir(
- taskID.getJobID()).getAbsolutePath() + File.separatorChar +
- taskID.toString() + cleanupSuffix;
- if (FileUtil.symLink(strAttemptLogDir, strLinkAttemptLogDir) != 0) {
- LOG.warn("Creation of symlink to attempt log dir failed.");
- isSucceeded = false;
- }
-
- File linkAttemptLogDir = new File(strLinkAttemptLogDir);
- // Set permissions for job dir in userlogs
- if (!Localizer.PermissionsHandler.setPermissions(
- linkAttemptLogDir.getParentFile(),
- Localizer.PermissionsHandler.sevenZeroZero)) {
- LOG.warn("Setting permissions to "
- + linkAttemptLogDir.getParentFile() + " failed.");
- isSucceeded = false;
- }
- //Set permissions for target attempt log dir
- if (!Localizer.PermissionsHandler.setPermissions(attemptLogDir,
- Localizer.PermissionsHandler.sevenZeroZero)) {
- LOG.warn("Setting permissions to the real attempt log dir "
- + attemptLogDir + " failed.");
- isSucceeded = false;
- }
- //Set permissions for target job log dir
- if (!Localizer.PermissionsHandler.setPermissions(
- attemptLogDir.getParentFile(),
- Localizer.PermissionsHandler.sevenZeroZero)) {
- LOG.warn("Setting permissions to the real job log dir "
- + attemptLogDir.getParentFile() + " failed.");
- isSucceeded = false;
- }
+ if (!attemptLogDir.mkdirs()) {
+ throw new IOException("Creation of " + attemptLogDir + " failed.");
}
- return isSucceeded;
+ String strLinkAttemptLogDir =
+ getJobDir(taskID.getJobID()).getAbsolutePath() + File.separatorChar +
+ taskID.toString() + cleanupSuffix;
+ if (FileUtil.symLink(strAttemptLogDir, strLinkAttemptLogDir) != 0) {
+ throw new IOException("Creation of symlink from " +
+ strLinkAttemptLogDir + " to " + strAttemptLogDir +
+ " failed.");
+ }
+
+ //Set permissions for target attempt log dir
+ FsPermission userOnly = new FsPermission((short) 0700);
+ FileUtil.setPermission(attemptLogDir, userOnly);
}
/**
diff --git a/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java b/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
index 3339538..1a564ec 100644
--- a/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
+++ b/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
@@ -28,10 +28,8 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.TaskController;
-import org.apache.hadoop.mapred.TaskLog;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.TaskTracker;
-import org.apache.hadoop.mapreduce.JobID;
/**
*
@@ -98,51 +96,6 @@
}
/**
- * Set permission on the given file path using the specified permissions
- * information. We use java api to set permission instead of spawning chmod
- * processes. This saves a lot of time. Using this, one can set all possible
- * combinations of permissions for the owner of the file. But permissions
- * for the group and all others can only be set together, i.e. permissions
- * for group cannot be set different from those for others and vice versa.
- *
- * This method should satisfy the needs of most of the applications. For
- * those it doesn't, {@link FileUtil#chmod} can be used.
- *
- * @param f file path
- * @param pInfo permissions information
- * @return true if success, false otherwise
- */
- public static boolean setPermissions(File f, PermissionsInfo pInfo) {
- if (pInfo == null) {
- LOG.debug(" PermissionsInfo is null, returning.");
- return true;
- }
-
- LOG.debug("Setting permission for " + f.getAbsolutePath());
-
- boolean ret = true;
-
- // Clear all the flags
- ret = f.setReadable(false, false) && ret;
- ret = f.setWritable(false, false) && ret;
- ret = f.setExecutable(false, false) && ret;
-
- ret = f.setReadable(pInfo.readPermissions, pInfo.readPermsOwnerOnly);
- LOG.debug("Readable status for " + f + " set to " + ret);
- ret =
- f.setWritable(pInfo.writePermissions, pInfo.writePermsOwnerOnly)
- && ret;
- LOG.debug("Writable status for " + f + " set to " + ret);
- ret =
- f.setExecutable(pInfo.executablePermissions,
- pInfo.executePermsOwnerOnly)
- && ret;
-
- LOG.debug("Executable status for " + f + " set to " + ret);
- return ret;
- }
-
- /**
* Permissions rwxr_xr_x
*/
public static final PermissionsInfo sevenFiveFive =
@@ -210,9 +163,8 @@
if (fs.exists(userDir) || fs.mkdirs(userDir)) {
// Set permissions on the user-directory
- PermissionsHandler.setPermissions(
- new File(userDir.toUri().getPath()),
- PermissionsHandler.sevenZeroZero);
+ FsPermission userOnly = new FsPermission((short) 0700);
+ FileUtil.setPermission(new File(userDir.toUri().getPath()), userOnly);
userDirStatus = true;
// Set up the jobcache directory
@@ -220,8 +172,7 @@
new File(localDir, TaskTracker.getJobCacheSubdir(user));
if (jobCacheDir.exists() || jobCacheDir.mkdirs()) {
// Set permissions on the jobcache-directory
- PermissionsHandler.setPermissions(jobCacheDir,
- PermissionsHandler.sevenZeroZero);
+ FileUtil.setPermission(jobCacheDir, userOnly);
jobCacheDirStatus = true;
} else {
LOG.warn("Unable to create job cache directory : "
@@ -233,8 +184,7 @@
new File(localDir, TaskTracker.getPrivateDistributedCacheDir(user));
if (distributedCacheDir.exists() || distributedCacheDir.mkdirs()) {
// Set permissions on the distcache-directory
- PermissionsHandler.setPermissions(distributedCacheDir,
- PermissionsHandler.sevenZeroZero);
+ FileUtil.setPermission(distributedCacheDir, userOnly);
distributedCacheDirStatus = true;
} else {
LOG.warn("Unable to create distributed-cache directory : "
@@ -266,52 +216,6 @@
}
/**
- * Prepare the job directories for a given job. To be called by the job
- * localization code, only if the job is not already localized.
- *
- * <br>
- * Here, we set 700 permissions on the job directories created on all disks.
- * This we do so as to avoid any misuse by other users till the time
- * {@link TaskController#initializeJob} is run at a
- * later time to set proper private permissions on the job directories. <br>
- *
- * @param user
- * @param jobId
- * @throws IOException
- */
- public void initializeJobDirs(String user, JobID jobId)
- throws IOException {
- boolean initJobDirStatus = false;
- String jobDirPath = TaskTracker.getLocalJobDir(user, jobId.toString());
- for (String localDir : localDirs) {
- Path jobDir = new Path(localDir, jobDirPath);
- if (fs.exists(jobDir)) {
- // this will happen on a partial execution of localizeJob. Sometimes
- // copying job.xml to the local disk succeeds but copying job.jar might
- // throw out an exception. We should clean up and then try again.
- fs.delete(jobDir, true);
- }
-
- boolean jobDirStatus = fs.mkdirs(jobDir);
- if (!jobDirStatus) {
- LOG.warn("Not able to create job directory " + jobDir.toString());
- }
-
- initJobDirStatus = initJobDirStatus || jobDirStatus;
-
- // job-dir has to be private to the TT
- Localizer.PermissionsHandler.setPermissions(new File(jobDir.toUri()
- .getPath()), Localizer.PermissionsHandler.sevenZeroZero);
- }
-
- if (!initJobDirStatus) {
- throw new IOException("Not able to initialize job directories "
- + "in any of the configured local directories for job "
- + jobId.toString());
- }
- }
-
- /**
* Create taskDirs on all the disks. Otherwise, in some cases, like when
* LinuxTaskController is in use, child might wish to balance load across
* disks but cannot itself create attempt directory because of the fact that
@@ -347,22 +251,4 @@
+ attemptId);
}
}
-
- /**
- * Create job log directory and set appropriate permissions for the directory.
- *
- * @param jobId
- */
- public void initializeJobLogDir(JobID jobId) {
- File jobUserLogDir = TaskLog.getJobDir(jobId);
- if (!jobUserLogDir.exists()) {
- boolean ret = jobUserLogDir.mkdirs();
- if (!ret) {
- LOG.warn("Could not create job user log directory: " + jobUserLogDir);
- return;
- }
- }
- Localizer.PermissionsHandler.setPermissions(jobUserLogDir,
- Localizer.PermissionsHandler.sevenZeroZero);
- }
}