| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.mapred; |
| |
| import java.io.IOException; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.mapred.FairScheduler.JobInfo; |
| import org.apache.hadoop.mapreduce.TaskType; |
| |
| public class PoolSchedulable extends Schedulable { |
| public static final Log LOG = LogFactory.getLog( |
| PoolSchedulable.class.getName()); |
| |
| private FairScheduler scheduler; |
| private Pool pool; |
| private TaskType taskType; |
| private PoolManager poolMgr; |
| private List<JobSchedulable> jobScheds = new LinkedList<JobSchedulable>(); |
| private int demand = 0; |
| |
| // Variables used for preemption |
| long lastTimeAtMinShare; |
| long lastTimeAtHalfFairShare; |
| |
| public PoolSchedulable(FairScheduler scheduler, Pool pool, TaskType type) { |
| this.scheduler = scheduler; |
| this.pool = pool; |
| this.taskType = type; |
| this.poolMgr = scheduler.getPoolManager(); |
| long currentTime = scheduler.getClock().getTime(); |
| this.lastTimeAtMinShare = currentTime; |
| this.lastTimeAtHalfFairShare = currentTime; |
| } |
| |
| public void addJob(JobInProgress job) { |
| JobInfo info = scheduler.getJobInfo(job); |
| jobScheds.add(taskType == TaskType.MAP ? |
| info.mapSchedulable : info.reduceSchedulable); |
| } |
| |
| public void removeJob(JobInProgress job) { |
| for (Iterator<JobSchedulable> it = jobScheds.iterator(); it.hasNext();) { |
| JobSchedulable jobSched = it.next(); |
| if (jobSched.getJob() == job) { |
| it.remove(); |
| break; |
| } |
| } |
| } |
| |
| /** |
| * Update demand by asking jobs in the pool to update |
| */ |
| @Override |
| public void updateDemand() { |
| demand = 0; |
| for (JobSchedulable sched: jobScheds) { |
| sched.updateDemand(); |
| demand += sched.getDemand(); |
| } |
| } |
| |
| /** |
| * Distribute the pool's fair share among its jobs |
| */ |
| @Override |
| public void redistributeShare() { |
| if (pool.getSchedulingMode() == SchedulingMode.FAIR) { |
| SchedulingAlgorithms.computeFairShares(jobScheds, getFairShare()); |
| } else { |
| for (JobSchedulable sched: jobScheds) { |
| sched.setFairShare(0); |
| } |
| } |
| } |
| |
| @Override |
| public int getDemand() { |
| return demand; |
| } |
| |
| @Override |
| public int getMinShare() { |
| return poolMgr.getAllocation(pool.getName(), taskType); |
| } |
| |
| @Override |
| public double getWeight() { |
| return poolMgr.getPoolWeight(pool.getName()); |
| } |
| |
| @Override |
| public JobPriority getPriority() { |
| return JobPriority.NORMAL; |
| } |
| |
| @Override |
| public int getRunningTasks() { |
| int ans = 0; |
| for (JobSchedulable sched: jobScheds) { |
| ans += sched.getRunningTasks(); |
| } |
| return ans; |
| } |
| |
| @Override |
| public long getStartTime() { |
| return 0; |
| } |
| |
| @Override |
| public Task assignTask(TaskTrackerStatus tts, long currentTime, |
| Collection<JobInProgress> visited) throws IOException { |
| SchedulingMode mode = pool.getSchedulingMode(); |
| Comparator<Schedulable> comparator; |
| if (mode == SchedulingMode.FIFO) { |
| comparator = new SchedulingAlgorithms.FifoComparator(); |
| } else if (mode == SchedulingMode.FAIR) { |
| comparator = new SchedulingAlgorithms.FairShareComparator(); |
| } else { |
| throw new RuntimeException("Unsupported pool scheduling mode " + mode); |
| } |
| Collections.sort(jobScheds, comparator); |
| for (JobSchedulable sched: jobScheds) { |
| Task task = sched.assignTask(tts, currentTime, visited); |
| if (task != null) |
| return task; |
| } |
| return null; |
| } |
| |
| @Override |
| public String getName() { |
| return pool.getName(); |
| } |
| |
| Pool getPool() { |
| return pool; |
| } |
| |
| public TaskType getTaskType() { |
| return taskType; |
| } |
| |
| public Collection<JobSchedulable> getJobSchedulables() { |
| return jobScheds; |
| } |
| |
| public long getLastTimeAtMinShare() { |
| return lastTimeAtMinShare; |
| } |
| |
| public void setLastTimeAtMinShare(long lastTimeAtMinShare) { |
| this.lastTimeAtMinShare = lastTimeAtMinShare; |
| } |
| |
| public long getLastTimeAtHalfFairShare() { |
| return lastTimeAtHalfFairShare; |
| } |
| |
| public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) { |
| this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare; |
| } |
| } |