| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.mapred; |
| |
| import java.io.DataOutputStream; |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.io.RandomAccessFile; |
| import java.net.InetSocketAddress; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.StringTokenizer; |
| import java.util.TreeMap; |
| import java.util.Vector; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.regex.Pattern; |
| |
| import javax.servlet.ServletContext; |
| import javax.servlet.ServletException; |
| import javax.servlet.http.HttpServlet; |
| import javax.servlet.http.HttpServletRequest; |
| import javax.servlet.http.HttpServletResponse; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.DF; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.fs.LocalDirAllocator; |
| import org.apache.hadoop.fs.LocalFileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.http.HttpServer; |
| import org.apache.hadoop.io.IntWritable; |
| import org.apache.hadoop.ipc.RPC; |
| import org.apache.hadoop.ipc.RemoteException; |
| import org.apache.hadoop.ipc.Server; |
| import org.apache.hadoop.mapred.TaskController.DebugScriptContext; |
| import org.apache.hadoop.mapred.TaskController.JobInitializationContext; |
| import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus; |
| import org.apache.hadoop.mapred.pipes.Submitter; |
| import org.apache.hadoop.mapreduce.MRConfig; |
| import org.apache.hadoop.mapreduce.JobContext; |
| import org.apache.hadoop.mapreduce.TaskType; |
| import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager; |
| import org.apache.hadoop.mapreduce.security.JobTokens; |
| import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; |
| import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig; |
| import org.apache.hadoop.mapreduce.server.tasktracker.Localizer; |
| import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader; |
| import org.apache.hadoop.metrics.MetricsContext; |
| import org.apache.hadoop.metrics.MetricsException; |
| import org.apache.hadoop.metrics.MetricsRecord; |
| import org.apache.hadoop.metrics.MetricsUtil; |
| import org.apache.hadoop.metrics.Updater; |
| import org.apache.hadoop.net.DNS; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.security.SecurityUtil; |
| import org.apache.hadoop.security.authorize.ConfiguredPolicy; |
| import org.apache.hadoop.security.authorize.PolicyProvider; |
| import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; |
| import org.apache.hadoop.util.DiskChecker; |
| import org.apache.hadoop.mapreduce.util.ConfigUtil; |
| import org.apache.hadoop.mapreduce.util.MemoryCalculatorPlugin; |
| import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree; |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.hadoop.util.RunJar; |
| import org.apache.hadoop.util.Service; |
| import org.apache.hadoop.util.Daemon; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.util.VersionInfo; |
| import org.apache.hadoop.util.DiskChecker.DiskErrorException; |
| import org.apache.hadoop.util.Shell.ShellCommandExecutor; |
| |
| /******************************************************* |
| * TaskTracker is a process that starts and tracks MR Tasks |
| * in a networked environment. It contacts the JobTracker |
| * for Task assignments and reporting results. |
| * |
| *******************************************************/ |
| public class TaskTracker extends Service |
| implements MRConstants, TaskUmbilicalProtocol, Runnable, TTConfig { |
  /**
   * @deprecated
   */
  @Deprecated
  static final String MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY =
    "mapred.tasktracker.vmem.reserved";
  /**
   * @deprecated
   */
  @Deprecated
  static final String MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY =
    "mapred.tasktracker.pmem.reserved";

  // How long (ms) to wait for a task to finish before moving on.
  static final long WAIT_FOR_DONE = 3 * 1000;
  // Port of this tracker's embedded HttpServer (see the 'server' field).
  int httpPort;

  // Tracker states as reported to / decided by the JobTracker heartbeat.
  static enum State {NORMAL, STALE, INTERRUPTED, DENIED}

  static{
    // Load the mapred configuration resources once per JVM.
    ConfigUtil.loadResources();
  }

  public static final Log LOG =
    LogFactory.getLog(TaskTracker.class);

  // Format string for per-transfer client-trace log lines (shuffle serving).
  public static final String MR_CLIENTTRACE_FORMAT =
    "src: %s" +     // src IP
    ", dest: %s" +  // dst IP
    ", maps: %s" + // number of maps
    ", op: %s" +    // operation
    ", reduceID: %s" + // reduce id
    ", duration: %s"; // duration

  public static final Log ClientTraceLog =
    LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");

  /**
   * Flag used to synchronize running state across threads.
   */
  private volatile boolean running = false;

  private LocalDirAllocator localDirAllocator;
  String taskTrackerName;
  String localHostname;
  InetSocketAddress jobTrackAddr;

  InetSocketAddress taskReportAddress;

  // RPC server over which tasks report back (TaskUmbilicalProtocol).
  Server taskReportServer = null;
  // Proxy to the JobTracker, created during initialize().
  InterTrackerProtocol jobClient;

  private TrackerDistributedCacheManager distributedCacheManager;

  // last heartbeat response received
  short heartbeatResponseId = -1;

  // Suffix appended to a task dir for cleanup attempts of that task.
  static final String TASK_CLEANUP_SUFFIX = ".cleanup";

  /*
   * This is the last 'status' report sent by this tracker to the JobTracker.
   *
   * If the rpc call succeeds, this 'status' is cleared-out by this tracker;
   * indicating that a 'fresh' status report be generated; in the event the
   * rpc calls fails for whatever reason, the previous status report is sent
   * again.
   */
  TaskTrackerStatus status = null;

  // The system-directory on HDFS where job files are stored
  Path systemDirectory = null;

  // The filesystem where job files are stored
  FileSystem systemFS = null;

  private HttpServer server;

  volatile boolean shuttingDown = false;

  // taskid -> TIP for every task this tracker knows about.
  Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>();
  /**
   * Map from taskId -> TaskInProgress.
   */
  Map<TaskAttemptID, TaskInProgress> runningTasks = null;
  Map<JobID, RunningJob> runningJobs = new TreeMap<JobID, RunningJob>();

  volatile int mapTotal = 0;
  volatile int reduceTotal = 0;
  boolean justStarted = true;
  boolean justInited = true;
  // Mark reduce tasks that are shuffling to rollback their events index
  Set<TaskAttemptID> shouldReset = new HashSet<TaskAttemptID>();

  //dir -> DF
  Map<String, DF> localDirsDf = new HashMap<String, DF>();
  long minSpaceStart = 0;
  //must have this much space free to start new tasks
  boolean acceptNewTasks = true;
  long minSpaceKill = 0;
  //if we run under this limit, kill one task
  //and make sure we never receive any new jobs
  //until all the old tasks have been cleaned up.
  //this is if a machine is so full it's only good
  //for serving map output to the other nodes

  static Random r = new Random();
  // Names of the per-user directory layout under each mapred local dir.
  public static final String SUBDIR = "taskTracker";
  static final String DISTCACHEDIR = "distcache";
  static final String JOBCACHE = "jobcache";
  static final String OUTPUT = "output";
  private static final String JARSDIR = "jars";
  static final String LOCAL_SPLIT_FILE = "split.dta";
  static final String JOBFILE = "job.xml";
  static final String JOB_TOKEN_FILE="jobToken"; //localized file

  static final String JOB_LOCAL_DIR = JobContext.JOB_LOCAL_DIR;

  private JobConf fConf;
  FileSystem localFs;

  private Localizer localizer;

  private int maxMapSlots;
  private int maxReduceSlots;
  private int failures;

  // Performance-related config knob to send an out-of-band heartbeat
  // on task completion
  private volatile boolean oobHeartbeatOnTaskCompletion;

  // Track number of completed tasks to send an out-of-band heartbeat
  private IntWritable finishedCount = new IntWritable(0);

  private MapEventsFetcherThread mapEventsFetcher;
  // Number of shuffle worker threads; used by ShuffleServerMetrics.
  int workerThreads;
  CleanupQueue directoryCleanupThread;
  volatile JvmManager jvmManager;

  private TaskMemoryManagerThread taskMemoryManager;
  private boolean taskMemoryManagerEnabled = true;
  private long totalVirtualMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
  private long totalPhysicalMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
  private long mapSlotMemorySizeOnTT = JobConf.DISABLED_MEMORY_LIMIT;
  private long reduceSlotSizeMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
  private long totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT;

  static final String MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY =
      TT_MEMORY_CALCULATOR_PLUGIN;

  /**
   * the minimum interval between jobtracker polls
   */
  private volatile int heartbeatInterval = HEARTBEAT_INTERVAL_MIN;
  /**
   * Number of maptask completion events locations to poll for at one time
   */
  private int probe_sample_size = 500;

  private IndexCache indexCache;

  /**
   * Handle to the specific instance of the {@link TaskController} class
   */
  private TaskController taskController;

  /**
   * Handle to the specific instance of the {@link NodeHealthCheckerService}
   */
  private NodeHealthCheckerService healthChecker;

  /*
   * A list of commitTaskActions for whom commit response has been received
   */
  private List<TaskAttemptID> commitResponses =
      Collections.synchronizedList(new ArrayList<TaskAttemptID>());

  private ShuffleServerMetrics shuffleServerMetrics;
| /** This class contains the methods that should be used for metrics-reporting |
| * the specific metrics for shuffle. The TaskTracker is actually a server for |
| * the shuffle and hence the name ShuffleServerMetrics. |
| */ |
| private class ShuffleServerMetrics implements Updater { |
| private MetricsRecord shuffleMetricsRecord = null; |
| private int serverHandlerBusy = 0; |
| private long outputBytes = 0; |
| private int failedOutputs = 0; |
| private int successOutputs = 0; |
| ShuffleServerMetrics(JobConf conf) { |
| MetricsContext context = MetricsUtil.getContext("mapred"); |
| shuffleMetricsRecord = |
| MetricsUtil.createRecord(context, "shuffleOutput"); |
| this.shuffleMetricsRecord.setTag("sessionId", conf.getSessionId()); |
| context.registerUpdater(this); |
| } |
| synchronized void serverHandlerBusy() { |
| ++serverHandlerBusy; |
| } |
| synchronized void serverHandlerFree() { |
| --serverHandlerBusy; |
| } |
| synchronized void outputBytes(long bytes) { |
| outputBytes += bytes; |
| } |
| synchronized void failedOutput() { |
| ++failedOutputs; |
| } |
| synchronized void successOutput() { |
| ++successOutputs; |
| } |
| public void doUpdates(MetricsContext unused) { |
| synchronized (this) { |
| if (workerThreads != 0) { |
| shuffleMetricsRecord.setMetric("shuffle_handler_busy_percent", |
| 100*((float)serverHandlerBusy/workerThreads)); |
| } else { |
| shuffleMetricsRecord.setMetric("shuffle_handler_busy_percent", 0); |
| } |
| shuffleMetricsRecord.incrMetric("shuffle_output_bytes", |
| outputBytes); |
| shuffleMetricsRecord.incrMetric("shuffle_failed_outputs", |
| failedOutputs); |
| shuffleMetricsRecord.incrMetric("shuffle_success_outputs", |
| successOutputs); |
| outputBytes = 0; |
| failedOutputs = 0; |
| successOutputs = 0; |
| } |
| shuffleMetricsRecord.update(); |
| } |
| } |
| |
| |
| |
| |
| |
  // Instrumentation hook; instantiated reflectively in initialize().
  private TaskTrackerInstrumentation myInstrumentation = null;

  /** @return the instrumentation instance created during initialization. */
  public TaskTrackerInstrumentation getTaskTrackerInstrumentation() {
    return myInstrumentation;
  }

  /**
   * A list of tips that should be cleaned up.
   */
  private BlockingQueue<TaskTrackerAction> tasksToCleanup =
    new LinkedBlockingQueue<TaskTrackerAction>();

  /**
   * A daemon-thread that pulls tips off the list of things to cleanup.
   */
  private TaskCleanupThread taskCleanupThread;
| |
  /** @return the {@link TaskController} configured for this tracker. */
  public TaskController getTaskController() {
    return taskController;
  }
| |
| private RunningJob addTaskToJob(JobID jobId, |
| TaskInProgress tip) { |
| synchronized (runningJobs) { |
| RunningJob rJob = null; |
| if (!runningJobs.containsKey(jobId)) { |
| rJob = new RunningJob(jobId); |
| rJob.localized = false; |
| rJob.tasks = new HashSet<TaskInProgress>(); |
| runningJobs.put(jobId, rJob); |
| } else { |
| rJob = runningJobs.get(jobId); |
| } |
| synchronized (rJob) { |
| rJob.tasks.add(tip); |
| } |
| runningJobs.notify(); //notify the fetcher thread |
| return rJob; |
| } |
| } |
| |
| private void removeTaskFromJob(JobID jobId, TaskInProgress tip) { |
| synchronized (runningJobs) { |
| RunningJob rjob = runningJobs.get(jobId); |
| if (rjob == null) { |
| LOG.warn("Unknown job " + jobId + " being deleted."); |
| } else { |
| synchronized (rjob) { |
| rjob.tasks.remove(tip); |
| } |
| } |
| } |
| } |
| |
  /** @return the {@link Localizer} used for user/job/task dir setup. */
  Localizer getLocalizer() {
    return localizer;
  }

  /** Replace the {@link Localizer} instance (package-private). */
  void setLocalizer(Localizer l) {
    localizer = l;
  }
| |
  /** @return the tracker-local subdirectory for the given user. */
  public static String getUserDir(String user) {
    return TaskTracker.SUBDIR + Path.SEPARATOR + user;
  }

  /** @return the user's distributed-cache directory on this tracker. */
  public static String getDistributedCacheDir(String user) {
    return getUserDir(user) + Path.SEPARATOR + TaskTracker.DISTCACHEDIR;
  }

  /** @return the user's job-cache directory on this tracker. */
  public static String getJobCacheSubdir(String user) {
    return getUserDir(user) + Path.SEPARATOR + TaskTracker.JOBCACHE;
  }

  /** @return the local directory for one job of the given user. */
  public static String getLocalJobDir(String user, String jobid) {
    return getJobCacheSubdir(user) + Path.SEPARATOR + jobid;
  }

  /** @return the path of the localized job.xml for the given job. */
  static String getLocalJobConfFile(String user, String jobid) {
    return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOBFILE;
  }

  /** @return the path of the localized job-token file for the given job. */
  static String getLocalJobTokenFile(String user, String jobid) {
    return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOB_TOKEN_FILE;
  }
| |
| |
  /**
   * @return the path of the task-localized job.xml; cleanup attempts use
   *         the {@link #TASK_CLEANUP_SUFFIX}-suffixed task directory.
   */
  static String getTaskConfFile(String user, String jobid, String taskid,
      boolean isCleanupAttempt) {
    return getLocalTaskDir(user, jobid, taskid, isCleanupAttempt)
        + Path.SEPARATOR + TaskTracker.JOBFILE;
  }

  /** @return the directory holding the localized job jars. */
  static String getJobJarsDir(String user, String jobid) {
    return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JARSDIR;
  }

  /** @return the path of the localized job.jar. */
  static String getJobJarFile(String user, String jobid) {
    return getJobJarsDir(user, jobid) + Path.SEPARATOR + "job.jar";
  }
| |
  /** @return the job's shared scratch ("work") directory. */
  static String getJobWorkDir(String user, String jobid) {
    return getLocalJobDir(user, jobid) + Path.SEPARATOR + MRConstants.WORKDIR;
  }

  /** @return the path of the localized split file for the given task. */
  static String getLocalSplitFile(String user, String jobid, String taskid) {
    return TaskTracker.getLocalTaskDir(user, jobid, taskid) + Path.SEPARATOR
        + TaskTracker.LOCAL_SPLIT_FILE;
  }

  /** @return the task's intermediate-output directory. */
  static String getIntermediateOutputDir(String user, String jobid,
      String taskid) {
    return getLocalTaskDir(user, jobid, taskid) + Path.SEPARATOR
        + TaskTracker.OUTPUT;
  }

  /** Convenience overload for a regular (non-cleanup) attempt. */
  static String getLocalTaskDir(String user, String jobid, String taskid) {
    return getLocalTaskDir(user, jobid, taskid, false);
  }
| |
| public static String getLocalTaskDir(String user, String jobid, String taskid, |
| boolean isCleanupAttempt) { |
| String taskDir = getLocalJobDir(user, jobid) + Path.SEPARATOR + taskid; |
| if (isCleanupAttempt) { |
| taskDir = taskDir + TASK_CLEANUP_SUFFIX; |
| } |
| return taskDir; |
| } |
| |
| static String getTaskWorkDir(String user, String jobid, String taskid, |
| boolean isCleanupAttempt) { |
| String dir = getLocalJobDir(user, jobid) + Path.SEPARATOR + taskid; |
| if (isCleanupAttempt) { |
| dir = dir + TASK_CLEANUP_SUFFIX; |
| } |
| return dir + Path.SEPARATOR + MRConstants.WORKDIR; |
| } |
| |
| String getPid(TaskAttemptID tid) { |
| TaskInProgress tip = tasks.get(tid); |
| if (tip != null) { |
| return jvmManager.getPid(tip.getTaskRunner()); |
| } |
| return null; |
| } |
| |
| public long getProtocolVersion(String protocol, |
| long clientVersion) throws IOException { |
| if (protocol.equals(TaskUmbilicalProtocol.class.getName())) { |
| return TaskUmbilicalProtocol.versionID; |
| } else { |
| throw new IOException("Unknown protocol for task tracker: " + |
| protocol); |
| } |
| } |
| |
  /**
   * Do the real constructor work here.  It's in a separate method
   * so we can call it again and "recycle" the object after calling
   * close().
   *
   * The ordering below matters: local dirs are checked and scrubbed first,
   * in-memory state tables are reset, the task-report RPC server is bound
   * (so its final address is known), the JobTracker proxy is created, and
   * only then are the map-events fetcher, task launchers, task controller,
   * localizer and health monitor brought up.
   *
   * @throws IOException if local-disk checks, RPC binding, or the
   *         JobTracker connection fail
   */
  synchronized void initialize() throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Initializing Task Tracker: " + toString());
    }
    //check that the server is not already live.

    //allow this operation in only two service states: started and live
    verifyServiceState(ServiceState.STARTED, ServiceState.LIVE);

    //flip the running switch for our inner threads
    running = true;

    localFs = FileSystem.getLocal(fConf);
    // use configured nameserver & interface to get local hostname
    if (fConf.get(TT_HOST_NAME) != null) {
      this.localHostname = fConf.get(TT_HOST_NAME);
    }
    if (localHostname == null) {
      // fall back to DNS resolution over the configured interface/nameserver
      this.localHostname =
      DNS.getDefaultHost
      (fConf.get(TT_DNS_INTERFACE,"default"),
       fConf.get(TT_DNS_NAMESERVER,"default"));
    }

    //check local disk
    checkLocalDirs(this.fConf.getLocalDirs());
    fConf.deleteLocalFiles(SUBDIR);

    // Clear out state tables
    this.tasks.clear();
    this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
    this.runningJobs = new TreeMap<JobID, RunningJob>();
    this.mapTotal = 0;
    this.reduceTotal = 0;
    this.acceptNewTasks = true;
    this.status = null;

    this.minSpaceStart = this.fConf.getLong(TT_LOCAL_DIR_MINSPACE_START, 0L);
    this.minSpaceKill = this.fConf.getLong(TT_LOCAL_DIR_MINSPACE_KILL, 0L);
    //tweak the probe sample size (make it a function of numCopiers)
    probe_sample_size =
      this.fConf.getInt(TT_MAX_TASK_COMPLETION_EVENTS_TO_POLL, 500);

    // Instantiate the configured instrumentation class reflectively.
    Class<? extends TaskTrackerInstrumentation> metricsInst = getInstrumentationClass(fConf);
    try {
      java.lang.reflect.Constructor<? extends TaskTrackerInstrumentation> c =
        metricsInst.getConstructor(new Class[] {TaskTracker.class} );
      this.myInstrumentation = c.newInstance(this);
    } catch(Exception e) {
      //Reflection can throw lots of exceptions -- handle them all by
      //falling back on the default.
      LOG.error("failed to initialize taskTracker metrics", e);
      this.myInstrumentation = new TaskTrackerMetricsInst(this);
    }

    // bind address
    InetSocketAddress socAddr = NetUtils.createSocketAddr(
        fConf.get(TT_REPORT_ADDRESS, "127.0.0.1:0"));
    String bindAddress = socAddr.getHostName();
    int tmpPort = socAddr.getPort();

    this.jvmManager = new JvmManager(this);

    // Set service-level authorization security policy
    if (this.fConf.getBoolean(
        ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
      PolicyProvider policyProvider =
        (PolicyProvider)(ReflectionUtils.newInstance(
            this.fConf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
                MapReducePolicyProvider.class, PolicyProvider.class),
            this.fConf));
      SecurityUtil.setPolicy(new ConfiguredPolicy(this.fConf, policyProvider));
    }

    // RPC initialization
    int max = maxMapSlots > maxReduceSlots ?
                       maxMapSlots : maxReduceSlots;
    //set the num handlers to max*2 since canCommit may wait for the duration
    //of a heartbeat RPC
    this.taskReportServer =
      RPC.getServer(this, bindAddress, tmpPort, 2 * max, false, this.fConf);
    this.taskReportServer.start();

    // get the assigned address (port 0 above means the OS picked one)
    this.taskReportAddress = taskReportServer.getListenerAddress();
    this.fConf.set(TT_REPORT_ADDRESS,
        taskReportAddress.getHostName() + ":" + taskReportAddress.getPort());
    LOG.info("TaskTracker up at: " + this.taskReportAddress);

    this.taskTrackerName = "tracker_" + localHostname + ":" + taskReportAddress;
    LOG.info("Starting tracker " + taskTrackerName);

    // Initialize DistributedCache and
    // clear out temporary files that might be lying around
    this.distributedCacheManager =
        new TrackerDistributedCacheManager(this.fConf);
    this.distributedCacheManager.purgeCache();
    cleanupStorage();

    //mark as just started; this is used in heartbeats
    this.justStarted = true;
    int connectTimeout = fConf
        .getInt("mapred.task.tracker.connect.timeout", 60000);
    // block until the JobTracker proxy is available (or timeout)
    this.jobClient = (InterTrackerProtocol)
      RPC.waitForProxy(InterTrackerProtocol.class,
                       InterTrackerProtocol.versionID,
                       jobTrackAddr, this.fConf, connectTimeout);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Connected to JobTracker at " + jobTrackAddr);
    }
    this.justInited = true;
    this.running = true;
    // start the thread that will fetch map task completion events
    this.mapEventsFetcher = new MapEventsFetcherThread();
    mapEventsFetcher.setDaemon(true);
    mapEventsFetcher.setName(
                             "Map-events fetcher for all reduce tasks " + "on " +
                             taskTrackerName);
    mapEventsFetcher.start();

    initializeMemoryManagement();

    this.indexCache = new IndexCache(this.fConf);

    // launchers hand out the configured number of map/reduce slots
    mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots);
    reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots);
    mapLauncher.start();
    reduceLauncher.start();
    Class<? extends TaskController> taskControllerClass
                          = fConf.getClass(TT_TASK_CONTROLLER,
                                            DefaultTaskController.class,
                                            TaskController.class);
    taskController = (TaskController)ReflectionUtils.newInstance(
                                                      taskControllerClass, fConf);

    //setup and create jobcache directory with appropriate permissions
    taskController.setup();

    // create a localizer instance
    setLocalizer(new Localizer(localFs, fConf.getLocalDirs(), taskController));

    //Start up node health checker service.
    if (shouldStartHealthMonitor(this.fConf)) {
      startHealthMonitor(this.fConf);
    }

    oobHeartbeatOnTaskCompletion =
      fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
  }
