| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.mapreduce; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.LinkedList; |
| import java.util.LinkedHashMap; |
| import java.util.Map; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.classification.InterfaceStability.Unstable; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager; |
| import org.apache.hadoop.mapreduce.filecache.DistributedCache; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.URL; |
| import org.apache.hadoop.yarn.client.api.SharedCacheClient; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| |
| /** |
| * This class is responsible for uploading resources from the client to HDFS |
| * that are associated with a MapReduce job. |
| */ |
| @Private |
| @Unstable |
| class JobResourceUploader { |
| protected static final Log LOG = LogFactory.getLog(JobResourceUploader.class); |
| private final boolean useWildcard; |
| private final FileSystem jtFs; |
| private SharedCacheClient scClient = null; |
| private SharedCacheConfig scConfig = new SharedCacheConfig(); |
| private ApplicationId appId = null; |
| |
| JobResourceUploader(FileSystem submitFs, boolean useWildcard) { |
| this.jtFs = submitFs; |
| this.useWildcard = useWildcard; |
| } |
| |
| private void initSharedCache(JobID jobid, Configuration conf) { |
| this.scConfig.init(conf); |
| if (this.scConfig.isSharedCacheEnabled()) { |
| this.scClient = createSharedCacheClient(conf); |
| appId = jobIDToAppId(jobid); |
| } |
| } |
| |
| /* |
| * We added this method so that we could do the conversion between JobId and |
| * ApplicationId for the shared cache client. This logic is very similar to |
| * the org.apache.hadoop.mapreduce.TypeConverter#toYarn method. We don't use |
| * that because mapreduce-client-core can not depend on |
| * mapreduce-client-common. |
| */ |
| private ApplicationId jobIDToAppId(JobID jobId) { |
| return ApplicationId.newInstance(Long.parseLong(jobId.getJtIdentifier()), |
| jobId.getId()); |
| } |
| |
| private void stopSharedCache() { |
| if (scClient != null) { |
| scClient.stop(); |
| scClient = null; |
| } |
| } |
| |
| /** |
| * Create, initialize and start a new shared cache client. |
| */ |
| @VisibleForTesting |
| protected SharedCacheClient createSharedCacheClient(Configuration conf) { |
| SharedCacheClient scc = SharedCacheClient.createSharedCacheClient(); |
| scc.init(conf); |
| scc.start(); |
| return scc; |
| } |
| |
| /** |
| * Upload and configure files, libjars, jobjars, and archives pertaining to |
| * the passed job. |
| * <p> |
| * This client will use the shared cache for libjars, files, archives and |
| * jobjars if it is enabled. When shared cache is enabled, it will try to use |
| * the shared cache and fall back to the default behavior when the scm isn't |
| * available. |
| * <p> |
| * 1. For the resources that have been successfully shared, we will continue |
| * to use them in a shared fashion. |
| * <p> |
| * 2. For the resources that weren't in the cache and need to be uploaded by |
| * NM, we won't ask NM to upload them. |
| * |
| * @param job the job containing the files to be uploaded |
| * @param submitJobDir the submission directory of the job |
| * @throws IOException |
| */ |
| public void uploadResources(Job job, Path submitJobDir) throws IOException { |
| try { |
| initSharedCache(job.getJobID(), job.getConfiguration()); |
| uploadResourcesInternal(job, submitJobDir); |
| } finally { |
| stopSharedCache(); |
| } |
| } |
| |
| private void uploadResourcesInternal(Job job, Path submitJobDir) |
| throws IOException { |
| Configuration conf = job.getConfiguration(); |
| short replication = |
| (short) conf.getInt(Job.SUBMIT_REPLICATION, |
| Job.DEFAULT_SUBMIT_REPLICATION); |
| |
| if (!(conf.getBoolean(Job.USED_GENERIC_PARSER, false))) { |
| LOG.warn("Hadoop command-line option parsing not performed. " |
| + "Implement the Tool interface and execute your application " |
| + "with ToolRunner to remedy this."); |
| } |
| |
| // |
| // Figure out what fs the JobTracker is using. Copy the |
| // job to it, under a temporary name. This allows DFS to work, |
| // and under the local fs also provides UNIX-like object loading |
| // semantics. (that is, if the job file is deleted right after |
| // submission, we can still run the submission to completion) |
| // |
| |
| // Create a number of filenames in the JobTracker's fs namespace |
| LOG.debug("default FileSystem: " + jtFs.getUri()); |
| if (jtFs.exists(submitJobDir)) { |
| throw new IOException("Not submitting job. Job directory " + submitJobDir |
| + " already exists!! This is unexpected.Please check what's there in" |
| + " that directory"); |
| } |
| // Create the submission directory for the MapReduce job. |
| submitJobDir = jtFs.makeQualified(submitJobDir); |
| submitJobDir = new Path(submitJobDir.toUri().getPath()); |
| FsPermission mapredSysPerms = |
| new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION); |
| mkdirs(jtFs, submitJobDir, mapredSysPerms); |
| |
| // Get the resources that have been added via command line arguments in the |
| // GenericOptionsParser (i.e. files, libjars, archives). |
| Collection<String> files = conf.getStringCollection("tmpfiles"); |
| Collection<String> libjars = conf.getStringCollection("tmpjars"); |
| Collection<String> archives = conf.getStringCollection("tmparchives"); |
| String jobJar = job.getJar(); |
| |
| // Merge resources that have been programmatically specified for the shared |
| // cache via the Job API. |
| files.addAll(conf.getStringCollection(MRJobConfig.FILES_FOR_SHARED_CACHE)); |
| libjars.addAll(conf.getStringCollection( |
| MRJobConfig.FILES_FOR_CLASSPATH_AND_SHARED_CACHE)); |
| archives.addAll(conf |
| .getStringCollection(MRJobConfig.ARCHIVES_FOR_SHARED_CACHE)); |
| |
| |
| Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>(); |
| checkLocalizationLimits(conf, files, libjars, archives, jobJar, statCache); |
| |
| Map<String, Boolean> fileSCUploadPolicies = |
| new LinkedHashMap<String, Boolean>(); |
| Map<String, Boolean> archiveSCUploadPolicies = |
| new LinkedHashMap<String, Boolean>(); |
| |
| uploadFiles(job, files, submitJobDir, mapredSysPerms, replication, |
| fileSCUploadPolicies, statCache); |
| uploadLibJars(job, libjars, submitJobDir, mapredSysPerms, replication, |
| fileSCUploadPolicies, statCache); |
| uploadArchives(job, archives, submitJobDir, mapredSysPerms, replication, |
| archiveSCUploadPolicies, statCache); |
| uploadJobJar(job, jobJar, submitJobDir, replication, statCache); |
| addLog4jToDistributedCache(job, submitJobDir); |
| |
| // Note, we do not consider resources in the distributed cache for the |
| // shared cache at this time. Only resources specified via the |
| // GenericOptionsParser or the jobjar. |
| Job.setFileSharedCacheUploadPolicies(conf, fileSCUploadPolicies); |
| Job.setArchiveSharedCacheUploadPolicies(conf, archiveSCUploadPolicies); |
| |
| // set the timestamps of the archives and files |
| // set the public/private visibility of the archives and files |
| ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf, |
| statCache); |
| // get DelegationToken for cached file |
| ClientDistributedCacheManager.getDelegationTokens(conf, |
| job.getCredentials()); |
| } |
| |
| @VisibleForTesting |
| void uploadFiles(Job job, Collection<String> files, |
| Path submitJobDir, FsPermission mapredSysPerms, short submitReplication, |
| Map<String, Boolean> fileSCUploadPolicies, Map<URI, FileStatus> statCache) |
| throws IOException { |
| Configuration conf = job.getConfiguration(); |
| Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir); |
| if (!files.isEmpty()) { |
| mkdirs(jtFs, filesDir, mapredSysPerms); |
| for (String tmpFile : files) { |
| URI tmpURI = null; |
| try { |
| tmpURI = new URI(tmpFile); |
| } catch (URISyntaxException e) { |
| throw new IllegalArgumentException("Error parsing files argument." |
| + " Argument must be a valid URI: " + tmpFile, e); |
| } |
| Path tmp = new Path(tmpURI); |
| URI newURI = null; |
| boolean uploadToSharedCache = false; |
| if (scConfig.isSharedCacheFilesEnabled()) { |
| newURI = useSharedCache(tmpURI, tmp.getName(), statCache, conf, true); |
| if (newURI == null) { |
| uploadToSharedCache = true; |
| } |
| } |
| |
| if (newURI == null) { |
| Path newPath = |
| copyRemoteFiles(filesDir, tmp, conf, submitReplication); |
| try { |
| newURI = getPathURI(newPath, tmpURI.getFragment()); |
| } catch (URISyntaxException ue) { |
| // should not throw a uri exception |
| throw new IOException( |
| "Failed to create a URI (URISyntaxException) for the" |
| + " remote path " + newPath |
| + ". This was based on the files parameter: " + tmpFile, |
| ue); |
| } |
| } |
| |
| job.addCacheFile(newURI); |
| if (scConfig.isSharedCacheFilesEnabled()) { |
| fileSCUploadPolicies.put(newURI.toString(), uploadToSharedCache); |
| } |
| } |
| } |
| } |
| |
| // Suppress warning for use of DistributedCache (it is everywhere). |
| @SuppressWarnings("deprecation") |
| @VisibleForTesting |
| void uploadLibJars(Job job, Collection<String> libjars, Path submitJobDir, |
| FsPermission mapredSysPerms, short submitReplication, |
| Map<String, Boolean> fileSCUploadPolicies, Map<URI, FileStatus> statCache) |
| throws IOException { |
| Configuration conf = job.getConfiguration(); |
| Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir); |
| if (!libjars.isEmpty()) { |
| mkdirs(jtFs, libjarsDir, mapredSysPerms); |
| Collection<URI> libjarURIs = new LinkedList<>(); |
| boolean foundFragment = false; |
| for (String tmpjars : libjars) { |
| URI tmpURI = null; |
| try { |
| tmpURI = new URI(tmpjars); |
| } catch (URISyntaxException e) { |
| throw new IllegalArgumentException("Error parsing libjars argument." |
| + " Argument must be a valid URI: " + tmpjars, e); |
| } |
| Path tmp = new Path(tmpURI); |
| URI newURI = null; |
| boolean uploadToSharedCache = false; |
| boolean fromSharedCache = false; |
| if (scConfig.isSharedCacheLibjarsEnabled()) { |
| newURI = useSharedCache(tmpURI, tmp.getName(), statCache, conf, true); |
| if (newURI == null) { |
| uploadToSharedCache = true; |
| } else { |
| fromSharedCache = true; |
| } |
| } |
| |
| if (newURI == null) { |
| Path newPath = |
| copyRemoteFiles(libjarsDir, tmp, conf, submitReplication); |
| try { |
| newURI = getPathURI(newPath, tmpURI.getFragment()); |
| } catch (URISyntaxException ue) { |
| // should not throw a uri exception |
| throw new IOException( |
| "Failed to create a URI (URISyntaxException) for the" |
| + " remote path " + newPath |
| + ". This was based on the libjar parameter: " + tmpjars, |
| ue); |
| } |
| } |
| |
| if (!foundFragment) { |
| // We do not count shared cache paths containing fragments as a |
| // "foundFragment." This is because these resources are not in the |
| // staging directory and will be added to the distributed cache |
| // separately. |
| foundFragment = (newURI.getFragment() != null) && !fromSharedCache; |
| } |
| DistributedCache.addFileToClassPath(new Path(newURI.getPath()), conf, |
| jtFs, false); |
| if (fromSharedCache) { |
| // We simply add this URI to the distributed cache. It will not come |
| // from the staging directory (it is in the shared cache), so we |
| // must add it to the cache regardless of the wildcard feature. |
| DistributedCache.addCacheFile(newURI, conf); |
| } else { |
| libjarURIs.add(newURI); |
| } |
| |
| if (scConfig.isSharedCacheLibjarsEnabled()) { |
| fileSCUploadPolicies.put(newURI.toString(), uploadToSharedCache); |
| } |
| } |
| |
| if (useWildcard && !foundFragment) { |
| // Add the whole directory to the cache using a wild card |
| Path libJarsDirWildcard = |
| jtFs.makeQualified(new Path(libjarsDir, DistributedCache.WILDCARD)); |
| DistributedCache.addCacheFile(libJarsDirWildcard.toUri(), conf); |
| } else { |
| for (URI uri : libjarURIs) { |
| DistributedCache.addCacheFile(uri, conf); |
| } |
| } |
| } |
| } |
| |
| @VisibleForTesting |
| void uploadArchives(Job job, Collection<String> archives, |
| Path submitJobDir, FsPermission mapredSysPerms, short submitReplication, |
| Map<String, Boolean> archiveSCUploadPolicies, |
| Map<URI, FileStatus> statCache) throws IOException { |
| Configuration conf = job.getConfiguration(); |
| Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir); |
| if (!archives.isEmpty()) { |
| mkdirs(jtFs, archivesDir, mapredSysPerms); |
| for (String tmpArchives : archives) { |
| URI tmpURI; |
| try { |
| tmpURI = new URI(tmpArchives); |
| } catch (URISyntaxException e) { |
| throw new IllegalArgumentException("Error parsing archives argument." |
| + " Argument must be a valid URI: " + tmpArchives, e); |
| } |
| Path tmp = new Path(tmpURI); |
| URI newURI = null; |
| boolean uploadToSharedCache = false; |
| if (scConfig.isSharedCacheArchivesEnabled()) { |
| newURI = useSharedCache(tmpURI, tmp.getName(), statCache, conf, true); |
| if (newURI == null) { |
| uploadToSharedCache = true; |
| } |
| } |
| |
| if (newURI == null) { |
| Path newPath = |
| copyRemoteFiles(archivesDir, tmp, conf, submitReplication); |
| try { |
| newURI = getPathURI(newPath, tmpURI.getFragment()); |
| } catch (URISyntaxException ue) { |
| // should not throw a uri exception |
| throw new IOException( |
| "Failed to create a URI (URISyntaxException) for the" |
| + " remote path " + newPath |
| + ". This was based on the archive parameter: " |
| + tmpArchives, |
| ue); |
| } |
| } |
| |
| job.addCacheArchive(newURI); |
| if (scConfig.isSharedCacheArchivesEnabled()) { |
| archiveSCUploadPolicies.put(newURI.toString(), uploadToSharedCache); |
| } |
| } |
| } |
| } |
| |
| @VisibleForTesting |
| void uploadJobJar(Job job, String jobJar, Path submitJobDir, |
| short submitReplication, Map<URI, FileStatus> statCache) |
| throws IOException { |
| Configuration conf = job.getConfiguration(); |
| if (jobJar != null) { // copy jar to JobTracker's fs |
| // use jar name if job is not named. |
| if ("".equals(job.getJobName())) { |
| job.setJobName(new Path(jobJar).getName()); |
| } |
| Path jobJarPath = new Path(jobJar); |
| URI jobJarURI = jobJarPath.toUri(); |
| Path newJarPath = null; |
| boolean uploadToSharedCache = false; |
| if (jobJarURI.getScheme() == null || |
| jobJarURI.getScheme().equals("file")) { |
| // job jar is on the local file system |
| if (scConfig.isSharedCacheJobjarEnabled()) { |
| // We must have a qualified path for the shared cache client. We can |
| // assume this is for the local filesystem |
| jobJarPath = FileSystem.getLocal(conf).makeQualified(jobJarPath); |
| // Don't add a resource name here because the resource name (i.e. |
| // job.jar directory symlink) will always be hard coded to job.jar for |
| // the job.jar |
| URI newURI = |
| useSharedCache(jobJarPath.toUri(), null, statCache, conf, false); |
| if (newURI == null) { |
| uploadToSharedCache = true; |
| } else { |
| newJarPath = stringToPath(newURI.toString()); |
| // The job jar is coming from the shared cache (i.e. a public |
| // place), so we want the job.jar to have a public visibility. |
| conf.setBoolean(MRJobConfig.JOBJAR_VISIBILITY, true); |
| } |
| } |
| if (newJarPath == null) { |
| newJarPath = JobSubmissionFiles.getJobJar(submitJobDir); |
| copyJar(jobJarPath, newJarPath, submitReplication); |
| } |
| } else { |
| // job jar is in a remote file system |
| if (scConfig.isSharedCacheJobjarEnabled()) { |
| // Don't add a resource name here because the resource name (i.e. |
| // job.jar directory symlink) will always be hard coded to job.jar for |
| // the job.jar |
| URI newURI = useSharedCache(jobJarURI, null, statCache, conf, false); |
| if (newURI == null) { |
| uploadToSharedCache = true; |
| newJarPath = jobJarPath; |
| } else { |
| newJarPath = stringToPath(newURI.toString()); |
| // The job jar is coming from the shared cache (i.e. a public |
| // place), so we want the job.jar to have a public visibility. |
| conf.setBoolean(MRJobConfig.JOBJAR_VISIBILITY, true); |
| } |
| } else { |
| // we don't need to upload the jobjar to the staging directory because |
| // it is already in an accessible place |
| newJarPath = jobJarPath; |
| } |
| } |
| job.setJar(newJarPath.toString()); |
| if (scConfig.isSharedCacheJobjarEnabled()) { |
| conf.setBoolean(MRJobConfig.JOBJAR_SHARED_CACHE_UPLOAD_POLICY, |
| uploadToSharedCache); |
| } |
| } else { |
| LOG.warn("No job jar file set. User classes may not be found. " |
| + "See Job or Job#setJar(String)."); |
| } |
| } |
| |
| /** |
| * Verify that the resources this job is going to localize are within the |
| * localization limits. We count all resources towards these limits regardless |
| * of where they are coming from (i.e. local, distributed cache, or shared |
| * cache). |
| */ |
| @VisibleForTesting |
| void checkLocalizationLimits(Configuration conf, Collection<String> files, |
| Collection<String> libjars, Collection<String> archives, String jobJar, |
| Map<URI, FileStatus> statCache) throws IOException { |
| |
| LimitChecker limitChecker = new LimitChecker(conf); |
| if (!limitChecker.hasLimits()) { |
| // there are no limits set, so we are done. |
| return; |
| } |
| |
| // Get the files and archives that are already in the distributed cache |
| Collection<String> dcFiles = |
| conf.getStringCollection(MRJobConfig.CACHE_FILES); |
| Collection<String> dcArchives = |
| conf.getStringCollection(MRJobConfig.CACHE_ARCHIVES); |
| |
| for (String uri : dcFiles) { |
| explorePath(conf, stringToPath(uri), limitChecker, statCache); |
| } |
| |
| for (String uri : dcArchives) { |
| explorePath(conf, stringToPath(uri), limitChecker, statCache); |
| } |
| |
| for (String uri : files) { |
| explorePath(conf, stringToPath(uri), limitChecker, statCache); |
| } |
| |
| for (String uri : libjars) { |
| explorePath(conf, stringToPath(uri), limitChecker, statCache); |
| } |
| |
| for (String uri : archives) { |
| explorePath(conf, stringToPath(uri), limitChecker, statCache); |
| } |
| |
| if (jobJar != null) { |
| explorePath(conf, stringToPath(jobJar), limitChecker, statCache); |
| } |
| } |
| |
| /** |
| * Convert a String to a Path and gracefully remove fragments/queries if they |
| * exist in the String. |
| */ |
| @VisibleForTesting |
| Path stringToPath(String s) { |
| try { |
| URI uri = new URI(s); |
| return new Path(uri.getScheme(), uri.getAuthority(), uri.getPath()); |
| } catch (URISyntaxException e) { |
| throw new IllegalArgumentException( |
| "Error parsing argument." + " Argument must be a valid URI: " + s, e); |
| } |
| } |
| |
| @VisibleForTesting |
| protected static final String MAX_RESOURCE_ERR_MSG = |
| "This job has exceeded the maximum number of submitted resources"; |
| @VisibleForTesting |
| protected static final String MAX_TOTAL_RESOURCE_MB_ERR_MSG = |
| "This job has exceeded the maximum size of submitted resources"; |
| @VisibleForTesting |
| protected static final String MAX_SINGLE_RESOURCE_MB_ERR_MSG = |
| "This job has exceeded the maximum size of a single submitted resource"; |
| |
| private static class LimitChecker { |
| LimitChecker(Configuration conf) { |
| this.maxNumOfResources = |
| conf.getInt(MRJobConfig.MAX_RESOURCES, |
| MRJobConfig.MAX_RESOURCES_DEFAULT); |
| this.maxSizeMB = |
| conf.getLong(MRJobConfig.MAX_RESOURCES_MB, |
| MRJobConfig.MAX_RESOURCES_MB_DEFAULT); |
| this.maxSizeOfResourceMB = |
| conf.getLong(MRJobConfig.MAX_SINGLE_RESOURCE_MB, |
| MRJobConfig.MAX_SINGLE_RESOURCE_MB_DEFAULT); |
| this.totalConfigSizeBytes = maxSizeMB * 1024 * 1024; |
| this.totalConfigSizeOfResourceBytes = maxSizeOfResourceMB * 1024 * 1024; |
| } |
| |
| private long totalSizeBytes = 0; |
| private int totalNumberOfResources = 0; |
| private long currentMaxSizeOfFileBytes = 0; |
| private final long maxSizeMB; |
| private final int maxNumOfResources; |
| private final long maxSizeOfResourceMB; |
| private final long totalConfigSizeBytes; |
| private final long totalConfigSizeOfResourceBytes; |
| |
| private boolean hasLimits() { |
| return maxNumOfResources > 0 || maxSizeMB > 0 || maxSizeOfResourceMB > 0; |
| } |
| |
| private void addFile(Path p, long fileSizeBytes) throws IOException { |
| totalNumberOfResources++; |
| totalSizeBytes += fileSizeBytes; |
| if (fileSizeBytes > currentMaxSizeOfFileBytes) { |
| currentMaxSizeOfFileBytes = fileSizeBytes; |
| } |
| |
| if (totalConfigSizeBytes > 0 && totalSizeBytes > totalConfigSizeBytes) { |
| throw new IOException(MAX_TOTAL_RESOURCE_MB_ERR_MSG + " (Max: " |
| + maxSizeMB + "MB)."); |
| } |
| |
| if (maxNumOfResources > 0 && |
| totalNumberOfResources > maxNumOfResources) { |
| throw new IOException(MAX_RESOURCE_ERR_MSG + " (Max: " |
| + maxNumOfResources + ")."); |
| } |
| |
| if (totalConfigSizeOfResourceBytes > 0 |
| && currentMaxSizeOfFileBytes > totalConfigSizeOfResourceBytes) { |
| throw new IOException(MAX_SINGLE_RESOURCE_MB_ERR_MSG + " (Max: " |
| + maxSizeOfResourceMB + "MB, Violating resource: " + p + ")."); |
| } |
| } |
| } |
| |
| /** |
| * Recursively explore the given path and enforce the limits for resource |
| * localization. This method assumes that there are no symlinks in the |
| * directory structure. |
| */ |
| private void explorePath(Configuration job, Path p, |
| LimitChecker limitChecker, Map<URI, FileStatus> statCache) |
| throws IOException { |
| Path pathWithScheme = p; |
| if (!pathWithScheme.toUri().isAbsolute()) { |
| // the path does not have a scheme, so we assume it is a path from the |
| // local filesystem |
| FileSystem localFs = FileSystem.getLocal(job); |
| pathWithScheme = localFs.makeQualified(p); |
| } |
| FileStatus status = getFileStatus(statCache, job, pathWithScheme); |
| if (status.isDirectory()) { |
| FileStatus[] statusArray = |
| pathWithScheme.getFileSystem(job).listStatus(pathWithScheme); |
| for (FileStatus s : statusArray) { |
| explorePath(job, s.getPath(), limitChecker, statCache); |
| } |
| } else { |
| limitChecker.addFile(pathWithScheme, status.getLen()); |
| } |
| } |
| |
| @VisibleForTesting |
| FileStatus getFileStatus(Map<URI, FileStatus> statCache, |
| Configuration job, Path p) throws IOException { |
| URI u = p.toUri(); |
| FileStatus status = statCache.get(u); |
| if (status == null) { |
| status = p.getFileSystem(job).getFileStatus(p); |
| statCache.put(u, status); |
| } |
| return status; |
| } |
| |
| /** |
| * Create a new directory in the passed filesystem. This wrapper method exists |
| * so that it can be overridden/stubbed during testing. |
| */ |
| @VisibleForTesting |
| boolean mkdirs(FileSystem fs, Path dir, FsPermission permission) |
| throws IOException { |
| return FileSystem.mkdirs(fs, dir, permission); |
| } |
| |
| // copies a file to the jobtracker filesystem and returns the path where it |
| // was copied to |
| @VisibleForTesting |
| Path copyRemoteFiles(Path parentDir, Path originalPath, |
| Configuration conf, short replication) throws IOException { |
| // check if we do not need to copy the files |
| // is jt using the same file system. |
| // just checking for uri strings... doing no dns lookups |
| // to see if the filesystems are the same. This is not optimal. |
| // but avoids name resolution. |
| |
| FileSystem remoteFs = null; |
| remoteFs = originalPath.getFileSystem(conf); |
| if (FileUtil.compareFs(remoteFs, jtFs)) { |
| return originalPath; |
| } |
| // this might have name collisions. copy will throw an exception |
| // parse the original path to create new path |
| Path newPath = new Path(parentDir, originalPath.getName()); |
| FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, conf); |
| jtFs.setReplication(newPath, replication); |
| jtFs.makeQualified(newPath); |
| return newPath; |
| } |
| |
| /** |
| * Checksum a local resource file and call use for that resource with the scm. |
| */ |
| private URI useSharedCache(URI sourceFile, String resourceName, |
| Map<URI, FileStatus> statCache, Configuration conf, boolean honorFragment) |
| throws IOException { |
| if (scClient == null) { |
| return null; |
| } |
| Path filePath = new Path(sourceFile); |
| if (getFileStatus(statCache, conf, filePath).isDirectory()) { |
| LOG.warn("Shared cache does not support directories" |
| + " (see YARN-6097)." + " Will not upload " + filePath |
| + " to the shared cache."); |
| return null; |
| } |
| |
| String rn = resourceName; |
| if (honorFragment) { |
| if (sourceFile.getFragment() != null) { |
| rn = sourceFile.getFragment(); |
| } |
| } |
| |
| // If for whatever reason, we can't even calculate checksum for |
| // a resource, something is really wrong with the file system; |
| // even non-SCM approach won't work. Let us just throw the exception. |
| String checksum = scClient.getFileChecksum(filePath); |
| URL url = null; |
| try { |
| url = scClient.use(this.appId, checksum); |
| } catch (YarnException e) { |
| LOG.warn("Error trying to contact the shared cache manager," |
| + " disabling the SCMClient for the rest of this job submission", e); |
| /* |
| * If we fail to contact the SCM, we do not use it for the rest of this |
| * JobResourceUploader's life. This prevents us from having to timeout |
| * each time we try to upload a file while the SCM is unavailable. Instead |
| * we timeout/error the first time and quickly revert to the default |
| * behavior without the shared cache. We do this by stopping the shared |
| * cache client and setting it to null. |
| */ |
| stopSharedCache(); |
| } |
| |
| if (url != null) { |
| // Because we deal with URI's in mapreduce, we need to convert the URL to |
| // a URI and add a fragment if necessary. |
| URI uri = null; |
| try { |
| String name = new Path(url.getFile()).getName(); |
| if (rn != null && !name.equals(rn)) { |
| // A name was specified that is different then the URL in the shared |
| // cache. Therefore, we need to set the fragment portion of the URI to |
| // preserve the user's desired name. We assume that there is no |
| // existing fragment in the URL since the shared cache manager does |
| // not use fragments. |
| uri = new URI(url.getScheme(), url.getUserInfo(), url.getHost(), |
| url.getPort(), url.getFile(), null, rn); |
| } else { |
| uri = new URI(url.getScheme(), url.getUserInfo(), url.getHost(), |
| url.getPort(), url.getFile(), null, null); |
| } |
| return uri; |
| } catch (URISyntaxException e) { |
| LOG.warn("Error trying to convert URL received from shared cache to" |
| + " a URI: " + url.toString()); |
| return null; |
| } |
| } else { |
| return null; |
| } |
| } |
| |
| @VisibleForTesting |
| void copyJar(Path originalJarPath, Path submitJarFile, |
| short replication) throws IOException { |
| jtFs.copyFromLocalFile(originalJarPath, submitJarFile); |
| jtFs.setReplication(submitJarFile, replication); |
| jtFs.setPermission(submitJarFile, new FsPermission( |
| JobSubmissionFiles.JOB_FILE_PERMISSION)); |
| } |
| |
| private void addLog4jToDistributedCache(Job job, Path jobSubmitDir) |
| throws IOException { |
| Configuration conf = job.getConfiguration(); |
| String log4jPropertyFile = |
| conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE, ""); |
| if (!log4jPropertyFile.isEmpty()) { |
| short replication = (short) conf.getInt(Job.SUBMIT_REPLICATION, 10); |
| copyLog4jPropertyFile(job, jobSubmitDir, replication); |
| } |
| } |
| |
| private URI getPathURI(Path destPath, String fragment) |
| throws URISyntaxException { |
| URI pathURI = destPath.toUri(); |
| if (pathURI.getFragment() == null) { |
| if (fragment == null) { |
| // no fragment, just return existing pathURI from destPath |
| } else { |
| pathURI = new URI(pathURI.toString() + "#" + fragment); |
| } |
| } |
| return pathURI; |
| } |
| |
| // copy user specified log4j.property file in local |
| // to HDFS with putting on distributed cache and adding its parent directory |
| // to classpath. |
| @SuppressWarnings("deprecation") |
| private void copyLog4jPropertyFile(Job job, Path submitJobDir, |
| short replication) throws IOException { |
| Configuration conf = job.getConfiguration(); |
| |
| String file = |
| validateFilePath( |
| conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE), conf); |
| LOG.debug("default FileSystem: " + jtFs.getUri()); |
| FsPermission mapredSysPerms = |
| new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION); |
| try { |
| jtFs.getFileStatus(submitJobDir); |
| } catch (FileNotFoundException e) { |
| throw new IOException("Cannot find job submission directory! " |
| + "It should just be created, so something wrong here.", e); |
| } |
| |
| Path fileDir = JobSubmissionFiles.getJobLog4jFile(submitJobDir); |
| |
| // first copy local log4j.properties file to HDFS under submitJobDir |
| if (file != null) { |
| FileSystem.mkdirs(jtFs, fileDir, mapredSysPerms); |
| URI tmpURI = null; |
| try { |
| tmpURI = new URI(file); |
| } catch (URISyntaxException e) { |
| throw new IllegalArgumentException(e); |
| } |
| Path tmp = new Path(tmpURI); |
| Path newPath = copyRemoteFiles(fileDir, tmp, conf, replication); |
| DistributedCache.addFileToClassPath(new Path(newPath.toUri().getPath()), |
| conf); |
| } |
| } |
| |
| /** |
| * takes input as a path string for file and verifies if it exist. It defaults |
| * for file:/// if the files specified do not have a scheme. it returns the |
| * paths uri converted defaulting to file:///. So an input of /home/user/file1 |
| * would return file:///home/user/file1 |
| * |
| * @param file |
| * @param conf |
| * @return |
| */ |
| private String validateFilePath(String file, Configuration conf) |
| throws IOException { |
| if (file == null) { |
| return null; |
| } |
| if (file.isEmpty()) { |
| throw new IllegalArgumentException("File name can't be empty string"); |
| } |
| String finalPath; |
| URI pathURI; |
| try { |
| pathURI = new URI(file); |
| } catch (URISyntaxException e) { |
| throw new IllegalArgumentException(e); |
| } |
| Path path = new Path(pathURI); |
| FileSystem localFs = FileSystem.getLocal(conf); |
| if (pathURI.getScheme() == null) { |
| // default to the local file system |
| // check if the file exists or not first |
| localFs.getFileStatus(path); |
| finalPath = |
| path.makeQualified(localFs.getUri(), localFs.getWorkingDirectory()) |
| .toString(); |
| } else { |
| // check if the file exists in this file system |
| // we need to recreate this filesystem object to copy |
| // these files to the file system ResourceManager is running |
| // on. |
| FileSystem fs = path.getFileSystem(conf); |
| fs.getFileStatus(path); |
| finalPath = |
| path.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString(); |
| } |
| return finalPath; |
| } |
| } |