// blob: 0802b0337860a01f03d00c09fb0750df6c8e7f4a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.filecache.TaskDistributedCacheManager;
import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapred.QueueManager.QueueACL;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.RunJar;
/**
 * Internal class responsible for initializing the job, not intended for users.
 * Creates the following hierarchy:
 * <ul>
 * <li>$mapred.local.dir/taskTracker/$user</li>
 * <li>$mapred.local.dir/taskTracker/$user/jobcache</li>
 * <li>$mapred.local.dir/taskTracker/$user/jobcache/$jobid/work</li>
 * <li>$mapred.local.dir/taskTracker/$user/jobcache/$jobid/jars</li>
 * <li>$mapred.local.dir/taskTracker/$user/jobcache/$jobid/jars/job.jar</li>
 * <li>$mapred.local.dir/taskTracker/$user/jobcache/$jobid/job.xml</li>
 * <li>$mapred.local.dir/taskTracker/$user/jobcache/$jobid/jobToken</li>
 * <li>$mapred.local.dir/taskTracker/$user/distcache</li>
 * </ul>
 */
public class JobLocalizer {
// Logger shared by the static and instance methods of this class.
static final Log LOG = LogFactory.getLog(JobLocalizer.class);
// rwx------ : used for the user, jobcache, distcache and job directories.
private static final FsPermission urwx =
FsPermission.createImmutable((short) 0700);
// rwx--x--- : used for the per-job user log directory.
private static final FsPermission urwx_gx =
FsPermission.createImmutable((short) 0710);
// rw-r----- : used for job-acls.xml and the localized job configuration file.
private static final FsPermission urw_gr =
FsPermission.createImmutable((short) 0640);
// User the job is localized for; the constructor rejects null.
private final String user;
// Identifier of the job being localized; the constructor rejects null.
private final String jobid;
// Raw (unwrapped) local file system obtained from the TaskTracker configuration.
private final FileSystem lfs;
// Per-user directory on each configured local dir, shuffled at construction.
private final List<Path> localDirs;
// Allocator that resolves relative paths against the dirs stored under
// JOB_LOCAL_CTXT.
private final LocalDirAllocator lDirAlloc;
// Configuration handed to the constructor; it is updated with JOB_LOCAL_CTXT.
private final JobConf ttConf;
// The names below are paths relative to a per-user directory.
// jobcache/$jobid
private final String JOBDIR;
// JOBDIR followed by the distributed-cache directory name.
private final String DISTDIR;
// JOBDIR/work
private final String WORKDIR;
// Destination of the downloaded job jar: JOBDIR/<jars dir>/job.jar
private final String JARDST;
// Location of the localized job configuration file under JOBDIR.
private final String JOBCONF;
// Location of the job token file under JOBDIR.
private final String JOBTOKEN;
// Configuration key under which the per-user local directories are stored.
private static final String JOB_LOCAL_CTXT = "mapred.job.local.dir";
/**
 * Creates a localizer whose local directories are taken from the
 * {@code JobConf.MAPRED_LOCAL_DIR_PROPERTY} entry of {@code ttConf}.
 *
 * @param ttConf configuration supplying the local directories; the
 *               delegated constructor also writes to it
 * @param user the user the job is localized for
 * @param jobid the identifier of the job to localize
 * @throws IOException if {@code user} or {@code jobid} is null, or if no
 *                     local directories are configured
 */
public JobLocalizer(JobConf ttConf, String user, String jobid)
throws IOException {
// getStrings may yield no entries; the delegated constructor rejects that.
this(ttConf, user, jobid,
ttConf.getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
}
public JobLocalizer(JobConf ttConf, String user, String jobid,
String... localDirs) throws IOException {
if (null == user) {
throw new IOException("Cannot initialize for null user");
}
this.user = user;
if (null == jobid) {
throw new IOException("Cannot initialize for null jobid");
}
this.jobid = jobid;
this.ttConf = ttConf;
lfs = FileSystem.getLocal(ttConf).getRaw();
this.localDirs = createPaths(user, localDirs);
ttConf.setStrings(JOB_LOCAL_CTXT, localDirs);
Collections.shuffle(this.localDirs);
lDirAlloc = new LocalDirAllocator(JOB_LOCAL_CTXT);
JOBDIR = TaskTracker.JOBCACHE + Path.SEPARATOR + jobid;
DISTDIR = JOBDIR + "/" + TaskTracker.DISTCACHEDIR;
WORKDIR = JOBDIR + "/work";
JARDST = JOBDIR + "/" + TaskTracker.JARSDIR + "/job.jar";
JOBCONF = JOBDIR + "/" + TaskTracker.JOBFILE;
JOBTOKEN = JOBDIR + "/" + TaskTracker.JOB_TOKEN_FILE;
}
/**
 * Maps every configured local directory to its per-user sub-directory.
 * <p>
 * The input array is rewritten in place: each entry is replaced by the
 * string form of the corresponding per-user path.
 *
 * @param user the user whose directory is resolved under each entry
 * @param str the configured local directories; modified in place
 * @return the per-user paths, in the same order as {@code str}
 * @throws IOException if {@code str} is null or empty
 */
private static List<Path> createPaths(String user, final String[] str)
    throws IOException {
  if (str == null || str.length == 0) {
    throw new IOException("mapred.local.dir contains no entries");
  }
  final List<Path> perUserDirs = new ArrayList<Path>(str.length);
  int idx = 0;
  while (idx < str.length) {
    final Path userDir = new Path(str[idx], TaskTracker.getUserDir(user));
    perUserDirs.add(userDir);
    // Overwrite the caller's entry with the resolved per-user directory.
    str[idx] = userDir.toString();
    idx++;
  }
  return perUserDirs;
}
/**
 * Creates the per-user directory under every configured local directory,
 * requesting rwx------ permissions.
 * <p>
 * A directory that cannot be created is logged and skipped; the call only
 * fails when none of them could be created.
 *
 * @throws IOException if no user directory could be created
 */
public void createLocalDirs() throws IOException {
  boolean createdAny = false;
  for (Path userDir : localDirs) {
    // $mapred.local.dir/taskTracker/$user
    if (lfs.mkdirs(userDir, urwx)) {
      createdAny = true;
    } else {
      LOG.warn("Unable to create the user directory : " + userDir);
    }
  }
  if (createdAny) {
    return;
  }
  throw new IOException("Not able to initialize user directories "
      + "in any of the configured local directories for user " + user);
}
/**
 * Sets up the per-user cache directories on this TT. For every per-user
 * directory the following sub-directories are created with rwx------
 * permissions:
 * <ul>
 * <li>$mapred.local.dir/taskTracker/$user/jobcache</li>
 * <li>$mapred.local.dir/taskTracker/$user/distcache</li>
 * </ul>
 * Individual failures are logged; the call fails only when a kind of
 * directory could not be created on any of the local directories.
 *
 * @throws IOException if no job-cache directory, or no distributed-cache
 *                     directory, could be created
 */
public void createUserDirs() throws IOException {
  LOG.info("Initializing user " + user + " on this TT.");
  boolean anyJobCacheDir = false;
  boolean anyDistCacheDir = false;
  for (Path userDir : localDirs) {
    // $mapred.local.dir/taskTracker/$user/jobcache
    final Path jobCacheDir = new Path(userDir, TaskTracker.JOBCACHE);
    if (lfs.mkdirs(jobCacheDir, urwx)) {
      anyJobCacheDir = true;
    } else {
      LOG.warn("Unable to create job cache directory : " + jobCacheDir);
    }

    // $mapred.local.dir/taskTracker/$user/distcache
    final Path distCacheDir = new Path(userDir, TaskTracker.DISTCACHEDIR);
    if (lfs.mkdirs(distCacheDir, urwx)) {
      anyDistCacheDir = true;
    } else {
      LOG.warn("Unable to create distributed-cache directory : " + distCacheDir);
    }
  }

  // The job-cache failure takes precedence when both kinds failed.
  if (!anyJobCacheDir) {
    throw new IOException("Not able to initialize job-cache directories "
        + "in any of the configured local directories for user " + user);
  }
  if (!anyDistCacheDir) {
    throw new IOException(
        "Not able to initialize distributed-cache directories "
        + "in any of the configured local directories for user "
        + user);
  }
}
/**
 * Prepares the job directory on every local directory. Intended to be called
 * by the job localization code, only if the job is not already localized.
 * <br>
 * The directories are created with 700 permissions so that other users
 * cannot misuse them until {@link TaskController#initializeJob} runs at a
 * later time and sets the proper private permissions. <br>
 * A job directory that already exists is removed recursively first.
 *
 * @throws IOException if the job directory could not be created on any of
 *                     the local directories
 */
public void createJobDirs() throws IOException {
  boolean createdAny = false;
  for (Path userDir : localDirs) {
    final Path jobDir = new Path(userDir, JOBDIR);
    // A leftover directory means an earlier localizeJob run was only
    // partially executed (e.g. job.xml copied but job.jar failed); start
    // from a clean slate.
    if (lfs.exists(jobDir)) {
      lfs.delete(jobDir, true);
    }
    // $mapred.local.dir/taskTracker/$user/jobcache/$jobid
    if (lfs.mkdirs(jobDir, urwx)) {
      createdAny = true;
    } else {
      LOG.warn("Not able to create job directory " + jobDir);
    }
  }
  if (createdAny) {
    return;
  }
  throw new IOException("Not able to initialize job directories "
      + "in any of the configured local directories for job "
      + jobid);
}
/**
 * Creates the user log directory of this job, requesting rwx--x---
 * permissions.
 *
 * @throws IOException if the directory could not be created
 */
public void initializeJobLogDir() throws IOException {
  final String logDirUri = TaskLog.getJobDir(jobid).toURI().toString();
  final Path jobUserLogDir = new Path(logDirUri);
  final boolean created = lfs.mkdirs(jobUserLogDir, urwx_gx);
  if (!created) {
    throw new IOException(
        "Could not create job user log directory: " + jobUserLogDir);
  }
}
/**
 * Download the job jar file from FS to the local file system and unjar it.
 * Set the local jar file in the passed configuration. Does nothing when the
 * configuration names no jar.
 *
 * @param localJobConf the job configuration; its jar entry is replaced by
 *                     the local copy's path
 * @throws IOException if the jar cannot be copied, unpacked, or no local
 *                     path can be allocated for it
 */
private void localizeJobJarFile(JobConf localJobConf) throws IOException {
  final String jarFile = localJobConf.getJar();
  if (jarFile == null) {
    // The job ships no jar; nothing to localize.
    return;
  }
  final Path jarFilePath = new Path(jarFile);
  final FileSystem userFs = jarFilePath.getFileSystem(localJobConf);

  // Ask for five times the jar size to leave room for unjarring it in the
  // jars directory. When the size cannot be determined, pass the allocator's
  // unknown-size sentinel unscaled: multiplying the sentinel by five would
  // turn it into an ordinary (negative) size request.
  long spaceNeeded;
  try {
    final FileStatus status = userFs.getFileStatus(jarFilePath);
    spaceNeeded = 5 * status.getLen();
  } catch (FileNotFoundException fe) {
    spaceNeeded = LocalDirAllocator.SIZE_UNKNOWN;
  }
  final Path localJarFile =
      lDirAlloc.getLocalPathForWrite(JARDST, spaceNeeded, ttConf);

  // Download job.jar
  userFs.copyToLocalFile(jarFilePath, localJarFile);
  localJobConf.setJar(localJarFile.toString());

  // Also un-jar the job.jar files. We un-jar it so that classes inside
  // sub-directories, for e.g., lib/, classes/ are available on class-path
  final String jarsDir = localJarFile.getParent().toString();
  RunJar.unJar(new File(localJarFile.toString()), new File(jarsDir));
  FileUtil.chmod(jarsDir, "ugo+rx", true);
}
/**
 * The permissions (rwxr-xr-x) to use for the private distributed cache
 * objects. They are already protected by the user directory, so the group
 * and other bits are kept the same so that LocalFileSystem will use the
 * java File methods to set permission.
 */
private static final FsPermission privateCachePerms =
FsPermission.createImmutable((short) 0755);
/**
 * Given a list of objects, download each private one.
 * <p>
 * The four arrays are parallel: entry {@code i} of each describes the same
 * cache object. Either all of them are null (nothing is configured) or all
 * of them must be set and have the same length.
 *
 * @param conf the job's configuration
 * @param sources the list of objects to download from
 * @param dests the list of paths to download them to
 * @param times the desired modification times
 * @param isPublic are the objects in the public cache?
 * @param isArchive are these archive files?
 * @throws IOException if only some of the arrays are set, if their lengths
 *                     differ, or if a download fails
 * @return null when all four arrays are null; otherwise one entry per
 *         source, holding the value returned by the download for private
 *         objects and 0 for public ones
 */
private static long[] downloadPrivateCacheObjects(Configuration conf,
                                                  URI[] sources,
                                                  Path[] dests,
                                                  long[] times,
                                                  boolean[] isPublic,
                                                  boolean isArchive
                                                  ) throws IOException {
  if (null == sources && null == dests && null == times && null == isPublic) {
    return null;
  }
  // A partially populated configuration would otherwise surface as a bare
  // NullPointerException on the length checks below.
  if (null == sources || null == dests || null == times || null == isPublic) {
    throw new IOException("Distributed cache entry arrays are only partially "
        + "set: sources set=" + (null != sources)
        + ", dests set=" + (null != dests)
        + ", times set=" + (null != times)
        + ", visibilities set=" + (null != isPublic));
  }
  if (sources.length != dests.length ||
      sources.length != times.length ||
      sources.length != isPublic.length) {
    throw new IOException("Distributed cache entry arrays have different " +
                          "lengths: " + sources.length + ", " + dests.length +
                          ", " + times.length + ", " + isPublic.length);
  }
  long[] result = new long[sources.length];
  for (int i = 0; i < sources.length; i++) {
    // public objects are already downloaded by the Task Tracker, we
    // only need to handle the private ones here
    if (!isPublic[i]) {
      result[i] =
        TrackerDistributedCacheManager.downloadCacheObject(conf, sources[i],
                                                           dests[i],
                                                           times[i],
                                                           isArchive,
                                                           privateCachePerms);
    }
  }
  return result;
}
/**
 * Downloads the private part of the distributed cache: first the plain
 * files, then the archives.
 *
 * @param conf the job's configuration
 * @throws IOException if the cache configuration is inconsistent or a
 *                     download fails
 * @return the result of the archive download (the result of the file
 *         download is discarded); null when no archives are configured
 */
public static long[] downloadPrivateCache(Configuration conf) throws IOException {
  // Plain cache files: downloaded for their side effect only.
  final URI[] fileSources = DistributedCache.getCacheFiles(conf);
  final Path[] fileDests = DistributedCache.getLocalCacheFiles(conf);
  final long[] fileTimes = DistributedCache.getFileTimestamps(conf);
  final boolean[] filePublic =
      TrackerDistributedCacheManager.getFileVisibilities(conf);
  downloadPrivateCacheObjects(conf, fileSources, fileDests, fileTimes,
      filePublic, false);

  // Cache archives: their result is what the caller receives.
  final URI[] archiveSources = DistributedCache.getCacheArchives(conf);
  final Path[] archiveDests = DistributedCache.getLocalCacheArchives(conf);
  final long[] archiveTimes = DistributedCache.getArchiveTimestamps(conf);
  final boolean[] archivePublic =
      TrackerDistributedCacheManager.getArchiveVisibilities(conf);
  final long[] archiveSizes = downloadPrivateCacheObjects(conf, archiveSources,
      archiveDests, archiveTimes, archivePublic, true);
  return archiveSizes;
}
/**
 * Localizes the job files, writing the updated job configuration to a
 * location picked by the local directory allocator.
 *
 * @param jobid the job being localized
 * @param jConf the job configuration; updated during localization
 * @param localJobTokenFile the local path of the job token file
 * @param taskTracker the umbilical used to report to the TaskTracker
 * @throws IOException if localization fails
 * @throws InterruptedException if the privileged reporting call is
 *                              interrupted
 */
public void localizeJobFiles(JobID jobid, JobConf jConf,
    Path localJobTokenFile, TaskUmbilicalProtocol taskTracker)
    throws IOException, InterruptedException {
  final Path localJobFile = lDirAlloc.getLocalPathForWrite(JOBCONF, ttConf);
  localizeJobFiles(jobid, jConf, localJobFile, localJobTokenFile, taskTracker);
}
/**
 * Localizes everything the job needs on this node: the job jar, the private
 * distributed cache, the job ACLs file and the updated job configuration.
 *
 * @param jobid the job being localized
 * @param jConf the job configuration; updated and finally written to
 *              {@code localJobFile}
 * @param localJobFile where the updated job configuration is written
 * @param localJobTokenFile the local path of the job token file
 * @param taskTracker the umbilical used to report the private distributed
 *                    cache sizes
 * @throws IOException if any localization step fails
 * @throws InterruptedException if the privileged reporting call is
 *                              interrupted
 */
public void localizeJobFiles(final JobID jobid, JobConf jConf,
    Path localJobFile, Path localJobTokenFile,
    final TaskUmbilicalProtocol taskTracker)
    throws IOException, InterruptedException {
  // Download the job.jar for this job from the system FS
  localizeJobJarFile(jConf);

  // Carry the TaskTracker-side settings over into the job configuration.
  final String perUserLocalDirs = ttConf.get(JOB_LOCAL_CTXT);
  jConf.set(JOB_LOCAL_CTXT, perUserLocalDirs);
  jConf.set(TokenCache.JOB_TOKENS_FILENAME, localJobTokenFile.toString());
  final String ttLocalDirs = ttConf.get(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
  jConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY, ttLocalDirs);
  TaskTracker.resetNumTasksPerJvm(jConf);

  // Set up the distributed cache.
  final long[] sizes = downloadPrivateCache(jConf);
  if (sizes != null) {
    // The doAs is needed because the DefaultTaskController calls
    // localizeJobFiles in the context of the TaskTracker process; the JVM
    // authorization check would fail without it. In the LinuxTC case it
    // does no harm.
    final UserGroupInformation jobUgi =
        UserGroupInformation.createRemoteUser(jobid.toString());
    final PrivilegedExceptionAction<Object> reportSizes =
        new PrivilegedExceptionAction<Object>() {
          public Object run() throws IOException {
            taskTracker.updatePrivateDistributedCacheSizes(jobid, sizes);
            return null;
          }
        };
    jobUgi.doAs(reportSizes);
  }

  // Create job-acls.xml in the job userlog dir; it holds the info needed to
  // authorize users for viewing task logs of this job.
  final Path jobLogDir = new Path(TaskLog.getJobDir(jobid).toURI().toString());
  writeJobACLs(jConf, jobLogDir);

  // Finally persist the updated jobConf file in the job directory.
  writeLocalJobFile(localJobFile, jConf);
}
/**
 * Writes job-acls.xml into {@code logDir}. The file records:
 * <ul>
 * <li>the job-view ACL,</li>
 * <li>the name of the queue the job was submitted to,</li>
 * <li>the queue-admins ACL of that queue, and</li>
 * <li>the job owner, stored as {@code user.name}.</li>
 * </ul>
 * ACLs absent from {@code conf} are written as a single space. The file's
 * permissions are set to rw-r----- once it has been written.
 *
 * @param conf job configuration
 * @param logDir job userlog dir
 * @throws IOException if the file cannot be written or its permissions
 *                     cannot be set
 */
private void writeJobACLs(JobConf conf, Path logDir) throws IOException {
  // Start from an empty configuration so only the ACL entries are written.
  final JobConf aclConf = new JobConf(false);

  // job view acl
  aclConf.set(JobContext.JOB_ACL_VIEW_JOB,
      conf.get(JobContext.JOB_ACL_VIEW_JOB, " "));

  // job queue name
  final String queueName = conf.getQueueName();
  aclConf.setQueueName(queueName);

  // queue admins acl
  final String adminAclKey = QueueManager.toFullPropertyName(queueName,
      QueueACL.ADMINISTER_JOBS.getAclName());
  aclConf.set(adminAclKey, conf.get(adminAclKey, " "));

  // jobOwner as user.name
  aclConf.set("user.name", user);

  final Path aclFile = new Path(logDir, TaskTracker.jobACLsFile);
  OutputStream aclOut = null;
  try {
    aclOut = lfs.create(aclFile);
    aclConf.writeXml(aclOut);
  } finally {
    IOUtils.cleanup(LOG, aclOut);
  }
  lfs.setPermission(aclFile, urw_gr);
}
/**
 * Creates the job's work directory and records its path in the job
 * configuration under {@code TaskTracker.JOB_LOCAL_DIR}.
 *
 * @param jConf the job configuration to update
 * @throws IOException if no location can be allocated or the directory
 *                     cannot be created
 */
public void createWorkDir(JobConf jConf) throws IOException {
  // $mapred.local.dir/taskTracker/$user/jobcache/$jobid/work
  final Path jobWorkDir = lDirAlloc.getLocalPathForWrite(WORKDIR, ttConf);
  final boolean created = lfs.mkdirs(jobWorkDir);
  if (!created) {
    throw new IOException("Mkdirs failed to create "
        + jobWorkDir.toString());
  }
  final String workDirPath = jobWorkDir.toUri().getPath();
  jConf.set(TaskTracker.JOB_LOCAL_DIR, workDirPath);
}
/**
 * Locates the job token file of this job among the local directories.
 *
 * @return the path of the job token file
 * @throws IOException if the allocator cannot resolve the file for reading
 */
public Path findCredentials() throws IOException {
  final Path jobTokenPath = lDirAlloc.getLocalPathToRead(JOBTOKEN, ttConf);
  return jobTokenPath;
}
/**
 * Runs the job setup: loads the localized job configuration, creates the
 * work directory and localizes the job files.
 * <p>
 * Assumes, with $x standing for each $mapred.local.dir:
 * <ul>
 * <li>for all $x, $x/$user exists</li>
 * <li>$x/$user/jobcache/$jobid/job.xml exists</li>
 * <li>$x/$user/jobcache/$jobid/jobToken exists</li>
 * <li>$logdir/userlogs/$jobid exists</li>
 * </ul>
 *
 * @param user not read by this method
 * @param jobid the job identifier in string form
 * @param localJobTokenFile the local path of the job token file
 * @param taskTracker the umbilical used to report to the TaskTracker
 * @return always 0; failures are reported through exceptions
 * @throws IOException if any setup step fails
 * @throws InterruptedException if the privileged reporting call is
 *                              interrupted
 */
public int runSetup(String user, String jobid, Path localJobTokenFile,
    TaskUmbilicalProtocol taskTracker) throws IOException,
    InterruptedException {
  // Load the job configuration that was already placed on a local dir.
  final Path jobXml = lDirAlloc.getLocalPathToRead(JOBCONF, ttConf);
  final JobConf jobCfg = new JobConf(jobXml);
  createWorkDir(jobCfg);
  final JobID parsedJobId = JobID.forName(jobid);
  localizeJobFiles(parsedJobId, jobCfg, jobXml, localJobTokenFile,
      taskTracker);
  return 0;
}
/**
 * Command-line entry point that localizes a job and exits the JVM with the
 * setup result.
 * <p>
 * Expected arguments: {@code <user> <jobid> <tt-host> <tt-port>}. The steps
 * are: load the job's tokens from the local job token file, open an
 * umbilical connection to the TaskTracker authenticated with the job token,
 * then run the setup as the job's user. The process exits with the value
 * returned by the setup, or -1 if the setup throws.
 *
 * @param argv the user, the job id, the TaskTracker host and the
 *             TaskTracker port
 * @throws IllegalArgumentException if fewer than four arguments are given
 * @throws IOException if the tokens cannot be loaded, the job token is
 *                     missing, or the connection cannot be set up
 * @throws InterruptedException if a privileged call is interrupted
 */
public static void main(String[] argv)
    throws IOException, InterruptedException {
  // $logdir
  // let $x = $root/tasktracker for some $mapred.local.dir
  //   create $x/$user/jobcache/$jobid/work
  //   fetch  $x/$user/jobcache/$jobid/jars/job.jar
  //   setup  $x/$user/distcache
  //   verify $x/distcache
  //   write  $x/$user/jobcache/$jobid/job.xml
  if (argv == null || argv.length < 4) {
    // Fail with a usable message instead of an index-out-of-bounds error.
    throw new IllegalArgumentException(
        "Usage: JobLocalizer <user> <jobid> <tt-host> <tt-port>");
  }
  final String user = argv[0];
  final String jobid = argv[1];
  final InetSocketAddress ttAddr =
      new InetSocketAddress(argv[2], Integer.parseInt(argv[3]));
  final String uid = UserGroupInformation.getCurrentUser().getShortUserName();
  if (!user.equals(uid)) {
    LOG.warn("Localization running as " + uid + " not " + user);
  }

  // Pull in user's tokens to complete setup
  final JobConf conf = new JobConf();
  final JobLocalizer localizer =
      new JobLocalizer(conf, user, jobid);
  final Path jobTokenFile = localizer.findCredentials();
  final Credentials creds = TokenCache.loadTokens(
      jobTokenFile.toUri().toString(), conf);
  LOG.debug("Loaded tokens from " + jobTokenFile);
  UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
  for (Token<? extends TokenIdentifier> token : creds.getAllTokens()) {
    ugi.addToken(token);
  }

  // The job token authenticates the umbilical connection to the TaskTracker.
  UserGroupInformation ugiJob = UserGroupInformation.createRemoteUser(jobid);
  Token<JobTokenIdentifier> jt = TokenCache.getJobToken(creds);
  if (jt == null) {
    // Without this check the failure would be a bare NullPointerException.
    throw new IOException("No job token found in " + jobTokenFile
        + " for job " + jobid);
  }
  jt.setService(new Text(ttAddr.getAddress().getHostAddress() + ":"
      + ttAddr.getPort()));
  ugiJob.addToken(jt);

  final TaskUmbilicalProtocol taskTracker =
      ugiJob.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
        public TaskUmbilicalProtocol run() throws IOException {
          TaskUmbilicalProtocol taskTracker =
              (TaskUmbilicalProtocol) RPC.getProxy(TaskUmbilicalProtocol.class,
                  TaskUmbilicalProtocol.versionID,
                  ttAddr, conf);
          return taskTracker;
        }
      });

