/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| package org.apache.hadoop.mapred; |
| |
| import org.apache.commons.logging.*; |
| |
| import org.apache.hadoop.fs.*; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.filecache.*; |
| import org.apache.hadoop.util.*; |
| |
| import java.io.*; |
| import java.net.InetSocketAddress; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Vector; |
| import java.net.URI; |
| |
| /** Base class that runs a task in a separate process. Tasks are run in a |
| * separate process in order to isolate the map/reduce system code from bugs in |
| * user supplied map and reduce functions. |
| */ |
| abstract class TaskRunner extends Thread { |
| public static final Log LOG = |
| LogFactory.getLog(TaskRunner.class); |
| |
| volatile boolean killed = false; |
| private TaskTracker.TaskInProgress tip; |
| private Task t; |
| private Object lock = new Object(); |
| private volatile boolean done = false; |
| private int exitCode = -1; |
| private boolean exitCodeSet = false; |
| |
| private TaskTracker tracker; |
| |
| protected JobConf conf; |
| JvmManager jvmManager; |
| |
| /** |
| * for cleaning up old map outputs |
| */ |
| protected MapOutputFile mapOutputFile; |
| |
| public TaskRunner(TaskTracker.TaskInProgress tip, TaskTracker tracker, |
| JobConf conf) { |
| this.tip = tip; |
| this.t = tip.getTask(); |
| this.tracker = tracker; |
| this.conf = conf; |
| this.mapOutputFile = new MapOutputFile(t.getJobID()); |
| this.mapOutputFile.setConf(conf); |
| this.jvmManager = tracker.getJvmManagerInstance(); |
| } |
| |
| public Task getTask() { return t; } |
| public TaskTracker.TaskInProgress getTaskInProgress() { return tip; } |
| public TaskTracker getTracker() { return tracker; } |
| |
| /** Called to assemble this task's input. This method is run in the parent |
| * process before the child is spawned. It should not execute user code, |
| * only system code. */ |
| public boolean prepare() throws IOException { |
| return true; |
| } |
| |
| /** Called when this task's output is no longer needed. |
| * This method is run in the parent process after the child exits. It should |
| * not execute user code, only system code. |
| */ |
| public void close() throws IOException {} |
| |
| private static String stringifyPathArray(Path[] p){ |
| if (p == null){ |
| return null; |
| } |
| StringBuffer str = new StringBuffer(p[0].toString()); |
| for (int i = 1; i < p.length; i++){ |
| str.append(","); |
| str.append(p[i].toString()); |
| } |
| return str.toString(); |
| } |
| |
| @Override |
| public final void run() { |
| try { |
| |
| //before preparing the job localize |
| //all the archives |
| TaskAttemptID taskid = t.getTaskID(); |
| LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir"); |
| File jobCacheDir = null; |
| if (conf.getJar() != null) { |
| jobCacheDir = new File( |
| new Path(conf.getJar()).getParent().toString()); |
| } |
| File workDir = new File(lDirAlloc.getLocalPathToRead( |
| TaskTracker.getJobCacheSubdir() |
| + Path.SEPARATOR + t.getJobID() |
| + Path.SEPARATOR + t.getTaskID() |
| + Path.SEPARATOR + MRConstants.WORKDIR, |
| conf). toString()); |
| |
| URI[] archives = DistributedCache.getCacheArchives(conf); |
| URI[] files = DistributedCache.getCacheFiles(conf); |
| FileStatus fileStatus; |
| FileSystem fileSystem; |
| Path localPath; |
| String baseDir; |
| |
| if ((archives != null) || (files != null)) { |
| if (archives != null) { |
| String[] archivesTimestamps = |
| DistributedCache.getArchiveTimestamps(conf); |
| Path[] p = new Path[archives.length]; |
| for (int i = 0; i < archives.length;i++){ |
| fileSystem = FileSystem.get(archives[i], conf); |
| fileStatus = fileSystem.getFileStatus( |
| new Path(archives[i].getPath())); |
| String cacheId = DistributedCache.makeRelative(archives[i],conf); |
| String cachePath = TaskTracker.getCacheSubdir() + |
| Path.SEPARATOR + cacheId; |
| if (lDirAlloc.ifExists(cachePath, conf)) { |
| localPath = lDirAlloc.getLocalPathToRead(cachePath, conf); |
| } |
| else { |
| localPath = lDirAlloc.getLocalPathForWrite(cachePath, |
| fileStatus.getLen(), conf); |
| } |
| baseDir = localPath.toString().replace(cacheId, ""); |
| p[i] = DistributedCache.getLocalCache(archives[i], conf, |
| new Path(baseDir), |
| fileStatus, |
| true, Long.parseLong( |
| archivesTimestamps[i]), |
| new Path(workDir. |
| getAbsolutePath()), |
| false); |
| |
| } |
| DistributedCache.setLocalArchives(conf, stringifyPathArray(p)); |
| } |
| if ((files != null)) { |
| String[] fileTimestamps = DistributedCache.getFileTimestamps(conf); |
| Path[] p = new Path[files.length]; |
| for (int i = 0; i < files.length;i++){ |
| fileSystem = FileSystem.get(files[i], conf); |
| fileStatus = fileSystem.getFileStatus( |
| new Path(files[i].getPath())); |
| String cacheId = DistributedCache.makeRelative(files[i], conf); |
| String cachePath = TaskTracker.getCacheSubdir() + |
| Path.SEPARATOR + cacheId; |
| if (lDirAlloc.ifExists(cachePath,conf)) { |
| localPath = lDirAlloc.getLocalPathToRead(cachePath, conf); |
| } else { |
| localPath = lDirAlloc.getLocalPathForWrite(cachePath, |
| fileStatus.getLen(), conf); |
| } |
| baseDir = localPath.toString().replace(cacheId, ""); |
| p[i] = DistributedCache.getLocalCache(files[i], conf, |
| new Path(baseDir), |
| fileStatus, |
| false, Long.parseLong( |
| fileTimestamps[i]), |
| new Path(workDir. |
| getAbsolutePath()), |
| false); |
| } |
| DistributedCache.setLocalFiles(conf, stringifyPathArray(p)); |
| } |
| Path localTaskFile = new Path(t.getJobFile()); |
| FileSystem localFs = FileSystem.getLocal(conf); |
| localFs.delete(localTaskFile, true); |
| OutputStream out = localFs.create(localTaskFile); |
| try { |
| conf.writeXml(out); |
| } finally { |
| out.close(); |
| } |
| } |
| |
| if (!prepare()) { |
| return; |
| } |
| |
| String sep = System.getProperty("path.separator"); |
| StringBuffer classPath = new StringBuffer(); |
| // start with same classpath as parent process |
| classPath.append(System.getProperty("java.class.path")); |
| classPath.append(sep); |
| if (!workDir.mkdirs()) { |
| if (!workDir.isDirectory()) { |
| LOG.fatal("Mkdirs failed to create " + workDir.toString()); |
| } |
| } |
| |
| String jar = conf.getJar(); |
| if (jar != null) { |
| // if jar exists, it into workDir |
| File[] libs = new File(jobCacheDir, "lib").listFiles(); |
| if (libs != null) { |
| for (int i = 0; i < libs.length; i++) { |
| classPath.append(sep); // add libs from jar to classpath |
| classPath.append(libs[i]); |
| } |
| } |
| classPath.append(sep); |
| classPath.append(new File(jobCacheDir, "classes")); |
| classPath.append(sep); |
| classPath.append(jobCacheDir); |
| |
| } |
| |
| // include the user specified classpath |
| |
| //archive paths |
| Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf); |
| if (archiveClasspaths != null && archives != null) { |
| Path[] localArchives = DistributedCache |
| .getLocalCacheArchives(conf); |
| if (localArchives != null){ |
| for (int i=0;i<archives.length;i++){ |
| for(int j=0;j<archiveClasspaths.length;j++){ |
| if (archives[i].getPath().equals( |
| archiveClasspaths[j].toString())){ |
| classPath.append(sep); |
| classPath.append(localArchives[i] |
| .toString()); |
| } |
| } |
| } |
| } |
| } |
| //file paths |
| Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf); |
| if (fileClasspaths!=null && files != null) { |
| Path[] localFiles = DistributedCache |
| .getLocalCacheFiles(conf); |
| if (localFiles != null) { |
| for (int i = 0; i < files.length; i++) { |
| for (int j = 0; j < fileClasspaths.length; j++) { |
| if (files[i].getPath().equals( |
| fileClasspaths[j].toString())) { |
| classPath.append(sep); |
| classPath.append(localFiles[i].toString()); |
| } |
| } |
| } |
| } |
| } |
| |
| classPath.append(sep); |
| classPath.append(workDir); |
| // Build exec child jmv args. |
| Vector<String> vargs = new Vector<String>(8); |
| File jvm = // use same jvm as parent |
| new File(new File(System.getProperty("java.home"), "bin"), "java"); |
| |
| vargs.add(jvm.toString()); |
| |
| // Add child (task) java-vm options. |
| // |
| // The following symbols if present in mapred.child.java.opts value are |
| // replaced: |
| // + @taskid@ is interpolated with value of TaskID. |
| // Other occurrences of @ will not be altered. |
| // |
| // Example with multiple arguments and substitutions, showing |
| // jvm GC logging, and start of a passwordless JVM JMX agent so can |
| // connect with jconsole and the likes to watch child memory, threads |
| // and get thread dumps. |
| // |
| // <property> |
| // <name>mapred.child.java.opts</name> |
| // <value>-verbose:gc -Xloggc:/tmp/@taskid@.gc \ |
| // -Dcom.sun.management.jmxremote.authenticate=false \ |
| // -Dcom.sun.management.jmxremote.ssl=false \ |
| // </value> |
| // </property> |
| // |
| String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m"); |
| javaOpts = javaOpts.replace("@taskid@", taskid.toString()); |
| String [] javaOptsSplit = javaOpts.split(" "); |
| |
| // Add java.library.path; necessary for loading native libraries. |
| // |
| // 1. To support native-hadoop library i.e. libhadoop.so, we add the |
| // parent processes' java.library.path to the child. |
| // 2. We also add the 'cwd' of the task to it's java.library.path to help |
| // users distribute native libraries via the DistributedCache. |
| // 3. The user can also specify extra paths to be added to the |
| // java.library.path via mapred.child.java.opts. |
| // |
| String libraryPath = System.getProperty("java.library.path"); |
| if (libraryPath == null) { |
| libraryPath = workDir.getAbsolutePath(); |
| } else { |
| libraryPath += sep + workDir; |
| } |
| boolean hasUserLDPath = false; |
| for(int i=0; i<javaOptsSplit.length ;i++) { |
| if(javaOptsSplit[i].startsWith("-Djava.library.path=")) { |
| javaOptsSplit[i] += sep + libraryPath; |
| hasUserLDPath = true; |
| break; |
| } |
| } |
| if(!hasUserLDPath) { |
| vargs.add("-Djava.library.path=" + libraryPath); |
| } |
| for (int i = 0; i < javaOptsSplit.length; i++) { |
| vargs.add(javaOptsSplit[i]); |
| } |
| |
| // add java.io.tmpdir given by mapred.child.tmp |
| String tmp = conf.get("mapred.child.tmp", "./tmp"); |
| Path tmpDir = new Path(tmp); |
| |
| // if temp directory path is not absolute |
| // prepend it with workDir. |
| if (!tmpDir.isAbsolute()) { |
| tmpDir = new Path(workDir.toString(), tmp); |
| } |
| FileSystem localFs = FileSystem.getLocal(conf); |
| if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) { |
| throw new IOException("Mkdirs failed to create " + tmpDir.toString()); |
| } |
| vargs.add("-Djava.io.tmpdir=" + tmpDir.toString()); |
| |
| // Add classpath. |
| vargs.add("-classpath"); |
| vargs.add(classPath.toString()); |
| |
| // Setup the log4j prop |
| long logSize = TaskLog.getTaskLogLength(conf); |
| vargs.add("-Dhadoop.log.dir=" + |
| new File(System.getProperty("hadoop.log.dir") |
| ).getAbsolutePath()); |
| vargs.add("-Dhadoop.root.logger=INFO,TLA"); |
| vargs.add("-Dhadoop.tasklog.taskid=" + taskid); |
| vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize); |
| |
| if (conf.getProfileEnabled()) { |
| if (conf.getProfileTaskRange(t.isMapTask() |
| ).isIncluded(t.getPartition())) { |
| File prof = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.PROFILE); |
| vargs.add(String.format(conf.getProfileParams(), prof.toString())); |
| } |
| } |
| |
| // Add main class and its arguments |
| vargs.add(Child.class.getName()); // main of Child |
| // pass umbilical address |
| InetSocketAddress address = tracker.getTaskTrackerReportAddress(); |
| vargs.add(address.getAddress().getHostAddress()); |
| vargs.add(Integer.toString(address.getPort())); |
| vargs.add(taskid.toString()); // pass task identifier |
| |
| String pidFile = null; |
| if (tracker.isTaskMemoryManagerEnabled()) { |
| pidFile = lDirAlloc.getLocalPathForWrite( |
| (TaskTracker.getPidFilesSubdir() + Path.SEPARATOR + taskid), |
| this.conf).toString(); |
| } |
| |
| // set memory limit using ulimit if feasible and necessary ... |
| String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf); |
| List<String> setup = null; |
| if (ulimitCmd != null) { |
| setup = new ArrayList<String>(); |
| for (String arg : ulimitCmd) { |
| setup.add(arg); |
| } |
| } |
| |
| // Set up the redirection of the task's stdout and stderr streams |
| File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT); |
| File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR); |
| stdout.getParentFile().mkdirs(); |
| tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout, stderr); |
| |
| Map<String, String> env = new HashMap<String, String>(); |
| StringBuffer ldLibraryPath = new StringBuffer(); |
| ldLibraryPath.append(workDir.toString()); |
| String oldLdLibraryPath = null; |
| oldLdLibraryPath = System.getenv("LD_LIBRARY_PATH"); |
| if (oldLdLibraryPath != null) { |
| ldLibraryPath.append(sep); |
| ldLibraryPath.append(oldLdLibraryPath); |
| } |
| env.put("LD_LIBRARY_PATH", ldLibraryPath.toString()); |
| jvmManager.launchJvm(this, |
| jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize, |
| workDir, env, pidFile, conf)); |
| synchronized (lock) { |
| while (!done) { |
| lock.wait(); |
| } |
| } |
| tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID()); |
| if (exitCodeSet) { |
| if (!killed && exitCode != 0) { |
| if (exitCode == 65) { |
| tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID()); |
| } |
| throw new IOException("Task process exit with nonzero status of " + |
| exitCode + "."); |
| } |
| } |
| } catch (FSError e) { |
| LOG.fatal("FSError", e); |
| try { |
| tracker.fsError(t.getTaskID(), e.getMessage()); |
| } catch (IOException ie) { |
| LOG.fatal(t.getTaskID()+" reporting FSError", ie); |
| } |
| } catch (Throwable throwable) { |
| LOG.warn(t.getTaskID()+" Child Error", throwable); |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| throwable.printStackTrace(new PrintStream(baos)); |
| try { |
| tracker.reportDiagnosticInfo(t.getTaskID(), baos.toString()); |
| } catch (IOException e) { |
| LOG.warn(t.getTaskID()+" Reporting Diagnostics", e); |
| } |
| } finally { |
| try{ |
| URI[] archives = DistributedCache.getCacheArchives(conf); |
| URI[] files = DistributedCache.getCacheFiles(conf); |
| if (archives != null){ |
| for (int i = 0; i < archives.length; i++){ |
| DistributedCache.releaseCache(archives[i], conf); |
| } |
| } |
| if (files != null){ |
| for(int i = 0; i < files.length; i++){ |
| DistributedCache.releaseCache(files[i], conf); |
| } |
| } |
| }catch(IOException ie){ |
| LOG.warn("Error releasing caches : Cache files might not have been cleaned up"); |
| } |
| tracker.reportTaskFinished(t.getTaskID(), false); |
| } |
| } |
| |
| //Mostly for setting up the symlinks. Note that when we setup the distributed |
| //cache, we didn't create the symlinks. This is done on a per task basis |
| //by the currently executing task. |
| public static void setupWorkDir(JobConf conf) throws IOException { |
| File workDir = new File(".").getAbsoluteFile(); |
| FileUtil.fullyDelete(workDir); |
| if (DistributedCache.getSymlink(conf)) { |
| URI[] archives = DistributedCache.getCacheArchives(conf); |
| URI[] files = DistributedCache.getCacheFiles(conf); |
| Path[] localArchives = DistributedCache.getLocalCacheArchives(conf); |
| Path[] localFiles = DistributedCache.getLocalCacheFiles(conf); |
| if (archives != null) { |
| for (int i = 0; i < archives.length; i++) { |
| String link = archives[i].getFragment(); |
| if (link != null) { |
| link = workDir.toString() + Path.SEPARATOR + link; |
| File flink = new File(link); |
| if (!flink.exists()) { |
| FileUtil.symLink(localArchives[i].toString(), link); |
| } |
| } |
| } |
| } |
| if (files != null) { |
| for (int i = 0; i < files.length; i++) { |
| String link = files[i].getFragment(); |
| if (link != null) { |
| link = workDir.toString() + Path.SEPARATOR + link; |
| File flink = new File(link); |
| if (!flink.exists()) { |
| FileUtil.symLink(localFiles[i].toString(), link); |
| } |
| } |
| } |
| } |
| } |
| File jobCacheDir = null; |
| if (conf.getJar() != null) { |
| jobCacheDir = new File( |
| new Path(conf.getJar()).getParent().toString()); |
| } |
| |
| // create symlinks for all the files in job cache dir in current |
| // workingdir for streaming |
| try{ |
| DistributedCache.createAllSymlink(conf, jobCacheDir, |
| workDir); |
| } catch(IOException ie){ |
| // Do not exit even if symlinks have not been created. |
| LOG.warn(StringUtils.stringifyException(ie)); |
| } |
| // add java.io.tmpdir given by mapred.child.tmp |
| String tmp = conf.get("mapred.child.tmp", "./tmp"); |
| Path tmpDir = new Path(tmp); |
| |
| // if temp directory path is not absolute |
| // prepend it with workDir. |
| if (!tmpDir.isAbsolute()) { |
| tmpDir = new Path(workDir.toString(), tmp); |
| FileSystem localFs = FileSystem.getLocal(conf); |
| if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()){ |
| throw new IOException("Mkdirs failed to create " + tmpDir.toString()); |
| } |
| } |
| } |
| |
| /** |
| * Kill the child process |
| */ |
| public void kill() { |
| killed = true; |
| jvmManager.taskKilled(this); |
| signalDone(); |
| } |
| public void signalDone() { |
| synchronized (lock) { |
| done = true; |
| lock.notify(); |
| } |
| } |
| public void setExitCode(int exitCode) { |
| this.exitCodeSet = true; |
| this.exitCode = exitCode; |
| } |
| } |