| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tajo.querymaster; |
| |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.util.RackResolver; |
| import org.apache.tajo.TaskAttemptId; |
| import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; |
| import org.apache.tajo.conf.TajoConf; |
| import org.apache.tajo.engine.planner.global.ExecutionBlock; |
| import org.apache.tajo.engine.planner.global.MasterPlan; |
| import org.apache.tajo.engine.query.TaskRequest; |
| import org.apache.tajo.engine.query.TaskRequestImpl; |
| import org.apache.tajo.ipc.QueryCoordinatorProtocol; |
| import org.apache.tajo.ipc.QueryCoordinatorProtocol.QueryCoordinatorProtocolService; |
| import org.apache.tajo.ipc.TajoWorkerProtocol; |
| import org.apache.tajo.master.cluster.WorkerConnectionInfo; |
| import org.apache.tajo.master.event.*; |
| import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; |
| import org.apache.tajo.master.event.TaskSchedulerEvent.EventType; |
| import org.apache.tajo.plan.serder.LogicalNodeSerializer; |
| import org.apache.tajo.resource.NodeResources; |
| import org.apache.tajo.rpc.*; |
| import org.apache.tajo.service.ServiceTracker; |
| import org.apache.tajo.storage.DataLocation; |
| import org.apache.tajo.storage.fragment.FileFragment; |
| import org.apache.tajo.storage.fragment.Fragment; |
| import org.apache.tajo.util.NetUtils; |
| import org.apache.tajo.util.RpcParameterFactory; |
| import org.apache.tajo.util.TUtil; |
| import org.apache.tajo.worker.FetchImpl; |
| |
| import java.net.InetSocketAddress; |
| import java.util.*; |
| import java.util.Map.Entry; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import static org.apache.tajo.ResourceProtos.*; |
| |
| public class DefaultTaskScheduler extends AbstractTaskScheduler { |
| private static final Log LOG = LogFactory.getLog(DefaultTaskScheduler.class); |
| |
| private static final String REQUEST_MAX_NUM = "tajo.qm.task-scheduler.request.max-num"; |
| |
| private final TaskSchedulerContext context; |
| private Stage stage; |
| private TajoConf tajoConf; |
| private Properties rpcParams; |
| |
| private Thread schedulingThread; |
| private volatile boolean isStopped; |
| private AtomicBoolean needWakeup = new AtomicBoolean(); |
| |
| private ScheduledRequests scheduledRequests; |
| |
| private int minTaskMemory; |
| private int nextTaskId = 0; |
| private int scheduledObjectNum = 0; |
| private boolean isLeaf; |
| private int schedulerDelay; |
| private int maximumRequestContainer; |
| |
| // candidate workers for locality of high priority |
| private Set<Integer> candidateWorkers = Sets.newHashSet(); |
| |
| public DefaultTaskScheduler(TaskSchedulerContext context, Stage stage) { |
| super(DefaultTaskScheduler.class.getName()); |
| this.context = context; |
| this.stage = stage; |
| } |
| |
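  /**
   * Reads scheduler settings from the configuration and prepares the scheduling thread.
   * The thread is created here but only started by {@link #start()}.
   */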
| @Override |
| public void init(Configuration conf) { |
| tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class); |
    rpcParams = RpcParameterFactory.get(this.tajoConf);
| |
| scheduledRequests = new ScheduledRequests(); |
| minTaskMemory = tajoConf.getIntVar(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY); |
    schedulerDelay = tajoConf.getIntVar(TajoConf.ConfVars.QUERYMASTER_TASK_SCHEDULER_DELAY);
| isLeaf = stage.getMasterPlan().isLeaf(stage.getBlock()); |
| |
| this.schedulingThread = new Thread() { |
| public void run() { |
| |
| while (!isStopped && !Thread.currentThread().isInterrupted()) { |
| |
| try { |
| schedule(); |
| } catch (InterruptedException e) { |
| if (isStopped) { |
| break; |
| } else { |
| LOG.fatal(e.getMessage(), e); |
| stage.abort(StageState.ERROR); |
| } |
| } catch (Throwable e) { |
| LOG.fatal(e.getMessage(), e); |
| stage.abort(StageState.ERROR); |
| break; |
| } |
| } |
| LOG.info("TaskScheduler schedulingThread stopped"); |
| } |
| }; |
| super.init(conf); |
| } |
| |
| @Override |
| public void start() { |
| LOG.info("Start TaskScheduler"); |
| maximumRequestContainer = tajoConf.getInt(REQUEST_MAX_NUM, stage.getContext().getWorkerMap().size() * 2); |
| |
| if (isLeaf) { |
| candidateWorkers.addAll(getWorkerIds(getLeafTaskHosts())); |
| } else { |
| //find assigned hosts for Non-Leaf locality in children executionBlock |
| List<ExecutionBlock> executionBlockList = stage.getMasterPlan().getChilds(stage.getBlock()); |
| for (ExecutionBlock executionBlock : executionBlockList) { |
| Stage childStage = stage.getContext().getStage(executionBlock.getId()); |
| candidateWorkers.addAll(childStage.getAssignedWorkerMap().keySet()); |
| } |
| } |
| |
| this.schedulingThread.start(); |
| super.start(); |
| } |
| |
| @Override |
| public void stop() { |
| isStopped = true; |
| |
| if (schedulingThread != null) { |
| synchronized (schedulingThread) { |
| schedulingThread.interrupt(); |
| } |
| } |
| candidateWorkers.clear(); |
| scheduledRequests.clear(); |
| LOG.info("Task Scheduler stopped"); |
| super.stop(); |
| } |
| |
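  // Fragments from the most recent FragmentScheduleEvent; each non-leaf task created by a
  // subsequent FetchScheduleEvent is assigned these fragments.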
| private Fragment[] fragmentsForNonLeafTask; |
| private Fragment[] broadcastFragmentsForNonLeafTask; |
| |
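  /**
   * Runs a single iteration of the scheduling loop. If no task is pending, waits until one
   * is added; otherwise reserves node resources via {@link #createTaskRequest(int)} and
   * assigns pending leaf or non-leaf tasks to the allocated workers.
   */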
| public void schedule() throws Exception { |
| try { |
| final int incompleteTaskNum = scheduledRequests.leafTaskNum() + scheduledRequests.nonLeafTaskNum(); |
| if (incompleteTaskNum == 0) { |
| needWakeup.set(true); |
        // all tasks are done, or no tasks have been scheduled yet
| synchronized (schedulingThread) { |
| schedulingThread.wait(1000); |
| } |
| } else { |
| LinkedList<TaskRequestEvent> taskRequests = createTaskRequest(incompleteTaskNum); |
| |
| if (taskRequests.size() == 0) { |
| synchronized (schedulingThread) { |
| schedulingThread.wait(schedulerDelay); |
| } |
| } else { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Get " + taskRequests.size() + " taskRequestEvents "); |
| } |
| |
| if (isLeaf) { |
| scheduledRequests.assignToLeafTasks(taskRequests); |
| } else { |
| scheduledRequests.assignToNonLeafTasks(taskRequests); |
| } |
| } |
| } |
| } catch (TimeoutException e) { |
| LOG.error(e.getMessage()); |
| } |
| } |
| |
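  /**
   * Dispatches scheduler events. T_SCHEDULE events register fragments, fetches, or task
   * attempts for scheduling; T_SCHEDULE_CANCEL events remove an unassigned attempt from the
   * scheduler.
   */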
| @Override |
| public void handle(TaskSchedulerEvent event) { |
| if (event.getType() == EventType.T_SCHEDULE) { |
| if (event instanceof FragmentScheduleEvent) { |
| FragmentScheduleEvent castEvent = (FragmentScheduleEvent) event; |
| if (context.isLeafQuery()) { |
| TaskAttemptScheduleContext taskContext = new TaskAttemptScheduleContext(); |
| Task task = Stage.newEmptyTask(context, taskContext, stage, nextTaskId++); |
| task.addFragment(castEvent.getLeftFragment(), true); |
| scheduledObjectNum++; |
| if (castEvent.hasRightFragments()) { |
| task.addFragments(castEvent.getRightFragments()); |
| } |
| stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE)); |
| } else { |
| fragmentsForNonLeafTask = new FileFragment[2]; |
| fragmentsForNonLeafTask[0] = castEvent.getLeftFragment(); |
| if (castEvent.hasRightFragments()) { |
| FileFragment[] rightFragments = castEvent.getRightFragments().toArray(new FileFragment[]{}); |
| fragmentsForNonLeafTask[1] = rightFragments[0]; |
| if (rightFragments.length > 1) { |
| broadcastFragmentsForNonLeafTask = new FileFragment[rightFragments.length - 1]; |
| System.arraycopy(rightFragments, 1, broadcastFragmentsForNonLeafTask, 0, broadcastFragmentsForNonLeafTask.length); |
| } else { |
| broadcastFragmentsForNonLeafTask = null; |
| } |
| } |
| } |
| } else if (event instanceof FetchScheduleEvent) { |
| FetchScheduleEvent castEvent = (FetchScheduleEvent) event; |
| Map<String, List<FetchImpl>> fetches = castEvent.getFetches(); |
| TaskAttemptScheduleContext taskScheduleContext = new TaskAttemptScheduleContext(); |
| Task task = Stage.newEmptyTask(context, taskScheduleContext, stage, nextTaskId++); |
| scheduledObjectNum++; |
| for (Entry<String, List<FetchImpl>> eachFetch : fetches.entrySet()) { |
| task.addFetches(eachFetch.getKey(), eachFetch.getValue()); |
| task.addFragment(fragmentsForNonLeafTask[0], true); |
| if (fragmentsForNonLeafTask[1] != null) { |
| task.addFragment(fragmentsForNonLeafTask[1], true); |
| } |
| } |
| if (broadcastFragmentsForNonLeafTask != null && broadcastFragmentsForNonLeafTask.length > 0) { |
| task.addFragments(Arrays.asList(broadcastFragmentsForNonLeafTask)); |
| } |
| stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE)); |
| } else if (event instanceof TaskAttemptToSchedulerEvent) { |
| TaskAttemptToSchedulerEvent castEvent = (TaskAttemptToSchedulerEvent) event; |
| if (context.isLeafQuery()) { |
| scheduledRequests.addLeafTask(castEvent); |
| } else { |
| scheduledRequests.addNonLeafTask(castEvent); |
| } |
| |
| if (needWakeup.getAndSet(false)) { |
| //wake up scheduler thread after scheduled |
| synchronized (schedulingThread) { |
| schedulingThread.notifyAll(); |
| } |
| } |
| } |
| } else if (event.getType() == EventType.T_SCHEDULE_CANCEL) { |
      // When a stage is killed, unassigned task attempts are canceled from the scheduler.
      // This event is triggered by a TaskAttempt.
| TaskAttemptToSchedulerEvent castedEvent = (TaskAttemptToSchedulerEvent) event; |
| scheduledRequests.leafTasks.remove(castedEvent.getTaskAttempt().getId()); |
| LOG.info(castedEvent.getTaskAttempt().getId() + " is canceled from " + this.getClass().getSimpleName()); |
      castedEvent.getTaskAttempt().handle(
          new TaskAttemptEvent(castedEvent.getTaskAttempt().getId(), TaskAttemptEventType.TA_SCHEDULE_CANCELED));
| } |
| } |
| |
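  /** Returns the ids of all registered workers running on any of the given hosts. */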
| private Set<Integer> getWorkerIds(Collection<String> hosts){ |
| Set<Integer> workerIds = Sets.newHashSet(); |
| if(hosts.isEmpty()) return workerIds; |
| |
| for (WorkerConnectionInfo worker : stage.getContext().getWorkerMap().values()) { |
| if(hosts.contains(worker.getHost())){ |
| workerIds.add(worker.getId()); |
| } |
| } |
| return workerIds; |
| } |
| |
| |
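  /**
   * Reserves node resources from the query coordinator and converts each allocated resource
   * into a TaskRequestEvent for the corresponding worker.
   */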
| protected LinkedList<TaskRequestEvent> createTaskRequest(final int incompleteTaskNum) throws Exception { |
| LinkedList<TaskRequestEvent> taskRequestEvents = new LinkedList<TaskRequestEvent>(); |
| |
    //If the scheduled tasks are long-running, the cluster resources can become badly load-balanced.
    //This part throttles the maximum number of containers required per request.
| int requestContainerNum = Math.min(incompleteTaskNum, maximumRequestContainer); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Try to schedule task resources: " + requestContainerNum); |
| } |
| |
| ServiceTracker serviceTracker = |
| context.getMasterContext().getQueryMasterContext().getWorkerContext().getServiceTracker(); |
| NettyClientBase tmClient = RpcClientManager.getInstance(). |
| getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true, rpcParams); |
| QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); |
| |
| CallFuture<NodeResourceResponse> callBack = new CallFuture<NodeResourceResponse>(); |
| NodeResourceRequest.Builder request = NodeResourceRequest.newBuilder(); |
| request.setCapacity(NodeResources.createResource(minTaskMemory, isLeaf ? 1 : 0).getProto()) |
| .setNumContainers(requestContainerNum) |
| .setPriority(stage.getPriority()) |
| .setQueryId(context.getMasterContext().getQueryId().getProto()) |
| .setType(isLeaf ? ResourceType.LEAF : ResourceType.INTERMEDIATE) |
| .setUserId(context.getMasterContext().getQueryContext().getUser()) |
| .setRunningTasks(stage.getTotalScheduledObjectsCount() - stage.getCompletedTaskCount()) |
| .addAllCandidateNodes(candidateWorkers) |
| .setQueue(context.getMasterContext().getQueryContext().get("queue", "default")); //TODO set queue |
| |
| masterClientService.reserveNodeResources(callBack.getController(), request.build(), callBack); |
| NodeResourceResponse response = callBack.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); |
| |
| for (AllocationResourceProto resource : response.getResourceList()) { |
| taskRequestEvents.add(new TaskRequestEvent(resource.getWorkerId(), resource, context.getBlockId())); |
| } |
| |
| return taskRequestEvents; |
| } |
| |
| @Override |
| public int remainingScheduledObjectNum() { |
| return scheduledObjectNum; |
| } |
| |
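  /**
   * Releases the disk volume slot held by a finished leaf task attempt so that the
   * per-volume concurrency count of its host stays accurate.
   */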
| public void releaseTaskAttempt(TaskAttempt taskAttempt) { |
| if (taskAttempt.isLeafTask() && taskAttempt.getWorkerConnectionInfo() != null) { |
| |
| HostVolumeMapping mapping = |
| scheduledRequests.leafTaskHostMapping.get(taskAttempt.getWorkerConnectionInfo().getHost()); |
| if (mapping != null && mapping.lastAssignedVolumeId.containsKey(taskAttempt.getId())) { |
| mapping.decreaseConcurrency(mapping.lastAssignedVolumeId.remove(taskAttempt.getId())); |
| } |
| } |
| } |
| /** |
| * One worker can have multiple running task runners. <code>HostVolumeMapping</code> |
| * describes various information for one worker, including : |
| * <ul> |
| * <li>host name</li> |
| * <li>rack name</li> |
| * <li>unassigned tasks for each disk volume</li> |
| * <li>last assigned volume id - it can be used for assigning task in a round-robin manner</li> |
| * <li>the number of running tasks for each volume</li> |
| * </ul>, each task runner and the concurrency number of running tasks for volumes. |
| * |
| * Here, we identifier a task runner by {@link ContainerId}, and we use volume ids to identify |
| * all disks in this node. Actually, each volume is only used to distinguish disks, and we don't |
| * know a certain volume id indicates a certain disk. If you want to know volume id, please read the below section. |
| * |
| * <h3>Volume id</h3> |
| * Volume id is an integer. Each volume id identifies each disk volume. |
| * |
| * This volume id can be obtained from org.apache.hadoop.fs.BlockStorageLocation#getVolumeIds()}. * |
| * HDFS cannot give any volume id due to unknown reason and disabled config 'dfs.client.file-block-locations.enabled'. |
| * In this case, the volume id will be -1 or other native integer. |
| * |
| * <h3>See Also</h3> |
| * <ul> |
| * <li>HDFS-3672 (https://issues.apache.org/jira/browse/HDFS-3672).</li> |
| * </ul> |
| */ |
| public class HostVolumeMapping { |
| private final String host; |
| private final String rack; |
    /** A key is a disk volume id, and a value is the set of tasks to be scheduled on that volume. */
| private Map<Integer, LinkedHashSet<TaskAttempt>> unassignedTaskForEachVolume = |
| Collections.synchronizedMap(new HashMap<Integer, LinkedHashSet<TaskAttempt>>()); |
    /** A value is the last assigned volume id for each task attempt */
| private HashMap<TaskAttemptId, Integer> lastAssignedVolumeId = Maps.newHashMap(); |
| /** |
| * A key is disk volume id, and a value is the load of this volume. |
| * This load is measured by counting how many number of tasks are running. |
| * |
| * These disk volumes are kept in an order of ascending order of the volume id. |
| * In other words, the head volume ids are likely to -1, meaning no given volume id. |
| */ |
| private SortedMap<Integer, Integer> diskVolumeLoads = new TreeMap<Integer, Integer>(); |
    /** The total number of remaining tasks on this host */
| private AtomicInteger remainTasksNum = new AtomicInteger(0); |
| public static final int REMOTE = -2; |
| |
| |
| public HostVolumeMapping(String host, String rack){ |
| this.host = host; |
| this.rack = rack; |
| } |
| |
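    /** Registers an unassigned task attempt under the given volume and updates the load counters. */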
| public synchronized void addTaskAttempt(int volumeId, TaskAttempt attemptId){ |
| synchronized (unassignedTaskForEachVolume){ |
| LinkedHashSet<TaskAttempt> list = unassignedTaskForEachVolume.get(volumeId); |
| if (list == null) { |
| list = new LinkedHashSet<TaskAttempt>(); |
| unassignedTaskForEachVolume.put(volumeId, list); |
| } |
| list.add(attemptId); |
| } |
| |
| remainTasksNum.incrementAndGet(); |
| |
| if(!diskVolumeLoads.containsKey(volumeId)) diskVolumeLoads.put(volumeId, 0); |
| } |
| |
| /** |
| * Priorities |
| * 1. a task list in a volume of host |
| * 2. unknown block or Non-splittable task in host |
| * 3. remote tasks. unassignedTaskForEachVolume is only contained local task. so it will be null |
| */ |
| public synchronized TaskAttemptId getLocalTask() { |
| int volumeId = getLowestVolumeId(); |
| TaskAttemptId taskAttemptId = null; |
| |
| if (unassignedTaskForEachVolume.size() > 0) { |
| int retry = unassignedTaskForEachVolume.size(); |
| do { |
| //clean and get a remaining local task |
| taskAttemptId = getAndRemove(volumeId); |
| |
| if (taskAttemptId == null) { |
| //reassign next volume |
| volumeId = getLowestVolumeId(); |
| retry--; |
| } else { |
| lastAssignedVolumeId.put(taskAttemptId, volumeId); |
| break; |
| } |
| } while (retry > 0); |
| } else { |
| this.remainTasksNum.set(0); |
| } |
| |
| return taskAttemptId; |
| } |
| |
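    /** Takes an unassigned task from this host for a rack-local request, or returns null if none is left. */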
| public synchronized TaskAttemptId getTaskAttemptIdByRack(String rack) { |
| TaskAttemptId taskAttemptId = null; |
| |
| if (unassignedTaskForEachVolume.size() > 0 && this.rack.equals(rack)) { |
| int retry = unassignedTaskForEachVolume.size(); |
| do { |
| //clean and get a remaining task |
| int volumeId = getLowestVolumeId(); |
| taskAttemptId = getAndRemove(volumeId); |
| if (taskAttemptId == null) { |
| retry--; |
| } else { |
| break; |
| } |
| } while (retry > 0); |
| } |
| return taskAttemptId; |
| } |
| |
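    /**
     * Removes and returns one unassigned task attempt of the given volume. The attempt is also
     * removed from every other host that holds a replica, and the volume's concurrency count is
     * increased.
     */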
| private synchronized TaskAttemptId getAndRemove(int volumeId){ |
| TaskAttemptId taskAttemptId = null; |
| if(!unassignedTaskForEachVolume.containsKey(volumeId)) { |
| if (volumeId > REMOTE) { |
| diskVolumeLoads.remove(volumeId); |
| } |
| return taskAttemptId; |
| } |
| |
| LinkedHashSet<TaskAttempt> list = unassignedTaskForEachVolume.get(volumeId); |
| if (list != null && !list.isEmpty()) { |
| TaskAttempt taskAttempt; |
| synchronized (unassignedTaskForEachVolume) { |
| Iterator<TaskAttempt> iterator = list.iterator(); |
| taskAttempt = iterator.next(); |
| iterator.remove(); |
| } |
| |
| taskAttemptId = taskAttempt.getId(); |
| for (DataLocation location : taskAttempt.getTask().getDataLocations()) { |
| HostVolumeMapping volumeMapping = scheduledRequests.leafTaskHostMapping.get(location.getHost()); |
| if (volumeMapping != null) { |
| volumeMapping.removeTaskAttempt(location.getVolumeId(), taskAttempt); |
| } |
| } |
| |
| increaseConcurrency(volumeId); |
| } |
| |
| return taskAttemptId; |
| } |
| |
| private synchronized void removeTaskAttempt(int volumeId, TaskAttempt taskAttempt){ |
| if(!unassignedTaskForEachVolume.containsKey(volumeId)) return; |
| |
| LinkedHashSet<TaskAttempt> tasks = unassignedTaskForEachVolume.get(volumeId); |
| if(tasks.remove(taskAttempt)) { |
| remainTasksNum.getAndDecrement(); |
| } |
| |
| if(tasks.isEmpty()){ |
| unassignedTaskForEachVolume.remove(volumeId); |
| if (volumeId > REMOTE) { |
| diskVolumeLoads.remove(volumeId); |
| } |
| } |
| } |
| |
| /** |
| * Increase the count of running tasks and disk loads for a certain task runner. |
| * |
| * @param volumeId Volume identifier |
| * @return the volume load (i.e., how many running tasks use this volume) |
| */ |
| private synchronized int increaseConcurrency(int volumeId) { |
| |
| int concurrency = 1; |
| if (diskVolumeLoads.containsKey(volumeId)) { |
| concurrency = diskVolumeLoads.get(volumeId) + 1; |
| } |
| |
| if (volumeId > -1) { |
| LOG.info("Assigned host : " + host + ", Volume : " + volumeId + ", Concurrency : " + concurrency); |
| } else if (volumeId == -1) { |
        // the volume id is unknown: block metadata is disabled on the namenode,
        // or the input is a compressed text file or resides on Amazon S3
| LOG.info("Assigned host : " + host + ", Unknown Volume : " + volumeId + ", Concurrency : " + concurrency); |
| } else if (volumeId == REMOTE) { |
        // all local blocks on this host have been processed, so the task will be assigned remotely
| LOG.info("Assigned host : " + host + ", Remaining local tasks : " + getRemainingLocalTaskSize() |
| + ", Remote Concurrency : " + concurrency); |
| } |
| diskVolumeLoads.put(volumeId, concurrency); |
| return concurrency; |
| } |
| |
| /** |
| * Decrease the count of running tasks of a certain task runner |
| */ |
| private synchronized void decreaseConcurrency(int volumeId){ |
| if(diskVolumeLoads.containsKey(volumeId)){ |
| Integer concurrency = diskVolumeLoads.get(volumeId); |
| if(concurrency > 0){ |
| diskVolumeLoads.put(volumeId, concurrency - 1); |
| } else { |
| if (volumeId > REMOTE && !unassignedTaskForEachVolume.containsKey(volumeId)) { |
| diskVolumeLoads.remove(volumeId); |
| } |
| } |
| } |
| } |
| |
| /** |
| * volume of a host : 0 ~ n |
| * compressed task, amazon s3, unKnown volume : -1 |
| * remote task : -2 |
| */ |
| public int getLowestVolumeId(){ |
| Map.Entry<Integer, Integer> volumeEntry = null; |
| |
| for (Map.Entry<Integer, Integer> entry : diskVolumeLoads.entrySet()) { |
| if(volumeEntry == null) volumeEntry = entry; |
| |
| if (volumeEntry.getValue() >= entry.getValue()) { |
| volumeEntry = entry; |
| } |
| } |
| |
| if(volumeEntry != null){ |
| return volumeEntry.getKey(); |
| } else { |
| return REMOTE; |
| } |
| } |
| |
| public int getRemoteConcurrency(){ |
| return getVolumeConcurrency(REMOTE); |
| } |
| |
| public int getVolumeConcurrency(int volumeId){ |
| Integer size = diskVolumeLoads.get(volumeId); |
| if(size == null) return 0; |
| else return size; |
| } |
| |
| public int getRemainingLocalTaskSize(){ |
| return remainTasksNum.get(); |
| } |
| |
| public String getHost() { |
| return host; |
| } |
| |
| public String getRack() { |
| return rack; |
| } |
| } |
| |
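  /**
   * Returns a canceled task attempt to the pending queues so that it can be rescheduled, and
   * notifies the master that the assignment was canceled.
   */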
| public void cancel(TaskAttempt taskAttempt) { |
| |
| if(taskAttempt.isLeafTask()) { |
| releaseTaskAttempt(taskAttempt); |
| |
| List<DataLocation> locations = taskAttempt.getTask().getDataLocations(); |
| |
| for (DataLocation location : locations) { |
| HostVolumeMapping volumeMapping = scheduledRequests.leafTaskHostMapping.get(location.getHost()); |
| volumeMapping.addTaskAttempt(location.getVolumeId(), taskAttempt); |
| } |
| |
| scheduledRequests.leafTasks.add(taskAttempt.getId()); |
| } else { |
| scheduledRequests.nonLeafTasks.add(taskAttempt.getId()); |
| } |
| |
| context.getMasterContext().getEventHandler().handle( |
| new TaskAttemptEvent(taskAttempt.getId(), TaskAttemptEventType.TA_ASSIGN_CANCEL)); |
| } |
| |
| private class ScheduledRequests { |
    // The two sets leafTasks and nonLeafTasks keep all tasks to be scheduled. Even if a task is
    // included in leafTaskHostMapping or leafTasksRackMapping, it will not be sent to a task
    // runner unless it is also included in leafTasks or nonLeafTasks.
| private final Set<TaskAttemptId> leafTasks = Collections.synchronizedSet(new HashSet<TaskAttemptId>()); |
| private final Set<TaskAttemptId> nonLeafTasks = Collections.synchronizedSet(new HashSet<TaskAttemptId>()); |
| private Map<String, HostVolumeMapping> leafTaskHostMapping = Maps.newConcurrentMap(); |
| private final Map<String, HashSet<TaskAttemptId>> leafTasksRackMapping = Maps.newConcurrentMap(); |
| |
| protected void clear() { |
| leafTasks.clear(); |
| nonLeafTasks.clear(); |
| leafTaskHostMapping.clear(); |
| leafTasksRackMapping.clear(); |
| } |
| |
| private void addLeafTask(TaskAttemptToSchedulerEvent event) { |
| TaskAttempt taskAttempt = event.getTaskAttempt(); |
| List<DataLocation> locations = taskAttempt.getTask().getDataLocations(); |
| |
| for (DataLocation location : locations) { |
| String host = location.getHost(); |
| leafTaskHosts.add(host); |
| |
| HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host); |
| if (hostVolumeMapping == null) { |
| String rack = RackResolver.resolve(host).getNetworkLocation(); |
| hostVolumeMapping = new HostVolumeMapping(host, rack); |
| leafTaskHostMapping.put(host, hostVolumeMapping); |
| } |
| hostVolumeMapping.addTaskAttempt(location.getVolumeId(), taskAttempt); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Added attempt req to host " + host); |
| } |
| |
| HashSet<TaskAttemptId> list = leafTasksRackMapping.get(hostVolumeMapping.getRack()); |
| if (list == null) { |
| list = new HashSet<TaskAttemptId>(); |
| leafTasksRackMapping.put(hostVolumeMapping.getRack(), list); |
| } |
| |
| list.add(taskAttempt.getId()); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Added attempt req to rack " + hostVolumeMapping.getRack()); |
| } |
| } |
| |
| leafTasks.add(taskAttempt.getId()); |
| } |
| |
| private void addNonLeafTask(TaskAttemptToSchedulerEvent event) { |
| nonLeafTasks.add(event.getTaskAttempt().getId()); |
| } |
| |
| public int leafTaskNum() { |
| return leafTasks.size(); |
| } |
| |
| public int nonLeafTaskNum() { |
| return nonLeafTasks.size(); |
| } |
| |
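    /** Polls a pending leaf task whose data is local to the given host, or returns null. */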
| private TaskAttemptId allocateLocalTask(String host){ |
| HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host); |
| |
      if (hostVolumeMapping != null) { //the Tajo worker is co-located with a Hadoop datanode
| for (int i = 0; i < hostVolumeMapping.getRemainingLocalTaskSize(); i++) { |
| TaskAttemptId attemptId = hostVolumeMapping.getLocalTask(); |
| |
| if(attemptId == null) break; |
| //find remaining local task |
| if (leafTasks.contains(attemptId)) { |
| leafTasks.remove(attemptId); |
| return attemptId; |
| } |
| } |
| } |
| return null; |
| } |
| |
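    /**
     * Polls a pending leaf task in the same rack as the given host. Hosts with more remaining
     * tasks are tried first; if none of them yields a task, the rack-level mapping is searched.
     */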
| private TaskAttemptId allocateRackTask(String host) { |
| |
| List<HostVolumeMapping> remainingTasks = Lists.newArrayList(leafTaskHostMapping.values()); |
| String rack = RackResolver.resolve(host).getNetworkLocation(); |
| TaskAttemptId attemptId = null; |
| |
| if (remainingTasks.size() > 0) { |
| synchronized (scheduledRequests) { |
          //sort hosts by remaining task count in descending order
          Collections.sort(remainingTasks, new Comparator<HostVolumeMapping>() {
            @Override
            public int compare(HostVolumeMapping v1, HostVolumeMapping v2) {
              // descending order of remaining tasks
              return Integer.compare(v2.remainTasksNum.get(), v1.remainTasksNum.get());
            }
          });
| } |
| |
| for (HostVolumeMapping tasks : remainingTasks) { |
| for (int i = 0; i < tasks.getRemainingLocalTaskSize(); i++) { |
| TaskAttemptId tId = tasks.getTaskAttemptIdByRack(rack); |
| |
| if (tId == null) break; |
| |
| if (leafTasks.contains(tId)) { |
| leafTasks.remove(tId); |
| attemptId = tId; |
| break; |
| } |
| } |
| if(attemptId != null) break; |
| } |
| } |
| |
| //find task in rack |
| if (attemptId == null) { |
| HashSet<TaskAttemptId> list = leafTasksRackMapping.get(rack); |
| if (list != null) { |
| synchronized (list) { |
| Iterator<TaskAttemptId> iterator = list.iterator(); |
| while (iterator.hasNext()) { |
| TaskAttemptId tId = iterator.next(); |
| iterator.remove(); |
| if (leafTasks.contains(tId)) { |
| leafTasks.remove(tId); |
| attemptId = tId; |
| break; |
| } |
| } |
| } |
| } |
| } |
| |
| return attemptId; |
| } |
| |
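    /**
     * Assigns pending leaf tasks to the requesting workers, preferring disk/host-local tasks,
     * then rack-local tasks, and finally any remaining task, and ships each assignment to its
     * worker over RPC.
     */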
| public void assignToLeafTasks(LinkedList<TaskRequestEvent> taskRequests) { |
| Collections.shuffle(taskRequests); |
| LinkedList<TaskRequestEvent> remoteTaskRequests = new LinkedList<TaskRequestEvent>(); |
| String queryMasterHostAndPort = context.getMasterContext().getQueryMasterContext().getWorkerContext(). |
| getConnectionInfo().getHostAndQMPort(); |
| |
| TaskRequestEvent taskRequest; |
| while (leafTasks.size() > 0 && (!taskRequests.isEmpty() || !remoteTaskRequests.isEmpty())) { |
| int localAssign = 0; |
| int rackAssign = 0; |
| |
| taskRequest = taskRequests.pollFirst(); |
| if(taskRequest == null) { // if there are only remote task requests |
| taskRequest = remoteTaskRequests.pollFirst(); |
| } |
| |
      // check whether the requesting worker is still alive;
      // if not, ignore the task request
| WorkerConnectionInfo connectionInfo = context.getMasterContext().getWorkerMap().get(taskRequest.getWorkerId()); |
| if(connectionInfo == null) continue; |
| |
| // getting the hostname of requested node |
| String host = connectionInfo.getHost(); |
| |
      // if no pending leaf task is mapped to the hostname of the task request
| if (!leafTaskHostMapping.containsKey(host) && !taskRequests.isEmpty()) { |
| String normalizedHost = NetUtils.normalizeHost(host); |
| |
| if (!leafTaskHostMapping.containsKey(normalizedHost)) { |
          // this means one of two cases:
          // * no blocks reside on this node, or
          // * all blocks residing on this node are consumed, and this task runner requests a remote task.
          // In either case, move the task request to the remote task request list and skip the steps below.
| remoteTaskRequests.add(taskRequest); |
| continue; |
| } else { |
| host = normalizedHost; |
| } |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," + |
| "worker=" + connectionInfo.getHostAndPeerRpcPort()); |
| } |
| |
| ////////////////////////////////////////////////////////////////////// |
| // disk or host-local allocation |
| ////////////////////////////////////////////////////////////////////// |
| TaskAttemptId attemptId = allocateLocalTask(host); |
| |
| if (attemptId == null) { // if a local task cannot be found |
| HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host); |
| |
          if(!taskRequests.isEmpty()) { //if other requests remain, move this one to the remote list for better locality
| remoteTaskRequests.add(taskRequest); |
| candidateWorkers.remove(connectionInfo.getId()); |
| continue; |
| |
| } else { |
| if(hostVolumeMapping != null) { |
| int nodes = context.getMasterContext().getWorkerMap().size(); |
              //throttle tail/remote task assignment to keep the remaining tasks balanced across nodes
| int tailLimit = 1; |
| if (remainingScheduledObjectNum() > 0 && nodes > 0) { |
| tailLimit = Math.max(remainingScheduledObjectNum() / nodes, 1); |
| } |
| |
| if (hostVolumeMapping.getRemoteConcurrency() >= tailLimit) { //remote task throttling per node |
| continue; |
| } else { |
| // assign to remote volume |
| hostVolumeMapping.increaseConcurrency(HostVolumeMapping.REMOTE); |
| } |
| } |
| } |
| |
| ////////////////////////////////////////////////////////////////////// |
| // rack-local allocation |
| ////////////////////////////////////////////////////////////////////// |
| attemptId = allocateRackTask(host); |
| |
| ////////////////////////////////////////////////////////////////////// |
| // random node allocation |
| ////////////////////////////////////////////////////////////////////// |
| if (attemptId == null && leafTaskNum() > 0) { |
| synchronized (leafTasks){ |
| attemptId = leafTasks.iterator().next(); |
| leafTasks.remove(attemptId); |
| } |
| } |
| |
| if (attemptId != null && hostVolumeMapping != null) { |
| hostVolumeMapping.lastAssignedVolumeId.put(attemptId, HostVolumeMapping.REMOTE); |
| } |
| rackAssign++; |
| } else { |
| localAssign++; |
| } |
| |
| if (attemptId != null) { |
| Task task = stage.getTask(attemptId.getTaskId()); |
| TaskRequest taskAssign = new TaskRequestImpl( |
| attemptId, |
| new ArrayList<FragmentProto>(task.getAllFragments()), |
| "", |
| false, |
| LogicalNodeSerializer.serialize(task.getLogicalPlan()), |
| context.getMasterContext().getQueryContext(), |
| stage.getDataChannel(), stage.getBlock().getEnforcer(), |
| queryMasterHostAndPort); |
| |
| if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) { |
| taskAssign.setInterQuery(); |
| } |
| |
| //TODO send batch request |
| BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder(); |
| requestProto.addTaskRequest(TaskAllocationProto.newBuilder() |
| .setResource(taskRequest.getResponseProto().getResource()) |
| .setTaskRequest(taskAssign.getProto()).build()); |
| |
| requestProto.setExecutionBlockId(attemptId.getTaskId().getExecutionBlockId().getProto()); |
| context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId, connectionInfo)); |
| |
| InetSocketAddress addr = stage.getAssignedWorkerMap().get(connectionInfo.getId()); |
| if (addr == null) addr = new InetSocketAddress(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()); |
| |
| AsyncRpcClient tajoWorkerRpc = null; |
| CallFuture<BatchAllocationResponse> callFuture = new CallFuture<BatchAllocationResponse>(); |
| totalAttempts++; |
| try { |
| tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true, |
| rpcParams); |
| |
| TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); |
| tajoWorkerRpcClient.allocateTasks(callFuture.getController(), requestProto.build(), callFuture); |
| |
| BatchAllocationResponse responseProto = |
| callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); |
| |
| if (responseProto.getCancellationTaskCount() > 0) { |
| for (TaskAllocationProto proto : responseProto.getCancellationTaskList()) { |
| cancel(task.getAttempt(new TaskAttemptId(proto.getTaskRequest().getId()))); |
| cancellation++; |
| } |
| |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("Canceled requests: " + responseProto.getCancellationTaskCount() + " from " + addr); |
| } |
| continue; |
| } |
| } catch (Exception e) { |
| LOG.error(e); |
| } |
| scheduledObjectNum--; |
| totalAssigned++; |
| hostLocalAssigned += localAssign; |
| rackLocalAssigned += rackAssign; |
| |
| if (rackAssign > 0) { |
| LOG.info(String.format("Assigned Local/Rack/Total: (%d/%d/%d), " + |
| "Attempted Cancel/Assign/Total: (%d/%d/%d), " + |
| "Locality: %.2f%%, Rack host: %s", |
| hostLocalAssigned, rackLocalAssigned, totalAssigned, |
| cancellation, totalAssigned, totalAttempts, |
| ((double) hostLocalAssigned / (double) totalAssigned) * 100, host)); |
| } |
| |
| } else { |
| throw new RuntimeException("Illegal State!!!!!!!!!!!!!!!!!!!!!"); |
| } |
| } |
| } |
| |
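    /**
     * Returns true unless the block is the root block, or its parent is a union-only root;
     * in those two cases the block produces the final query result rather than intermediate
     * data for another execution block.
     */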
| private boolean checkIfInterQuery(MasterPlan masterPlan, ExecutionBlock block) { |
| if (masterPlan.isRoot(block)) { |
| return false; |
| } |
| |
| ExecutionBlock parent = masterPlan.getParent(block); |
| if (masterPlan.isRoot(parent) && parent.isUnionOnly()) { |
| return false; |
| } |
| |
| return true; |
| } |
| |
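    /**
     * Assigns pending non-leaf tasks to the requesting workers. Non-leaf tasks consume shuffled
     * intermediate data rather than local blocks, so they are assigned without locality matching.
     */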
| public void assignToNonLeafTasks(LinkedList<TaskRequestEvent> taskRequests) { |
| Collections.shuffle(taskRequests); |
| String queryMasterHostAndPort = context.getMasterContext().getQueryMasterContext().getWorkerContext(). |
| getConnectionInfo().getHostAndQMPort(); |
| |
| TaskRequestEvent taskRequest; |
| while (!taskRequests.isEmpty()) { |
| taskRequest = taskRequests.pollFirst(); |
| LOG.debug("assignToNonLeafTasks: " + taskRequest.getExecutionBlockId()); |
| |
| TaskAttemptId attemptId; |
| // random allocation |
| if (nonLeafTasks.size() > 0) { |
| synchronized (nonLeafTasks){ |
| attemptId = nonLeafTasks.iterator().next(); |
| nonLeafTasks.remove(attemptId); |
| } |
| LOG.debug("Assigned based on * match"); |
| |
| Task task; |
| task = stage.getTask(attemptId.getTaskId()); |
| |
| TaskRequest taskAssign = new TaskRequestImpl( |
| attemptId, |
| Lists.newArrayList(task.getAllFragments()), |
| "", |
| false, |
| LogicalNodeSerializer.serialize(task.getLogicalPlan()), |
| context.getMasterContext().getQueryContext(), |
| stage.getDataChannel(), |
| stage.getBlock().getEnforcer(), |
| queryMasterHostAndPort); |
| |
| if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) { |
| taskAssign.setInterQuery(); |
| } |
| for(Map.Entry<String, Set<FetchImpl>> entry: task.getFetchMap().entrySet()) { |
| Collection<FetchImpl> fetches = entry.getValue(); |
| if (fetches != null) { |
| for (FetchImpl fetch : fetches) { |
| taskAssign.addFetch(entry.getKey(), fetch); |
| } |
| } |
| } |
| |
| WorkerConnectionInfo connectionInfo = |
| context.getMasterContext().getWorkerMap().get(taskRequest.getWorkerId()); |
| |
| //TODO send batch request |
| BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder(); |
| requestProto.addTaskRequest(TaskAllocationProto.newBuilder() |
| .setResource(taskRequest.getResponseProto().getResource()) |
| .setTaskRequest(taskAssign.getProto()).build()); |
| |
| requestProto.setExecutionBlockId(attemptId.getTaskId().getExecutionBlockId().getProto()); |
| context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId, connectionInfo)); |
| |
| CallFuture<BatchAllocationResponse> callFuture = new CallFuture<BatchAllocationResponse>(); |
| |
| InetSocketAddress addr = stage.getAssignedWorkerMap().get(connectionInfo.getId()); |
| if (addr == null) addr = new InetSocketAddress(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()); |
| |
| AsyncRpcClient tajoWorkerRpc; |
| try { |
| tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true, |
| rpcParams); |
| TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); |
| tajoWorkerRpcClient.allocateTasks(callFuture.getController(), requestProto.build(), callFuture); |
| |
| BatchAllocationResponse |
| responseProto = callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); |
| |
| if(responseProto.getCancellationTaskCount() > 0) { |
| for (TaskAllocationProto proto : responseProto.getCancellationTaskList()) { |
| cancel(task.getAttempt(new TaskAttemptId(proto.getTaskRequest().getId()))); |
| cancellation++; |
| } |
| |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("Canceled requests: " + responseProto.getCancellationTaskCount() + " from " + addr); |
| } |
| continue; |
| } |
| |
| totalAssigned++; |
| scheduledObjectNum--; |
| } catch (Exception e) { |
| LOG.error(e); |
| } |
| } |
| } |
| } |
| } |
| } |