  // Run the setup as the job's user; any failure is printed to stdout and
  // turned into a -1 exit status.
  System.exit(
      ugi.doAs(new PrivilegedExceptionAction<Integer>() {
        public Integer run() {
          try {
            return localizer.runSetup(user, jobid, jobTokenFile, taskTracker);
          } catch (Throwable e) {
            e.printStackTrace(System.out);
            return -1;
          }
        }
      }));
}
/**
 * Writes the job configuration as XML to {@code jobFile} on the local file
 * system. Any existing file at that path is deleted first, and the new file
 * is created with rw-r----- permissions.
 *
 * @param jobFile the local path to write the configuration to
 * @param conf the job configuration to write; also used to obtain the local
 *             file system
 * @throws IOException if the file cannot be deleted, created or written
 */
public static void writeLocalJobFile(Path jobFile, JobConf conf)
    throws IOException {
  final FileSystem localFileSystem = FileSystem.getLocal(conf);
  // Remove a stale copy so the file is created afresh.
  localFileSystem.delete(jobFile);
  OutputStream xmlOut = null;
  try {
    xmlOut = FileSystem.create(localFileSystem, jobFile, urw_gr);
    conf.writeXml(xmlOut);
  } finally {
    // Closes the stream if it was opened; close failures are only logged.
    IOUtils.cleanup(LOG, xmlOut);
  }
}
}