| |
  /**
   * Look up the configured {@link TaskTrackerInstrumentation}
   * implementation, defaulting to {@link TaskTrackerMetricsInst}.
   */
  public static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(
    Configuration conf) {
    return conf.getClass(TT_INSTRUMENTATION,
        TaskTrackerMetricsInst.class, TaskTrackerInstrumentation.class);
  }

  /**
   * Record the instrumentation implementation under the
   * {@code TT_INSTRUMENTATION} configuration key.
   */
  public static void setInstrumentationClass(
    Configuration conf, Class<? extends TaskTrackerInstrumentation> t) {
    conf.setClass(TT_INSTRUMENTATION,
        t, TaskTrackerInstrumentation.class);
  }
| |
  /**
   * Removes all contents of temporary storage.  Called upon
   * startup, to remove any leftovers from previous run.
   *
   * @throws IOException if deleting the local files fails
   */
  public void cleanupStorage() throws IOException {
    // fConf may still be null if initialization has not run yet
    if (fConf != null) {
      fConf.deleteLocalFiles();
    }
  }

  // Object on which MapEventsFetcherThread waits between fetch rounds;
  // notified by FetchStatus.getMapEvents when a reduce runs out of events.
  private Object waitingOn = new Object();
| |
| private class MapEventsFetcherThread extends Thread { |
| |
| private List <FetchStatus> reducesInShuffle() { |
| List <FetchStatus> fList = new ArrayList<FetchStatus>(); |
| for (Map.Entry <JobID, RunningJob> item : runningJobs.entrySet()) { |
| RunningJob rjob = item.getValue(); |
| JobID jobId = item.getKey(); |
| FetchStatus f; |
| synchronized (rjob) { |
| f = rjob.getFetchStatus(); |
| for (TaskInProgress tip : rjob.tasks) { |
| Task task = tip.getTask(); |
| if (!task.isMapTask()) { |
| if (((ReduceTask)task).getPhase() == |
| TaskStatus.Phase.SHUFFLE) { |
| if (rjob.getFetchStatus() == null) { |
| //this is a new job; we start fetching its map events |
| f = new FetchStatus(jobId, |
| ((ReduceTask)task).getNumMaps()); |
| rjob.setFetchStatus(f); |
| } |
| f = rjob.getFetchStatus(); |
| fList.add(f); |
| break; //no need to check any more tasks belonging to this |
| } |
| } |
| } |
| } |
| } |
| //at this point, we have information about for which of |
| //the running jobs do we need to query the jobtracker for map |
| //outputs (actually map events). |
| return fList; |
| } |
| |
| @Override |
| public void run() { |
| LOG.info("Starting thread: " + this.getName()); |
| |
| while (running) { |
| try { |
| List <FetchStatus> fList = null; |
| synchronized (runningJobs) { |
| while (((fList = reducesInShuffle()).size()) == 0) { |
| try { |
| runningJobs.wait(); |
| } catch (InterruptedException e) { |
| LOG.info("Shutting down: " + this.getName()); |
| return; |
| } |
| } |
| } |
| // now fetch all the map task events for all the reduce tasks |
| // possibly belonging to different jobs |
| boolean fetchAgain = false; //flag signifying whether we want to fetch |
| //immediately again. |
| for (FetchStatus f : fList) { |
| long currentTime = System.currentTimeMillis(); |
| try { |
| //the method below will return true when we have not |
| //fetched all available events yet |
| if (f.fetchMapCompletionEvents(currentTime)) { |
| fetchAgain = true; |
| } |
| } catch (Exception e) { |
| LOG.warn( |
| "Ignoring exception that fetch for map completion" + |
| " events threw for " + f.jobId + " threw: " + |
| StringUtils.stringifyException(e)); |
| } |
| if (!running) { |
| break; |
| } |
| } |
| synchronized (waitingOn) { |
| try { |
| if (!fetchAgain) { |
| waitingOn.wait(heartbeatInterval); |
| } |
| } catch (InterruptedException ie) { |
| LOG.info("Shutting down: " + this.getName()); |
| return; |
| } |
| } |
| } catch (Exception e) { |
| LOG.info("Ignoring exception " + e.getMessage()); |
| } |
| } |
| } |
| } |
| |
  /**
   * Per-job state for fetching map-completion events from the JobTracker on
   * behalf of this tracker's reduce tasks.
   *
   * Lock ordering: {@code fromEventId} is always acquired before
   * {@code allMapEvents}; never take them in the opposite order.
   */
  private class FetchStatus {
    /** The next event ID that we will start querying the JobTracker from*/
    private IntWritable fromEventId;
    /** This is the cache of map events for a given job */
    private List<TaskCompletionEvent> allMapEvents;
    /** What jobid this fetchstatus object is for*/
    private JobID jobId;
    /** Wall-clock time of the last JobTracker query (rate-limits fetches). */
    private long lastFetchTime;
    /** True when the last fetch returned a full batch, so more may remain. */
    private boolean fetchAgain;

    public FetchStatus(JobID jobId, int numMaps) {
      this.fromEventId = new IntWritable(0);
      this.jobId = jobId;
      this.allMapEvents = new ArrayList<TaskCompletionEvent>(numMaps);
    }

    /**
     * Reset the events obtained so far.
     */
    public void reset() {
      // Note that the sync is first on fromEventId and then on allMapEvents
      synchronized (fromEventId) {
        synchronized (allMapEvents) {
          fromEventId.set(0); // set the new index for TCE
          allMapEvents.clear();
        }
      }
    }

    /**
     * Return up to {@code max} cached events starting at {@code fromId};
     * if none are cached past fromId, wake the fetcher thread so it pulls
     * more from the JobTracker.
     */
    public TaskCompletionEvent[] getMapEvents(int fromId, int max) {

      TaskCompletionEvent[] mapEvents =
        TaskCompletionEvent.EMPTY_ARRAY;
      boolean notifyFetcher = false;
      synchronized (allMapEvents) {
        if (allMapEvents.size() > fromId) {
          int actualMax = Math.min(max, (allMapEvents.size() - fromId));
          List <TaskCompletionEvent> eventSublist =
            allMapEvents.subList(fromId, actualMax + fromId);
          mapEvents = eventSublist.toArray(mapEvents);
        } else {
          // Notify Fetcher thread.
          notifyFetcher = true;
        }
      }
      if (notifyFetcher) {
        // notify outside the allMapEvents lock to keep the critical
        // section small
        synchronized (waitingOn) {
          waitingOn.notify();
        }
      }
      return mapEvents;
    }

    /**
     * Query the JobTracker for new map-completion events unless we fetched
     * recently and the last fetch was exhaustive.
     *
     * @param currTime current wall-clock time, used for rate limiting
     * @return true when a full batch was fetched (caller should fetch again
     *         immediately)
     * @throws IOException on RPC failure
     */
    public boolean fetchMapCompletionEvents(long currTime) throws IOException {
      if (!fetchAgain && (currTime - lastFetchTime) < heartbeatInterval) {
        return false;
      }
      int currFromEventId = 0;
      synchronized (fromEventId) {
        currFromEventId = fromEventId.get();
        List <TaskCompletionEvent> recentMapEvents =
          queryJobTracker(fromEventId, jobId, jobClient);
        synchronized (allMapEvents) {
          allMapEvents.addAll(recentMapEvents);
        }
        lastFetchTime = currTime;
        if (fromEventId.get() - currFromEventId >= probe_sample_size) {
          //return true when we have fetched the full payload, indicating
          //that we should fetch again immediately (there might be more to
          //fetch
          fetchAgain = true;
          return true;
        }
      }
      fetchAgain = false;
      return false;
    }
  }
| |
  // Allocator that spreads tracker-local paths across the configured
  // mapreduce local directories.
  private static LocalDirAllocator lDirAlloc =
      new LocalDirAllocator(MRConfig.LOCAL_DIR);
| |
| // intialize the job directory |
| private void localizeJob(TaskInProgress tip) throws IOException { |
| Task t = tip.getTask(); |
| JobID jobId = t.getJobID(); |
| RunningJob rjob = addTaskToJob(jobId, tip); |
| |
| // Initialize the user directories if needed. |
| getLocalizer().initializeUserDirs(t.getUser()); |
| |
| synchronized (rjob) { |
| if (!rjob.localized) { |
| |
| JobConf localJobConf = localizeJobFiles(t); |
| |
| // Now initialize the job via task-controller so as to set |
| // ownership/permissions of jars, job-work-dir. Note that initializeJob |
| // should be the last call after every other directory/file to be |
| // directly under the job directory is created. |
| JobInitializationContext context = new JobInitializationContext(); |
| context.jobid = jobId; |
| context.user = localJobConf.getUser(); |
| context.workDir = new File(localJobConf.get(JOB_LOCAL_DIR)); |
| taskController.initializeJob(context); |
| |
| rjob.jobConf = localJobConf; |
| rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) || |
| localJobConf.getKeepFailedTaskFiles()); |
| FSDataInputStream in = localFs.open(new Path( |
| rjob.jobConf.get(JobContext.JOB_TOKEN_FILE))); |
| JobTokens jt = new JobTokens(); |
| jt.readFields(in); |
| rjob.jobTokens = jt; // store JobToken object per job |
| |
| rjob.localized = true; |
| } |
| } |
| launchTaskForJob(tip, new JobConf(rjob.jobConf)); |
| } |
| |
  /**
   * Localize the job on this tasktracker. Specifically
   * <ul>
   * <li>Cleanup and create job directories on all disks</li>
   * <li>Download the job config file job.xml from the FS</li>
   * <li>Create the job work directory and set {@link TaskTracker#JOB_LOCAL_DIR}
   * in the configuration.
   * <li>Download the job jar file job.jar from the FS, unjar it and set jar
   * file in the configuration.</li>
   * </ul>
   *
   * @param t task whose job has to be localized on this TT
   * @return the modified job configuration to be used for all the tasks of this
   *         job as a starting point.
   * @throws IOException
   */
  JobConf localizeJobFiles(Task t)
      throws IOException {
    JobID jobId = t.getJobID();
    String userName = t.getUser();

    // Initialize the job directories
    // NOTE(review): this local 'localFs' shadows the TaskTracker.localFs
    // field; both come from FileSystem.getLocal(fConf).
    FileSystem localFs = FileSystem.getLocal(fConf);
    getLocalizer().initializeJobDirs(userName, jobId);

    // Download the job.xml for this job from the system FS
    Path localJobFile =
        localizeJobConfFile(new Path(t.getJobFile()), userName, jobId);

    JobConf localJobConf = new JobConf(localJobFile);

    // create the 'job-work' directory: job-specific shared directory for use as
    // scratch space by all tasks of the same job running on this TaskTracker.
    Path workDir =
        lDirAlloc.getLocalPathForWrite(getJobWorkDir(userName, jobId
            .toString()), fConf);
    if (!localFs.mkdirs(workDir)) {
      throw new IOException("Mkdirs failed to create "
          + workDir.toString());
    }
    // publish the work dir both as a system property and in the job conf
    System.setProperty(JOB_LOCAL_DIR, workDir.toUri().getPath());
    localJobConf.set(JOB_LOCAL_DIR, workDir.toUri().getPath());

    // Download the job.jar for this job from the system FS
    localizeJobJarFile(userName, jobId, localFs, localJobConf);
    // save local copy of JobToken file
    localizeJobTokenFile(userName, jobId, localJobConf);
    return localJobConf;
  }
