| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.mapred; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.List; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; |
| import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker; |
| |
| /** |
| * A {@link TaskScheduler} that keeps jobs in a queue in priority order (FIFO |
| * by default). |
| */ |
| class JobQueueTaskScheduler extends TaskScheduler { |
| |
| private static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3; |
| public static final Log LOG = LogFactory.getLog(JobQueueTaskScheduler.class); |
| |
| protected JobQueueJobInProgressListener jobQueueJobInProgressListener; |
| protected EagerTaskInitializationListener eagerTaskInitializationListener; |
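  // Fraction of the jobs' desired tasks reserved as free slots ("padding")
  // for task failures and speculative execution; see exceededPadding().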
| private float padFraction; |
| |
| public JobQueueTaskScheduler() { |
| this.jobQueueJobInProgressListener = new JobQueueJobInProgressListener(); |
| } |
| |
| @Override |
| public synchronized void start() throws IOException { |
| super.start(); |
| taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener); |
| eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager); |
| eagerTaskInitializationListener.start(); |
| taskTrackerManager.addJobInProgressListener( |
| eagerTaskInitializationListener); |
| } |
| |
| @Override |
| public synchronized void terminate() throws IOException { |
| if (jobQueueJobInProgressListener != null) { |
| taskTrackerManager.removeJobInProgressListener( |
| jobQueueJobInProgressListener); |
| } |
| if (eagerTaskInitializationListener != null) { |
| taskTrackerManager.removeJobInProgressListener( |
| eagerTaskInitializationListener); |
| eagerTaskInitializationListener.terminate(); |
| } |
| super.terminate(); |
| } |
| |
| @Override |
| public synchronized void setConf(Configuration conf) { |
| super.setConf(conf); |
| padFraction = conf.getFloat(JTConfig.JT_TASK_ALLOC_PAD_FRACTION, |
| 0.01f); |
| this.eagerTaskInitializationListener = |
| new EagerTaskInitializationListener(conf); |
| } |
| |
| @Override |
| public synchronized List<Task> assignTasks(TaskTracker taskTracker) |
| throws IOException { |
| TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus(); |
| ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus(); |
| final int numTaskTrackers = clusterStatus.getTaskTrackers(); |
| final int clusterMapCapacity = clusterStatus.getMaxMapTasks(); |
| final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks(); |
| |
| Collection<JobInProgress> jobQueue = |
| jobQueueJobInProgressListener.getJobQueue(); |
| |
| // |
| // Get map + reduce counts for the current tracker. |
| // |
| final int trackerMapCapacity = taskTrackerStatus.getMaxMapSlots(); |
| final int trackerReduceCapacity = taskTrackerStatus.getMaxReduceSlots(); |
| final int trackerRunningMaps = taskTrackerStatus.countMapTasks(); |
| final int trackerRunningReduces = taskTrackerStatus.countReduceTasks(); |
| |
| // Assigned tasks |
| List<Task> assignedTasks = new ArrayList<Task>(); |
| |
| // |
| // Compute (running + pending) map and reduce task numbers across pool |
| // |
| int remainingReduceLoad = 0; |
| int remainingMapLoad = 0; |
| synchronized (jobQueue) { |
| for (JobInProgress job : jobQueue) { |
| if (job.getStatus().getRunState() == JobStatus.RUNNING) { |
| remainingMapLoad += (job.desiredMaps() - job.finishedMaps()); |
| if (job.scheduleReduces()) { |
| remainingReduceLoad += |
| (job.desiredReduces() - job.finishedReduces()); |
| } |
| } |
| } |
| } |
| |
| // Compute the 'load factor' for maps and reduces |
| double mapLoadFactor = 0.0; |
| if (clusterMapCapacity > 0) { |
| mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity; |
| } |
| double reduceLoadFactor = 0.0; |
| if (clusterReduceCapacity > 0) { |
| reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity; |
| } |
| |
| // |
| // In the below steps, we allocate first map tasks (if appropriate), |
| // and then reduce tasks if appropriate. We go through all jobs |
| // in order of job arrival; jobs only get serviced if their |
| // predecessors are serviced, too. |
| // |
| |
| // |
| // We assign tasks to the current taskTracker if the given machine |
| // has a workload that's less than the maximum load of that kind of |
| // task. |
| // However, if the cluster is close to getting loaded i.e. we don't |
| // have enough _padding_ for speculative executions etc., we only |
| // schedule the "highest priority" task i.e. the task from the job |
| // with the highest priority. |
| // |
| |
| final int trackerCurrentMapCapacity = |
| Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity), |
| trackerMapCapacity); |
| int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps; |
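    // Continuing the illustration above: a tracker with trackerMapCapacity = 4
    // is throttled to trackerCurrentMapCapacity = ceil(0.25 * 4) = 1, so with
    // no maps already running it is offered at most one map slot here.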
| boolean exceededMapPadding = false; |
| if (availableMapSlots > 0) { |
| exceededMapPadding = |
| exceededPadding(true, clusterStatus, trackerMapCapacity); |
| } |
| |
| int numLocalMaps = 0; |
| int numNonLocalMaps = 0; |
| scheduleMaps: |
| for (int i=0; i < availableMapSlots; ++i) { |
| synchronized (jobQueue) { |
| for (JobInProgress job : jobQueue) { |
| if (job.getStatus().getRunState() != JobStatus.RUNNING) { |
| continue; |
| } |
| |
| Task t = null; |
| |
| // Try to schedule a node-local or rack-local Map task |
| t = |
| job.obtainNewLocalMapTask(taskTrackerStatus, numTaskTrackers, |
| taskTrackerManager.getNumberOfUniqueHosts()); |
| if (t != null) { |
| assignedTasks.add(t); |
| ++numLocalMaps; |
| |
| // Don't assign map tasks to the hilt! |
| // Leave some free slots in the cluster for future task-failures, |
| // speculative tasks etc. beyond the highest priority job |
| if (exceededMapPadding) { |
| break scheduleMaps; |
| } |
| |
| // Try all jobs again for the next Map task |
| break; |
| } |
| |
| // Try to schedule a non-local Map task |
| t = |
| job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers, |
| taskTrackerManager.getNumberOfUniqueHosts()); |
| |
| if (t != null) { |
| assignedTasks.add(t); |
| ++numNonLocalMaps; |
| |
            // We assign at most 1 off-switch or speculative task per
            // heartbeat. This is to prevent TaskTrackers from stealing
            // local tasks from other TaskTrackers.
| break scheduleMaps; |
| } |
| } |
| } |
| } |
| int assignedMaps = assignedTasks.size(); |
| |
| // |
| // Same thing, but for reduce tasks |
| // However we _never_ assign more than 1 reduce task per heartbeat |
| // |
| final int trackerCurrentReduceCapacity = |
| Math.min((int)Math.ceil(reduceLoadFactor * trackerReduceCapacity), |
| trackerReduceCapacity); |
| final int availableReduceSlots = |
| Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1); |
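    // Note that the min(..., 1) above caps assignment to a single reduce task
    // per heartbeat, however many reduce slots are actually free.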
| boolean exceededReducePadding = false; |
| if (availableReduceSlots > 0) { |
| exceededReducePadding = exceededPadding(false, clusterStatus, |
| trackerReduceCapacity); |
| synchronized (jobQueue) { |
| for (JobInProgress job : jobQueue) { |
| if (job.getStatus().getRunState() != JobStatus.RUNNING || |
| job.numReduceTasks == 0) { |
| continue; |
| } |
| |
| Task t = |
| job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers, |
| taskTrackerManager.getNumberOfUniqueHosts() |
| ); |
| if (t != null) { |
| assignedTasks.add(t); |
| break; |
| } |
| |
| // Don't assign reduce tasks to the hilt! |
| // Leave some free slots in the cluster for future task-failures, |
| // speculative tasks etc. beyond the highest priority job |
| if (exceededReducePadding) { |
| break; |
| } |
| } |
| } |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Task assignments for " + taskTrackerStatus.getTrackerName() + " --> " + |
| "[" + mapLoadFactor + ", " + trackerMapCapacity + ", " + |
| trackerCurrentMapCapacity + ", " + trackerRunningMaps + "] -> [" + |
| (trackerCurrentMapCapacity - trackerRunningMaps) + ", " + |
| assignedMaps + " (" + numLocalMaps + ", " + numNonLocalMaps + |
| ")] [" + reduceLoadFactor + ", " + trackerReduceCapacity + ", " + |
                trackerCurrentReduceCapacity + ", " + trackerRunningReduces +
| "] -> [" + (trackerCurrentReduceCapacity - trackerRunningReduces) + |
| ", " + (assignedTasks.size()-assignedMaps) + "]"); |
| } |
| |
| return assignedTasks; |
| } |
| |
| private boolean exceededPadding(boolean isMapTask, |
| ClusterStatus clusterStatus, |
| int maxTaskTrackerSlots) { |
| int numTaskTrackers = clusterStatus.getTaskTrackers(); |
| int totalTasks = |
| (isMapTask) ? clusterStatus.getMapTasks() : |
| clusterStatus.getReduceTasks(); |
| int totalTaskCapacity = |
| isMapTask ? clusterStatus.getMaxMapTasks() : |
| clusterStatus.getMaxReduceTasks(); |
| |
| Collection<JobInProgress> jobQueue = |
| jobQueueJobInProgressListener.getJobQueue(); |
| |
| boolean exceededPadding = false; |
| synchronized (jobQueue) { |
| int totalNeededTasks = 0; |
| for (JobInProgress job : jobQueue) { |
| if (job.getStatus().getRunState() != JobStatus.RUNNING || |
| job.numReduceTasks == 0) { |
| continue; |
| } |
| |
| // |
| // Beyond the highest-priority task, reserve a little |
| // room for failures and speculative executions; don't |
| // schedule tasks to the hilt. |
| // |
| totalNeededTasks += |
| isMapTask ? job.desiredMaps() : job.desiredReduces(); |
| int padding = 0; |
| if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) { |
| padding = |
| Math.min(maxTaskTrackerSlots, |
| (int) (totalNeededTasks * padFraction)); |
| } |
| if (totalTasks + padding >= totalTaskCapacity) { |
| exceededPadding = true; |
| break; |
| } |
| } |
| } |
| |
| return exceededPadding; |
| } |
| |
| @Override |
| public synchronized Collection<JobInProgress> getJobs(String queueName) { |
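    // A single job queue is maintained, so the queueName argument is ignored.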
| return jobQueueJobInProgressListener.getJobQueue(); |
| } |
| } |