| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.mapred; |
| |
| |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.io.PrintWriter; |
| import java.io.Writer; |
| import java.net.BindException; |
| import java.net.InetSocketAddress; |
| import java.net.UnknownHostException; |
| import java.security.PrivilegedExceptionAction; |
| import java.text.ParseException; |
| import java.text.SimpleDateFormat; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.TreeSet; |
| import java.util.Vector; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import javax.security.auth.login.LoginException; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.CommonConfigurationKeys; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.LocalFileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.http.HttpServer; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.ipc.RPC; |
| import org.apache.hadoop.ipc.Server; |
| import org.apache.hadoop.ipc.RPC.VersionMismatch; |
| import org.apache.hadoop.mapred.AuditLogger.Constants; |
| import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo; |
| import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException; |
| import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType; |
| import org.apache.hadoop.mapred.JobTrackerStatistics.TaskTrackerStat; |
| import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus; |
| import org.apache.hadoop.mapreduce.ClusterMetrics; |
| import org.apache.hadoop.mapreduce.MRConfig; |
| import org.apache.hadoop.mapreduce.QueueInfo; |
| import org.apache.hadoop.mapreduce.TaskTrackerInfo; |
| import org.apache.hadoop.mapreduce.TaskType; |
| import org.apache.hadoop.mapreduce.jobhistory.JobHistory; |
| import org.apache.hadoop.mapreduce.protocol.ClientProtocol; |
| import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal; |
| import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; |
| import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; |
| import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenSecretManager; |
| import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; |
| import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker; |
| import org.apache.hadoop.mapreduce.util.ConfigUtil; |
| import org.apache.hadoop.mapreduce.util.MRAsyncDiskService; |
| import org.apache.hadoop.net.DNSToSwitchMapping; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.net.NetworkTopology; |
| import org.apache.hadoop.net.Node; |
| import org.apache.hadoop.net.NodeBase; |
| import org.apache.hadoop.net.ScriptBasedMapping; |
| import org.apache.hadoop.security.AccessControlException; |
| import org.apache.hadoop.security.Groups; |
| import org.apache.hadoop.security.RefreshUserMappingsProtocol; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.security.SecurityUtil; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; |
| import org.apache.hadoop.security.authorize.AccessControlList; |
| import org.apache.hadoop.security.authorize.AuthorizationException; |
| import org.apache.hadoop.security.authorize.ProxyUsers; |
| import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; |
| import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.util.HostsFileReader; |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.util.VersionInfo; |
| |
| /******************************************************* |
| * JobTracker is the central location for submitting and |
| * tracking MR jobs in a network environment. |
| * |
| *******************************************************/ |
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| public class JobTracker implements MRConstants, InterTrackerProtocol, |
| ClientProtocol, TaskTrackerManager, RefreshUserMappingsProtocol, |
| RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol, JTConfig { |
| |
| static{ |
| ConfigUtil.loadResources(); |
| } |
| |
| private final long tasktrackerExpiryInterval; |
| private final long DELEGATION_TOKEN_GC_INTERVAL = 3600000; // 1 hour |
| private final DelegationTokenSecretManager secretManager; |
| |
| // The interval after which one fault of a tracker will be discarded, |
| // if there are no faults during this. |
| private static long UPDATE_FAULTY_TRACKER_INTERVAL = 24 * 60 * 60 * 1000; |
| // The maximum percentage of trackers in cluster added |
| // to the 'blacklist' across all the jobs. |
| private static double MAX_BLACKLIST_PERCENT = 0.50; |
| // A tracker is blacklisted across jobs only if number of |
| // blacklists are X% above the average number of blacklists. |
| // X is the blacklist threshold here. |
| private double AVERAGE_BLACKLIST_THRESHOLD = 0.50; |
| // The maximum number of blacklists for a tracker after which the |
| // tracker could be blacklisted across all jobs |
| private int MAX_BLACKLISTS_PER_TRACKER = 4; |
| |
| // Approximate number of heartbeats that could arrive JobTracker |
| // in a second |
| private int NUM_HEARTBEATS_IN_SECOND; |
| private final int DEFAULT_NUM_HEARTBEATS_IN_SECOND = 100; |
| private final int MIN_NUM_HEARTBEATS_IN_SECOND = 1; |
| |
| // Scaling factor for heartbeats, used for testing only |
| private float HEARTBEATS_SCALING_FACTOR; |
| private final float MIN_HEARTBEATS_SCALING_FACTOR = 0.01f; |
| private final float DEFAULT_HEARTBEATS_SCALING_FACTOR = 1.0f; |
| |
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| public static enum State { INITIALIZING, RUNNING } |
| State state = State.INITIALIZING; |
| private static final int FS_ACCESS_RETRY_PERIOD = 10000; |
| |
| static final String JOB_INFO_FILE = "job-info"; |
| |
| private DNSToSwitchMapping dnsToSwitchMapping; |
| NetworkTopology clusterMap = new NetworkTopology(); |
| private int numTaskCacheLevels; // the max level to which we cache tasks |
| /** |
| * {@link #nodesAtMaxLevel} is using the keySet from {@link ConcurrentHashMap} |
| * so that it can be safely written to and iterated on via 2 separate threads. |
| * Note: It can only be iterated from a single thread which is feasible since |
| * the only iteration is done in {@link JobInProgress} under the |
| * {@link JobTracker} lock. |
| */ |
| private Set<Node> nodesAtMaxLevel = |
| Collections.newSetFromMap(new ConcurrentHashMap<Node, Boolean>()); |
| final TaskScheduler taskScheduler; |
| private final List<JobInProgressListener> jobInProgressListeners = |
| new CopyOnWriteArrayList<JobInProgressListener>(); |
| |
| // system directory is completely owned by the JobTracker |
| final static FsPermission SYSTEM_DIR_PERMISSION = |
| FsPermission.createImmutable((short) 0700); // rwx------ |
| |
| // system files should have 700 permission |
| final static FsPermission SYSTEM_FILE_PERMISSION = |
| FsPermission.createImmutable((short) 0700); // rwx------ |
| |
| private static Clock clock = null; |
| |
| static final Clock DEFAULT_CLOCK = new Clock(); |
| |
| private final JobHistory jobHistory; |
| |
| private final JobTokenSecretManager jobTokenSecretManager |
| = new JobTokenSecretManager(); |
| |
| JobTokenSecretManager getJobTokenSecretManager() { |
| return jobTokenSecretManager; |
| } |
| |
| private MRAsyncDiskService asyncDiskService; |
| |
| /** |
| * Returns the delegation token secret manager instance in JobTracker. |
| * |
| * @return DelegationTokenSecretManager object |
| */ |
| public DelegationTokenSecretManager getDelegationTokenSecretManager() { |
| return secretManager; |
| } |
| |
| /** |
| * A client tried to submit a job before the Job Tracker was ready. |
| */ |
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| public static class IllegalStateException extends IOException { |
| |
| private static final long serialVersionUID = 1L; |
| |
| public IllegalStateException(String msg) { |
| super(msg); |
| } |
| } |
| |
| private final AtomicInteger nextJobId = new AtomicInteger(1); |
| |
| public static final Log LOG = LogFactory.getLog(JobTracker.class); |
| |
| /** |
| * Returns JobTracker's clock. Note that the correct clock implementation will |
| * be obtained only when the JobTracker is initialized. If the JobTracker is |
| * not initialized then the default clock i.e {@link Clock} is returned. |
| */ |
| static Clock getClock() { |
| return clock == null ? DEFAULT_CLOCK : clock; |
| } |
| |
| /** |
| * Return the JT's job history handle. |
| * @return the jobhistory handle |
| */ |
| public JobHistory getJobHistory() { return jobHistory; } |
| /** |
| * Start the JobTracker with given configuration. |
| * |
| * The conf will be modified to reflect the actual ports on which |
| * the JobTracker is up and running if the user passes the port as |
| * <code>zero</code>. |
| * |
| * @param conf configuration for the JobTracker. |
| * @throws IOException |
| */ |
| public static JobTracker startTracker(JobConf conf) |
| throws IOException, InterruptedException { |
| return startTracker(conf, DEFAULT_CLOCK); |
| } |
| |
| static JobTracker startTracker(JobConf conf, Clock clock) |
| throws IOException, InterruptedException { |
| return startTracker(conf, clock, generateNewIdentifier()); |
| } |
| |
| static JobTracker startTracker(JobConf conf, Clock clock, String identifier) |
| throws IOException, InterruptedException { |
| JobTracker result = null; |
| while (true) { |
| try { |
| result = new JobTracker(conf, clock, identifier); |
| result.taskScheduler.setTaskTrackerManager(result); |
| break; |
| } catch (VersionMismatch e) { |
| throw e; |
| } catch (BindException e) { |
| throw e; |
| } catch (UnknownHostException e) { |
| throw e; |
| } catch (AccessControlException ace) { |
| // in case of jobtracker not having right access |
| // bail out |
| throw ace; |
| } catch (IOException e) { |
| LOG.warn("Error starting tracker: " + |
| StringUtils.stringifyException(e)); |
| } |
| Thread.sleep(1000); |
| } |
| if (result != null) { |
| JobEndNotifier.startNotifier(); |
| } |
| return result; |
| } |
| |
| public void stopTracker() throws IOException { |
| JobEndNotifier.stopNotifier(); |
| close(); |
| } |
| |
| public long getProtocolVersion(String protocol, |
| long clientVersion) throws IOException { |
| if (protocol.equals(InterTrackerProtocol.class.getName())) { |
| return InterTrackerProtocol.versionID; |
| } else if (protocol.equals(ClientProtocol.class.getName())){ |
| return ClientProtocol.versionID; |
| } else if (protocol.equals(RefreshAuthorizationPolicyProtocol.class.getName())){ |
| return RefreshAuthorizationPolicyProtocol.versionID; |
| } else if (protocol.equals(AdminOperationsProtocol.class.getName())){ |
| return AdminOperationsProtocol.versionID; |
| } else if (protocol.equals(RefreshUserMappingsProtocol.class.getName())){ |
| return RefreshUserMappingsProtocol.versionID; |
| } else { |
| throw new IOException("Unknown protocol to job tracker: " + protocol); |
| } |
| } |
| |
| /** |
| * A thread to timeout tasks that have been assigned to task trackers, |
| * but that haven't reported back yet. |
| * Note that I included a stop() method, even though there is no place |
| * where JobTrackers are cleaned up. |
| */ |
| private class ExpireLaunchingTasks implements Runnable { |
| /** |
| * This is a map of the tasks that have been assigned to task trackers, |
| * but that have not yet been seen in a status report. |
| * map: task-id -> time-assigned |
| */ |
| private Map<TaskAttemptID, Long> launchingTasks = |
| new LinkedHashMap<TaskAttemptID, Long>(); |
| |
| public void run() { |
| while (true) { |
| try { |
| // Every 3 minutes check for any tasks that are overdue |
| Thread.sleep(tasktrackerExpiryInterval/3); |
| long now = clock.getTime(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Starting launching task sweep"); |
| } |
| synchronized (JobTracker.this) { |
| synchronized (launchingTasks) { |
| Iterator<Map.Entry<TaskAttemptID, Long>> itr = |
| launchingTasks.entrySet().iterator(); |
| while (itr.hasNext()) { |
| Map.Entry<TaskAttemptID, Long> pair = itr.next(); |
| TaskAttemptID taskId = pair.getKey(); |
| long age = now - (pair.getValue()).longValue(); |
| LOG.info(taskId + " is " + age + " ms debug."); |
| if (age > tasktrackerExpiryInterval) { |
| LOG.info("Launching task " + taskId + " timed out."); |
| TaskInProgress tip = null; |
| tip = taskidToTIPMap.get(taskId); |
| if (tip != null) { |
| JobInProgress job = tip.getJob(); |
| String trackerName = getAssignedTracker(taskId); |
| TaskTrackerStatus trackerStatus = |
| getTaskTrackerStatus(trackerName); |
| |
| // This might happen when the tasktracker has already |
| // expired and this thread tries to call failedtask |
| // again. expire tasktracker should have called failed |
| // task! |
| if (trackerStatus != null) |
| job.failedTask(tip, taskId, "Error launching task", |
| tip.isMapTask()? TaskStatus.Phase.MAP: |
| TaskStatus.Phase.STARTING, |
| TaskStatus.State.FAILED, |
| trackerName); |
| } |
| itr.remove(); |
| } else { |
| // the tasks are sorted by start time, so once we find |
| // one that we want to keep, we are done for this cycle. |
| break; |
| } |
| } |
| } |
| } |
| } catch (InterruptedException ie) { |
| // all done |
| break; |
| } catch (Exception e) { |
| LOG.error("Expire Launching Task Thread got exception: " + |
| StringUtils.stringifyException(e)); |
| } |
| } |
| } |
| |
| public void addNewTask(TaskAttemptID taskName) { |
| synchronized (launchingTasks) { |
| launchingTasks.put(taskName, |
| clock.getTime()); |
| } |
| } |
| |
| public void removeTask(TaskAttemptID taskName) { |
| synchronized (launchingTasks) { |
| launchingTasks.remove(taskName); |
| } |
| } |
| } |
| |
| /////////////////////////////////////////////////////// |
| // Used to expire TaskTrackers that have gone down |
| /////////////////////////////////////////////////////// |
| class ExpireTrackers implements Runnable { |
| public ExpireTrackers() { |
| } |
| /** |
| * The run method lives for the life of the JobTracker, and removes TaskTrackers |
| * that have not checked in for some time. |
| */ |
| public void run() { |
| while (true) { |
| try { |
| // |
| // Thread runs periodically to check whether trackers should be expired. |
| // The sleep interval must be no more than half the maximum expiry time |
| // for a task tracker. |
| // |
| Thread.sleep(tasktrackerExpiryInterval / 3); |
| checkExpiredTrackers(); |
| } catch (InterruptedException iex) { |
| break; |
| } catch (Exception t) { |
| LOG.error("Tracker Expiry Thread got exception: " + |
| StringUtils.stringifyException(t)); |
| } |
| } |
| } |
| } |
| |
| void checkExpiredTrackers() { |
| // |
| // Loop through all expired items in the queue |
| // |
| // Need to lock the JobTracker here since we are |
| // manipulating it's data-structures via |
| // ExpireTrackers.run -> JobTracker.lostTaskTracker -> |
| // JobInProgress.failedTask -> JobTracker.markCompleteTaskAttempt |
| // Also need to lock JobTracker before locking 'taskTracker' & |
| // 'trackerExpiryQueue' to prevent deadlock: |
| // @see {@link JobTracker.processHeartbeat(TaskTrackerStatus, boolean)} |
| synchronized (JobTracker.this) { |
| synchronized (taskTrackers) { |
| synchronized (trackerExpiryQueue) { |
| long now = clock.getTime(); |
| TaskTrackerStatus leastRecent = null; |
| while ((trackerExpiryQueue.size() > 0) && |
| (leastRecent = trackerExpiryQueue.first()) != null && |
| ((now - leastRecent.getLastSeen()) > |
| tasktrackerExpiryInterval)) { |
| |
| // Remove profile from head of queue |
| trackerExpiryQueue.remove(leastRecent); |
| String trackerName = leastRecent.getTrackerName(); |
| |
| // Figure out if last-seen time should be updated, or if |
| // tracker is dead |
| TaskTracker current = getTaskTracker(trackerName); |
| TaskTrackerStatus newProfile = |
| (current == null ) ? null : current.getStatus(); |
| // Items might leave the taskTracker set through other means; the |
| // status stored in 'taskTrackers' might be null, which means the |
| // tracker has already been destroyed. |
| if (newProfile != null) { |
| if ((now - newProfile.getLastSeen()) > |
| tasktrackerExpiryInterval) { |
| // Remove completely after marking the tasks as 'KILLED' |
| removeTracker(current); |
| // remove the mapping from the hosts list |
| String hostname = newProfile.getHost(); |
| hostnameToTaskTracker.get(hostname).remove(trackerName); |
| } else { |
| // Update time by inserting latest profile |
| trackerExpiryQueue.add(newProfile); |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| // Assumes JobTracker, taskTrackers and trackerExpiryQueue are locked on entry |
| private void removeTracker(TaskTracker tracker) { |
| lostTaskTracker(tracker); |
| String trackerName = tracker.getStatus().getTrackerName(); |
| // tracker is lost, and if it is blacklisted, remove |
| // it from the count of blacklisted trackers in the cluster |
| if (isBlacklisted(trackerName)) { |
| faultyTrackers.decrBlackListedTrackers(1); |
| } |
| updateTaskTrackerStatus(trackerName, null); |
| statistics.taskTrackerRemoved(trackerName); |
| getInstrumentation().decTrackers(1); |
| } |
| |
| public synchronized void retireJob(JobID jobid, String historyFile) { |
| synchronized (jobs) { |
| JobInProgress job = jobs.get(jobid); |
| if (job != null) { |
| JobStatus status = job.getStatus(); |
| |
| //set the historyfile |
| if (historyFile != null) { |
| status.setHistoryFile(historyFile); |
| } |
| // clean up job files from the local disk |
| job.cleanupLocalizedJobConf(job.getProfile().getJobID()); |
| |
| //this configuration is primarily for testing |
| //test cases can set this to false to validate job data structures on |
| //job completion |
| boolean retireJob = |
| conf.getBoolean(JT_RETIREJOBS, true); |
| |
| if (retireJob) { |
| //purge the job from memory |
| removeJobTasks(job); |
| jobs.remove(job.getProfile().getJobID()); |
| for (JobInProgressListener l : jobInProgressListeners) { |
| l.jobRemoved(job); |
| } |
| |
| String jobUser = job.getProfile().getUser(); |
| LOG.info("Retired job with id: '" + |
| job.getProfile().getJobID() + "' of user '" + |
| jobUser + "'"); |
| |
| //add the job status to retired cache |
| retireJobs.addToCache(job.getStatus()); |
| } |
| } |
| } |
| } |
| |
| /////////////////////////////////////////////////////// |
| // Used to remove old finished Jobs that have been around for too long |
| /////////////////////////////////////////////////////// |
| class RetireJobs { |
| private final Map<JobID, JobStatus> jobIDStatusMap = |
| new HashMap<JobID, JobStatus>(); |
| private final LinkedList<JobStatus> jobStatusQ = |
| new LinkedList<JobStatus>(); |
| public RetireJobs() { |
| } |
| |
| synchronized void addToCache(JobStatus status) { |
| status.setRetired(); |
| jobStatusQ.add(status); |
| jobIDStatusMap.put(status.getJobID(), status); |
| if (jobStatusQ.size() > retiredJobsCacheSize) { |
| JobStatus removed = jobStatusQ.remove(); |
| jobIDStatusMap.remove(removed.getJobID()); |
| LOG.info("Retired job removed from cache " + removed.getJobID()); |
| } |
| } |
| |
| synchronized JobStatus get(JobID jobId) { |
| return jobIDStatusMap.get(jobId); |
| } |
| |
| @SuppressWarnings("unchecked") |
| synchronized LinkedList<JobStatus> getAll() { |
| return (LinkedList<JobStatus>) jobStatusQ.clone(); |
| } |
| } |
| |
| enum ReasonForBlackListing { |
| EXCEEDING_FAILURES, |
| NODE_UNHEALTHY |
| } |
| |
| // The FaultInfo which indicates the number of faults of a tracker |
| // and when the last fault occurred |
| // and whether the tracker is blacklisted across all jobs or not |
| private static class FaultInfo { |
| static final String FAULT_FORMAT_STRING = "%d failures on the tracker"; |
| int numFaults = 0; |
| long lastUpdated; |
| boolean blacklisted; |
| |
| private boolean isHealthy; |
| private HashMap<ReasonForBlackListing, String>rfbMap; |
| |
| FaultInfo(long time) { |
| numFaults = 0; |
| lastUpdated = time; |
| blacklisted = false; |
| rfbMap = new HashMap<ReasonForBlackListing, String>(); |
| } |
| |
| void setFaultCount(int num) { |
| numFaults = num; |
| } |
| |
| void setLastUpdated(long timeStamp) { |
| lastUpdated = timeStamp; |
| } |
| |
| int getFaultCount() { |
| return numFaults; |
| } |
| |
| long getLastUpdated() { |
| return lastUpdated; |
| } |
| |
| boolean isBlacklisted() { |
| return blacklisted; |
| } |
| |
| void setBlacklist(ReasonForBlackListing rfb, |
| String trackerFaultReport) { |
| blacklisted = true; |
| this.rfbMap.put(rfb, trackerFaultReport); |
| } |
| |
| public void setHealthy(boolean isHealthy) { |
| this.isHealthy = isHealthy; |
| } |
| |
| public boolean isHealthy() { |
| return isHealthy; |
| } |
| |
| public String getTrackerFaultReport() { |
| StringBuffer sb = new StringBuffer(); |
| for(String reasons : rfbMap.values()) { |
| sb.append(reasons); |
| sb.append("\n"); |
| } |
| if (sb.length() > 0) { |
| sb.replace(sb.length()-1, sb.length(), ""); |
| } |
| return sb.toString(); |
| } |
| |
| Set<ReasonForBlackListing> getReasonforblacklisting() { |
| return this.rfbMap.keySet(); |
| } |
| |
| public void unBlacklist() { |
| this.blacklisted = false; |
| this.rfbMap.clear(); |
| } |
| |
| public boolean removeBlackListedReason(ReasonForBlackListing rfb) { |
| String str = rfbMap.remove(rfb); |
| return str!=null; |
| } |
| |
| public void addBlackListedReason(ReasonForBlackListing rfb, String reason) { |
| this.rfbMap.put(rfb, reason); |
| } |
| |
| } |
| |
| private class FaultyTrackersInfo { |
| // A map from hostName to its faults |
| private Map<String, FaultInfo> potentiallyFaultyTrackers = |
| new HashMap<String, FaultInfo>(); |
| // This count gives the number of blacklisted trackers in the cluster |
| // at any time. This is maintained to avoid iteration over |
| // the potentiallyFaultyTrackers to get blacklisted trackers. And also |
| // this count doesn't include blacklisted trackers which are lost, |
| // although the fault info is maintained for lost trackers. |
| private volatile int numBlacklistedTrackers = 0; |
| |
| /** |
| * Increments faults(blacklist by job) for the tracker by one. |
| * |
| * Adds the tracker to the potentially faulty list. |
| * Assumes JobTracker is locked on the entry. |
| * |
| * @param hostName |
| */ |
| void incrementFaults(String hostName) { |
| synchronized (potentiallyFaultyTrackers) { |
| FaultInfo fi = getFaultInfo(hostName, true); |
| long now = clock.getTime(); |
| int numFaults = fi.getFaultCount(); |
| ++numFaults; |
| fi.setFaultCount(numFaults); |
| fi.setLastUpdated(now); |
| if (exceedsFaults(fi)) { |
| LOG.info("Adding " + hostName + " to the blacklist" |
| + " across all jobs"); |
| String reason = String.format(FaultInfo.FAULT_FORMAT_STRING, |
| numFaults); |
| blackListTracker(hostName, reason, |
| ReasonForBlackListing.EXCEEDING_FAILURES); |
| } |
| } |
| } |
| |
| private void incrBlackListedTrackers(int count) { |
| numBlacklistedTrackers += count; |
| getInstrumentation().addBlackListedTrackers(count); |
| } |
| |
| private void decrBlackListedTrackers(int count) { |
| numBlacklistedTrackers -= count; |
| getInstrumentation().decBlackListedTrackers(count); |
| } |
| |
| private void blackListTracker(String hostName, String reason, ReasonForBlackListing rfb) { |
| FaultInfo fi = getFaultInfo(hostName, true); |
| boolean blackListed = fi.isBlacklisted(); |
| if (blackListed) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Adding blacklisted reason for tracker : " + hostName |
| + " Reason for blacklisting is : " + rfb); |
| } |
| if (!fi.getReasonforblacklisting().contains(rfb)) { |
| LOG.info("Adding blacklisted reason for tracker : " + hostName |
| + " Reason for blacklisting is : " + rfb); |
| } |
| fi.addBlackListedReason(rfb, reason); |
| } else { |
| LOG.info("Blacklisting tracker : " + hostName |
| + " Reason for blacklisting is : " + rfb); |
| Set<TaskTracker> trackers = |
| hostnameToTaskTracker.get(hostName); |
| synchronized (trackers) { |
| for (TaskTracker tracker : trackers) { |
| tracker.cancelAllReservations(); |
| } |
| } |
| removeHostCapacity(hostName); |
| fi.setBlacklist(rfb, reason); |
| } |
| } |
| |
| private boolean canUnBlackListTracker(String hostName, |
| ReasonForBlackListing rfb) { |
| FaultInfo fi = getFaultInfo(hostName, false); |
| if (fi == null) { |
| return false; |
| } |
| |
| Set<ReasonForBlackListing> rfbSet = fi.getReasonforblacklisting(); |
| return fi.isBlacklisted() && rfbSet.contains(rfb); |
| } |
| |
| private void unBlackListTracker(String hostName, |
| ReasonForBlackListing rfb) { |
| // check if you can black list the tracker then call this methods |
| FaultInfo fi = getFaultInfo(hostName, false); |
| if (fi.removeBlackListedReason(rfb)) { |
| if (fi.getReasonforblacklisting().isEmpty()) { |
| addHostCapacity(hostName); |
| LOG.info("Unblacklisting tracker : " + hostName); |
| fi.unBlacklist(); |
| //We have unBlackListed tracker, so tracker should |
| //definitely be healthy. Check fault count if fault count |
| //is zero don't keep it memory. |
| if (fi.numFaults == 0) { |
| potentiallyFaultyTrackers.remove(hostName); |
| } |
| } |
| } |
| } |
| |
| // Assumes JobTracker is locked on entry. |
| private FaultInfo getFaultInfo(String hostName, |
| boolean createIfNeccessary) { |
| FaultInfo fi = null; |
| synchronized (potentiallyFaultyTrackers) { |
| fi = potentiallyFaultyTrackers.get(hostName); |
| long now = clock.getTime(); |
| if (fi == null && createIfNeccessary) { |
| fi = new FaultInfo(now); |
| potentiallyFaultyTrackers.put(hostName, fi); |
| } |
| } |
| return fi; |
| } |
| |
| /** |
| * Blacklists the tracker across all jobs if |
| * <ol> |
| * <li>#faults are more than |
| * MAX_BLACKLISTS_PER_TRACKER (configurable) blacklists</li> |
| * <li>#faults is 50% (configurable) above the average #faults</li> |
| * <li>50% the cluster is not blacklisted yet </li> |
| * </ol> |
| */ |
| private boolean exceedsFaults(FaultInfo fi) { |
| int faultCount = fi.getFaultCount(); |
| if (faultCount >= MAX_BLACKLISTS_PER_TRACKER) { |
| // calculate avgBlackLists |
| long clusterSize = getClusterStatus().getTaskTrackers(); |
| long sum = 0; |
| for (FaultInfo f : potentiallyFaultyTrackers.values()) { |
| sum += f.getFaultCount(); |
| } |
| double avg = (double) sum / clusterSize; |
| |
| long totalCluster = clusterSize + numBlacklistedTrackers; |
| if ((faultCount - avg) > (AVERAGE_BLACKLIST_THRESHOLD * avg) && |
| numBlacklistedTrackers < (totalCluster * MAX_BLACKLIST_PERCENT)) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * Removes the tracker from blacklist and |
| * from potentially faulty list, when it is restarted. |
| * |
| * Assumes JobTracker is locked on the entry. |
| * |
| * @param hostName |
| */ |
| void markTrackerHealthy(String hostName) { |
| synchronized (potentiallyFaultyTrackers) { |
| FaultInfo fi = potentiallyFaultyTrackers.remove(hostName); |
| if (fi != null && fi.isBlacklisted()) { |
| LOG.info("Removing " + hostName + " from blacklist"); |
| addHostCapacity(hostName); |
| } |
| } |
| } |
| |
| /** |
| * Check whether tasks can be assigned to the tracker. |
| * |
| * One fault of the tracker is discarded if there |
| * are no faults during one day. So, the tracker will get a |
| * chance again to run tasks of a job. |
| * Assumes JobTracker is locked on the entry. |
| * |
| * @param hostName The tracker name |
| * @param now The current time |
| * |
| * @return true if the tracker is blacklisted |
| * false otherwise |
| */ |
| boolean shouldAssignTasksToTracker(String hostName, long now) { |
| synchronized (potentiallyFaultyTrackers) { |
| FaultInfo fi = potentiallyFaultyTrackers.get(hostName); |
| if (fi != null && |
| (now - fi.getLastUpdated()) > UPDATE_FAULTY_TRACKER_INTERVAL) { |
| int numFaults = fi.getFaultCount() - 1; |
| fi.setFaultCount(numFaults); |
| fi.setLastUpdated(now); |
| if (canUnBlackListTracker(hostName, |
| ReasonForBlackListing.EXCEEDING_FAILURES)) { |
| unBlackListTracker(hostName, |
| ReasonForBlackListing.EXCEEDING_FAILURES); |
| } |
| } |
| return (fi != null && fi.isBlacklisted()); |
| } |
| } |
| |
| private void removeHostCapacity(String hostName) { |
| synchronized (taskTrackers) { |
| // remove the capacity of trackers on this host |
| int numTrackersOnHost = 0; |
| for (TaskTrackerStatus status : getStatusesOnHost(hostName)) { |
| int mapSlots = status.getMaxMapSlots(); |
| totalMapTaskCapacity -= mapSlots; |
| int reduceSlots = status.getMaxReduceSlots(); |
| totalReduceTaskCapacity -= reduceSlots; |
| ++numTrackersOnHost; |
| getInstrumentation().addBlackListedMapSlots( |
| mapSlots); |
| getInstrumentation().addBlackListedReduceSlots( |
| reduceSlots); |
| } |
| // remove the host |
| uniqueHostsMap.remove(hostName); |
| incrBlackListedTrackers(numTrackersOnHost); |
| } |
| } |
| |
| // This is called on tracker's restart or after a day of blacklist. |
| private void addHostCapacity(String hostName) { |
| synchronized (taskTrackers) { |
| int numTrackersOnHost = 0; |
| // add the capacity of trackers on the host |
| for (TaskTrackerStatus status : getStatusesOnHost(hostName)) { |
| int mapSlots = status.getMaxMapSlots(); |
| totalMapTaskCapacity += mapSlots; |
| int reduceSlots = status.getMaxReduceSlots(); |
| totalReduceTaskCapacity += reduceSlots; |
| numTrackersOnHost++; |
| getInstrumentation().decBlackListedMapSlots(mapSlots); |
| getInstrumentation().decBlackListedReduceSlots(reduceSlots); |
| } |
| uniqueHostsMap.put(hostName, |
| numTrackersOnHost); |
| decrBlackListedTrackers(numTrackersOnHost); |
| } |
| } |
| |
| /** |
| * Whether a host is blacklisted across all the jobs. |
| * |
| * Assumes JobTracker is locked on the entry. |
| * @param hostName |
| * @return |
| */ |
| boolean isBlacklisted(String hostName) { |
| synchronized (potentiallyFaultyTrackers) { |
| FaultInfo fi = null; |
| if ((fi = potentiallyFaultyTrackers.get(hostName)) != null) { |
| return fi.isBlacklisted(); |
| } |
| } |
| return false; |
| } |
| |
| // Assumes JobTracker is locked on the entry. |
| int getFaultCount(String hostName) { |
| synchronized (potentiallyFaultyTrackers) { |
| FaultInfo fi = null; |
| if ((fi = potentiallyFaultyTrackers.get(hostName)) != null) { |
| return fi.getFaultCount(); |
| } |
| } |
| return 0; |
| } |
| |
| // Assumes JobTracker is locked on the entry. |
| Set<ReasonForBlackListing> getReasonForBlackListing(String hostName) { |
| synchronized (potentiallyFaultyTrackers) { |
| FaultInfo fi = null; |
| if ((fi = potentiallyFaultyTrackers.get(hostName)) != null) { |
| return fi.getReasonforblacklisting(); |
| } |
| } |
| return null; |
| } |
| |
| |
| // Assumes JobTracker is locked on the entry. |
| void setNodeHealthStatus(String hostName, boolean isHealthy, String reason) { |
| FaultInfo fi = null; |
| // If tracker is not healthy, create a fault info object |
| // blacklist it. |
| if (!isHealthy) { |
| fi = getFaultInfo(hostName, true); |
| fi.setHealthy(isHealthy); |
| updateNodeHealthFailureStatistics(hostName, fi); |
| synchronized (potentiallyFaultyTrackers) { |
| blackListTracker(hostName, reason, |
| ReasonForBlackListing.NODE_UNHEALTHY); |
| } |
| } else { |
| fi = getFaultInfo(hostName, false); |
| if (fi == null) { |
| return; |
| } else { |
| if (canUnBlackListTracker(hostName, |
| ReasonForBlackListing.NODE_UNHEALTHY)) { |
| unBlackListTracker(hostName, ReasonForBlackListing.NODE_UNHEALTHY); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Update the node health failure statistics of the given |
| * host. |
| * |
| * We increment the count only when the host transitions |
| * from healthy -> unhealthy. |
| * |
| * @param hostName |
| * @param fi Fault info object for the host. |
| */ |
| private void updateNodeHealthFailureStatistics(String hostName, |
| FaultInfo fi) { |
| //Check if the node was already blacklisted due to |
| //unhealthy reason. If so dont increment the count. |
| if (!fi.getReasonforblacklisting().contains( |
| ReasonForBlackListing.NODE_UNHEALTHY)) { |
| Set<TaskTracker> trackers = hostnameToTaskTracker.get(hostName); |
| synchronized (trackers) { |
| for (TaskTracker t : trackers) { |
| TaskTrackerStat stat = statistics.getTaskTrackerStat( |
| t.getTrackerName()); |
| stat.incrHealthCheckFailed(); |
| } |
| } |
| } |
| } |
| |
| } |
| |
| /** |
| * Get all task tracker statuses on given host |
| * |
| * Assumes JobTracker is locked on the entry |
| * @param hostName |
| * @return {@link java.util.List} of {@link TaskTrackerStatus} |
| */ |
| private List<TaskTrackerStatus> getStatusesOnHost(String hostName) { |
| List<TaskTrackerStatus> statuses = new ArrayList<TaskTrackerStatus>(); |
| synchronized (taskTrackers) { |
| for (TaskTracker tt : taskTrackers.values()) { |
| TaskTrackerStatus status = tt.getStatus(); |
| if (hostName.equals(status.getHost())) { |
| statuses.add(status); |
| } |
| } |
| } |
| return statuses; |
| } |
| |
| /////////////////////////////////////////////////////// |
| // Used to recover the jobs upon restart |
| /////////////////////////////////////////////////////// |
| class RecoveryManager { |
| private Set<JobID> jobsToRecover; // set of jobs to be recovered |
| private int recovered; |
| private int restartCount = 0; |
| private boolean shouldRecover = false; |
| |
| public RecoveryManager() { |
| jobsToRecover = new TreeSet<JobID>(); |
| } |
| |
| public boolean contains(JobID id) { |
| return jobsToRecover.contains(id); |
| } |
| |
| int getRecovered() { |
| return recovered; |
| } |
| |
| void addJobForRecovery(JobID id) { |
| jobsToRecover.add(id); |
| } |
| |
| public boolean shouldRecover() { |
| return shouldRecover; |
| } |
| |
| Set<JobID> getJobsToRecover() { |
| return jobsToRecover; |
| } |
| |
| // add the job |
| void addJobForRecovery(FileStatus status) throws IOException { |
| recoveryManager.addJobForRecovery(JobID.forName(status.getPath().getName())); |
| shouldRecover = true; // enable actual recovery if num-files > 1 |
| } |
| |
| |
| Path getRestartCountFile() { |
| return new Path(getSystemDir(), "jobtracker.info"); |
| } |
| |
| Path getTempRestartCountFile() { |
| return new Path(getSystemDir(), "jobtracker.info.recover"); |
| } |
| |
| /** |
| * Initialize the recovery process. It simply creates a jobtracker.info file |
| * in the jobtracker's system directory and writes its restart count in it. |
| * For the first start, the jobtracker writes '0' in it. Upon subsequent |
| * restarts the jobtracker replaces the count with its current count which |
| * is (old count + 1). The whole purpose of this api is to obtain restart |
| * counts across restarts to avoid attempt-id clashes. |
| * |
| * Note that in between if the jobtracker.info files goes missing then the |
| * jobtracker will disable recovery and continue. |
| * |
| */ |
| void updateRestartCount() throws IOException { |
| Path restartFile = getRestartCountFile(); |
| Path tmpRestartFile = getTempRestartCountFile(); |
| FsPermission filePerm = new FsPermission(SYSTEM_FILE_PERMISSION); |
| |
| // read the count from the jobtracker info file |
| if (fs.exists(restartFile)) { |
| fs.delete(tmpRestartFile, false); // delete the tmp file |
| } else if (fs.exists(tmpRestartFile)) { |
| // if .rec exists then delete the main file and rename the .rec to main |
| fs.rename(tmpRestartFile, restartFile); // rename .rec to main file |
| } else { |
| // For the very first time the jobtracker will create a jobtracker.info |
| // file. If the jobtracker has restarted then disable recovery as files' |
| // needed for recovery are missing. |
| |
| // disable recovery if this is a restart |
| shouldRecover = false; |
| |
| // write the jobtracker.info file |
| try { |
| FSDataOutputStream out = FileSystem.create(fs, restartFile, |
| filePerm); |
| out.writeInt(0); |
| out.close(); |
| } catch (IOException ioe) { |
| LOG.warn("Writing to file " + restartFile + " failed!"); |
| LOG.warn("FileSystem is not ready yet!"); |
| fs.delete(restartFile, false); |
| throw ioe; |
| } |
| return; |
| } |
| |
| FSDataInputStream in = fs.open(restartFile); |
| try { |
| // read the old count |
| restartCount = in.readInt(); |
| ++restartCount; // increment the restart count |
| } catch (IOException ioe) { |
| LOG.warn("System directory is garbled. Failed to read file " |
| + restartFile); |
| LOG.warn("Jobtracker recovery is not possible with garbled" |
| + " system directory! Please delete the system directory and" |
| + " restart the jobtracker. Note that deleting the system" |
| + " directory will result in loss of all the running jobs."); |
| throw new RuntimeException(ioe); |
| } finally { |
| if (in != null) { |
| in.close(); |
| } |
| } |
| |
| // Write back the new restart count and rename the old info file |
| //TODO This is similar to jobhistory recovery, maybe this common code |
| // can be factored out. |
| |
| // write to the tmp file |
| FSDataOutputStream out = FileSystem.create(fs, tmpRestartFile, filePerm); |
| out.writeInt(restartCount); |
| out.close(); |
| |
| // delete the main file |
| fs.delete(restartFile, false); |
| |
| // rename the .rec to main file |
| fs.rename(tmpRestartFile, restartFile); |
| } |
| |
| public void recover() { |
| long recoveryProcessStartTime = clock.getTime(); |
| if (!shouldRecover()) { |
| // clean up jobs structure |
| jobsToRecover.clear(); |
| return; |
| } |
| |
| LOG.info("Starting the recovery process for " + jobsToRecover.size() + |
| " jobs ..."); |
| for (JobID jobId : jobsToRecover) { |
| LOG.info("Submitting job "+ jobId); |
| try { |
| Path jobInfoFile = getSystemFileForJob(jobId); |
| FSDataInputStream in = fs.open(jobInfoFile); |
| JobInfo token = new JobInfo(); |
| token.readFields(in); |
| in.close(); |
| UserGroupInformation ugi = |
| UserGroupInformation.createRemoteUser(token.getUser().toString()); |
| submitJob(token.getJobID(), restartCount, |
| ugi, token.getJobSubmitDir().toString(), true, null); |
| recovered++; |
| } catch (Exception e) { |
| LOG.warn("Could not recover job " + jobId, e); |
| } |
| } |
| recoveryDuration = clock.getTime() - recoveryProcessStartTime; |
| hasRecovered = true; |
| |
| LOG.info("Recovery done! Recoverd " + recovered +" of "+ |
| jobsToRecover.size() + " jobs."); |
| LOG.info("Recovery Duration (ms):" + recoveryDuration); |
| } |
| |
| } |
| |
| private final JobTrackerInstrumentation myInstrumentation; |
| |
| ///////////////////////////////////////////////////////////////// |
| // The real JobTracker |
| //////////////////////////////////////////////////////////////// |
| int port; |
| String localMachine; |
| private final String trackerIdentifier; |
| long startTime; |
| int totalSubmissions = 0; |
| private int totalMapTaskCapacity; |
| private int totalReduceTaskCapacity; |
| private final HostsFileReader hostsReader; |
| |
| // JobTracker recovery variables |
| private volatile boolean hasRecovered = false; |
| private volatile long recoveryDuration; |
| |
| // |
| // Properties to maintain while running Jobs and Tasks: |
| // |
| // 1. Each Task is always contained in a single Job. A Job succeeds when all its |
| // Tasks are complete. |
| // |
| // 2. Every running or successful Task is assigned to a Tracker. Idle Tasks are not. |
| // |
| // 3. When a Tracker fails, all of its assigned Tasks are marked as failures. |
| // |
| // 4. A Task might need to be reexecuted if it (or the machine it's hosted on) fails |
| // before the Job is 100% complete. Sometimes an upstream Task can fail without |
| // reexecution if all downstream Tasks that require its output have already obtained |
| // the necessary files. |
| // |
| |
| // All the known jobs. (jobid->JobInProgress) |
| Map<JobID, JobInProgress> jobs = |
| Collections.synchronizedMap(new TreeMap<JobID, JobInProgress>()); |
| |
| // (trackerID --> list of jobs to cleanup) |
| Map<String, Set<JobID>> trackerToJobsToCleanup = |
| new HashMap<String, Set<JobID>>(); |
| |
| // (trackerID --> list of tasks to cleanup) |
| Map<String, Set<TaskAttemptID>> trackerToTasksToCleanup = |
| new HashMap<String, Set<TaskAttemptID>>(); |
| |
| // All the known TaskInProgress items, mapped to by taskids (taskid->TIP) |
| Map<TaskAttemptID, TaskInProgress> taskidToTIPMap = |
| new TreeMap<TaskAttemptID, TaskInProgress>(); |
| |
| // (taskid --> trackerID) |
| TreeMap<TaskAttemptID, String> taskidToTrackerMap = new TreeMap<TaskAttemptID, String>(); |
| |
| // (trackerID->TreeSet of taskids running at that tracker) |
| TreeMap<String, Set<TaskAttemptID>> trackerToTaskMap = |
| new TreeMap<String, Set<TaskAttemptID>>(); |
| |
| // (trackerID -> TreeSet of completed taskids running at that tracker) |
| TreeMap<String, Set<TaskAttemptID>> trackerToMarkedTasksMap = |
| new TreeMap<String, Set<TaskAttemptID>>(); |
| |
| // (trackerID --> last sent HeartBeatResponse) |
| Map<String, HeartbeatResponse> trackerToHeartbeatResponseMap = |
| new TreeMap<String, HeartbeatResponse>(); |
| |
| // (hostname --> Node (NetworkTopology)) |
| Map<String, Node> hostnameToNodeMap = |
| Collections.synchronizedMap(new TreeMap<String, Node>()); |
| |
| // (hostname --> Set(tasktracker)) |
| // This is used to keep track of all trackers running on one host. While |
| // decommissioning the host, all the trackers on the host will be lost. |
| Map<String, Set<TaskTracker>> hostnameToTaskTracker = |
| Collections.synchronizedMap(new TreeMap<String, Set<TaskTracker>>()); |
| |
| // Number of resolved entries |
| int numResolved; |
| |
| private FaultyTrackersInfo faultyTrackers = new FaultyTrackersInfo(); |
| |
| private JobTrackerStatistics statistics = |
| new JobTrackerStatistics(); |
| // |
| // Watch and expire TaskTracker objects using these structures. |
| // We can map from Name->TaskTrackerStatus, or we can expire by time. |
| // |
| int totalMaps = 0; |
| int totalReduces = 0; |
| private int occupiedMapSlots = 0; |
| private int occupiedReduceSlots = 0; |
| private int reservedMapSlots = 0; |
| private int reservedReduceSlots = 0; |
| private HashMap<String, TaskTracker> taskTrackers = |
| new HashMap<String, TaskTracker>(); |
| Map<String,Integer>uniqueHostsMap = new ConcurrentHashMap<String, Integer>(); |
| ExpireTrackers expireTrackers = new ExpireTrackers(); |
| Thread expireTrackersThread = null; |
| RetireJobs retireJobs = new RetireJobs(); |
| final int retiredJobsCacheSize; |
| ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks(); |
| Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks, |
| "expireLaunchingTasks"); |
| |
| final CompletedJobStatusStore completedJobStatusStore; |
| Thread completedJobsStoreThread = null; |
| final RecoveryManager recoveryManager; |
| |
| /** |
| * It might seem like a bug to maintain a TreeSet of tasktracker objects, |
| * which can be updated at any time. But that's not what happens! We |
| * only update status objects in the taskTrackers table. Status objects |
| * are never updated once they enter the expiry queue. Instead, we wait |
| * for them to expire and remove them from the expiry queue. If a status |
| * object has been updated in the taskTracker table, the latest status is |
| * reinserted. Otherwise, we assume the tracker has expired. |
| */ |
| TreeSet<TaskTrackerStatus> trackerExpiryQueue = |
| new TreeSet<TaskTrackerStatus>( |
| new Comparator<TaskTrackerStatus>() { |
| public int compare(TaskTrackerStatus p1, TaskTrackerStatus p2) { |
| if (p1.getLastSeen() < p2.getLastSeen()) { |
| return -1; |
| } else if (p1.getLastSeen() > p2.getLastSeen()) { |
| return 1; |
| } else { |
| return (p1.getTrackerName().compareTo(p2.getTrackerName())); |
| } |
| } |
| } |
| ); |
| |
| // Used to provide an HTML view on Job, Task, and TaskTracker structures |
| final HttpServer infoServer; |
| int infoPort; |
| |
| Server interTrackerServer; |
| |
| // Some jobs are stored in a local system directory. We can delete |
| // the files when we're done with the job. |
| static final String SUBDIR = "jobTracker"; |
| final LocalFileSystem localFs; |
| FileSystem fs = null; |
| Path systemDir = null; |
| JobConf conf; |
| |
| private final ACLsManager aclsManager; |
| |
| private static MemoryLimits jtMemLimits = new MemoryLimits(); |
| |
| private final QueueManager queueManager; |
| |
| //TO BE USED BY TEST CLASSES ONLY |
| //ONLY BUILD THE STATE WHICH IS REQUIRED BY TESTS |
| JobTracker() { |
| hostsReader = null; |
| retiredJobsCacheSize = 0; |
| infoServer = null; |
| queueManager = null; |
| aclsManager = null; |
| taskScheduler = null; |
| trackerIdentifier = null; |
| recoveryManager = null; |
| jobHistory = null; |
| completedJobStatusStore = null; |
| tasktrackerExpiryInterval = 0; |
| myInstrumentation = new JobTrackerMetricsInst(this, new JobConf()); |
| secretManager = null; |
| localFs = null; |
| } |
| |
| |
| JobTracker(JobConf conf) |
| throws IOException,InterruptedException { |
| this(conf, new Clock()); |
| } |
| /** |
| * Start the JobTracker process, listen on the indicated port |
| */ |
| JobTracker(JobConf conf, Clock clock) |
| throws IOException, InterruptedException { |
| this(conf, clock, generateNewIdentifier()); |
| } |
| |
| JobTracker(final JobConf conf, Clock newClock, String jobtrackerIndentifier) |
| throws IOException, InterruptedException { |
| // Set ports, start RPC servers, setup security policy etc. |
| InetSocketAddress addr = getAddress(conf); |
| this.localMachine = addr.getHostName(); |
| this.port = addr.getPort(); |
| UserGroupInformation.setConfiguration(conf); |
| SecurityUtil.login(conf, JTConfig.JT_KEYTAB_FILE, JTConfig.JT_USER_NAME, |
| localMachine); |
| |
| clock = newClock; |
| |
| long secretKeyInterval = |
| conf.getLong(MRConfig.DELEGATION_KEY_UPDATE_INTERVAL_KEY, |
| MRConfig.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT); |
| long tokenMaxLifetime = |
| conf.getLong(MRConfig.DELEGATION_TOKEN_MAX_LIFETIME_KEY, |
| MRConfig.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT); |
| long tokenRenewInterval = |
| conf.getLong(MRConfig.DELEGATION_TOKEN_RENEW_INTERVAL_KEY, |
| MRConfig.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); |
| secretManager = |
| new DelegationTokenSecretManager(secretKeyInterval, |
| tokenMaxLifetime, |
| tokenRenewInterval, |
| DELEGATION_TOKEN_GC_INTERVAL); |
| secretManager.startThreads(); |
| |
| // |
| // Grab some static constants |
| // |
| tasktrackerExpiryInterval = |
| conf.getLong(JT_TRACKER_EXPIRY_INTERVAL, 10 * 60 * 1000); |
| retiredJobsCacheSize = conf.getInt(JT_RETIREJOB_CACHE_SIZE, 1000); |
| MAX_BLACKLISTS_PER_TRACKER = |
| conf.getInt(JTConfig.JT_MAX_TRACKER_BLACKLISTS, 4); |
| |
| NUM_HEARTBEATS_IN_SECOND = |
| conf.getInt(JT_HEARTBEATS_IN_SECOND, DEFAULT_NUM_HEARTBEATS_IN_SECOND); |
| if (NUM_HEARTBEATS_IN_SECOND < MIN_NUM_HEARTBEATS_IN_SECOND) { |
| NUM_HEARTBEATS_IN_SECOND = DEFAULT_NUM_HEARTBEATS_IN_SECOND; |
| } |
| |
| HEARTBEATS_SCALING_FACTOR = |
| conf.getFloat(JT_HEARTBEATS_SCALING_FACTOR, |
| DEFAULT_HEARTBEATS_SCALING_FACTOR); |
| if (HEARTBEATS_SCALING_FACTOR < MIN_HEARTBEATS_SCALING_FACTOR) { |
| HEARTBEATS_SCALING_FACTOR = DEFAULT_HEARTBEATS_SCALING_FACTOR; |
| } |
| |
| //This configuration is there solely for tuning purposes and |
| //once this feature has been tested in real clusters and an appropriate |
| //value for the threshold has been found, this config might be taken out. |
| AVERAGE_BLACKLIST_THRESHOLD = conf.getFloat(JTConfig.JT_AVG_BLACKLIST_THRESHOLD, 0.5f); |
| |
| // This is a directory of temporary submission files. We delete it |
| // on startup, and can delete any files that we're done with |
| this.conf = conf; |
| JobConf jobConf = new JobConf(conf); |
| |
| initializeTaskMemoryRelatedConfig(); |
| |
| // Read the hosts/exclude files to restrict access to the jobtracker. |
| this.hostsReader = new HostsFileReader(conf.get(JTConfig.JT_HOSTS_FILENAME, ""), |
| conf.get(JTConfig.JT_HOSTS_EXCLUDE_FILENAME, "")); |
| |
| Configuration clusterConf = new Configuration(this.conf); |
| queueManager = new QueueManager(clusterConf); |
| |
| aclsManager = new ACLsManager(conf, new JobACLsManager(conf), queueManager); |
| |
| LOG.info("Starting jobtracker with owner as " + |
| getMROwner().getShortUserName() + " and supergroup as " + |
| getSuperGroup()); |
| |
| // Create the scheduler |
| Class<? extends TaskScheduler> schedulerClass |
| = conf.getClass(JT_TASK_SCHEDULER, |
| JobQueueTaskScheduler.class, TaskScheduler.class); |
| taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf); |
| |
| int handlerCount = conf.getInt(JT_IPC_HANDLER_COUNT, 10); |
| this.interTrackerServer = RPC.getServer(ClientProtocol.class, |
| this, |
| addr.getHostName(), |
| addr.getPort(), handlerCount, |
| false, conf, secretManager); |
| |
| // Set service-level authorization security policy |
| if (conf.getBoolean( |
| CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) { |
| this.interTrackerServer.refreshServiceAcl(conf, new MapReducePolicyProvider()); |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| Properties p = System.getProperties(); |
| for (Iterator it = p.keySet().iterator(); it.hasNext();) { |
| String key = (String) it.next(); |
| String val = p.getProperty(key); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Property '" + key + "' is " + val); |
| } |
| } |
| } |
| |
| InetSocketAddress infoSocAddr = NetUtils.createSocketAddr( |
| conf.get(JT_HTTP_ADDRESS, "0.0.0.0:50030")); |
| String infoBindAddress = infoSocAddr.getHostName(); |
| int tmpInfoPort = infoSocAddr.getPort(); |
| this.startTime = clock.getTime(); |
| infoServer = new HttpServer("job", infoBindAddress, tmpInfoPort, |
| tmpInfoPort == 0, conf); |
| infoServer.setAttribute("job.tracker", this); |
| // initialize history parameters. |
| jobHistory = new JobHistory(); |
| jobHistory.init(this, conf, this.localMachine, this.startTime); |
| |
| infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class); |
| infoServer.start(); |
| |
| this.trackerIdentifier = jobtrackerIndentifier; |
| |
| // Initialize instrumentation |
| JobTrackerInstrumentation tmp; |
| Class<? extends JobTrackerInstrumentation> metricsInst = |
| getInstrumentationClass(jobConf); |
| try { |
| java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c = |
| metricsInst.getConstructor(new Class[] {JobTracker.class, JobConf.class} ); |
| tmp = c.newInstance(this, jobConf); |
| } catch(Exception e) { |
| //Reflection can throw lots of exceptions -- handle them all by |
| //falling back on the default. |
| LOG.error("failed to initialize job tracker metrics", e); |
| tmp = new JobTrackerMetricsInst(this, jobConf); |
| } |
| myInstrumentation = tmp; |
| |
| // The rpc/web-server ports can be ephemeral ports... |
| // ... ensure we have the correct info |
| this.port = interTrackerServer.getListenerAddress().getPort(); |
| this.conf.set(JT_IPC_ADDRESS, (this.localMachine + ":" + this.port)); |
| this.localFs = FileSystem.getLocal(conf); |
| LOG.info("JobTracker up at: " + this.port); |
| this.infoPort = this.infoServer.getPort(); |
| this.conf.set(JT_HTTP_ADDRESS, |
| infoBindAddress + ":" + this.infoPort); |
| LOG.info("JobTracker webserver: " + this.infoServer.getPort()); |
| |
| // start the recovery manager |
| recoveryManager = new RecoveryManager(); |
| |
| while (!Thread.currentThread().isInterrupted()) { |
| try { |
| // if we haven't contacted the namenode go ahead and do it |
| if (fs == null) { |
| fs = getMROwner().doAs(new PrivilegedExceptionAction<FileSystem>() { |
| public FileSystem run() throws IOException { |
| return FileSystem.get(conf); |
| }}); |
| } |
| // clean up the system dir, which will only work if hdfs is out of |
| // safe mode |
| if (systemDir == null) { |
| systemDir = new Path(getSystemDir()); |
| } |
| try { |
| FileStatus systemDirStatus = fs.getFileStatus(systemDir); |
| if (!systemDirStatus.getOwner().equals( |
| getMROwner().getShortUserName())) { |
| throw new AccessControlException("The systemdir " + systemDir + |
| " is not owned by " + getMROwner().getShortUserName()); |
| } |
| if (!systemDirStatus.getPermission().equals(SYSTEM_DIR_PERMISSION)) { |
| LOG.warn("Incorrect permissions on " + systemDir + |
| ". Setting it to " + SYSTEM_DIR_PERMISSION); |
| fs.setPermission(systemDir,new FsPermission(SYSTEM_DIR_PERMISSION)); |
| } |
| } catch (FileNotFoundException fnf) {} //ignore |
| // Make sure that the backup data is preserved |
| FileStatus[] systemDirData; |
| try { |
| systemDirData = fs.listStatus(this.systemDir); |
| } catch (FileNotFoundException fnfe) { |
| systemDirData = null; |
| } |
| |
| // Check if the history is enabled .. as we can't have persistence with |
| // history disabled |
| if (conf.getBoolean(JT_RESTART_ENABLED, false) |
| && systemDirData != null) { |
| for (FileStatus status : systemDirData) { |
| try { |
| recoveryManager.addJobForRecovery(status); |
| } catch (Throwable t) { |
| LOG.warn("Failed to add the job " + status.getPath().getName(), |
| t); |
| } |
| } |
| |
| // Check if there are jobs to be recovered |
| if (recoveryManager.shouldRecover()) { |
| break; // if there is something to recover else clean the sys dir |
| } |
| } |
| LOG.info("Cleaning up the system directory"); |
| fs.delete(systemDir, true); |
| if (FileSystem.mkdirs(fs, systemDir, |
| new FsPermission(SYSTEM_DIR_PERMISSION))) { |
| break; |
| } |
| LOG.error("Mkdirs failed to create " + systemDir); |
| } catch (AccessControlException ace) { |
| LOG.warn("Failed to operate on " + JTConfig.JT_SYSTEM_DIR + "(" + systemDir |
| + ") because of permissions."); |
| LOG.warn("Manually delete the " + JTConfig.JT_SYSTEM_DIR + "(" + systemDir |
| + ") and then start the JobTracker."); |
| LOG.warn("Bailing out ... "); |
| throw ace; |
| } catch (IOException ie) { |
| LOG.info("problem cleaning system directory: " + systemDir, ie); |
| } |
| Thread.sleep(FS_ACCESS_RETRY_PERIOD); |
| } |
| |
| if (Thread.currentThread().isInterrupted()) { |
| throw new InterruptedException(); |
| } |
| |
| // Same with 'localDir' except it's always on the local disk. |
| asyncDiskService = new MRAsyncDiskService(FileSystem.getLocal(conf), conf.getLocalDirs()); |
| asyncDiskService.moveAndDeleteFromEachVolume(SUBDIR); |
| |
| // Initialize history DONE folder |
| jobHistory.initDone(conf, fs); |
| final String historyLogDir = |
| jobHistory.getCompletedJobHistoryLocation().toString(); |
| infoServer.setAttribute("historyLogDir", historyLogDir); |
| FileSystem historyFS = getMROwner().doAs( |
| new PrivilegedExceptionAction<FileSystem>() { |
| public FileSystem run() throws IOException { |
| return new Path(historyLogDir).getFileSystem(conf); |
| } |
| }); |
| infoServer.setAttribute("fileSys", historyFS); |
| infoServer.setAttribute("jobHistoryHistory", jobHistory); |
| infoServer.setAttribute("jobHistoryGlobber", JobHistory.globString()); |
| infoServer.setAttribute("jobHistoryLeafGlobber", JobHistory.leafGlobString()); |
| |
| this.dnsToSwitchMapping = ReflectionUtils.newInstance( |
| conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class, |
| DNSToSwitchMapping.class), conf); |
| this.numTaskCacheLevels = conf.getInt(JT_TASKCACHE_LEVELS, |
| NetworkTopology.DEFAULT_HOST_LEVEL); |
| |
| //initializes the job status store |
| completedJobStatusStore = new CompletedJobStatusStore(conf, aclsManager); |
| } |
| |
| private static SimpleDateFormat getDateFormat() { |
| return new SimpleDateFormat("yyyyMMddHHmm"); |
| } |
| |
| private static String generateNewIdentifier() { |
| return getDateFormat().format(new Date()); |
| } |
| |
| static boolean validateIdentifier(String id) { |
| try { |
| // the jobtracker id should be 'date' parseable |
| getDateFormat().parse(id); |
| return true; |
| } catch (ParseException pe) {} |
| return false; |
| } |
| |
| static boolean validateJobNumber(String id) { |
| try { |
| // the job number should be integer parseable |
| Integer.parseInt(id); |
| return true; |
| } catch (IllegalArgumentException pe) {} |
| return false; |
| } |
| |
| /** |
| * Whether the JT has recovered upon restart |
| */ |
| public boolean hasRecovered() { |
| return hasRecovered; |
| } |
| |
| /** |
| * How long the jobtracker took to recover from restart. |
| */ |
| public long getRecoveryDuration() { |
| return recoveryDuration; |
| } |
| |
| /** |
| * Get JobTracker's FileSystem. This is the filesystem for mapreduce.system.dir. |
| */ |
| FileSystem getFileSystem() { |
| return fs; |
| } |
| |
| /** |
| * Get JobTracker's LocalFileSystem handle. This is used by jobs for |
| * localizing job files to the local disk. |
| */ |
| LocalFileSystem getLocalFileSystem() throws IOException { |
| return localFs; |
| } |
| |
| TaskScheduler getScheduler() { |
| return taskScheduler; |
| } |
| |
| public static Class<? extends JobTrackerInstrumentation> getInstrumentationClass(Configuration conf) { |
| return conf.getClass(JT_INSTRUMENTATION, |
| JobTrackerMetricsInst.class, JobTrackerInstrumentation.class); |
| } |
| |
| public static void setInstrumentationClass(Configuration conf, Class<? extends JobTrackerInstrumentation> t) { |
| conf.setClass(JT_INSTRUMENTATION, |
| t, JobTrackerInstrumentation.class); |
| } |
| |
| JobTrackerInstrumentation getInstrumentation() { |
| return myInstrumentation; |
| } |
| |
| public static InetSocketAddress getAddress(Configuration conf) { |
| String jobTrackerStr = |
| conf.get(JT_IPC_ADDRESS, "localhost:8012"); |
| return NetUtils.createSocketAddr(jobTrackerStr); |
| } |
| |
| void startExpireTrackersThread() { |
| this.expireTrackersThread = new Thread(this.expireTrackers, "expireTrackers"); |
| this.expireTrackersThread.start(); |
| } |
| |
| /** |
| * Run forever |
| */ |
| public void offerService() throws InterruptedException, IOException { |
| // Prepare for recovery. This is done irrespective of the status of restart |
| // flag. |
| while (true) { |
| try { |
| recoveryManager.updateRestartCount(); |
| break; |
| } catch (IOException ioe) { |
| LOG.warn("Failed to initialize recovery manager. ", ioe); |
| // wait for some time |
| Thread.sleep(FS_ACCESS_RETRY_PERIOD); |
| LOG.warn("Retrying..."); |
| } |
| } |
| |
| taskScheduler.start(); |
| |
| recoveryManager.recover(); |
| |
| // refresh the node list as the recovery manager might have added |
| // disallowed trackers |
| refreshHosts(); |
| |
| startExpireTrackersThread(); |
| |
| expireLaunchingTaskThread.start(); |
| |
| if (completedJobStatusStore.isActive()) { |
| completedJobsStoreThread = new Thread(completedJobStatusStore, |
| "completedjobsStore-housekeeper"); |
| completedJobsStoreThread.start(); |
| } |
| |
| // start the inter-tracker server once the jt is ready |
| this.interTrackerServer.start(); |
| |
| synchronized (this) { |
| state = State.RUNNING; |
| } |
| LOG.info("Starting RUNNING"); |
| |
| this.interTrackerServer.join(); |
| LOG.info("Stopped interTrackerServer"); |
| } |
| |
| void close() throws IOException { |
| if (this.infoServer != null) { |
| LOG.info("Stopping infoServer"); |
| try { |
| this.infoServer.stop(); |
| } catch (Exception ex) { |
| LOG.warn("Exception shutting down JobTracker", ex); |
| } |
| } |
| if (this.interTrackerServer != null) { |
| LOG.info("Stopping interTrackerServer"); |
| this.interTrackerServer.stop(); |
| } |
| |
| stopExpireTrackersThread(); |
| |
| if (taskScheduler != null) { |
| taskScheduler.terminate(); |
| } |
| if (this.expireLaunchingTaskThread != null && this.expireLaunchingTaskThread.isAlive()) { |
| LOG.info("Stopping expireLaunchingTasks"); |
| this.expireLaunchingTaskThread.interrupt(); |
| try { |
| this.expireLaunchingTaskThread.join(); |
| } catch (InterruptedException ex) { |
| ex.printStackTrace(); |
| } |
| } |
| if (this.completedJobsStoreThread != null && |
| this.completedJobsStoreThread.isAlive()) { |
| LOG.info("Stopping completedJobsStore thread"); |
| this.completedJobsStoreThread.interrupt(); |
| try { |
| this.completedJobsStoreThread.join(); |
| } catch (InterruptedException ex) { |
| ex.printStackTrace(); |
| } |
| } |
| |
| if (jobHistory != null) { |
| jobHistory.shutDown(); |
| } |
| DelegationTokenRenewal.close(); |
| LOG.info("stopped all jobtracker services"); |
| return; |
| } |
| |
| void stopExpireTrackersThread() { |
| if (this.expireTrackersThread != null && this.expireTrackersThread.isAlive()) { |
| LOG.info("Stopping expireTrackers"); |
| this.expireTrackersThread.interrupt(); |
| try { |
| this.expireTrackersThread.join(); |
| } catch (InterruptedException ex) { |
| ex.printStackTrace(); |
| } |
| } |
| } |
| |
| |
| /////////////////////////////////////////////////////// |
| // Maintain lookup tables; called by JobInProgress |
| // and TaskInProgress |
| /////////////////////////////////////////////////////// |
| void createTaskEntry(TaskAttemptID taskid, String taskTracker, TaskInProgress tip) { |
| LOG.info("Adding task (" + tip.getAttemptType(taskid) + ") " + |
| "'" + taskid + "' to tip " + |
| tip.getTIPId() + ", for tracker '" + taskTracker + "'"); |
| |
| // taskid --> tracker |
| taskidToTrackerMap.put(taskid, taskTracker); |
| |
| // tracker --> taskid |
| Set<TaskAttemptID> taskset = trackerToTaskMap.get(taskTracker); |
| if (taskset == null) { |
| taskset = new TreeSet<TaskAttemptID>(); |
| trackerToTaskMap.put(taskTracker, taskset); |
| } |
| taskset.add(taskid); |
| |
| // taskid --> TIP |
| taskidToTIPMap.put(taskid, tip); |
| |
| } |
| |
| void removeTaskEntry(TaskAttemptID taskid) { |
| // taskid --> tracker |
| String tracker = taskidToTrackerMap.remove(taskid); |
| |
| // tracker --> taskid |
| if (tracker != null) { |
| Set<TaskAttemptID> trackerSet = trackerToTaskMap.get(tracker); |
| if (trackerSet != null) { |
| trackerSet.remove(taskid); |
| } |
| } |
| |
| // taskid --> TIP |
| if (taskidToTIPMap.remove(taskid) != null) { |
| // log the task removal in case of success |
| LOG.info("Removing task '" + taskid + "'"); |
| } |
| } |
| |
| /** |
| * Mark a 'task' for removal later. |
| * This function assumes that the JobTracker is locked on entry. |
| * |
| * @param taskTracker the tasktracker at which the 'task' was running |
| * @param taskid completed (success/failure/killed) task |
| */ |
| void markCompletedTaskAttempt(String taskTracker, TaskAttemptID taskid) { |
| // tracker --> taskid |
| Set<TaskAttemptID> taskset = trackerToMarkedTasksMap.get(taskTracker); |
| if (taskset == null) { |
| taskset = new TreeSet<TaskAttemptID>(); |
| trackerToMarkedTasksMap.put(taskTracker, taskset); |
| } |
| taskset.add(taskid); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Marked '" + taskid + "' from '" + taskTracker + "'"); |
| } |
| } |
| |
| /** |
| * Mark all 'non-running' jobs of the job for pruning. |
| * This function assumes that the JobTracker is locked on entry. |
| * |
| * @param job the completed job |
| */ |
| void markCompletedJob(JobInProgress job) { |
| for (TaskInProgress tip : job.getTasks(TaskType.JOB_SETUP)) { |
| for (TaskStatus taskStatus : tip.getTaskStatuses()) { |
| if (taskStatus.getRunState() != TaskStatus.State.RUNNING && |
| taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING && |
| taskStatus.getRunState() != TaskStatus.State.UNASSIGNED) { |
| markCompletedTaskAttempt(taskStatus.getTaskTracker(), |
| taskStatus.getTaskID()); |
| } |
| } |
| } |
| for (TaskInProgress tip : job.getTasks(TaskType.MAP)) { |
| for (TaskStatus taskStatus : tip.getTaskStatuses()) { |
| if (taskStatus.getRunState() != TaskStatus.State.RUNNING && |
| taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING && |
| taskStatus.getRunState() != TaskStatus.State.FAILED_UNCLEAN && |
| taskStatus.getRunState() != TaskStatus.State.KILLED_UNCLEAN && |
| taskStatus.getRunState() != TaskStatus.State.UNASSIGNED) { |
| markCompletedTaskAttempt(taskStatus.getTaskTracker(), |
| taskStatus.getTaskID()); |
| } |
| } |
| } |
| for (TaskInProgress tip : job.getTasks(TaskType.REDUCE)) { |
| for (TaskStatus taskStatus : tip.getTaskStatuses()) { |
| if (taskStatus.getRunState() != TaskStatus.State.RUNNING && |
| taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING && |
| taskStatus.getRunState() != TaskStatus.State.FAILED_UNCLEAN && |
| taskStatus.getRunState() != TaskStatus.State.KILLED_UNCLEAN && |
| taskStatus.getRunState() != TaskStatus.State.UNASSIGNED) { |
| markCompletedTaskAttempt(taskStatus.getTaskTracker(), |
| taskStatus.getTaskID()); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Remove all 'marked' tasks running on a given {@link TaskTracker} |
| * from the {@link JobTracker}'s data-structures. |
| * This function assumes that the JobTracker is locked on entry. |
| * |
| * @param taskTracker tasktracker whose 'non-running' tasks are to be purged |
| */ |
| void removeMarkedTasks(String taskTracker) { |
| // Purge all the 'marked' tasks which were running at taskTracker |
| Set<TaskAttemptID> markedTaskSet = |
| trackerToMarkedTasksMap.get(taskTracker); |
| if (markedTaskSet != null) { |
| for (TaskAttemptID taskid : markedTaskSet) { |
| removeTaskEntry(taskid); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Removed marked completed task '" + taskid + "' from '" + |
| taskTracker + "'"); |
| } |
| } |
| // Clear |
| trackerToMarkedTasksMap.remove(taskTracker); |
| } |
| } |
| |
| /** |
| * Call {@link #removeTaskEntry(String)} for each of the |
| * job's tasks. |
| * When the job is retiring we can afford to nuke all it's tasks |
| * |
| * @param job the job about to be 'retired' |
| */ |
| synchronized void removeJobTasks(JobInProgress job) { |
| // iterate over all the task types |
| for (TaskType type : TaskType.values()) { |
| // iterate over all the tips of the type under consideration |
| for (TaskInProgress tip : job.getTasks(type)) { |
| // iterate over all the task-ids in the tip under consideration |
| for (TaskAttemptID id : tip.getAllTaskAttemptIDs()) { |
| // remove the task-id entry from the jobtracker |
| removeTaskEntry(id); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Safe clean-up all data structures at the end of the |
| * job (success/failure/killed). |
| * Here we also ensure that for a given user we maintain |
| * information for only MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs |
| * on the JobTracker. |
| * |
| * @param job completed job. |
| */ |
| synchronized void finalizeJob(JobInProgress job) { |
| // Mark the 'non-running' tasks for pruning |
| markCompletedJob(job); |
| |
| JobEndNotifier.registerNotification(job.getJobConf(), job.getStatus()); |
| |
| // start the merge of log files |
| JobID id = job.getStatus().getJobID(); |
| |
| // mark the job as completed |
| try { |
| jobHistory.markCompleted(id); |
| } catch (IOException ioe) { |
| LOG.info("Failed to mark job " + id + " as completed!", ioe); |
| } |
| |
| final JobTrackerInstrumentation metrics = getInstrumentation(); |
| metrics.finalizeJob(conf, id); |
| |
| // mark the job for cleanup at all the trackers |
| addJobForCleanup(id); |
| |
| // add the blacklisted trackers to potentially faulty list |
| if (job.getStatus().getRunState() == JobStatus.SUCCEEDED) { |
| if (job.getNoOfBlackListedTrackers() > 0) { |
| for (String hostName : job.getBlackListedTrackers()) { |
| faultyTrackers.incrementFaults(hostName); |
| } |
| } |
| } |
| } |
| |
| /////////////////////////////////////////////////////// |
| // Accessors for objects that want info on jobs, tasks, |
| // trackers, etc. |
| /////////////////////////////////////////////////////// |
| public int getTotalSubmissions() { |
| return totalSubmissions; |
| } |
| public String getJobTrackerMachine() { |
| return localMachine; |
| } |
| |
| /** |
| * Get the unique identifier (ie. timestamp) of this job tracker start. |
| * @return a string with a unique identifier |
| */ |
| public String getTrackerIdentifier() { |
| return trackerIdentifier; |
| } |
| |
| public int getTrackerPort() { |
| return port; |
| } |
| public int getInfoPort() { |
| return infoPort; |
| } |
| public long getStartTime() { |
| return startTime; |
| } |
| public Vector<JobInProgress> runningJobs() { |
| Vector<JobInProgress> v = new Vector<JobInProgress>(); |
| for (Iterator it = jobs.values().iterator(); it.hasNext();) { |
| JobInProgress jip = (JobInProgress) it.next(); |
| JobStatus status = jip.getStatus(); |
| if (status.getRunState() == JobStatus.RUNNING) { |
| v.add(jip); |
| } |
| } |
| return v; |
| } |
| /** |
| * Version that is called from a timer thread, and therefore needs to be |
| * careful to synchronize. |
| */ |
| public synchronized List<JobInProgress> getRunningJobs() { |
| synchronized (jobs) { |
| return runningJobs(); |
| } |
| } |
| public Vector<JobInProgress> failedJobs() { |
| Vector<JobInProgress> v = new Vector<JobInProgress>(); |
| for (Iterator it = jobs.values().iterator(); it.hasNext();) { |
| JobInProgress jip = (JobInProgress) it.next(); |
| JobStatus status = jip.getStatus(); |
| if ((status.getRunState() == JobStatus.FAILED) |
| || (status.getRunState() == JobStatus.KILLED)) { |
| v.add(jip); |
| } |
| } |
| return v; |
| } |
| |
| public synchronized List<JobInProgress> getFailedJobs() { |
| synchronized (jobs) { |
| return failedJobs(); |
| } |
| } |
| |
| public Vector<JobInProgress> completedJobs() { |
| Vector<JobInProgress> v = new Vector<JobInProgress>(); |
| for (Iterator it = jobs.values().iterator(); it.hasNext();) { |
| JobInProgress jip = (JobInProgress) it.next(); |
| JobStatus status = jip.getStatus(); |
| if (status.getRunState() == JobStatus.SUCCEEDED) { |
| v.add(jip); |
| } |
| } |
| return v; |
| } |
| |
| public synchronized List<JobInProgress> getCompletedJobs() { |
| synchronized (jobs) { |
| return completedJobs(); |
| } |
| } |
| |
| /** |
| * Get all the task trackers in the cluster |
| * |
| * @return {@link Collection} of {@link TaskTrackerStatus} |
| */ |
| // lock to taskTrackers should hold JT lock first. |
| public synchronized Collection<TaskTrackerStatus> taskTrackers() { |
| Collection<TaskTrackerStatus> ttStatuses; |
| synchronized (taskTrackers) { |
| ttStatuses = |
| new ArrayList<TaskTrackerStatus>(taskTrackers.values().size()); |
| for (TaskTracker tt : taskTrackers.values()) { |
| ttStatuses.add(tt.getStatus()); |
| } |
| } |
| return ttStatuses; |
| } |
| |
| /** |
| * Get the active task tracker statuses in the cluster |
| * |
| * @return {@link Collection} of active {@link TaskTrackerStatus} |
| */ |
| // This method is synchronized to make sure that the locking order |
| // "taskTrackers lock followed by faultyTrackers.potentiallyFaultyTrackers |
| // lock" is under JobTracker lock to avoid deadlocks. |
| synchronized public Collection<TaskTrackerStatus> activeTaskTrackers() { |
| Collection<TaskTrackerStatus> activeTrackers = |
| new ArrayList<TaskTrackerStatus>(); |
| synchronized (taskTrackers) { |
| for ( TaskTracker tt : taskTrackers.values()) { |
| TaskTrackerStatus status = tt.getStatus(); |
| if (!faultyTrackers.isBlacklisted(status.getHost())) { |
| activeTrackers.add(status); |
| } |
| } |
| } |
| return activeTrackers; |
| } |
| |
| /** |
| * Get the active and blacklisted task tracker names in the cluster. The first |
| * element in the returned list contains the list of active tracker names. |
| * The second element in the returned list contains the list of blacklisted |
| * tracker names. |
| */ |
| // This method is synchronized to make sure that the locking order |
| // "taskTrackers lock followed by faultyTrackers.potentiallyFaultyTrackers |
| // lock" is under JobTracker lock to avoid deadlocks. |
| synchronized public List<List<String>> taskTrackerNames() { |
| List<String> activeTrackers = |
| new ArrayList<String>(); |
| List<String> blacklistedTrackers = |
| new ArrayList<String>(); |
| synchronized (taskTrackers) { |
| for (TaskTracker tt : taskTrackers.values()) { |
| TaskTrackerStatus status = tt.getStatus(); |
| if (!faultyTrackers.isBlacklisted(status.getHost())) { |
| activeTrackers.add(status.getTrackerName()); |
| } else { |
| blacklistedTrackers.add(status.getTrackerName()); |
| } |
| } |
| } |
| List<List<String>> result = new ArrayList<List<String>>(2); |
| result.add(activeTrackers); |
| result.add(blacklistedTrackers); |
| return result; |
| } |
| |
| /** |
| * Get the blacklisted task tracker statuses in the cluster |
| * |
| * @return {@link Collection} of blacklisted {@link TaskTrackerStatus} |
| */ |
| // This method is synchronized to make sure that the locking order |
| // "taskTrackers lock followed by faultyTrackers.potentiallyFaultyTrackers |
| // lock" is under JobTracker lock to avoid deadlocks. |
| synchronized public Collection<TaskTrackerStatus> blacklistedTaskTrackers() { |
| Collection<TaskTrackerStatus> blacklistedTrackers = |
| new ArrayList<TaskTrackerStatus>(); |
| synchronized (taskTrackers) { |
| for (TaskTracker tt : taskTrackers.values()) { |
| TaskTrackerStatus status = tt.getStatus(); |
| if (faultyTrackers.isBlacklisted(status.getHost())) { |
| blacklistedTrackers.add(status); |
| } |
| } |
| } |
| return blacklistedTrackers; |
| } |
| |
| synchronized int getFaultCount(String hostName) { |
| return faultyTrackers.getFaultCount(hostName); |
| } |
| |
| /** |
| * Get the number of blacklisted trackers across all the jobs |
| * |
| * @return |
| */ |
| int getBlacklistedTrackerCount() { |
| return faultyTrackers.numBlacklistedTrackers; |
| } |
| |
| /** |
| * Whether the tracker is blacklisted or not |
| * |
| * @param trackerID |
| * |
| * @return true if blacklisted, false otherwise |
| */ |
| synchronized public boolean isBlacklisted(String trackerID) { |
| TaskTrackerStatus status = getTaskTrackerStatus(trackerID); |
| if (status != null) { |
| return faultyTrackers.isBlacklisted(status.getHost()); |
| } |
| return false; |
| } |
| |
| // lock to taskTrackers should hold JT lock first. |
| synchronized public TaskTrackerStatus getTaskTrackerStatus(String trackerID) { |
| TaskTracker taskTracker; |
| synchronized (taskTrackers) { |
| taskTracker = taskTrackers.get(trackerID); |
| } |
| return (taskTracker == null) ? null : taskTracker.getStatus(); |
| } |
| |
| // lock to taskTrackers should hold JT lock first. |
| synchronized public TaskTracker getTaskTracker(String trackerID) { |
| synchronized (taskTrackers) { |
| return taskTrackers.get(trackerID); |
| } |
| } |
| |
| JobTrackerStatistics getStatistics() { |
| return statistics; |
| } |
| /** |
| * Adds a new node to the jobtracker. It involves adding it to the expiry |
| * thread and adding it for resolution |
| * |
| * Assumes JobTracker, taskTrackers and trackerExpiryQueue are locked on entry |
| * |
| * @param status Task Tracker's status |
| */ |
| void addNewTracker(TaskTracker taskTracker) { |
| TaskTrackerStatus status = taskTracker.getStatus(); |
| trackerExpiryQueue.add(status); |
| |
| // Register the tracker if its not registered |
| String hostname = status.getHost(); |
| if (getNode(status.getTrackerName()) == null) { |
| // Making the network location resolution inline .. |
| resolveAndAddToTopology(hostname); |
| } |
| |
| // add it to the set of tracker per host |
| Set<TaskTracker> trackers = hostnameToTaskTracker.get(hostname); |
| if (trackers == null) { |
| trackers = Collections.synchronizedSet(new HashSet<TaskTracker>()); |
| hostnameToTaskTracker.put(hostname, trackers); |
| } |
| statistics.taskTrackerAdded(status.getTrackerName()); |
| getInstrumentation().addTrackers(1); |
| LOG.info("Adding tracker " + status.getTrackerName() + " to host " |
| + hostname); |
| trackers.add(taskTracker); |
| } |
| |
| public Node resolveAndAddToTopology(String name) { |
| List <String> tmpList = new ArrayList<String>(1); |
| tmpList.add(name); |
| List <String> rNameList = dnsToSwitchMapping.resolve(tmpList); |
| String rName = rNameList.get(0); |
| String networkLoc = NodeBase.normalize(rName); |
| return addHostToNodeMapping(name, networkLoc); |
| } |
| |
| private Node addHostToNodeMapping(String host, String networkLoc) { |
| Node node = null; |
| synchronized (nodesAtMaxLevel) { |
| if ((node = clusterMap.getNode(networkLoc+"/"+host)) == null) { |
| node = new NodeBase(host, networkLoc); |
| clusterMap.add(node); |
| if (node.getLevel() < getNumTaskCacheLevels()) { |
| LOG.fatal("Got a host whose level is: " + node.getLevel() + "." |
| + " Should get at least a level of value: " |
| + getNumTaskCacheLevels()); |
| try { |
| stopTracker(); |
| } catch (IOException ie) { |
| LOG.warn("Exception encountered during shutdown: " |
| + StringUtils.stringifyException(ie)); |
| System.exit(-1); |
| } |
| } |
| hostnameToNodeMap.put(host, node); |
| // Make an entry for the node at the max level in the cache |
| nodesAtMaxLevel.add(getParentNode(node, getNumTaskCacheLevels() - 1)); |
| } |
| } |
| return node; |
| } |
| |
| /** |
| * Returns a collection of nodes at the max level |
| */ |
| public Collection<Node> getNodesAtMaxLevel() { |
| return nodesAtMaxLevel; |
| } |
| |
| public static Node getParentNode(Node node, int level) { |
| for (int i = 0; i < level; ++i) { |
| node = node.getParent(); |
| } |
| return node; |
| } |
| |
| /** |
| * Return the Node in the network topology that corresponds to the hostname |
| */ |
| public Node getNode(String name) { |
| return hostnameToNodeMap.get(name); |
| } |
| public int getNumTaskCacheLevels() { |
| return numTaskCacheLevels; |
| } |
| public int getNumResolvedTaskTrackers() { |
| return numResolved; |
| } |
| |
| public int getNumberOfUniqueHosts() { |
| return uniqueHostsMap.size(); |
| } |
| |
| public void addJobInProgressListener(JobInProgressListener listener) { |
| jobInProgressListeners.add(listener); |
| } |
| |
| public void removeJobInProgressListener(JobInProgressListener listener) { |
| jobInProgressListeners.remove(listener); |
| } |
| |
| // Update the listeners about the job |
| // Assuming JobTracker is locked on entry. |
| void updateJobInProgressListeners(JobChangeEvent event) { |
| for (JobInProgressListener listener : jobInProgressListeners) { |
| listener.jobUpdated(event); |
| } |
| } |
| |
| /** |
| * Return the {@link QueueManager} associated with the JobTracker. |
| */ |
| public QueueManager getQueueManager() { |
| return queueManager; |
| } |
| |
| //////////////////////////////////////////////////// |
| // InterTrackerProtocol |
| //////////////////////////////////////////////////// |
| |
| public String getBuildVersion() throws IOException{ |
| return VersionInfo.getBuildVersion(); |
| } |
| |
| /** |
| * The periodic heartbeat mechanism between the {@link TaskTracker} and |
| * the {@link JobTracker}. |
| * |
| * The {@link JobTracker} processes the status information sent by the |
| * {@link TaskTracker} and responds with instructions to start/stop |
| * tasks or jobs, and also 'reset' instructions during contingencies. |
| */ |
| public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, |
| boolean restarted, |
| boolean initialContact, |
| boolean acceptNewTasks, |
| short responseId) |
| throws IOException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Got heartbeat from: " + status.getTrackerName() + |
| " (restarted: " + restarted + |
| " initialContact: " + initialContact + |
| " acceptNewTasks: " + acceptNewTasks + ")" + |
| " with responseId: " + responseId); |
| } |
| |
| // Make sure heartbeat is from a tasktracker allowed by the jobtracker. |
| if (!acceptTaskTracker(status)) { |
| throw new DisallowedTaskTrackerException(status); |
| } |
| |
| // First check if the last heartbeat response got through |
| String trackerName = status.getTrackerName(); |
| long now = clock.getTime(); |
| boolean isBlacklisted = false; |
| if (restarted) { |
| faultyTrackers.markTrackerHealthy(status.getHost()); |
| } else { |
| isBlacklisted = |
| faultyTrackers.shouldAssignTasksToTracker(status.getHost(), now); |
| } |
| |
| HeartbeatResponse prevHeartbeatResponse = |
| trackerToHeartbeatResponseMap.get(trackerName); |
| |
| if (initialContact != true) { |
| // If this isn't the 'initial contact' from the tasktracker, |
| // there is something seriously wrong if the JobTracker has |
| // no record of the 'previous heartbeat'; if so, ask the |
| // tasktracker to re-initialize itself. |
| if (prevHeartbeatResponse == null) { |
| // This is the first heartbeat from the old tracker to the newly |
| // started JobTracker |
| |
| // Jobtracker might have restarted but no recovery is needed |
| // otherwise this code should not be reached |
| LOG.warn("Serious problem, cannot find record of 'previous' " + |
| "heartbeat for '" + trackerName + |
| "'; reinitializing the tasktracker"); |
| return new HeartbeatResponse(responseId, |
| new TaskTrackerAction[] {new ReinitTrackerAction()}); |
| |
| } else { |
| |
| // It is completely safe to not process a 'duplicate' heartbeat from a |
| // {@link TaskTracker} since it resends the heartbeat when rpcs are |
| // lost see {@link TaskTracker.transmitHeartbeat()}; |
| // acknowledge it by re-sending the previous response to let the |
| // {@link TaskTracker} go forward. |
| if (prevHeartbeatResponse.getResponseId() != responseId) { |
| LOG.info("Ignoring 'duplicate' heartbeat from '" + |
| trackerName + "'; resending the previous 'lost' response"); |
| return prevHeartbeatResponse; |
| } |
| } |
| } |
| |
| // Process this heartbeat |
| short newResponseId = (short)(responseId + 1); |
| status.setLastSeen(now); |
| if (!processHeartbeat(status, initialContact)) { |
| if (prevHeartbeatResponse != null) { |
| trackerToHeartbeatResponseMap.remove(trackerName); |
| } |
| return new HeartbeatResponse(newResponseId, |
| new TaskTrackerAction[] {new ReinitTrackerAction()}); |
| } |
| |
| // Initialize the response to be sent for the heartbeat |
| HeartbeatResponse response = new HeartbeatResponse(newResponseId, null); |
| List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>(); |
| isBlacklisted = faultyTrackers.isBlacklisted(status.getHost()); |
| // Check for new tasks to be executed on the tasktracker |
| if (acceptNewTasks && !isBlacklisted) { |
| TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName); |
| if (taskTrackerStatus == null) { |
| LOG.warn("Unknown task tracker polling; ignoring: " + trackerName); |
| } else { |
| List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus); |
| if (tasks == null ) { |
| tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName)); |
| } |
| if (tasks != null) { |
| for (Task task : tasks) { |
| expireLaunchingTasks.addNewTask(task.getTaskID()); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID()); |
| } |
| actions.add(new LaunchTaskAction(task)); |
| } |
| } |
| } |
| } |
| |
| // Check for tasks to be killed |
| List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName); |
| if (killTasksList != null) { |
| actions.addAll(killTasksList); |
| } |
| |
| // Check for jobs to be killed/cleanedup |
| List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName); |
| if (killJobsList != null) { |
| actions.addAll(killJobsList); |
| } |
| |
| // Check for tasks whose outputs can be saved |
| actions.addAll(getTasksToSave(status)); |
| |
| // calculate next heartbeat interval and put in heartbeat response |
| int nextInterval = getNextHeartbeatInterval(); |
| response.setHeartbeatInterval(nextInterval); |
| response.setActions( |
| actions.toArray(new TaskTrackerAction[actions.size()])); |
| |
| // Update the trackerToHeartbeatResponseMap |
| trackerToHeartbeatResponseMap.put(trackerName, response); |
| |
| // Done processing the hearbeat, now remove 'marked' tasks |
| removeMarkedTasks(trackerName); |
| |
| return response; |
| } |
| |
| /** |
| * Calculates next heartbeat interval using cluster size. |
| * Heartbeat interval is incremented by 1 second for every 100 nodes by default. |
| * @return next heartbeat interval. |
| */ |
| public int getNextHeartbeatInterval() { |
| // get the no of task trackers |
| int clusterSize = getClusterStatus().getTaskTrackers(); |
| int heartbeatInterval = Math.max( |
| (int)(1000 * HEARTBEATS_SCALING_FACTOR * |
| Math.ceil((double)clusterSize / |
| NUM_HEARTBEATS_IN_SECOND)), |
| HEARTBEAT_INTERVAL_MIN) ; |
| return heartbeatInterval; |
| } |
| |
| /** |
| * Return if the specified tasktracker is in the hosts list, |
| * if one was configured. If none was configured, then this |
| * returns true. |
| */ |
| private boolean inHostsList(TaskTrackerStatus status) { |
| Set<String> hostsList = hostsReader.getHosts(); |
| return (hostsList.isEmpty() || hostsList.contains(status.getHost())); |
| } |
| |
| /** |
| * Return if the specified tasktracker is in the exclude list. |
| */ |
| private boolean inExcludedHostsList(TaskTrackerStatus status) { |
| Set<String> excludeList = hostsReader.getExcludedHosts(); |
| return excludeList.contains(status.getHost()); |
| } |
| |
| /** |
| * Returns true if the tasktracker is in the hosts list and |
| * not in the exclude list. |
| */ |
| private boolean acceptTaskTracker(TaskTrackerStatus status) { |
| return (inHostsList(status) && !inExcludedHostsList(status)); |
| } |
| |
| /** |
| * Update the last recorded status for the given task tracker. |
| * It assumes that the taskTrackers are locked on entry. |
| * @param trackerName The name of the tracker |
| * @param status The new status for the task tracker |
| * @return Was an old status found? |
| */ |
| boolean updateTaskTrackerStatus(String trackerName, |
| TaskTrackerStatus status) { |
| TaskTracker tt = getTaskTracker(trackerName); |
| TaskTrackerStatus oldStatus = (tt == null) ? null : tt.getStatus(); |
| if (oldStatus != null) { |
| totalMaps -= oldStatus.countMapTasks(); |
| totalReduces -= oldStatus.countReduceTasks(); |
| occupiedMapSlots -= oldStatus.countOccupiedMapSlots(); |
| occupiedReduceSlots -= oldStatus.countOccupiedReduceSlots(); |
| getInstrumentation().decRunningMaps(oldStatus.countMapTasks()); |
| getInstrumentation().decRunningReduces(oldStatus.countReduceTasks()); |
| getInstrumentation().decOccupiedMapSlots(oldStatus.countOccupiedMapSlots()); |
| getInstrumentation().decOccupiedReduceSlots(oldStatus.countOccupiedReduceSlots()); |
| if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) { |
| int mapSlots = oldStatus.getMaxMapSlots(); |
| totalMapTaskCapacity -= mapSlots; |
| int reduceSlots = oldStatus.getMaxReduceSlots(); |
| totalReduceTaskCapacity -= reduceSlots; |
| } |
| if (status == null) { |
| taskTrackers.remove(trackerName); |
| Integer numTaskTrackersInHost = |
| uniqueHostsMap.get(oldStatus.getHost()); |
| if (numTaskTrackersInHost != null) { |
| numTaskTrackersInHost --; |
| if (numTaskTrackersInHost > 0) { |
| uniqueHostsMap.put(oldStatus.getHost(), numTaskTrackersInHost); |
| } |
| else { |
| uniqueHostsMap.remove(oldStatus.getHost()); |
| } |
| } |
| } |
| } |
| if (status != null) { |
| totalMaps += status.countMapTasks(); |
| totalReduces += status.countReduceTasks(); |
| occupiedMapSlots += status.countOccupiedMapSlots(); |
| occupiedReduceSlots += status.countOccupiedReduceSlots(); |
| getInstrumentation().addRunningMaps(status.countMapTasks()); |
| getInstrumentation().addRunningReduces(status.countReduceTasks()); |
| getInstrumentation().addOccupiedMapSlots(status.countOccupiedMapSlots()); |
| getInstrumentation().addOccupiedReduceSlots(status.countOccupiedReduceSlots()); |
| if (!faultyTrackers.isBlacklisted(status.getHost())) { |
| int mapSlots = status.getMaxMapSlots(); |
| totalMapTaskCapacity += mapSlots; |
| int reduceSlots = status.getMaxReduceSlots(); |
| totalReduceTaskCapacity += reduceSlots; |
| } |
| boolean alreadyPresent = false; |
| TaskTracker taskTracker = taskTrackers.get(trackerName); |
| if (taskTracker != null) { |
| alreadyPresent = true; |
| } else { |
| taskTracker = new TaskTracker(trackerName); |
| } |
| |
| taskTracker.setStatus(status); |
| taskTrackers.put(trackerName, taskTracker); |
| |
| if (LOG.isDebugEnabled()) { |
| int runningMaps = 0, runningReduces = 0; |
| int commitPendingMaps = 0, commitPendingReduces = 0; |
| int unassignedMaps = 0, unassignedReduces = 0; |
| int miscMaps = 0, miscReduces = 0; |
| List<TaskStatus> taskReports = status.getTaskReports(); |
| for (Iterator<TaskStatus> it = taskReports.iterator(); it.hasNext();) { |
| TaskStatus ts = it.next(); |
| boolean isMap = ts.getIsMap(); |
| TaskStatus.State state = ts.getRunState(); |
| if (state == TaskStatus.State.RUNNING) { |
| if (isMap) { ++runningMaps; } |
| else { ++runningReduces; } |
| } else if (state == TaskStatus.State.UNASSIGNED) { |
| if (isMap) { ++unassignedMaps; } |
| else { ++unassignedReduces; } |
| } else if (state == TaskStatus.State.COMMIT_PENDING) { |
| if (isMap) { ++commitPendingMaps; } |
| else { ++commitPendingReduces; } |
| } else { |
| if (isMap) { ++miscMaps; } |
| else { ++miscReduces; } |
| } |
| } |
| LOG.debug(trackerName + ": Status -" + |
| " running(m) = " + runningMaps + |
| " unassigned(m) = " + unassignedMaps + |
| " commit_pending(m) = " + commitPendingMaps + |
| " misc(m) = " + miscMaps + |
| " running(r) = " + runningReduces + |
| " unassigned(r) = " + unassignedReduces + |
| " commit_pending(r) = " + commitPendingReduces + |
| " misc(r) = " + miscReduces); |
| } |
| |
| if (!alreadyPresent) { |
| Integer numTaskTrackersInHost = |
| uniqueHostsMap.get(status.getHost()); |
| if (numTaskTrackersInHost == null) { |
| numTaskTrackersInHost = 0; |
| } |
| numTaskTrackersInHost ++; |
| uniqueHostsMap.put(status.getHost(), numTaskTrackersInHost); |
| } |
| } |
| getInstrumentation().setMapSlots(totalMapTaskCapacity); |
| getInstrumentation().setReduceSlots(totalReduceTaskCapacity); |
| return oldStatus != null; |
| } |
| |
| // Increment the number of reserved slots in the cluster. |
| // This method assumes the caller has JobTracker lock. |
| void incrementReservations(TaskType type, int reservedSlots) { |
| if (type.equals(TaskType.MAP)) { |
| reservedMapSlots += reservedSlots; |
| } else if (type.equals(TaskType.REDUCE)) { |
| reservedReduceSlots += reservedSlots; |
| } |
| } |
| |
| // Decrement the number of reserved slots in the cluster. |
| // This method assumes the caller has JobTracker lock. |
| void decrementReservations(TaskType type, int reservedSlots) { |
| if (type.equals(TaskType.MAP)) { |
| reservedMapSlots -= reservedSlots; |
| } else if (type.equals(TaskType.REDUCE)) { |
| reservedReduceSlots -= reservedSlots; |
| } |
| } |
| |
| private void updateNodeHealthStatus(TaskTrackerStatus trackerStatus) { |
| TaskTrackerHealthStatus status = trackerStatus.getHealthStatus(); |
| synchronized (faultyTrackers) { |
| faultyTrackers.setNodeHealthStatus(trackerStatus.getHost(), |
| status.isNodeHealthy(), status.getHealthReport()); |
| } |
| } |
| |
| /** |
| * Process incoming heartbeat messages from the task trackers. |
| */ |
| synchronized boolean processHeartbeat( |
| TaskTrackerStatus trackerStatus, |
| boolean initialContact) { |
| |
| getInstrumentation().heartbeat(); |
| |
| String trackerName = trackerStatus.getTrackerName(); |
| |
| synchronized (taskTrackers) { |
| synchronized (trackerExpiryQueue) { |
| boolean seenBefore = updateTaskTrackerStatus(trackerName, |
| trackerStatus); |
| TaskTracker taskTracker = getTaskTracker(trackerName); |
| if (initialContact) { |
| // If it's first contact, then clear out |
| // any state hanging around |
| if (seenBefore) { |
| lostTaskTracker(taskTracker); |
| } |
| } else { |
| // If not first contact, there should be some record of the tracker |
| if (!seenBefore) { |
| LOG.warn("Status from unknown Tracker : " + trackerName); |
| updateTaskTrackerStatus(trackerName, null); |
| return false; |
| } |
| } |
| |
| if (initialContact) { |
| // if this is lost tracker that came back now, and if it blacklisted |
| // increment the count of blacklisted trackers in the cluster |
| if (isBlacklisted(trackerName)) { |
| faultyTrackers.incrBlackListedTrackers(1); |
| } |
| addNewTracker(taskTracker); |
| } |
| } |
| } |
| |
| updateTaskStatuses(trackerStatus); |
| updateNodeHealthStatus(trackerStatus); |
| |
| return true; |
| } |
| |
| /** |
| * A tracker wants to know if any of its Tasks have been |
| * closed (because the job completed, whether successfully or not) |
| */ |
| synchronized List<TaskTrackerAction> getTasksToKill(String taskTracker) { |
| |
| Set<TaskAttemptID> taskIds = trackerToTaskMap.get(taskTracker); |
| List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>(); |
| if (taskIds != null) { |
| for (TaskAttemptID killTaskId : taskIds) { |
| TaskInProgress tip = taskidToTIPMap.get(killTaskId); |
| if (tip == null) { |
| continue; |
| } |
| if (tip.shouldClose(killTaskId)) { |
| // |
| // This is how the JobTracker ends a task at the TaskTracker. |
| // It may be successfully completed, or may be killed in |
| // mid-execution. |
| // |
| if (!tip.getJob().isComplete()) { |
| killList.add(new KillTaskAction(killTaskId)); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(taskTracker + " -> KillTaskAction: " + killTaskId); |
| } |
| } |
| } |
| } |
| } |
| |
| // add the stray attempts for uninited jobs |
| synchronized (trackerToTasksToCleanup) { |
| Set<TaskAttemptID> set = trackerToTasksToCleanup.remove(taskTracker); |
| if (set != null) { |
| for (TaskAttemptID id : set) { |
| killList.add(new KillTaskAction(id)); |
| } |
| } |
| } |
| return killList; |
| } |
| |
| /** |
| * Add a job to cleanup for the tracker. |
| */ |
| private void addJobForCleanup(JobID id) { |
| for (String taskTracker : taskTrackers.keySet()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Marking job " + id |
| + " for cleanup by tracker " + taskTracker); |
| } |
| synchronized (trackerToJobsToCleanup) { |
| Set<JobID> jobsToKill = trackerToJobsToCleanup.get(taskTracker); |
| if (jobsToKill == null) { |
| jobsToKill = new HashSet<JobID>(); |
| trackerToJobsToCleanup.put(taskTracker, jobsToKill); |
| } |
| jobsToKill.add(id); |
| } |
| } |
| } |
| |
| /** |
| * A tracker wants to know if any job needs cleanup because the job completed. |
| */ |
| private List<TaskTrackerAction> getJobsForCleanup(String taskTracker) { |
| Set<JobID> jobs = null; |
| synchronized (trackerToJobsToCleanup) { |
| jobs = trackerToJobsToCleanup.remove(taskTracker); |
| } |
| if (jobs != null) { |
| // prepare the actions list |
| List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>(); |
| for (JobID killJobId : jobs) { |
| killList.add(new KillJobAction(killJobId)); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(taskTracker + " -> KillJobAction: " + killJobId); |
| } |
| } |
| |
| return killList; |
| } |
| return null; |
| } |
| |
| /** |
| * A tracker wants to know if any of its Tasks can be committed |
| */ |
| synchronized List<TaskTrackerAction> getTasksToSave( |
| TaskTrackerStatus tts) { |
| List<TaskStatus> taskStatuses = tts.getTaskReports(); |
| if (taskStatuses != null) { |
| List<TaskTrackerAction> saveList = new ArrayList<TaskTrackerAction>(); |
| for (TaskStatus taskStatus : taskStatuses) { |
| if (taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) { |
| TaskAttemptID taskId = taskStatus.getTaskID(); |
| TaskInProgress tip = taskidToTIPMap.get(taskId); |
| if (tip == null) { |
| continue; |
| } |
| if (tip.shouldCommit(taskId)) { |
| saveList.add(new CommitTaskAction(taskId)); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(tts.getTrackerName() + |
| " -> CommitTaskAction: " + taskId); |
| } |
| } |
| } |
| } |
| return saveList; |
| } |
| return null; |
| } |
| |
| // returns cleanup tasks first, then setup tasks. |
| synchronized List<Task> getSetupAndCleanupTasks( |
| TaskTrackerStatus taskTracker) throws IOException { |
| int maxMapTasks = taskTracker.getMaxMapSlots(); |
| int maxReduceTasks = taskTracker.getMaxReduceSlots(); |
| int numMaps = taskTracker.countOccupiedMapSlots(); |
| int numReduces = taskTracker.countOccupiedReduceSlots(); |
| int numTaskTrackers = getClusterStatus().getTaskTrackers(); |
| int numUniqueHosts = getNumberOfUniqueHosts(); |
| |
| Task t = null; |
| synchronized (jobs) { |
| if (numMaps < maxMapTasks) { |
| for (Iterator<JobInProgress> it = jobs.values().iterator(); |
| it.hasNext();) { |
| JobInProgress job = it.next(); |
| t = job.obtainJobCleanupTask(taskTracker, numTaskTrackers, |
| numUniqueHosts, true); |
| if (t != null) { |
| return Collections.singletonList(t); |
| } |
| } |
| for (Iterator<JobInProgress> it = jobs.values().iterator(); |
| it.hasNext();) { |
| JobInProgress job = it.next(); |
| t = job.obtainTaskCleanupTask(taskTracker, true); |
| if (t != null) { |
| return Collections.singletonList(t); |
| } |
| } |
| for (Iterator<JobInProgress> it = jobs.values().iterator(); |
| it.hasNext();) { |
| JobInProgress job = it.next(); |
| t = job.obtainJobSetupTask(taskTracker, numTaskTrackers, |
| numUniqueHosts, true); |
| if (t != null) { |
| return Collections.singletonList(t); |
| } |
| } |
| } |
| if (numReduces < maxReduceTasks) { |
| for (Iterator<JobInProgress> it = jobs.values().iterator(); |
| it.hasNext();) { |
| JobInProgress job = it.next(); |
| t = job.obtainJobCleanupTask(taskTracker, numTaskTrackers, |
| numUniqueHosts, false); |
| if (t != null) { |
| return Collections.singletonList(t); |
| } |
| } |
| for (Iterator<JobInProgress> it = jobs.values().iterator(); |
| it.hasNext();) { |
| JobInProgress job = it.next(); |
| t = job.obtainTaskCleanupTask(taskTracker, false); |
| if (t != null) { |
| return Collections.singletonList(t); |
| } |
| } |
| for (Iterator<JobInProgress> it = jobs.values().iterator(); |
| it.hasNext();) { |
| JobInProgress job = it.next(); |
| t = job.obtainJobSetupTask(taskTracker, numTaskTrackers, |
| numUniqueHosts, false); |
| if (t != null) { |
| return Collections.singletonList(t); |
| } |
| } |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * Grab the local fs name |
| */ |
| public synchronized String getFilesystemName() throws IOException { |
| if (fs == null) { |
| throw new IllegalStateException("FileSystem object not available yet"); |
| } |
| return fs.getUri().toString(); |
| } |
| |
| /** |
| * Returns a handle to the JobTracker's Configuration |
| */ |
| public JobConf getConf() { |
| return conf; |
| } |
| |
| public void reportTaskTrackerError(String taskTracker, |
| String errorClass, |
| String errorMessage) throws IOException { |
| LOG.warn("Report from " + taskTracker + ": " + errorMessage); |
| } |
| |
| /** |
| * Remove the job_ from jobids to get the unique string. |
| */ |
| static String getJobUniqueString(String jobid) { |
| return jobid.substring(4); |
| } |
| |
| //////////////////////////////////////////////////// |
| // JobSubmissionProtocol |
| //////////////////////////////////////////////////// |
| |
| /** |
| * Allocates a new JobId string. |
| * @deprecated use {@link #getNewJobID()} instead |
| */ |
| @Deprecated |
| public JobID getNewJobId() throws IOException { |
| return JobID.downgrade(getNewJobID()); |
| } |
| |
| /** |
| * Allocates a new JobId string. |
| */ |
| public org.apache.hadoop.mapreduce.JobID getNewJobID() throws IOException { |
| return new org.apache.hadoop.mapreduce.JobID |
| (getTrackerIdentifier(), nextJobId.getAndIncrement()); |
| } |
| |
| /** |
| * JobTracker.submitJob() kicks off a new job. |
| * |
| * Create a 'JobInProgress' object, which contains both JobProfile |
| * and JobStatus. Those two sub-objects are sometimes shipped outside |
| * of the JobTracker. But JobInProgress adds info that's useful for |
| * the JobTracker alone. |
| */ |
| public synchronized |
| org.apache.hadoop.mapreduce.JobStatus |
| submitJob(org.apache.hadoop.mapreduce.JobID jobId, String jobSubmitDir, |
| Credentials ts |
| ) throws IOException, InterruptedException { |
| return submitJob(JobID.downgrade(jobId), jobSubmitDir, ts); |
| } |
| |
| /** |
| * JobTracker.submitJob() kicks off a new job. |
| * |
| * Create a 'JobInProgress' object, which contains both JobProfile |
| * and JobStatus. Those two sub-objects are sometimes shipped outside |
| * of the JobTracker. But JobInProgress adds info that's useful for |
| * the JobTracker alone. |
| * @deprecated Use |
| * {@link #submitJob(org.apache.hadoop.mapreduce.JobID, String, Credentials)} |
| * instead |
| */ |
| @Deprecated |
| public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) |
| throws IOException, InterruptedException { |
| |
| return submitJob(jobId, 0, UserGroupInformation.getCurrentUser(), |
| jobSubmitDir, false, ts); |
| } |
| |
| /** |
| * Submits either a new job or a job from an earlier run. |
| */ |
| private JobStatus submitJob(org.apache.hadoop.mapreduce.JobID jobID, |
| int restartCount, UserGroupInformation ugi, |
| String jobSubmitDir, boolean recovered, Credentials ts |
| ) |
| throws IOException, InterruptedException { |
| |
| JobID jobId = null; |
| |
| JobInfo jobInfo; |
| |
| synchronized (this) { |
| jobId = JobID.downgrade(jobID); |
| if (jobs.containsKey(jobId)) { |
| // job already running, don't start twice |
| return jobs.get(jobId).getStatus(); |
| } |
| |
| // the conversion from String to Text for the UGI's username will |
| // not be required when we have the UGI to return us the username as |
| // Text. |
| jobInfo = |
| new JobInfo(jobId, new Text(ugi.getShortUserName()), new Path( |
| jobSubmitDir)); |
| } |
| |
| // Create the JobInProgress, temporarily unlock the JobTracker since |
| // we are about to copy job.xml from HDFSJobInProgress |
| JobInProgress job = |
| new JobInProgress(this, this.conf, restartCount, jobInfo, ts); |
| |
| synchronized (this) { |
| try { |
| checkQueueValidity(job); |
| } catch(IOException ioe) { |
| LOG.error("Queue given for job " + job.getJobID() + " is not valid: " + ioe); |
| throw ioe; |
| } |
| try { |
| aclsManager.checkAccess(job, ugi, Operation.SUBMIT_JOB); |
| } catch (AccessControlException ace) { |
| LOG.warn("Access denied for user " + job.getJobConf().getUser() |
| + ". Ignoring job " + jobId, ace); |
| throw ace; |
| } |
| |
| // Check the job if it cannot run in the cluster because of invalid memory |
| // requirements. |
| try { |
| checkMemoryRequirements(job); |
| } catch (IOException ioe) { |
| throw ioe; |
| } |
| |
| if (!recovered) { |
| // Store the information in a file so that the job can be recovered |
| // later (if at all) |
| Path jobDir = getSystemDirectoryForJob(jobId); |
| FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION)); |
| FSDataOutputStream out = fs.create(getSystemFileForJob(jobId)); |
| jobInfo.write(out); |
| out.close(); |
| } |
| return addJob(jobId, job); |
| } |
| } |
| |
| /** |
| * Adds a job to the jobtracker. Make sure that the checks are inplace before |
| * adding a job. This is the core job submission logic |
| * @param jobId The id for the job submitted which needs to be added |
| */ |
| synchronized JobStatus addJob(JobID jobId, JobInProgress job) { |
| totalSubmissions++; |
| |
| synchronized (jobs) { |
| synchronized (taskScheduler) { |
| jobs.put(job.getProfile().getJobID(), job); |
| for (JobInProgressListener listener : jobInProgressListeners) { |
| try { |
| listener.jobAdded(job); |
| } catch (IOException ioe) { |
| LOG.warn("Failed to add and so skipping the job : " |
| + job.getJobID() + ". Exception : " + ioe); |
| } |
| } |
| } |
| } |
| myInstrumentation.submitJob(job.getJobConf(), jobId); |
| LOG.info("Job " + jobId + " added successfully for user '" |
| + job.getJobConf().getUser() + "' to queue '" |
| + job.getJobConf().getQueueName() + "'"); |
| |
| return job.getStatus(); |
| } |
| |
| /** |
| * For a JobInProgress that is being submitted, check whether |
| * queue that the job has been submitted to exists and is RUNNING. |
| * @param job The JobInProgress object being submitted. |
| * @throws IOException |
| */ |
| public void checkQueueValidity(JobInProgress job) throws IOException { |
| String queue = job.getProfile().getQueueName(); |
| if (!(queueManager.getLeafQueueNames().contains(queue))) { |
| throw new IOException("Queue \"" + queue + "\" does not exist"); |
| } |
| |
| // check if queue is RUNNING |
| if (!queueManager.isRunning(queue)) { |
| throw new IOException("Queue \"" + queue + "\" is not running"); |
| } |
| } |
| |
| /** |
| * Are ACLs for authorization checks enabled on the MR cluster ? |
| * |
| * @return true if ACLs(job acls and queue acls) are enabled |
| */ |
| boolean areACLsEnabled() { |
| return conf.getBoolean(MRConfig.MR_ACLS_ENABLED, false); |
| } |
| |
| /**@deprecated use {@link #getClusterStatus(boolean)}*/ |
| @Deprecated |
| public synchronized ClusterStatus getClusterStatus() { |
| return getClusterStatus(false); |
| } |
| |
| public synchronized ClusterStatus getClusterStatus(boolean detailed) { |
| synchronized (taskTrackers) { |
| if (detailed) { |
| List<List<String>> trackerNames = taskTrackerNames(); |
| Collection<BlackListInfo> blackListedTrackers = getBlackListedTrackers(); |
| return new ClusterStatus(trackerNames.get(0), |
| blackListedTrackers, |
| tasktrackerExpiryInterval, |
| totalMaps, |
| totalReduces, |
| totalMapTaskCapacity, |
| totalReduceTaskCapacity, |
| state, getExcludedNodes().size() |
| ); |
| } else { |
| return new ClusterStatus(taskTrackers.size() - |
| getBlacklistedTrackerCount(), |
| getBlacklistedTrackerCount(), |
| tasktrackerExpiryInterval, |
| totalMaps, |
| totalReduces, |
| totalMapTaskCapacity, |
| totalReduceTaskCapacity, |
| state, getExcludedNodes().size()); |
| } |
| } |
| } |
| |
| public synchronized ClusterMetrics getClusterMetrics() { |
| return new ClusterMetrics(totalMaps, |
| totalReduces, occupiedMapSlots, occupiedReduceSlots, |
| reservedMapSlots, reservedReduceSlots, |
| totalMapTaskCapacity, totalReduceTaskCapacity, |
| totalSubmissions, |
| taskTrackers.size() - getBlacklistedTrackerCount(), |
| getBlacklistedTrackerCount(), getExcludedNodes().size()) ; |
| } |
| |
| public org.apache.hadoop.mapreduce.server.jobtracker.State |
| getJobTrackerState() { |
| return org.apache.hadoop.mapreduce.server.jobtracker. |
| State.valueOf(state.name()); |
| } |
| |
| public long getTaskTrackerExpiryInterval() { |
| return tasktrackerExpiryInterval; |
| } |
| |
| /** |
| * Get all active trackers in cluster. |
| * @return array of TaskTrackerInfo |
| */ |
| public TaskTrackerInfo[] getActiveTrackers() |
| throws IOException, InterruptedException { |
| List<String> activeTrackers = taskTrackerNames().get(0); |
| TaskTrackerInfo[] info = new TaskTrackerInfo[activeTrackers.size()]; |
| for (int i = 0; i < activeTrackers.size(); i++) { |
| info[i] = new TaskTrackerInfo(activeTrackers.get(i)); |
| } |
| return info; |
| } |
| |
| /** |
| * Get all blacklisted trackers in cluster. |
| * @return array of TaskTrackerInfo |
| */ |
| public TaskTrackerInfo[] getBlacklistedTrackers() |
| throws IOException, InterruptedException { |
| Collection<BlackListInfo> blackListed = getBlackListedTrackers(); |
| TaskTrackerInfo[] info = new TaskTrackerInfo[blackListed.size()]; |
| int i = 0; |
| for (BlackListInfo binfo : blackListed) { |
| info[i++] = new TaskTrackerInfo(binfo.getTrackerName(), |
| binfo.getReasonForBlackListing(), binfo.getBlackListReport()); |
| } |
| return info; |
| } |
| |
| /** |
| * @see ClientProtocol#killJob(org.apache.hadoop.mapreduce.JobID) |
| */ |
| @Override |
| public synchronized void killJob(org.apache.hadoop.mapreduce.JobID jobid) |
| throws IOException { |
| killJob(JobID.downgrade(jobid)); |
| } |
| |
| /** |
| * @deprecated Use {@link #killJob(org.apache.hadoop.mapreduce.JobID)} instead |
| */ |
| @Deprecated |
| public synchronized void killJob(JobID jobid) throws IOException { |
| if (null == jobid) { |
| LOG.info("Null jobid object sent to JobTracker.killJob()"); |
| return; |
| } |
| |
| JobInProgress job = jobs.get(jobid); |
| |
| if (null == job) { |
| LOG.info("killJob(): JobId " + jobid.toString() + " is not a valid job"); |
| return; |
| } |
| |
| // check both queue-level and job-level access |
| aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(), |
| Operation.KILL_JOB); |
| |
| killJob(job); |
| } |
| |
| private synchronized void killJob(JobInProgress job) { |
| LOG.info("Killing job " + job.getJobID()); |
| JobStatus prevStatus = (JobStatus)job.getStatus().clone(); |
| job.kill(); |
| |
| // Inform the listeners if the job is killed |
| // Note : |
| // If the job is killed in the PREP state then the listeners will be |
| // invoked |
| // If the job is killed in the RUNNING state then cleanup tasks will be |
| // launched and the updateTaskStatuses() will take care of it |
| JobStatus newStatus = (JobStatus)job.getStatus().clone(); |
| if (prevStatus.getRunState() != newStatus.getRunState() |
| && newStatus.getRunState() == JobStatus.KILLED) { |
| JobStatusChangeEvent event = |
| new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, |
| newStatus); |
| updateJobInProgressListeners(event); |
| } |
| } |
| |
| /** |
| * Initialize a job and inform the listeners about a state change, if any. |
| * Other components in the framework should use this api to initialize a job. |
| */ |
| public void initJob(JobInProgress job) { |
| if (null == job) { |
| LOG.info("Init on null job is not valid"); |
| return; |
| } |
| |
| try { |
| JobStatus prevStatus = (JobStatus)job.getStatus().clone(); |
| LOG.info("Initializing " + job.getJobID()); |
| job.initTasks(); |
| // Here the job *should* be in the PREP state. |
| // From here there are 3 ways : |
| // - job requires setup : the job remains in PREP state and |
| // setup is launched to move the job in RUNNING state |
| // - job is complete (no setup required and no tasks) : complete |
| // the job and move it to SUCCEEDED |
| // - job has tasks but doesnt require setup : make the job RUNNING. |
| if (job.isJobEmpty()) { // is the job empty? |
| completeEmptyJob(job); // complete it |
| } else if (!job.isSetupCleanupRequired()) { // setup/cleanup not required |
| job.completeSetup(); // complete setup and make job running |
| } |
| // Inform the listeners if the job state has changed |
| // Note : |
| // If job does not require setup, job state will be RUNNING |
| // If job is configured with 0 maps, 0 reduces and no setup-cleanup then |
| // the job state will be SUCCEEDED |
| // otherwise, job state is PREP. |
| JobStatus newStatus = (JobStatus)job.getStatus().clone(); |
| if (prevStatus.getRunState() != newStatus.getRunState()) { |
| JobStatusChangeEvent event = |
| new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, |
| newStatus); |
| synchronized (JobTracker.this) { |
| updateJobInProgressListeners(event); |
| } |
| } |
| } catch (KillInterruptedException kie) { |
| // If job was killed during initialization, job state will be KILLED |
| LOG.error("Job initialization interrupted :\n" + |
| StringUtils.stringifyException(kie)); |
| killJob(job); |
| } catch (Throwable t) { |
| // If the job initialization is failed, job state will be FAILED |
| LOG.error("Job initialization failed:\n" + |
| StringUtils.stringifyException(t)); |
| failJob(job); |
| } |
| } |
| |
| // This simply marks the job as completed. Note that the caller is responsible |
| // for raising events. |
| private synchronized void completeEmptyJob(JobInProgress job) { |
| job.completeEmptyJob(); |
| } |
| |
| /** |
| * Fail a job and inform the listeners. Other components in the framework |
| * should use this to fail a job. |
| */ |
| public synchronized void failJob(JobInProgress job) { |
| if (null == job) { |
| LOG.info("Fail on null job is not valid"); |
| return; |
| } |
| |
| JobStatus prevStatus = (JobStatus)job.getStatus().clone(); |
| LOG.info("Failing job " + job.getJobID()); |
| job.fail(); |
| |
| // Inform the listeners if the job state has changed |
| JobStatus newStatus = (JobStatus)job.getStatus().clone(); |
| if (prevStatus.getRunState() != newStatus.getRunState()) { |
| JobStatusChangeEvent event = |
| new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, |
| newStatus); |
| updateJobInProgressListeners(event); |
| } |
| } |
| |
| /** |
| * Set the priority of a job |
| * @param jobid |
| * @param priority |
| * @throws IOException |
| */ |
| public synchronized void setJobPriority(org.apache.hadoop.mapreduce.JobID |
| jobid, String priority) throws IOException { |
| setJobPriority(JobID.downgrade(jobid), priority); |
| } |
| /** |
| * Set the priority of a job |
| * @param jobid id of the job |
| * @param priority new priority of the job |
| * @deprecated Use |
| * {@link #setJobPriority(org.apache.hadoop.mapreduce.JobID, String)} instead |
| */ |
| @Deprecated |
| public synchronized void setJobPriority(JobID jobid, |
| String priority) |
| throws IOException { |
| JobInProgress job = jobs.get(jobid); |
| if (null == job) { |
| LOG.info("setJobPriority(): JobId " + jobid.toString() |
| + " is not a valid job"); |
| return; |
| } |
| JobPriority newPriority = JobPriority.valueOf(priority); |
| setJobPriority(jobid, newPriority); |
| } |
| |
| void storeCompletedJob(JobInProgress job) { |
| //persists the job info in DFS |
| completedJobStatusStore.store(job); |
| } |
| |
| public JobProfile getJobProfile(org.apache.hadoop.mapreduce.JobID jobid) { |
| return getJobProfile(JobID.downgrade(jobid)); |
| } |
| |
| /** |
| * Check if the <code>job</code> has been initialized. |
| * |
| * @param job {@link JobInProgress} to be checked |
| * @return <code>true</code> if the job has been initialized, |
| * <code>false</code> otherwise |
| */ |
| private boolean isJobInited(JobInProgress job) { |
| return job.inited(); |
| } |
| |
| |
| /** |
| * @deprecated Use {@link #getJobProfile(org.apache.hadoop.mapreduce.JobID)} |
| * instead |
| */ |
| @Deprecated |
| public JobProfile getJobProfile(JobID jobid) { |
| synchronized (this) { |
| JobInProgress job = jobs.get(jobid); |
| if (job != null) { |
| // Safe to call JobInProgress.getProfile while holding the lock |
| // on the JobTracker since it isn't a synchronized method |
| return job.getProfile(); |
| } |
| } |
| return completedJobStatusStore.readJobProfile(jobid); |
| } |
| |
| /** |
| * see {@link ClientProtocol#getJobStatus(org.apache.hadoop.mapreduce.JobID)} |
| */ |
| @Override |
| public JobStatus getJobStatus(org.apache.hadoop.mapreduce.JobID jobid) { |
| return getJobStatus(JobID.downgrade(jobid)); |
| } |
| |
| /** |
| * @deprecated Use |
| * {@link #getJobStatus(org.apache.hadoop.mapreduce.JobID)} instead |
| */ |
| @Deprecated |
| public JobStatus getJobStatus(JobID jobid) { |
| if (null == jobid) { |
| LOG.warn("JobTracker.getJobStatus() cannot get status for null jobid"); |
| return null; |
| } |
| synchronized (this) { |
| JobInProgress job = jobs.get(jobid); |
| if (job != null) { |
| // Safe to call JobInProgress.getStatus while holding the lock |
| // on the JobTracker since it isn't a synchronized method |
| return job.getStatus(); |
| } else { |
| JobStatus status = retireJobs.get(jobid); |
| if (status != null) { |
| return status; |
| } |
| } |
| } |
| return completedJobStatusStore.readJobStatus(jobid); |
| } |
| |
| private static final org.apache.hadoop.mapreduce.Counters EMPTY_COUNTERS |
| = new org.apache.hadoop.mapreduce.Counters(); |
| |
| /** |
| * see |
| * {@link ClientProtocol#getJobCounters(org.apache.hadoop.mapreduce.JobID)} |
| * |
| * @throws IOException |
| * @throws AccessControlException |
| */ |
| @Override |
| public org.apache.hadoop.mapreduce.Counters getJobCounters( |
| org.apache.hadoop.mapreduce.JobID jobid) |
| throws AccessControlException, IOException { |
| |
| JobID oldJobID = JobID.downgrade(jobid); |
| |
| synchronized (this) { |
| JobInProgress job = jobs.get(oldJobID); |
| if (job != null) { |
| // check the job-access |
| aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(), |
| Operation.VIEW_JOB_COUNTERS); |
| |
| if (!isJobInited(job)) { |
| return EMPTY_COUNTERS; |
| } |
| |
| Counters counters = job.getCounters(); |
| if (counters != null) { |
| return new org.apache.hadoop.mapreduce.Counters(counters); |
| } |
| return null; |
| } |
| } |
| |
| Counters counters = completedJobStatusStore.readCounters(oldJobID); |
| if (counters != null) { |
| return new org.apache.hadoop.mapreduce.Counters(counters); |
| } |
| return null; |
| } |
| |
| /** |
| * @deprecated Use |
| * {@link #getJobCounters(org.apache.hadoop.mapreduce.JobID)} instead |
| */ |
| @Deprecated |
| public Counters getJobCounters(JobID jobid) { |
| try { |
| return Counters.downgrade( |
| getJobCounters((org.apache.hadoop.mapreduce.JobID) jobid)); |
| } catch (AccessControlException e) { |
| return null; |
| } catch (IOException e) { |
| return null; |
| } |
| } |
| |
| private static final TaskReport[] EMPTY_TASK_REPORTS = new TaskReport[0]; |
| |
| /** |
| * @param jobid |
| * @return array of TaskReport |
| * @deprecated Use |
| * {@link #getTaskReports(org.apache.hadoop.mapreduce.JobID, TaskType)} |
| * instead |
| */ |
| @Deprecated |
| public synchronized TaskReport[] getMapTaskReports(JobID jobid) { |
| JobInProgress job = jobs.get(jobid); |
| if (job == null || !isJobInited(job)) { |
| return EMPTY_TASK_REPORTS; |
| } else { |
| Vector<TaskReport> reports = new Vector<TaskReport>(); |
| Vector<TaskInProgress> completeMapTasks = |
| job.reportTasksInProgress(true, true); |
| for (Iterator it = completeMapTasks.iterator(); it.hasNext();) { |
| TaskInProgress tip = (TaskInProgress) it.next(); |
| reports.add(tip.generateSingleReport()); |
| } |
| Vector<TaskInProgress> incompleteMapTasks = |
| job.reportTasksInProgress(true, false); |
| for (Iterator it = incompleteMapTasks.iterator(); it.hasNext();) { |
| TaskInProgress tip = (TaskInProgress) it.next(); |
| reports.add(tip.generateSingleReport()); |
| } |
| return reports.toArray(new TaskReport[reports.size()]); |
| } |
| } |
| |
| /** |
| * @param jobid |
| * @return array of TaskReport |
| * @deprecated Use |
| * {@link #getTaskReports(org.apache.hadoop.mapreduce.JobID, TaskType)} |
| * instead |
| */ |
| @Deprecated |
| public synchronized TaskReport[] getReduceTaskReports(JobID jobid) { |
| JobInProgress job = jobs.get(jobid); |
| if (job == null || !isJobInited(job)) { |
| return EMPTY_TASK_REPORTS; |
| } else { |
| Vector<TaskReport> reports = new Vector<TaskReport>(); |
| Vector completeReduceTasks = job.reportTasksInProgress(false, true); |
| for (Iterator it = completeReduceTasks.iterator(); it.hasNext();) { |
| TaskInProgress tip = (TaskInProgress) it.next(); |
| reports.add(tip.generateSingleReport()); |
| } |
| Vector incompleteReduceTasks = job.reportTasksInProgress(false, false); |
| for (Iterator it = incompleteReduceTasks.iterator(); it.hasNext();) { |
| TaskInProgress tip = (TaskInProgress) it.next(); |
| reports.add(tip.generateSingleReport()); |
| } |
| return reports.toArray(new TaskReport[reports.size()]); |
| } |
| } |
| |
| /** |
| * @param jobid |
| * @return array of TaskReport |
| * @deprecated Use |
| * {@link #getTaskReports(org.apache.hadoop.mapreduce.JobID, TaskType)} |
| * instead |
| */ |
| @Deprecated |
| public synchronized TaskReport[] getCleanupTaskReports(JobID jobid) { |
| JobInProgress job = jobs.get(jobid); |
| if (job == null || !isJobInited(job)) { |
| return EMPTY_TASK_REPORTS; |
| } else { |
| Vector<TaskReport> reports = new Vector<TaskReport>(); |
| Vector<TaskInProgress> completeTasks = job.reportCleanupTIPs(true); |
| for (Iterator<TaskInProgress> it = completeTasks.iterator(); |
| it.hasNext();) { |
| TaskInProgress tip = it.next(); |
| reports.add(tip.generateSingleReport()); |
| } |
| Vector<TaskInProgress> incompleteTasks = job.reportCleanupTIPs(false); |
| for (Iterator<TaskInProgress> it = incompleteTasks.iterator(); |
| it.hasNext();) { |
| TaskInProgress tip = it.next(); |
| reports.add(tip.generateSingleReport()); |
| } |
| return reports.toArray(new TaskReport[reports.size()]); |
| } |
| |
| } |
| |
| /** |
| * @param jobid |
| * @return array of TaskReport |
| * @deprecated Use |
| * {@link #getTaskReports(org.apache.hadoop.mapreduce.JobID, TaskType)} |
| * instead |
| */ |
| @Deprecated |
| public synchronized TaskReport[] getSetupTaskReports(JobID jobid) { |
| JobInProgress job = jobs.get(jobid); |
| if (job == null || !isJobInited(job)) { |
| return EMPTY_TASK_REPORTS; |
| } else { |
| Vector<TaskReport> reports = new Vector<TaskReport>(); |
| Vector<TaskInProgress> completeTasks = job.reportSetupTIPs(true); |
| for (Iterator<TaskInProgress> it = completeTasks.iterator(); |
| it.hasNext();) { |
| TaskInProgress tip = it.next(); |
| reports.add(tip.generateSingleReport()); |
| } |
| Vector<TaskInProgress> incompleteTasks = job.reportSetupTIPs(false); |
| for (Iterator<TaskInProgress> it = incompleteTasks.iterator(); |
| it.hasNext();) { |
| TaskInProgress tip = it.next(); |
| reports.add(tip.generateSingleReport()); |
| } |
| return reports.toArray(new TaskReport[reports.size()]); |
| } |
| } |
| |
| /** |
| * see |
| * {@link ClientProtocol#getTaskReports(org.apache.hadoop.mapreduce.JobID, TaskType)} |
| * @throws IOException |
| * @throws AccessControlException |
| */ |
| @Override |
| public synchronized TaskReport[] getTaskReports( |
| org.apache.hadoop.mapreduce.JobID jobid, TaskType type) |
| throws AccessControlException, IOException { |
| |
| // Check authorization |
| JobInProgress job = jobs.get(jobid); |
| if (job != null) { |
| aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(), |
| Operation.VIEW_JOB_DETAILS); |
| } else { |
| return EMPTY_TASK_REPORTS; |
| } |
| |
| switch (type) { |
| case MAP : |
| return getMapTaskReports(JobID.downgrade(jobid)); |
| case REDUCE : |
| return getReduceTaskReports(JobID.downgrade(jobid)); |
| case JOB_CLEANUP: |
| return getCleanupTaskReports(JobID.downgrade(jobid)); |
| case JOB_SETUP : |
| return getSetupTaskReports(JobID.downgrade(jobid)); |
| } |
| return EMPTY_TASK_REPORTS; |
| } |
| |
| /* |
| * Returns a list of TaskCompletionEvent for the given job, |
| * starting from fromEventId. |
| */ |
| public TaskCompletionEvent[] getTaskCompletionEvents( |
| org.apache.hadoop.mapreduce.JobID jobid, int fromEventId, int maxEvents) |
| throws IOException { |
| return getTaskCompletionEvents(JobID.downgrade(jobid), |
| fromEventId, maxEvents); |
| } |
| |
| /* |
| * Returns a list of TaskCompletionEvent for the given job, |
| * starting from fromEventId. |
| * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String, int, int) |
| */ |
| @Deprecated |
| public TaskCompletionEvent[] getTaskCompletionEvents( |
| JobID jobid, int fromEventId, int maxEvents) throws IOException{ |
| |
| JobInProgress job = this.jobs.get(jobid); |
| if (null != job) { |
| return job.inited() ? job.getTaskCompletionEvents(fromEventId, maxEvents) |
| : TaskCompletionEvent.EMPTY_ARRAY; |
| } |
| |
| return completedJobStatusStore.readJobTaskCompletionEvents(jobid, fromEventId, maxEvents); |
| } |
| |
| private static final String[] EMPTY_TASK_DIAGNOSTICS = new String[0]; |
| |
| /** |
| * Get the diagnostics for a given task |
| * @param taskId the id of the task |
| * @return an array of the diagnostic messages |
| */ |
| public synchronized String[] getTaskDiagnostics( |
| org.apache.hadoop.mapreduce.TaskAttemptID taskId) |
| throws IOException { |
| return getTaskDiagnostics(TaskAttemptID.downgrade(taskId)); |
| } |
| |
| /** |
| * Get the diagnostics for a given task |
| * @param taskId the id of the task |
| * @return an array of the diagnostic messages |
| */ |
| @Deprecated |
| public synchronized String[] getTaskDiagnostics(TaskAttemptID taskId) |
| throws IOException { |
| List<String> taskDiagnosticInfo = null; |
| JobID jobId = taskId.getJobID(); |
| TaskID tipId = taskId.getTaskID(); |
| JobInProgress job = jobs.get(jobId); |
| if (job != null) { |
| |
| // check the access to the job. |
| aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(), |
| Operation.VIEW_JOB_DETAILS); |
| |
| if (isJobInited(job)) { |
| TaskInProgress tip = job.getTaskInProgress(tipId); |
| if (tip != null) { |
| taskDiagnosticInfo = tip.getDiagnosticInfo(taskId); |
| } |
| } |
| } |
| |
| return ((taskDiagnosticInfo == null) ? EMPTY_TASK_DIAGNOSTICS : |
| taskDiagnosticInfo.toArray(new String[taskDiagnosticInfo.size()])); |
| } |
| |
| /** Get all the TaskStatuses from the tipid. */ |
| TaskStatus[] getTaskStatuses(TaskID tipid) { |
| TaskInProgress tip = getTip(tipid); |
| return (tip == null ? new TaskStatus[0] |
| : tip.getTaskStatuses()); |
| } |
| |
| /** Returns the TaskStatus for a particular taskid. */ |
| TaskStatus getTaskStatus(TaskAttemptID taskid) { |
| TaskInProgress tip = getTip(taskid.getTaskID()); |
| return (tip == null ? null |
| : tip.getTaskStatus(taskid)); |
| } |
| |
| /** |
| * Returns the counters for the specified task in progress. |
| */ |
| Counters getTipCounters(TaskID tipid) { |
| TaskInProgress tip = getTip(tipid); |
| return (tip == null ? null : tip.getCounters()); |
| } |
| |
| /** |
| * Returns the configured task scheduler for this job tracker. |
| * @return the configured task scheduler |
| */ |
| TaskScheduler getTaskScheduler() { |
| return taskScheduler; |
| } |
| |
| /** |
| * Returns specified TaskInProgress, or null. |
| */ |
| public TaskInProgress getTip(TaskID tipid) { |
| JobInProgress job = jobs.get(tipid.getJobID()); |
| return (job == null ? null : job.getTaskInProgress(tipid)); |
| } |
| |
| /** |
| * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#killTask(org.apache.hadoop.mapreduce.TaskAttemptID, |
| * boolean) |
| */ |
| @Override |
| public synchronized boolean killTask( |
| org.apache.hadoop.mapreduce.TaskAttemptID taskid, |
| boolean shouldFail) throws IOException { |
| return killTask(TaskAttemptID.downgrade(taskid), shouldFail); |
| } |
| |
| /** Mark a Task to be killed */ |
| @Deprecated |
| public synchronized boolean killTask(TaskAttemptID taskid, |
| boolean shouldFail) throws IOException { |
| TaskInProgress tip = taskidToTIPMap.get(taskid); |
| if (tip != null) { |
| |
| // check both queue-level and job-level access |
| aclsManager.checkAccess(tip.getJob(), |
| UserGroupInformation.getCurrentUser(), |
| shouldFail ? Operation.FAIL_TASK : Operation.KILL_TASK); |
| |
| return tip.killTask(taskid, shouldFail); |
| } |
| else { |
| LOG.info("Kill task attempt failed since task " + taskid + " was not found"); |
| return false; |
| } |
| } |
| |
| /** |
| * Get tracker name for a given task id. |
| * @param taskId the name of the task |
| * @return The name of the task tracker |
| */ |
| public synchronized String getAssignedTracker(TaskAttemptID taskId) { |
| return taskidToTrackerMap.get(taskId); |
| } |
| |
| public JobStatus[] jobsToComplete() { |
| return getJobStatus(jobs.values(), true); |
| } |
| |
| /** |
| * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getSystemDir() |
| */ |
| public org.apache.hadoop.mapreduce.JobStatus[] getAllJobs() { |
| List<JobStatus> list = new ArrayList<JobStatus>(); |
| list.addAll(Arrays.asList(getJobStatus(jobs.values(),false))); |
| list.addAll(retireJobs.getAll()); |
| return list.toArray(new JobStatus[list.size()]); |
| } |
| |
| /** |
| * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getSystemDir() |
| */ |
| public String getSystemDir() { |
| Path sysDir = new Path(conf.get(JTConfig.JT_SYSTEM_DIR, "/tmp/hadoop/mapred/system")); |
| return fs.makeQualified(sysDir).toString(); |
| } |
| |
| /** |
| * @throws LoginException |
| * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getStagingAreaDir() |
| */ |
| public String getStagingAreaDir() throws IOException { |
| try { |
| final String user = |
| UserGroupInformation.getCurrentUser().getShortUserName(); |
| return getMROwner().doAs(new PrivilegedExceptionAction<String>() { |
| @Override |
| public String run() throws Exception { |
| Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT, |
| "/tmp/hadoop/mapred/staging")); |
| FileSystem fs = stagingRootDir.getFileSystem(conf); |
| return fs.makeQualified(new Path(stagingRootDir, |
| user+"/.staging")).toString(); |
| } |
| }); |
| } catch(InterruptedException ie) { |
| throw new IOException(ie); |
| } |
| } |
| |
| /** |
| * @see |
| * org.apache.hadoop.mapreduce.protocol.ClientProtocol#getJobHistoryDir() |
| */ |
| public String getJobHistoryDir() { |
| return jobHistory.getCompletedJobHistoryLocation().toString(); |
| } |
| |
| /** |
| * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getQueueAdmins(String) |
| */ |
| public AccessControlList getQueueAdmins(String queueName) throws IOException { |
| AccessControlList acl = |
| queueManager.getQueueACL(queueName, QueueACL.ADMINISTER_JOBS); |
| if (acl == null) { |
| acl = new AccessControlList(" "); |
| } |
| return acl; |
| } |
| |
| /////////////////////////////////////////////////////////////// |
| // JobTracker methods |
| /////////////////////////////////////////////////////////////// |
| public JobInProgress getJob(JobID jobid) { |
| return jobs.get(jobid); |
| } |
| |
| //Get the job directory in system directory |
| Path getSystemDirectoryForJob(JobID id) { |
| return new Path(getSystemDir(), id.toString()); |
| } |
| |
| //Get the job token file in system directory |
| Path getSystemFileForJob(JobID id) { |
| return new Path(getSystemDirectoryForJob(id)+"/" + JOB_INFO_FILE); |
| } |
| |
| /** |
| * Change the run-time priority of the given job. |
| * |
| * @param jobId job id |
| * @param priority new {@link JobPriority} for the job |
| * @throws IOException |
| * @throws AccessControlException |
| */ |
| synchronized void setJobPriority(JobID jobId, JobPriority priority) |
| throws AccessControlException, IOException { |
| JobInProgress job = jobs.get(jobId); |
| if (job != null) { |
| |
| // check both queue-level and job-level access |
| aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(), |
| Operation.SET_JOB_PRIORITY); |
| |
| synchronized (taskScheduler) { |
| JobStatus oldStatus = (JobStatus)job.getStatus().clone(); |
| job.setPriority(priority); |
| JobStatus newStatus = (JobStatus)job.getStatus().clone(); |
| JobStatusChangeEvent event = |
| new JobStatusChangeEvent(job, EventType.PRIORITY_CHANGED, oldStatus, |
| newStatus); |
| updateJobInProgressListeners(event); |
| } |
| } else { |
| LOG.warn("Trying to change the priority of an unknown job: " + jobId); |
| } |
| } |
| |
| //////////////////////////////////////////////////// |
| // Methods to track all the TaskTrackers |
| //////////////////////////////////////////////////// |
| /** |
| * Accept and process a new TaskTracker profile. We might |
| * have known about the TaskTracker previously, or it might |
| * be brand-new. All task-tracker structures have already |
| * been updated. Just process the contained tasks and any |
| * jobs that might be affected. |
| */ |
| void updateTaskStatuses(TaskTrackerStatus status) { |
| String trackerName = status.getTrackerName(); |
| for (TaskStatus report : status.getTaskReports()) { |
| report.setTaskTracker(trackerName); |
| TaskAttemptID taskId = report.getTaskID(); |
| |
| // expire it |
| expireLaunchingTasks.removeTask(taskId); |
| |
| JobInProgress job = getJob(taskId.getJobID()); |
| if (job == null) { |
| // if job is not there in the cleanup list ... add it |
| synchronized (trackerToJobsToCleanup) { |
| Set<JobID> jobs = trackerToJobsToCleanup.get(trackerName); |
| if (jobs == null) { |
| jobs = new HashSet<JobID>(); |
| trackerToJobsToCleanup.put(trackerName, jobs); |
| } |
| jobs.add(taskId.getJobID()); |
| } |
| continue; |
| } |
| |
| if (!job.inited()) { |
| // if job is not yet initialized ... kill the attempt |
| synchronized (trackerToTasksToCleanup) { |
| Set<TaskAttemptID> tasks = trackerToTasksToCleanup.get(trackerName); |
| if (tasks == null) { |
| tasks = new HashSet<TaskAttemptID>(); |
| trackerToTasksToCleanup.put(trackerName, tasks); |
| } |
| tasks.add(taskId); |
| } |
| continue; |
| } |
| |
| TaskInProgress tip = taskidToTIPMap.get(taskId); |
| |
| if (tip != null) { |
| // Update the job and inform the listeners if necessary |
| JobStatus prevStatus = (JobStatus)job.getStatus().clone(); |
| // Clone TaskStatus object here, because JobInProgress |
| // or TaskInProgress can modify this object and |
| // the changes should not get reflected in TaskTrackerStatus. |
| // An old TaskTrackerStatus is used later in countMapTasks, etc. |
| job.updateTaskStatus(tip, (TaskStatus)report.clone()); |
| JobStatus newStatus = (JobStatus)job.getStatus().clone(); |
| |
| // Update the listeners if an incomplete job completes |
| if (prevStatus.getRunState() != newStatus.getRunState()) { |
| JobStatusChangeEvent event = |
| new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, |
| prevStatus, newStatus); |
| updateJobInProgressListeners(event); |
| } |
| } else { |
| LOG.info("Serious problem. While updating status, cannot find taskid " |
| + report.getTaskID()); |
| } |
| |
| // Process 'failed fetch' notifications |
| List<TaskAttemptID> failedFetchMaps = report.getFetchFailedMaps(); |
| if (failedFetchMaps != null) { |
| for (TaskAttemptID mapTaskId : failedFetchMaps) { |
| TaskInProgress failedFetchMap = taskidToTIPMap.get(mapTaskId); |
| |
| if (failedFetchMap != null) { |
| // Gather information about the map which has to be failed, if need be |
| String failedFetchTrackerName = getAssignedTracker(mapTaskId); |
| if (failedFetchTrackerName == null) { |
| failedFetchTrackerName = "Lost task tracker"; |
| } |
| failedFetchMap.getJob().fetchFailureNotification(failedFetchMap, |
| mapTaskId, |
| failedFetchTrackerName); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * We lost the task tracker! All task-tracker structures have |
| * already been updated. Just process the contained tasks and any |
| * jobs that might be affected. |
| */ |
| void lostTaskTracker(TaskTracker taskTracker) { |
| String trackerName = taskTracker.getTrackerName(); |
| LOG.info("Lost tracker '" + trackerName + "'"); |
| |
| // remove the tracker from the local structures |
| synchronized (trackerToJobsToCleanup) { |
| trackerToJobsToCleanup.remove(trackerName); |
| } |
| |
| synchronized (trackerToTasksToCleanup) { |
| trackerToTasksToCleanup.remove(trackerName); |
| } |
| |
| Set<TaskAttemptID> lostTasks = trackerToTaskMap.get(trackerName); |
| trackerToTaskMap.remove(trackerName); |
| |
| if (lostTasks != null) { |
| // List of jobs which had any of their tasks fail on this tracker |
| Set<JobInProgress> jobsWithFailures = new HashSet<JobInProgress>(); |
| for (TaskAttemptID taskId : lostTasks) { |
| TaskInProgress tip = taskidToTIPMap.get(taskId); |
| JobInProgress job = tip.getJob(); |
| |
| // Completed reduce tasks never need to be failed, because |
| // their outputs go to dfs |
| // And completed maps with zero reducers of the job |
| // never need to be failed. |
| if (!tip.isComplete() || |
| (tip.isMapTask() && !tip.isJobSetupTask() && |
| job.desiredReduces() != 0)) { |
| // if the job is done, we don't want to change anything |
| if (job.getStatus().getRunState() == JobStatus.RUNNING || |
| job.getStatus().getRunState() == JobStatus.PREP) { |
| // the state will be KILLED_UNCLEAN, if the task(map or reduce) |
| // was RUNNING on the tracker |
| TaskStatus.State killState = (tip.isRunningTask(taskId) && |
| !tip.isJobSetupTask() && !tip.isJobCleanupTask()) ? |
| TaskStatus.State.KILLED_UNCLEAN : TaskStatus.State.KILLED; |
| job.failedTask(tip, taskId, ("Lost task tracker: " + trackerName), |
| (tip.isMapTask() ? |
| TaskStatus.Phase.MAP : |
| TaskStatus.Phase.REDUCE), |
| killState, |
| trackerName); |
| jobsWithFailures.add(job); |
| } |
| } else { |
| // Completed 'reduce' task and completed 'maps' with zero |
| // reducers of the job, not failed; |
| // only removed from data-structures. |
| markCompletedTaskAttempt(trackerName, taskId); |
| } |
| } |
| |
| // Penalize this tracker for each of the jobs which |
| // had any tasks running on it when it was 'lost' |
| // Also, remove any reserved slots on this tasktracker |
| for (JobInProgress job : jobsWithFailures) { |
| job.addTrackerTaskFailure(trackerName, taskTracker); |
| } |
| |
| // Cleanup |
| taskTracker.cancelAllReservations(); |
| |
| // Purge 'marked' tasks, needs to be done |
| // here to prevent hanging references! |
| removeMarkedTasks(trackerName); |
| } |
| } |
| |
| /** |
| * Rereads the config to get hosts and exclude list file names. |
| * Rereads the files to update the hosts and exclude lists. |
| */ |
| public synchronized void refreshNodes() throws IOException { |
| String user = UserGroupInformation.getCurrentUser().getShortUserName(); |
| // check access |
| if (!isMRAdmin(UserGroupInformation.getCurrentUser())) { |
| AuditLogger.logFailure(user, Constants.REFRESH_NODES, |
| getMROwner() + " " + getSuperGroup(), Constants.JOBTRACKER, |
| Constants.UNAUTHORIZED_USER); |
| throw new AccessControlException(user + |
| " is not authorized to refresh nodes."); |
| } |
| |
| AuditLogger.logSuccess(user, Constants.REFRESH_NODES, Constants.JOBTRACKER); |
| // call the actual api |
| refreshHosts(); |
| } |
| |
| UserGroupInformation getMROwner() { |
| return aclsManager.getMROwner(); |
| } |
| |
| String getSuperGroup() { |
| return aclsManager.getSuperGroup(); |
| } |
| |
| boolean isMRAdmin(UserGroupInformation ugi) { |
| return aclsManager.isMRAdmin(ugi); |
| } |
| |
| private synchronized void refreshHosts() throws IOException { |
| // Reread the config to get HOSTS and HOSTS_EXCLUDE filenames. |
| // Update the file names and refresh internal includes and excludes list |
| LOG.info("Refreshing hosts information"); |
| Configuration conf = new Configuration(); |
| |
| hostsReader.updateFileNames(conf.get(JTConfig.JT_HOSTS_FILENAME,""), |
| conf.get(JTConfig.JT_HOSTS_EXCLUDE_FILENAME, "")); |
| hostsReader.refresh(); |
| |
| Set<String> excludeSet = new HashSet<String>(); |
| for(Map.Entry<String, TaskTracker> eSet : taskTrackers.entrySet()) { |
| String trackerName = eSet.getKey(); |
| TaskTrackerStatus status = eSet.getValue().getStatus(); |
| // Check if not include i.e not in host list or in hosts list but excluded |
| if (!inHostsList(status) || inExcludedHostsList(status)) { |
| excludeSet.add(status.getHost()); // add to rejected trackers |
| } |
| } |
| decommissionNodes(excludeSet); |
| } |
| |
| // main decommission |
| synchronized void decommissionNodes(Set<String> hosts) |
| throws IOException { |
| LOG.info("Decommissioning " + hosts.size() + " nodes"); |
| // create a list of tracker hostnames |
| synchronized (taskTrackers) { |
| synchronized (trackerExpiryQueue) { |
| int trackersDecommissioned = 0; |
| for (String host : hosts) { |
| LOG.info("Decommissioning host " + host); |
| Set<TaskTracker> trackers = hostnameToTaskTracker.remove(host); |
| if (trackers != null) { |
| for (TaskTracker tracker : trackers) { |
| LOG.info("Decommission: Losing tracker " |
| + tracker.getTrackerName() + " on host " + host); |
| removeTracker(tracker); |
| } |
| trackersDecommissioned += trackers.size(); |
| } |
| LOG.info("Host " + host + " is ready for decommissioning"); |
| } |
| getInstrumentation().setDecommissionedTrackers(trackersDecommissioned); |
| } |
| } |
| } |
| |
| /** |
| * Returns a set of excluded nodes. |
| */ |
| Collection<String> getExcludedNodes() { |
| return hostsReader.getExcludedHosts(); |
| } |
| |
| //////////////////////////////////////////////////////////// |
| // main() |
| //////////////////////////////////////////////////////////// |
| |
| /** |
| * Start the JobTracker process. This is used only for debugging. As a rule, |
| * JobTracker should be run as part of the DFS Namenode process. |
| */ |
| public static void main(String argv[] |
| ) throws IOException, InterruptedException { |
| StringUtils.startupShutdownMessage(JobTracker.class, argv, LOG); |
| |
| try { |
| if (argv.length == 0) { |
| JobTracker tracker = startTracker(new JobConf()); |
| tracker.offerService(); |
| } |
| else { |
| if ("-dumpConfiguration".equals(argv[0]) && argv.length == 1) { |
| dumpConfiguration(new PrintWriter(System.out)); |
| System.out.println(); |
| Configuration conf = new Configuration(); |
| QueueManager.dumpConfiguration(new PrintWriter(System.out), conf); |
| } |
| else { |
| System.out.println("usage: JobTracker [-dumpConfiguration]"); |
| System.exit(-1); |
| } |
| } |
| } catch (Throwable e) { |
| LOG.fatal(StringUtils.stringifyException(e)); |
| System.exit(-1); |
| } |
| } |
| |
| /** |
| * Dumps the configuration properties in Json format |
| * @param writer {@link}Writer object to which the output is written |
| * @throws IOException |
| */ |
| private static void dumpConfiguration(Writer writer) throws IOException { |
| Configuration.dumpConfiguration(new JobConf(), writer); |
| writer.write("\n"); |
| } |
| |
| /** |
| * Gets the root level queues. |
| * |
| * @return array of QueueInfo object. |
| * @throws java.io.IOException |
| */ |
| @Override |
| public QueueInfo[] getRootQueues() throws IOException { |
| return getQueueInfoArray(queueManager.getRootQueues()); |
| } |
| |
| /** |
| * Returns immediate children of queueName. |
| * |
| * @param queueName |
| * @return array of QueueInfo which are children of queueName |
| * @throws java.io.IOException |
| */ |
| @Override |
| public QueueInfo[] getChildQueues(String queueName) throws IOException { |
| return getQueueInfoArray(queueManager.getChildQueues(queueName)); |
| } |
| |
| /** |
| * Gets the root level queues. |
| * |
| * @return array of JobQueueInfo object. |
| * @throws java.io.IOException |
| */ |
| @Deprecated |
| public JobQueueInfo[] getRootJobQueues() throws IOException { |
| return queueManager.getRootQueues(); |
| } |
| |
| @Deprecated |
| public JobQueueInfo[] getJobQueues() throws IOException { |
| return queueManager.getJobQueueInfos(); |
| } |
| |
| @Deprecated |
| public JobQueueInfo getQueueInfo(String queue) throws IOException { |
| return queueManager.getJobQueueInfo(queue); |
| } |
| |
| private QueueInfo[] getQueueInfoArray(JobQueueInfo[] queues) |
| throws IOException { |
| for (JobQueueInfo queue : queues) { |
| queue.setJobStatuses(getJobsFromQueue(queue.getQueueName())); |
| } |
| return queues; |
| } |
| |
| @Override |
| public QueueInfo[] getQueues() throws IOException { |
| return getQueueInfoArray(queueManager.getJobQueueInfos()); |
| } |
| |
| @Override |
| public QueueInfo getQueue(String queue) throws IOException { |
| JobQueueInfo jqueue = queueManager.getJobQueueInfo(queue); |
| if (jqueue != null) { |
| jqueue.setJobStatuses(getJobsFromQueue(jqueue.getQueueName())); |
| } |
| return jqueue; |
| } |
| |
| public org.apache.hadoop.mapreduce.JobStatus[] getJobsFromQueue(String queue) |
| throws IOException { |
| Collection<JobInProgress> jips = null; |
| if (queueManager.getLeafQueueNames().contains(queue)) { |
| jips = taskScheduler.getJobs(queue); |
| } |
| return getJobStatus(jips,false); |
| } |
| |
| @Override |
| public org.apache.hadoop.mapreduce.QueueAclsInfo[] |
| getQueueAclsForCurrentUser() throws IOException { |
| return queueManager.getQueueAcls(UserGroupInformation.getCurrentUser()); |
| } |
| |
| private synchronized JobStatus[] getJobStatus(Collection<JobInProgress> jips, |
| boolean toComplete) { |
| if (jips == null || jips.isEmpty()) { |
| return new JobStatus[]{}; |
| } |
| ArrayList<JobStatus> jobStatusList = new ArrayList<JobStatus>(); |
| for(JobInProgress jip : jips) { |
| JobStatus status = jip.getStatus(); |
| status.setStartTime(jip.getStartTime()); |
| status.setUsername(jip.getProfile().getUser()); |
| if (toComplete) { |
| if (status.getRunState() == JobStatus.RUNNING || |
| status.getRunState() == JobStatus.PREP) { |
| jobStatusList.add(status); |
| } |
| }else { |
| jobStatusList.add(status); |
| } |
| } |
| return jobStatusList.toArray( |
| new JobStatus[jobStatusList.size()]); |
| } |
| |
| /** |
| * Returns the confgiured maximum number of tasks for a single job |
| */ |
| int getMaxTasksPerJob() { |
| return conf.getInt(JT_TASKS_PER_JOB, -1); |
| } |
| |
| @Override |
| public void refreshServiceAcl() throws IOException { |
| if (!conf.getBoolean( |
| CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) { |
| throw new AuthorizationException("Service Level Authorization not enabled!"); |
| } |
| this.interTrackerServer.refreshServiceAcl(conf, new MapReducePolicyProvider()); |
| } |
| |
| @Override |
| public void refreshQueues() throws IOException{ |
| LOG.info("Refreshing queue information. requested by : " + |
| UserGroupInformation.getCurrentUser().getShortUserName()); |
| synchronized (taskScheduler) { |
| queueManager.refreshQueues(new Configuration(this.conf), taskScheduler |
| .getQueueRefresher()); |
| } |
| } |
| |
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| static class MemoryLimits { |
| private long memSizeForMapSlot = JobConf.DISABLED_MEMORY_LIMIT; |
| private long memSizeForReduceSlot = JobConf.DISABLED_MEMORY_LIMIT; |
| private long maxMemForMapTasks = JobConf.DISABLED_MEMORY_LIMIT; |
| private long maxMemForReduceTasks = JobConf.DISABLED_MEMORY_LIMIT; |
| |
| public long getMemSizeForMapSlot() { return memSizeForMapSlot; } |
| public long getMemSizeForReduceSlot() { return memSizeForReduceSlot; } |
| public long getMaxMemForMapTasks() { return maxMemForMapTasks; } |
| public long getMaxMemForReduceTasks() { return maxMemForReduceTasks; } |
| |
| public void setMemSizeForMapSlot(long mapMemSize) { |
| memSizeForMapSlot = mapMemSize; |
| } |
| public void setMemSizeForReduceSlot(long reduceMemSize) { |
| memSizeForReduceSlot = reduceMemSize; |
| } |
| public void setMaxMemForMapTasks(long maxMapMem) { |
| maxMemForMapTasks = maxMapMem; |
| } |
| public void setMaxMemForReduceTasks(long maxReduceMem) { |
| maxMemForReduceTasks = maxReduceMem; |
| } |
| |
| public boolean isMemoryConfigSet() { |
| return (getMemSizeForMapSlot() != JobConf.DISABLED_MEMORY_LIMIT |
| && getMemSizeForReduceSlot() != JobConf.DISABLED_MEMORY_LIMIT |
| && getMaxMemForMapTasks() != JobConf.DISABLED_MEMORY_LIMIT |
| && getMaxMemForReduceTasks() != JobConf.DISABLED_MEMORY_LIMIT); |
| } |
| } |
| |
| private void initializeTaskMemoryRelatedConfig() { |
| initializeMemoryRelatedConfig(conf, jtMemLimits); |
| LOG.info(new StringBuilder().append("JobTracker configured with ") |
| .append("(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT,") |
| .append(" limitMaxMemForMapTasks, limitMaxMemForReduceTasks) (") |
| .append(jtMemLimits.getMemSizeForMapSlot()).append(", ") |
| .append(jtMemLimits.getMemSizeForReduceSlot()).append(", ") |
| .append(jtMemLimits.getMaxMemForMapTasks()).append(", ") |
| .append(jtMemLimits.getMaxMemForReduceTasks()).append(")")); |
| } |
| |
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| public static void initializeMemoryRelatedConfig(Configuration conf, |
| MemoryLimits memLimits) { |
| memLimits.setMemSizeForMapSlot( |
| JobConf.normalizeMemoryConfigValue(conf.getLong( |
| MRConfig.MAPMEMORY_MB, JobConf.DISABLED_MEMORY_LIMIT))); |
| memLimits.setMemSizeForReduceSlot( |
| JobConf.normalizeMemoryConfigValue(conf.getLong( |
| MRConfig.REDUCEMEMORY_MB, JobConf.DISABLED_MEMORY_LIMIT))); |
| |
| // handle @deprecated values |
| if (conf.get(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY) != null) { |
| LOG.warn( |
| JobConf.deprecatedString(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY)); |
| } |
| |
| long limitMaxMemForMapTasks, limitMaxMemForReduceTasks; |
| |
| if (conf.get(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY) != null) { |
| LOG.warn( |
| JobConf.deprecatedString(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY) |
| + " instead use " + JTConfig.JT_MAX_MAPMEMORY_MB |
| + " and " + JTConfig.JT_MAX_REDUCEMEMORY_MB |
| ); |
| |
| limitMaxMemForMapTasks = limitMaxMemForReduceTasks = |
| JobConf.normalizeMemoryConfigValue( |
| conf.getLong( |
| JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, |
| JobConf.DISABLED_MEMORY_LIMIT)); |
| if (limitMaxMemForMapTasks != JobConf.DISABLED_MEMORY_LIMIT && |
| limitMaxMemForMapTasks >= 0) { |
| limitMaxMemForMapTasks = limitMaxMemForReduceTasks = |
| limitMaxMemForMapTasks / |
| (1024 * 1024); //Converting old values in bytes to MB |
| } |
| } else { |
| limitMaxMemForMapTasks = |
| JobConf.normalizeMemoryConfigValue( |
| conf.getLong( |
| JTConfig.JT_MAX_MAPMEMORY_MB, |
| JobConf.DISABLED_MEMORY_LIMIT)); |
| limitMaxMemForReduceTasks = |
| JobConf.normalizeMemoryConfigValue( |
| conf.getLong( |
| JTConfig.JT_MAX_REDUCEMEMORY_MB, |
| JobConf.DISABLED_MEMORY_LIMIT)); |
| } |
| |
| memLimits.setMaxMemForMapTasks(limitMaxMemForMapTasks); |
| memLimits.setMaxMemForReduceTasks(limitMaxMemForReduceTasks); |
| } |
| |
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| static public long getMemSizeForMapSlot() { |
| return jtMemLimits.getMemSizeForMapSlot(); |
| } |
| |
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| static public long getMemSizeForReduceSlot() { |
| return jtMemLimits.getMemSizeForReduceSlot(); |
| } |
| |
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| static public long getLimitMaxMemForMapTasks() { |
| return jtMemLimits.getMaxMemForMapTasks(); |
| } |
| |
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| static public long getLimitMaxMemForReduceTasks() { |
| return jtMemLimits.getMaxMemForReduceTasks(); |
| } |
| |
| @Override |
| public void refreshSuperUserGroupsConfiguration() { |
| LOG.info("Refreshing superuser proxy groups mapping "); |
| |
| ProxyUsers.refreshSuperUserGroupsConfiguration(); |
| } |
| |
| @Override |
| public void refreshUserToGroupsMappings() throws IOException { |
| LOG.info("Refreshing all user-to-groups mappings. Requested by user: " + |
| UserGroupInformation.getCurrentUser().getShortUserName()); |
| |
| Groups.getUserToGroupsMappingService().refresh(); |
| } |
| |
| /** |
| * Check the job if it has invalid requirements and throw and IOException if does have. |
| * |
| * @param job |
| * @throws IOException |
| */ |
| void checkMemoryRequirements(JobInProgress job) |
| throws IOException { |
| if (!jtMemLimits.isMemoryConfigSet()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Per-Task memory configuration is not set on JT. " |
| + "Not checking the job for invalid memory requirements."); |
| } |
| return; |
| } |
| |
| boolean invalidJob = false; |
| String msg = ""; |
| long maxMemForMapTask = job.getMemoryForMapTask(); |
| long maxMemForReduceTask = job.getMemoryForReduceTask(); |
| |
| if (maxMemForMapTask == JobConf.DISABLED_MEMORY_LIMIT |
| || maxMemForReduceTask == JobConf.DISABLED_MEMORY_LIMIT) { |
| invalidJob = true; |
| msg = "Invalid job requirements."; |
| } |
| |
| if (maxMemForMapTask > getLimitMaxMemForMapTasks() |
| || maxMemForReduceTask > getLimitMaxMemForReduceTasks()) { |
| invalidJob = true; |
| msg = "Exceeds the cluster's max-memory-limit."; |
| } |
| |
| if (invalidJob) { |
| StringBuilder jobStr = |
| new StringBuilder().append(job.getJobID().toString()).append("(") |
| .append(maxMemForMapTask).append(" memForMapTasks ").append( |
| maxMemForReduceTask).append(" memForReduceTasks): "); |
| LOG.warn(jobStr.toString() + msg); |
| |
| throw new IOException(jobStr.toString() + msg); |
| } |
| } |
| |
| synchronized String getFaultReport(String host) { |
| FaultInfo fi = faultyTrackers.getFaultInfo(host, false); |
| if (fi == null) { |
| return ""; |
| } |
| return fi.getTrackerFaultReport(); |
| } |
| |
| synchronized Set<ReasonForBlackListing> getReasonForBlackList(String host) { |
| FaultInfo fi = faultyTrackers.getFaultInfo(host, false); |
| if (fi == null) { |
| return new HashSet<ReasonForBlackListing>(); |
| } |
| return fi.getReasonforblacklisting(); |
| } |
| |
| synchronized Collection<BlackListInfo> getBlackListedTrackers() { |
| Collection<BlackListInfo> blackListedTrackers = |
| new ArrayList<BlackListInfo>(); |
| for(TaskTrackerStatus tracker : blacklistedTaskTrackers()) { |
| String hostName = tracker.getHost(); |
| BlackListInfo bi = new BlackListInfo(); |
| bi.setTrackerName(tracker.getTrackerName()); |
| Set<ReasonForBlackListing> rfbs = |
| getReasonForBlackList(hostName); |
| StringBuffer sb = new StringBuffer(); |
| for(ReasonForBlackListing rfb : rfbs) { |
| sb.append(rfb.toString()); |
| sb.append(","); |
| } |
| if (sb.length() > 0) { |
| sb.replace(sb.length()-1, sb.length(), ""); |
| } |
| bi.setReasonForBlackListing(sb.toString()); |
| bi.setBlackListReport( |
| getFaultReport(hostName)); |
| blackListedTrackers.add(bi); |
| } |
| return blackListedTrackers; |
| } |
| |
| /** Test method to increment the fault |
| * This method is synchronized to make sure that the locking order |
| * "faultyTrackers.potentiallyFaultyTrackers lock followed by taskTrackers |
| * lock" is under JobTracker lock to avoid deadlocks. |
| */ |
| synchronized void incrementFaults(String hostName) { |
| faultyTrackers.incrementFaults(hostName); |
| } |
| |
| JobTracker(final JobConf conf, Clock clock, boolean ignoredForSimulation) |
| throws IOException { |
| this.clock = clock; |
| this.conf = conf; |
| trackerIdentifier = getDateFormat().format(new Date()); |
| |
| if (fs == null) { |
| fs = FileSystem.get(conf); |
| } |
| this.localFs = FileSystem.getLocal(conf); |
| |
| tasktrackerExpiryInterval = |
| conf.getLong("mapred.tasktracker.expiry.interval", 10 * 60 * 1000); |
| retiredJobsCacheSize = |
| conf.getInt("mapred.job.tracker.retiredjobs.cache.size", 1000); |
| |
| // min time before retire |
| MAX_BLACKLISTS_PER_TRACKER = |
| conf.getInt("mapred.max.tracker.blacklists", 4); |
| NUM_HEARTBEATS_IN_SECOND = |
| conf.getInt("mapred.heartbeats.in.second", 100); |
| |
| // Set ports, start RPC servers, setup security policy etc. |
| InetSocketAddress addr = getAddress(conf); |
| this.localMachine = addr.getHostName(); |
| this.port = addr.getPort(); |
| UserGroupInformation.setConfiguration(conf); |
| SecurityUtil.login(conf, JTConfig.JT_KEYTAB_FILE, JTConfig.JT_USER_NAME, |
| localMachine); |
| |
| secretManager = null; |
| |
| this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""), |
| conf.get("mapred.hosts.exclude", "")); |
| // queue manager |
| Configuration clusterConf = new Configuration(this.conf); |
| queueManager = new QueueManager(clusterConf); |
| |
| aclsManager = new ACLsManager(conf, new JobACLsManager(conf), queueManager); |
| |
| LOG.info("Starting jobtracker with owner as " + |
| getMROwner().getShortUserName() + " and supergroup as " + |
| getSuperGroup()); |
| |
| // Create the scheduler |
| Class<? extends TaskScheduler> schedulerClass |
| = conf.getClass("mapred.jobtracker.taskScheduler", |
| JobQueueTaskScheduler.class, TaskScheduler.class); |
| taskScheduler = |
| (TaskScheduler)ReflectionUtils.newInstance(schedulerClass, conf); |
| |
| // Create the jetty server |
| InetSocketAddress infoSocAddr = NetUtils.createSocketAddr( |
| conf.get("mapred.job.tracker.http.address", "0.0.0.0:50030")); |
| String infoBindAddress = infoSocAddr.getHostName(); |
| int tmpInfoPort = infoSocAddr.getPort(); |
| this.startTime = clock.getTime(); |
| infoServer = new HttpServer("job", infoBindAddress, tmpInfoPort, |
| tmpInfoPort == 0, conf); |
| infoServer.setAttribute("job.tracker", this); |
| |
| // initialize history parameters. |
| FileSystem historyFS = null; |
| |
| jobHistory = new JobHistory(); |
| final JobTracker jtFinal = this; |
| try { |
| historyFS = getMROwner().doAs(new PrivilegedExceptionAction<FileSystem>() { |
| public FileSystem run() throws IOException { |
| jobHistory.init(jtFinal, conf, jtFinal.localMachine, jtFinal.startTime); |
| jobHistory.initDone(conf, fs); |
| final String historyLogDir = |
| jobHistory.getCompletedJobHistoryLocation().toString(); |
| infoServer.setAttribute("historyLogDir", historyLogDir); |
| return new Path(historyLogDir).getFileSystem(conf); |
| } |
| }); |
| } catch (InterruptedException e1) { |
| throw (IOException) new IOException().initCause(e1); |
| } |
| |
| infoServer.setAttribute("fileSys", historyFS); |
| infoServer.setAttribute("jobHistoryHistory", jobHistory); |
| infoServer.setAttribute("jobHistoryGlobber", JobHistory.globString()); |
| infoServer.setAttribute("jobHistoryLeafGlobber", JobHistory.leafGlobString()); |
| infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class); |
| infoServer.start(); |
| this.infoPort = this.infoServer.getPort(); |
| |
| // Initialize instrumentation |
| JobTrackerInstrumentation tmp; |
| Class<? extends JobTrackerInstrumentation> metricsInst = |
| getInstrumentationClass(conf); |
| try { |
| java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c = |
| metricsInst.getConstructor(new Class[] {JobTracker.class, JobConf.class} ); |
| tmp = c.newInstance(this, conf); |
| } catch(Exception e) { |
| //Reflection can throw lots of exceptions -- handle them all by |
| //falling back on the default. |
| LOG.error("failed to initialize job tracker metrics", e); |
| tmp = new JobTrackerMetricsInst(this, conf); |
| } |
| myInstrumentation = tmp; |
| |
| // start the recovery manager |
| recoveryManager = new RecoveryManager(); |
| |
| this.dnsToSwitchMapping = ReflectionUtils.newInstance( |
| conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class, |
| DNSToSwitchMapping.class), conf); |
| this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels", |
| NetworkTopology.DEFAULT_HOST_LEVEL); |
| |
| //initializes the job status store |
| completedJobStatusStore = new CompletedJobStatusStore(conf, aclsManager); |
| } |
| |
| /** |
| * Get the path of the locally stored job file |
| * @param jobId id of the job |
| * @return the path of the job file on the local file system |
| */ |
| String getLocalJobFilePath(org.apache.hadoop.mapreduce.JobID jobId){ |
| return System.getProperty("hadoop.log.dir") + |
| File.separator + jobId + "_conf.xml"; |
| } |
| |
| /** |
| * Discard a current delegation token. |
| */ |
| @Override |
| public void cancelDelegationToken(Token<DelegationTokenIdentifier> token |
| ) throws IOException, |
| InterruptedException { |
| String user = UserGroupInformation.getCurrentUser().getUserName(); |
| secretManager.cancelToken(token, user); |
| } |
| |
| /** |
| * Get a new delegation token. |
| */ |
| @Override |
| public Token<DelegationTokenIdentifier> |
| getDelegationToken(Text renewer |
| )throws IOException, InterruptedException { |
| if (!isAllowedDelegationTokenOp()) { |
| throw new IOException( |
| "Delegation Token can be issued only with kerberos authentication"); |
| } |
| UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); |
| Text owner = new Text(ugi.getUserName()); |
| Text realUser = null; |
| if (ugi.getRealUser() != null) { |
| realUser = new Text(ugi.getRealUser().getUserName()); |
| } |
| DelegationTokenIdentifier ident = |
| new DelegationTokenIdentifier(owner, renewer, realUser); |
| return new Token<DelegationTokenIdentifier>(ident, secretManager); |
| } |
| |
| /** |
| * Renew a delegation token to extend its lifetime. |
| */ |
| @Override |
| public long renewDelegationToken(Token<DelegationTokenIdentifier> token |
| ) throws IOException, |
| InterruptedException { |
| if (!isAllowedDelegationTokenOp()) { |
| throw new IOException( |
| "Delegation Token can be renewed only with kerberos authentication"); |
| } |
| String user = UserGroupInformation.getCurrentUser().getUserName(); |
| return secretManager.renewToken(token, user); |
| } |
| |
| JobACLsManager getJobACLsManager() { |
| return aclsManager.getJobACLsManager(); |
| } |
| |
| ACLsManager getACLsManager() { |
| return aclsManager; |
| } |
| |
| /** |
| * |
| * @return true if delegation token operation is allowed |
| */ |
| private boolean isAllowedDelegationTokenOp() throws IOException { |
| AuthenticationMethod authMethod = UserGroupInformation |
| .getRealAuthenticationMethod(UserGroupInformation.getCurrentUser()); |
| if (UserGroupInformation.isSecurityEnabled() |
| && (authMethod != AuthenticationMethod.KERBEROS)) { |
| return false; |
| } |
| return true; |
| } |
| } |