| |
| /** |
| * Download the job configuration file from the FS. |
| * |
| * @param t Task whose job file has to be downloaded |
| * @param jobId jobid of the task |
| * @return the local file system path of the downloaded file. |
| * @throws IOException |
| */ |
| private Path localizeJobConfFile(Path jobFile, String user, JobID jobId) |
| throws IOException { |
| // Get sizes of JobFile and JarFile |
| // sizes are -1 if they are not present. |
| FileStatus status = null; |
| long jobFileSize = -1; |
| try { |
| status = systemFS.getFileStatus(jobFile); |
| jobFileSize = status.getLen(); |
| } catch(FileNotFoundException fe) { |
| jobFileSize = -1; |
| } |
| |
| Path localJobFile = |
| lDirAlloc.getLocalPathForWrite(getLocalJobConfFile(user, jobId.toString()), |
| jobFileSize, fConf); |
| |
| // Download job.xml |
| systemFS.copyToLocalFile(jobFile, localJobFile); |
| return localJobFile; |
| } |
| |
| /** |
| * Download the job jar file from FS to the local file system and unjar it. |
| * Set the local jar file in the passed configuration. |
| * |
| * @param jobId |
| * @param localFs |
| * @param localJobConf |
| * @throws IOException |
| */ |
| private void localizeJobJarFile(String user, JobID jobId, FileSystem localFs, |
| JobConf localJobConf) |
| throws IOException { |
| // copy Jar file to the local FS and unjar it. |
| String jarFile = localJobConf.getJar(); |
| FileStatus status = null; |
| long jarFileSize = -1; |
| if (jarFile != null) { |
| Path jarFilePath = new Path(jarFile); |
| try { |
| status = systemFS.getFileStatus(jarFilePath); |
| jarFileSize = status.getLen(); |
| } catch (FileNotFoundException fe) { |
| jarFileSize = -1; |
| } |
| // Here we check for five times the size of jarFileSize to accommodate for |
| // unjarring the jar file in the jars directory |
| Path localJarFile = |
| lDirAlloc.getLocalPathForWrite( |
| getJobJarFile(user, jobId.toString()), 5 * jarFileSize, fConf); |
| |
| // Download job.jar |
| systemFS.copyToLocalFile(jarFilePath, localJarFile); |
| |
| localJobConf.setJar(localJarFile.toString()); |
| |
| // Also un-jar the job.jar files. We un-jar it so that classes inside |
| // sub-directories, for e.g., lib/, classes/ are available on class-path |
| RunJar.unJar(new File(localJarFile.toString()), new File(localJarFile |
| .getParent().toString())); |
| } |
| } |
| |
/**
 * Hand the localized job configuration to the task and launch it.
 *
 * @param tip the task to launch
 * @param jobConf the job configuration the task should run with
 * @throws IOException if the launch fails
 */
private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{
  // Lock the tip so configuration and launch happen atomically with
  // respect to other threads that synchronize on it.
  synchronized (tip) {
    tip.setJobConf(jobConf);
    tip.launchTask();
  }
}
| |
| ///////////////////////////////////////////////////// |
| // Service Lifecycle |
| ///////////////////////////////////////////////////// |
| |
| /** |
| * A shutdown request triggers termination |
| * @throws IOException when errors happen during termination |
| */ |
| public synchronized void shutdown() throws IOException { |
| close(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * |
| * @throws IOException exceptions which will be logged |
| */ |
| @Override |
| protected void innerClose() throws IOException { |
| synchronized (this) { |
| shuttingDown = true; |
| closeTaskTracker(); |
| if (this.server != null) { |
| try { |
| LOG.info("Shutting down StatusHttpServer"); |
| this.server.stop(); |
| } catch (Exception e) { |
| LOG.warn("Exception shutting down TaskTracker", e); |
| } |
| } |
| stopCleanupThreads(); |
| } |
| } |
| |
| /** |
| * Close down the TaskTracker and all its components. We must also shutdown |
| * any running tasks or threads, and cleanup disk space. A new TaskTracker |
| * within the same process space might be restarted, so everything must be |
| * clean. |
| * @throws IOException when errors happen during shutdown |
| */ |
| protected synchronized void closeTaskTracker() throws IOException { |
| if (!running) { |
| //this operation is a no-op when not already running |
| return; |
| } |
| running = false; |
| // |
| // Kill running tasks. Do this in a 2nd vector, called 'tasksToClose', |
| // because calling jobHasFinished() may result in an edit to 'tasks'. |
| // |
| TreeMap<TaskAttemptID, TaskInProgress> tasksToClose = |
| new TreeMap<TaskAttemptID, TaskInProgress>(); |
| tasksToClose.putAll(tasks); |
| for (TaskInProgress tip : tasksToClose.values()) { |
| tip.jobHasFinished(false); |
| } |
| |
| this.running = false; |
| |
| // Clear local storage |
| cleanupStorage(); |
| |
| // Shutdown the fetcher thread |
| if (mapEventsFetcher != null) { |
| mapEventsFetcher.interrupt(); |
| } |
| //stop the launchers |
| if (mapLauncher != null) { |
| mapLauncher.cleanTaskQueue(); |
| mapLauncher.interrupt(); |
| } |
| if (reduceLauncher != null) { |
| reduceLauncher.cleanTaskQueue(); |
| reduceLauncher.interrupt(); |
| } |
| if (jvmManager != null) { |
| jvmManager.stop(); |
| } |
| |
| // shutdown RPC connections |
| RPC.stopProxy(jobClient); |
| |
| // wait for the fetcher thread to exit |
| for (boolean done = false; !done; ) { |
| try { |
| if (mapEventsFetcher != null) { |
| mapEventsFetcher.join(); |
| } |
| done = true; |
| } catch (InterruptedException e) { |
| } |
| } |
| |
| if (taskReportServer != null) { |
| taskReportServer.stop(); |
| taskReportServer = null; |
| } |
| if (healthChecker != null) { |
| //stop node health checker service |
| healthChecker.stop(); |
| healthChecker = null; |
| } |
| } |
| |
| /** |
| * For testing |
| */ |
| TaskTracker() { |
| server = null; |
| } |
| |
/** Replace the configuration used by this tracker. */
void setConf(JobConf conf) {
  fConf = conf;
}
| |
| /** |
| * Start with the local machine name, and the default JobTracker |
| * Create and start a task tracker. |
| * Subclasses must not subclass this constructor, as it may |
| * call their initialisation/startup methods before the construction |
| * is complete |
| * It is here for backwards compatibility. |
| * @param conf configuration |
| * @throws IOException for problems on startup |
| */ |
| public TaskTracker(JobConf conf) throws IOException { |
| this(conf, true); |
| } |
| |
| /** |
| * Subclasses should extend this constructor and pass start=false to the |
| * superclass to avoid race conditions in constructors and threads. |
| * @param conf configuration |
| * @param start flag to set to true to start the daemon. Subclasses should |
| * avoid this, starting themselves outside the constructor, to avoid odd |
| * thread-related race conditions. |
| * @throws IOException for problems on startup |
| */ |
| protected TaskTracker(JobConf conf, boolean start) throws IOException { |
| super(conf); |
| fConf = conf; |
| //for backwards compatibility, the task tracker starts up unless told not |
| //to. Subclasses should be very cautious about having their superclass |
| //do that as subclassed methods can be invoked before the class is fully |
| //configured |
| if (start) { |
| startService(this); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| * |
| * @throws IOException for any problem. |
| * @throws InterruptedException if the thread was interrupted on startup |
| */ |
| @Override |
| protected synchronized void innerStart() |
| throws IOException, InterruptedException { |
| JobConf conf = fConf; |
| fConf = conf; |
| maxMapSlots = conf.getInt(TT_MAP_SLOTS, 2); |
| maxReduceSlots = conf.getInt(TT_REDUCE_SLOTS, 2); |
| this.jobTrackAddr = JobTracker.getAddress(conf); |
| InetSocketAddress infoSocAddr = NetUtils.createSocketAddr( |
| conf.get(TT_HTTP_ADDRESS, "0.0.0.0:50060")); |
| String httpBindAddress = infoSocAddr.getHostName(); |
| int httpPort = infoSocAddr.getPort(); |
| this.server = new HttpServer("task", httpBindAddress, httpPort, |
| httpPort == 0, conf); |
| workerThreads = conf.getInt(TT_HTTP_THREADS, 40); |
| this.shuffleServerMetrics = new ShuffleServerMetrics(conf); |
| server.setThreads(1, workerThreads); |
| // let the jsp pages get to the task tracker, config, and other relevant |
| // objects |
| FileSystem local = FileSystem.getLocal(conf); |
| this.localDirAllocator = new LocalDirAllocator(MRConfig.LOCAL_DIR); |
| server.setAttribute("task.tracker", this); |
| server.setAttribute("local.file.system", local); |
| server.setAttribute("conf", conf); |
| server.setAttribute("log", LOG); |
| server.setAttribute("localDirAllocator", localDirAllocator); |
| server.setAttribute("shuffleServerMetrics", shuffleServerMetrics); |
| server.addInternalServlet("mapOutput", "/mapOutput", MapOutputServlet.class); |
| server.addInternalServlet("taskLog", "/tasklog", TaskLogServlet.class); |
| server.start(); |
| this.httpPort = server.getPort(); |
| checkJettyPort(httpPort); |
| initialize(); |
| } |
| |
| private void checkJettyPort(int port) throws IOException { |
| //See HADOOP-4744 |
| if (port < 0) { |
| shuttingDown = true; |
| throw new IOException("Jetty problem. Jetty didn't bind to a " + |
| "valid port"); |
| } |
| } |
| |
| private void startCleanupThreads() throws IOException { |
| taskCleanupThread = new TaskCleanupThread(); |
| taskCleanupThread.setDaemon(true); |
| taskCleanupThread.start(); |
| directoryCleanupThread = new CleanupQueue(); |
| } |
| |
| /** |
| * Tell the cleanup threads that they should end themselves |
| */ |
| private void stopCleanupThreads() { |
| if (taskCleanupThread != null) { |
| taskCleanupThread.terminate(); |
| taskCleanupThread = null; |
| } |
| } |
| |
| /** |
| * The connection to the JobTracker, used by the TaskRunner |
| * for locating remote files. |
| */ |
| public InterTrackerProtocol getJobClient() { |
| return jobClient; |
| } |
| |
/** Return the address at which the tasktracker's report server is bound. */
public synchronized InetSocketAddress getTaskTrackerReportAddress() {
  return taskReportAddress;
}
| |
| /** Queries the job tracker for a set of outputs ready to be copied |
| * @param fromEventId the first event ID we want to start from, this is |
| * modified by the call to this method |
| * @param jobClient the job tracker |
| * @return a set of locations to copy outputs from |
| * @throws IOException |
| */ |
| private List<TaskCompletionEvent> queryJobTracker(IntWritable fromEventId, |
| JobID jobId, |
| InterTrackerProtocol jobClient) |
| throws IOException { |
| |
| TaskCompletionEvent t[] = jobClient.getTaskCompletionEvents( |
| jobId, |
| fromEventId.get(), |
| probe_sample_size); |
| //we are interested in map task completion events only. So store |
| //only those |
| List <TaskCompletionEvent> recentMapEvents = |
| new ArrayList<TaskCompletionEvent>(); |
| for (int i = 0; i < t.length; i++) { |
| if (t[i].isMapTask()) { |
| recentMapEvents.add(t[i]); |
| } |
| } |
| fromEventId.set(fromEventId.get() + t.length); |
| return recentMapEvents; |
| } |
| |
| /** |
| * Main service loop. Will stay in this loop forever. |
| */ |
| State offerService() throws Exception { |
| long lastHeartbeat = 0; |
| boolean restartingService = true; |
| |
| while (running && !shuttingDown) { |
| try { |
| long now = System.currentTimeMillis(); |
| |
| long waitTime = heartbeatInterval - (now - lastHeartbeat); |
| if (waitTime > 0) { |
| // sleeps for the wait time or |
| // until there are empty slots to schedule tasks |
| synchronized (finishedCount) { |
| if (finishedCount.get() == 0) { |
| finishedCount.wait(waitTime); |
| } |
| finishedCount.set(0); |
| } |
| } |
| |
| // If the TaskTracker is just starting up: |
| // 1. Verify the buildVersion |
| // 2. Get the system directory & filesystem |
| if(justInited) { |
| LOG.debug("Checking build version with JobTracker"); |
| String jobTrackerBV = jobClient.getBuildVersion(); |
| if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) { |
| String msg = "Shutting down. Incompatible buildVersion." + |
| "\nJobTracker's: " + jobTrackerBV + |
| "\nTaskTracker's: "+ VersionInfo.getBuildVersion(); |
| LOG.error(msg); |
| try { |
| jobClient.reportTaskTrackerError(taskTrackerName, null, msg); |
| } catch(Exception e ) { |
| LOG.info("Problem reporting to jobtracker: " + e, e); |
| } |
| return State.DENIED; |
| } |
| |
| String dir = jobClient.getSystemDir(); |
| if (dir == null) { |
| throw new IOException("Failed to get system directory"); |
| } |
| systemDirectory = new Path(dir); |
| systemFS = systemDirectory.getFileSystem(fConf); |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("Starting " + this); |
| LOG.debug("System directory is " + systemDirectory); |
| } |
| } |
| |
| // Send the heartbeat and process the jobtracker's directives |
| HeartbeatResponse heartbeatResponse = transmitHeartBeat(now); |
| |
| // Note the time when the heartbeat returned, use this to decide when to send the |
| // next heartbeat |
| lastHeartbeat = System.currentTimeMillis(); |
| |
| TaskTrackerAction[] actions = heartbeatResponse.getActions(); |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("Got heartbeatResponse from JobTracker with responseId: " + |
| heartbeatResponse.getResponseId() + " and " + |
| ((actions != null) ? actions.length : 0) + " actions"); |
| } |
| if (reinitTaskTracker(actions)) { |
| return State.STALE; |
| } |
| |
| //At this point the job tracker is present and compatible, |
| //so the service is coming up. |
| //It is time to declare it as such |
| if (restartingService) { |
| //declare the service as live. |
| enterLiveState(); |
| restartingService = false; |
| } |
| |
| // resetting heartbeat interval from the response. |
| heartbeatInterval = heartbeatResponse.getHeartbeatInterval(); |
| justStarted = false; |
| justInited = false; |
| if (actions != null){ |
| for(TaskTrackerAction action: actions) { |
| if (action instanceof LaunchTaskAction) { |
| addToTaskQueue((LaunchTaskAction)action); |
| } else if (action instanceof CommitTaskAction) { |
| CommitTaskAction commitAction = (CommitTaskAction)action; |
| if (!commitResponses.contains(commitAction.getTaskID())) { |
| LOG.info("Received commit task action for " + |
| commitAction.getTaskID()); |
| commitResponses.add(commitAction.getTaskID()); |
| } |
| } else { |
| tasksToCleanup.put(action); |
| } |
| } |
| } |
| markUnresponsiveTasks(); |
| killOverflowingTasks(); |
| |
| //we've cleaned up, resume normal operation |
| if (!acceptNewTasks && isIdle()) { |
| acceptNewTasks=true; |
| } |
| //The check below may not be required every iteration but we are |
| //erring on the side of caution here. We have seen many cases where |
| //the call to jetty's getLocalPort() returns different values at |
| //different times. Being a real paranoid here. |
| checkJettyPort(server.getPort()); |
| } catch (InterruptedException ie) { |
| LOG.info("Interrupted. Closing down."); |
| return State.INTERRUPTED; |
| } catch (DiskErrorException de) { |
| String msg = "Exiting task tracker for disk error:\n" + |
| StringUtils.stringifyException(de); |
| LOG.error(msg); |
| synchronized (this) { |
| jobClient.reportTaskTrackerError(taskTrackerName, |
| "DiskErrorException", msg); |
| } |
| return State.STALE; |
| } catch (RemoteException re) { |
| String reClass = re.getClassName(); |
| if (DisallowedTaskTrackerException.class.getName().equals(reClass)) { |
| LOG.info("Tasktracker disallowed by JobTracker."); |
| return State.DENIED; |
| } |
| } catch (Exception except) { |
| String msg = "Caught exception: " + |
| StringUtils.stringifyException(except); |
| LOG.error(msg); |
| } |
| } |
| |
| return State.NORMAL; |
| } |
| |
| private long previousUpdate = 0; |
| |
| /** |
| * Build and transmit the heart beat to the JobTracker |
| * @param now current time |
| * @return false if the tracker was unknown |
| * @throws IOException |
| */ |
| private HeartbeatResponse transmitHeartBeat(long now) throws IOException { |
| // Send Counters in the status once every COUNTER_UPDATE_INTERVAL |
| boolean sendCounters; |
| if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) { |
| sendCounters = true; |
| previousUpdate = now; |
| } |
| else { |
| sendCounters = false; |
| } |
| |
| // |
| // Check if the last heartbeat got through... |
| // if so then build the heartbeat information for the JobTracker; |
| // else resend the previous status information. |
| // |
| if (status == null) { |
| synchronized (this) { |
| status = new TaskTrackerStatus(taskTrackerName, localHostname, |
| httpPort, |
| cloneAndResetRunningTaskStatuses( |
| sendCounters), |
| failures, |
| maxMapSlots, |
| maxReduceSlots); |
| } |
| } else { |
| LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() + |
| "' with reponseId '" + heartbeatResponseId); |
| } |
| |
| // |
| // Check if we should ask for a new Task |
| // |
| boolean askForNewTask; |
| long localMinSpaceStart; |
| synchronized (this) { |
| askForNewTask = |
| ((status.countOccupiedMapSlots() < maxMapSlots || |
| status.countOccupiedReduceSlots() < maxReduceSlots) && |
| acceptNewTasks); |
| localMinSpaceStart = minSpaceStart; |
| } |
| if (askForNewTask) { |
| checkLocalDirs(fConf.getLocalDirs()); |
| askForNewTask = enoughFreeSpace(localMinSpaceStart); |
| long freeDiskSpace = getFreeSpace(); |
| long totVmem = getTotalVirtualMemoryOnTT(); |
| long totPmem = getTotalPhysicalMemoryOnTT(); |
| |
| status.getResourceStatus().setAvailableSpace(freeDiskSpace); |
| status.getResourceStatus().setTotalVirtualMemory(totVmem); |
| status.getResourceStatus().setTotalPhysicalMemory(totPmem); |
| status.getResourceStatus().setMapSlotMemorySizeOnTT( |
| mapSlotMemorySizeOnTT); |
| status.getResourceStatus().setReduceSlotMemorySizeOnTT( |
| reduceSlotSizeMemoryOnTT); |
| } |
| //add node health information |
| |
| TaskTrackerHealthStatus healthStatus = status.getHealthStatus(); |
| synchronized (this) { |
| if (healthChecker != null) { |
| healthChecker.setHealthStatus(healthStatus); |
| } else { |
| healthStatus.setNodeHealthy(true); |
| healthStatus.setLastReported(0L); |
| healthStatus.setHealthReport(""); |
| } |
| } |
| // |
| // Xmit the heartbeat |
| // |
| HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, |
| justStarted, |
| justInited, |
| askForNewTask, |
| heartbeatResponseId); |
| |
| // |
| // The heartbeat got through successfully! |
| // |
| heartbeatResponseId = heartbeatResponse.getResponseId(); |
| |
| synchronized (this) { |
| for (TaskStatus taskStatus : status.getTaskReports()) { |
| if (taskStatus.getRunState() != TaskStatus.State.RUNNING && |
| taskStatus.getRunState() != TaskStatus.State.UNASSIGNED && |
| taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING && |
| !taskStatus.inTaskCleanupPhase()) { |
| if (taskStatus.getIsMap()) { |
| mapTotal--; |
| } else { |
| reduceTotal--; |
| } |
| try { |
| myInstrumentation.completeTask(taskStatus.getTaskID()); |
| } catch (MetricsException me) { |
| LOG.warn("Caught: " + StringUtils.stringifyException(me)); |
| } |
| runningTasks.remove(taskStatus.getTaskID()); |
| } |
| } |
| |
| // Clear transient status information which should only |
| // be sent once to the JobTracker |
| for (TaskInProgress tip: runningTasks.values()) { |
| tip.getStatus().clearStatus(); |
| } |
| } |
| |
| // Force a rebuild of 'status' on the next iteration |
| status = null; |
| |
| return heartbeatResponse; |
| } |
| |
| /** |
| * Return the total virtual memory available on this TaskTracker. |
| * @return total size of virtual memory. |
| */ |
| long getTotalVirtualMemoryOnTT() { |
| return totalVirtualMemoryOnTT; |
| } |
| |
| /** |
| * Return the total physical memory available on this TaskTracker. |
| * @return total size of physical memory. |
| */ |
| long getTotalPhysicalMemoryOnTT() { |
| return totalPhysicalMemoryOnTT; |
| } |
| |
/**
 * Return the total memory allotted for all tasks on this TaskTracker.
 * @return the cached total task-memory allotment
 */
