blob: a3599724004015c0022611d19ae6866a7076dd65 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.schedule.default_assign;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.schedule.TopologyAssignContext;
import com.alibaba.jstorm.schedule.default_assign.Selector.ComponentNumSelector;
import com.alibaba.jstorm.schedule.default_assign.Selector.InputComponentNumSelector;
import com.alibaba.jstorm.schedule.default_assign.Selector.Selector;
import com.alibaba.jstorm.schedule.default_assign.Selector.TotalTaskNumSelector;
import com.alibaba.jstorm.utils.FailedAssignTopologyException;
public class TaskScheduler {
public static Logger LOG = LoggerFactory.getLogger(TaskScheduler.class);
private final TaskAssignContext taskContext;
private List<ResourceWorkerSlot> assignments = new ArrayList<ResourceWorkerSlot>();
private int workerNum;
/**
* For balance purpose, default scheduler is trying to assign the same number of tasks to a worker. e.g. There are 4 tasks and 3 available workers. Each
* worker will be assigned one task first. And then one worker is chosen for the last one.
*/
private int avgTaskNum;
private int leftTaskNum;
private Set<Integer> tasks;
private DefaultTopologyAssignContext context;
private Selector componentSelector;
private Selector inputComponentSelector;
private Selector totalTaskNumSelector;
public TaskScheduler(DefaultTopologyAssignContext context, Set<Integer> tasks, List<ResourceWorkerSlot> workers) {
this.tasks = tasks;
LOG.info("Tasks " + tasks + " is going to be assigned in workers " + workers);
this.context = context;
this.taskContext =
new TaskAssignContext(this.buildSupervisorToWorker(workers), Common.buildSpoutOutoputAndBoltInputMap(context), context.getTaskToComponent());
this.componentSelector = new ComponentNumSelector(taskContext);
this.inputComponentSelector = new InputComponentNumSelector(taskContext);
this.totalTaskNumSelector = new TotalTaskNumSelector(taskContext);
if (tasks.size() == 0)
return;
if (context.getAssignType() != TopologyAssignContext.ASSIGN_TYPE_REBALANCE || context.isReassign() != false){
// warning ! it doesn't consider HA TM now!!
if (context.getAssignSingleWorkerForTM() && tasks.contains(context.getTopologyMasterTaskId())) {
assignForTopologyMaster();
}
}
int taskNum = tasks.size();
Map<ResourceWorkerSlot, Integer> workerSlotIntegerMap = taskContext.getWorkerToTaskNum();
Set<ResourceWorkerSlot> preAssignWorkers = new HashSet<ResourceWorkerSlot>();
for (Entry<ResourceWorkerSlot, Integer> worker : workerSlotIntegerMap.entrySet()) {
if (worker.getValue() > 0) {
taskNum += worker.getValue();
preAssignWorkers.add(worker.getKey());
}
}
setTaskNum(taskNum, workerNum);
// Check the worker assignment status of pre-assigned workers, e.g user defined or old assignment workers.
// Remove the workers which have been assigned with enough workers.
for (ResourceWorkerSlot worker : preAssignWorkers) {
Set<ResourceWorkerSlot> doneWorkers = removeWorkerFromSrcPool(taskContext.getWorkerToTaskNum().get(worker), worker);
if (doneWorkers != null) {
for (ResourceWorkerSlot doneWorker : doneWorkers) {
taskNum -= doneWorker.getTasks().size();
workerNum--;
}
}
}
setTaskNum(taskNum, workerNum);
// For Scale-out case, the old assignment should be kept.
if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE && context.isReassign() == false) {
keepAssignment(taskNum, context.getOldAssignment().getWorkers());
}
}
private void keepAssignment(int taskNum, Set<ResourceWorkerSlot> keepAssignments) {
Set<Integer> keepTasks = new HashSet<Integer>();
ResourceWorkerSlot tmWorker = null;
for (ResourceWorkerSlot worker : keepAssignments) {
if (worker.getTasks().contains(context.getTopologyMasterTaskId()))
tmWorker = worker;
for (Integer taskId : worker.getTasks()) {
if (tasks.contains(taskId)) {
ResourceWorkerSlot contextWorker = taskContext.getWorker(worker);
if (contextWorker != null) {
if (tmWorker != null && tmWorker.getTasks().contains(taskId) && context.getAssignSingleWorkerForTM() ) {
if (context.getTopologyMasterTaskId() == taskId){
updateAssignedTasksOfWorker(taskId, contextWorker);
taskContext.getWorkerToTaskNum().remove(contextWorker);
contextWorker.getTasks().clear();
contextWorker.getTasks().add(taskId);
assignments.add(contextWorker);
tasks.remove(taskId);
taskNum--;
workerNum--;
LOG.info("assignForTopologyMaster: " + contextWorker);
}
}else {
String componentName = context.getTaskToComponent().get(taskId);
updateAssignedTasksOfWorker(taskId, contextWorker);
updateComponentsNumOfWorker(componentName, contextWorker);
keepTasks.add(taskId);
}
}
}
}
}
if ( tmWorker != null){
setTaskNum(taskNum, workerNum);
keepAssignments.remove(tmWorker);
}
// Try to find the workers which have been assigned too much tasks
// If found, remove the workers from worker resource pool and update
// the avgNum and leftNum
int doneAssignedTaskNum = 0;
while (true) {
boolean found = false;
Set<ResourceWorkerSlot> doneAssignedWorkers = new HashSet<ResourceWorkerSlot>();
for (ResourceWorkerSlot worker : keepAssignments) {
ResourceWorkerSlot contextWorker = taskContext.getWorker(worker);
if (contextWorker != null && isTaskFullForWorker(contextWorker)) {
found = true;
workerNum--;
taskContext.getWorkerToTaskNum().remove(contextWorker);
assignments.add(contextWorker);
doneAssignedWorkers.add(worker);
doneAssignedTaskNum += contextWorker.getTasks().size();
}
}
if (found) {
taskNum -= doneAssignedTaskNum;
setTaskNum(taskNum, workerNum);
keepAssignments.removeAll(doneAssignedWorkers);
} else {
break;
}
}
tasks.removeAll(keepTasks);
LOG.info("keep following assignment, " + assignments);
}
private boolean isTaskFullForWorker(ResourceWorkerSlot worker) {
boolean ret = false;
Set<Integer> tasks = worker.getTasks();
if (tasks != null) {
if ((leftTaskNum <= 0 && tasks.size() >= avgTaskNum) || (leftTaskNum > 0 && tasks.size() >= (avgTaskNum + 1))) {
ret = true;
}
}
return ret;
}
private Set<ResourceWorkerSlot> getRestAssignedWorkers() {
Set<ResourceWorkerSlot> ret = new HashSet<ResourceWorkerSlot>();
for (ResourceWorkerSlot worker : taskContext.getWorkerToTaskNum().keySet()) {
if (worker.getTasks() != null && worker.getTasks().size() > 0) {
ret.add(worker);
}
}
return ret;
}
public List<ResourceWorkerSlot> assign() {
if (tasks.size() == 0) {
assignments.addAll(getRestAssignedWorkers());
return assignments;
}
// Firstly, assign workers to the components which are configured
// by "task.on.differ.node"
Set<Integer> assignedTasks = assignForDifferNodeTask();
// Assign for the tasks except system tasks
tasks.removeAll(assignedTasks);
Map<Integer, String> systemTasks = new HashMap<Integer, String>();
for (Integer task : tasks) {
String name = context.getTaskToComponent().get(task);
if (Common.isSystemComponent(name)) {
systemTasks.put(task, name);
continue;
}
assignForTask(name, task);
}
/*
* At last, make the assignment for system component, e.g. acker, topology master...
*/
for (Entry<Integer, String> entry : systemTasks.entrySet()) {
assignForTask(entry.getValue(), entry.getKey());
}
assignments.addAll(getRestAssignedWorkers());
return assignments;
}
private void assignForTopologyMaster() {
int taskId = context.getTopologyMasterTaskId();
// Try to find a worker which is in a supervisor with most workers,
// to avoid the balance problem when the assignment for other workers.
ResourceWorkerSlot workerAssigned = null;
int workerNumOfSuperv = 0;
for (ResourceWorkerSlot workerSlot : taskContext.getWorkerToTaskNum().keySet()){
List<ResourceWorkerSlot> workers = taskContext.getSupervisorToWorker().get(workerSlot.getNodeId());
if (workers != null && workers.size() > workerNumOfSuperv) {
for (ResourceWorkerSlot worker : workers) {
Set<Integer> tasks = worker.getTasks();
if (tasks == null || tasks.size() == 0) {
workerAssigned = worker;
workerNumOfSuperv = workers.size();
break;
}
}
}
}
if (workerAssigned == null)
throw new FailedAssignTopologyException("there's no enough workers for the assignment of topology master");
updateAssignedTasksOfWorker(taskId, workerAssigned);
taskContext.getWorkerToTaskNum().remove(workerAssigned);
assignments.add(workerAssigned);
tasks.remove(taskId);
workerNum--;
LOG.info("assignForTopologyMaster, assignments=" + assignments);
}
private void assignForTask(String name, Integer task) {
ResourceWorkerSlot worker = chooseWorker(name, new ArrayList<ResourceWorkerSlot>(taskContext.getWorkerToTaskNum().keySet()));
pushTaskToWorker(task, name, worker);
}
private Set<Integer> assignForDifferNodeTask() {
Set<Integer> ret = new HashSet<Integer>();
for (Integer task : tasks) {
Map conf = Common.getComponentMap(context, task);
if (ConfigExtension.isTaskOnDifferentNode(conf))
ret.add(task);
}
for (Integer task : ret) {
String name = context.getTaskToComponent().get(task);
ResourceWorkerSlot worker = chooseWorker(name, getDifferNodeTaskWokers(name));
LOG.info("Due to task.on.differ.node, push task-{} to {}:{}", task, worker.getHostname(), worker.getPort());
pushTaskToWorker(task, name, worker);
}
return ret;
}
private Map<String, List<ResourceWorkerSlot>> buildSupervisorToWorker(List<ResourceWorkerSlot> workers) {
Map<String, List<ResourceWorkerSlot>> supervisorToWorker = new HashMap<String, List<ResourceWorkerSlot>>();
for (ResourceWorkerSlot worker : workers) {
List<ResourceWorkerSlot> supervisor = supervisorToWorker.get(worker.getNodeId());
if (supervisor == null) {
supervisor = new ArrayList<ResourceWorkerSlot>();
supervisorToWorker.put(worker.getNodeId(), supervisor);
}
supervisor.add(worker);
}
this.workerNum = workers.size();
return supervisorToWorker;
}
private ResourceWorkerSlot chooseWorker(String name, List<ResourceWorkerSlot> workers) {
List<ResourceWorkerSlot> result = componentSelector.select(workers, name);
result = totalTaskNumSelector.select(result, name);
if (Common.isSystemComponent(name))
return result.iterator().next();
result = inputComponentSelector.select(result, name);
return result.iterator().next();
}
private void pushTaskToWorker(Integer task, String name, ResourceWorkerSlot worker) {
LOG.debug("Push task-" + task + " to worker-" + worker.getPort());
int taskNum = updateAssignedTasksOfWorker(task, worker);
removeWorkerFromSrcPool(taskNum, worker);
updateComponentsNumOfWorker(name, worker);
}
private int updateAssignedTasksOfWorker(Integer task, ResourceWorkerSlot worker) {
int ret = 0;
Set<Integer> tasks = worker.getTasks();
if (tasks == null) {
tasks = new HashSet<Integer>();
worker.setTasks(tasks);
}
tasks.add(task);
ret = taskContext.getWorkerToTaskNum().get(worker);
taskContext.getWorkerToTaskNum().put(worker, ++ret);
return ret;
}
/*
* Remove the worker from source worker pool, if the worker is assigned with enough tasks,
*/
private Set<ResourceWorkerSlot> removeWorkerFromSrcPool(int taskNum, ResourceWorkerSlot worker) {
Set<ResourceWorkerSlot> ret = new HashSet<ResourceWorkerSlot>();
if (leftTaskNum <= 0) {
if (taskNum >= avgTaskNum) {
taskContext.getWorkerToTaskNum().remove(worker);
assignments.add(worker);
ret.add(worker);
}
} else {
if (taskNum > avgTaskNum ) {
taskContext.getWorkerToTaskNum().remove(worker);
leftTaskNum = leftTaskNum -(taskNum -avgTaskNum);
assignments.add(worker);
ret.add(worker);
}
if (leftTaskNum <= 0) {
List<ResourceWorkerSlot> needDelete = new ArrayList<ResourceWorkerSlot>();
for (Entry<ResourceWorkerSlot, Integer> entry : taskContext.getWorkerToTaskNum().entrySet()) {
if (entry.getValue() == avgTaskNum)
needDelete.add(entry.getKey());
}
for (ResourceWorkerSlot workerToDelete : needDelete) {
taskContext.getWorkerToTaskNum().remove(workerToDelete);
assignments.add(workerToDelete);
ret.add(workerToDelete);
}
}
}
return ret;
}
private void updateComponentsNumOfWorker(String name, ResourceWorkerSlot worker) {
Map<String, Integer> components = taskContext.getWorkerToComponentNum().get(worker);
if (components == null) {
components = new HashMap<String, Integer>();
taskContext.getWorkerToComponentNum().put(worker, components);
}
Integer componentNum = components.get(name);
if (componentNum == null) {
componentNum = 0;
}
components.put(name, ++componentNum);
}
private void setTaskNum(int taskNum, int workerNum) {
if (taskNum >= 0 && workerNum > 0) {
this.avgTaskNum = taskNum / workerNum;
this.leftTaskNum = taskNum % workerNum;
LOG.debug("avgTaskNum=" + avgTaskNum + ", leftTaskNum=" + leftTaskNum);
} else {
LOG.debug("Illegal parameters, taskNum=" + taskNum + ", workerNum=" + workerNum);
}
}
private List<ResourceWorkerSlot> getDifferNodeTaskWokers(String name) {
List<ResourceWorkerSlot> workers = new ArrayList<ResourceWorkerSlot>();
workers.addAll(taskContext.getWorkerToTaskNum().keySet());
for (Entry<String, List<ResourceWorkerSlot>> entry : taskContext.getSupervisorToWorker().entrySet()) {
if (taskContext.getComponentNumOnSupervisor(entry.getKey(), name) != 0)
workers.removeAll(entry.getValue());
}
if (workers.size() == 0)
throw new FailedAssignTopologyException("there's no enough supervisor for making component: " + name + " 's tasks on different node");
return workers;
}
}