long getTotalMemoryAllottedForTasksOnTT() {
  return totalMemoryAllottedForTasks;
}
| |
| /** |
| * Check if the jobtracker directed a 'reset' of the tasktracker. |
| * |
| * @param actions the directives of the jobtracker for the tasktracker. |
| * @return <code>true</code> if tasktracker is to be reset, |
| * <code>false</code> otherwise. |
| */ |
| private boolean reinitTaskTracker(TaskTrackerAction[] actions) { |
| if (actions != null) { |
| for (TaskTrackerAction action : actions) { |
| if (action.getActionId() == |
| TaskTrackerAction.ActionType.REINIT_TRACKER) { |
| LOG.info("Recieved RenitTrackerAction from JobTracker"); |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * Kill any tasks that have not reported progress in the last X seconds. |
| */ |
| private synchronized void markUnresponsiveTasks() throws IOException { |
| long now = System.currentTimeMillis(); |
| for (TaskInProgress tip: runningTasks.values()) { |
| if (tip.getRunState() == TaskStatus.State.RUNNING || |
| tip.getRunState() == TaskStatus.State.COMMIT_PENDING || |
| tip.isCleaningup()) { |
| // Check the per-job timeout interval for tasks; |
| // an interval of '0' implies it is never timed-out |
| long jobTaskTimeout = tip.getTaskTimeout(); |
| if (jobTaskTimeout == 0) { |
| continue; |
| } |
| |
| // Check if the task has not reported progress for a |
| // time-period greater than the configured time-out |
| long timeSinceLastReport = now - tip.getLastProgressReport(); |
| if (timeSinceLastReport > jobTaskTimeout && !tip.wasKilled) { |
| String msg = |
| "Task " + tip.getTask().getTaskID() + " failed to report status for " |
| + (timeSinceLastReport / 1000) + " seconds. Killing!"; |
| LOG.info(tip.getTask().getTaskID() + ": " + msg); |
| ReflectionUtils.logThreadInfo(LOG, "lost task", 30); |
| tip.reportDiagnosticInfo(msg); |
| myInstrumentation.timedoutTask(tip.getTask().getTaskID()); |
| purgeTask(tip, true); |
| } |
| } |
| } |
| } |
| |
| /** |
| * The task tracker is done with this job, so we need to clean up. |
| * @param action The action with the job |
| * @throws IOException |
| */ |
| private synchronized void purgeJob(KillJobAction action) throws IOException { |
| JobID jobId = action.getJobID(); |
| LOG.info("Received 'KillJobAction' for job: " + jobId); |
| RunningJob rjob = null; |
| synchronized (runningJobs) { |
| rjob = runningJobs.get(jobId); |
| } |
| |
| if (rjob == null) { |
| LOG.warn("Unknown job " + jobId + " being deleted."); |
| } else { |
| synchronized (rjob) { |
| // Add this tips of this job to queue of tasks to be purged |
| for (TaskInProgress tip : rjob.tasks) { |
| tip.jobHasFinished(false); |
| Task t = tip.getTask(); |
| if (t.isMapTask()) { |
| indexCache.removeMap(tip.getTask().getTaskID().toString()); |
| } |
| } |
| // Delete the job directory for this |
| // task if the job is done/failed |
| if (!rjob.keepJobFiles) { |
| removeJobFiles(rjob.jobConf.getUser(), rjob.getJobID().toString()); |
| } |
| // Remove this job |
| rjob.tasks.clear(); |
| } |
| } |
| |
| synchronized(runningJobs) { |
| runningJobs.remove(jobId); |
| } |
| } |
| |
| /** |
| * This job's files are no longer needed on this TT, remove them. |
| * |
| * @param rjob |
| * @throws IOException |
| */ |
| void removeJobFiles(String user, String jobId) |
| throws IOException { |
| directoryCleanupThread.addToQueue(localFs, getLocalFiles(fConf, |
| getLocalJobDir(user, jobId))); |
| } |
| |
| /** |
| * Remove the tip and update all relevant state. |
| * |
| * @param tip {@link TaskInProgress} to be removed. |
| * @param wasFailure did the task fail or was it killed? |
| */ |
| private void purgeTask(TaskInProgress tip, boolean wasFailure) |
| throws IOException { |
| if (tip != null) { |
| LOG.info("About to purge task: " + tip.getTask().getTaskID()); |
| |
| // Remove the task from running jobs, |
| // removing the job if it's the last task |
| removeTaskFromJob(tip.getTask().getJobID(), tip); |
| tip.jobHasFinished(wasFailure); |
| if (tip.getTask().isMapTask()) { |
| indexCache.removeMap(tip.getTask().getTaskID().toString()); |
| } |
| } |
| } |
| |
/** Check if we're dangerously low on disk space
 * If so, kill jobs to free up space and make sure
 * we don't accept any new tasks
 * Try killing the reduce jobs first, since I believe they
 * use up most space
 * Then pick the one with least progress
 *
 * @throws IOException if purging the chosen task fails
 */
private void killOverflowingTasks() throws IOException {
  long localMinSpaceKill;
  // Snapshot the threshold under the lock, then test it outside.
  synchronized(this){
    localMinSpaceKill = minSpaceKill;
  }
  if (!enoughFreeSpace(localMinSpaceKill)) {
    acceptNewTasks=false;
    //we give up! do not accept new tasks until
    //all the ones running have finished and they're all cleared up
    synchronized (this) {
      TaskInProgress killMe = findTaskToKill(null);

      if (killMe!=null) {
        String msg = "Tasktracker running out of space." +
          " Killing task.";
        LOG.info(killMe.getTask().getTaskID() + ": " + msg);
        killMe.reportDiagnosticInfo(msg);
        purgeTask(killMe, false);
      }
    }
  }
}
| |
| /** |
| * Pick a task to kill to free up memory/disk-space |
| * @param tasksToExclude tasks that are to be excluded while trying to find a |
| * task to kill. If null, all runningTasks will be searched. |
| * @return the task to kill or null, if one wasn't found |
| */ |
| synchronized TaskInProgress findTaskToKill(List<TaskAttemptID> tasksToExclude) { |
| TaskInProgress killMe = null; |
| for (Iterator it = runningTasks.values().iterator(); it.hasNext();) { |
| TaskInProgress tip = (TaskInProgress) it.next(); |
| |
| if (tasksToExclude != null |
| && tasksToExclude.contains(tip.getTask().getTaskID())) { |
| // exclude this task |
| continue; |
| } |
| |
| if ((tip.getRunState() == TaskStatus.State.RUNNING || |
| tip.getRunState() == TaskStatus.State.COMMIT_PENDING) && |
| !tip.wasKilled) { |
| |
| if (killMe == null) { |
| killMe = tip; |
| |
| } else if (!tip.getTask().isMapTask()) { |
| //reduce task, give priority |
| if (killMe.getTask().isMapTask() || |
| (tip.getTask().getProgress().get() < |
| killMe.getTask().getProgress().get())) { |
| |
| killMe = tip; |
| } |
| |
| } else if (killMe.getTask().isMapTask() && |
| tip.getTask().getProgress().get() < |
| killMe.getTask().getProgress().get()) { |
| //map task, only add if the progress is lower |
| |
| killMe = tip; |
| } |
| } |
| } |
| return killMe; |
| } |
| |
| /** |
| * Check if any of the local directories has enough |
| * free space (more than minSpace) |
| * |
| * If not, do not try to get a new task assigned |
| * @return |
| * @throws IOException |
| */ |
| private boolean enoughFreeSpace(long minSpace) throws IOException { |
| if (minSpace == 0) { |
| return true; |
| } |
| return minSpace < getFreeSpace(); |
| } |
| |
| private long getFreeSpace() throws IOException { |
| long biggestSeenSoFar = 0; |
| String[] localDirs = fConf.getLocalDirs(); |
| for (int i = 0; i < localDirs.length; i++) { |
| DF df = null; |
| if (localDirsDf.containsKey(localDirs[i])) { |
| df = localDirsDf.get(localDirs[i]); |
| } else { |
| df = new DF(new File(localDirs[i]), fConf); |
| localDirsDf.put(localDirs[i], df); |
| } |
| |
| long availOnThisVol = df.getAvailable(); |
| if (availOnThisVol > biggestSeenSoFar) { |
| biggestSeenSoFar = availOnThisVol; |
| } |
| } |
| |
| //Should ultimately hold back the space we expect running tasks to use but |
| //that estimate isn't currently being passed down to the TaskTrackers |
| return biggestSeenSoFar; |
| } |
| |
| /** |
| * Try to get the size of output for this task. |
| * Returns -1 if it can't be found. |
| * @return |
| */ |
| long tryToGetOutputSize(TaskAttemptID taskId, JobConf conf) { |
| |
| try{ |
| TaskInProgress tip; |
| synchronized(this) { |
| tip = tasks.get(taskId); |
| } |
| if(tip == null) |
| return -1; |
| |
| if (!tip.getTask().isMapTask() || |
| tip.getRunState() != TaskStatus.State.SUCCEEDED) { |
| return -1; |
| } |
| |
| MapOutputFile mapOutputFile = new MapOutputFile(); |
| mapOutputFile.setConf(conf); |
| |
| Path tmp_output = mapOutputFile.getOutputFile(); |
| if(tmp_output == null) |
| return 0; |
| FileSystem localFS = FileSystem.getLocal(conf); |
| FileStatus stat = localFS.getFileStatus(tmp_output); |
| if(stat == null) |
| return 0; |
| else |
| return stat.getLen(); |
| } catch(IOException e) { |
| LOG.info(e); |
| return -1; |
| } |
| } |
| |
// Launcher threads that pull queued LaunchTaskActions and start them when
// enough slots are free (one for maps, one for reduces).
private TaskLauncher mapLauncher;
private TaskLauncher reduceLauncher;
/** @return the manager used for running/reusing task JVMs. */
public JvmManager getJvmManagerInstance() {
  return jvmManager;
}
| |
| private void addToTaskQueue(LaunchTaskAction action) { |
| if (action.getTask().isMapTask()) { |
| mapLauncher.addToTaskQueue(action); |
| } else { |
| reduceLauncher.addToTaskQueue(action); |
| } |
| } |
| |
/**
 * Daemon thread that launches queued tasks as slots become available.
 * Uses two monitors: 'tasksToLaunch' guards the pending-task queue and
 * 'numFreeSlots' guards the slot counter; waiters on each are notified
 * when the respective state changes.
 */
private class TaskLauncher extends Thread {
  // Count of currently free slots; also the monitor for slot waits.
  private IntWritable numFreeSlots;
  // Total slots this launcher may hand out (asserted upper bound).
  private final int maxSlots;
  // FIFO of tasks waiting to be launched; also its own monitor.
  private List<TaskInProgress> tasksToLaunch;

  public TaskLauncher(TaskType taskType, int numSlots) {
    this.maxSlots = numSlots;
    this.numFreeSlots = new IntWritable(numSlots);
    this.tasksToLaunch = new LinkedList<TaskInProgress>();
    setDaemon(true);
    setName("TaskLauncher for " + taskType + " tasks");
  }

  /** Register the action's task and queue it for launching. */
  public void addToTaskQueue(LaunchTaskAction action) {
    synchronized (tasksToLaunch) {
      TaskInProgress tip = registerTask(action, this);
      tasksToLaunch.add(tip);
      tasksToLaunch.notifyAll();
    }
  }

  // NOTE(review): clear() is called without holding the tasksToLaunch
  // monitor -- confirm callers only invoke this during shutdown.
  public void cleanTaskQueue() {
    tasksToLaunch.clear();
  }

  /** Return slots to the pool and wake any launcher waiting on them. */
  public void addFreeSlots(int numSlots) {
    synchronized (numFreeSlots) {
      numFreeSlots.set(numFreeSlots.get() + numSlots);
      assert (numFreeSlots.get() <= maxSlots);
      LOG.info("addFreeSlot : current free slots : " + numFreeSlots.get());
      numFreeSlots.notifyAll();
    }
  }

  public void run() {
    while (!Thread.interrupted()) {
      try {
        TaskInProgress tip;
        Task task;
        synchronized (tasksToLaunch) {
          while (tasksToLaunch.isEmpty()) {
            tasksToLaunch.wait();
          }
          //get the TIP
          tip = tasksToLaunch.remove(0);
          task = tip.getTask();
          LOG.info("Trying to launch : " + tip.getTask().getTaskID() +
                   " which needs " + task.getNumSlotsRequired() + " slots");
        }
        //wait for free slots to run
        synchronized (numFreeSlots) {
          while (numFreeSlots.get() < task.getNumSlotsRequired()) {
            LOG.info("TaskLauncher : Waiting for " + task.getNumSlotsRequired() +
                     " to launch " + task.getTaskID() + ", currently we have " +
                     numFreeSlots.get() + " free slots");
            numFreeSlots.wait();
          }
          LOG.info("In TaskLauncher, current free slots : " + numFreeSlots.get()+
                   " and trying to launch "+tip.getTask().getTaskID() +
                   " which needs " + task.getNumSlotsRequired() + " slots");
          numFreeSlots.set(numFreeSlots.get() - task.getNumSlotsRequired());
          assert (numFreeSlots.get() >= 0);
        }
        synchronized (tip) {
          //to make sure that there is no kill task action for this
          if (tip.getRunState() != TaskStatus.State.UNASSIGNED &&
              tip.getRunState() != TaskStatus.State.FAILED_UNCLEAN &&
              tip.getRunState() != TaskStatus.State.KILLED_UNCLEAN) {
            //got killed externally while still in the launcher queue
            // return the slots we just took, then go back to the queue
            addFreeSlots(task.getNumSlotsRequired());
            continue;
          }
          tip.slotTaken = true;
        }
        //got a free slot. launch the task
        startNewTask(tip);
      } catch (InterruptedException e) {
        return; // ALL DONE
      } catch (Throwable th) {
        // Keep the launcher alive on unexpected errors; just log them.
        LOG.error("TaskLauncher error " +
            StringUtils.stringifyException(th));
      }
    }
  }
}
| private TaskInProgress registerTask(LaunchTaskAction action, |
| TaskLauncher launcher) { |
| Task t = action.getTask(); |
| LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID() + |
| " task's state:" + t.getState()); |
| TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher); |
| synchronized (this) { |
| tasks.put(t.getTaskID(), tip); |
| runningTasks.put(t.getTaskID(), tip); |
| boolean isMap = t.isMapTask(); |
| if (isMap) { |
| mapTotal++; |
| } else { |
| reduceTotal++; |
| } |
| } |
| return tip; |
| } |
| /** |
| * Start a new task. |
| * All exceptions are handled locally, so that we don't mess up the |
| * task tracker. |
| */ |
| private void startNewTask(TaskInProgress tip) { |
| try { |
| localizeJob(tip); |
| } catch (Throwable e) { |
| String msg = ("Error initializing " + tip.getTask().getTaskID() + |
| ":\n" + StringUtils.stringifyException(e)); |
| LOG.warn(msg, e); |
| tip.reportDiagnosticInfo(msg); |
| try { |
| tip.kill(true); |
| tip.cleanup(true); |
| } catch (IOException ie2) { |
| LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n" + |
| StringUtils.stringifyException(ie2)); |
| } |
| |
| // Careful! |
| // This might not be an 'Exception' - don't handle 'Error' here! |
| if (e instanceof Error) { |
| throw ((Error) e); |
| } |
| } |
| } |
| |
| void addToMemoryManager(TaskAttemptID attemptId, boolean isMap, |
| JobConf conf) { |
| if (isTaskMemoryManagerEnabled()) { |
| taskMemoryManager.addTask(attemptId, isMap ? conf |
| .getMemoryForMapTask() * 1024 * 1024L : conf |
| .getMemoryForReduceTask() * 1024 * 1024L); |
| } |
| } |
| |
| void removeFromMemoryManager(TaskAttemptID attemptId) { |
| // Remove the entry from taskMemoryManagerThread's data structures. |
| if (isTaskMemoryManagerEnabled()) { |
| taskMemoryManager.removeTask(attemptId); |
| } |
| } |
| |
| /** |
| * Notify the tasktracker to send an out-of-band heartbeat. |
| */ |
| private void notifyTTAboutTaskCompletion() { |
| if (oobHeartbeatOnTaskCompletion) { |
| synchronized (finishedCount) { |
| int value = finishedCount.get(); |
| finishedCount.set(value+1); |
| finishedCount.notify(); |
| } |
| } |
| } |
| |
| /** |
| * The server retry loop. |
| * This while-loop attempts to connect to the JobTracker. It only |
| * loops when the old TaskTracker has gone bad (its state is |
| * stale somehow) and we need to reinitialize everything. |
| */ |
| public void run() { |
| try { |
| startCleanupThreads(); |
| boolean denied = false; |
| while (running && !shuttingDown && !denied) { |
| boolean staleState = false; |
| try { |
| // This while-loop attempts reconnects if we get network errors |
| while (running && !staleState && !shuttingDown && !denied) { |
| try { |
| State osState = offerService(); |
| if (osState == State.STALE) { |
| staleState = true; |
| } else if (osState == State.DENIED) { |
| denied = true; |
| } |
| } catch (Exception ex) { |
| if (!shuttingDown) { |
| LOG.info("Lost connection to JobTracker [" + |
| jobTrackAddr + "]. Retrying...", ex); |
| //enter the started state; we are no longer live |
| enterState(ServiceState.UNDEFINED, ServiceState.STARTED); |
| try { |
| Thread.sleep(5000); |
| } catch (InterruptedException ie) { |
| } |
| } |
| } |
| } |
| } finally { |
| closeTaskTracker(); |
| } |
| if (shuttingDown) { return; } |
| LOG.warn("Reinitializing local state"); |
| initialize(); |
| } |
| if (denied) { |
| shutdown(); |
| } |
| } catch (IOException iex) { |
| LOG.error("Got fatal exception while reinitializing TaskTracker: " + |
| StringUtils.stringifyException(iex)); |
| return; |
| } |
| } |
| |
| /////////////////////////////////////////////////////// |
| // TaskInProgress maintains all the info for a Task that |
| // lives at this TaskTracker. It maintains the Task object, |
| // its TaskStatus, and the TaskRunner. |
| /////////////////////////////////////////////////////// |
  class TaskInProgress {
    // The task being tracked (map, reduce, or a cleanup attempt).
    Task task;
    // Wall-clock time of the last progress report; used for timeout checks.
    long lastProgressReport;
    // Accumulated diagnostic text; drained on each getStatus() call.
    StringBuffer diagnosticInfo = new StringBuffer();
    private TaskRunner runner;
    // True once the task has invoked TaskUmbilicalProtocol.done().
    volatile boolean done = false;
    // True if the task was killed (as opposed to failing on its own).
    volatile boolean wasKilled = false;
    private JobConf defaultJobConf;
    // Job-localized configuration; null until localization has happened.
    private JobConf localJobConf;
    private boolean keepFailedTaskFiles;
    private boolean alwaysKeepTaskFiles;
    private TaskStatus taskStatus;
    private long taskTimeout;
    // Debug script configured for this task type, or null if none.
    private String debugCommand;
    // True while this task holds slots taken from its launcher.
    private volatile boolean slotTaken = false;
    private TaskLauncher launcher;

    /**
     * Construct a TaskInProgress with no associated launcher (no slot
     * accounting will be performed for it).
     */
    public TaskInProgress(Task task, JobConf conf) {
      this(task, conf, null);
    }

    /**
     * Construct a TaskInProgress wrapping the given task, with an initial
     * "initializing" status and the default 10-minute timeout (replaced by
     * the job's configured value once setJobConf is called).
     */
    public TaskInProgress(Task task, JobConf conf, TaskLauncher launcher) {
      this.task = task;
      this.launcher = launcher;
      this.lastProgressReport = System.currentTimeMillis();
      this.defaultJobConf = conf;
      localJobConf = null;
      taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(),
                                               0.0f,
                                               task.getNumSlotsRequired(),
                                               task.getState(),
                                               diagnosticInfo.toString(),
                                               "initializing",
                                               getName(),
                                               task.isTaskCleanupTask() ?
                                                 TaskStatus.Phase.CLEANUP :
                                               task.isMapTask()? TaskStatus.Phase.MAP:
                                               TaskStatus.Phase.SHUFFLE,
                                               task.getCounters());
      // Default timeout (10 minutes); overridden from the job conf later.
      taskTimeout = (10 * 60 * 1000);
    }

    /**
     * Localize the task: create its attempt directories and working
     * directory on the local disks, propagate tracker settings into the
     * job-local conf, and decide whether task files should be kept.
     * Must be called after setJobConf (localJobConf must be non-null).
     */
    void localizeTask(Task task) throws IOException{

      FileSystem localFs = FileSystem.getLocal(fConf);

      // create taskDirs on all the disks.
      getLocalizer().initializeAttemptDirs(task.getUser(),
          task.getJobID().toString(), task.getTaskID().toString(),
          task.isTaskCleanupTask());

      // create the working-directory of the task
      Path cwd =
          lDirAlloc.getLocalPathForWrite(getTaskWorkDir(task.getUser(), task
              .getJobID().toString(), task.getTaskID().toString(), task
              .isTaskCleanupTask()), defaultJobConf);
      if (!localFs.mkdirs(cwd)) {
        throw new IOException("Mkdirs failed to create "
                    + cwd.toString());
      }

      // Make the tracker's local-dir list visible to the task.
      localJobConf.set(LOCAL_DIR,
                       fConf.get(LOCAL_DIR));

      if (fConf.get(TT_HOST_NAME) != null) {
        localJobConf.set(TT_HOST_NAME, fConf.get(TT_HOST_NAME));
      }

      keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();

      // Do the task-type specific localization
      task.localizeConfiguration(localJobConf);

      // Forward any static host resolutions so the child resolves hosts
      // the same way the tracker does.
      List<String[]> staticResolutions = NetUtils.getAllStaticResolutions();
      if (staticResolutions != null && staticResolutions.size() > 0) {
        StringBuffer str = new StringBuffer();

        for (int i = 0; i < staticResolutions.size(); i++) {
          String[] hostToResolved = staticResolutions.get(i);
          str.append(hostToResolved[0]+"="+hostToResolved[1]);
          if (i != staticResolutions.size() - 1) {
            str.append(',');
          }
        }
        localJobConf.set(TT_STATIC_RESOLUTIONS, str.toString());
      }
      if (task.isMapTask()) {
        debugCommand = localJobConf.getMapDebugScript();
      } else {
        debugCommand = localJobConf.getReduceDebugScript();
      }
      String keepPattern = localJobConf.getKeepTaskFilesPattern();
      if (keepPattern != null) {
        alwaysKeepTaskFiles =
          Pattern.matches(keepPattern, task.getTaskID().toString());
      } else {
        alwaysKeepTaskFiles = false;
      }
      // JVM reuse would interfere with debugging, profiling, or kept task
      // files, so force one task per JVM in those cases.
      if (debugCommand != null || localJobConf.getProfileEnabled() ||
          alwaysKeepTaskFiles || keepFailedTaskFiles) {
        //disable jvm reuse
        localJobConf.setNumTasksToExecutePerJvm(1);
      }
      task.setConf(localJobConf);
    }

    /**
     * @return the task being tracked.
     */
    public Task getTask() {
      return task;
    }

    TaskRunner getTaskRunner() {
      return runner;
    }

    void setTaskRunner(TaskRunner rnr) {
      this.runner = rnr;
    }

    /**
     * Install the job-localized configuration and pick up the settings
     * derived from it (keep-failed-files flag and task timeout).
     */
    public synchronized void setJobConf(JobConf lconf){
      this.localJobConf = lconf;
      keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
      taskTimeout = localJobConf.getLong(JobContext.TASK_TIMEOUT,
                                         10 * 60 * 1000);
    }

    public synchronized JobConf getJobConf() {
      return localJobConf;
    }

    /**
     * Snapshot the current status, folding in (and then clearing) any
     * accumulated diagnostic info — each diagnostic is reported once.
     */
    public synchronized TaskStatus getStatus() {
      taskStatus.setDiagnosticInfo(diagnosticInfo.toString());
      if (diagnosticInfo.length() > 0) {
        diagnosticInfo = new StringBuffer();
      }

      return taskStatus;
    }

    /**
     * Kick off the task execution
     */
    public synchronized void launchTask() throws IOException {
      // Only launch from a not-yet-started state; *_UNCLEAN states mean a
      // cleanup attempt is being launched for this task.
      if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
          this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
          this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
        localizeTask(task);
        if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
          this.taskStatus.setRunState(TaskStatus.State.RUNNING);
        }
        setTaskRunner(task.createRunner(TaskTracker.this, this));
        this.runner.start();
        this.taskStatus.setStartTime(System.currentTimeMillis());
      } else {
        LOG.info("Not launching task: " + task.getTaskID() +
            " since it's state is " + this.taskStatus.getRunState());
      }
    }

    // True if this attempt is a cleanup attempt (FAILED_UNCLEAN/KILLED_UNCLEAN).
    boolean isCleaningup() {
      return this.taskStatus.inTaskCleanupPhase();
    }

    /**
     * The task is reporting its progress
     */
    public synchronized void reportProgress(TaskStatus taskStatus)
    {
      LOG.info(task.getTaskID() + " " + taskStatus.getProgress() +
          "% " + taskStatus.getStateString());
      // task will report its state as
      // COMMIT_PENDING when it is waiting for commit response and
      // when it is committing.
      // cleanup attempt will report its state as FAILED_UNCLEAN/KILLED_UNCLEAN
      if (this.done ||
          (this.taskStatus.getRunState() != TaskStatus.State.RUNNING &&
          this.taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
          !isCleaningup()) ||
          ((this.taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING ||
           this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
           this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) &&
           taskStatus.getRunState() == TaskStatus.State.RUNNING)) {
        //make sure we ignore progress messages after a task has
        //invoked TaskUmbilicalProtocol.done() or if the task has been
        //KILLED/FAILED/FAILED_UNCLEAN/KILLED_UNCLEAN
        //Also ignore progress update if the state change is from
        //COMMIT_PENDING/FAILED_UNCLEAN/KILLED_UNCLEAN to RUNNING
        LOG.info(task.getTaskID() + " Ignoring status-update since " +
                 ((this.done) ? "task is 'done'" :
                 ("runState: " + this.taskStatus.getRunState()))
                 );
        return;
      }

      this.taskStatus.statusUpdate(taskStatus);
      this.lastProgressReport = System.currentTimeMillis();
    }

    /**
     * @return time (in millis) of the last progress report.
     */
    public long getLastProgressReport() {
      return lastProgressReport;
    }

    /**
     * @return the current run state of the task.
     */
    public TaskStatus.State getRunState() {
      return taskStatus.getRunState();
    }

    /**
     * The task's configured timeout.
     *
     * @return the task's configured timeout.
     */
    public long getTaskTimeout() {
      return taskTimeout;
    }

    /**
     * The task has reported some diagnostic info about its status
     */
    public synchronized void reportDiagnosticInfo(String info) {
      this.diagnosticInfo.append(info);
    }

    // Record the range of records the task will process next.
    public synchronized void reportNextRecordRange(SortedRanges.Range range) {
      this.taskStatus.setNextRecordRange(range);
    }

    /**
     * The task is reporting that it's done running
     */
    public synchronized void reportDone() {
      // A finished cleanup attempt resolves the *_UNCLEAN state to its
      // terminal FAILED/KILLED form; a normal attempt has succeeded.
      if (isCleaningup()) {
        if (this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
          this.taskStatus.setRunState(TaskStatus.State.FAILED);
        } else if (this.taskStatus.getRunState() ==
                   TaskStatus.State.KILLED_UNCLEAN) {
          this.taskStatus.setRunState(TaskStatus.State.KILLED);
        }
      } else {
        this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
      }
      this.taskStatus.setProgress(1.0f);
      this.taskStatus.setFinishTime(System.currentTimeMillis());
      this.done = true;
      jvmManager.taskFinished(runner);
      runner.signalDone();
      LOG.info("Task " + task.getTaskID() + " is done.");
      LOG.info("reported output size for " + task.getTaskID() + " was " + taskStatus.getOutputSize());

    }

    public boolean wasKilled() {
      return wasKilled;
    }

    /**
     * A task is reporting in as 'done'.
     *
     * We need to notify the tasktracker to send an out-of-band heartbeat.
     * If it isn't <code>commitPending</code>, we need to finalize the task
     * and release the slot it's occupied.
     *
     * @param commitPending is the task-commit pending?
     */
    void reportTaskFinished(boolean commitPending) {
      if (!commitPending) {
        taskFinished();
        releaseSlot();
      }
      notifyTTAboutTaskCompletion();
    }

    /* State changes:
     * RUNNING/COMMIT_PENDING -> FAILED_UNCLEAN/FAILED/KILLED_UNCLEAN/KILLED
     * FAILED_UNCLEAN -> FAILED
     * KILLED_UNCLEAN -> KILLED
     */
    private void setTaskFailState(boolean wasFailure) {
      // go FAILED_UNCLEAN -> FAILED and KILLED_UNCLEAN -> KILLED always
      if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
        taskStatus.setRunState(TaskStatus.State.FAILED);
      } else if (taskStatus.getRunState() ==
                 TaskStatus.State.KILLED_UNCLEAN) {
        taskStatus.setRunState(TaskStatus.State.KILLED);
      } else if (task.isMapOrReduce() &&
                 taskStatus.getPhase() != TaskStatus.Phase.CLEANUP) {
        // Real map/reduce work needs a cleanup attempt first, hence the
        // intermediate *_UNCLEAN state.
        if (wasFailure) {
          taskStatus.setRunState(TaskStatus.State.FAILED_UNCLEAN);
        } else {
          taskStatus.setRunState(TaskStatus.State.KILLED_UNCLEAN);
        }
      } else {
        if (wasFailure) {
          taskStatus.setRunState(TaskStatus.State.FAILED);
        } else {
          taskStatus.setRunState(TaskStatus.State.KILLED);
        }
      }
    }

    /**
     * The task has actually finished running.
     */
    public void taskFinished() {
      long start = System.currentTimeMillis();

      //
      // Wait until task reports as done.  If it hasn't reported in,
      // wait for a second and try again.
      //
      while (!done && (System.currentTimeMillis() - start < WAIT_FOR_DONE)) {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException ie) {
        }
      }

      //
      // Change state to success or failure, depending on whether
      // task was 'done' before terminating
      //
      boolean needCleanup = false;
      synchronized (this) {
        // Remove the task from MemoryManager, if the task SUCCEEDED or FAILED.
        // KILLED tasks are removed in method kill(), because Kill
        // would result in launching a cleanup attempt before
        // TaskRunner returns; if remove happens here, it would remove
        // wrong task from memory manager.
        if (done || !wasKilled) {
          removeFromMemoryManager(task.getTaskID());
        }
        if (!done) {
          if (!wasKilled) {
            failures += 1;
            setTaskFailState(true);
            // call the script here for the failed tasks.
            if (debugCommand != null) {
              runDebugScript();
            }
          }
          taskStatus.setProgress(0.0f);
        }
        this.taskStatus.setFinishTime(System.currentTimeMillis());
        needCleanup = (taskStatus.getRunState() == TaskStatus.State.FAILED ||
                taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
                taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN ||
                taskStatus.getRunState() == TaskStatus.State.KILLED);
      }

      //
      // If the task has failed, or if the task was killAndCleanup()'ed,
      // we should clean up right away.  We only wait to cleanup
      // if the task succeeded, and its results might be useful
      // later on to downstream job processing.
      //
      if (needCleanup) {
        removeTaskFromJob(task.getJobID(), this);
      }
      try {
        // NOTE(review): cleanup errors are deliberately ignored here;
        // the task is already in a terminal state.
        cleanup(needCleanup);
      } catch (IOException ie) {
      }

    }

    // Run the user-supplied debug script against the failed task's
    // stdout/stderr/syslog, capturing its output into DEBUGOUT and
    // appending it to the task diagnostics.
    private void runDebugScript() {
      String taskStdout ="";
      String taskStderr ="";
      String taskSyslog ="";
      String jobConf = task.getJobFile();
      try {
        // get task's stdout file
        taskStdout = FileUtil.makeShellPath(
            TaskLog.getRealTaskLogFileLocation
                  (task.getTaskID(), TaskLog.LogName.STDOUT));
        // get task's stderr file
        taskStderr = FileUtil.makeShellPath(
            TaskLog.getRealTaskLogFileLocation
                  (task.getTaskID(), TaskLog.LogName.STDERR));
        // get task's syslog file
        taskSyslog = FileUtil.makeShellPath(
            TaskLog.getRealTaskLogFileLocation
                  (task.getTaskID(), TaskLog.LogName.SYSLOG));
      } catch(IOException e){
        LOG.warn("Exception finding task's stdout/err/syslog files");
      }
      File workDir = null;
      try {
        workDir =
            new File(lDirAlloc.getLocalPathToRead(
                TaskTracker.getLocalTaskDir(task.getUser(), task
                    .getJobID().toString(), task.getTaskID()
                    .toString(), task.isTaskCleanupTask())
                    + Path.SEPARATOR + MRConstants.WORKDIR,
                localJobConf).toString());
      } catch (IOException e) {
        LOG.warn("Working Directory of the task " + task.getTaskID() +
                 " doesnt exist. Caught exception " +
                  StringUtils.stringifyException(e));
      }
      // Build the command
      File stdout = TaskLog.getRealTaskLogFileLocation(
                                 task.getTaskID(), TaskLog.LogName.DEBUGOUT);
      // add pipes program as argument if it exists.
      String program ="";
      String executable = Submitter.getExecutable(localJobConf);
      if ( executable != null) {
        try {
          program = new URI(executable).getFragment();
        } catch (URISyntaxException ur) {
          LOG.warn("Problem in the URI fragment for pipes executable");
        }
      }
      // Script invocation convention: script args are the task's stdout,
      // stderr, syslog, job conf, and (for pipes) the program name.
      String [] debug = debugCommand.split(" ");
      List<String> vargs = new ArrayList<String>();
      for (String component : debug) {
        vargs.add(component);
      }
      vargs.add(taskStdout);
      vargs.add(taskStderr);
      vargs.add(taskSyslog);
      vargs.add(jobConf);
      vargs.add(program);
      DebugScriptContext context =
        new TaskController.DebugScriptContext();
      context.args = vargs;
      context.stdout = stdout;
      context.workDir = workDir;
      context.task = task;
      try {
        getTaskController().runDebugScript(context);
        // add all lines of debug out to diagnostics
        try {
          int num = localJobConf.getInt(JobContext.TASK_DEBUGOUT_LINES,
                                        -1);
          addDiagnostics(FileUtil.makeShellPath(stdout),num,
                         "DEBUG OUT");
        } catch(IOException ioe) {
          LOG.warn("Exception in add diagnostics!");
        }
      } catch (IOException ie) {
        LOG.warn("runDebugScript failed with: " + StringUtils.
                                      stringifyException(ie));
      }
    }

    /**
     * Add last 'num' lines of the given file to the diagnostics.
     * if num =-1, all the lines of file are added to the diagnostics.
     * @param file The file from which to collect diagnostics.
     * @param num The number of lines to be sent to diagnostics.
     * @param tag The tag is printed before the diagnostics are printed.
     */
    public void addDiagnostics(String file, int num, String tag) {
      RandomAccessFile rafile = null;
      try {
        rafile = new RandomAccessFile(file,"r");
        int no_lines =0;
        String line = null;
        StringBuffer tail = new StringBuffer();
        tail.append("\n-------------------- "+tag+"---------------------\n");
        String[] lines = null;
        if (num >0) {
          lines = new String[num];
        }
        while ((line = rafile.readLine()) != null) {
          no_lines++;
          if (num >0) {
            if (no_lines <= num) {
              lines[no_lines-1] = line;
            }
            else { // shift them up
              for (int i=0; i<num-1; ++i) {
                lines[i] = lines[i+1];
              }
              lines[num-1] = line;
            }
          }
          else if (num == -1) {
            tail.append(line);
            tail.append("\n");
          }
        }
        // n is the number of buffered lines actually captured.
        int n = no_lines > num ?num:no_lines;
        if (num >0) {
          for (int i=0;i<n;i++) {
            tail.append(lines[i]);
            tail.append("\n");
          }
        }
        if(n!=0)
          reportDiagnosticInfo(tail.toString());
      } catch (FileNotFoundException fnfe){
        LOG.warn("File "+file+ " not found");
      } catch (IOException ioe){
        LOG.warn("Error reading file "+file);
      } finally {
         try {
           if (rafile != null) {
             rafile.close();
           }
         } catch (IOException ioe) {
           LOG.warn("Error closing file "+file);
         }
      }
    }

    /**
     * We no longer need anything from this task, as the job has
     * finished.  If the task is still running, kill it and clean up.
     *
     * @param wasFailure did the task fail, as opposed to was it killed by
     *                   the framework
     */
    public void jobHasFinished(boolean wasFailure) throws IOException {
      // Kill the task if it is still running
      synchronized(this){
        if (getRunState() == TaskStatus.State.RUNNING ||
            getRunState() == TaskStatus.State.UNASSIGNED ||
            getRunState() == TaskStatus.State.COMMIT_PENDING ||
            isCleaningup()) {
          kill(wasFailure);
        }
      }

      // Cleanup on the finished task
      cleanup(true);
    }

    /**
     * Something went wrong and the task must be killed.
     *
     * @param wasFailure was it a failure (versus a kill request)?
     */
    public synchronized void kill(boolean wasFailure) throws IOException {
      if (taskStatus.getRunState() == TaskStatus.State.RUNNING ||
          taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING ||
          isCleaningup()) {
        wasKilled = true;
        if (wasFailure) {
          failures += 1;
        }
        // runner could be null if task-cleanup attempt is not localized yet
        if (runner != null) {
          runner.kill();
        }
        setTaskFailState(wasFailure);
      } else if (taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
        // Never ran: go straight to a terminal state, no cleanup attempt.
        if (wasFailure) {
          failures += 1;
          taskStatus.setRunState(TaskStatus.State.FAILED);
        } else {
          taskStatus.setRunState(TaskStatus.State.KILLED);
        }
      }
      taskStatus.setFinishTime(System.currentTimeMillis());
      removeFromMemoryManager(task.getTaskID());
      releaseSlot();
      notifyTTAboutTaskCompletion();
    }

    // Return this task's slots to the launcher's pool, exactly once.
    private synchronized void releaseSlot() {
      if (slotTaken) {
        if (launcher != null) {
          launcher.addFreeSlots(task.getNumSlotsRequired());
        }
        slotTaken = false;
      }
    }

    /**
     * The map output has been lost.
     */
    private synchronized void mapOutputLost(String failure
                                           ) throws IOException {
      if (taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING ||
          taskStatus.getRunState() == TaskStatus.State.SUCCEEDED) {
        // change status to failure
        LOG.info("Reporting output lost:"+task.getTaskID());
        taskStatus.setRunState(TaskStatus.State.FAILED);
        taskStatus.setProgress(0.0f);
        reportDiagnosticInfo("Map output lost, rescheduling: " +
                             failure);
        // Put the task back among running tasks so its failure is
        // reported to the JobTracker on the next heartbeat.
        runningTasks.put(task.getTaskID(), this);
        mapTotal++;
      } else {
        LOG.warn("Output already reported lost:"+task.getTaskID());
      }
    }

    /**
     * We no longer need anything from this task.  Either the
     * controlling job is all done and the files have been copied
     * away, or the task failed and we don't need the remains.
     * Any calls to cleanup should not lock the tip first.
     * cleanup does the right thing- updates tasks in Tasktracker
     * by locking tasktracker first and then locks the tip.
     *
     * if needCleanup is true, the whole task directory is cleaned up.
     * otherwise the current working directory of the task
     * i.e. &lt;taskid&gt;/work is cleaned up.
     */
    void cleanup(boolean needCleanup) throws IOException {
      TaskAttemptID taskId = task.getTaskID();
      LOG.debug("Cleaning up " + taskId);


      // Lock order: TaskTracker first, then this tip (see javadoc above).
      synchronized (TaskTracker.this) {
        if (needCleanup) {
          // see if tasks data structure is holding this tip.
          // tasks could hold the tip for cleanup attempt, if cleanup attempt
          // got launched before this method.
          if (tasks.get(taskId) == this) {
            tasks.remove(taskId);
          }
        }
        synchronized (this){
          if (alwaysKeepTaskFiles ||
              (taskStatus.getRunState() == TaskStatus.State.FAILED &&
               keepFailedTaskFiles)) {
            return;
          }
        }
      }
      synchronized (this) {
        // localJobConf could be null if localization has not happened
        // then no cleanup will be required.
        if (localJobConf == null) {
          return;
        }
        try {
          removeTaskFiles(needCleanup, taskId);
        } catch (Throwable ie) {
          LOG.info("Error cleaning up task runner: "
              + StringUtils.stringifyException(ie));
        }
      }
    }

    /**
     * Some or all of the files from this task are no longer required. Remove
     * them via CleanupQueue.
     *
     * @param needCleanup if true, remove the whole task directory;
     *                    otherwise remove only the task's work directory
     * @param taskId the attempt whose files are to be removed
     * @throws IOException
     */
    void removeTaskFiles(boolean needCleanup, TaskAttemptID taskId)
        throws IOException {
      if (needCleanup) {
        if (runner != null) {
          // cleans up the output directory of the task (where map outputs
          // and reduce inputs get stored)
          runner.close();
        }

        String localTaskDir =
            getLocalTaskDir(task.getUser(), task.getJobID().toString(), taskId
                .toString(), task.isTaskCleanupTask());
        if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
          // No jvm reuse, remove everything
          directoryCleanupThread.addToQueue(localFs, getLocalFiles(
              defaultJobConf, localTaskDir));
        } else {
          // Jvm reuse. We don't delete the workdir since some other task
          // (running in the same JVM) might be using the dir. The JVM
          // running the tasks would clean the workdir per a task in the
          // task process itself.
          directoryCleanupThread.addToQueue(localFs, getLocalFiles(
              defaultJobConf, localTaskDir + Path.SEPARATOR
                  + TaskTracker.JOBFILE));
        }
      } else {
        if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
          String taskWorkDir =
              getTaskWorkDir(task.getUser(), task.getJobID().toString(),
                  taskId.toString(), task.isTaskCleanupTask());
          directoryCleanupThread.addToQueue(localFs, getLocalFiles(
              defaultJobConf, taskWorkDir));
        }
      }
    }

    // Two TIPs are equal iff they wrap the same task attempt.
    @Override
    public boolean equals(Object obj) {
      return (obj instanceof TaskInProgress) &&
        task.getTaskID().equals
        (((TaskInProgress) obj).getTask().getTaskID());
    }

    @Override
    public int hashCode() {
      return task.getTaskID().hashCode();
    }
  }
| |
| |
| // /////////////////////////////////////////////////////////////// |
| // TaskUmbilicalProtocol |
| ///////////////////////////////////////////////////////////////// |
| |
| /** |
| * Called upon startup by the child process, to fetch Task data. |
| */ |
| public synchronized JvmTask getTask(JvmContext context) |
| throws IOException { |
| JVMId jvmId = context.jvmId; |
| |
| // save pid of task JVM sent by child |
| jvmManager.setPidToJvm(jvmId, context.pid); |
| |
| LOG.debug("JVM with ID : " + jvmId + " asked for a task"); |
| if (!jvmManager.isJvmKnown(jvmId)) { |
| LOG.info("Killing unknown JVM " + jvmId); |
| return new JvmTask(null, true); |
| } |
| RunningJob rjob = runningJobs.get(jvmId.getJobId()); |
| if (rjob == null) { //kill the JVM since the job is dead |
| LOG.info("Killing JVM " + jvmId + " since job " + jvmId.getJobId() + |
| " is dead"); |
| jvmManager.killJvm(jvmId); |
| return new JvmTask(null, true); |
| } |
| TaskInProgress tip = jvmManager.getTaskForJvm(jvmId); |
| if (tip == null) { |
| return new JvmTask(null, false); |
| } |
| if (tasks.get(tip.getTask().getTaskID()) != null) { //is task still present |
| LOG.info("JVM with ID: " + jvmId + " given task: " + |
| tip.getTask().getTaskID()); |
| return new JvmTask(tip.getTask(), false); |
| } else { |
| LOG.info("Killing JVM with ID: " + jvmId + " since scheduled task: " + |
| tip.getTask().getTaskID() + " is " + tip.taskStatus.getRunState()); |
| return new JvmTask(null, true); |
| } |
| } |
| |
| /** |
| * Called periodically to report Task progress, from 0.0 to 1.0. |
| */ |
| public synchronized boolean statusUpdate(TaskAttemptID taskid, |
| TaskStatus taskStatus) |
| throws IOException { |
| TaskInProgress tip = tasks.get(taskid); |
| if (tip != null) { |
| tip.reportProgress(taskStatus); |
| return true; |
| } else { |
| LOG.warn("Progress from unknown child task: "+taskid); |
| return false; |
| } |
| } |
| |
| /** |
| * Called when the task dies before completion, and we want to report back |
| * diagnostic info |
| */ |
| public synchronized void reportDiagnosticInfo(TaskAttemptID taskid, String info) throws IOException { |
| TaskInProgress tip = tasks.get(taskid); |
| if (tip != null) { |
| tip.reportDiagnosticInfo(info); |
| } else { |
| LOG.warn("Error from unknown child task: "+taskid+". Ignored."); |
| } |
| } |
| |
| public synchronized void reportNextRecordRange(TaskAttemptID taskid, |
| SortedRanges.Range range) throws IOException { |
| TaskInProgress tip = tasks.get(taskid); |
| if (tip != null) { |
| tip.reportNextRecordRange(range); |
| } else { |
| LOG.warn("reportNextRecordRange from unknown child task: "+taskid+". " + |
| "Ignored."); |
| } |
| } |
| |
| /** Child checking to see if we're alive. Normally does nothing.*/ |
| public synchronized boolean ping(TaskAttemptID taskid) throws IOException { |
| return tasks.get(taskid) != null; |
| } |
| |
| /** |
| * Task is reporting that it is in commit_pending |
| * and it is waiting for the commit Response |
| */ |
| public synchronized void commitPending(TaskAttemptID taskid, |
| TaskStatus taskStatus) |
| throws IOException { |
| LOG.info("Task " + taskid + " is in commit-pending," +"" + |
| " task state:" +taskStatus.getRunState()); |
| statusUpdate(taskid, taskStatus); |
| reportTaskFinished(taskid, true); |
| } |
| |
| /** |
| * Child checking whether it can commit |
| */ |
| public synchronized boolean canCommit(TaskAttemptID taskid) { |
| return commitResponses.contains(taskid); //don't remove it now |
| } |
| |
| /** |
| * The task is done. |
| */ |
| public synchronized void done(TaskAttemptID taskid) |
| throws IOException { |
| TaskInProgress tip = tasks.get(taskid); |
| commitResponses.remove(taskid); |
| if (tip != null) { |
| tip.reportDone(); |
| } else { |
| LOG.warn("Unknown child task done: "+taskid+". Ignored."); |
| } |
| } |
| |
| |
| /** |
| * A reduce-task failed to shuffle the map-outputs. Kill the task. |
| */ |
| public synchronized void shuffleError(TaskAttemptID taskId, String message) |
| throws IOException { |
| LOG.fatal("Task: " + taskId + " - Killed due to Shuffle Failure: " + message); |
| TaskInProgress tip = runningTasks.get(taskId); |
| tip.reportDiagnosticInfo("Shuffle Error: " + message); |
| purgeTask(tip, true); |
| } |
| |
| /** |
| * A child task had a local filesystem error. Kill the task. |
| */ |
| public synchronized void fsError(TaskAttemptID taskId, String message) |
| throws IOException { |
| LOG.fatal("Task: " + taskId + " - Killed due to FSError: " + message); |
| TaskInProgress tip = runningTasks.get(taskId); |
| tip.reportDiagnosticInfo("FSError: " + message); |
| purgeTask(tip, true); |
| } |
| |
| /** |
| * A child task had a fatal error. Kill the task. |
| */ |
| public synchronized void fatalError(TaskAttemptID taskId, String msg) |
| throws IOException { |
| LOG.fatal("Task: " + taskId + " - exited : " + msg); |
| TaskInProgress tip = runningTasks.get(taskId); |
| tip.reportDiagnosticInfo("Error: " + msg); |
| purgeTask(tip, true); |
| } |
| |
  /**
   * Return map-task completion events for the given job, starting at
   * fromEventId, up to maxLocs events. If a reset was requested for the
   * calling reduce attempt, an empty update with the reset flag set is
   * returned instead.
   */
  public synchronized MapTaskCompletionEventsUpdate getMapCompletionEvents(
      JobID jobId, int fromEventId, int maxLocs, TaskAttemptID id) 
  throws IOException {
    TaskCompletionEvent[]mapEvents = TaskCompletionEvent.EMPTY_ARRAY;
    // A pending reset for this attempt takes priority over event delivery;
    // remove-and-check makes the reset a one-shot signal.
    synchronized (shouldReset) {
      if (shouldReset.remove(id)) {
        return new MapTaskCompletionEventsUpdate(mapEvents, true);
      }
    }
    RunningJob rjob;
    // Lock order: runningJobs map first, then the individual job.
    synchronized (runningJobs) {
      rjob = runningJobs.get(jobId);          
      if (rjob != null) {
        synchronized (rjob) {
          FetchStatus f = rjob.getFetchStatus();
          if (f != null) {
            mapEvents = f.getMapEvents(fromEventId, maxLocs);
          }
        }
      }
    }
    return new MapTaskCompletionEventsUpdate(mapEvents, false);
  }
| |
| ///////////////////////////////////////////////////// |
| // Called by TaskTracker thread after task process ends |
| ///////////////////////////////////////////////////// |
| /** |
| * The task is no longer running. It may not have completed successfully |
| */ |
| void reportTaskFinished(TaskAttemptID taskid, boolean commitPending) { |
| TaskInProgress tip; |
| synchronized (this) { |
| tip = tasks.get(taskid); |
| } |
| if (tip != null) { |
| tip.reportTaskFinished(commitPending); |
| } else { |
| LOG.warn("Unknown child task finished: "+taskid+". Ignored."); |
| } |
| } |
| |
| |
| /** |
| * A completed map task's output has been lost. |
| */ |
| public synchronized void mapOutputLost(TaskAttemptID taskid, |
| String errorMsg) throws IOException { |
| TaskInProgress tip = tasks.get(taskid); |
| if (tip != null) { |
| tip.mapOutputLost(errorMsg); |
| } else { |
| LOG.warn("Unknown child with bad map output: "+taskid+". Ignored."); |
| } |
| } |
| |
| /** |
| * The datastructure for initializing a job |
| */ |
| static class RunningJob{ |
| private JobID jobid; |
| private JobConf jobConf; |
| // keep this for later use |
| volatile Set<TaskInProgress> tasks; |
| boolean localized; |
| boolean keepJobFiles; |
| FetchStatus f; |
| JobTokens jobTokens; |
| RunningJob(JobID jobid) { |
| this.jobid = jobid; |
| localized = false; |
| tasks = new HashSet<TaskInProgress>(); |
| keepJobFiles = false; |
| } |
| |
| JobID getJobID() { |
| return jobid; |
| } |
| |
| void setFetchStatus(FetchStatus f) { |
| this.f = f; |
| } |
| |
| FetchStatus getFetchStatus() { |
| return f; |
| } |
| } |
| |
| /** |
| * Get the name for this task tracker. |
| * @return the string like "tracker_mymachine:50010" |
| */ |
| String getName() { |
| return taskTrackerName; |
| } |
| |
| /** |
| * {@inheritDoc} |
| * |
| * @return the name of this service |
| */ |
| @Override |
| public String getServiceName() { |
| return taskTrackerName != null ? taskTrackerName : "Task Tracker"; |
| } |
| |
| private synchronized List<TaskStatus> cloneAndResetRunningTaskStatuses( |
| boolean sendCounters) { |
| List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size()); |
| for(TaskInProgress tip: runningTasks.values()) { |
| TaskStatus status = tip.getStatus(); |
| status.setIncludeCounters(sendCounters); |
| status.setOutputSize(tryToGetOutputSize(status.getTaskID(), fConf)); |
| // send counters for finished or failed tasks and commit pending tasks |
| if (status.getRunState() != TaskStatus.State.RUNNING) { |
| status.setIncludeCounters(true); |
| } |
| result.add((TaskStatus)status.clone()); |
| status.clearStatus(); |
| } |
| return result; |
| } |
| /** |
| * Get the list of tasks that will be reported back to the |
| * job tracker in the next heartbeat cycle. |
| * @return a copy of the list of TaskStatus objects |
| */ |
| synchronized List<TaskStatus> getRunningTaskStatuses() { |
| List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size()); |
| for(TaskInProgress tip: runningTasks.values()) { |
| result.add(tip.getStatus()); |
| } |
| return result; |
| } |
| |
| /** |
| * Get the list of stored tasks on this task tracker. |
| * @return |
| */ |
| synchronized List<TaskStatus> getNonRunningTasks() { |
| List<TaskStatus> result = new ArrayList<TaskStatus>(tasks.size()); |
| for(Map.Entry<TaskAttemptID, TaskInProgress> task: tasks.entrySet()) { |
| if (!runningTasks.containsKey(task.getKey())) { |
| result.add(task.getValue().getStatus()); |
| } |
| } |
| return result; |
| } |
| |
| |
| /** |
| * Get the list of tasks from running jobs on this task tracker. |
| * @return a copy of the list of TaskStatus objects |
| */ |
| synchronized List<TaskStatus> getTasksFromRunningJobs() { |
| List<TaskStatus> result = new ArrayList<TaskStatus>(tasks.size()); |
| for (Map.Entry <JobID, RunningJob> item : runningJobs.entrySet()) { |
| RunningJob rjob = item.getValue(); |
| synchronized (rjob) { |
| for (TaskInProgress tip : rjob.tasks) { |
| result.add(tip.getStatus()); |
| } |
| } |
| } |
| return result; |
| } |
| |
| /** |
| * Get the default job conf for this tracker. |
| */ |
| JobConf getJobConf() { |
| return fConf; |
| } |
| |
| /** |
| * Check if the given local directories |
| * (and parent directories, if necessary) can be created. |
| * @param localDirs where the new TaskTracker should keep its local files. |
| * @throws DiskErrorException if all local directories are not writable |
| */ |
| private static void checkLocalDirs(String[] localDirs) |
| throws DiskErrorException { |
| boolean writable = false; |
| |
| if (localDirs != null) { |
| for (int i = 0; i < localDirs.length; i++) { |
| try { |
| DiskChecker.checkDir(new File(localDirs[i])); |
| writable = true; |
| } catch(DiskErrorException e) { |
| LOG.warn("Task Tracker local " + e.getMessage()); |
| } |
| } |
| } |
| |
| if (!writable) |
| throw new DiskErrorException( |
| "all local directories are not writable"); |
| } |
| |
| /** |
| * Is this task tracker idle? |
| * @return has this task tracker finished and cleaned up all of its tasks? |
| */ |
| public synchronized boolean isIdle() { |
| return tasks.isEmpty() && tasksToCleanup.isEmpty(); |
| } |
| |
| /** |
| * Start the TaskTracker, point toward the indicated JobTracker |
| */ |
| public static void main(String argv[]) throws Exception { |
| StringUtils.startupShutdownMessage(TaskTracker.class, argv, LOG); |
| if (argv.length != 0) { |
| System.out.println("usage: TaskTracker"); |
| System.exit(-1); |
| } |
| try { |
| JobConf conf=new JobConf(); |
| // enable the server to track time spent waiting on locks |
| ReflectionUtils.setContentionTracing |
| (conf.getBoolean(TT_CONTENTION_TRACKING, false)); |
| TaskTracker tracker = new TaskTracker(conf, false); |
| Service.startService(tracker); |
| tracker.run(); |
| } catch (Throwable e) { |
| LOG.error("Can not start task tracker because "+ |
| StringUtils.stringifyException(e)); |
| System.exit(-1); |
| } |
| } |
| |
| /** |
| * This class is used in TaskTracker's Jetty to serve the map outputs |
| * to other nodes. |
| */ |
| public static class MapOutputServlet extends HttpServlet { |
| private static final long serialVersionUID = 1L; |
| private static final int MAX_BYTES_TO_READ = 64 * 1024; |
| @Override |
| public void doGet(HttpServletRequest request, |
| HttpServletResponse response |
| ) throws ServletException, IOException { |
| long start = System.currentTimeMillis(); |
| String mapIds = request.getParameter("map"); |
| String reduceId = request.getParameter("reduce"); |
| String jobId = request.getParameter("job"); |
| |
| LOG.debug("Shuffle started for maps (mapIds=" + mapIds + ") to reduce " + |
| reduceId); |
| |
| if (jobId == null) { |
| throw new IOException("job parameter is required"); |
| } |
| |
| if (mapIds == null || reduceId == null) { |
| throw new IOException("map and reduce parameters are required"); |
| } |
| |
| ServletContext context = getServletContext(); |
| int reduce = Integer.parseInt(reduceId); |
| DataOutputStream outStream = null; |
| |
| ShuffleServerMetrics shuffleMetrics = |
| (ShuffleServerMetrics) context.getAttribute("shuffleServerMetrics"); |
| TaskTracker tracker = |
| (TaskTracker) context.getAttribute("task.tracker"); |
| |
| verifyRequest(request, response, tracker, jobId); |
| |
| int numMaps = 0; |
| try { |
| shuffleMetrics.serverHandlerBusy(); |
| outStream = new DataOutputStream(response.getOutputStream()); |
| //use the same buffersize as used for reading the data from disk |
| response.setBufferSize(MAX_BYTES_TO_READ); |
| JobConf conf = (JobConf) context.getAttribute("conf"); |
| LocalDirAllocator lDirAlloc = |
| (LocalDirAllocator)context.getAttribute("localDirAllocator"); |
| FileSystem rfs = ((LocalFileSystem) |
| context.getAttribute("local.file.system")).getRaw(); |
| |
| // Split the map ids, send output for one map at a time |
| StringTokenizer itr = new StringTokenizer(mapIds, ","); |
| while(itr.hasMoreTokens()) { |
| String mapId = itr.nextToken(); |
| ++numMaps; |
| sendMapFile(jobId, mapId, reduce, conf, outStream, |
| tracker, lDirAlloc, shuffleMetrics, rfs); |
| } |
| } catch (IOException ie) { |
| Log log = (Log) context.getAttribute("log"); |
| String errorMsg = ("getMapOutputs(" + mapIds + "," + reduceId + |
| ") failed"); |
| log.warn(errorMsg, ie); |
| response.sendError(HttpServletResponse.SC_GONE, errorMsg); |
| shuffleMetrics.failedOutput(); |
| throw ie; |
| } finally { |
| shuffleMetrics.serverHandlerFree(); |
| } |
| outStream.close(); |
| shuffleMetrics.successOutput(); |
| long timeElapsed = (System.currentTimeMillis()-start); |
| LOG.info("Shuffled " + numMaps |
| + "maps (mapIds=" + mapIds + ") to reduce " |
| + reduceId + " in " + timeElapsed + "s"); |
| |
| if (ClientTraceLog.isInfoEnabled()) { |
| ClientTraceLog.info(String.format(MR_CLIENTTRACE_FORMAT, |
| request.getLocalAddr() + ":" + request.getLocalPort(), |
| request.getRemoteAddr() + ":" + request.getRemotePort(), |
| numMaps, "MAPRED_SHUFFLE", reduceId, |
| timeElapsed)); |
| } |
| } |
| |
| private void sendMapFile(String jobId, String mapId, |
| int reduce, |
| Configuration conf, |
| DataOutputStream outStream, |
| TaskTracker tracker, |
| LocalDirAllocator lDirAlloc, |
| ShuffleServerMetrics shuffleMetrics, |
| FileSystem localfs |
| ) throws IOException { |
| |
| LOG.debug("sendMapFile called for " + mapId + " to reduce " + reduce); |
| |
| // true iff IOException was caused by attempt to access input |
| boolean isInputException = false; |
| FSDataInputStream mapOutputIn = null; |
| byte[] buffer = new byte[MAX_BYTES_TO_READ]; |
| long totalRead = 0; |
| |
| String userName = null; |
| synchronized (tracker.runningJobs) { |
| RunningJob rjob = tracker.runningJobs.get(JobID.forName(jobId)); |
| if (rjob == null) { |
| throw new IOException("Unknown job " + jobId + "!!"); |
| } |
| userName = rjob.jobConf.getUser(); |
| } |
| // Index file |
| Path indexFileName = |
| lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir( |
| userName, jobId, mapId) |
| + "/file.out.index", conf); |
| |
| // Map-output file |
| Path mapOutputFileName = |
| lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir( |
| userName, jobId, mapId) |
| + "/file.out", conf); |
| |
| /** |
| * Read the index file to get the information about where the map-output |
| * for the given reducer is available. |
| */ |
| IndexRecord info = |
| tracker.indexCache.getIndexInformation(mapId, reduce, indexFileName); |
| |
| try { |
| /** |
| * Read the data from the single map-output file and |
| * send it to the reducer. |
| */ |
| //open the map-output file |
| mapOutputIn = localfs.open(mapOutputFileName); |
| //seek to the correct offset for the reduce |
| mapOutputIn.seek(info.startOffset); |
| |
| // write header for each map output |
| ShuffleHeader header = new ShuffleHeader(mapId, info.partLength, |
| info.rawLength, reduce); |
| header.write(outStream); |
| |
| // read the map-output and stream it out |
| isInputException = true; |
| long rem = info.partLength; |
| if (rem == 0) { |
| throw new IOException("Illegal partLength of 0 for mapId " + mapId + |
| " to reduce " + reduce); |
| } |
| int len = |
| mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ)); |
| long now = 0; |
| while (len >= 0) { |
| rem -= len; |
| try { |
| shuffleMetrics.outputBytes(len); |
| |
| if (len > 0) { |
| outStream.write(buffer, 0, len); |
| } else { |
| LOG.info("Skipped zero-length read of map " + mapId + |
| " to reduce " + reduce); |
| } |
| |
| } catch (IOException ie) { |
| isInputException = false; |
| throw ie; |
| } |
| totalRead += len; |
| if (rem == 0) { |
| break; |
| } |
| len = |
| mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ)); |
| } |
| |
| mapOutputIn.close(); |
| } catch (IOException ie) { |
| String errorMsg = "error on sending map " + mapId + " to reduce " + |
| reduce; |
| if (isInputException) { |
| tracker.mapOutputLost(TaskAttemptID.forName(mapId), errorMsg + |
| StringUtils.stringifyException(ie)); |
| } |
| if (mapOutputIn != null) { |
| try { |
| mapOutputIn.close(); |
| } catch (IOException ioe) { |
| LOG.info("problem closing map output file", ioe); |
| } |
| } |
| throw new IOException(errorMsg, ie); |
| } |
| |
| LOG.info("Sent out " + totalRead + " bytes to reduce " + reduce + |
| " from map: " + mapId + " given " + info.partLength + "/" + |
| info.rawLength); |
| } |
| |
| /** |
| * verify that request has correct HASH for the url |
| * and also add a field to reply header with hash of the HASH |
| * @param request |
| * @param response |
| * @param jt the job token |
| * @throws IOException |
| */ |
| private void verifyRequest(HttpServletRequest request, |
| HttpServletResponse response, TaskTracker tracker, String jobId) |
| throws IOException { |
| JobTokens jt = null; |
| synchronized (tracker.runningJobs) { |
| RunningJob rjob = tracker.runningJobs.get(JobID.forName(jobId)); |
| if (rjob == null) { |
| throw new IOException("Unknown job " + jobId + "!!"); |
| } |
| jt = rjob.jobTokens; |
| } |
| // string to encrypt |
| String enc_str = SecureShuffleUtils.buildMsgFrom(request); |
| |
| // hash from the fetcher |
| String urlHashStr = request.getHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH); |
| if(urlHashStr == null) { |
| response.sendError(HttpServletResponse.SC_UNAUTHORIZED); |
| throw new IOException("fetcher cannot be authenticated"); |
| } |
| int len = urlHashStr.length(); |
| LOG.debug("verifying request. enc_str="+enc_str+"; hash=..."+ |
| urlHashStr.substring(len-len/2, len-1)); // half of the hash for debug |
| |
| SecureShuffleUtils ssutil = new SecureShuffleUtils(jt.getShuffleJobToken()); |
| // verify - throws exception |
| try { |
| ssutil.verifyReply(urlHashStr, enc_str); |
| } catch (IOException ioe) { |
| response.sendError(HttpServletResponse.SC_UNAUTHORIZED); |
| throw ioe; |
| } |
| |
| // verification passed - encode the reply |
| String reply = ssutil.generateHash(urlHashStr.getBytes()); |
| response.addHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply); |
| |
| len = reply.length(); |
| LOG.debug("Fetcher request verfied. enc_str="+enc_str+";reply=" |
| +reply.substring(len-len/2, len-1)); |
| } |
| } |
| |
| |
| // get the full paths of the directory in all the local disks. |
| private Path[] getLocalFiles(JobConf conf, String subdir) throws IOException{ |
| String[] localDirs = conf.getLocalDirs(); |
| Path[] paths = new Path[localDirs.length]; |
| FileSystem localFs = FileSystem.getLocal(conf); |
| for (int i = 0; i < localDirs.length; i++) { |
| paths[i] = new Path(localDirs[i], subdir); |
| paths[i] = paths[i].makeQualified(localFs); |
| } |
| return paths; |
| } |
| |
/** @return the number of map slots configured on this tracker */
int getMaxCurrentMapTasks() {
  return maxMapSlots;
}
| |
/** @return the number of reduce slots configured on this tracker */
int getMaxCurrentReduceTasks() {
  return maxReduceSlots;
}
| |
| /** |
| * Return a string that is useful in logs and debugging |
| * |
| * @return state of the job tracker |
| */ |
| @Override |
| public String toString() { |
| return super.toString() |
| + " " |
| + (server != null ? |
| (server.toString() + " ") : "") |
| + (taskReportAddress!=null ? |
| ("rpc://" + taskReportAddress + "/ ") : "") |
| + (jobTrackAddr != null ? |
| (" bound to JobTracker " + jobTrackAddr + " ") : ""); |
| } |
| |
| /** |
| * Is the TaskMemoryManager Enabled on this system? |
| * @return true if enabled, false otherwise. |
| */ |
| public boolean isTaskMemoryManagerEnabled() { |
| return taskMemoryManagerEnabled; |
| } |
| |
/** @return the memory-manager thread, started only when memory management is enabled */
public TaskMemoryManagerThread getTaskMemoryManager() {
  return taskMemoryManager;
}
| |
| /** |
| * Normalize the negative values in configuration |
| * |
| * @param val |
| * @return normalized val |
| */ |
| private long normalizeMemoryConfigValue(long val) { |
| if (val < 0) { |
| val = JobConf.DISABLED_MEMORY_LIMIT; |
| } |
| return val; |
| } |
| |
| /** |
| * Memory-related setup |
| */ |
| private void initializeMemoryManagement() { |
| |
| //handling @deprecated |
| if (fConf.get(MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY) != null) { |
| LOG.warn( |
| JobConf.deprecatedString( |
| MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY)); |
| } |
| |
| //handling @deprecated |
| if (fConf.get(MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY) != null) { |
| LOG.warn( |
| JobConf.deprecatedString( |
| MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY)); |
| } |
| |
| //handling @deprecated |
| if (fConf.get(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY) != null) { |
| LOG.warn( |
| JobConf.deprecatedString( |
| JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY)); |
| } |
| |
| //handling @deprecated |
| if (fConf.get(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY) != null) { |
| LOG.warn( |
| JobConf.deprecatedString( |
| JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY)); |
| } |
| |
| Class<? extends MemoryCalculatorPlugin> clazz = |
| fConf.getClass(TT_MEMORY_CALCULATOR_PLUGIN, |
| null, MemoryCalculatorPlugin.class); |
| MemoryCalculatorPlugin memoryCalculatorPlugin = |
| MemoryCalculatorPlugin |
| .getMemoryCalculatorPlugin(clazz, fConf); |
| LOG.info(" Using MemoryCalculatorPlugin : " + memoryCalculatorPlugin); |
| |
| if (memoryCalculatorPlugin != null) { |
| totalVirtualMemoryOnTT = memoryCalculatorPlugin.getVirtualMemorySize(); |
| if (totalVirtualMemoryOnTT <= 0) { |
| LOG.warn("TaskTracker's totalVmem could not be calculated. " |
| + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT); |
| totalVirtualMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT; |
| } |
| totalPhysicalMemoryOnTT = memoryCalculatorPlugin.getPhysicalMemorySize(); |
| if (totalPhysicalMemoryOnTT <= 0) { |
| LOG.warn("TaskTracker's totalPmem could not be calculated. " |
| + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT); |
| totalPhysicalMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT; |
| } |
| } |
| |
| mapSlotMemorySizeOnTT = |
| fConf.getLong( |
| MAPMEMORY_MB, |
| JobConf.DISABLED_MEMORY_LIMIT); |
| reduceSlotSizeMemoryOnTT = |
| fConf.getLong( |
| REDUCEMEMORY_MB, |
| JobConf.DISABLED_MEMORY_LIMIT); |
| totalMemoryAllottedForTasks = |
| maxMapSlots * mapSlotMemorySizeOnTT + maxReduceSlots |
| * reduceSlotSizeMemoryOnTT; |
| if (totalMemoryAllottedForTasks < 0) { |
| //adding check for the old keys which might be used by the administrator |
| //while configuration of the memory monitoring on TT |
| long memoryAllotedForSlot = fConf.normalizeMemoryConfigValue( |
| fConf.getLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, |
| JobConf.DISABLED_MEMORY_LIMIT)); |
| long limitVmPerTask = fConf.normalizeMemoryConfigValue( |
| fConf.getLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, |
| JobConf.DISABLED_MEMORY_LIMIT)); |
| if(memoryAllotedForSlot == JobConf.DISABLED_MEMORY_LIMIT) { |
| totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT; |
| } else { |
| if(memoryAllotedForSlot > limitVmPerTask) { |
| LOG.info("DefaultMaxVmPerTask is mis-configured. " + |
| "It shouldn't be greater than task limits"); |
| totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT; |
| } else { |
| totalMemoryAllottedForTasks = (maxMapSlots + |
| maxReduceSlots) * (memoryAllotedForSlot/(1024 * 1024)); |
| } |
| } |
| } |
| if (totalMemoryAllottedForTasks > totalPhysicalMemoryOnTT) { |
| LOG.info("totalMemoryAllottedForTasks > totalPhysicalMemoryOnTT." |
| + " Thrashing might happen."); |
| } else if (totalMemoryAllottedForTasks > totalVirtualMemoryOnTT) { |
| LOG.info("totalMemoryAllottedForTasks > totalVirtualMemoryOnTT." |
| + " Thrashing might happen."); |
| } |
| |
| // start the taskMemoryManager thread only if enabled |
| setTaskMemoryManagerEnabledFlag(); |
| if (isTaskMemoryManagerEnabled()) { |
| taskMemoryManager = new TaskMemoryManagerThread(this); |
| taskMemoryManager.setDaemon(true); |
| taskMemoryManager.start(); |
| } |
| } |
| |
| private void setTaskMemoryManagerEnabledFlag() { |
| if (!ProcfsBasedProcessTree.isAvailable()) { |
| LOG.info("ProcessTree implementation is missing on this system. " |
| + "TaskMemoryManager is disabled."); |
| taskMemoryManagerEnabled = false; |
| return; |
| } |
| |
| if (totalMemoryAllottedForTasks == JobConf.DISABLED_MEMORY_LIMIT) { |
| taskMemoryManagerEnabled = false; |
| LOG.warn("TaskTracker's totalMemoryAllottedForTasks is -1." |
| + " TaskMemoryManager is disabled."); |
| return; |
| } |
| |
| taskMemoryManagerEnabled = true; |
| } |
| |
| /** |
| * Clean-up the task that TaskMemoryMangerThread requests to do so. |
| * @param tid |
| * @param wasFailure mark the task as failed or killed. 'failed' if true, |
| * 'killed' otherwise |
| * @param diagnosticMsg |
| */ |
| synchronized void cleanUpOverMemoryTask(TaskAttemptID tid, boolean wasFailure, |
| String diagnosticMsg) { |
| TaskInProgress tip = runningTasks.get(tid); |
| if (tip != null) { |
| tip.reportDiagnosticInfo(diagnosticMsg); |
| try { |
| purgeTask(tip, wasFailure); // Marking it as failed/killed. |
| } catch (IOException ioe) { |
| LOG.warn("Couldn't purge the task of " + tid + ". Error : " + ioe, ioe); |
| } |
| } |
| } |
| |
| /** |
| * Wrapper method used by TaskTracker to check if {@link NodeHealthCheckerService} |
| * can be started |
| * @param conf configuration used to check if service can be started |
| * @return true if service can be started |
| */ |
| private boolean shouldStartHealthMonitor(Configuration conf) { |
| return NodeHealthCheckerService.shouldRun(conf); |
| } |
| |
| /** |
| * Wrapper method used to start {@link NodeHealthCheckerService} for |
| * Task Tracker |
| * @param conf Configuration used by the service. |
| */ |
| private void startHealthMonitor(Configuration conf) { |
| healthChecker = new NodeHealthCheckerService(conf); |
| healthChecker.start(); |
| } |
| |
/** @return the tracker-wide distributed-cache manager */
TrackerDistributedCacheManager getTrackerDistributedCacheManager() {
  return distributedCacheManager;
}
| |
| /** |
| * Download the job-token file from the FS and save on local fs. |
| * @param user |
| * @param jobId |
| * @param jobConf |
| * @return the local file system path of the downloaded file. |
| * @throws IOException |
| */ |
| private void localizeJobTokenFile(String user, JobID jobId, JobConf jobConf) |
| throws IOException { |
| // check if the tokenJob file is there.. |
| Path skPath = new Path(systemDirectory, |
| jobId.toString()+"/"+JobTokens.JOB_TOKEN_FILENAME); |
| |
| FileStatus status = null; |
| long jobTokenSize = -1; |
| status = systemFS.getFileStatus(skPath); //throws FileNotFoundException |
| jobTokenSize = status.getLen(); |
| |
| Path localJobTokenFile = |
| lDirAlloc.getLocalPathForWrite(getLocalJobTokenFile(user, |
| jobId.toString()), jobTokenSize, fConf); |
| |
| LOG.debug("localizingJobTokenFile from sd="+skPath.toUri().getPath() + |
| " to " + localJobTokenFile.toUri().getPath()); |
| |
| // Download job_token |
| systemFS.copyToLocalFile(skPath, localJobTokenFile); |
| // set it into jobConf to transfer the name to TaskRunner |
| jobConf.set(JobContext.JOB_TOKEN_FILE,localJobTokenFile.toString()); |
| } |
| |
| |
| /** |
| * Thread that handles cleanup |
| */ |
| private class TaskCleanupThread extends Daemon { |
| |
| /** |
| * flag to halt work |
| */ |
| private volatile boolean live = true; |
| |
| |
| /** |
| * Construct a daemon thread. |
| */ |
| private TaskCleanupThread() { |
| setName("Task Tracker Task Cleanup Thread"); |
| } |
| |
| /** |
| * End the daemon. This is done by setting the live flag to false and |
| * interrupting ourselves. |
| */ |
| public void terminate() { |
| live = false; |
| interrupt(); |
| } |
| |
| /** |
| * process task kill actions until told to stop being live. |
| */ |
| public void run() { |
| LOG.debug("Task cleanup thread started"); |
| while (live) { |
| try { |
| TaskTrackerAction action = tasksToCleanup.take(); |
| if (action instanceof KillJobAction) { |
| purgeJob((KillJobAction) action); |
| } else if (action instanceof KillTaskAction) { |
| TaskInProgress tip; |
| KillTaskAction killAction = (KillTaskAction) action; |
| synchronized (TaskTracker.this) { |
| tip = tasks.get(killAction.getTaskID()); |
| } |
| LOG.info("Received KillTaskAction for task: " + |
| killAction.getTaskID()); |
| purgeTask(tip, false); |
| } else { |
| LOG.error("Non-delete action given to cleanup thread: " |
| + action); |
| } |
| } catch (InterruptedException except) { |
| //interrupted. this may have reset the live flag |
| } catch (Throwable except) { |
| LOG.warn("Exception in Cleanup thread: " + except, |
| except); |
| } |
| } |
| LOG.debug("Task cleanup thread ending"); |
| } |
| |
| } |
| |
| } |